Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Track transport messages #261

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion src/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
'transactions',
'Amount of send transaction from bot with per module distribution.',
['status', 'module_id', 'is_mellow'],
namespace=PROMETHEUS_PREFIX
namespace=PROMETHEUS_PREFIX,
)

ACCOUNT_BALANCE = Gauge('account_balance', 'Account balance', namespace=PROMETHEUS_PREFIX)
Expand Down Expand Up @@ -116,6 +116,48 @@

MODULES = Gauge('modules', 'Modules gauge', ['module_id'], namespace=PROMETHEUS_PREFIX)

ONCHAIN_TRANSPORT_FETCHED_MESSAGES = Gauge(
'onchain_fetched_messages',
'Total count of fetched onchain messages',
['chain_id'],
namespace=PROMETHEUS_PREFIX,
)

ONCHAIN_TRANSPORT_PROCESSED_MESSAGES = Gauge(
'onchain_processed_messages',
'Total count of processed onchain messages',
['chain_id'],
namespace=PROMETHEUS_PREFIX,
)

ONCHAIN_TRANSPORT_VALID_MESSAGES = Gauge(
'onchain_valid_messages',
'Total count of valid onchain messages',
['chain_id'],
namespace=PROMETHEUS_PREFIX,
)

RABBIT_TRANSPORT_FETCHED_MESSAGES = Gauge(
'rabbit_fetched_messages',
'Total count of fetched rabbit messages',
[],
namespace=PROMETHEUS_PREFIX,
)

RABBIT_TRANSPORT_PROCESSED_MESSAGES = Gauge(
'rabbit_processed_messages',
'Total count of processed rabbit messages',
[],
namespace=PROMETHEUS_PREFIX,
)

RABBIT_TRANSPORT_VALID_MESSAGES = Gauge(
'rabbit_valid_messages',
'Total count of valid rabbit messages',
[],
namespace=PROMETHEUS_PREFIX,
)

for module_id in DEPOSIT_MODULES_WHITELIST:
MODULES.labels(module_id).set(1)

Expand Down
25 changes: 23 additions & 2 deletions src/transport/msg_providers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
from typing import Any

from prometheus_client import Gauge
from schema import Schema, SchemaError

logger = logging.getLogger(__name__)
Expand All @@ -22,8 +23,13 @@ def get_messages(self) -> list[dict]:
Returns:
List[Dict]: A list of processed and valid messages.
"""
processed = [self._process_msg(m) for m in self._fetch_messages()]
return [msg for msg in processed if msg and self._is_valid(msg)]
fetched = self._fetch_messages()
self.fetched_messages_metric.set(len(fetched))
processed = [self._process_msg(m) for m in fetched]
self.processed_messages_metric.set(len(processed))
valid = [msg for msg in processed if msg and self._is_valid(msg)]
self.valid_messages_metric.set(len(valid))
return valid

@abc.abstractmethod
def _fetch_messages(self) -> list:
Expand All @@ -42,3 +48,18 @@ def _is_valid(self, msg: dict):
return False

return True

@property
@abc.abstractmethod
def fetched_messages_metric(self) -> Gauge:
raise NotImplementedError('fetched_messages_metric')

@property
@abc.abstractmethod
def processed_messages_metric(self) -> Gauge:
raise NotImplementedError('processed_messages_metric')

@property
@abc.abstractmethod
def valid_messages_metric(self) -> Gauge:
raise NotImplementedError('valid_messages_metric')
15 changes: 15 additions & 0 deletions src/transport/msg_providers/onchain_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from typing import Callable, List, Optional

from eth_typing import ChecksumAddress
from metrics.metrics import ONCHAIN_TRANSPORT_FETCHED_MESSAGES, ONCHAIN_TRANSPORT_PROCESSED_MESSAGES, ONCHAIN_TRANSPORT_VALID_MESSAGES
from prometheus_client import Gauge
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_providers.rabbit import MessageType
Expand Down Expand Up @@ -231,6 +233,7 @@ def __init__(
logger.info('Data bus client initialized.')

self._w3 = w3
self._chain_id = self._w3.eth.chain_id
self._parsers: List[EventParser] = [provider(w3) for provider in parsers_providers]
self._topics = [self._w3.keccak(text=parser.message_abi) for parser in self._parsers]

Expand Down Expand Up @@ -282,3 +285,15 @@ def _process_msg(self, log: LogReceipt) -> Optional[dict]:
}
)
return None

@property
def fetched_messages_metric(self) -> Gauge:
return ONCHAIN_TRANSPORT_FETCHED_MESSAGES.labels(self._chain_id)

@property
def processed_messages_metric(self) -> Gauge:
return ONCHAIN_TRANSPORT_PROCESSED_MESSAGES.labels(self._chain_id)

@property
def valid_messages_metric(self) -> Gauge:
return ONCHAIN_TRANSPORT_VALID_MESSAGES.labels(self._chain_id)
14 changes: 14 additions & 0 deletions src/transport/msg_providers/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import List, Optional

import variables
from metrics.metrics import RABBIT_TRANSPORT_FETCHED_MESSAGES, RABBIT_TRANSPORT_PROCESSED_MESSAGES, RABBIT_TRANSPORT_VALID_MESSAGES
from prometheus_client import Gauge
from schema import Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_providers.stomp.client import Client
Expand Down Expand Up @@ -106,3 +108,15 @@ def _process_msg(self, msg: str) -> Optional[dict]:
return None

return value

@property
def fetched_messages_metric(self) -> Gauge:
return RABBIT_TRANSPORT_FETCHED_MESSAGES

@property
def processed_messages_metric(self) -> Gauge:
return RABBIT_TRANSPORT_PROCESSED_MESSAGES

@property
def valid_messages_metric(self) -> Gauge:
return RABBIT_TRANSPORT_VALID_MESSAGES
15 changes: 15 additions & 0 deletions tests/transport/test_base_provider.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
from unittest.mock import Mock

import pytest
from prometheus_client import Gauge
from schema import Or, Schema
from transport.msg_providers.common import BaseMessageProvider
from transport.msg_types.deposit import DepositMessageSchema


class FakeMessageProvider(BaseMessageProvider):
@property
def fetched_messages_metric(self) -> Gauge:
return Mock()

@property
def processed_messages_metric(self) -> Gauge:
return Mock()

@property
def valid_messages_metric(self) -> Gauge:
return Mock()

MAX_MESSAGES_RECEIVE = 1

def _fetch_messages(self) -> list:
Expand Down
31 changes: 17 additions & 14 deletions tests/transport/test_data_bus.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import cast
from unittest import mock
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -91,22 +92,24 @@ def test_data_bus_provider(web3_transaction_integration):

@pytest.mark.unit
def test_data_bus_mock_responses(web3_lido_unit):
receipts = mock_receipts(web3_lido_unit)
web3_lido_unit.eth.get_logs = Mock(side_effect=[receipts, None])
web3_lido_unit.is_connected = Mock(return_value=True)
web3_lido_unit.eth.get_block_number = Mock(return_value=1)
provider = OnchainTransportProvider(
w3=web3_lido_unit,
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
)
with mock.patch('web3.eth.Eth.chain_id', new_callable=mock.PropertyMock) as mock_chain_id:
mock_chain_id.return_value = 1
receipts = mock_receipts(web3_lido_unit)
web3_lido_unit.eth.get_logs = Mock(side_effect=[receipts, None])
web3_lido_unit.is_connected = Mock(return_value=True)
web3_lido_unit.eth.get_block_number = Mock(return_value=1)
provider = OnchainTransportProvider(
w3=web3_lido_unit,
onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS,
message_schema=Schema(Or(DepositMessageSchema, PingMessageSchema)),
parsers_providers=[DepositParser, PingParser],
)

for parser in provider._parsers:
parser._decode_event = Mock(side_effect=lambda x: x)
for parser in provider._parsers:
parser._decode_event = Mock(side_effect=lambda x: x)

messages = provider.get_messages()
assert len(messages) == len(receipts)
messages = provider.get_messages()
assert len(messages) == len(receipts)


# event MessageDepositV1(address indexed guardianAddress, (uint256 blockNumber, bytes32 blockHash, bytes32 depositRoot,
Expand Down
Loading