Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Task API task creation RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
Krigpl committed Jul 23, 2019
1 parent 4eeb2ed commit 4627293
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 2 deletions.
41 changes: 41 additions & 0 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pathlib import Path
from typing import Dict, Any, List

from dataclasses import dataclass

TaskId = str


@dataclass
class CreateTaskParams:
app_id: str
name: str
environment: str
task_timeout: int
subtask_timeout: int
output_directory: Path
resources: List[Path]
max_subtasks: int
max_price_per_hour: int
concent_enabled: bool


class RequestedTaskManager:
def create_task(
self,
golem_params: CreateTaskParams,
app_params: Dict[str, Any],
) -> TaskId:
""" Creates an entry in the storage about the new task and assigns
the task_id to it. The task then has to be initialized and started. """
raise NotImplementedError

def init_task(self, task_id: TaskId) -> None:
""" Initialize the task by calling create_task on the Task API.
The application performs validation of the params which may result in
an error marking the task as failed. """
raise NotImplementedError

def start_task(self, task_id: TaskId) -> None:
""" Marks an already initialized task as ready for computation. """
raise NotImplementedError
78 changes: 77 additions & 1 deletion golem/task/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os.path
import re
import typing
from pathlib import Path

from ethereum.utils import denoms
from golem_messages import helpers as msg_helpers
Expand All @@ -23,7 +24,13 @@
from golem.model import Actor
from golem.resource import resource
from golem.rpc import utils as rpc_utils
from golem.task import taskbase, taskkeeper, taskstate, tasktester
from golem.task import (
taskbase,
taskkeeper,
taskstate,
tasktester,
requestedtaskmanager,
)

if typing.TYPE_CHECKING:
from golem.client import Client # noqa pylint: disable=unused-import
Expand Down Expand Up @@ -464,6 +471,12 @@ def __init__(self, client):
def task_manager(self):
return self.client.task_server.task_manager

@property
def requested_task_manager(
self,
) -> requestedtaskmanager.RequestedTaskManager:
return self.client.task_server.requested_task_manager

@rpc_utils.expose('comp.task.create')
@safe_run(_create_task_error)
def create_task(self, task_dict, force=False) \
Expand Down Expand Up @@ -499,6 +512,69 @@ def create_task(self, task_dict, force=False) \

return task_id, None

@rpc_utils.expose('comp.task_api.create')
def create_task_api_task(self, task_params: dict, golem_params: dict):
logger.info('Creating Task API task. golem_params=%r', golem_params)

force_concent_deposit = \
bool(golem_params.get('force_concent_deposit', False))
create_task_params = requestedtaskmanager.CreateTaskParams(
app_id=golem_params['app_id'],
name=golem_params['name'],
environment=golem_params['environment'],
output_directory=Path(golem_params['output_directory']),
resources=list(map(Path, golem_params['resources'])),
max_price_per_hour=int(golem_params['max_price_per_hour']),
max_subtasks=int(golem_params['max_subtasks']),
task_timeout=int(golem_params['task_timeout']),
subtask_timeout=int(golem_params['subtask_timeout']),
concent_enabled=bool(golem_params['concent_enabled']),
)

self._validate_enough_funds_to_pay_for_task(
create_task_params.max_price_per_hour,
create_task_params.max_subtasks,
create_task_params.concent_enabled,
force_concent_deposit,
)

task_id = self.requested_task_manager.create_task(
create_task_params,
task_params,
)

self.client.funds_locker.lock_funds(
task_id,
create_task_params.max_price_per_hour,
create_task_params.max_subtasks,
)

@defer.inlineCallbacks
def init_task():
try:
self.requested_task_manager.init_task(task_id)

# ensure Concent deposit
min_amount, opt_amount = msg_helpers.requestor_deposit_amount(
create_task_params.max_price_per_hour,
)
yield self.client.transaction_system.concent_deposit(
required=min_amount,
expected=opt_amount,
)
except Exception:
self.client.funds_locker.remove_task(task_id)
raise
else:
self.requested_task_manager.start_task(task_id)

# Do not yield, this is a fire and forget deferred as it may take long
# time to complete and shouldn't block the RPC call.
d = init_task()
d.addErrback(lambda e: logger.info("Task creation error %r", e)) # noqa pylint: disable=no-member

return task_id

def _validate_enough_funds_to_pay_for_task(
self,
subtask_price: int,
Expand Down
2 changes: 2 additions & 0 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from golem.task.task_api.docker import DockerTaskApiPayloadBuilder
from golem.task.benchmarkmanager import BenchmarkManager
from golem.task.envmanager import EnvironmentManager
from golem.task.requestedtaskmanager import RequestedTaskManager
from golem.task.taskbase import Task, AcceptClientVerdict
from golem.task.taskconnectionshelper import TaskConnectionsHelper
from golem.task.taskstate import TaskOp
Expand Down Expand Up @@ -139,6 +140,7 @@ def __init__(self,
apps_manager=apps_manager,
finished_cb=task_finished_cb,
)
self.requested_task_manager = RequestedTaskManager()
benchmarks = self.task_manager.apps_manager.get_benchmarks()
self.benchmark_manager = BenchmarkManager(
node_name=config_desc.node_name,
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ constantly==15.1.0
crossbar==17.12.1
cryptography==2.3.1
cytoolz==0.9.0.1
dataclasses==0.6
distro==1.3.0
dnspython==1.15.0
docker==3.5.0
Expand Down
3 changes: 2 additions & 1 deletion requirements_to-freeze.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ certifi
cffi==1.10.0
click>=4.0
crossbar==17.12.1
dataclasses
distro
docker==3.5.0
enforce==0.3.4
Expand Down Expand Up @@ -56,4 +57,4 @@ txaio==18.8.1
urllib3==1.24.3
web3==4.2.1
zope.interface==4.4.2
zxcvbn-python
zxcvbn-python
132 changes: 132 additions & 0 deletions tests/golem/task/test_rpc_task_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import os
from pathlib import Path
import tempfile
import unittest
from unittest import mock
from mock import Mock

from golem.client import Client
from golem.ethereum import fundslocker, transactionsystem
from golem.task import requestedtaskmanager
from golem.task import taskserver
from golem.task import rpc


class TestTaskApiCreate(unittest.TestCase):
def setUp(self):
self.client = Mock(spec=Client)
self.client.transaction_system = Mock(
spec=transactionsystem.TransactionSystem,
)
self.client.transaction_system.get_available_gnt.return_value = 1000
self.client.transaction_system.get_available_eth.return_value = 1000
self.client.transaction_system.eth_for_batch_payment.return_value = 10

self.client.concent_service = Mock()
self.client.concent_service.available.return_value = False

self.requested_task_manager = Mock(
spec=requestedtaskmanager.RequestedTaskManager,
)
self.client.task_server = Mock(spec=taskserver.TaskServer)
self.client.task_server.requested_task_manager = \
self.requested_task_manager

self.client.funds_locker = Mock(spec=fundslocker.FundsLocker)

self.rpc = rpc.ClientProvider(self.client)

@staticmethod
def get_golem_params():
random_dir = tempfile.gettempdir()
return {
'app_id': 'testappid',
'environment': 'testenv',
'name': 'testname',
'output_directory': random_dir,
'resources': [
os.path.join(random_dir, 'resource1'),
os.path.join(random_dir, 'resource2'),
],
'max_price_per_hour': 123,
'max_subtasks': 4,
'task_timeout': 60,
'subtask_timeout': 60,
'concent_enabled': False,
}

def test_success(self):
task_params = {
'app_param1': 'value1',
'app_param2': 'value2',
}
golem_params = self.get_golem_params()
task_id = 'test_task_id'
self.requested_task_manager.create_task.return_value = task_id

new_task_id = self.rpc.create_task_api_task(task_params, golem_params)
self.assertEqual(task_id, new_task_id)
self.requested_task_manager.create_task.assert_called_once_with(
mock.ANY,
task_params,
)
create_task_params = \
self.requested_task_manager.create_task.call_args[0][0]
self.assertEqual(
golem_params['environment'],
create_task_params.environment,
)
self.assertEqual(
golem_params['app_id'],
create_task_params.app_id,
)
self.assertEqual(
golem_params['name'],
create_task_params.name,
)
self.assertEqual(
Path(golem_params['output_directory']),
create_task_params.output_directory,
)
self.assertEqual(
[Path(r) for r in golem_params['resources']],
create_task_params.resources,
)
self.assertEqual(
golem_params['max_price_per_hour'],
create_task_params.max_price_per_hour,
)
self.assertEqual(
golem_params['max_subtasks'],
create_task_params.max_subtasks,
)
self.assertEqual(
golem_params['task_timeout'],
create_task_params.task_timeout,
)
self.assertEqual(
golem_params['subtask_timeout'],
create_task_params.subtask_timeout,
)
self.assertEqual(
golem_params['concent_enabled'],
create_task_params.concent_enabled,
)
self.client.funds_locker.lock_funds.assert_called_once_with(
task_id,
golem_params['max_price_per_hour'],
golem_params['max_subtasks'],
)

self.requested_task_manager.init_task.assert_called_once_with(task_id)
self.client.transaction_system.concent_deposit.assert_called_once()
self.requested_task_manager.start_task.assert_called_once_with(task_id)

def test_failed_init(self):
self.requested_task_manager.init_task.side_effect = Exception

task_id = self.rpc.create_task_api_task({}, self.get_golem_params())

self.client.funds_locker.remove_task.assert_called_once_with(task_id)
self.client.transaction_system.concent_deposit.assert_not_called()
self.requested_task_manager.start_task.assert_not_called()

0 comments on commit 4627293

Please # to comment.