diff --git a/nodestream/pipeline/progress_reporter.py b/nodestream/pipeline/progress_reporter.py index 636ac840..6b389ca0 100644 --- a/nodestream/pipeline/progress_reporter.py +++ b/nodestream/pipeline/progress_reporter.py @@ -13,6 +13,10 @@ def no_op(*_, **__): pass +def raise_exception(ex): + raise ex + + def get_max_mem_mb(): """Get the maximum memory used by the current process in MB. @@ -54,6 +58,7 @@ def for_testing(cls, results_list: list) -> "PipelineProgressReporter": callback=lambda _, record: results_list.append(record), on_start_callback=no_op, on_finish_callback=no_op, + on_fatal_error_callback=raise_exception, ) def report(self, index, record):