Add the ability to disable multiprocessing and threading entirely #185
tdruez authored Jun 7, 2021
1 parent feaba12 commit 40c41fe
Showing 4 changed files with 65 additions and 15 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -11,6 +11,11 @@
  installation details.
  https://github.com/nexB/scancode.io/issues/132

- Add the ability to disable multiprocessing and threading entirely through the
  SCANCODEIO_PROCESSES setting. Use 0 to disable multiprocessing and use -1 to also
  disable threading.
  https://github.com/nexB/scancode.io/issues/185

- Missing project workspaces are restored on report (xlsx, json) creation. This allows
  downloading reports even if the project workspace (input, codebase) was deleted.
  https://github.com/nexB/scancode.io/issues/154
9 changes: 7 additions & 2 deletions docs/scancodeio-settings.rst
@@ -45,14 +45,19 @@ SCANCODEIO_PROCESSES

By default, multiprocessing is enabled and set up to use the number of available CPUs on
the machine, minus 1.

You can control the number of parallel processes available to ScanCode.io using the
SCANCODEIO_PROCESSES setting::

    SCANCODEIO_PROCESSES=4

-To disable multiprocessing and run on a single process::
+Multiprocessing can be disabled entirely using "0"::

-    SCANCODE_PROCESSES=1
+    SCANCODEIO_PROCESSES=0
+
+To disable multiprocessing and threading use "-1"::
+
+    SCANCODEIO_PROCESSES=-1
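
In summary (a condensed paraphrase of the behavior added by this commit, per
scanpipe/pipes/scancode.py below)::

    SCANCODEIO_PROCESSES unset  ->  multiprocessing with cpu_count() - 1 workers
    SCANCODEIO_PROCESSES=4      ->  multiprocessing with 4 worker processes
    SCANCODEIO_PROCESSES=0      ->  multiprocessing disabled, threading kept
    SCANCODEIO_PROCESSES=-1     ->  multiprocessing and threading both disabled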

SCANCODE_DEFAULT_OPTIONS
------------------------
46 changes: 33 additions & 13 deletions scanpipe/pipes/scancode.py
@@ -53,7 +53,7 @@
scanpipe_app = apps.get_app_config("scanpipe")

# The maximum number of processes that can be used to execute multiprocessing calls.
-# If None or not give,n then as many worker processes, minus one, will be created as the
+# If None or not given, then as many worker processes, minus one, will be created as the
# machine has CPUs.
SCANCODEIO_PROCESSES = getattr(settings, "SCANCODEIO_PROCESSES", None)
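# Note: following this change, 0 disables multiprocessing while keeping
# threading, and -1 disables threading as well; see _scan_and_save() below.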

@@ -123,7 +123,7 @@ def get_resource_info(location):
    return file_info


-def _scan_resource(location, scanners):
+def _scan_resource(location, scanners, with_threading=True):
    """
    Wrap the scancode-toolkit `scan_resource` method to support timeout on direct
    scanner function calls.
@@ -132,11 +132,15 @@ def _scan_resource(location, scanners):
"""
# `rid` is not needed in this context, yet required in the scan_resource args
location_rid = location, 0
_, _, errors, _, results, _ = scancode_cli.scan_resource(location_rid, scanners)
_, _, errors, _, results, _ = scancode_cli.scan_resource(
location_rid,
scanners,
with_threading=with_threading,
)
return results, errors


-def scan_file(location):
+def scan_file(location, with_threading=True):
    """
    Run a license, copyright, email, and url scan on provided `location`,
    using the scancode-toolkit direct API.
@@ -149,10 +153,10 @@
Scanner("emails", scancode_api.get_emails),
Scanner("urls", scancode_api.get_urls),
]
return _scan_resource(location, scanners)
return _scan_resource(location, scanners, with_threading)
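
# For illustration (hypothetical path, not part of this module), a direct call
# with threading disabled would look like:
#   results, errors = scan_file("/codebase/notice.NOTICE", with_threading=False)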


-def scan_for_package_info(location):
+def scan_for_package_info(location, with_threading=True):
    """
    Run a package scan on provided `location` using the scancode-toolkit direct API.
@@ -161,7 +165,7 @@
    scanners = [
        Scanner("packages", scancode_api.get_package_info),
    ]
-    return _scan_resource(location, scanners)
+    return _scan_resource(location, scanners, with_threading)


def save_scan_file_results(codebase_resource, scan_results, scan_errors):
@@ -196,13 +200,20 @@ def save_scan_package_results(codebase_resource, scan_results, scan_errors):
    codebase_resource.save()


def _log_progress(scan_func, resource, resource_count, index):
    progress = f"{index / resource_count * 100:.1f}% ({index}/{resource_count})"
    logger.info(f"{scan_func.__name__} {progress} pk={resource.pk}")


def _scan_and_save(project, scan_func, save_func):
    """
    Run the `scan_func` on files without status for `project`.
    The `save_func` is called to save the results.
    Multiprocessing is enabled by default on this pipe, the number of processes can be
-    controlled through the SCANCODEIO_PROCESSES setting.
+    controlled through the `SCANCODEIO_PROCESSES` setting.
+    Multiprocessing can be disabled using `SCANCODEIO_PROCESSES=0`,
+    and threading can also be disabled with `SCANCODEIO_PROCESSES=-1`.
    The codebase resources QuerySet is chunked in 2000 results at a time;
    this can result in a significant reduction in memory usage.
@@ -214,7 +225,19 @@ def _scan_and_save(project, scan_func, save_func):
    resource_count = codebase_resources.count()
    logger.info(f"Scan {resource_count} codebase resources with {scan_func.__name__}")
    resource_iterator = codebase_resources.iterator(chunk_size=2000)
-    max_workers = SCANCODEIO_PROCESSES or os.cpu_count() - 1 or 1
+    if SCANCODEIO_PROCESSES is None:
+        max_workers = os.cpu_count() - 1 or 1
+    else:
+        max_workers = SCANCODEIO_PROCESSES

+    if max_workers <= 0:
+        with_threading = True if max_workers == 0 else False
+        for index, resource in enumerate(resource_iterator):
+            _log_progress(scan_func, resource, resource_count, index)
+            scan_results, scan_errors = scan_func(resource.location, with_threading)
+            save_func(resource, scan_results, scan_errors)
+        return

    with concurrent.futures.ProcessPoolExecutor(max_workers) as executor:
        future_to_resource = {
@@ -227,10 +250,7 @@

        for index, future in enumerate(future_as_completed):
            resource = future_to_resource[future]
-            progress = f"{index / resource_count * 100:.1f}% ({index}/{resource_count})"
-            logger.info(f"{scan_func.__name__} {progress} pk={resource.pk}")
+            _log_progress(scan_func, resource, resource_count, index)
            scan_results, scan_errors = future.result()
            save_func(resource, scan_results, scan_errors)
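
As a usage sketch (not part of this commit; the project name is hypothetical and
the module-level setting is patched the same way the test below does), the new
sequential path can be exercised directly::

    from unittest import mock

    from scanpipe.models import Project
    from scanpipe.pipes import scancode

    project = Project.objects.get(name="Analysis")
    # -1 selects the sequential path and disables threading in scancode-toolkit.
    with mock.patch("scanpipe.pipes.scancode.SCANCODEIO_PROCESSES", -1):
        scancode._scan_and_save(
            project, scancode.scan_file, scancode.save_scan_file_results
        )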

20 changes: 20 additions & 0 deletions scanpipe/tests/test_pipes.py
@@ -417,6 +417,26 @@ def test_scanpipe_pipes_scancode_scan_package_and_save_results_timeout_error(self):
        )
        self.assertEqual(expected_message, error.message)

    def test_scanpipe_pipes_scancode_scan_and_save_multiprocessing_with_threading(self):
        def noop(*args, **kwargs):
            pass

        project1 = Project.objects.create(name="Analysis")
        CodebaseResource.objects.create(project=project1, path="notice.NOTICE")

        scan_func = mock.Mock(return_value=(None, None))
        scan_func.__name__ = ""

        with mock.patch("scanpipe.pipes.scancode.SCANCODEIO_PROCESSES", -1):
            scancode._scan_and_save(project1, scan_func, noop)
            with_threading = scan_func.call_args[0][-1]
            self.assertFalse(with_threading)

        with mock.patch("scanpipe.pipes.scancode.SCANCODEIO_PROCESSES", 0):
            scancode._scan_and_save(project1, scan_func, noop)
            with_threading = scan_func.call_args[0][-1]
            self.assertTrue(with_threading)
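        # call_args[0] is the tuple of positional arguments from the most recent
        # call, so [-1] above is the with_threading flag forwarded by
        # _scan_and_save.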

    def test_scanpipe_pipes_scancode_virtual_codebase(self):
        project = Project.objects.create(name="asgiref")
        input_location = self.data_location / "asgiref-3.3.0_scan.json"
