From 6615cac853e5c68679b23b6d4839baae8181d50a Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Thu, 24 Oct 2019 18:15:40 +0200 Subject: [PATCH] Fixed task timeout in RequestedTaskManager * Update subtask statuses * Notify application (AbortTask) * Shutdown application Signed-off-by: Adam Wierzbicki --- golem/task/requestedtaskmanager.py | 67 ++++++++++++------- golem/task/taskstate.py | 10 ++- tests/golem/task/test_requestedtaskmanager.py | 23 +++++++ 3 files changed, 76 insertions(+), 24 deletions(-) diff --git a/golem/task/requestedtaskmanager.py b/golem/task/requestedtaskmanager.py index f3723e53a0..30461270dc 100644 --- a/golem/task/requestedtaskmanager.py +++ b/golem/task/requestedtaskmanager.py @@ -28,6 +28,7 @@ Operation, SubtaskOp, SubtaskStatus, + SUBTASK_STATUS_ACTIVE, TaskOp, TaskStatus, TASK_STATUS_ACTIVE, @@ -152,7 +153,7 @@ def create_task( loop = asyncio.get_event_loop() loop.call_at( loop.time() + golem_params.task_timeout, - self._check_task_timeout, + self._time_out_task, task.task_id, ) @@ -321,7 +322,7 @@ async def get_next_subtask( loop = asyncio.get_event_loop() loop.call_at( loop.time() + task.subtask_timeout, - self._check_subtask_timeout, + self._time_out_subtask, subtask.task, subtask.subtask_id, ) @@ -408,19 +409,18 @@ async def abort_task(self, task_id: TaskId): if not task.status.is_active(): raise RuntimeError( f"Task not active, can not abort. task_id={task_id}") - app_client = await self._get_app_client(task.app_id) - await app_client.abort_task(task_id) + task.status = TaskStatus.aborted task.save() - subtasks = self._get_pending_subtasks(task_id) - for subtask in subtasks: + + for subtask in self._get_pending_subtasks(task_id): subtask.status = SubtaskStatus.cancelled # type: ignore subtask.save() self._finish_subtask(subtask, SubtaskOp.ABORTED) self._notice_task_updated(task, op=TaskOp.ABORTED) - await self._shutdown_app_client(task.app_id) + await self._abort_task_and_shutdown(task) @staticmethod def get_requested_task(task_id: TaskId) -> Optional[RequestedTask]: @@ -610,15 +610,27 @@ def _get_task_api_service( shared_dir=shared_dir ) - def _check_task_timeout(self, task_id: TaskId) -> None: + def _time_out_task(self, task_id: TaskId) -> None: task = RequestedTask.get(RequestedTask.task_id == task_id) - if task.status.is_active(): - logger.info("Task timed out. task_id=%r", task_id) - task.status = TaskStatus.timeout - task.save() - self._notice_task_updated(task, op=TaskOp.TIMEOUT) + if not task.status.is_active(): + return # Already finished - def _check_subtask_timeout( + logger.info("Task timed out. task_id=%r", task_id) + + task.status = TaskStatus.timeout + task.save() + + for subtask in self._get_pending_subtasks(task_id): + subtask.status = SubtaskStatus.timeout # type: ignore + subtask.save() + self._finish_subtask(subtask, SubtaskOp.TIMEOUT) + + self._notice_task_updated(task, op=TaskOp.TIMEOUT) + + # Don't wait for the future because nothing depends on it + asyncio.ensure_future(self._abort_task_and_shutdown(task)) + + def _time_out_subtask( self, task_id: TaskId, subtask_id: SubtaskId @@ -633,8 +645,8 @@ def _check_subtask_timeout( subtask.task, subtask.subtask_id ) - # TODO: Add SubtaskStatus.timeout? - subtask.status = SubtaskStatus.failure + # FIXME: Call discard_subtasks + subtask.status = SubtaskStatus.timeout subtask.save() self._finish_subtask(subtask, SubtaskOp.TIMEOUT) @@ -657,15 +669,24 @@ def _get_unfinished_subtasks_for_node( def _get_pending_subtasks(task_id: TaskId) -> List[RequestedSubtask]: return RequestedSubtask.select().where( RequestedSubtask.task_id == task_id, - # FIXME: duplicate list with SubtaskStatus.is_active() - RequestedSubtask.status.in_([ - SubtaskStatus.starting, - SubtaskStatus.downloading, - SubtaskStatus.verifying, - ]) + RequestedSubtask.status.in_(SUBTASK_STATUS_ACTIVE) ) - async def _shutdown_app_client(self, app_id) -> None: + async def _abort_task_and_shutdown(self, task: RequestedTask) -> None: + client = await self._get_app_client(task.app_id) + try: + await client.abort_task(task.task_id) + except Exception: # pylint: disable=broad-except + logger.exception( + 'Failed to abort task. app_id=%r task_id=%r', + task.app_id, task.task_id) + try: + await self._shutdown_app_client(task.app_id) + except Exception: # pylint: disable=broad-except + logger.exception( + 'Failed to shut down client. app_id=%r', task.app_id) + + async def _shutdown_app_client(self, app_id: AppId) -> None: # Check if app completed all tasks unfinished_tasks = RequestedTask.select( fn.Count(RequestedTask.task_id) diff --git a/golem/task/taskstate.py b/golem/task/taskstate.py index 779b4cd5ed..aed4d03cf6 100644 --- a/golem/task/taskstate.py +++ b/golem/task/taskstate.py @@ -68,12 +68,13 @@ class SubtaskStatus(Enum): failure = "Failure" restarted = "Restart" cancelled = "Cancelled" + timeout = "Timeout" def is_computed(self) -> bool: return self in [self.starting, self.downloading] def is_active(self) -> bool: - return self in [self.starting, self.downloading, self.verifying] + return self in SUBTASK_STATUS_ACTIVE def is_finished(self) -> bool: return self == self.finished @@ -82,6 +83,13 @@ def is_finishing(self) -> bool: return self in {self.downloading, self.verifying} +SUBTASK_STATUS_ACTIVE = [ + SubtaskStatus.starting, + SubtaskStatus.downloading, + SubtaskStatus.verifying +] + + validate_varchar_inf = functools.partial( validators.validate_varchar, max_length=float('infinity'), diff --git a/tests/golem/task/test_requestedtaskmanager.py b/tests/golem/task/test_requestedtaskmanager.py index 354ef1925c..1e3eddc551 100644 --- a/tests/golem/task/test_requestedtaskmanager.py +++ b/tests/golem/task/test_requestedtaskmanager.py @@ -262,7 +262,30 @@ async def test_task_timeout(self, mock_client): # Unfortunately feezegun doesn't mock asyncio's time # and can't be used here await asyncio.sleep(task_timeout) + assert self.rtm.is_task_finished(task_id) + mock_client.abort_task.assert_called_once_with(task_id) + mock_client.shutdown.assert_called_once_with() + + @pytest.mark.asyncio + async def test_task_timeout_with_subtask(self, mock_client): + self._add_next_subtask_to_client_mock(mock_client) + task_timeout = 1 + task_id = await self._start_task(task_timeout=task_timeout) + subtask_id = (await self.rtm.get_next_subtask( + task_id, self._get_computing_node() + )).subtask_id + + # Unfortunately feezegun doesn't mock asyncio's time + # and can't be used here + await asyncio.sleep(task_timeout) + + task = RequestedTask.get(RequestedTask.task_id == task_id) + subtask = RequestedSubtask.get( + RequestedSubtask.task == task_id, + RequestedSubtask.subtask_id == subtask_id) + assert task.status == TaskStatus.timeout + assert subtask.status == SubtaskStatus.timeout @pytest.mark.asyncio async def test_get_started_tasks(self, mock_client):