Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

refactor the web3 ProviderProxy retry logic #4971

Merged
merged 12 commits into from
Dec 10, 2019
93 changes: 65 additions & 28 deletions golem/ethereum/web3/providers.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,102 @@
import itertools
import logging
import socket

from typing import Optional
import time

from web3.exceptions import CannotHandleRequest
from web3.providers.rpc import HTTPProvider

logger = logging.getLogger(__name__)

# NOTE(review): RETRIES appears to predate the finer-grained limits below --
# confirm whether it is still needed.
RETRIES = 3
# Compared against the proxy-wide retry counter when deciding whether a failed
# request may be retried on the current node.
RETRY_COUNT_LIMIT = 10
# Age of the first counted retry after which the proxy-wide retry counter is
# started afresh.
RETRY_COUNT_INTERVAL = 1800 # seconds
# Compared against the number of failures seen within one make_request() call.
SINGLE_QUERY_RETRY_LIMIT = 3
# Upper bound on the number of nodes one make_request() call may give up on;
# the proxy caps it by the number of configured addresses.
SINGLE_QUERY_NODE_LIMIT = 3


class ProviderProxy(HTTPProvider):
    """HTTP provider that forwards requests to one of several remote nodes.

    Requests are delegated to an inner ``HTTPProvider``. When a request fails
    with a connection-type error it is retried; once the retry limits are
    used up the proxy reconnects to the next address, cycling through the
    configured list. A single request gives up after
    ``self._single_query_node_limit`` nodes have been abandoned.
    """

    def __init__(self, initial_addr_list) -> None:
        """Connect to the first address of *initial_addr_list*.

        :param initial_addr_list: non-empty sequence of node endpoint URIs.
        :raises ValueError: if no address is given.
        """
        if not initial_addr_list:
            # Fail with a clear message instead of a StopIteration leaking
            # out of the address cycle below.
            raise ValueError("GETH: at least one node address is required")

        super().__init__()
        self._initial_addr_list = initial_addr_list
        # Endless round-robin over the addresses; the caller's list is never
        # mutated.
        self._node_addresses = itertools.cycle(initial_addr_list)
        self.provider = self._create_remote_rpc_provider()

        # Never try more nodes for one query than there are addresses.
        self._single_query_node_limit = min(
            SINGLE_QUERY_NODE_LIMIT, len(initial_addr_list)
        )
        self._init_retries_count()

    def _init_retries_count(self, ts: Optional[float] = None):
        """Zero the retry counter and record *ts* as the window start.

        :param ts: timestamp of the first retry in the new window, or
            ``None`` when no retry has been registered yet.
        """
        self._retries = 0
        self._first_retry_time = ts

    def _register_retry(self):
        """Count one more retry, starting a new window when the old expired."""
        now = time.time()
        if not self._first_retry_time \
                or self._first_retry_time + RETRY_COUNT_INTERVAL < now:
            self._init_retries_count(now)

        self._retries += 1

    def make_request(self, method, params):
        """Send a request through the current provider, retrying on failure.

        :param method: RPC method name.
        :param params: RPC parameters, passed through unchanged.
        :return: the response returned by the underlying provider.
        :raises Exception: when the per-query node limit is exhausted, or
            when the underlying provider raises an unexpected exception
            (which is logged and re-raised unchanged).
        """
        logger.debug('ProviderProxy.make_request(%r, %r)', method, params)

        nodes_tried = 0
        retries = 0
        response = None

        while response is None:
            try:
                response = self.provider.make_request(method, params)
                logger.debug(
                    'GETH: request successful %s (%r, %r) -- result = %r',
                    self.provider.endpoint_uri, method, params, response
                )
            except (ConnectionError, ValueError,
                    socket.error, CannotHandleRequest) as exc:
                retries += 1
                self._register_retry()

                # NOTE(review): with `or`, the current node keeps being
                # retried until BOTH limits are reached, and neither counter
                # is reset when switching nodes -- confirm this is intended.
                retry = retries < SINGLE_QUERY_RETRY_LIMIT \
                    or self._retries < RETRY_COUNT_LIMIT

                logger.debug(
                    "GETH: request failure%s"
                    ". %s (%r, %r), error='%s', "
                    'single query retries=%s, node retries=%s',
                    ', retrying' if retry else '',
                    self.provider.endpoint_uri, method, params, exc,
                    retries, self._retries,
                )

                if not retry:
                    nodes_tried += 1
                    # Raises once enough nodes have been given up on,
                    # otherwise reconnects to the next address.
                    self._handle_remote_rpc_provider_failure(
                        method,
                        nodes_tried >= self._single_query_node_limit
                    )
            except Exception as exc:
                logger.error("Unknown exception %r", exc)
                raise
            else:
                # A successful request clears the accumulated retry count.
                self.reset()

        return response

    def _create_remote_rpc_provider(self):
        """Return an ``HTTPProvider`` for the next address in the cycle."""
        addr = next(self._node_addresses)
        logger.info('GETH: connecting to remote RPC interface at %s', addr)
        return HTTPProvider(addr)

    def _handle_remote_rpc_provider_failure(self, method: str, final: bool):
        """Give up on the current node.

        :param method: RPC method of the failed request (for messages only).
        :param final: when true, no further node is tried and the request
            fails.
        :raises Exception: if *final* is true.
        """
        if final:
            # Format the message explicitly: Exception does not interpolate
            # %-style arguments the way the logging calls do.
            raise Exception(
                "GETH: Node limit exhausted, request failed. method='%s'"
                % method
            )
        logger.warning(
            "GETH: '%s' request failed on '%s', "
            "reconnecting to another provider.",
            method, self.provider.endpoint_uri,
        )
        self.provider = self._create_remote_rpc_provider()

    def reset(self):
        """ Resets the current error number counter """
        self._init_retries_count()