From 08afa8ee32b413dc46d4d0b7067e6bf479b88034 Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Wed, 17 Apr 2019 10:09:36 +0200
Subject: [PATCH 1/8] Better errors when task_keeper forgot task. No more
 cleanup of running tasks

---
 golem/task/taskcomputer.py | 14 +++++++++-----
 golem/task/taskkeeper.py   | 17 ++++++++++++++++-
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py
index e9b688148d..063350cb8c 100644
--- a/golem/task/taskcomputer.py
+++ b/golem/task/taskcomputer.py
@@ -138,20 +138,22 @@ def task_computed(self, task_thread: TaskThread) -> None:
         task_thread.end_time = time.time()
         work_wall_clock_time = task_thread.end_time - task_thread.start_time

+        fallback_error = "Wrong result format"
         try:
             subtask = self.assigned_subtask
             assert subtask is not None
             self.assigned_subtask = None
             subtask_id = subtask['subtask_id']
+            task_id = subtask['task_id']
+            task_header = self.task_server.task_keeper.task_headers[task_id]
             # get paid for max working time,
             # thus task withholding won't make profit
-            task_header = \
-                self.task_server.task_keeper.task_headers[subtask['task_id']]
             work_time_to_be_paid = task_header.subtask_timeout

         except KeyError:
-            logger.error("No subtask with id %r", subtask_id)
-            return
+            fallback_error = "Task header not found in task keeper"
+            logger.error("%s. task_id=%r, subtask_id=%r",
+                         fallback_error, task_id, subtask_id)

         was_success = False

@@ -190,7 +192,7 @@ def task_computed(self, task_thread: TaskThread) -> None:
             self.task_server.send_task_failed(
                 subtask_id,
                 subtask['task_id'],
-                "Wrong result format",
+                fallback_error,
             )

         dispatcher.send(signal='golem.monitor', event='computation_time_spent',
@@ -382,6 +384,7 @@ def __compute_task(self, subtask_id, docker_images,
         with self.lock:
             self.counting_thread = tt
+        self.task_server.task_keeper.task_started(task_id)

         tt.start().addBoth(lambda _: self.task_computed(tt))

     def __task_finished(self, ctd: 'ComputeTaskDef') -> None:
@@ -396,6 +399,7 @@ def __task_finished(self, ctd: 'ComputeTaskDef') -> None:

         with self.lock:
             self.counting_thread = None
+        self.task_server.task_keeper.task_ended(ctd['task_id'])

         if self.finished_cb:
             self.finished_cb()
diff --git a/golem/task/taskkeeper.py b/golem/task/taskkeeper.py
index 714075c01e..4a680508c4 100644
--- a/golem/task/taskkeeper.py
+++ b/golem/task/taskkeeper.py
@@ -334,6 +334,8 @@ def __init__(
         self.task_headers: typing.Dict[str, dt_tasks.TaskHeader] = {}
         # ids of tasks that this node may try to compute
         self.supported_tasks: typing.List[str] = []
+        # ids of tasks that are computing on this node
+        self.running_tasks = []
         # results of tasks' support checks
         self.support_status: typing.Dict[str, SupportStatus] = {}
         # tasks that were removed from network recently, so they won't
@@ -527,7 +529,9 @@ def check_max_tasks_per_owner(self, owner_key_id):
         if len(owner_task_set) <= self.max_tasks_per_requestor:
             return

-        by_age = sorted(owner_task_set,
+        not_running = [x for x in owner_task_set if x not in self.running_tasks]
+
+        by_age = sorted(not_running,
                         key=lambda tid: self.last_checking[tid])

         # leave alone the first (oldest) max_tasks_per_requestor
@@ -649,3 +653,14 @@ def get_unsupport_reasons(self):
                 avg = None
             ret.append({'reason': reason.value, 'ntasks': count, 'avg': avg})
         return ret
+
+    def task_started(self, task_id):
+        self.running_tasks.append(task_id)
+
+    def task_ended(self, task_id):
+        try:
+            self.running_tasks.remove(task_id)
+        except ValueError:
+            logger.warning("Can not remove running task, already removed. "
+                           "Maybe the callback is called twice. task_id=%r",
+                           task_id)
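The pairing this patch introduces is easiest to see in isolation. A minimal, self-contained sketch of the intended invariant (simplified, hypothetical names, not the golem API): every task reported via task_started() is eventually reported via task_ended(), even on failure. The patch itself wires task_started() into __compute_task() and task_ended() into __task_finished(); the try/finally below only illustrates the bracketing.

    # Illustrative sketch only, not part of the patch series.
    def run_subtask(task_keeper, task_id, compute):
        task_keeper.task_started(task_id)
        try:
            return compute()
        finally:
            # Always release the running marker, even when compute() raises,
            # otherwise the header would stay protected forever.
            task_keeper.task_ended(task_id)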
From a0143db1023a6b4cacc4c69e80fb065a7efc16fd Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Thu, 18 Apr 2019 16:18:28 +0200
Subject: [PATCH 2/8] Better type for new list

---
 golem/task/taskkeeper.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/golem/task/taskkeeper.py b/golem/task/taskkeeper.py
index 4a680508c4..8bb5c14701 100644
--- a/golem/task/taskkeeper.py
+++ b/golem/task/taskkeeper.py
@@ -335,7 +335,7 @@ def __init__(
         # ids of tasks that this node may try to compute
         self.supported_tasks: typing.List[str] = []
         # ids of tasks that are computing on this node
-        self.running_tasks = []
+        self.running_tasks: typing.List[str] = []
         # results of tasks' support checks
         self.support_status: typing.Dict[str, SupportStatus] = {}
         # tasks that were removed from network recently, so they won't

From 1dcc1c3412056f99347e6a421bb19959aefb2a20 Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Thu, 18 Apr 2019 17:16:28 +0200
Subject: [PATCH 3/8] - Replace list with set - Remove running tasks before
 counting

---
 golem/task/taskkeeper.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/golem/task/taskkeeper.py b/golem/task/taskkeeper.py
index 8bb5c14701..1dbb632930 100644
--- a/golem/task/taskkeeper.py
+++ b/golem/task/taskkeeper.py
@@ -335,7 +335,7 @@ def __init__(
         # ids of tasks that this node may try to compute
         self.supported_tasks: typing.List[str] = []
         # ids of tasks that are computing on this node
-        self.running_tasks: typing.List[str] = []
+        self.running_tasks: typing.Set[str] = set()
         # results of tasks' support checks
         self.support_status: typing.Dict[str, SupportStatus] = {}
         # tasks that were removed from network recently, so they won't
@@ -526,10 +526,10 @@ def find_newest_node(self, node_id) -> typing.Optional[dt_p2p.Node]:

     def check_max_tasks_per_owner(self, owner_key_id):
         owner_task_set = self._get_tasks_by_owner_set(owner_key_id)
-        if len(owner_task_set) <= self.max_tasks_per_requestor:
-            return
+        not_running = owner_task_set - self.running_tasks

-        not_running = [x for x in owner_task_set if x not in self.running_tasks]
+        if len(not_running) <= self.max_tasks_per_requestor:
+            return

         by_age = sorted(not_running,
                         key=lambda tid: self.last_checking[tid])
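With the set in place, the owner-limit check reduces to a small pure function. A sketch with plain dicts and sets standing in for the keeper state (illustrative only, not the real method signature):

    def ids_to_drop(owner_task_ids, running_tasks, last_checking, limit):
        # Running tasks are never candidates for eviction.
        not_running = owner_task_ids - running_tasks
        if len(not_running) <= limit:
            return []
        by_age = sorted(not_running, key=lambda tid: last_checking[tid])
        # Keep the `limit` oldest headers, drop the rest.
        return by_age[limit:]

    # Five headers from one owner, 't4' currently running, limit of 3:
    # only 't3', the newest non-running header, is dropped.
    assert ids_to_drop({'t0', 't1', 't2', 't3', 't4'}, {'t4'},
                       {'t0': 1, 't1': 2, 't2': 3, 't3': 4, 't4': 5}, 3) == ['t3']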
From ddecb27d3e12caa585ea949ec24366eb5d311b89 Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Fri, 19 Apr 2019 12:31:53 +0200
Subject: [PATCH 4/8] Extended unit tests with 2 running cases

---
 tests/golem/task/test_taskkeeper.py | 56 +++++++++++++++++++++--------
 1 file changed, 41 insertions(+), 15 deletions(-)

diff --git a/tests/golem/task/test_taskkeeper.py b/tests/golem/task/test_taskkeeper.py
index 11e71ece2c..6f5f6ee451 100644
--- a/tests/golem/task/test_taskkeeper.py
+++ b/tests/golem/task/test_taskkeeper.py
@@ -276,6 +276,7 @@ def test_task_limit(frozen_time, self):  # pylint: disable=no-self-argument
         self.assertIn(tb_id, tk.task_headers)

     def test_check_max_tasks_per_owner(self):
+
         tk = TaskHeaderKeeper(
             environments_manager=EnvironmentsManager(),
             node=dt_p2p_factory.Node(),
@@ -295,9 +296,13 @@ def test_check_max_tasks_per_owner(self):
         tb0_id = thd.task_id
         tk.add_task_header(thd)

-        for id_ in ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
+        def _assert_headers(ids_, len_):
+            ids_.append(tb0_id)
+            for id_ in ids_:
+                self.assertIn(id_, tk.task_headers)
+            self.assertEqual(len_, len(tk.task_headers))
+
+        _assert_headers(ids, len(ids) + 1)

         while time.time() == last_add_time:
             time.sleep(0.1)
@@ -308,28 +313,49 @@ def _assert_headers(ids_, len_):
             new_ids.append(thd.task_id)
             tk.add_task_header(thd)

-        for id_ in ids + new_ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
-        self.assertEqual(limit + 1, len(tk.task_headers))
+
+        _assert_headers(ids + new_ids, limit + 1)

         # shouldn't remove any tasks
         tk.check_max_tasks_per_owner(thd.task_owner.key)
-        for id_ in ids + new_ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
-        self.assertEqual(limit + 1, len(tk.task_headers))
+        _assert_headers(ids + new_ids, limit + 1)
+
+        # Test if it skips a running task
+        running_task_id = ids[0]
+        tk.task_started(running_task_id)
+        assert running_task_id in tk.running_tasks
+        tk.max_tasks_per_requestor = tk.max_tasks_per_requestor - 1
+        # shouldn't remove any tasks
+        tk.check_max_tasks_per_owner(thd.task_owner.key)
+
+        _assert_headers(ids + new_ids, limit + 1)
+
+        # finish the task, restore state
+        tk.task_ended(running_task_id)
+        assert running_task_id not in tk.running_tasks

         tk.max_tasks_per_requestor = new_limit

         # should remove ta{3..9}
         tk.check_max_tasks_per_owner(thd.task_owner.key)
-        for id_ in ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
-        self.assertEqual(new_limit + 1, len(tk.task_headers))
+        _assert_headers(ids, new_limit + 1)
+
+        # Test if it skips a running task
+        running_task_id = ids[2]
+        tk.task_started(running_task_id)
+        assert running_task_id in tk.running_tasks
+        tk.max_tasks_per_requestor = 1
+        # shouldn't remove running_task_id
+        tk.check_max_tasks_per_owner(thd.task_owner.key)
+
+        # Should keep 0 and 2, since 2 is running
+        _assert_headers([ids[0], ids[2]], 3)
+
+        # finish the task, restore state
+        tk.task_ended(running_task_id)
+        assert running_task_id not in tk.running_tasks

     def test_get_unsupport_reasons(self):
         tk = TaskHeaderKeeper(

From dc29569f1c222cb82fc81f468a31a974bbbd35b9 Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Wed, 24 Apr 2019 11:24:21 +0200
Subject: [PATCH 5/8] Never enter success with no task_header available

---
 golem/task/taskcomputer.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py
index 063350cb8c..af41b99ec0 100644
--- a/golem/task/taskcomputer.py
+++ b/golem/task/taskcomputer.py
@@ -139,6 +139,7 @@ def task_computed(self, task_thread: TaskThread) -> None:
         work_wall_clock_time = task_thread.end_time - task_thread.start_time

         fallback_error = "Wrong result format"
+        has_header = False
         try:
             subtask = self.assigned_subtask
             assert subtask is not None
@@ -149,6 +150,7 @@ def task_computed(self, task_thread: TaskThread) -> None:
             # get paid for max working time,
             # thus task withholding won't make profit
             work_time_to_be_paid = task_header.subtask_timeout
+            has_header = True

         except KeyError:
             fallback_error = "Task header not found in task keeper"
@@ -169,7 +171,7 @@ def task_computed(self, task_thread: TaskThread) -> None:
                 task_thread.error_msg,
             )

-        elif task_thread.result and 'data' in task_thread.result:
+        elif has_header and task_thread.result and 'data' in task_thread.result:
             logger.info("Task %r computed, work_wall_clock_time %s",
                         subtask_id,
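The guarded branch above decides between success and failure reporting. Reduced to a pure function it roughly reads as below (illustrative sketch with hypothetical names, not golem code): an error message wins, success now additionally requires the task header to have been found, and everything else falls through to the fallback failure.

    def outcome(has_header, error_msg, result, fallback_error):
        # Roughly mirrors the branch order in task_computed().
        if error_msg:
            return 'failed', error_msg
        if has_header and result and 'data' in result:
            return 'success', None
        # A missing task header and a malformed result both land here.
        return 'failed', fallback_error

    assert outcome(True, None, {'data': b'ok'}, "Wrong result format") == ('success', None)
    assert outcome(False, None, {'data': b'ok'},
                   "Task header not found in task keeper") \
        == ('failed', "Task header not found in task keeper")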
From fbf206eaafe94b44bc58b40a79f86f4a06271c3f Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Wed, 24 Apr 2019 15:33:55 +0200
Subject: [PATCH 6/8] Stop deleting header when task is running (with warning
 log). Improved log when removing tasks from one owner.

---
 golem/task/taskkeeper.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/golem/task/taskkeeper.py b/golem/task/taskkeeper.py
index 1dbb632930..0fb9394939 100644
--- a/golem/task/taskkeeper.py
+++ b/golem/task/taskkeeper.py
@@ -538,8 +538,10 @@ def check_max_tasks_per_owner(self, owner_key_id):
         # headers, remove the rest
         to_remove = by_age[self.max_tasks_per_requestor:]

-        logger.warning("Too many tasks from %s, dropping %d tasks",
-                       owner_key_id, len(to_remove))
+        logger.warning(
+            "Too many tasks, dropping %d tasks. owner=%s, ids_to_remove=%r",
+            len(to_remove), common.short_node_id(owner_key_id), to_remove
+        )

         for tid in to_remove:
             self.remove_task_header(tid)
@@ -551,6 +553,11 @@ def remove_task_header(self, task_id) -> bool:
         if task_id in self.removed_tasks:
             return False

+        if task_id in self.running_tasks:
+            logger.warning("Can not remove task header, task is running. "
+                           "task_id=%s", task_id)
+            return False
+
         try:
             owner_key_id = self.task_headers[task_id].task_owner.key
             self.tasks_by_owner[owner_key_id].discard(task_id)

From fbe82fef6d0705cd9b9e96a541921ec3cc3684ea Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Mon, 29 Apr 2019 22:35:50 +0200
Subject: [PATCH 7/8] Unset assigned_subtask on resource failure

---
 golem/task/taskcomputer.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py
index af41b99ec0..2ca5c05833 100644
--- a/golem/task/taskcomputer.py
+++ b/golem/task/taskcomputer.py
@@ -123,6 +123,7 @@ def resource_collected(self, res_id):

     def resource_failure(self, res_id, reason):
         subtask = self.assigned_subtask
+        self.assigned_subtask = None
         if not subtask or subtask['task_id'] != res_id:
             logger.error("Resource failure for a wrong task, %s", res_id)
             return
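Taken together, header removal is now gated twice before any keeper state is touched. A condensed sketch of the check order (illustrative only; the real method also updates the per-owner index and the removed-task cache):

    def may_remove_header(task_id, removed_tasks, running_tasks):
        if task_id in removed_tasks:
            return False   # already removed earlier
        if task_id in running_tasks:
            return False   # still computing against it: keep the header and warn instead
        return True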
From fb46d3f7942c540d0cbd487632adb48effde889c Mon Sep 17 00:00:00 2001
From: maaktweluit <10008353+maaktweluit@users.noreply.github.com>
Date: Tue, 30 Apr 2019 13:02:15 +0200
Subject: [PATCH 8/8] Split max_tasks_per_owner test.

---
 tests/golem/task/test_taskkeeper.py | 50 +++++++++++++++++------------
 1 file changed, 30 insertions(+), 20 deletions(-)

diff --git a/tests/golem/task/test_taskkeeper.py b/tests/golem/task/test_taskkeeper.py
index 6f5f6ee451..8de8145ba4 100644
--- a/tests/golem/task/test_taskkeeper.py
+++ b/tests/golem/task/test_taskkeeper.py
@@ -275,15 +275,18 @@ def test_task_limit(frozen_time, self):  # pylint: disable=no-self-argument
             self.assertIn(ids[i], tk.task_headers)
         self.assertIn(tb_id, tk.task_headers)

-    def test_check_max_tasks_per_owner(self):
+    def _assert_headers(self, tk, ids, count):
+        for id in ids:
+            self.assertIn(id, tk.task_headers)
+        self.assertEqual(count, len(tk.task_headers))

+    def _prep_max_tasks_test(self, new_limit):
         tk = TaskHeaderKeeper(
             environments_manager=EnvironmentsManager(),
             node=dt_p2p_factory.Node(),
             min_price=10,
             max_tasks_per_requestor=10)
         limit = tk.max_tasks_per_requestor
-        new_limit = 3

         ids = []
         for _ in range(new_limit):
@@ -296,13 +299,7 @@ def test_check_max_tasks_per_owner(self):
         tb0_id = thd.task_id
         tk.add_task_header(thd)

-        def _assert_headers(ids_, len_):
-            ids_.append(tb0_id)
-            for id_ in ids_:
-                self.assertIn(id_, tk.task_headers)
-            self.assertEqual(len_, len(tk.task_headers))
-
-        _assert_headers(ids, len(ids) + 1)
+        self._assert_headers(tk, [tb0_id] + ids, len(ids) + 1)

         while time.time() == last_add_time:
             time.sleep(0.1)
@@ -313,13 +310,32 @@ def _assert_headers(ids_, len_):
             new_ids.append(thd.task_id)
             tk.add_task_header(thd)

+        self._assert_headers(tk, [tb0_id] + ids + new_ids, limit + 1)

-        _assert_headers(ids + new_ids, limit + 1)
+        return tk, thd, tb0_id, ids, new_ids
+
+    def test_check_max_tasks_per_owner(self):
+        new_limit = 3
+        tk, thd, tb0_id, ids, new_ids = self._prep_max_tasks_test(new_limit)
+        limit = tk.max_tasks_per_requestor

         # shouldn't remove any tasks
         tk.check_max_tasks_per_owner(thd.task_owner.key)
-        _assert_headers(ids + new_ids, limit + 1)
+        self._assert_headers(tk, [tb0_id] + ids + new_ids, limit + 1)
+
+        tk.max_tasks_per_requestor = new_limit
+
+        # should remove ta{3..9}
+        tk.check_max_tasks_per_owner(thd.task_owner.key)
+
+        self._assert_headers(tk, [tb0_id] + ids, new_limit + 1)
+
+
+    def test_check_max_tasks_per_owner_running(self):
+        new_limit = 3
+        tk, thd, tb0_id, ids, new_ids = self._prep_max_tasks_test(new_limit)
+        limit = tk.max_tasks_per_requestor

         # Test if it skips a running task
         running_task_id = ids[0]
@@ -329,7 +345,7 @@ def _assert_headers(ids_, len_):
         # shouldn't remove any tasks
         tk.check_max_tasks_per_owner(thd.task_owner.key)

-        _assert_headers(ids + new_ids, limit + 1)
+        self._assert_headers(tk, [tb0_id] + ids + new_ids, limit + 1)

         # finish the task, restore state
         tk.task_ended(running_task_id)
@@ -337,21 +353,15 @@ def _assert_headers(ids_, len_):

         tk.max_tasks_per_requestor = new_limit

-        # should remove ta{3..9}
-        tk.check_max_tasks_per_owner(thd.task_owner.key)
-
-        _assert_headers(ids, new_limit + 1)
-
         # Test if it skips a running task
-        running_task_id = ids[2]
+        running_task_id = new_ids[0]
         tk.task_started(running_task_id)
         assert running_task_id in tk.running_tasks
-        tk.max_tasks_per_requestor = 1
         # shouldn't remove running_task_id
         tk.check_max_tasks_per_owner(thd.task_owner.key)

-        # Should keep 0 and 2, since 2 is running
-        _assert_headers([ids[0], ids[2]], 3)
+        # Should keep tb0_id, ids and new_ids[0], since new_ids[0] is running
+        self._assert_headers(tk, [tb0_id] + ids + [new_ids[0]], 5)

         # finish the task, restore state
         tk.task_ended(running_task_id)