Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Created task fragments RPC #4120

Merged
merged 6 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions golem/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Deferred)

from apps.appsmanager import AppsManager
from apps.rendering.task.renderingtask import RenderingTask
import golem
from golem.appconfig import TASKARCHIVE_MAINTENANCE_INTERVAL, AppConfig
from golem.clientconfigdescriptor import ConfigApprover, ClientConfigDescriptor
Expand Down Expand Up @@ -879,6 +880,27 @@ def get_subtasks(self, task_id: str) \
except KeyError:
logger.info("Task not found: '%s'", task_id)

@rpc_utils.expose('comp.task.rendering.task_fragments')
def get_fragments(self, task_id: str) -> Optional[Dict[int, List[Dict]]]:
if not self.task_server:
return None

task = self.task_server.task_manager.tasks.get(task_id)
if task is None or not isinstance(task, RenderingTask):
return None

fragments: Dict[int, List[Dict]] = {}

for subtask_index in range(1, task.total_tasks + 1):
fragments[subtask_index] = []

for extra_data in task.subtasks_given.values():
subtask = self.task_server.task_manager.get_subtask_dict(
extra_data['subtask_id'])
fragments[extra_data['start_task']].append(subtask)

return fragments

@rpc_utils.expose('comp.task.subtask')
def get_subtask(self, subtask_id: str) \
-> Tuple[Optional[Dict], Optional[str]]:
Expand Down
38 changes: 36 additions & 2 deletions tests/golem/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from pydispatch import dispatcher
from twisted.internet.defer import Deferred, inlineCallbacks

from apps.rendering.task.renderingtask import RenderingTask
from golem import model
from golem import testutils
from golem.appconfig import (
Expand All @@ -41,7 +42,7 @@
from golem.rpc.mapping.rpceventnames import UI, Environment, Golem
from golem.task.acl import Acl
from golem.task.taskserver import TaskServer
from golem.task.taskstate import TaskTestStatus
from golem.task.taskstate import TaskTestStatus, SubtaskStatus
from golem.tools import testwithreactor
from golem.tools.assertlogs import LogTestCase

Expand Down Expand Up @@ -393,7 +394,6 @@ def test_restore_locks(self, *_):
deadline,
)


class TestClientRestartSubtasks(TestClientBase):

def setUp(self):
Expand Down Expand Up @@ -1166,6 +1166,40 @@ def test_block_node(self, *_):
self.client.task_server.acl.disallow.assert_called_once_with(
'node_id', -1, True)

@patch('golem.task.taskmanager.TaskManager.get_subtask_dict',
return_value=Mock())
def test_get_fragments(self, *_):
task_id = str(uuid.uuid4())
subtasks_count = 3
mock_task = Mock(spec=RenderingTask)
mock_task.total_tasks = subtasks_count
mock_task.subtasks_given = {
'subtask-uuid-1': {
'subtask_id': 'subtask-uuid-1',
'start_task': 1,
},
'subtask-uuid-2': {
'subtask_id': 'subtask-uuid-2',
'start_task': 2,
},
'subtask-uuid-3': {
'subtask_id': 'subtask-uuid-3',
'start_task': 2,
},
'subtask-uuid-4': {
'subtask_id': 'subtask-uuid-4',
'start_task': 2,
},
}
self.client.task_server.task_manager.tasks[task_id] = mock_task

task_fragments = self.client.get_fragments(task_id)

self.assertTrue(len(task_fragments) == subtasks_count)
self.assertTrue(len(task_fragments[1]) == 1)
self.assertTrue(len(task_fragments[2]) == 3)
self.assertTrue(len(task_fragments[3]) == 0)

@classmethod
def __new_incoming_peer(cls):
return dict(node=cls.__new_session())
Expand Down