Make object store read timeout configurable (#763)
rzvoncek authored Jun 14, 2024
1 parent 3c72aa5 commit 65d1323
Showing 6 changed files with 26 additions and 7 deletions.
3 changes: 3 additions & 0 deletions medusa-example.ini
@@ -118,6 +118,9 @@ use_sudo_for_restore = True

;aws_cli_path = <Location of the aws cli binary if not in PATH>

+; Read timeout in seconds for the storage provider.
+;read_timeout = 60

[monitoring]
;monitoring_provider = <Provider used for sending metrics. Currently either of "ffwd" or "local">

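To opt in, a deployment uncomments the new key in the [storage] section of its own medusa.ini; a hedged example (the value is illustrative, 60 seconds stays the default):

[storage]
; allow slow object store reads up to five minutes
read_timeout = 300
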
5 changes: 3 additions & 2 deletions medusa/config.py
@@ -31,7 +31,7 @@
['bucket_name', 'key_file', 'prefix', 'fqdn', 'host_file_separator', 'storage_provider',
'base_path', 'max_backup_age', 'max_backup_count', 'api_profile', 'transfer_max_bandwidth',
'concurrent_transfers', 'multi_part_upload_threshold', 'host', 'region', 'port', 'secure', 'ssl_verify',
- 'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode']
+ 'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode', 'read_timeout']
)

CassandraConfig = collections.namedtuple(
@@ -116,7 +116,8 @@ def _build_default_config():
'fqdn': socket.getfqdn(),
'region': 'default',
'backup_grace_period_in_days': 10,
- 'use_sudo_for_restore': 'True'
+ 'use_sudo_for_restore': 'True',
+ 'read_timeout': 60
}

config['logging'] = {
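
Worth noting: configparser hands every value back as a string, which is why the storage backends below each cast with int(config.read_timeout). A minimal sketch of the flow, with the parsing simplified relative to Medusa's real config module:

import collections
import configparser

StorageConfig = collections.namedtuple('StorageConfig', ['storage_provider', 'read_timeout'])

parser = configparser.ConfigParser(interpolation=None)
parser['storage'] = {'storage_provider': 's3', 'read_timeout': '60'}  # built-in defaults
parser.read('medusa.ini')  # the user's file overrides the defaults, if present

section = parser['storage']
config = StorageConfig(**{field: section.get(field) for field in StorageConfig._fields})
print(type(config.read_timeout))  # <class 'str'>, hence the int(...) casts at the point of use
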
9 changes: 8 additions & 1 deletion medusa/storage/azure_storage.py
@@ -56,6 +56,8 @@ def __init__(self, config):
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.WARNING)
logging.getLogger('chardet.universaldetector').setLevel(logging.WARNING)

+ self.read_timeout = int(config.read_timeout)

super().__init__(config)

def _make_blob_service_url(self, account_name, config):
@@ -85,7 +87,10 @@ async def _disconnect(self):

async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:
blobs = []
- async for b_props in self.azure_container_client.list_blobs(name_starts_with=str(prefix)):
+ async for b_props in self.azure_container_client.list_blobs(
+     name_starts_with=str(prefix),
+     timeout=self.read_timeout
+ ):
blobs.append(AbstractBlob(
b_props.name,
b_props.size,
@@ -150,6 +155,7 @@ async def _download_blob(self, src: str, dest: str):
downloader = await self.azure_container_client.download_blob(
blob=object_key,
max_concurrency=workers,
+ timeout=self.read_timeout,
)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
await downloader.readinto(open(file_path, "wb"))
@@ -206,6 +212,7 @@ async def _read_blob_as_bytes(self, blob: AbstractBlob) -> bytes:
downloader = await self.azure_container_client.download_blob(
blob=blob.name,
max_concurrency=1,
+ timeout=self.read_timeout,
)
return await downloader.readall()

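For context, the async Azure SDK accepts a per-request timeout keyword across its service calls; a self-contained sketch of the listing pattern, where the container URL and prefix are placeholders rather than values from this repo:

import asyncio
from azure.storage.blob.aio import ContainerClient

async def list_backup_blobs(read_timeout: int = 60):
    # Placeholder URL; Medusa builds the real one from the account name and config.
    client = ContainerClient.from_container_url(
        "https://example.blob.core.windows.net/backups"
    )
    async with client:
        # Each service call takes its own timeout, in seconds.
        async for props in client.list_blobs(name_starts_with="node1/", timeout=read_timeout):
            print(props.name, props.size)

asyncio.run(list_backup_blobs())
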
10 changes: 7 additions & 3 deletions medusa/storage/google_storage.py
@@ -49,6 +49,8 @@ def __init__(self, config):

logging.getLogger('gcloud.aio.storage.storage').setLevel(logging.WARNING)

+ self.read_timeout = int(config.read_timeout)

super().__init__(config)

def connect(self):
@@ -94,7 +96,8 @@ async def _paginate_objects(self, prefix=None):
# fetch a page
page = await self.gcs_storage.list_objects(
bucket=self.bucket_name,
- params=params
+ params=params,
+ timeout=self.read_timeout,
)

# got nothing, return from the function
@@ -151,7 +154,7 @@ async def _download_blob(self, src: str, dest: str):
stream = await self.gcs_storage.download_stream(
bucket=self.bucket_name,
object_name=object_key,
- timeout=-1,
+ timeout=self.read_timeout if self.read_timeout is not None else -1,
)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'wb') as f:
@@ -171,6 +174,7 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
blob = await self.gcs_storage.download_metadata(
bucket=self.bucket_name,
object_name=object_key,
+ timeout=self.read_timeout,
)
return AbstractBlob(
blob['name'],
@@ -233,7 +237,7 @@ async def _read_blob_as_bytes(self, blob: AbstractBlob) -> bytes:
bucket=self.bucket_name,
object_name=blob.name,
session=self.session,
- timeout=-1
+ timeout=self.read_timeout if self.read_timeout is not None else -1,
)
return content

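The gcloud.aio.storage calls take a plain timeout in seconds, and the pre-existing -1 means "no timeout", which is why the download paths fall back to it when no value is configured. A standalone sketch of the metadata call, with placeholder bucket and object names:

import asyncio
import aiohttp
from gcloud.aio.storage import Storage

async def stat_object(read_timeout: int = 60):
    async with aiohttp.ClientSession() as session:
        storage = Storage(session=session)  # credentials resolved from the environment
        timeout = read_timeout if read_timeout is not None else -1  # -1 disables the timeout
        meta = await storage.download_metadata(
            bucket="my-medusa-bucket",                    # placeholder
            object_name="node1/backup1/manifest.json",    # placeholder
            timeout=timeout,
        )
        print(meta["name"], meta.get("size"))

asyncio.run(stat_object())
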
3 changes: 2 additions & 1 deletion medusa/storage/s3_base_storage.py
@@ -136,7 +136,8 @@ def connect(self):
region_name=self.credentials.region,
signature_version='v4',
tcp_keepalive=True,
- max_pool_connections=max_pool_size
+ max_pool_connections=max_pool_size,
+ read_timeout=int(self.config.read_timeout),
)
if self.credentials.access_key_id is not None:
self.s3_client = boto3.client(
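
Unlike the Azure and GCS clients, boto3 takes the timeout once, on the botocore client Config, and it then applies to every read on every call. A minimal sketch, with the region and pool size as placeholders:

import boto3
from botocore.config import Config

def make_s3_client(read_timeout: int = 60):
    config = Config(
        region_name="us-east-1",        # placeholder
        signature_version="v4",
        tcp_keepalive=True,             # needs botocore >= 1.27.84
        max_pool_connections=10,        # placeholder for max_pool_size
        read_timeout=read_timeout,      # seconds a socket read may stall before erroring
    )
    return boto3.client("s3", config=config)

s3 = make_s3_client(read_timeout=120)
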
3 changes: 3 additions & 0 deletions tests/storage/azure_storage_test.py
@@ -33,6 +33,7 @@ def test_make_connection_url(self):
'concurrent_transfers': '1',
'host': None,
'port': None,
+ 'read_timeout': 60,
})
azure_storage = AzureStorage(config)
self.assertEqual(
@@ -52,6 +53,7 @@ def test_make_connection_url_with_custom_host(self):
'concurrent_transfers': '1',
'host': 'custom.host.net',
'port': None,
+ 'read_timeout': 60,
})
azure_storage = AzureStorage(config)
self.assertEqual(
@@ -71,6 +73,7 @@ def test_make_connection_url_with_custom_host_port(self):
'concurrent_transfers': '1',
'host': 'custom.host.net',
'port': 123,
+ 'read_timeout': 60,
})
azure_storage = AzureStorage(config)
self.assertEqual(
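
The test fixtures gain the new key because every storage backend now reads config.read_timeout in its constructor, so a fixture missing the field would fail before the assertion runs. A compressed illustration of the failure mode, assuming a namedtuple-style config as in medusa/config.py:

import collections

StorageConfig = collections.namedtuple('StorageConfig', ['host', 'port', 'read_timeout'])

ok = StorageConfig(host=None, port=None, read_timeout=60)
print(int(ok.read_timeout))  # 60

# StorageConfig(host=None, port=None)
# -> TypeError: missing 1 required positional argument: 'read_timeout'
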
