From 65df13229b98e540f39248fe6aed6a5ed5d4f530 Mon Sep 17 00:00:00 2001 From: Dariusz Rybi Date: Wed, 22 Jan 2020 11:55:57 +0100 Subject: [PATCH] Handle task failure during creation --- golem/task/rpc.py | 18 +++++++++++++++--- golem/task/taskmanager.py | 33 +++++++++++++++++++++++++++++++-- tests/golem/task/test_rpc.py | 17 +++++++++++++++++ 3 files changed, 63 insertions(+), 5 deletions(-) diff --git a/golem/task/rpc.py b/golem/task/rpc.py index 27856ed633..2d70e8d8da 100644 --- a/golem/task/rpc.py +++ b/golem/task/rpc.py @@ -433,9 +433,20 @@ def _create_task_error(e, _self, task_dict, *args, **_kwargs) \ return None, str(e) -def _restart_task_error(e, _self, task_id, *args, **_kwargs) \ - -> typing.Tuple[None, str]: +def _restart_task_error( + e: Exception, + self: 'ClientProvider', + task_id: str, + *_args, **kwargs +) -> typing.Tuple[None, str]: logger.error("Cannot restart task %r: %s", task_id, e) + try: + new_task = kwargs['new_task'] + self.task_manager.put_task_in_failed_state(new_task.task_id) + except KeyError: + logger.debug('No new task given') + except Exception: # pylint: disable=broad-except + logger.exception("Can't put task in failed state. task_id=%r", task_id) if hasattr(e, 'to_dict'): return None, rpc_utils.int_to_string(e.to_dict()) @@ -776,8 +787,9 @@ def restart_legacy_task( deferred.addErrback( lambda failure: _restart_task_error( e=failure.value, - _self=self, + self=self, task_id=task_id, + new_task=new_task, ) ) self.task_manager.put_task_in_restarted_state(task_id) diff --git a/golem/task/taskmanager.py b/golem/task/taskmanager.py index 877ce2bd56..57003cb5b8 100644 --- a/golem/task/taskmanager.py +++ b/golem/task/taskmanager.py @@ -15,7 +15,6 @@ Iterable, List, Optional, - Tuple, Type, TYPE_CHECKING, ) @@ -28,7 +27,6 @@ from apps.appsmanager import AppsManager from apps.core.task.coretask import CoreTask -from apps.core.task.coretaskstate import TaskDefinition from apps.wasm.environment import WasmTaskEnvironment from golem import model @@ -59,6 +57,7 @@ if TYPE_CHECKING: # pylint:disable=unused-import, ungrouped-imports + from typing import Tuple from apps.appsmanager import App from apps.core.task.coretaskstate import TaskDefinition from golem.task.taskbase import TaskTypeInfo, TaskBuilder @@ -979,6 +978,36 @@ def put_task_in_restarted_state(self, task_id, clear_tmp=True): logger.info("Task %s put into restarted state", task_id) self.notice_task_updated(task_id, op=TaskOp.RESTARTED) + @handle_task_key_error + def put_task_in_failed_state( + self, + task_id: str, + task_status=TaskStatus.errorCreating, + ) -> None: + assert not task_status.is_active() + assert not task_status.is_completed() + task_state = self.tasks_states[task_id] + if task_state.status.is_completed(): + logger.debug( + "Task is already completed. Won't change status." + " current_status=%(current_status)s," + " refused_status=%(refused_status)s", + { + 'current_status': task_state.status, + 'refused_status': task_status, + }, + ) + return + + task_state.status = task_state + + logger.info( + "Task %s put into failed state. task_status=%s", + task_id, + task_state, + ) + self.notice_task_updated(task_id, op=TaskOp.ABORTED) + @handle_subtask_key_error def restart_subtask( self, diff --git a/tests/golem/task/test_rpc.py b/tests/golem/task/test_rpc.py index 7036c5a94d..d50acbd1b3 100644 --- a/tests/golem/task/test_rpc.py +++ b/tests/golem/task/test_rpc.py @@ -1,4 +1,5 @@ # pylint: disable=protected-access,too-many-ancestors,too-many-lines +# pylint: disable=arguments-differ import copy import itertools from pathlib import Path @@ -28,6 +29,7 @@ from golem.task import taskserver from golem.task import taskstate from golem.task import tasktester +from golem.task.rpc import _restart_task_error from golem.task.rpc import ClientProvider from tests.golem import test_client from tests.golem.test_client import TestClientBase @@ -364,6 +366,21 @@ def add_resources(*_args, **_kwargs): output_path = Path(task_def['options']['output_path']) self.assertEqual(str(output_path.parent), task_output_path) + def test_restart_task_error(self): + result = _restart_task_error( + e=RuntimeError('Test error'), + self=self.provider, + task_id='task_id', + new_task=Mock(task_id='new_task_id'), + ) + self.assertEqual( + result, + ( + None, + 'Test error', + ), + ) + class TestGetMaskForTask(test_client.TestClientBase): def test_get_mask_for_task(self, *_):