Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Added subtask timeouts and unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
maaktweluit committed Nov 15, 2019
1 parent 7bd28d2 commit a690b00
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 3 deletions.
22 changes: 20 additions & 2 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,28 @@ def __init__(

def restore_tasks(self):
logger.debug('restore_tasks()')
loop = asyncio.get_event_loop()

running_subtasks = RequestedSubtask.select() \
.where(RequestedSubtask.status.in_(SUBTASK_STATUS_ACTIVE))
for subtask in running_subtasks:
subtask_id = subtask.subtask_id

if subtask.deadline is not None and \
subtask.deadline.timestamp() < loop.time():
logger.info('restoring subtask. subtask_id=%r', subtask_id)
loop.call_at(
subtask.deadline,
self._time_out_subtask,
subtask.task_id,
subtask_id,
)
else:
logger.info('subtask timed out. subtask_id=%r', subtask_id)
self._time_out_subtask(subtask.task_id, subtask_id)

running_tasks = RequestedTask.select() \
.where(RequestedTask.status.not_in(TASK_STATUS_COMPLETED))

loop = asyncio.get_event_loop()
for task in running_tasks:
if task.deadline is not None and \
task.deadline.timestamp() < loop.time():
Expand Down
61 changes: 60 additions & 1 deletion tests/golem/task/test_requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

import asyncio
from pathlib import Path
import datetime

from freezegun import freeze_time
from golem_task_api.client import RequestorAppClient
from golem_task_api.enums import VerifyResult
from golem_task_api.structs import Subtask, Infrastructure
from mock import ANY, Mock
from mock import ANY, call, MagicMock, Mock, patch
import pytest
from twisted.internet import defer

Expand Down Expand Up @@ -73,6 +74,64 @@ def setup_method(self, tmpdir, monkeypatch):
'_build_legacy_task_state',
lambda *_: TaskState())

@pytest.mark.asyncio
async def test_restore_tasks_timedout(self, freezer, mock_client):
# given
mock_loop = MagicMock()
mock_client.has_pending_subtasks.return_value = True
self._add_next_subtask_to_client_mock(mock_client)
self.rtm._time_out_task = Mock()
self.rtm._time_out_subtask = Mock()

task_id = self._create_task()
await self.rtm.init_task(task_id)
self.rtm.start_task(task_id)
computing_node = self._get_computing_node()
subtask = await self.rtm.get_next_subtask(task_id, computing_node)
mock_loop.time.return_value = datetime.datetime.now().timestamp()
# when
with patch(
'golem.task.requestedtaskmanager.asyncio.get_event_loop',
return_value=mock_loop
):
self.rtm.restore_tasks()
# then
self.rtm._time_out_task.assert_called_once_with(task_id)
self.rtm._time_out_subtask.assert_called_once_with(
task_id,
subtask.subtask_id
)

@pytest.mark.asyncio
@pytest.mark.freeze_time("1000")
async def test_restore_tasks_schedule(self, freezer, mock_client):
# given
mock_loop = MagicMock()
mock_client.has_pending_subtasks.return_value = True
self._add_next_subtask_to_client_mock(mock_client)

task_id = self._create_task(
task_timeout=20,
subtask_timeout=20)
await self.rtm.init_task(task_id)
self.rtm.start_task(task_id)
computing_node = self._get_computing_node()
subtask = await self.rtm.get_next_subtask(task_id, computing_node)
freezer.move_to("1010")
mock_loop.time.return_value = datetime.datetime.now().timestamp()
# when
with patch(
'golem.task.requestedtaskmanager.asyncio.get_event_loop',
return_value=mock_loop
):
self.rtm.restore_tasks()
# then
assert mock_loop.call_at.call_count == 2
mock_loop.call_at.assert_has_calls([
call(ANY, ANY, task_id, subtask.subtask_id),
call(ANY, ANY, task_id),
])

def test_create_task(self):
# given
golem_params = self._build_golem_params()
Expand Down

0 comments on commit a690b00

Please # to comment.