From c52d9d708cc6b30bc0b49d1b9c0c64dda5598a66 Mon Sep 17 00:00:00 2001 From: hweawer Date: Mon, 16 Dec 2024 14:10:55 +0100 Subject: [PATCH] Add metrics middleware for an onchain transport w3 (#296) * Add metrics middleware for an onchain transport w3 * Get chain_id on initialization * Single labels call * Refactor * Pass metric as a parameter --- src/blockchain/web3_extentions/middleware.py | 13 +++++++------ src/bots/depositor.py | 2 +- src/bots/pauser.py | 2 +- src/bots/unvetter.py | 2 +- src/main.py | 3 ++- src/metrics/metrics.py | 7 +++++++ src/transport/msg_providers/onchain_transport.py | 8 +++++--- tests/fixtures/provider.py | 3 ++- 8 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/blockchain/web3_extentions/middleware.py b/src/blockchain/web3_extentions/middleware.py index b14fadc8..53dd6fb2 100644 --- a/src/blockchain/web3_extentions/middleware.py +++ b/src/blockchain/web3_extentions/middleware.py @@ -2,7 +2,8 @@ from typing import Any, Callable, Set, cast from urllib.parse import urlparse -from metrics.metrics import ETH_RPC_REQUESTS, ETH_RPC_REQUESTS_DURATION +from metrics.metrics import ETH_RPC_REQUESTS_DURATION +from prometheus_client import Counter from requests import HTTPError, Response from web3 import Web3 from web3.middleware import construct_simple_cache_middleware @@ -11,7 +12,7 @@ logger = logging.getLogger(__name__) -def add_requests_metric_middleware(web3: Web3) -> Web3: +def add_requests_metric_middleware(web3: Web3, rpc_metric: Counter) -> Web3: """ Works correctly with MultiProvider and vanilla Providers. @@ -28,7 +29,7 @@ def middleware(method: RPCEndpoint, params: Any) -> RPCResponse: response = make_request(method, params) except HTTPError as ex: failed: Response = ex.response - ETH_RPC_REQUESTS.labels( + rpc_metric.labels( method=method, code=failed.status_code, domain=urlparse(web3.provider.endpoint_uri).netloc, # pyright: ignore @@ -42,7 +43,7 @@ def middleware(method: RPCEndpoint, params: Any) -> RPCResponse: if isinstance(error, dict): code = error.get('code') or code - ETH_RPC_REQUESTS.labels( + rpc_metric.labels( method=method, code=code, domain=urlparse(web3.provider.endpoint_uri).netloc, # pyright: ignore @@ -70,12 +71,12 @@ def add_cache_middleware(web3: Web3) -> Web3: return web3 -def add_middlewares(web3: Web3) -> Web3: +def add_middlewares(web3: Web3, rpc_metric: Counter) -> Web3: """ Cache middleware should go first to avoid rewriting metrics for cached requests. If middleware has level = 0, the middleware will be appended to the end of the middleware list. So we need [..., cache, other middlewares] """ add_cache_middleware(web3) - add_requests_metric_middleware(web3) + add_requests_metric_middleware(web3, rpc_metric) return web3 diff --git a/src/bots/depositor.py b/src/bots/depositor.py index c93ab33b..cacb8683 100644 --- a/src/bots/depositor.py +++ b/src/bots/depositor.py @@ -84,7 +84,7 @@ def __init__( self._onchain_transport_w3 = None if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS: - self._onchain_transport_w3 = OnchainTransportProvider.create_ochain_transport_w3() + self._onchain_transport_w3 = OnchainTransportProvider.create_onchain_transport_w3() transports.append( OnchainTransportProvider( w3=self._onchain_transport_w3, diff --git a/src/bots/pauser.py b/src/bots/pauser.py index 0919134c..beb83da9 100644 --- a/src/bots/pauser.py +++ b/src/bots/pauser.py @@ -50,7 +50,7 @@ def __init__(self, w3: Web3): if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS: transports.append( OnchainTransportProvider( - w3=OnchainTransportProvider.create_ochain_transport_w3(), + w3=OnchainTransportProvider.create_onchain_transport_w3(), onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(PauseMessageSchema, PingMessageSchema)), parsers_providers=[PauseV2Parser, PauseV3Parser, PingParser], diff --git a/src/bots/unvetter.py b/src/bots/unvetter.py index 63b43181..a91e2f54 100644 --- a/src/bots/unvetter.py +++ b/src/bots/unvetter.py @@ -55,7 +55,7 @@ def prepare_transport_bus(self): if TransportType.ONCHAIN_TRANSPORT in variables.MESSAGE_TRANSPORTS: transports.append( OnchainTransportProvider( - w3=OnchainTransportProvider.create_ochain_transport_w3(), + w3=OnchainTransportProvider.create_onchain_transport_w3(), onchain_address=variables.ONCHAIN_TRANSPORT_ADDRESS, message_schema=Schema(Or(UnvetMessageSchema, PingMessageSchema)), parsers_providers=[UnvetParser, PingParser], diff --git a/src/main.py b/src/main.py index 02800f0a..7d1de32a 100644 --- a/src/main.py +++ b/src/main.py @@ -11,6 +11,7 @@ from bots.unvetter import run_unvetter from metrics.healthcheck_pulse import start_pulse_server from metrics.logging import logging +from metrics.metrics import ETH_RPC_REQUESTS from prometheus_client import start_http_server from web3_multi_provider import FallbackProvider @@ -49,7 +50,7 @@ def main(bot_name: str): ) logger.info({'msg': 'Add metrics to web3 requests.'}) - add_middlewares(w3) + add_middlewares(w3, ETH_RPC_REQUESTS) if bot_name == BotModule.DEPOSITOR: run_depositor(w3) diff --git a/src/metrics/metrics.py b/src/metrics/metrics.py index 1aa38da2..f3503980 100644 --- a/src/metrics/metrics.py +++ b/src/metrics/metrics.py @@ -114,6 +114,13 @@ namespace=PROMETHEUS_PREFIX, ) +ONCHAIN_TRANSPORT_ETH_RPC_REQUESTS = Counter( + 'onchain_transport_rpc_requests', + 'Total count of requests to onchain transport RPC', + ['method', 'code', 'domain'], + namespace=PROMETHEUS_PREFIX, +) + UNEXPECTED_EXCEPTIONS = Counter( 'unexpected_exceptions', 'Total count of unexpected exceptions', diff --git a/src/transport/msg_providers/onchain_transport.py b/src/transport/msg_providers/onchain_transport.py index 782c910b..97b41cca 100644 --- a/src/transport/msg_providers/onchain_transport.py +++ b/src/transport/msg_providers/onchain_transport.py @@ -3,9 +3,10 @@ from typing import Callable, List, Optional import variables -from blockchain.web3_extentions.middleware import add_cache_middleware +from blockchain.web3_extentions.middleware import add_middlewares from eth_typing import ChecksumAddress from eth_utils import to_bytes +from metrics.metrics import ONCHAIN_TRANSPORT_ETH_RPC_REQUESTS from schema import Schema from transport.msg_providers.common import BaseMessageProvider from transport.msg_providers.rabbit import MessageType @@ -325,5 +326,6 @@ def _parse_log(self, log: LogReceipt) -> Optional[dict]: return None @staticmethod - def create_ochain_transport_w3() -> Web3: - return add_cache_middleware(Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS))) + def create_onchain_transport_w3() -> Web3: + w3 = Web3(FallbackProvider(variables.ONCHAIN_TRANSPORT_RPC_ENDPOINTS)) + return add_middlewares(w3, ONCHAIN_TRANSPORT_ETH_RPC_REQUESTS) diff --git a/tests/fixtures/provider.py b/tests/fixtures/provider.py index 7ceebc88..177d5e91 100644 --- a/tests/fixtures/provider.py +++ b/tests/fixtures/provider.py @@ -6,6 +6,7 @@ from blockchain.web3_extentions.lido_contracts import LidoContracts from blockchain.web3_extentions.middleware import add_middlewares from blockchain.web3_extentions.transaction import TransactionUtils +from metrics.metrics import ETH_RPC_REQUESTS from web3 import HTTPProvider, Web3 from tests.fork import anvil_fork @@ -35,7 +36,7 @@ def web3_provider_integration(request) -> Web3: with anvil_fork(anvil_path, rpc_endpoint, block_num): w3 = Web3(HTTPProvider('http://127.0.0.1:8545', request_kwargs={'timeout': 3600})) - add_middlewares(w3) + add_middlewares(w3, ETH_RPC_REQUESTS) assert w3.is_connected(), 'Failed to connect to the Web3 provider.' yield w3