
Task computer "stuck" on subtask #4123

Closed. Wants to merge 8 commits.
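From the diff, the fix appears to have three parts: TaskHeaderKeeper now tracks which tasks are computing on this node (running_tasks) and exempts them from the per-requestor header limit, so a running task's header can no longer be dropped mid-computation; task_computed no longer returns early when the task header is missing, but falls through and reports the failure to the requestor with a descriptive message; and resource_failure clears assigned_subtask, so a failed resource download cannot leave the task computer stuck on a subtask.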
19 changes: 13 additions & 6 deletions golem/task/taskcomputer.py
@@ -123,6 +123,7 @@ def resource_collected(self, res_id):

def resource_failure(self, res_id, reason):
subtask = self.assigned_subtask
+        self.assigned_subtask = None
if not subtask or subtask['task_id'] != res_id:
logger.error("Resource failure for a wrong task, %s", res_id)
return
@@ -138,20 +139,24 @@ def task_computed(self, task_thread: TaskThread) -> None:
task_thread.end_time = time.time()

work_wall_clock_time = task_thread.end_time - task_thread.start_time
+        fallback_error = "Wrong result format"
+        has_header = False
try:
subtask = self.assigned_subtask
assert subtask is not None
self.assigned_subtask = None
subtask_id = subtask['subtask_id']
+            task_id = subtask['task_id']
+            task_header = self.task_server.task_keeper.task_headers[task_id]
            # get paid for max working time,
            # thus task withholding won't make profit
-            task_header = \
-                self.task_server.task_keeper.task_headers[subtask['task_id']]
work_time_to_be_paid = task_header.subtask_timeout
+            has_header = True

except KeyError:
-            logger.error("No subtask with id %r", subtask_id)
-            return
+            fallback_error = "Task header not found in task keeper"
+            logger.error("%s. task_id=%r, subtask_id=%r",
+                         fallback_error, task_id, subtask_id)

was_success = False

@@ -167,7 +172,7 @@ def task_computed(self, task_thread: TaskThread) -> None:
task_thread.error_msg,
)

-        elif task_thread.result and 'data' in task_thread.result:
+        elif has_header and task_thread.result and 'data' in task_thread.result:

logger.info("Task %r computed, work_wall_clock_time %s",
subtask_id,
@@ -190,7 +195,7 @@ def task_computed(self, task_thread: TaskThread) -> None:
self.task_server.send_task_failed(
subtask_id,
            subtask['task_id'],
-            "Wrong result format",
+            fallback_error,
)

dispatcher.send(signal='golem.monitor', event='computation_time_spent',
@@ -382,6 +387,7 @@ def __compute_task(self, subtask_id, docker_images,
with self.lock:
self.counting_thread = tt

+        self.task_server.task_keeper.task_started(task_id)
tt.start().addBoth(lambda _: self.task_computed(tt))

def __task_finished(self, ctd: 'ComputeTaskDef') -> None:
@@ -396,6 +402,7 @@ def __task_finished(self, ctd: 'ComputeTaskDef') -> None:

with self.lock:
self.counting_thread = None
+        self.task_server.task_keeper.task_ended(ctd['task_id'])
if self.finished_cb:
self.finished_cb()

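The `task_started` call in `__compute_task` is paired with `task_ended` in `__task_finished`, and `tt.start().addBoth(...)` runs `task_computed` whether the computation thread succeeds or fails. A minimal sketch of that pairing in plain Python, with hypothetical `keeper` and `work` stand-ins rather than the real TaskServer machinery:

```python
# Sketch only: 'keeper' plays the role of TaskHeaderKeeper and 'work' the
# computation thread; the point is the pairing, not Golem's actual API.
def compute(keeper, task_id, work):
    keeper.task_started(task_id)   # header is now protected from removal
    try:
        work()
    finally:
        # mirrors tt.start().addBoth(...): runs on success and on failure,
        # so the task never stays marked as running
        keeper.task_ended(task_id)
```

A `finally` block gives the same guarantee as `addBoth`: the running-task mark is always cleared, so a crashed computation cannot permanently pin its header in the keeper.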
30 changes: 26 additions & 4 deletions golem/task/taskkeeper.py
@@ -334,6 +334,8 @@ def __init__(
self.task_headers: typing.Dict[str, dt_tasks.TaskHeader] = {}
# ids of tasks that this node may try to compute
self.supported_tasks: typing.List[str] = []
+        # ids of tasks that are computing on this node
+        self.running_tasks: typing.Set[str] = set()
# results of tasks' support checks
self.support_status: typing.Dict[str, SupportStatus] = {}
# tasks that were removed from network recently, so they won't
@@ -524,18 +526,22 @@ def find_newest_node(self, node_id) -> typing.Optional[dt_p2p.Node]:
def check_max_tasks_per_owner(self, owner_key_id):
owner_task_set = self._get_tasks_by_owner_set(owner_key_id)

-        if len(owner_task_set) <= self.max_tasks_per_requestor:
+        not_running = owner_task_set - self.running_tasks
+
+        if len(not_running) <= self.max_tasks_per_requestor:
return

-        by_age = sorted(owner_task_set,
+        by_age = sorted(not_running,
key=lambda tid: self.last_checking[tid])

# leave alone the first (oldest) max_tasks_per_requestor
# headers, remove the rest
to_remove = by_age[self.max_tasks_per_requestor:]

-        logger.warning("Too many tasks from %s, dropping %d tasks",
-                       owner_key_id, len(to_remove))
+        logger.warning(
+            "Too many tasks, dropping %d tasks. owner=%s, ids_to_remove=%r",
+            len(to_remove), common.short_node_id(owner_key_id), to_remove
+        )

for tid in to_remove:
self.remove_task_header(tid)
@@ -547,6 +553,11 @@ def remove_task_header(self, task_id) -> bool:
if task_id in self.removed_tasks:
return False

+        if task_id in self.running_tasks:
+            logger.warning("Cannot remove task header, task is running. "
+                           "task_id=%s", task_id)
+            return False

try:
owner_key_id = self.task_headers[task_id].task_owner.key
self.tasks_by_owner[owner_key_id].discard(task_id)
@@ -649,3 +660,14 @@ def get_unsupport_reasons(self):
avg = None
ret.append({'reason': reason.value, 'ntasks': count, 'avg': avg})
return ret

+    def task_started(self, task_id):
+        self.running_tasks.add(task_id)
+
+    def task_ended(self, task_id):
+        try:
+            self.running_tasks.remove(task_id)
+        except KeyError:
+            logger.warning("Cannot remove running task, already removed. "
+                           "Maybe the callback is called twice. task_id=%r",
+                           task_id)
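Condensed into one self-contained sketch (simplified stand-in structures, not the real TaskHeaderKeeper API), the new guard logic behaves like this:

```python
import logging

logger = logging.getLogger(__name__)


class HeaderKeeperSketch:
    """Illustrative stand-in for TaskHeaderKeeper's running-task guard."""

    def __init__(self, max_tasks_per_requestor):
        self.max_tasks_per_requestor = max_tasks_per_requestor
        self.task_headers = {}       # task_id -> header
        self.last_checking = {}      # task_id -> last-seen timestamp
        self.running_tasks = set()   # ids of tasks computing on this node

    def task_started(self, task_id):
        self.running_tasks.add(task_id)

    def task_ended(self, task_id):
        try:
            # set.remove() raises KeyError, not ValueError, if absent
            self.running_tasks.remove(task_id)
        except KeyError:
            logger.warning("task already removed, task_id=%r", task_id)

    def check_max_tasks_per_owner(self, owner_task_ids):
        # running tasks are exempt from the per-requestor limit
        not_running = set(owner_task_ids) - self.running_tasks
        if len(not_running) <= self.max_tasks_per_requestor:
            return
        by_age = sorted(not_running, key=lambda tid: self.last_checking[tid])
        # keep the oldest max_tasks_per_requestor headers, drop the rest
        for tid in by_age[self.max_tasks_per_requestor:]:
            self.task_headers.pop(tid, None)
```

Note that set.remove raises KeyError, not ValueError, when the element is absent; catching KeyError (or using set.discard) is what lets the double-callback warning in task_ended actually fire.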
70 changes: 53 additions & 17 deletions tests/golem/task/test_taskkeeper.py
@@ -275,14 +275,18 @@ def test_task_limit(frozen_time, self):  # pylint: disable=no-self-argument
self.assertIn(ids[i], tk.task_headers)
self.assertIn(tb_id, tk.task_headers)

-    def test_check_max_tasks_per_owner(self):
+    def _assert_headers(self, tk, ids, count):
+        for id_ in ids:
+            self.assertIn(id_, tk.task_headers)
+        self.assertEqual(count, len(tk.task_headers))
+
+    def _prep_max_tasks_test(self, new_limit):
tk = TaskHeaderKeeper(
environments_manager=EnvironmentsManager(),
node=dt_p2p_factory.Node(),
min_price=10,
max_tasks_per_requestor=10)
limit = tk.max_tasks_per_requestor
-        new_limit = 3

ids = []
for _ in range(new_limit):
@@ -295,9 +299,7 @@ def test_check_max_tasks_per_owner(self):
tb0_id = thd.task_id
tk.add_task_header(thd)

-        for id_ in ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
+        self._assert_headers(tk, [tb0_id] + ids, len(ids) + 1)

while time.time() == last_add_time:
time.sleep(0.1)
@@ -308,28 +310,62 @@ def test_check_max_tasks_per_owner(self):
new_ids.append(thd.task_id)
tk.add_task_header(thd)

-        for id_ in ids + new_ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
-        self.assertEqual(limit + 1, len(tk.task_headers))
+        self._assert_headers(tk, [tb0_id] + ids + new_ids, limit + 1)

+        return tk, thd, tb0_id, ids, new_ids
+
+    def test_check_max_tasks_per_owner(self):
+        new_limit = 3
+        tk, thd, tb0_id, ids, new_ids = self._prep_max_tasks_test(new_limit)
+        limit = tk.max_tasks_per_requestor

# shouldn't remove any tasks
tk.check_max_tasks_per_owner(thd.task_owner.key)

-        for id_ in ids + new_ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
-        self.assertEqual(limit + 1, len(tk.task_headers))
+        self._assert_headers(tk, [tb0_id] + ids + new_ids, limit + 1)

tk.max_tasks_per_requestor = new_limit

# should remove ta{3..9}
tk.check_max_tasks_per_owner(thd.task_owner.key)

-        for id_ in ids:
-            self.assertIn(id_, tk.task_headers)
-        self.assertIn(tb0_id, tk.task_headers)
-        self.assertEqual(new_limit + 1, len(tk.task_headers))
+        self._assert_headers(tk, [tb0_id] + ids, new_limit + 1)


+    def test_check_max_tasks_per_owner_running(self):
+        new_limit = 3
+        tk, thd, tb0_id, ids, new_ids = self._prep_max_tasks_test(new_limit)
+        limit = tk.max_tasks_per_requestor
+
+        # Test if it skips a running task
+        running_task_id = ids[0]
+        tk.task_started(running_task_id)
+        assert running_task_id in tk.running_tasks
+        tk.max_tasks_per_requestor -= 1
+        # shouldn't remove any tasks
+        tk.check_max_tasks_per_owner(thd.task_owner.key)
+
+        self._assert_headers(tk, [tb0_id] + ids + new_ids, limit + 1)
+
+        # finish the task, restore state
+        tk.task_ended(running_task_id)
+        assert running_task_id not in tk.running_tasks
+
+        tk.max_tasks_per_requestor = new_limit
+
+        # Test if it skips a running task
+        running_task_id = new_ids[0]
+        tk.task_started(running_task_id)
+        assert running_task_id in tk.running_tasks
+        # shouldn't remove running_task_id
+        tk.check_max_tasks_per_owner(thd.task_owner.key)
+
+        # should keep tb0_id and ids, plus new_ids[0] since it is running
+        self._assert_headers(tk, [tb0_id] + ids + [new_ids[0]], 5)
+
+        # finish the task, restore state
+        tk.task_ended(running_task_id)
+        assert running_task_id not in tk.running_tasks

def test_get_unsupport_reasons(self):
tk = TaskHeaderKeeper(