Skip to content

Reticulum network #97

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Services.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ In your configuration ini file you will add the following section:
```ini
[application_name.service_id]
__class = digitalpy.core.service_management.domain.model.service_configuration.ServiceConfiguration
status = STOPPED
status = [RUNNING|STOPPED]
name = MyNewService
port = 8443
host = 0.0.0.0
protocol = TCP
protocol = [FlaskHTTPNetworkBlueprints|TCPNetwork|Reticulum]
```

Next you will add or update the ServiceManagementConfiguration section as follows:
Expand Down
4 changes: 4 additions & 0 deletions digitalpy/core/core_config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ blueprint_import_base = digitalpy.blueprints
; NETWORK OBJECTS

; the default tcp_network
[Reticulum]
__class = digitalpy.core.network.impl.network_reticulum.ReticulumNetwork
client = DefaultClient

[TCPNetwork]
__class = digitalpy.core.network.impl.network_tcp.TCPNetwork
client = DefaultClient
Expand Down
197 changes: 197 additions & 0 deletions digitalpy/core/network/impl/network_reticulum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""This file contains the reticulum network implementation.
This consits of two main classes
1. reticulum manager which is responsible for running the reticulum network stack in a separate process and exposing to the network.
2. reticulum network which is responsible for exposing the network interface to a service
"""

import threading
import RNS
import LXMF
import os
import time
import zmq
from multiprocessing import Queue
from typing import Callable
from digitalpy.core.zmanager.response import Response
from digitalpy.core.domain.object_id import ObjectId
from digitalpy.core.network.domain.client_status import ClientStatus
from digitalpy.core.domain.domain.network_client import NetworkClient
from digitalpy.core.main.object_factory import ObjectFactory
from digitalpy.core.network.network_sync_interface import NetworkSyncInterface
from digitalpy.core.zmanager.request import Request

APP_NAME = LXMF.APP_NAME + ".delivery"

class AnnounceHandler:
def __init__(self, identities):
self.aspect_filter = APP_NAME # Filter for LXMF announcements
self.identities = identities # Dictionary to store identities

def received_announce(self, destination_hash, announced_identity, app_data):
if destination_hash not in self.identities:
self.identities[destination_hash] = announced_identity

class ReticulumNetwork(NetworkSyncInterface):
def __init__(self):
self._storage_path = None
self._identity_path = None
self._announcer_thread = None
self.message_queue = Queue()
self._clients = {}
self._ret = None
self._lxm_router = None
self._identity = None
self._my_identity = None
self._identities = {}

def initialize_network(self, _, _port, storage_path, identity_path, service_desc):
self._storage_path = storage_path
self._identity_path = identity_path
self._ret = RNS.Reticulum()
self._lxm_router = LXMF.LXMRouter(storagepath=self._storage_path)
RNS.Transport.register_announce_handler(AnnounceHandler(self._identities))
self._identity = self._load_or_generate_identity()
self._my_identity = self._lxm_router.register_delivery_identity(self._identity)
self._lxm_router.register_delivery_callback(self._ret_deliver)
announcer_thread = threading.Thread(target=self._announcer)
announcer_thread.start()
self._service_desc = service_desc

def _load_or_generate_identity(self):
if os.path.exists(self._identity_path):
try:
return RNS.Identity.from_file(self._identity_path)
except RNS.InvalidIdentityFile:
pass
identity = RNS.Identity()
identity.to_file(self._identity_path)
return identity

def _get_client(self, identity: RNS.Identity) -> NetworkClient:
if identity.hash in self._clients:
return self._clients[identity.hash]
else:
client = self._register_new_client(identity.hash)
self._clients[identity.hash] = client
self._identities[identity.hash] = identity
return client

def _ret_deliver(self, message: LXMF.LXMessage):
try:
# validate the message
if message.signature_validated:
validated = True
elif message.unverified_reason == LXMF.LXMessage.SIGNATURE_INVALID:
validated = False
elif message.unverified_reason == LXMF.LXMessage.SOURCE_UNKNOWN:
validated = False
else:
validated = False

# deliver the message to the network
if validated and message.content is not None and message.content != b"":
req: Request = ObjectFactory.get_new_instance("Request")
req.set_value("body", message.content.decode("utf-8"))
req.set_action("reticulum_message")
req.set_value("client", self._get_client(message.source.identity))
self.message_queue.put(req, block=False, timeout=0)
except Exception as e:
print(e)

def _register_new_client(self, destination_hash: bytes):
"""Register a new client to the network.
Args:
destination_hash (bytes): The hash of the client destination to register.
"""
oid = ObjectId("network_client", id=str(destination_hash))
client: NetworkClient = ObjectFactory.get_new_instance(
"DefaultClient", dynamic_configuration={"oid": oid}
)
client.id = destination_hash
client.status = ClientStatus.CONNECTED
client.service_id = self._service_desc.name
client.protocol = self._service_desc.protocol
return client

def _get_client_identity(self, message: LXMF.LXMessage) -> bytes:
"""Get the identity of the client that sent the message. This is used for IAM and client tracking.
Args:
message (LXMF.LXMessage): The message to extract the identity from.

Returns:
bytes: The identity of the client as bytes
"""
return message.source.identity.hash

def _announcer(self, interval: int = 60):
"""Announce the reticulum network to the network."""
while True:
try:
self._my_identity.announce()
except Exception as e:
pass
time.sleep(interval)

def _send_message_to_all_clients(self, message: str):
for identity in self._clients.values():
dest = RNS.Destination(
self._identities[identity.id],
RNS.Destination.OUT,
RNS.Destination.SINGLE,
"lxmf",
"delivery",
)
msg = LXMF.LXMessage(
destination=dest,
source=self._my_identity,
content=message.encode("utf-8"),
desired_method=LXMF.LXMessage.DIRECT,
)
self._lxm_router.handle_outbound(msg)

def _send_message_to_client(self, message: dict, client: NetworkClient):
identity = self._identities.get(client.id)
if identity is not None:
dest = RNS.Destination(
identity,
RNS.Destination.OUT,
RNS.Destination.SINGLE,
"lxmf",
"delivery",
)
msg = LXMF.LXMessage(
destination=dest,
source=self._my_identity,
content=message.encode("utf-8"),
desired_method=LXMF.LXMessage.DIRECT,
)
self._lxm_router.handle_outbound(msg)

def service_connections(self, max_requests=1000, blocking=False, timeout=0):
start_time = time.time()
messages = []
if self.message_queue.empty():
return []
messages.append(self.message_queue.get(block=blocking, timeout=timeout))
while time.time() - start_time < timeout and len(messages) < max_requests:
try:
message = self.message_queue.get(block=False)
messages.append(message)
except Exception as e:
break
return messages

def send_response(self, response):
if response.get_value("client") is not None:
self._send_message_to_client(response.get_value("message"), response.get_value("client"))
else:
self._send_message_to_all_clients(response.get_value("message"))

def receive_message(self, blocking = False):
return self.message_queue.get(block=blocking)

def receive_message_from_client(self, client, blocking = False):
raise NotImplementedError

def teardown_network(self):
pass
Empty file.
82 changes: 82 additions & 0 deletions examples/reticulum_app/components/Chat/Chat_facade.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from digitalpy.core.component_management.impl.default_facade import DefaultFacade
from digitalpy.core.zmanager.impl.async_action_mapper import AsyncActionMapper
from digitalpy.core.zmanager.impl.default_action_mapper import DefaultActionMapper
from digitalpy.core.zmanager.request import Request
from digitalpy.core.zmanager.response import Response
from .controllers.chat_controller import ChatController
from .configuration.chat_constants import (
ACTION_MAPPING_PATH,
LOGGING_CONFIGURATION_PATH,
INTERNAL_ACTION_MAPPING_PATH,
MANIFEST_PATH,
CONFIGURATION_PATH_TEMPLATE,
LOG_FILE_PATH,
ACTION_FLOW_PATH,
)

from . import base


class Chat(DefaultFacade):
"""
"""

def __init__(self, sync_action_mapper: DefaultActionMapper, request: Request,
response: Response, configuration,
action_mapper: AsyncActionMapper = None, # type: ignore
tracing_provider_instance=None): # type: ignore
super().__init__(
# the path to the external action mapping
action_mapping_path=str(ACTION_MAPPING_PATH),
# the path to the internal action mapping
internal_action_mapping_path=str(INTERNAL_ACTION_MAPPING_PATH),
# the path to the logger configuration
logger_configuration=str(LOGGING_CONFIGURATION_PATH),
# the package containing the base classes
base=base, # type: ignore
# the general action mapper (passed by constructor)
action_mapper=sync_action_mapper,
# the request object (passed by constructor)
request=request,
# the response object (passed by constructor)
response=response,
# the configuration object (passed by constructor)
configuration=configuration,
# log file path
log_file_path=LOG_FILE_PATH,
# the tracing provider used
tracing_provider_instance=tracing_provider_instance,
# the template for the absolute path to the model object definitions
configuration_path_template=CONFIGURATION_PATH_TEMPLATE,
# the path to the manifest file
manifest_path=str(MANIFEST_PATH),
# the general action mapper (passed by constructor)
action_flow_path=str(ACTION_FLOW_PATH),
)
self.Chat_controller = ChatController(
request, response, sync_action_mapper, configuration)

def initialize(self, request: Request, response: Response):
self.Chat_controller.initialize(request, response)

return super().initialize(request, response)

def execute(self, method=None):
try:
if hasattr(self, method): # type: ignore
print("executing method "+str(method)) # type: ignore
getattr(self, method)(**self.request.get_values()) # type: ignore
else:
self.request.set_value("logger", self.logger)
self.request.set_value("config_loader", self.config_loader)
self.request.set_value("tracer", self.tracer)
response = self.execute_sub_action(self.request.get_action())
self.response.set_values(response.get_values())
except Exception as e:
self.logger.fatal(str(e))

@DefaultFacade.public
def Chat(self, *args, **kwargs):
"""Updates an existing Genre record.
"""
self.Chat_controller.Chat(*args, **kwargs)
48 changes: 48 additions & 0 deletions examples/reticulum_app/components/Chat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# FilmologyManagement Digital Component

## Description
"Filmology is a test API that describes all the possible variant of a DAF + API model and his implementation as a DigitalPy application"

## Configuration
1. copy the file FilmologyManagement_blueprint.py into your application blueprints folder
2. add the following to your configured core api flows in the configuration/object_configuration.ini file.
```ini
[
; FilmologyManagement component flows
,FilmologyManagement__POSTMovie
,FilmologyManagement__DELETEMovie
,FilmologyManagement__GETMovie
,FilmologyManagement__PATCHMovie
,FilmologyManagement__GETDirectorId
,FilmologyManagement__POSTPoster
,FilmologyManagement__DELETEPoster
,FilmologyManagement__GETPoster
,FilmologyManagement__PATCHPoster
,FilmologyManagement__GETGenreId
,FilmologyManagement__POSTDate
,FilmologyManagement__DELETEDate
,FilmologyManagement__GETDate
,FilmologyManagement__PATCHDate
,FilmologyManagement__GETLanguageId
,FilmologyManagement__POSTDirector
,FilmologyManagement__DELETEDirector
,FilmologyManagement__GETDirector
,FilmologyManagement__PATCHDirector
,FilmologyManagement__GETDateId
,FilmologyManagement__POSTActor
,FilmologyManagement__DELETEActor
,FilmologyManagement__GETActor
,FilmologyManagement__PATCHActor
,FilmologyManagement__GETMovieId
,FilmologyManagement__POSTLanguage
,FilmologyManagement__DELETELanguage
,FilmologyManagement__GETLanguage
,FilmologyManagement__PATCHLanguage
,FilmologyManagement__GETPosterId
,FilmologyManagement__GETActorId
,FilmologyManagement__POSTGenre
,FilmologyManagement__DELETEGenre
,FilmologyManagement__GETGenre
,FilmologyManagement__PATCHGenre
]
```
4 changes: 4 additions & 0 deletions examples/reticulum_app/components/Chat/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""This module contains all the supporting components without business logic
it should also be noted that the component action mapper must be exposed as action mapper.
"""
from .chat_action_mapper import ChatActionMapper as ActionMapper
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from digitalpy.core.zmanager.impl.default_action_mapper import DefaultActionMapper


class ChatActionMapper(DefaultActionMapper):
pass
2 changes: 2 additions & 0 deletions examples/reticulum_app/components/Chat/base/chat_domain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class ChatDomain():
pass
Empty file.
Loading
Loading