Skip to content

Commit

Permalink
Move to the loop
Browse files Browse the repository at this point in the history
  • Loading branch information
hweawer committed Oct 16, 2024
1 parent 9688c29 commit 0cc2218
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 58 deletions.
5 changes: 3 additions & 2 deletions src/bots/depositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
QUORUM,
UNEXPECTED_EXCEPTIONS,
)
from metrics.transport_message_metrics import message_metrics_curried
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.onchain_transport import DepositParser, OnchainTransportProvider, PingParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
Expand Down Expand Up @@ -105,10 +105,11 @@ def __init__(
self.message_storage = MessageStorage(
transports,
filters=[
message_metrics_curried(web3_clients),
message_metrics_filter,
to_check_sum_address,
get_messages_sign_filter(self.w3),
],
web3_clients=web3_clients,
)

def execute(self, block: BlockData) -> bool:
Expand Down
5 changes: 3 additions & 2 deletions src/bots/pauser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from blockchain.executor import Executor
from blockchain.typings import Web3
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_curried
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.onchain_transport import OnchainTransportProvider, PauseV2Parser, PauseV3Parser, PingParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
Expand Down Expand Up @@ -68,10 +68,11 @@ def __init__(self, w3: Web3):
self.message_storage = MessageStorage(
transports,
filters=[
message_metrics_curried(web3_clients),
message_metrics_filter,
to_check_sum_address,
get_messages_sign_filter(self.w3),
],
web3_clients=web3_clients,
)

def execute(self, block: BlockData) -> bool:
Expand Down
5 changes: 3 additions & 2 deletions src/bots/unvetter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from blockchain.executor import Executor
from blockchain.typings import Web3
from metrics.metrics import UNEXPECTED_EXCEPTIONS
from metrics.transport_message_metrics import message_metrics_curried
from metrics.transport_message_metrics import message_metrics_filter
from schema import Or, Schema
from transport.msg_providers.onchain_transport import OnchainTransportProvider, PingParser, UnvetParser
from transport.msg_providers.rabbit import MessageType, RabbitProvider
Expand Down Expand Up @@ -73,10 +73,11 @@ def prepare_transport_bus(self):
self.message_storage = MessageStorage(
transports,
filters=[
message_metrics_curried(web3_clients),
message_metrics_filter,
to_check_sum_address,
get_messages_sign_filter(self.w3),
],
web3_clients=web3_clients,
)

def execute(self, block: BlockData) -> bool:
Expand Down
93 changes: 42 additions & 51 deletions src/metrics/transport_message_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import TypedDict

from blockchain.typings import Web3
from metrics.metrics import DEPOSIT_MESSAGES, GUARDIAN_BALANCE, PAUSE_MESSAGES, PING_MESSAGES, UNVET_MESSAGES
from metrics.metrics import DEPOSIT_MESSAGES, PAUSE_MESSAGES, PING_MESSAGES, UNVET_MESSAGES
from transport.msg_providers.rabbit import MessageType

logger = logging.getLogger(__name__)
Expand All @@ -16,55 +16,46 @@ def _chain_id_to_web3_mapping(clients: list[Web3]):
return chain_id_web3


def message_metrics_curried(clients: list[Web3]):
chain_id_to_clients = _chain_id_to_web3_mapping(clients)

def message_metrics_filter(msg: TypedDict) -> bool:
"""
Processes guardian messages and updates Prometheus metrics based on the message type.
Returns True for valid message types to allow further processing, and False for messages
that should be filtered (such as PING messages).
Args:
msg: A dictionary containing message details.
Returns:
bool: True if the message should be processed, False otherwise.
"""
msg_type = msg.get('type')
logger.info({'msg': 'Guardian message received.', 'value': msg, 'type': msg_type})

address = msg.get('guardianAddress')
version = msg.get('app', {}).get('version')
transport = msg.get('transport', '')
chain_id = msg.get('chain_id', '')
staking_module_id = msg.get('stakingModuleId', -1)

for chain_id, client in chain_id_to_clients.items():
balance = client.eth.get_balance(address)
GUARDIAN_BALANCE.labels(address=address, chain_id=str(chain_id)).set(balance)

metrics_map = {
MessageType.PAUSE: PAUSE_MESSAGES,
MessageType.DEPOSIT: DEPOSIT_MESSAGES,
MessageType.UNVET: UNVET_MESSAGES,
}

if msg_type in metrics_map:
metrics_map[msg_type].labels(
address=address,
module_id=staking_module_id,
version=version,
transport=transport,
chain_id=chain_id,
).inc()
return True

if msg_type == MessageType.PING:
PING_MESSAGES.labels(address=address, version=version, transport=transport, chain_id=chain_id).inc()
return False

logger.warning({'msg': 'Received unexpected msg type.', 'value': msg, 'type': msg_type})
def message_metrics_filter(msg: TypedDict) -> bool:
"""
Processes guardian messages and updates Prometheus metrics based on the message type.
Returns True for valid message types to allow further processing, and False for messages
that should be filtered (such as PING messages).
Args:
msg: A dictionary containing message details.
Returns:
bool: True if the message should be processed, False otherwise.
"""
msg_type = msg.get('type')
logger.info({'msg': 'Guardian message received.', 'value': msg, 'type': msg_type})

address = msg.get('guardianAddress')
version = msg.get('app', {}).get('version')
transport = msg.get('transport', '')
chain_id = msg.get('chain_id', '')
staking_module_id = msg.get('stakingModuleId', -1)

metrics_map = {
MessageType.PAUSE: PAUSE_MESSAGES,
MessageType.DEPOSIT: DEPOSIT_MESSAGES,
MessageType.UNVET: UNVET_MESSAGES,
}

if msg_type in metrics_map:
metrics_map[msg_type].labels(
address=address,
module_id=staking_module_id,
version=version,
transport=transport,
chain_id=chain_id,
).inc()
return True

if msg_type == MessageType.PING:
PING_MESSAGES.labels(address=address, version=version, transport=transport, chain_id=chain_id).inc()
return False

return message_metrics_filter
logger.warning({'msg': 'Received unexpected msg type.', 'value': msg, 'type': msg_type})
return False
20 changes: 19 additions & 1 deletion src/transport/msg_storage.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
from typing import Any, Callable, Iterable, List

from blockchain.typings import Web3
from metrics.metrics import GUARDIAN_BALANCE
from transport.msg_providers.common import BaseMessageProvider


def _chain_id_to_web3_mapping(clients: list[Web3]):
chain_id_web3 = dict()
for w3_client in clients:
chain = w3_client.eth.chain_id
chain_id_web3[chain] = w3_client
return chain_id_web3


class MessageStorage:
messages: List = []

"""Fetches all messages, filter them and storing"""

def __init__(self, transports: List[BaseMessageProvider], filters: List[Callable]):
def __init__(self, transports: List[BaseMessageProvider], filters: List[Callable], web3_clients: list[Web3]):
"""
transports - List of objects with working get_messages method.
filters - functions that would be applied to messages when they are received. (That would need only one check)
"""
self._transports = transports
self._filters = filters
self._chain_id_to_web3 = _chain_id_to_web3_mapping(web3_clients)

def receive_messages(self) -> Iterable[dict]:
"""Fetch all messages from transport and filter them"""
Expand All @@ -38,3 +49,10 @@ def get_messages(self, actualize_rule: Callable[[Any], bool]) -> List[Any]:

def clear(self):
self.messages = []

def update_metrics(self):
addresses = set([m.get('guardianAddress') for m in self.messages])
for address in addresses:
for chain_id, client in self._chain_id_to_web3.items():
balance = client.eth.get_balance(address)
GUARDIAN_BALANCE.labels(address=address, chain_id=str(chain_id)).set(balance)

0 comments on commit 0cc2218

Please # to comment.