From 5416f24a2b24dbc2dd5a05aec50fe5d73cf02b54 Mon Sep 17 00:00:00 2001 From: tdruez Date: Fri, 4 Jun 2021 19:51:18 +0200 Subject: [PATCH] Add the ability to disable multiprocessing and threading entirely #185 Signed-off-by: tdruez --- CHANGELOG.rst | 5 ++++ docs/scancodeio-settings.rst | 9 +++++-- scanpipe/pipes/scancode.py | 46 ++++++++++++++++++++++++++---------- 3 files changed, 45 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e81ea2d26..1328ef95c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,11 @@ ### unreleased +- Add the ability to disable multiprocessing and threading entirely through the + SCANCODEIO_PROCESSES setting. Use 0 to disable multiprocessing and use -1 to also + disable threading. + https://github.com/nexB/scancode.io/issues/185 + - Missing project workspace are restored on reports (xlsx, json) creation. This allow to download reports even if the project workspace (input, codebase) was deleted. https://github.com/nexB/scancode.io/issues/154 diff --git a/docs/scancodeio-settings.rst b/docs/scancodeio-settings.rst index 221a0c898..209d09bb0 100644 --- a/docs/scancodeio-settings.rst +++ b/docs/scancodeio-settings.rst @@ -45,14 +45,19 @@ SCANCODEIO_PROCESSES By default, multiprocessing is enabled and setup to use the number of available CPUs on the machine, minus 1. 
+ You can control the number of parallel processes available to ScanCode.io using the SCANCODE_PROCESSES setting:: SCANCODE_PROCESSES=4 -To disable multiprocessing and run on a single process:: +Multiprocessing can be disabled entirely using "0":: + + SCANCODEIO_PROCESSES=0 + +To disable multiprocessing and threading use "-1":: - SCANCODE_PROCESSES=1 + SCANCODEIO_PROCESSES=-1 SCANCODE_DEFAULT_OPTIONS ------------------------ diff --git a/scanpipe/pipes/scancode.py b/scanpipe/pipes/scancode.py index b89dcd32e..711834f25 100644 --- a/scanpipe/pipes/scancode.py +++ b/scanpipe/pipes/scancode.py @@ -53,7 +53,7 @@ scanpipe_app = apps.get_app_config("scanpipe") # The maximum number of processes that can be used to execute multiprocessing calls. -# If None or not give,n then as many worker processes, minus one, will be created as the +# If None or not given, then as many worker processes, minus one, will be created as the # machine has CPUs. SCANCODEIO_PROCESSES = getattr(settings, "SCANCODEIO_PROCESSES", None) @@ -123,7 +123,7 @@ def get_resource_info(location): return file_info -def _scan_resource(location, scanners): +def _scan_resource(location, scanners, with_threading=True): """ Wrap the scancode-toolkit `scan_resource` method to support timeout on direct scanner functions calls. @@ -132,11 +132,15 @@ def _scan_resource(location, scanners): """ # `rid` is not needed in this context, yet required in the scan_resource args location_rid = location, 0 - _, _, errors, _, results, _ = scancode_cli.scan_resource(location_rid, scanners) + _, _, errors, _, results, _ = scancode_cli.scan_resource( + location_rid, + scanners, + with_threading=with_threading, + ) return results, errors -def scan_file(location): +def scan_file(location, with_threading=True): """ Run a license, copyright, email, and url scan on provided `location`, using the scancode-toolkit direct API.
@@ -149,10 +153,10 @@ def scan_file(location): Scanner("emails", scancode_api.get_emails), Scanner("urls", scancode_api.get_urls), ] - return _scan_resource(location, scanners) + return _scan_resource(location, scanners, with_threading) -def scan_for_package_info(location): +def scan_for_package_info(location, with_threading=True): """ Run a package scan on provided `location` using the scancode-toolkit direct API. @@ -161,7 +165,7 @@ def scan_for_package_info(location): scanners = [ Scanner("packages", scancode_api.get_package_info), ] - return _scan_resource(location, scanners) + return _scan_resource(location, scanners, with_threading) def save_scan_file_results(codebase_resource, scan_results, scan_errors): @@ -196,13 +200,20 @@ def save_scan_package_results(codebase_resource, scan_results, scan_errors): codebase_resource.save() +def _log_progress(scan_func, resource, resource_count, index): + progress = f"{index / resource_count * 100:.1f}% ({index}/{resource_count})" + logger.info(f"{scan_func.__name__} {progress} pk={resource.pk}") + + def _scan_and_save(project, scan_func, save_func): """ Run the `scan_func` on files without status for `project`. The `save_func` is called to save the results. Multiprocessing is enabled by default on this pipe, the number of processes can be - controlled through the SCANCODEIO_PROCESSES setting. + controlled through the `SCANCODEIO_PROCESSES` setting. + Multiprocessing can be disabled using `SCANCODEIO_PROCESSES=0`, + and threading can also be disabled using `SCANCODEIO_PROCESSES=-1`. The codebase resources QuerySet is chunked in 2000 results at the time, this can result in a significant reduction in memory usage.
@@ -214,7 +225,19 @@ def _scan_and_save(project, scan_func, save_func): resource_count = codebase_resources.count() logger.info(f"Scan {resource_count} codebase resources with {scan_func.__name__}") resource_iterator = codebase_resources.iterator(chunk_size=2000) - max_workers = SCANCODEIO_PROCESSES or os.cpu_count() - 1 or 1 + + if SCANCODEIO_PROCESSES is None: + max_workers = os.cpu_count() - 1 or 1 + else: + max_workers = SCANCODEIO_PROCESSES + + if max_workers <= 0: + with_threading = True if max_workers == 0 else False + for index, resource in enumerate(resource_iterator): + _log_progress(scan_func, resource, resource_count, index) + scan_results, scan_errors = scan_func(resource.location, with_threading) + save_func(resource, scan_results, scan_errors) + return with concurrent.futures.ProcessPoolExecutor(max_workers) as executor: future_to_resource = { @@ -227,10 +250,7 @@ def _scan_and_save(project, scan_func, save_func): for index, future in enumerate(future_as_completed): resource = future_to_resource[future] - - progress = f"{index / resource_count * 100:.1f}% ({index}/{resource_count})" - logger.info(f"{scan_func.__name__} {progress} pk={resource.pk}") - + _log_progress(scan_func, resource, resource_count, index) scan_results, scan_errors = future.result() save_func(resource, scan_results, scan_errors)