Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Message Queue #4111

Merged
merged 35 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bcfbb04
Message Queue
jiivan Apr 11, 2019
31e2e46
Merge branch 'develop' of github.com:golemfactory/golem into msg_queue
jiivan Apr 11, 2019
f310aa1
Update tests
jiivan Apr 12, 2019
9d463bc
Merge branch 'develop' of github.com:golemfactory/golem into msg_queue
jiivan Apr 12, 2019
fe0c87b
[WIP] Update tests
jiivan Apr 12, 2019
827945a
Tests updated
jiivan Apr 15, 2019
a2877fa
lints
jiivan Apr 15, 2019
08af7cb
[review] I am my node
jiivan Apr 15, 2019
d2cd920
Merge branch 'develop' of github.com:golemfactory/golem into msg_queue
jiivan Apr 15, 2019
466d6e4
Dropped unnecessary dropped
jiivan Apr 15, 2019
9bffb78
[review] copy_and_sign from GM
jiivan Apr 17, 2019
9e48170
[review] Assertion failure message
jiivan Apr 17, 2019
7a339b6
[review] Lessa mbigous sending failure messages
jiivan Apr 17, 2019
fe7bfd7
[review] More descriptive comment about TaskServer.sessions
jiivan Apr 17, 2019
20ef396
[review] Removed dummy TaskComputer.session_[closed|timeout]. Thanks …
jiivan Apr 17, 2019
58d83f3
[review] More ELI5 comment about already established TaskSession
jiivan Apr 17, 2019
b5b104b
[review] Debug log for final connection failure
jiivan Apr 17, 2019
fd84392
[review] address formatting in log
jiivan Apr 17, 2019
4235673
nodeskeeper unittest
jiivan Apr 17, 2019
0cdac7e
Add more debugs
jiivan Apr 17, 2019
936a124
Call verification finished in dummy task
jiivan Apr 17, 2019
c7f7d8f
Provide payment processor with correct node_id
jiivan Apr 17, 2019
21d8b45
Use proper node_id accessor
jiivan Apr 17, 2019
649e93d
Dummy Mock ETS returns proper timestamp
jiivan Apr 17, 2019
6afeb66
Use proper private_key
jiivan Apr 17, 2019
9b9ebdd
Logging formatter prefix
jiivan Apr 18, 2019
13b2a03
Forbidden classes in msg_queue
jiivan Apr 18, 2019
3c5e3ea
Use current time in mock_ets (dummy)
jiivan Apr 18, 2019
c5edb87
dummy_task_debug
jiivan Apr 18, 2019
e973167
Test helpers.send_task_failure
jiivan Apr 18, 2019
64bebf8
Handle disconnect in unverified TaskSession
jiivan Apr 18, 2019
81bd4d4
Test offer chosen without subtasks (ctd is None)
jiivan Apr 19, 2019
69f58c7
Log every ban action
jiivan Apr 19, 2019
cd6e689
Merge branch 'develop' of github.com:golemfactory/golem into msg_queue
jiivan Apr 24, 2019
c8b19a8
Test TaskManager.task_needs_computation()
jiivan Apr 24, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions golem/network/p2p/p2pservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,20 +788,6 @@ def want_to_start_task_session(
self.task_server\
.task_connections_helper.cannot_start_task_session(conn_id)

def peer_want_task_session(self, node_info, super_node_info, conn_id):
"""Process request to start task session from this node to a node
from node_info.
:param Node node_info: node that requests task session with this node
:param Node|None super_node_info: information about supernode
that has passed this information
:param conn_id: connection id
"""
self.task_server.start_task_session(
node_info,
super_node_info,
conn_id
)

#############################
# RANKING FUNCTIONS #
#############################
Expand Down
10 changes: 4 additions & 6 deletions golem/network/p2p/peersession.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,10 @@ def _react_to_challenge_solution(self, msg):
message.base.Disconnect.REASON.Unverified
)

def _react_to_want_to_start_task_session(self, msg):
self.p2p_service.peer_want_task_session(
msg.node_info,
msg.super_node_info,
msg.conn_id
)
@classmethod
def _react_to_want_to_start_task_session(cls, msg):
# TODO: https://github.com/golemfactory/golem/issues/4005
logger.debug("Ignored WTSTS. msg=%s", msg)

def _react_to_set_task_session(self, msg):
self.p2p_service.want_to_start_task_session(
Expand Down
9 changes: 3 additions & 6 deletions golem/network/transport/msg_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,19 @@

import golem_messages
from golem_messages import exceptions as msg_exceptions
from golem_messages import message

from golem import decorators
from golem import model
from golem.core import variables


if typing.TYPE_CHECKING:
# pylint: disable=ungrouped-imports,unused-import
from golem_messages import message


logger = logging.getLogger(__name__)
READ_LOCK = threading.Lock()


def put(node_id: str, msg: 'message.base.Base') -> None:
def put(node_id: str, msg: message.base.Message) -> None:
assert not isinstance(msg, message.base.Disconnect)
db_model = model.QueuedMessage.from_message(node_id, msg)
db_model.save()

Expand Down
138 changes: 138 additions & 0 deletions golem/task/server/helpers.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import logging
import typing

from golem_messages import message
from golem_messages import helpers as msg_helpers

from golem import model
from golem.core import common
from golem.network import history
from golem.network.transport import msg_queue

if typing.TYPE_CHECKING:
# pylint: disable=unused-import
from golem.network.p2p.local_node import LocalNode

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -80,3 +90,131 @@ def on_error(exc, *_args, **_kwargs):
client_options=client_options,
output_dir=output_dir
)

def send_report_computed_task(task_server, waiting_task_result) -> None:
""" Send task results after finished computations
"""
from golem.task.tasksession import copy_and_sign
task_to_compute = history.get(
message_class_name='TaskToCompute',
node_id=waiting_task_result.owner.key,
task_id=waiting_task_result.task_id,
subtask_id=waiting_task_result.subtask_id
)

if not task_to_compute:
logger.warning(
"I won't report computed task. TTC missing."
" node=%s, task_id=%r, subtask_id=%r",
common.node_info_str(
waiting_task_result.owner.node_name,
waiting_task_result.owner.key,
),
waiting_task_result.task_id,
waiting_task_result.subtask_id,
)
return

my_node: LocalNode = task_server.node
client_options = task_server.get_share_options(
waiting_task_result.task_id,
waiting_task_result.owner.prv_addr,
)

report_computed_task = message.tasks.ReportComputedTask(
task_to_compute=task_to_compute,
node_name=my_node.node_name,
address=my_node.prv_addr,
port=task_server.cur_port,
key_id=my_node.key,
node_info=my_node.to_dict(),
extra_data=[],
size=waiting_task_result.result_size,
package_hash='sha1:' + waiting_task_result.package_sha1,
multihash=waiting_task_result.result_hash,
secret=waiting_task_result.result_secret,
options=client_options.__dict__,
)

msg_queue.put(
waiting_task_result.owner.key,
report_computed_task,
)
report_computed_task = copy_and_sign(
msg=report_computed_task,
private_key=task_server.keys_auth._private_key, # noqa pylint: disable=protected-access
)
history.add(
msg=report_computed_task,
node_id=waiting_task_result.owner.key,
local_role=model.Actor.Provider,
remote_role=model.Actor.Requestor,
)

# if the Concent is not available in the context of this subtask
# we can only assume that `ReportComputedTask` above reaches
# the Requestor safely

if not task_to_compute.concent_enabled:
logger.debug(
"Concent not enabled for this task, "
"skipping `ForceReportComputedTask`. "
"task_id=%r, "
"subtask_id=%r, ",
task_to_compute.task_id,
task_to_compute.subtask_id,
)
return

# we're preparing the `ForceReportComputedTask` here and
# scheduling the dispatch of that message for later
# (with an implicit delay in the concent service's `submit` method).
#
# though, should we receive the acknowledgement for
# the `ReportComputedTask` sent above before the delay elapses,
# the `ForceReportComputedTask` message to the Concent will be
# cancelled and thus, never sent to the Concent.

delayed_forcing_msg = message.concents.ForceReportComputedTask(
report_computed_task=report_computed_task,
result_hash='sha1:' + waiting_task_result.package_sha1
)
logger.debug('[CONCENT] ForceReport: %s', delayed_forcing_msg)

task_server.client.concent_service.submit_task_message(
waiting_task_result.subtask_id,
delayed_forcing_msg,
)


def send_task_failure(waiting_task_failure) -> None:
"""Inform task owner that an error occurred during task computation
"""

task_to_compute = history.get(
message_class_name='TaskToCompute',
node_id=waiting_task_failure.owner.key,
task_id=waiting_task_failure.task_id,
subtask_id=waiting_task_failure.subtask_id
)

if not task_to_compute:
logger.warning(
"I won't report task failure. TTC missing."
" node=%s, task_id=%r, subtask_id=%r",
common.node_info_str(
waiting_task_failure.owner.node_name,
waiting_task_failure.owner.key,
),
waiting_task_failure.task_id,
waiting_task_failure.subtask_id,
)
return

msg_queue.put(
waiting_task_failure.owner.key,
message.tasks.TaskFailure(
task_to_compute=task_to_compute,
err=waiting_task_failure.err_msg
),
)
98 changes: 0 additions & 98 deletions golem/task/server/queue.py

This file was deleted.

Loading