From 2fd8b8da116e39461751983a025de8b7086dda42 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Sat, 21 Oct 2023 14:56:46 +0100 Subject: [PATCH 01/13] Add some performance tracking to pynbody_server.py --- tangos/parallel_tasks/pynbody_server.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tangos/parallel_tasks/pynbody_server.py b/tangos/parallel_tasks/pynbody_server.py index 7195cd23..f35844e8 100644 --- a/tangos/parallel_tasks/pynbody_server.py +++ b/tangos/parallel_tasks/pynbody_server.py @@ -171,10 +171,14 @@ def send(self, destination): backend.send_numpy_array(self.contents.view(np.ndarray), destination) class RequestPynbodyArray(Message): - def __init__(self, filter_or_object_spec, array, fam=None): + _time_to_start_processing = [] + _num_retained_timings = 20 + + def __init__(self, filter_or_object_spec, array, fam=None, request_sent_time=None): self.filter_or_object_spec = filter_or_object_spec self.array = array self.fam = fam + self.request_sent_time = request_sent_time @classmethod def deserialize(cls, source, message): @@ -183,10 +187,18 @@ def deserialize(cls, source, message): return obj def serialize(self): - return (self.filter_or_object_spec, self.array, self.fam) + return (self.filter_or_object_spec, self.array, self.fam, time.time()) def process(self): start_time = time.time() + self._time_to_start_processing.append(start_time - self.request_sent_time) + if len(self._time_to_start_processing)>self._num_retained_timings: + self._time_to_start_processing = [] + + log.logger.info("pynbody server typical time to start retrieving an array: %.1fs +/- %.1fs", + np.mean(self._time_to_start_processing), + np.std(self._time_to_start_processing)) + try: log.logger.debug("Receive request for array %r from %d",self.array,self.source) subsnap = _server_queue.get_subsnap(self.filter_or_object_spec, self.fam) From 8aff92f0b9aa962d615bf07356a6d2538a108683 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Sat, 28 Oct 2023 20:26:43 +0100 Subject: [PATCH 02/13] Enabling work: allow two timing monitors to be added together Works towards reporting all-processes stats instead of the current per-process stats --- tangos/util/timing_monitor.py | 33 +++++++++++++++++++++-------- tests/test_timing.py | 39 +++++++++++++++++++++++++++++++++-- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/tangos/util/timing_monitor.py b/tangos/util/timing_monitor.py index 36085d29..14273fe8 100644 --- a/tangos/util/timing_monitor.py +++ b/tangos/util/timing_monitor.py @@ -52,22 +52,25 @@ def _end(self): self._unset_as_monitor_for(self._monitoring) self._time_marks_info.append("end") self._time_marks.append(time.time()) + + self._add_run_to_running_totals(cl, self._time_marks, self._time_marks_info) + + def _add_run_to_running_totals(self, cl, latest_run_time_marks, latest_run_time_marks_labels): previous_timings = self.timings_by_class.get(cl, None) - if previous_timings is None or len(previous_timings) == len(self._time_marks)-1: - cumulative_timings = np.diff(self._time_marks) + if previous_timings is None or len(previous_timings) == len(latest_run_time_marks) - 1: + cumulative_timings = np.diff(latest_run_time_marks) if previous_timings is not None: - cumulative_timings+=previous_timings + cumulative_timings += previous_timings self.timings_by_class[cl] = cumulative_timings - self.labels_by_class[cl] = self._time_marks_info + self.labels_by_class[cl] = latest_run_time_marks_labels else: # Incompatibility between this and previous timings from the same procedure. Can only track total time spent. - start_time = self._time_marks[0] - end_time = self._time_marks[-1] - time_elapsed = end_time-start_time + sum(previous_timings) - self.labels_by_class[cl] = ['start','end'] + start_time = latest_run_time_marks[0] + end_time = latest_run_time_marks[-1] + time_elapsed = end_time - start_time + sum(previous_timings) + self.labels_by_class[cl] = ['start', 'end'] self.timings_by_class[cl] = [time_elapsed] - def mark(self, label=None): """Mark a named event so that more detailed timing can be given""" self._time_marks.append(time.time()) @@ -77,6 +80,18 @@ def mark(self, label=None): else: self._time_marks_info.append(label) + def add(self, other): + """Add the time taken by another TimingMonitor to this one""" + if self._monitoring is not None: + raise RuntimeError("Cannot add timings to a TimingMonitor that is currently monitoring a procedure") + + for c in other.labels_by_class.keys(): + labels = other.labels_by_class[c] + timings = other.timings_by_class[c] + print("ADD", c, labels, timings) + self._add_run_to_running_totals(c, np.cumsum(np.concatenate(([0.0],timings))), labels) + print("RESULT", c, self.labels_by_class[c], self.timings_by_class[c]) + def summarise_timing(self, logger): logger.info("CUMULATIVE RUNNING TIMES (just this node)") v_tot = 1e-10 diff --git a/tests/test_timing.py b/tests/test_timing.py index 1c351fc5..abbc25ee 100644 --- a/tests/test_timing.py +++ b/tests/test_timing.py @@ -1,16 +1,17 @@ import logging +import time import tangos.util.timing_monitor as tm from tangos.log import LogCapturer, logger class Dummy(): - pass + def __init__(self): + self.timing_monitor = None def test_timing_graceful_fail(): """Issue #135: TimingMonitor could crash when monitoring procedures which themselves terminated early""" x=Dummy() - x.timing_monitor=None TM = tm.TimingMonitor() @@ -56,3 +57,37 @@ def test_timing_graceful_fail(): assert "goodbye" not in lc.get_output() assert "INTERNAL BREAKDOWN" not in lc.get_output() assert "Dummy" in lc.get_output() + +def test_timing_add(): + TM = tm.TimingMonitor() + TM2 = tm.TimingMonitor() + x = Dummy() + + with TM(x): + time.sleep(0.1) + TM.mark("hello") + time.sleep(0.2) + + with TM2(x): + time.sleep(0.1) + TM2.mark("hello") + time.sleep(0.1) + + lc = LogCapturer() + + with lc: + TM.summarise_timing(logger) + TM2.summarise_timing(logger) + + assert "Dummy 0.3s" in lc.get_output() + assert "0.1s" in lc.get_output() + + TM.add(TM2) + lc = LogCapturer() + + with lc: + TM.summarise_timing(logger) + + assert "Dummy 0.5s" in lc.get_output() + assert "hello 0.2s" in lc.get_output() + assert "end 0.3s" in lc.get_output() \ No newline at end of file From 46fbb74a6dcdde2af9fc921b0cd487c561ff8e83 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 2 Nov 2023 12:08:38 +0000 Subject: [PATCH 03/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tangos/parallel_tasks/pynbody_server.py | 4 ++-- tests/test_timing.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tangos/parallel_tasks/pynbody_server.py b/tangos/parallel_tasks/pynbody_server.py index f35844e8..c82e3f13 100644 --- a/tangos/parallel_tasks/pynbody_server.py +++ b/tangos/parallel_tasks/pynbody_server.py @@ -194,11 +194,11 @@ def process(self): self._time_to_start_processing.append(start_time - self.request_sent_time) if len(self._time_to_start_processing)>self._num_retained_timings: self._time_to_start_processing = [] - + log.logger.info("pynbody server typical time to start retrieving an array: %.1fs +/- %.1fs", np.mean(self._time_to_start_processing), np.std(self._time_to_start_processing)) - + try: log.logger.debug("Receive request for array %r from %d",self.array,self.source) subsnap = _server_queue.get_subsnap(self.filter_or_object_spec, self.fam) diff --git a/tests/test_timing.py b/tests/test_timing.py index abbc25ee..439b0e41 100644 --- a/tests/test_timing.py +++ b/tests/test_timing.py @@ -90,4 +90,4 @@ def test_timing_add(): assert "Dummy 0.5s" in lc.get_output() assert "hello 0.2s" in lc.get_output() - assert "end 0.3s" in lc.get_output() \ No newline at end of file + assert "end 0.3s" in lc.get_output() From e110bad4719ae6b0d2f06304a81cd4a94bd0dbba Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Mon, 11 Dec 2023 09:13:30 +0000 Subject: [PATCH 04/13] Fix printing pynbody server performance info --- .../parallel_tasks/pynbody_server/__init__.py | 38 ++++++++++++++++++- .../pynbody_server/snapshot_queue.py | 12 +++++- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/tangos/parallel_tasks/pynbody_server/__init__.py b/tangos/parallel_tasks/pynbody_server/__init__.py index 41f05b79..b1d15c7f 100644 --- a/tangos/parallel_tasks/pynbody_server/__init__.py +++ b/tangos/parallel_tasks/pynbody_server/__init__.py @@ -56,10 +56,13 @@ def send(self, destination): transfer_array.send_array(self.contents, destination, use_shared_memory=self.shared_mem) class RequestPynbodyArray(Message): - def __init__(self, filter_or_object_spec, array, fam=None): + _time_to_start_processing = [] + + def __init__(self, filter_or_object_spec, array, fam=None, request_sent_time=None): self.filter_or_object_spec = filter_or_object_spec self.array = array self.fam = fam + self.request_sent_time = request_sent_time @classmethod def deserialize(cls, source, message): @@ -68,10 +71,41 @@ def deserialize(cls, source, message): return obj def serialize(self): - return (self.filter_or_object_spec, self.array, self.fam) + return (self.filter_or_object_spec, self.array, self.fam, time.time()) + + @classmethod + def get_mean_wait_time(cls): + if len(cls._time_to_start_processing)==0: + return 0 + else: + return np.mean(cls._time_to_start_processing) + + @classmethod + def get_total_wait_time(cls): + if len(cls._time_to_start_processing)==0: + return 0 + else: + return np.sum(cls._time_to_start_processing) + + @classmethod + def get_std_wait_time(cls): + if len(cls._time_to_start_processing)==0: + return 0 + else: + return np.std(cls._time_to_start_processing) + + @classmethod + def get_num_requests(cls): + return len(cls._time_to_start_processing) + + @classmethod + def reset_performance_stats(cls): + cls._time_to_start_processing = [] def process(self): start_time = time.time() + self._time_to_start_processing.append(start_time - self.request_sent_time) + try: log.logger.debug("Receive request for array %r from %d",self.array,self.source) subsnap = _server_queue.get_subsnap(self.filter_or_object_spec, self.fam) diff --git a/tangos/parallel_tasks/pynbody_server/snapshot_queue.py b/tangos/parallel_tasks/pynbody_server/snapshot_queue.py index 19d3cef2..e58f51e6 100644 --- a/tangos/parallel_tasks/pynbody_server/snapshot_queue.py +++ b/tangos/parallel_tasks/pynbody_server/snapshot_queue.py @@ -83,7 +83,17 @@ def get_subsnap_uncached(self, filter_or_object_spec, fam): def _free_if_unused(self): if len(self.in_use_by)==0: - log.logger.debug("Pynbody server: all clients are finished with the current snapshot; freeing.") + from . import RequestPynbodyArray + log.logger.info( + f"Closing snapshot {self.current_timestep} after processing " + f"{RequestPynbodyArray.get_num_requests()} array fetches") + if RequestPynbodyArray.get_num_requests() > 0: + log.logger.info(" Typical wait time to start retrieving an array: %.1fs +/- %.1fs", + RequestPynbodyArray.get_mean_wait_time(), + RequestPynbodyArray.get_std_wait_time()) + log.logger.info(" Total wait time: %.1fs", RequestPynbodyArray.get_total_wait_time()) + RequestPynbodyArray.reset_performance_stats() + with check_deleted(self.current_snapshot): self.current_snapshot = None self.current_timestep = None From 74c5ad41d543cea309b5d04f0b4f9f184cb5f94d Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Tue, 12 Dec 2023 23:15:49 +0000 Subject: [PATCH 05/13] Fix bug introduced into server (non-shared-mem) mode. Add unit test. --- tangos/input_handlers/pynbody.py | 5 ++++- tests/test_pynbody_server.py | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/tangos/input_handlers/pynbody.py b/tangos/input_handlers/pynbody.py index 6f20782b..43cb9912 100644 --- a/tangos/input_handlers/pynbody.py +++ b/tangos/input_handlers/pynbody.py @@ -88,9 +88,12 @@ def load_timestep_without_caching(self, ts_extension, mode=None): raise NotImplementedError("Load mode %r is not implemented"%mode) def load_region(self, ts_extension, region_specification, mode=None): - if mode is None or mode=='server': + if mode is None: timestep = self.load_timestep(ts_extension, mode) return timestep[region_specification] + elif mode=='server': + timestep = self.load_timestep(ts_extension, mode) + return timestep.get_view(region_specification) elif mode=='server-shared-mem': from ..parallel_tasks import pynbody_server as ps timestep = self.load_timestep(ts_extension, mode) diff --git a/tests/test_pynbody_server.py b/tests/test_pynbody_server.py index 97fd7a69..78e64da5 100644 --- a/tests/test_pynbody_server.py +++ b/tests/test_pynbody_server.py @@ -228,6 +228,17 @@ def test_correct_object_loading(): pt.launch(_test_correct_object_loading) +def _test_region_loading(): + """This test ensures that a region can be loaded correctly under server mode""" + f_remote = handler.load_region("tiny.000640", pynbody.filt.Sphere("3 Mpc"), mode='server') + f_local = handler.load_region("tiny.000640", pynbody.filt.Sphere("3 Mpc"), mode=None) + assert (f_remote.dm['pos'] == f_local.dm['pos']).all() + assert (f_remote.st['pos'] == f_local.st['pos']).all() +def test_region_loading(): + """This test ensures that a region can be loaded correctly under server mode""" + pt.use("multiprocessing-3") + pt.launch(_test_region_loading) + def _test_oserror_on_nonexistent_file(): with npt.assert_raises(OSError): From 18fa0ae3f5ad9651f786420d058db9328cbfc3e6 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 08:52:11 +0000 Subject: [PATCH 06/13] Allow parallel_tasks multiprocessing backend to capture log from all processes --- tangos/log.py | 15 ++++- .../backends/multiprocessing.py | 65 ++++++++++++++++--- tests/test_db_writer.py | 2 +- 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/tangos/log.py b/tangos/log.py index 059a0359..270fd0e8 100644 --- a/tangos/log.py +++ b/tangos/log.py @@ -17,6 +17,7 @@ def __init__(self): self.buffer = StringIO() self.handler_buffer = logging.StreamHandler(self.buffer) self.handler_buffer.setLevel(logging.INFO) + self.handler_buffer.setFormatter(formatter) self._suspended_handlers = [] def __enter__(self): @@ -24,6 +25,7 @@ def __enter__(self): for x_handler in self._suspended_handlers: logger.removeHandler(x_handler) logger.addHandler(self.handler_buffer) + return self def __exit__(self, *exc_info): for x_handler in self._suspended_handlers: @@ -34,8 +36,19 @@ def __exit__(self, *exc_info): def get_output(self): return self.buffer.getvalue() + def get_output_without_timestamps(self): + lines = self.get_output().split("\n") + result = "" + for l in lines: + try: + result += l.split(" : ", 1)[1]+"\n" + except IndexError: + result += l+"\n" + return result + def set_identity_string(identifier): global handler_stderr formatter = logging.Formatter(identifier+"%(asctime)s : %(message)s") - handler_stderr.setFormatter(formatter) + for handler in logger.handlers: + handler.setFormatter(formatter) diff --git a/tangos/parallel_tasks/backends/multiprocessing.py b/tangos/parallel_tasks/backends/multiprocessing.py index 6596587a..5e6cef81 100644 --- a/tangos/parallel_tasks/backends/multiprocessing.py +++ b/tangos/parallel_tasks/backends/multiprocessing.py @@ -4,6 +4,7 @@ import signal import sys import threading +import time from typing import Optional import tblib.pickling_support @@ -87,9 +88,9 @@ def barrier(): pass def finalize(): - _pipe.send("finalize") + pass -def launch_wrapper(target_fn, rank_in, size_in, pipe_in, args_in): +def launch_wrapper(target_fn, rank_in, size_in, pipe_in, args_in, capture_log): tblib.pickling_support.install() global _slave, _rank, _size, _pipe, _recv_lock @@ -97,9 +98,22 @@ def launch_wrapper(target_fn, rank_in, size_in, pipe_in, args_in): _size = size_in _pipe = pipe_in _recv_lock = threading.Lock() + + result = None + try: - target_fn(*args_in) - finalize() + if capture_log: + from .. import log + try: + with log.LogCapturer() as lc: + target_fn(*args_in) + finally: + result = lc.get_output() + else: + target_fn(*args_in) + if result is not None: + _pipe.send(("log", result)) + _pipe.send("exit") except Exception as e: import sys import traceback @@ -109,14 +123,17 @@ def launch_wrapper(target_fn, rank_in, size_in, pipe_in, args_in): print("Error on a sub-process:", file=sys.stderr) traceback.print_exception(exc_type, exc_value, exc_traceback, file=sys.stderr) + if result is not None: + _pipe.send(("log", result)) _pipe.send(("error", exc_value, exc_traceback)) + _pipe.close() class RemoteException(Exception): pass -def launch_functions(functions, args): +def launch_functions(functions, args, capture_log=False): global _slave if _slave: raise RuntimeError("Multiprocessing session is already underway") @@ -130,7 +147,7 @@ def launch_functions(functions, args): child_connections, parent_connections = list(zip(*[mp_context.Pipe() for rank in range(num_procs)])) - processes = [mp_context.Process(target=launch_wrapper, args=(function, rank, num_procs, pipe, args_i)) + processes = [mp_context.Process(target=launch_wrapper, args=(function, rank, num_procs, pipe, args_i, capture_log)) for rank, (pipe, function, args_i) in enumerate(zip(child_connections, functions, args))] @@ -140,18 +157,21 @@ def launch_functions(functions, args): running = [True for rank in range(num_procs)] error: Optional[Exception] = None + log = "" if capture_log else None + while any(running): for i, pipe_i in enumerate(parent_connections): if pipe_i.poll(): message = pipe_i.recv() - if message=='finalize': - #print " ---> multiprocessing backend: finalize node ",i,running + if message=='exit': running[i]=False elif isinstance(message[0], str) and message[0]=='error': error = message[1] traceback = message[2] running = [False] break + elif isinstance(message[0], str) and message[0]=='log': + log+=message[1] else: #print "multiprocessing backend: pass message ",i,"->",message[1] parent_connections[message[1]].send((message[0],i,message[2])) @@ -170,13 +190,38 @@ def launch_functions(functions, args): if error is not None: raise error.with_traceback(traceback) + return _sort_log(log) + +def _sort_log(log): + """Sort the log by time. + + The input log is of the format, for example: + [ 3] 2023-12-12 13:55:46,004 Message + + This routine extracts the times, parses them and returns a reordered log in time order. + It is stable, i.e. messages with the same timestep are returned in the input order + """ + if log is None: + return None + lines = log.split("\n") + times = [] + for line in lines: + if line.strip() == "": + continue + times.append(line[6:28]) + times = [time.strptime(t, "%Y-%m-%d %H:%M:%S,%f") for t in times] + lines = [line for _, line in sorted(zip(times, lines), key=lambda item: item[0])] + return "\n".join(lines) + + + -def launch(function, args): +def launch(function, args, **kwargs): from .. import _num_procs if _num_procs is None: raise RuntimeError("To launch a parallel session using multiprocessing backend, you need to specify the number " "of processors. You can do this by calling the backend multiprocessing- where is the" "number of processors you want to use.") - launch_functions([function]*_num_procs, [args]*_num_procs) + return launch_functions([function]*_num_procs, [args]*_num_procs, **kwargs) diff --git a/tests/test_db_writer.py b/tests/test_db_writer.py index 669038fa..43530e9d 100644 --- a/tests/test_db_writer.py +++ b/tests/test_db_writer.py @@ -110,7 +110,7 @@ def _runner(): return stored_log.get_output() if parallel: - parallel_tasks.launch(_runner, []) + return parallel_tasks.launch(writer.run_calculation_loop, [], {'capture_log': True}) else: return _runner() From d735109921e3f7b5a6ed1d506ad6b1df1b34f01f Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 08:54:17 +0000 Subject: [PATCH 07/13] Allow tangos write to aggregate statistics across nodes Introduces a new, general module to allow this, accumulative_statistics.py Updates TimingMonitor and CalculationSuccessTracker to take advantage of this framework. --- tangos/config.py | 3 + tangos/parallel_tasks/__init__.py | 35 ++++--- .../parallel_tasks/accumulative_statistics.py | 96 +++++++++++++++++++ tangos/relation_finding/multi_hop.py | 2 +- tangos/tools/property_writer.py | 46 ++++++--- tangos/util/timing_monitor.py | 22 +++-- tests/test_db_writer.py | 58 +++++++++++ tests/test_timing.py | 45 +++++++-- 8 files changed, 261 insertions(+), 46 deletions(-) create mode 100644 tangos/parallel_tasks/accumulative_statistics.py diff --git a/tangos/config.py b/tangos/config.py index ad79dc85..c4871ee5 100644 --- a/tangos/config.py +++ b/tangos/config.py @@ -97,6 +97,9 @@ # Property writer: don't bother committing even if a timestep is finished if this time hasn't elapsed: PROPERTY_WRITER_MINIMUM_TIME_BETWEEN_COMMITS = 300 # seconds +# Minimum time between providing updates to the user during tangos write, when running in parallel +PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES = 600 # seconds + try: from .config_local import * except: diff --git a/tangos/parallel_tasks/__init__.py b/tangos/parallel_tasks/__init__.py index 4e2672b6..a81e6432 100644 --- a/tangos/parallel_tasks/__init__.py +++ b/tangos/parallel_tasks/__init__.py @@ -1,21 +1,18 @@ import importlib import re import sys -import time -import traceback import warnings -import tangos.core.creator - from .. import config, core backend = None _backend_name = config.default_backend _num_procs = None # only for multiprocessing backend +_on_exit = [] # list of functions to call when parallelism is shutting down + from .. import log -from ..log import logger -from . import backends, jobs, message +from . import accumulative_statistics, jobs, message def use(name): @@ -51,10 +48,15 @@ def parallelism_is_active(): global _backend_name return _backend_name != 'null' and backend is not None -def launch(function, args=None): +def launch(function, args=None, backend_kwargs=None): if args is None: args = [] + if backend_kwargs is None: + backend_kwargs = {} + + result = None + # we need to close any existing connections because we may fork, which leads to # buggy/unreliable behaviour. This should invalidate the session attached to # any existing objects, which is intended behaviour. If you are using parallel @@ -71,15 +73,17 @@ def launch(function, args=None): try: core.close_db() if _backend_name != 'null': - backend.launch(_exec_function_or_server, [function, connection_info, args]) + result = backend.launch(_exec_function_or_server, [function, connection_info, args], **backend_kwargs) else: - function(*args) + result = function(*args) finally: if connection_info is not None: core.init_db(*connection_info) finally: deinit_backend() + return result + def distributed(file_list, proc=None, of=None): """Distribute a list of tasks between all nodes""" @@ -138,13 +142,18 @@ def _server_thread(): else: obj.process() +def on_exit_parallelism(function): + global _on_exit + _on_exit.append(function) - log.logger.info("Terminating manager process") +def _shutdown_parallelism(): + global backend, _on_exit + log.logger.debug("Clearing up process") + for fn in _on_exit: + fn() + _on_exit = [] -def _shutdown_parallelism(): - global backend - log.logger.info("Terminating worker process") backend.barrier() backend.finalize() backend = None diff --git a/tangos/parallel_tasks/accumulative_statistics.py b/tangos/parallel_tasks/accumulative_statistics.py new file mode 100644 index 00000000..2200e9b0 --- /dev/null +++ b/tangos/parallel_tasks/accumulative_statistics.py @@ -0,0 +1,96 @@ +import logging +import time + +from ..config import PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES +from ..log import logger +from .message import Message + +_new_accumulator_requested_for_ranks = [] +_new_accumulator = None +_existing_accumulators = [] +class CreateNewAccumulatorMessage(Message): + + def process(self): + from . import backend + global _new_accumulator, _new_accumulator_requested_for_ranks, _existing_accumulators + assert issubclass(self.contents, StatisticsAccumulatorBase) + if _new_accumulator is None: + _new_accumulator = self.contents() + _new_accumulator_requested_for_ranks = [self.source] + else: + assert self.source not in _new_accumulator_requested_for_ranks + assert isinstance(_new_accumulator, self.contents) + _new_accumulator_requested_for_ranks.append(self.source) + + from . import backend + + if len(_new_accumulator_requested_for_ranks) == backend.size()-1: + self._confirm_new_accumulator() + + def _confirm_new_accumulator(self): + global _new_accumulator, _new_accumulator_requested_for_ranks, _existing_accumulators + from . import backend, on_exit_parallelism + accumulator_id = len(_existing_accumulators) + _existing_accumulators.append(_new_accumulator) + + locally_bound_accumulator = _new_accumulator + logger.debug("Created new accumulator of type %s with id %d" % (locally_bound_accumulator.__class__.__name__, accumulator_id)) + on_exit_parallelism(lambda: locally_bound_accumulator.report_to_log_if_needed(logger, 0.05)) + + _new_accumulator = None + _new_accumulator_requested_for_ranks = [] + + for destination in range(1, backend.size()): + AccumulatorIdMessage(accumulator_id).send(destination) +class AccumulatorIdMessage(Message): + pass +class AccumulateStatisticsMessage(Message): + def process(self): + global _existing_accumulators + _existing_accumulators[self.contents.id].add(self.contents) + +class StatisticsAccumulatorBase: + REPORT_AFTER = PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES + def __init__(self, allow_parallel=False): + """This is a base class for accumulating statistics, possibly in parallel across multiple processes. + + Note that if allow_parallel is True, then all processes must create an instance of this class + (in effect creating the class will act like a barrier). If only some processes create an instance, + they will block, possibly creating a deadlock. This is why allow_parallel defaults to False. + """ + from . import backend, parallelism_is_active + self._last_reported = time.time() + self._parallel = allow_parallel and parallelism_is_active() and backend.rank() != 0 + if self._parallel: + logger.debug(f"Registering {self.__class__}") + CreateNewAccumulatorMessage(self.__class__).send(0) + logger.debug(f"Awaiting accumulator id for {self.__class__}") + self.id = AccumulatorIdMessage.receive(0).contents + logger.debug(f"Received accumulator id={ self.id}") + + def report_to_server(self): + if self._parallel: + AccumulateStatisticsMessage(self).send(0) + self.reset() + + def reset(self): + raise NotImplementedError("This method should be overriden to reset the statistics") + + def add(self, other): + raise NotImplementedError("This method should be overriden to add two accumulations together") + + def report_to_log(self, logger): + raise NotImplementedError("This method should be overriden to log a statistics report") + + def report_to_log_or_server(self, logger): + if self._parallel: + self.report_to_server() + else: + self.report_to_log(logger) + + def report_to_log_if_needed(self, logger, after_time=None): + if after_time is None: + after_time = self.REPORT_AFTER + if time.time() - self._last_reported > after_time: + self.report_to_log(logger) + self._last_reported = time.time() diff --git a/tangos/relation_finding/multi_hop.py b/tangos/relation_finding/multi_hop.py index 8273ef04..ce307029 100644 --- a/tangos/relation_finding/multi_hop.py +++ b/tangos/relation_finding/multi_hop.py @@ -267,7 +267,7 @@ def _make_hops(self): else: filtered_count = 0 - # for performance info: self.timing_monitor.summarise_timing(logger) + # for performance info: self.timing_monitor.report_to_log(logger) if self._hopping_finished(filtered_count): break diff --git a/tangos/tools/property_writer.py b/tangos/tools/property_writer.py index 24133e0d..39c2b91f 100644 --- a/tangos/tools/property_writer.py +++ b/tangos/tools/property_writer.py @@ -13,6 +13,7 @@ from .. import config, core, live_calculation, parallel_tasks, properties from ..cached_writer import insert_list from ..log import logger +from ..parallel_tasks import accumulative_statistics from ..util import proxy_object, terminalcontroller, timing_monitor from ..util.check_deleted import check_deleted from . import GenericTangosTool @@ -170,7 +171,7 @@ def process_options(self, options): if self.options.verbose: self.redirect.enabled = False - self.timing_monitor = timing_monitor.TimingMonitor() + def _compile_inclusion_criterion(self): if self.options.include_only: @@ -184,9 +185,9 @@ def _log_one_process(self, *args): if parallel_tasks.backend is None or parallel_tasks.backend.rank()==1: logger.info(*args) - def _summarise_timing_one_process(self): - if parallel_tasks.backend is None or parallel_tasks.backend.rank() == 1: - self.timing_monitor.summarise_timing(logger) + def _summarise_timing(self): + self.timing_monitor.report_to_log_or_server(logger) + def _build_halo_list(self, db_timestep): query = core.halo.SimulationObjectBase.timestep == db_timestep @@ -276,7 +277,7 @@ def _commit_results_if_needed(self, end_of_timestep=False, end_of_simulation=Fal logger.info(f"...{num_properties} properties were committed") self._pending_properties = [] self._start_time = time.time() - self._summarise_timing_one_process() + self._summarise_timing() def _queue_results_for_later_commit(self, db_halo, names, results, existing_properties_data): for n, r in zip(names, results): @@ -471,8 +472,6 @@ def run_halo_calculation(self, db_halo, existing_properties): def run_timestep_calculation(self, db_timestep): - self.tracker = CalculationSuccessTracker() - logger.info("Processing %r", db_timestep) self._property_calculator_instances = properties.instantiate_classes(db_timestep.simulation, self.options.properties, @@ -507,7 +506,7 @@ def run_timestep_calculation(self, db_timestep): logger.info("Done with %r",db_timestep) self._unload_timestep() - self.tracker.report_to_log(logger) + self.tracker.report_to_log_or_server(logger) self._commit_results_if_needed(end_of_timestep=True) @@ -532,6 +531,11 @@ def _add_prerequisites_to_calculator_instances(self, db_timestep): def run_calculation_loop(self): + # NB both these objects must be created at the same place in all processes, + # since creating them is a 'barrier'-like operation + self.timing_monitor = timing_monitor.TimingMonitor(allow_parallel=True) + self.tracker = CalculationSuccessTracker(allow_parallel=True) + parallel_tasks.database.synchronize_creator_object() self._start_time = time.time() @@ -543,13 +547,10 @@ def run_calculation_loop(self): self._commit_results_if_needed(True,True) -class CalculationSuccessTracker: - def __init__(self): - self._skipped_existing = 0 - self._skipped_missing_prerequisite = 0 - self._skipped_error = 0 - self._skipped_loading_error = 0 - self._succeeded = 0 +class CalculationSuccessTracker(accumulative_statistics.StatisticsAccumulatorBase): + def __init__(self, allow_parallel=False): + super().__init__(allow_parallel=allow_parallel) + self.reset() self._posted_errors = set() @@ -561,6 +562,7 @@ def should_log_error(self, from_module): return False def report_to_log(self, logger): + logger.info("PROPERTY CALCULATION SUMMARY") logger.info(" Succeeded: %d property calculations", self._succeeded) logger.info(" Errored: %d property calculations", self._skipped_error) logger.info(" Errored during load: %d property calculations", self._skipped_loading_error) @@ -581,3 +583,17 @@ def register_missing_prerequisite(self): def register_already_exists(self): self._skipped_existing+=1 + + def reset(self): + self._succeeded = 0 + self._skipped_error = 0 + self._skipped_loading_error = 0 + self._skipped_existing = 0 + self._skipped_missing_prerequisite = 0 + + def add(self, other): + self._succeeded += other._succeeded + self._skipped_error += other._skipped_error + self._skipped_loading_error += other._skipped_loading_error + self._skipped_existing += other._skipped_existing + self._skipped_missing_prerequisite += other._skipped_missing_prerequisite diff --git a/tangos/util/timing_monitor.py b/tangos/util/timing_monitor.py index 14273fe8..86079e7a 100644 --- a/tangos/util/timing_monitor.py +++ b/tangos/util/timing_monitor.py @@ -4,14 +4,16 @@ import numpy as np +from ..parallel_tasks import accumulative_statistics -class TimingMonitor: + +class TimingMonitor(accumulative_statistics.StatisticsAccumulatorBase): """This class keeps track of how long a Property is taking to evaluate, and (if the Property is implemented to take advantage of this), the time spent on sub-tasks. It provides formatting to place this information into the log.""" - def __init__(self): - self.timings_by_class = {} - self.labels_by_class = {} + def __init__(self, allow_parallel=False): + super().__init__(allow_parallel=allow_parallel) + self.reset() self._monitoring = None @contextlib.contextmanager @@ -20,6 +22,10 @@ def __call__(self, object): yield self._end() + def reset(self): + self.timings_by_class = {} + self.labels_by_class = {} + def check_compatible_object(self, object): if not hasattr(object, 'timing_monitor'): raise TypeError("TimingMonitor requires a compatible object to monitor") @@ -88,12 +94,12 @@ def add(self, other): for c in other.labels_by_class.keys(): labels = other.labels_by_class[c] timings = other.timings_by_class[c] - print("ADD", c, labels, timings) self._add_run_to_running_totals(c, np.cumsum(np.concatenate(([0.0],timings))), labels) - print("RESULT", c, self.labels_by_class[c], self.timings_by_class[c]) - def summarise_timing(self, logger): - logger.info("CUMULATIVE RUNNING TIMES (just this node)") + def report_to_log(self, logger): + if len(self.timings_by_class) == 0: + return + logger.info("CUMULATIVE RUNNING TIMES") v_tot = 1e-10 for k, v in self.timings_by_class.items(): v_tot += sum(v) diff --git a/tests/test_db_writer.py b/tests/test_db_writer.py index 43530e9d..4d76b8a0 100644 --- a/tests/test_db_writer.py +++ b/tests/test_db_writer.py @@ -1,4 +1,5 @@ import os +import time from numpy import testing as npt from pytest import fixture @@ -238,3 +239,60 @@ def test_timesteps_matching(fresh_database_2): assert 'dummy_property' in db.get_halo("dummy_sim_2/step.1/2").keys() assert 'dummy_property' in db.get_halo("dummy_sim_2/step.2/1").keys() assert 'dummy_property' not in db.get_halo("dummy_sim_2/step.3/1").keys() + + +class DummyPropertyTakingTime(DummyProperty): + names = "dummy_property_taking_time", + + def calculate(self, data, entry): + time.sleep(0.1) + return 0.0, + + +@fixture +def success_tracker(): + from tangos.tools.property_writer import CalculationSuccessTracker + st = CalculationSuccessTracker() + for i in range(1): + st.register_success() + for i in range(2): + st.register_error() + for i in range(3): + st.register_already_exists() + for i in range(4): + st.register_loading_error() + for i in range(5): + st.register_missing_prerequisite() + yield st + +def test_calc_success_tracker(success_tracker): + with log.LogCapturer() as lc: + success_tracker.report_to_log(log.logger) + output = lc.get_output() + assert "Succeeded: 1 property calculations" in output + assert "Errored: 2 property calculations" in output + assert "Already exists: 3 property" in output + assert "Errored during load: 4 property calculations" in output + assert "Missing pre-requisite: 5 property" in output + +def test_calc_success_tracker_addition(success_tracker): + success_tracker.add(success_tracker) + with log.LogCapturer() as lc: + success_tracker.report_to_log(log.logger) + output = lc.get_output() + assert "Succeeded: 2 property calculations" in output + assert "Errored: 4 property calculations" in output + assert "Already exists: 6 property" in output + assert "Errored during load: 8 property calculations" in output + assert "Missing pre-requisite: 10 property" in output + +def test_writer_reports_aggregates(fresh_database): + parallel_tasks.use('multiprocessing-4') + try: + res = run_writer_with_args("dummy_property_taking_time", parallel=True) + finally: + parallel_tasks.use('null') + + + assert "Succeeded: 15 property calculations" in res + assert "myPropertyTakingTime 1.5s" in res diff --git a/tests/test_timing.py b/tests/test_timing.py index 439b0e41..54aa6380 100644 --- a/tests/test_timing.py +++ b/tests/test_timing.py @@ -23,7 +23,7 @@ def test_timing_graceful_fail(): TM.mark("goodbye") with lc: - TM.summarise_timing(logger) + TM.report_to_log(logger) assert "INTERNAL BREAKDOWN" in lc.get_output() assert "hello" in lc.get_output() @@ -36,7 +36,7 @@ def test_timing_graceful_fail(): lc = LogCapturer() with lc: - TM.summarise_timing(logger) + TM.report_to_log(logger) assert "hello" not in lc.get_output() assert "goodbye" not in lc.get_output() @@ -51,7 +51,7 @@ def test_timing_graceful_fail(): TM.mark("goodbye") with lc: - TM.summarise_timing(logger) + TM.report_to_log(logger) assert "hello" not in lc.get_output() assert "goodbye" not in lc.get_output() @@ -76,8 +76,8 @@ def test_timing_add(): lc = LogCapturer() with lc: - TM.summarise_timing(logger) - TM2.summarise_timing(logger) + TM.report_to_log(logger) + TM2.report_to_log(logger) assert "Dummy 0.3s" in lc.get_output() assert "0.1s" in lc.get_output() @@ -86,8 +86,35 @@ def test_timing_add(): lc = LogCapturer() with lc: - TM.summarise_timing(logger) + TM.report_to_log(logger) - assert "Dummy 0.5s" in lc.get_output() - assert "hello 0.2s" in lc.get_output() - assert "end 0.3s" in lc.get_output() + output = lc.get_output_without_timestamps() + + assert "Dummy 0.5s" in output + assert "hello 0.2s" in output + assert "end 0.3s" in output + +def test_picklable(): + TM = tm.TimingMonitor() + x = Dummy() + + with TM(x): + time.sleep(0.1) + TM.mark("hello") + time.sleep(0.2) + + lc = LogCapturer() + + with lc: + TM.report_to_log(logger) + + correct_results = lc.get_output_without_timestamps() + + import pickle + TM2 = pickle.loads(pickle.dumps(TM)) + + lc = LogCapturer() + with lc: + TM2.report_to_log(logger) + + assert lc.get_output_without_timestamps() == correct_results From 44d16dfe48b119f04634533ff7a433bf09abdc7f Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 08:55:16 +0000 Subject: [PATCH 08/13] Fix whitespace --- tangos/parallel_tasks/accumulative_statistics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tangos/parallel_tasks/accumulative_statistics.py b/tangos/parallel_tasks/accumulative_statistics.py index 2200e9b0..bebdd1c9 100644 --- a/tangos/parallel_tasks/accumulative_statistics.py +++ b/tangos/parallel_tasks/accumulative_statistics.py @@ -1,7 +1,7 @@ import logging import time -from ..config import PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES +from ..config import PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES from ..log import logger from .message import Message @@ -50,7 +50,7 @@ def process(self): _existing_accumulators[self.contents.id].add(self.contents) class StatisticsAccumulatorBase: - REPORT_AFTER = PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES + REPORT_AFTER = PROPERTY_WRITER_PARALLEL_STATISTICS_TIME_BETWEEN_UPDATES def __init__(self, allow_parallel=False): """This is a base class for accumulating statistics, possibly in parallel across multiple processes. From 9c7ef7373039065900b99bf07fd2c5eec716d615 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 09:58:01 +0000 Subject: [PATCH 09/13] Bump version number --- tangos/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tangos/__init__.py b/tangos/__init__.py index e6b862f2..75f43fdd 100644 --- a/tangos/__init__.py +++ b/tangos/__init__.py @@ -20,4 +20,4 @@ from .core import * from .query import * -__version__ = '1.9.0' +__version__ = '1.9.1' From c013d48bcd903ec677b039c5531d4d0e97e70d8c Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 17:10:28 +0000 Subject: [PATCH 10/13] Make sure exceptions are really only reported once across a tangos write session --- tangos/parallel_tasks/__init__.py | 1 + tangos/parallel_tasks/shared_set.py | 62 +++++++++++++++++++++++++++++ tangos/tools/property_writer.py | 22 +++++----- tests/test_db_writer.py | 17 ++++++-- tests/test_parallel_tasks.py | 31 +++++++++++++++ 5 files changed, 119 insertions(+), 14 deletions(-) create mode 100644 tangos/parallel_tasks/shared_set.py diff --git a/tangos/parallel_tasks/__init__.py b/tangos/parallel_tasks/__init__.py index a81e6432..7ec16716 100644 --- a/tangos/parallel_tasks/__init__.py +++ b/tangos/parallel_tasks/__init__.py @@ -163,5 +163,6 @@ def _shutdown_parallelism(): from . import remote_import +from . import shared_set from .barrier import barrier from .lock import ExclusiveLock diff --git a/tangos/parallel_tasks/shared_set.py b/tangos/parallel_tasks/shared_set.py new file mode 100644 index 00000000..b7a4e3f5 --- /dev/null +++ b/tangos/parallel_tasks/shared_set.py @@ -0,0 +1,62 @@ +from .message import Message + +_remote_sets = {} +class RemoteSetOperation(Message): + def process(self): + set_id, operation, value = self.contents + global _remote_sets + if operation=="add-if-not-exists": + result = LocalSet(set_id).add_if_not_exists(value) + self.reply(result) + else: + raise ValueError("Unknown operation %s" % operation) + + def reply(self, result): + RemoteSetResult(result).send(self.source) + +class RemoteSetResult(Message): + pass + +class SharedSet: + def __new__(cls, set_id, allow_parallel=False): + if cls is SharedSet: + from . import parallelism_is_active + parallel = allow_parallel and parallelism_is_active() + if parallel: + cls = RemoteSet + else: + cls = LocalSet + return object.__new__(cls) + + def __init__(self, set_id): + self.set_id = set_id + + def __getnewargs__(self): + return self.set_id, + + def add_if_not_exists(self, value): + """Adds the value to the set, and returns True if it was already present, as an atomic operation""" + raise NotImplementedError("Constructing a SharedSet should automatically return a RemoteSet or LocalSet as appropriate") + +class RemoteSet(SharedSet): + def __init__(self, set_id, allow_parallel=False): + assert allow_parallel + super().__init__(set_id) + self._underlying_set = set() + + def add_if_not_exists(self, value): + """Adds to the set, and returns a boolean indicating whether the value was already present""" + RemoteSetOperation((self.set_id, "add-if-not-exists", value)).send(0) + result = RemoteSetResult.receive(0).contents + return result + +class LocalSet(SharedSet): + def __init__(self, set_id, allow_parallel=False): + super().__init__(set_id) + self._underlying_set = _remote_sets.get(set_id, set()) + _remote_sets[set_id] = self._underlying_set + def add_if_not_exists(self, value): + result = value in self._underlying_set + if not result: + self._underlying_set.add(value) + return result diff --git a/tangos/tools/property_writer.py b/tangos/tools/property_writer.py index 39c2b91f..6ae143bb 100644 --- a/tangos/tools/property_writer.py +++ b/tangos/tools/property_writer.py @@ -401,12 +401,15 @@ def _get_property_value(self, db_halo, property_calculator, existing_properties) with self.redirect: result = property_calculator.calculate(snapshot_data, db_data) self.tracker.register_success() - except Exception: + except Exception as e: self.tracker.register_error() - if self.tracker.should_log_error(property_calculator): - logger.exception("Uncaught exception during property calculation %r applied to %r"%(property_calculator, db_halo)) - logger.info("Further errors from this calculation on this timestep will be counted but not individually reported.") + if self.tracker.should_log_error(e): + logger.info("Uncaught exception %r during property calculation %r applied to %r"%(e, property_calculator, db_halo)) + exc_data = traceback.format_exc() + for line in exc_data.split("\n"): + logger.info(line) + logger.info("If this error arises again, it will be counted but not individually reported.") if self.options.catch: tbtype, value, tb = sys.exc_info() @@ -552,14 +555,11 @@ def __init__(self, allow_parallel=False): super().__init__(allow_parallel=allow_parallel) self.reset() - self._posted_errors = set() + self._posted_errors = parallel_tasks.shared_set.SharedSet('posted_errors',allow_parallel) - def should_log_error(self, from_module): - if from_module not in self._posted_errors: - self._posted_errors.add(from_module) - return True - else: - return False + def should_log_error(self, exception): + tb = "\n".join(traceback.format_exception(exception)) + return not self._posted_errors.add_if_not_exists(tb) def report_to_log(self, logger): logger.info("PROPERTY CALCULATION SUMMARY") diff --git a/tests/test_db_writer.py b/tests/test_db_writer.py index 4d76b8a0..4ca75279 100644 --- a/tests/test_db_writer.py +++ b/tests/test_db_writer.py @@ -4,6 +4,7 @@ from numpy import testing as npt from pytest import fixture +import pytest import tangos as db import tangos.config from tangos import log, parallel_tasks, properties, testing @@ -134,9 +135,19 @@ def _assert_properties_as_expected(): assert db.get_halo("dummy_sim_1/step.1/2")['dummy_property'] == 2.0 assert db.get_halo("dummy_sim_1/step.2/1")['dummy_property'] == 2.0 -def test_error_ignoring(fresh_database): - log = run_writer_with_args("dummy_property", "dummy_property_with_exception") - assert "Uncaught exception during property calculation" in log +@pytest.mark.parametrize('parallel', [True, False]) +def test_exception_reporting(fresh_database, parallel): + if parallel: + parallel_tasks.use('multiprocessing-3') + log = run_writer_with_args("dummy_property", "dummy_property_with_exception", parallel=parallel) + assert "Uncaught exception RuntimeError('Test of exception handling') during property calculation" in log + assert ": result = property_calculator.calculate(snapshot_data, db_data)" in log + # above tests that a bit of the traceback is present, but also that it has been put on a formatted line + + # count occurrences of the traceback, should be only one: + assert log.count("Traceback (most recent call last)")==1 + + assert "Errored: 15 property calculations" in log assert db.get_halo("dummy_sim_1/step.1/1")['dummy_property'] == 1.0 assert db.get_halo("dummy_sim_1/step.1/2")['dummy_property'] == 2.0 diff --git a/tests/test_parallel_tasks.py b/tests/test_parallel_tasks.py index 3a8e23f0..b16dadee 100644 --- a/tests/test_parallel_tasks.py +++ b/tests/test_parallel_tasks.py @@ -216,3 +216,34 @@ def _error_on_client(): with pytest.raises(RuntimeError) as e: pt.launch(_error_on_client) assert "Error on client" in str(e.value) + +def _test_remote_set(): + set = pt.shared_set.SharedSet("test_set", allow_parallel=True) + assert isinstance(set, pt.shared_set.RemoteSet) + if pt.backend.rank()==1: + result = set.add_if_not_exists("foo") + assert not result + pt.barrier() + pt.barrier() + result = set.add_if_not_exists("bar") + assert result + set2 = pt.shared_set.SharedSet("test_set2") + result = set2.add_if_not_exists("foo") + assert not result + elif pt.backend.rank()==2: + pt.barrier() + result = set.add_if_not_exists("foo") + assert result + result = set.add_if_not_exists("bar") + assert not result + pt.barrier() + + +def test_remote_set(): + pt.use("multiprocessing-3") + pt.launch(_test_remote_set) + +def test_local_set(): + set = pt.shared_set.SharedSet("test_local_set") + assert not set.add_if_not_exists("foo") + assert set.add_if_not_exists("foo") \ No newline at end of file From 14aed5e1340b899e8b75aa73bdc699bcde4c0f22 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 17:12:25 +0000 Subject: [PATCH 11/13] Fix tests --- tangos/parallel_tasks/__init__.py | 3 +-- tests/test_db_writer.py | 2 +- tests/test_parallel_tasks.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/tangos/parallel_tasks/__init__.py b/tangos/parallel_tasks/__init__.py index 7ec16716..bf849394 100644 --- a/tangos/parallel_tasks/__init__.py +++ b/tangos/parallel_tasks/__init__.py @@ -162,7 +162,6 @@ def _shutdown_parallelism(): -from . import remote_import -from . import shared_set +from . import remote_import, shared_set from .barrier import barrier from .lock import ExclusiveLock diff --git a/tests/test_db_writer.py b/tests/test_db_writer.py index 4ca75279..6411934f 100644 --- a/tests/test_db_writer.py +++ b/tests/test_db_writer.py @@ -1,10 +1,10 @@ import os import time +import pytest from numpy import testing as npt from pytest import fixture -import pytest import tangos as db import tangos.config from tangos import log, parallel_tasks, properties, testing diff --git a/tests/test_parallel_tasks.py b/tests/test_parallel_tasks.py index b16dadee..02c79a47 100644 --- a/tests/test_parallel_tasks.py +++ b/tests/test_parallel_tasks.py @@ -246,4 +246,4 @@ def test_remote_set(): def test_local_set(): set = pt.shared_set.SharedSet("test_local_set") assert not set.add_if_not_exists("foo") - assert set.add_if_not_exists("foo") \ No newline at end of file + assert set.add_if_not_exists("foo") From 45016cf711a27c61ee676ba60b7a2cae5b09d943 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 20:03:39 +0000 Subject: [PATCH 12/13] Fix for py3.9 --- tangos/tools/property_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tangos/tools/property_writer.py b/tangos/tools/property_writer.py index 6ae143bb..31b079c6 100644 --- a/tangos/tools/property_writer.py +++ b/tangos/tools/property_writer.py @@ -558,7 +558,7 @@ def __init__(self, allow_parallel=False): self._posted_errors = parallel_tasks.shared_set.SharedSet('posted_errors',allow_parallel) def should_log_error(self, exception): - tb = "\n".join(traceback.format_exception(exception)) + tb = "\n".join(traceback.format_exception(type(exception), exception, exception.__traceback__)) return not self._posted_errors.add_if_not_exists(tb) def report_to_log(self, logger): From dd6c55ef617e72a78150c5477d3de681cf975b28 Mon Sep 17 00:00:00 2001 From: Andrew Pontzen Date: Wed, 13 Dec 2023 20:31:35 +0000 Subject: [PATCH 13/13] Remove some stray print statements --- tangos/input_handlers/output_testing.py | 1 - tests/test_big_mergertree.py | 2 +- tests/test_hop_strategy.py | 2 +- tests/test_import.py | 4 ++-- tests/test_parallel_tasks.py | 5 +---- tests/test_property_gathering.py | 2 -- tests/test_stat_files.py | 1 - 7 files changed, 5 insertions(+), 12 deletions(-) diff --git a/tangos/input_handlers/output_testing.py b/tangos/input_handlers/output_testing.py index 8133d048..a13b52d8 100644 --- a/tangos/input_handlers/output_testing.py +++ b/tangos/input_handlers/output_testing.py @@ -31,7 +31,6 @@ def enumerate_timestep_extensions(self, parallel=False): pre_extension_length = len(os.path.join(config.base, self.basename)) steps = glob.glob(os.path.join(config.base, self.basename, "step.*")) for i in steps: - print(i, i[pre_extension_length:], self.strip_slashes(i[pre_extension_length:])) yield self.strip_slashes(i[pre_extension_length:]) def get_timestep_properties(self, ts_extension): diff --git a/tests/test_big_mergertree.py b/tests/test_big_mergertree.py index 8628bcaf..fddab022 100644 --- a/tests/test_big_mergertree.py +++ b/tests/test_big_mergertree.py @@ -45,7 +45,7 @@ def setup_module(): def test_major_progenitors(): results = halo_finding.MultiHopMajorProgenitorsStrategy(tangos.get_item("sim/ts3/1"), include_startpoint=True).all() - print(results) + testing.assert_halolists_equal(results, ["sim/ts3/1","sim/ts2/1","sim/ts1/1"]) def test_merger_tree(): diff --git a/tests/test_hop_strategy.py b/tests/test_hop_strategy.py index 6381554d..5418222e 100644 --- a/tests/test_hop_strategy.py +++ b/tests/test_hop_strategy.py @@ -389,7 +389,7 @@ def test_merging(): testing.assert_halolists_equal(dest, ['sim/ts3/1', 'sim/ts3/1', 'sim/ts3/2', 'sim/ts3/3', 'sim/ts3/4', 'sim/ts3/4']) source, dest = tangos.get_timestep("sim/ts1").calculate_all("halo_number()", "latest()", object_typetag='halo') - print(source,dest) + assert np.all(source==[1,2,3,4,5,6,7]) testing.assert_halolists_equal(dest, ['sim/ts3/1', 'sim/ts3/1', 'sim/ts3/2', 'sim/ts3/3', 'sim/ts1/5', 'sim/ts3/4', 'sim/ts3/4']) diff --git a/tests/test_import.py b/tests/test_import.py index 477c97d0..b45efaad 100644 --- a/tests/test_import.py +++ b/tests/test_import.py @@ -103,8 +103,8 @@ def test_import_filtered(source_engine_and_session, destination_engine_and_sessi assert tangos.get_default_session() is destination_session - print("exist:",existing_session.query(tangos.core.Simulation).all()) - print("dest:",destination_session.query(tangos.core.Simulation).all()) + + importer = _get_importer_instance(existing_engine, "--exclude-properties", "Mvir") importer.run_calculation_loop() diff --git a/tests/test_parallel_tasks.py b/tests/test_parallel_tasks.py index 02c79a47..8afe984c 100644 --- a/tests/test_parallel_tasks.py +++ b/tests/test_parallel_tasks.py @@ -166,8 +166,6 @@ def test_shared_locks(): pt.use("multiprocessing-4") pt.launch(_test_shared_locks) log = pt_testing.get_log() - print("log:") - print("".join(log)) for i in range(2): assert log[i].strip() in ("[2] shared lock acquired", "[3] shared lock acquired") assert log[2].strip() == "[1] exclusive lock acquired" @@ -177,12 +175,11 @@ def test_shared_locks_in_queue(): pt.use("multiprocessing-6") pt.launch(_test_shared_locks_in_queue) log = pt_testing.get_log() - print("log:") + # we want to verify that shared locks were held simultaneously, but exclusive locks never were lock_held = 0 for line in log: - print(line.strip()) if "exclusive lock acquired" in line: assert lock_held==0 lock_held = 'exclusive' diff --git a/tests/test_property_gathering.py b/tests/test_property_gathering.py index 2b6f2f48..07b77ec7 100644 --- a/tests/test_property_gathering.py +++ b/tests/test_property_gathering.py @@ -93,8 +93,6 @@ def live_calculate(self, halo, criterion="hole_mass"): type(self).num_calls+=1 bh_links = halo["BH"] if isinstance(bh_links,list): - for lk in bh_links: - print(list(lk.keys())) vals = [lk[criterion] if criterion in lk else self.default_val for lk in bh_links] return bh_links[np.argmax(vals)] else: diff --git a/tests/test_stat_files.py b/tests/test_stat_files.py index c392a649..ed23a43f 100644 --- a/tests/test_stat_files.py +++ b/tests/test_stat_files.py @@ -95,7 +95,6 @@ def test_insert_properties(): adder.add_objects_to_timestep(ts1) importer = property_importer.PropertyImporter() importer.parse_command_line("Mvir Rvir hostHalo childHalo --for test_stat_files".split()) - print(importer.options) importer.run_calculation_loop() npt.assert_almost_equal(ts1.halos[0]["Rvir"], 195.87) npt.assert_almost_equal(ts1.halos[0]["Mvir"], 5.02432e+11)