Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Persist pending messages #2299

Closed
wants to merge 22 commits into from
Closed

Persist pending messages #2299

wants to merge 22 commits into from

Conversation

mfranciszkiewicz
Copy link
Contributor

@mfranciszkiewicz mfranciszkiewicz commented Mar 6, 2018

  • extra database session.db for storing unsent messages and session details
  • tries to periodically restore sessions and send pending messages
  • identifies lost sessions by the ConnectionLost error from twisted
  • Session.send now returns a success boolean value
  • removes the send_unverified argument from Session.send in favor of the Session.can_be_unverified collection
  • adds node_info to TaskSession's Hello message

@codecov
Copy link

codecov bot commented Mar 6, 2018

Codecov Report

Merging #2299 into develop will decrease coverage by 1.15%.
The diff coverage is 69.58%.

@@             Coverage Diff             @@
##           develop    #2299      +/-   ##
===========================================
- Coverage    89.81%   88.65%   -1.16%     
===========================================
  Files          177      178       +1     
  Lines        15556    15729     +173     
===========================================
- Hits         13971    13945      -26     
- Misses        1585     1784     +199

@mfranciszkiewicz mfranciszkiewicz changed the base branch from multiple_dbs to develop March 7, 2018 09:24
@mfranciszkiewicz mfranciszkiewicz force-pushed the pending_messages branch 2 times, most recently from d288c6e to febc8ea Compare March 7, 2018 19:13
@mfranciszkiewicz mfranciszkiewicz force-pushed the pending_messages branch 2 times, most recently from 0ee1235 to d28862f Compare March 7, 2018 23:05
@jiivan
Copy link
Contributor

jiivan commented Mar 20, 2018

related to: #2223

Copy link
Contributor

@jiivan jiivan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The golem.network.pending part looks very complicated. Would it be possible to expose simple API/shortcuts as I suggested in https://github.com/golemfactory/golem/issues/2223#issuecomment-368491180 ? (pending.put(), pending.get(), pending.waiting())

class PendingMessage(PendingObjectModel):

type = IntegerField()
slots = MessageSlotsField()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should probably implement .as_message() method. (It should know how to create a message from itself)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

def get(cls,
key_id: str,
task_id: Optional[object] = ANY,
subtask_id: Optional[object] = ANY) -> Iterator[PendingMessage]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with .put() this should return Message iterator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

return reduce(operator.and_, clauses)


class PendingTaskSession(PendingObjectModel):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since in current implementation TaskSession instance is bound to a subtask, is it necessary to have separate PendingSession and PendingMessage? Logically we're interested in sending a pending message, and address:port should be accessible from PeerKeeper.

Copy link
Contributor Author

@mfranciszkiewicz mfranciszkiewicz Mar 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PendingMessage is enough to send the message, but might not be enough to correctly process the response and send the following messages; e.g. self.result_owner is required in some places.

@mfranciszkiewicz
Copy link
Contributor Author

I don't think the API is that simple, since this is not a message queue. Sessions are lost on both ends, and sending a mid-protocol message requires consistency (on both ends). Pending task session's data may be redundant, but at least it's complete.

Please correct me if I'm missing something.

Copy link
Contributor

@Elfoniok Elfoniok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, but why we need aux database?

@@ -116,15 +117,12 @@ def interpret(self, msg):
)
BasicSafeSession.interpret(self, msg)

def send(self, msg, send_unverified=False):
def send(self, msg) -> bool:
"""Send given message if connection was verified or send_unverified
option is set to True.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably need to change docs

@mfranciszkiewicz
Copy link
Contributor Author

@Elfoniok The main database is mildly congested at times and the second database is there in order not to make the situation worse.

@mfranciszkiewicz
Copy link
Contributor Author

@jiivan Another approach would be to store session's state outside the TaskSession class and implement the message queue. Are there any plans for the latter?

@ghost ghost removed the needs review label Aug 23, 2018
@mfranciszkiewicz mfranciszkiewicz deleted the pending_messages branch May 13, 2019 11:04
# for free to subscribe to this conversation on GitHub. Already have an account? #.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants