Skip to content

Commit

Permalink
Merge pull request #230 from broadinstitute/sn_make_delete_use_google…
Browse files Browse the repository at this point in the history
…_project

Make fail if no output for multithreaded jobs
  • Loading branch information
snovod authored Mar 7, 2025
2 parents 393ad63 + f4cc6dc commit 2abfd02
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions python/utils/thread_pool_executor_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from concurrent import futures
from typing import Callable, Any, Optional
from typing import Callable, Any, Optional, Tuple
import logging


Expand All @@ -10,7 +10,7 @@ def execute_with_retries(
function: Callable,
job_args_list: list[Any],
max_retries: int
) -> Any:
) -> Tuple[bool, Any]:
"""
Execute a function with retries.
Expand All @@ -20,16 +20,19 @@ def execute_with_retries(
max_retries (int): The maximum number of retries.
Returns:
Any: The result of the function if it executes successfully, None otherwise.
Tuple[Any, Any]: Job status and the result of the function
"""
retries = 0
while retries < max_retries:
try:
return function(*job_args_list)
job_output = function(*job_args_list)
job_completed = True
except Exception as e:
logging.warning(f"Job failed with error: {e}. Retry {retries + 1}/{max_retries}")
retries += 1
return None
job_completed = False
job_output = None
return job_completed, job_output

def run_multi_threaded_job(
self,
Expand Down Expand Up @@ -75,9 +78,9 @@ def run_multi_threaded_job(
for future in futures.as_completed(future_to_job):
job_args = future_to_job[future]
try:
result = future.result()
# Successful result or no result if not collecting output
if result or (result is None and not collect_output):
job_completed, job_output = future.result()
# Job completed and got job_output if expected
if job_completed and (job_output or (job_output is None and not collect_output)):
completed_jobs += 1
# Log progress every `jobs_complete_for_logging` jobs
if completed_jobs % jobs_complete_for_logging == 0:
Expand All @@ -87,7 +90,7 @@ def run_multi_threaded_job(
logging.info(f"Job {job_args} completed successfully.")
# Collect result if output is expected
if collect_output:
job_results.append(result)
job_results.append(job_output)
else:
failed_jobs += 1
logging.error(f"Job {job_args} failed after {max_retries} retries.")
Expand Down

0 comments on commit 2abfd02

Please # to comment.