
Make read timeout configurable #763

Merged (4 commits) on Jun 14, 2024
3 changes: 3 additions & 0 deletions medusa-example.ini
@@ -118,6 +118,9 @@ use_sudo_for_restore = True

;aws_cli_path = <Location of the aws cli binary if not in PATH>

; Read timeout in seconds for the storage provider.
;read_timeout = 60

[monitoring]
;monitoring_provider = <Provider used for sending metrics. Currently either of "ffwd" or "local">

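As a quick illustration of the new option above, here is a minimal sketch using plain configparser (not Medusa's actual loader, which is in medusa/config.py below): while the line stays commented out, reads fall back to the 60-second default.

# Illustrative only: parse an ini snippet and fall back to 60 seconds
# when read_timeout is left commented out.
import configparser

ini_text = """
[storage]
;read_timeout = 120
"""

parser = configparser.ConfigParser()
parser.read_string(ini_text)
read_timeout = parser.getint('storage', 'read_timeout', fallback=60)
print(read_timeout)  # 60 until the option is uncommented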
5 changes: 3 additions & 2 deletions medusa/config.py
@@ -31,7 +31,7 @@
['bucket_name', 'key_file', 'prefix', 'fqdn', 'host_file_separator', 'storage_provider',
'base_path', 'max_backup_age', 'max_backup_count', 'api_profile', 'transfer_max_bandwidth',
'concurrent_transfers', 'multi_part_upload_threshold', 'host', 'region', 'port', 'secure', 'ssl_verify',
'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode']
'aws_cli_path', 'kms_id', 'backup_grace_period_in_days', 'use_sudo_for_restore', 'k8s_mode', 'read_timeout']
)

CassandraConfig = collections.namedtuple(
@@ -116,7 +116,8 @@ def _build_default_config():
'fqdn': socket.getfqdn(),
'region': 'default',
'backup_grace_period_in_days': 10,
'use_sudo_for_restore': 'True'
'use_sudo_for_restore': 'True',
'read_timeout': 60
}

config['logging'] = {
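To make the flow above concrete, here is a minimal sketch of the default-merging pattern (the field list is heavily trimmed, so this is not the real StorageConfig): the 60 from the defaults dict ends up on the namedtuple that each storage backend later reads with int(config.read_timeout).

# Illustration of the pattern only, not Medusa's actual code.
import collections

StorageConfig = collections.namedtuple(
    'StorageConfig', ['storage_provider', 'read_timeout']  # trimmed fields
)

defaults = {'storage_provider': 'local', 'read_timeout': 60}
user_settings = {}  # nothing set in medusa.ini, so the default wins
merged = {**defaults, **user_settings}

config = StorageConfig(**merged)
print(int(config.read_timeout))  # 60, the value handed to the SDK calls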
9 changes: 8 additions & 1 deletion medusa/storage/azure_storage.py
@@ -56,6 +56,8 @@ def __init__(self, config):
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.WARNING)
logging.getLogger('chardet.universaldetector').setLevel(logging.WARNING)

self.read_timeout = int(config.read_timeout)

super().__init__(config)

def _make_blob_service_url(self, account_name, config):
@@ -85,7 +87,10 @@ async def _disconnect(self):

async def _list_blobs(self, prefix=None) -> t.List[AbstractBlob]:
blobs = []
async for b_props in self.azure_container_client.list_blobs(name_starts_with=str(prefix)):
async for b_props in self.azure_container_client.list_blobs(
name_starts_with=str(prefix),
timeout=self.read_timeout
):
blobs.append(AbstractBlob(
b_props.name,
b_props.size,
@@ -150,6 +155,7 @@ async def _download_blob(self, src: str, dest: str):
downloader = await self.azure_container_client.download_blob(
blob=object_key,
max_concurrency=workers,
timeout=self.read_timeout,
)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
await downloader.readinto(open(file_path, "wb"))
@@ -206,6 +212,7 @@ async def _read_blob_as_bytes(self, blob: AbstractBlob) -> bytes:
downloader = await self.azure_container_client.download_blob(
blob=blob.name,
max_concurrency=1,
timeout=self.read_timeout,
)
return await downloader.readall()

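As a standalone illustration of the Azure change (the container name and connection string are placeholders, and this is not Medusa's own wiring), the aio SDK's read-side calls accept a per-call timeout keyword in seconds, which is what the diff forwards.

# Sketch only: forward a configured read timeout to azure.storage.blob.aio calls.
import asyncio
from azure.storage.blob.aio import ContainerClient

async def list_backup_blobs(connection_string: str, read_timeout: int):
    container = ContainerClient.from_connection_string(
        connection_string, container_name='backups'  # placeholder container
    )
    async with container:
        # Same pattern as the diff: pass the timeout on every read operation.
        async for props in container.list_blobs(name_starts_with='prefix/',
                                                timeout=read_timeout):
            print(props.name, props.size)

# asyncio.run(list_backup_blobs('<connection string>', 60))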
10 changes: 7 additions & 3 deletions medusa/storage/google_storage.py
@@ -49,6 +49,8 @@ def __init__(self, config):

logging.getLogger('gcloud.aio.storage.storage').setLevel(logging.WARNING)

self.read_timeout = int(config.read_timeout)

super().__init__(config)

def connect(self):
@@ -94,7 +96,8 @@ async def _paginate_objects(self, prefix=None):
# fetch a page
page = await self.gcs_storage.list_objects(
bucket=self.bucket_name,
params=params
params=params,
timeout=self.read_timeout,
)

# got nothing, return from the function
@@ -151,7 +154,7 @@ async def _download_blob(self, src: str, dest: str):
stream = await self.gcs_storage.download_stream(
bucket=self.bucket_name,
object_name=object_key,
timeout=-1,
timeout=self.read_timeout if self.read_timeout is not None else -1,

Hello @rzvoncek,

I am a user of the k8ssandra operator on GCP, and this change could be a breaking change in the restore case if the files are big. We are switching from an infinite timeout to a 60-second timeout, if I am not mistaken.

Do you think we should roll back the change for the GCP case, or should I post a comment on k8ssandra/k8ssandra-operator#1353 to ask when this will be worked on?

)
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
with open(file_path, 'wb') as f:
@@ -171,6 +174,7 @@ async def _stat_blob(self, object_key: str) -> AbstractBlob:
blob = await self.gcs_storage.download_metadata(
bucket=self.bucket_name,
object_name=object_key,
timeout=self.read_timeout,
)
return AbstractBlob(
blob['name'],
@@ -233,7 +237,7 @@ async def _read_blob_as_bytes(self, blob: AbstractBlob) -> bytes:
bucket=self.bucket_name,
object_name=blob.name,
session=self.session,
timeout=-1
timeout=self.read_timeout if self.read_timeout is not None else -1,
)
return content

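For the GCS side (bucket and object names below are placeholders, and this is not Medusa's own code), gcloud-aio-storage likewise takes the timeout per call; the review comment above treats the previous -1 value as an unbounded read, which is why it flags a possible break for large restores.

# Sketch only: the same read-timeout pattern with gcloud-aio-storage.
import asyncio
import aiohttp
from gcloud.aio.storage import Storage

async def stat_backup_object(read_timeout: int):
    async with aiohttp.ClientSession() as session:
        gcs = Storage(session=session)  # uses application default credentials
        metadata = await gcs.download_metadata(
            bucket='my-backups',            # placeholder bucket
            object_name='prefix/object',    # placeholder object key
            timeout=read_timeout,           # seconds, forwarded to the HTTP call
        )
        print(metadata.get('name'), metadata.get('size'))

# asyncio.run(stat_backup_object(60))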
3 changes: 2 additions & 1 deletion medusa/storage/s3_base_storage.py
@@ -136,7 +136,8 @@ def connect(self):
region_name=self.credentials.region,
signature_version='v4',
tcp_keepalive=True,
max_pool_connections=max_pool_size
max_pool_connections=max_pool_size,
read_timeout=int(self.config.read_timeout),
Contributor:

Question: is there a similar feature for GCP and Azure? It would be nice to apply the same settings for all 3.

Contributor Author:
Yes, they are available. I've added commits applying the read timeout config value to Azure and GCP read operations.

)
if self.credentials.access_key_id is not None:
self.s3_client = boto3.client(
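On the S3 path the timeout travels inside the botocore client config rather than on each call; a minimal sketch of the same construction (region and pool size are placeholder values, not Medusa's):

# Sketch only: build an S3 client whose reads time out after read_timeout seconds.
import boto3
from botocore.config import Config

def make_s3_client(read_timeout: int):
    client_config = Config(
        region_name='us-east-1',          # placeholder region
        signature_version='v4',
        tcp_keepalive=True,
        max_pool_connections=10,          # placeholder pool size
        read_timeout=read_timeout,        # applies to every read on the socket
    )
    return boto3.client('s3', config=client_config)

# s3 = make_s3_client(60)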
3 changes: 3 additions & 0 deletions tests/storage/azure_storage_test.py
@@ -33,6 +33,7 @@ def test_make_connection_url(self):
'concurrent_transfers': '1',
'host': None,
'port': None,
'read_timeout': 60,
})
azure_storage = AzureStorage(config)
self.assertEqual(
@@ -52,6 +53,7 @@ def test_make_connection_url_with_custom_host(self):
'concurrent_transfers': '1',
'host': 'custom.host.net',
'port': None,
'read_timeout': 60,
})
azure_storage = AzureStorage(config)
self.assertEqual(
@@ -71,6 +73,7 @@ def test_make_connection_url_with_custom_host_port(self):
'concurrent_transfers': '1',
'host': 'custom.host.net',
'port': 123,
'read_timeout': 60,
})
azure_storage = AzureStorage(config)
self.assertEqual(