Skip to content

Commit

Permalink
Apply initial pylint fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Henri Rosten <henri.rosten@unikie.com>
  • Loading branch information
henrirosten committed Dec 20, 2024
1 parent a87e153 commit 725de7a
Show file tree
Hide file tree
Showing 18 changed files with 128 additions and 110 deletions.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ def project_path(*names):
return os.path.join(os.path.dirname(__file__), *names)


with open(project_path("VERSION")) as f:
with open(project_path("VERSION"), encoding="utf-8") as f:
version = f.read().strip()

long_description = []

for rst in ["README.rst", "HACKING.rst", "CHANGES.rst"]:
with open(project_path(rst)) as f:
with open(project_path(rst), encoding="utf-8") as f:
long_description.append(f.read())

setup(
Expand Down
15 changes: 10 additions & 5 deletions src/vulnix/derivation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable=invalid-name, eval-used

import functools
import json
import logging
Expand All @@ -11,6 +13,7 @@
class SkipDrv(RuntimeError):
"""This derivation cannot be treated as package."""

# pylint: disable=unnecessary-pass
pass


Expand All @@ -30,7 +33,7 @@ def split_name(fullname):


def load(path):
with open(path) as f:
with open(path, encoding="utf-8") as f:
d_obj = eval(f.read(), {"__builtins__": {}, "Derive": Derive}, {})
_log.debug("Loading drv %s", d_obj.name)
d_obj.store_path = path
Expand Down Expand Up @@ -58,7 +61,7 @@ def destructure(env):


@functools.total_ordering
class Derive(object):
class Derive:
"""Nix derivation as found as .drv files in the Nix store."""

store_path = None
Expand All @@ -71,7 +74,7 @@ def __init__(
_system=None,
_builder=None,
_args=None,
envVars={},
envVars=None,
_derivations=None,
name=None,
patches=None,
Expand All @@ -81,6 +84,8 @@ def __init__(
The derivation files are just accidentally Python-syntax, but
hey! :-)
"""
if envVars is None:
envVars = {}
envVars = dict(envVars)
self.name = name or envVars.get("name")
if not self.name:
Expand All @@ -95,10 +100,10 @@ def __init__(
self.patches = patches or envVars.get("patches", "")

def __repr__(self):
return "<Derive({})>".format(repr(self.name))
return f"<Derive({repr(self.name)})>"

def __eq__(self, other):
if type(self) != type(other):
if not isinstance(other, self.__class__):
return NotImplementedError()
return self.name == other.name and self.patches == other.patches

Expand Down
9 changes: 5 additions & 4 deletions src/vulnix/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ def run(nvd, store):
"--cache-dir",
type=click.Path(file_okay=False),
default=DEFAULT_CACHE_DIR,
help="Cache directory to store parsed archive data. "
"Default: {}".format(DEFAULT_CACHE_DIR),
help=f"Cache directory to store parsed archive data. Default: {DEFAULT_CACHE_DIR}",
)
@click.option(
"-r/-R",
Expand All @@ -132,7 +131,7 @@ def run(nvd, store):
@click.option(
"-m",
"--mirror",
help="Mirror to fetch NVD archives from. Default: {}.".format(DEFAULT_MIRROR),
help=f"Mirror to fetch NVD archives from. Default: {DEFAULT_MIRROR}.",
default=DEFAULT_MIRROR,
)
# output control
Expand Down Expand Up @@ -175,6 +174,8 @@ def main(
default_whitelist,
notfixed,
):
# pylint: disable=too-many-arguments,too-many-positional-arguments,unused-argument
# pylint: disable=too-many-locals,too-many-branches
if version:
print("vulnix " + pkg_resources.get_distribution("vulnix").version)
sys.exit(0)
Expand Down Expand Up @@ -225,7 +226,7 @@ def main(
for i in filtered_items:
whitelist.add_from(i)
write_whitelist.close()
with open(write_whitelist.name, "w") as f:
with open(write_whitelist.name, "w", encoding="utf-8") as f:
f.write(str(whitelist))
sys.exit(rc)

Expand Down
21 changes: 11 additions & 10 deletions src/vulnix/nix.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
_log = logging.getLogger(__name__)


class Store(object):
class Store:

def __init__(self, requisites=True, closure=False):
self.requisites = requisites
Expand All @@ -29,7 +29,7 @@ def add_profile(self, profile):
"""Add derivations found in this nix profile."""
json_manifest_path = p.join(profile, "manifest.json")
if p.exists(json_manifest_path):
_log.debug("Loading derivations from {}".format(json_manifest_path))
_log.debug("Loading derivations from %s", json_manifest_path)
with open(json_manifest_path, "r", encoding="utf-8") as f:
json_manifest = json.load(f)
elements = json_manifest["elements"]
Expand All @@ -49,7 +49,7 @@ def add_profile(self, profile):
for path in element["storePaths"]:
self.add_path(path)
else:
_log.debug("Loading derivations from user profile {}".format(profile))
_log.debug("Loading derivations from user profile %s", profile)
for line in call(
["nix-env", "-q", "--out-path", "--profile", profile]
).splitlines():
Expand Down Expand Up @@ -86,13 +86,13 @@ def _find_deriver(self, path, qpi_deriver="undef"):

error = ""
if qpi_deriver and qpi_deriver != "unknown-deriver":
error += "Deriver `{}` does not exist. ".format(qpi_deriver)
error += f"Deriver `{qpi_deriver}` does not exist. "
if qvd_deriver and qvd_deriver != qpi_deriver:
error += "Deriver `{}` does not exist. ".format(qvd_deriver)
error += f"Deriver `{qvd_deriver}` does not exist. "
if error:
raise RuntimeError(error + "Couldn't find deriver for path `{}`".format(path))
raise RuntimeError(error + f"Couldn't find deriver for path `{path}`")
raise RuntimeError(
"Cannot determine deriver. Is this really a path into the " "nix store?", path
"Cannot determine deriver. Is this really a path into the nix store?", path
)

def _find_outputs(self, path):
Expand All @@ -106,11 +106,12 @@ def _find_outputs(self, path):
return result

def add_path(self, path):
# pylint: disable=too-many-branches
"""Add the closure of all derivations referenced by a store path."""
if not p.exists(path):
raise RuntimeError(
"path `{}` does not exist - cannot load "
"derivations referenced from it".format(path)
f"path `{path}` does not exist - cannot load "
"derivations referenced from it"
)
_log.debug('Loading derivations referenced by "%s"', path)

Expand Down Expand Up @@ -159,5 +160,5 @@ def load_pkgs_json(self, json_fobj):
patches.extend(pkg["known_vulnerabilities"])
self.derivations.add(Derive(name=pkg["name"], patches=" ".join(patches)))
except SkipDrv:
_log.debug("skipping: {}", pkg)
_log.debug("skipping: %s", pkg)
continue
25 changes: 15 additions & 10 deletions src/vulnix/nvd.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import transaction
import ZODB
import ZODB.FileStorage
# pylint: disable=no-name-in-module
from BTrees import OOBTree
from persistent import Persistent

Expand All @@ -22,20 +23,25 @@
_log = logging.getLogger(__name__)


class NVD(object):
class NVD:
"""Access to the National Vulnerability Database.
https://nvd.nist.gov/
"""

def __init__(self, mirror=DEFAULT_MIRROR, cache_dir=DEFAULT_CACHE_DIR):
self._lock = None
self._db = None
self._connection = None
self._root = None
self.mirror = mirror.rstrip("/") + "/"
self.cache_dir = p.expanduser(cache_dir)
current = date.today().year
self.available_archives = [y for y in range(current - 5, current + 1)]
self.available_archives = list(range(current - 5, current + 1))

def lock(self):
self._lock = open(p.join(self.cache_dir, "lock"), "a")
# pylint: disable=consider-using-with
self._lock = open(p.join(self.cache_dir, "lock"), "a", encoding="utf-8")
try:
fcntl.lockf(self._lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
Expand All @@ -59,10 +65,10 @@ def __enter__(self):
# may trigger exceptions if the database is inconsistent
list(self._root["by_product"].keys())
if "archives" in self._root:
_log.warn("Pre-1.9.0 database found - rebuilding")
_log.warning("Pre-1.9.0 database found - rebuilding")
self.reinit()
except (TypeError, EOFError):
_log.warn("Incompatible objects found in database - rebuilding DB")
_log.warning("Incompatible objects found in database - rebuilding DB")
self.reinit()
return self

Expand Down Expand Up @@ -173,7 +179,7 @@ def __init__(self, name):
`name` consists of a year or "modified".
"""
self.name = name
self.download_uri = "nvdcve-1.1-{}.json.gz".format(name)
self.download_uri = f"nvdcve-1.1-{name}.json.gz"
self.advisories = {}

def download(self, mirror, meta):
Expand All @@ -186,16 +192,15 @@ def download(self, mirror, meta):
"""
url = mirror + self.download_uri
_log.info("Loading %s", url)
r = requests.get(url, headers=meta.headers_for(url))
r = requests.get(url, headers=meta.headers_for(url), timeout=10)
r.raise_for_status()
if r.status_code == 200:
_log.debug('Loading JSON feed "%s"', self.name)
self.parse(gzip.decompress(r.content))
meta.update_headers_for(url, r.headers)
return True
else:
_log.debug('Skipping JSON feed "%s" (%s)', self.name, r.reason)
return False
_log.debug('Skipping JSON feed "%s" (%s)', self.name, r.reason)
return False

def parse(self, nvd_json):
added = 0
Expand Down
36 changes: 15 additions & 21 deletions src/vulnix/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@


def fmt_vuln(v, show_description=False):
out = "https://nvd.nist.gov/vuln/detail/{:17}".format(v.cve_id)
out += " {:<8} ".format(v.cvssv3 or "")
out = f"https://nvd.nist.gov/vuln/detail/{v.cve_id:17} {v.cvssv3 or '':<8} "
if show_description:
# Show the description in a different color as they can run over the
# line length, and this makes distinguishing them from the next entry
Expand Down Expand Up @@ -39,8 +38,9 @@ def __init__(self, derivation, vulnerabilities):
self.masked = set()

def __repr__(self):
return "<Filtered({}, {}, {}, {})>".format(
self.derivation.pname, self.rules, len(self.report), len(self.masked)
return (
f"<Filtered({self.derivation.pname}, {self.rules}, "
f"{len(self.report)}, {len(self.masked)})>"
)

def add(self, wl_rule):
Expand All @@ -49,7 +49,7 @@ def add(self, wl_rule):
if not self.until or self.until > wl_rule.until:
self.until = wl_rule.until
if wl_rule.cve:
for r in wl_rule.cve:
for _r in wl_rule.cve:
mask = set(vuln for vuln in self.report if vuln.cve_id in wl_rule.cve)
self.report -= mask
self.masked |= mask
Expand All @@ -63,24 +63,20 @@ def print(self, show_masked=False, show_description=False):
d = self.derivation
wl = not self.report

click.secho("\n{}".format("-" * 72), dim=wl)
click.secho("{}\n".format(d.name), fg="yellow", bold=True, dim=wl)
click.secho(f"\n{'-' * 72}", dim=wl)
click.secho(f"{d.name}\n", fg="yellow", bold=True, dim=wl)
if d.store_path:
click.secho(d.store_path, fg="magenta", dim=wl)

click.secho(
"{:50} {:<8} {}".format(
"CVE", "CVSSv3", "Description" if show_description else ""
).rstrip(),
f"{'CVE':50} {'CVSSv3':<8} {'Description' if show_description else ''}".rstrip(),
dim=wl,
)
for v in sorted(self.report, key=vuln_sort_key):
click.echo(fmt_vuln(v, show_description))
if show_masked:
for v in sorted(self.masked, key=vuln_sort_key):
click.secho(
"{} [whitelisted]".format(fmt_vuln(v, show_description)), dim=True
)
click.secho(f"{fmt_vuln(v, show_description)} [whitelisted]", dim=True)

issues = functools.reduce(set.union, (r.issue_url for r in self.rules), set())
if issues:
Expand All @@ -101,18 +97,16 @@ def output_text(vulns, show_whitelisted=False, show_description=False):
if not report and not show_whitelisted:
if wl:
click.secho(
"Nothing to show, but {} left out due to whitelisting".format(len(wl)),
f"Nothing to show, but {len(wl)} left out due to whitelisting",
fg="blue",
)
else:
click.secho("Found no advisories. Excellent!", fg="green")
return

click.secho("{} derivations with active advisories".format(len(report)), fg="red")
click.secho(f"{len(report)} derivations with active advisories", fg="red")
if wl and not show_whitelisted:
click.secho(
"{} derivations left out due to whitelisting".format(len(wl)), fg="blue"
)
click.secho(f"{len(wl)} derivations left out due to whitelisting", fg="blue")

for i in sorted(report, key=attrgetter("derivation")):
i.print(show_whitelisted, show_description)
Expand All @@ -121,7 +115,7 @@ def output_text(vulns, show_whitelisted=False, show_description=False):
i.print(show_whitelisted, show_description)
if wl and not show_whitelisted:
click.secho(
"\nuse --show-whitelisted to see derivations with only " "whitelisted CVEs",
"\nuse --show-whitelisted to see derivations with only whitelisted CVEs",
fg="blue",
)

Expand Down Expand Up @@ -153,8 +147,8 @@ def output_json(items, show_whitelisted=False):
print(json.dumps(out, indent=1))


def output(items, json=False, show_whitelisted=False, show_description=False):
if json:
def output(items, json_dump=False, show_whitelisted=False, show_description=False):
if json_dump:
output_json(items, show_whitelisted)
else:
output_text(items, show_whitelisted, show_description)
Expand Down
2 changes: 2 additions & 0 deletions src/vulnix/resource.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable=too-few-public-methods,contextmanager-generator-missing-cleanup

import contextlib
import logging
import re
Expand Down
Loading

0 comments on commit 725de7a

Please # to comment.