Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Fixed task timeout in RequestedTaskManager #4830

Merged
merged 1 commit into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 44 additions & 23 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Operation,
SubtaskOp,
SubtaskStatus,
SUBTASK_STATUS_ACTIVE,
TaskOp,
TaskStatus,
TASK_STATUS_ACTIVE,
Expand Down Expand Up @@ -152,7 +153,7 @@ def create_task(
loop = asyncio.get_event_loop()
loop.call_at(
loop.time() + golem_params.task_timeout,
self._check_task_timeout,
self._time_out_task,
task.task_id,
)

Expand Down Expand Up @@ -321,7 +322,7 @@ async def get_next_subtask(
loop = asyncio.get_event_loop()
loop.call_at(
loop.time() + task.subtask_timeout,
self._check_subtask_timeout,
self._time_out_subtask,
subtask.task,
subtask.subtask_id,
)
Expand Down Expand Up @@ -408,19 +409,18 @@ async def abort_task(self, task_id: TaskId):
if not task.status.is_active():
raise RuntimeError(
f"Task not active, can not abort. task_id={task_id}")
app_client = await self._get_app_client(task.app_id)
await app_client.abort_task(task_id)

task.status = TaskStatus.aborted
task.save()
subtasks = self._get_pending_subtasks(task_id)
for subtask in subtasks:

for subtask in self._get_pending_subtasks(task_id):
subtask.status = SubtaskStatus.cancelled # type: ignore
subtask.save()
self._finish_subtask(subtask, SubtaskOp.ABORTED)

self._notice_task_updated(task, op=TaskOp.ABORTED)

await self._shutdown_app_client(task.app_id)
await self._abort_task_and_shutdown(task)

@staticmethod
def get_requested_task(task_id: TaskId) -> Optional[RequestedTask]:
Expand Down Expand Up @@ -610,15 +610,27 @@ def _get_task_api_service(
shared_dir=shared_dir
)

def _check_task_timeout(self, task_id: TaskId) -> None:
def _time_out_task(self, task_id: TaskId) -> None:
task = RequestedTask.get(RequestedTask.task_id == task_id)
if task.status.is_active():
logger.info("Task timed out. task_id=%r", task_id)
task.status = TaskStatus.timeout
task.save()
self._notice_task_updated(task, op=TaskOp.TIMEOUT)
if not task.status.is_active():
return # Already finished

def _check_subtask_timeout(
logger.info("Task timed out. task_id=%r", task_id)

task.status = TaskStatus.timeout
task.save()

for subtask in self._get_pending_subtasks(task_id):
subtask.status = SubtaskStatus.timeout # type: ignore
subtask.save()
self._finish_subtask(subtask, SubtaskOp.TIMEOUT)

self._notice_task_updated(task, op=TaskOp.TIMEOUT)

# Don't wait for the future because nothing depends on it
asyncio.ensure_future(self._abort_task_and_shutdown(task))

def _time_out_subtask(
self,
task_id: TaskId,
subtask_id: SubtaskId
Expand All @@ -633,8 +645,8 @@ def _check_subtask_timeout(
subtask.task,
subtask.subtask_id
)
# TODO: Add SubtaskStatus.timeout?
subtask.status = SubtaskStatus.failure
# FIXME: Call discard_subtasks
subtask.status = SubtaskStatus.timeout
subtask.save()
self._finish_subtask(subtask, SubtaskOp.TIMEOUT)

Expand All @@ -657,15 +669,24 @@ def _get_unfinished_subtasks_for_node(
def _get_pending_subtasks(task_id: TaskId) -> List[RequestedSubtask]:
return RequestedSubtask.select().where(
RequestedSubtask.task_id == task_id,
# FIXME: duplicate list with SubtaskStatus.is_active()
RequestedSubtask.status.in_([
SubtaskStatus.starting,
SubtaskStatus.downloading,
SubtaskStatus.verifying,
])
RequestedSubtask.status.in_(SUBTASK_STATUS_ACTIVE)
)

async def _shutdown_app_client(self, app_id) -> None:
async def _abort_task_and_shutdown(self, task: RequestedTask) -> None:
    """Abort ``task`` in its app, then attempt to shut the app client down.

    Both steps are best-effort: an exception raised by either one is
    logged and swallowed, so a failed abort never prevents the shutdown
    attempt. Only a failure to obtain the app client itself is left
    uncaught and propagates to the caller.
    """
    app_client = await self._get_app_client(task.app_id)

    # Step 1: ask the app to abort the task; log and carry on if it fails.
    try:
        await app_client.abort_task(task.task_id)
    except Exception:  # pylint: disable=broad-except
        logger.exception(
            'Failed to abort task. app_id=%r task_id=%r',
            task.app_id, task.task_id)

    # Step 2: attempt the client shutdown regardless of the abort outcome.
    try:
        await self._shutdown_app_client(task.app_id)
    except Exception:  # pylint: disable=broad-except
        logger.exception(
            'Failed to shut down client. app_id=%r', task.app_id)

async def _shutdown_app_client(self, app_id: AppId) -> None:
# Check if app completed all tasks
unfinished_tasks = RequestedTask.select(
fn.Count(RequestedTask.task_id)
Expand Down
10 changes: 9 additions & 1 deletion golem/task/taskstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ class SubtaskStatus(Enum):
failure = "Failure"
restarted = "Restart"
cancelled = "Cancelled"
timeout = "Timeout"

def is_computed(self) -> bool:
    """Return True when this status is ``starting`` or ``downloading``."""
    computed_states = (self.starting, self.downloading)
    return self in computed_states

def is_active(self) -> bool:
return self in [self.starting, self.downloading, self.verifying]
return self in SUBTASK_STATUS_ACTIVE

def is_finished(self) -> bool:
    """Return True only when this status equals ``finished``."""
    finished_state = self.finished
    return self == finished_state
Expand All @@ -82,6 +83,13 @@ def is_finishing(self) -> bool:
return self in {self.downloading, self.verifying}


# Subtask statuses that count as "active". SubtaskStatus.is_active() tests
# membership in this list, and requestedtaskmanager uses it to select a
# task's pending subtasks. It is defined after the class body because it
# references the enum's members.
SUBTASK_STATUS_ACTIVE = [
    SubtaskStatus.starting,
    SubtaskStatus.downloading,
    SubtaskStatus.verifying
]


validate_varchar_inf = functools.partial(
validators.validate_varchar,
max_length=float('infinity'),
Expand Down
23 changes: 23 additions & 0 deletions tests/golem/task/test_requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,30 @@ async def test_task_timeout(self, mock_client):
# Unfortunately freezegun doesn't mock asyncio's time
# and can't be used here
await asyncio.sleep(task_timeout)

assert self.rtm.is_task_finished(task_id)
mock_client.abort_task.assert_called_once_with(task_id)
mock_client.shutdown.assert_called_once_with()

@pytest.mark.asyncio
async def test_task_timeout_with_subtask(self, mock_client):
self._add_next_subtask_to_client_mock(mock_client)
task_timeout = 1
task_id = await self._start_task(task_timeout=task_timeout)
subtask_id = (await self.rtm.get_next_subtask(
task_id, self._get_computing_node()
)).subtask_id

# Unfortunately freezegun doesn't mock asyncio's time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shame; hopefully they solve this soon.
Maybe it would be good to link the issue on their GitHub?
spulec/freezegun#290

# and can't be used here
await asyncio.sleep(task_timeout)

task = RequestedTask.get(RequestedTask.task_id == task_id)
subtask = RequestedSubtask.get(
RequestedSubtask.task == task_id,
RequestedSubtask.subtask_id == subtask_id)
assert task.status == TaskStatus.timeout
assert subtask.status == SubtaskStatus.timeout

@pytest.mark.asyncio
async def test_get_started_tasks(self, mock_client):
Expand Down