Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Implement Task API verification flow (#4508)
Browse files Browse the repository at this point in the history
  • Loading branch information
Krigpl authored Jul 25, 2019
1 parent 4af12a7 commit a62e4fd
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 39 deletions.
14 changes: 14 additions & 0 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dataclasses import dataclass

TaskId = str
SubtaskId = str


@dataclass
Expand Down Expand Up @@ -39,3 +40,16 @@ def init_task(self, task_id: TaskId) -> None:
def start_task(self, task_id: TaskId) -> None:
""" Marks an already initialized task as ready for computation. """
raise NotImplementedError

def task_exists(self, _task_id: TaskId) -> bool: # noqa pylint: disable=no-self-use
""" Return whether task of a given task_id exists. """
return False

def get_subtasks_outputs_dir(self, task_id: TaskId) -> Path:
""" Return a path to the directory where subtasks outputs should be
placed. """
raise NotImplementedError

def verify(self, task_id: TaskId, subtask_id: SubtaskId) -> bool:
""" Return whether a subtask has been computed corectly. """
raise NotImplementedError
56 changes: 33 additions & 23 deletions golem/task/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,15 @@ def computed_task_reported(
report_computed_task,
after_success=lambda: None,
after_error=lambda: None):
task_manager = task_server.task_manager
concent_service = task_server.client.concent_service

task = task_manager.tasks.get(report_computed_task.task_id, None)
output_dir = task.tmp_dir if hasattr(task, 'tmp_dir') else None
client_options = task_server.get_download_options(
report_computed_task.options
)

fgtr = message.concents.ForceGetTaskResult(
report_computed_task=report_computed_task
)

# submit a delayed `ForceGetTaskResult` to the Concent
# in case the download exceeds the maximum allowable download time.
# however, if it succeeds, the message will get cancelled
# in the success handler

fgtr = message.concents.ForceGetTaskResult(
report_computed_task=report_computed_task
)
concent_service.submit_task_message(
report_computed_task.subtask_id,
fgtr,
Expand Down Expand Up @@ -79,19 +70,38 @@ def on_error(exc, *_args, **_kwargs):
)
after_error()

# Actually request results
task_manager.task_result_incoming(report_computed_task.subtask_id)
task_manager.task_result_manager.pull_package(
report_computed_task.multihash,
report_computed_task.task_id,
report_computed_task.subtask_id,
report_computed_task.secret,
success=on_success,
error=on_error,
client_options=client_options,
output_dir=output_dir
task_id = report_computed_task.task_id
client_options = task_server.get_download_options(
report_computed_task.options
)

requested_task_manager = task_server.requested_task_manager
if requested_task_manager.task_exists(task_id):
deferred = task_server.new_resource_manager.download(
report_computed_task.multihash,
requested_task_manager.get_subtasks_outputs_dir(task_id),
client_options,
)
deferred.addCallback(on_success)
deferred.addErrback(on_error)
else:
task_manager = task_server.task_manager
task = task_manager.tasks.get(task_id, None)
output_dir = task.tmp_dir if hasattr(task, 'tmp_dir') else None
# Request results
task_manager.task_result_incoming(report_computed_task.subtask_id)
task_manager.task_result_manager.pull_package(
report_computed_task.multihash,
report_computed_task.task_id,
report_computed_task.subtask_id,
report_computed_task.secret,
success=on_success,
error=on_error,
client_options=client_options,
output_dir=output_dir
)


def send_report_computed_task(task_server, waiting_task_result) -> None:
""" Send task results after finished computations
"""
Expand Down
47 changes: 31 additions & 16 deletions golem/task/server/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@
# pylint: disable=unused-import
from golem.core import keysauth
from golem.task import taskmanager
from golem.task import requestedtaskmanager

logger = logging.getLogger(__name__)


class VerificationMixin:
keys_auth: 'keysauth.KeysAuth'
task_manager: 'taskmanager.TaskManager'
requested_task_manager: 'requestedtaskmanager.RequestedTaskManager'

def verify_results(
self,
report_computed_task: message.tasks.ReportComputedTask,
extracted_package: ExtractedPackage,
) -> None:

node = dt_p2p.Node(**report_computed_task.node_info)
task_id = report_computed_task.task_id
subtask_id = report_computed_task.subtask_id
logger.info(
'Verifying results. node=%s, subtask_id=%s',
Expand All @@ -41,19 +43,13 @@ def verify_results(
),
subtask_id,
)
result_files = extracted_package.get_full_path_files()

def verification_finished():
def verification_finished(
is_verification_lenient: bool,
verification_failed: bool,
):
logger.debug("Verification finished handler.")

task_id = self.task_manager.subtask_to_task(
subtask_id, model.Actor.Requestor)
is_verification_lenient = (
self.task_manager.tasks[task_id]
.task_definition.run_verification == RunVerification.lenient)

verification_failed = \
not self.task_manager.verify_subtask(subtask_id)
if verification_failed:
if not is_verification_lenient:
logger.debug("Verification failure. subtask_id=%r",
Expand Down Expand Up @@ -114,11 +110,30 @@ def verification_finished():
remote_role=model.Actor.Provider,
)

self.task_manager.computed_task_received(
subtask_id,
result_files,
verification_finished
)
if self.requested_task_manager.task_exists(task_id):
verification_failed = not self.requested_task_manager.verify(
task_id,
subtask_id,
)
verification_finished(False, verification_failed)
else:
def verification_finished_old():
is_verification_lenient = (
self.task_manager.tasks[task_id].task_definition
.run_verification == RunVerification.lenient)
verification_failed = \
not self.task_manager.verify_subtask(subtask_id)
verification_finished(
is_verification_lenient,
verification_failed,
)

result_files = extracted_package.get_full_path_files()
self.task_manager.computed_task_received(
subtask_id,
result_files,
verification_finished_old,
)

def send_result_rejected(
self,
Expand Down
6 changes: 6 additions & 0 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from golem.envs.docker.non_hypervised import NonHypervisedDockerCPUEnvironment
from golem.marketplace import OfferPool
from golem.model import TaskPayment
from golem.network.hyperdrive.client import HyperdriveAsyncClient
from golem.network.transport import msg_queue
from golem.network.transport.network import ProtocolFactory, SessionFactory
from golem.network.transport.tcpnetwork import (
Expand All @@ -50,6 +51,7 @@
update_requestor_assigned_sum,
update_requestor_efficiency,
)
from golem.resource.resourcemanager import ResourceManager
from golem.rpc import utils as rpc_utils
from golem.task import timer
from golem.task.acl import get_acl, _DenyAcl as DenyAcl
Expand Down Expand Up @@ -141,6 +143,10 @@ def __init__(self,
finished_cb=task_finished_cb,
)
self.requested_task_manager = RequestedTaskManager()
self.new_resource_manager = ResourceManager(HyperdriveAsyncClient(
config_desc.hyperdrive_rpc_address,
config_desc.hyperdrive_rpc_port,
))
benchmarks = self.task_manager.apps_manager.get_benchmarks()
self.benchmark_manager = BenchmarkManager(
node_name=config_desc.node_name,
Expand Down
64 changes: 64 additions & 0 deletions tests/golem/task/server/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from golem import testutils
from golem.core import keysauth
from golem.network.hyperdrive.client import HyperdriveClientOptions
from golem.task import requestedtaskmanager
from golem.task import taskserver
from golem.task.server import helpers
from tests import factories

Expand Down Expand Up @@ -158,3 +160,65 @@ def test_basic(self, put_mock, get_mock, *_):
get_mock.return_value = self.ttc
helpers.send_task_failure(self.wtf)
put_mock.assert_called_once()


class TestComputedTaskReportedTaskApiFlow(unittest.TestCase):
def setUp(self):
self.rtm = mock.Mock(
spec=requestedtaskmanager.RequestedTaskManager,
)
self.rtm.task_exists.return_value = True

self.task_server = mock.Mock(spec=taskserver.TaskServer)
self.task_server.requested_task_manager = self.rtm
self.task_server.new_resource_manager = mock.Mock()
self.task_server.client = mock.Mock(concent_service=mock.Mock())

self.rct = msg_factories.tasks.ReportComputedTaskFactory()

def test_basic(self):
helpers.computed_task_reported(self.task_server, self.rct)

self.rtm.task_exists.assert_called_once_with(self.rct.task_id)
self.rtm.get_subtasks_outputs_dir.assert_called_once_with(
self.rct.task_id,
)
self.task_server.new_resource_manager.download.assert_called_once_with(
self.rct.multihash,
self.rtm.get_subtasks_outputs_dir.return_value,
self.task_server.get_download_options.return_value,
)

def test_success_callback(self):
after_success = mock.Mock()
after_error = mock.Mock()
self.task_server.new_resource_manager.download.return_value = mock.Mock(
addCallback=lambda cb: cb(mock.Mock()),
)

helpers.computed_task_reported(
self.task_server,
self.rct,
after_success=after_success,
after_error=after_error,
)

after_success.assert_called_once_with()
after_error.assert_not_called()

def test_error_callback(self):
after_success = mock.Mock()
after_error = mock.Mock()
self.task_server.new_resource_manager.download.return_value = mock.Mock(
addErrback=lambda cb: cb(mock.Mock()),
)

helpers.computed_task_reported(
self.task_server,
self.rct,
after_success=after_success,
after_error=after_error,
)

after_success.assert_not_called()
after_error.assert_called_once_with()
2 changes: 2 additions & 0 deletions tests/golem/task/test_tasksession.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,8 @@ def setUp(self):
})
}
ts.task_server.task_keeper.task_headers = {}
ts.task_server.requested_task_manager = \
Mock(task_exists=Mock(return_value=False))
ecc = Mock()
ecc.get_privkey.return_value = os.urandom(32)
ts.task_server.keys_auth = keys_auth
Expand Down

0 comments on commit a62e4fd

Please # to comment.