Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Message Queue #2223

Closed
7 tasks done
shadeofblue opened this issue Feb 26, 2018 · 3 comments · Fixed by #4111
Closed
7 tasks done

Message Queue #2223

shadeofblue opened this issue Feb 26, 2018 · 3 comments · Fixed by #4111
Assignees

Comments

@shadeofblue
Copy link
Contributor

shadeofblue commented Feb 26, 2018

Implement a Message Queue service that persistently stores messages outside of task sessions.

It would simplify TASK_CONN_TYPES. Developer would no longer need to create infamous _add_pending_request, __connection_established_for_XXX, __connection_failure_for_XXX, __connection_final_failure_for_XXX quartet. He would instead just call message_queue.put(node_id, ..., msg description)

It would also solve the persistence issue.

Updated 2018-02-26: more elaborated gains @jiivan
Updated 2018-02-26: no longer needed by Concent -- @jiivan

  • make TaskSession stateless
  • implement the Message Queue service
  • implement the Message Queue persistence layer
  • add unit tests for the new Message Queue service
  • simplify TaskServer - replace the connection types with a unified transport layer utilizing the Message Queue
  • Use MQ in offer_chosen callback
  • Implement task session timeouts
@jiivan
Copy link
Contributor

jiivan commented Feb 26, 2018

Queue should be implemented as separate sqlite3 database (separated from golem.db).

Possible schema implementation could look like this:

CREATE TABLE queue (
 node_id text not null primary key,
 task_id text,
 subtask_id text,
 msg_type int,
 -- pickled slots tuple
 msg_slots blob, 
)

Alternative approach would be to store msg blob instead of msg_type int and msg_slots blob. With whole message blob approach we would have to remember to reset message timestamp (and signature), because timestamp should reflect actual sending time and not the time of queue insertion.

API proposal:

def put(node_id: receivers uuid, task_id: uuid, subtask_id: uuid, msg_type: int, msg_slots: tuple) -> None:
    # ...

def get(node_id: receivers uuid, task_id: uuid, subtask_id: uuid) -> Iterator[golem_messages.message.base.Message]:
    """Returns iterator of messages.
        Can be used by an established TaskSession between ourselves and node_id
        to know what messages should be sent."""
    # ...

def waiting() -> Iterator[tuple(node_id, task_id, subtask_id)]:
    """Returns iterator of (node_id, task_id, subtask_id) that has pending messages.
        Can be used by TaskServer to know which nodes it should connect to.
    """
    # ...

@mplebanski
Copy link
Contributor

Queue should be implemented as separate sqlite3 database (separated from golem.db).

Why separate DB?

@jiivan
Copy link
Contributor

jiivan commented Apr 10, 2019

@mplebanski idea of separate db has been dropped.

# for free to subscribe to this conversation on GitHub. Already have an account? #.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants