Client request retry logic using tenacity (#59)
mephenor authored Aug 23, 2024
1 parent b319946 commit 8d4c200
Showing 14 changed files with 1,426 additions and 1,214 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -45,13 +45,13 @@ repos:
- id: no-commit-to-branch
args: [--branch, dev, --branch, int, --branch, main]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.5
rev: v0.6.1
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.0
rev: v1.11.1
hooks:
- id: mypy
args: [--no-warn-unused-ignores]
3 changes: 2 additions & 1 deletion .pyproject_generation/pyproject_custom.toml
@@ -1,12 +1,13 @@
[project]
name = "ghga_datasteward_kit"
version = "4.1.2"
version = "4.2.0"
description = "GHGA Data Steward Kit - A utils package for GHGA data stewards."
dependencies = [
"crypt4gh >=1.6, <2",
"hexkit[s3] >=3, <4",
"ghga-transpiler >=2.1.2, <3",
"metldata~=2.1.1",
"tenacity >=9.0.0, <10",
]

[project.urls]
1,339 changes: 705 additions & 634 deletions lock/requirements-dev.txt

Large diffs are not rendered by default.

1,053 changes: 553 additions & 500 deletions lock/requirements.txt

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -21,13 +21,14 @@ classifiers = [
"Intended Audience :: Developers",
]
name = "ghga_datasteward_kit"
version = "4.1.2"
version = "4.2.0"
description = "GHGA Data Steward Kit - A utils package for GHGA data stewards."
dependencies = [
"crypt4gh >=1.6, <2",
"hexkit[s3] >=3, <4",
"ghga-transpiler >=2.1.2, <3",
"metldata~=2.1.1",
"tenacity >=9.0.0, <10",
]

[project.license]
5 changes: 5 additions & 0 deletions src/ghga_datasteward_kit/s3_upload/config.py
@@ -103,6 +103,11 @@ class LegacyConfig(S3ObjectStoragesConfig):
default="https://data.ghga.de/.well-known",
description="URL to the root of the WKVS API. Should start with https://.",
)
client_exponential_backoff_max: NonNegativeInt = Field(
default=60,
description="Maximum number of seconds to wait for when using exponential backoff retry strategies.",
)
client_retry_status_codes: list[int] = Field(default=[408, 500, 502, 503, 504])
client_timeout: NonNegativeInt | None = Field(
default=60, description="Timeout for client requests in seconds"
)
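The new client_* options above drive a shared tenacity retry handler. The actual configure_retries helper lives in src/ghga_datasteward_kit/s3_upload/utils.py and is not shown in this diff; the following is a minimal sketch of how such a handler could be built, assuming tenacity's AsyncRetrying, result-based retries on the configured status codes, and the pre-existing client_num_retries option:

import httpx
from tenacity import (
    AsyncRetrying,
    retry_if_exception_type,
    retry_if_result,
    stop_after_attempt,
    wait_exponential,
)


def configure_retries(config) -> AsyncRetrying:
    """Sketch: build a retry handler from the client_* config options."""
    return AsyncRetrying(
        # retry on transport-level errors ...
        retry=(
            retry_if_exception_type(httpx.RequestError)
            # ... and on the configured retryable HTTP status codes
            | retry_if_result(
                lambda response: response.status_code
                in config.client_retry_status_codes
            )
        ),
        # exponential backoff, capped at client_exponential_backoff_max seconds
        wait=wait_exponential(max=config.client_exponential_backoff_max),
        stop=stop_after_attempt(config.client_num_retries + 1),
        reraise=True,
    )

With reraise=True, the final transport error propagates unchanged once attempts are exhausted; if the last attempt merely returned a retryable status code, tenacity raises RetryError instead.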
21 changes: 16 additions & 5 deletions src/ghga_datasteward_kit/s3_upload/downloader.py
@@ -18,12 +18,15 @@
import math
from functools import partial

from httpx import Response

from ghga_datasteward_kit import models
from ghga_datasteward_kit.s3_upload.config import LegacyConfig
from ghga_datasteward_kit.s3_upload.file_decryption import Decryptor
from ghga_datasteward_kit.s3_upload.utils import (
LOG,
StorageCleaner,
configure_retries,
get_bucket_id,
get_object_storage,
get_ranges,
@@ -52,8 +55,9 @@ def __init__( # noqa: PLR0913
self.part_size = part_size
self.target_checksums = target_checksums
self.storage_cleaner = storage_cleaner
self.retry_handler = configure_retries(config)

def _download_parts(self, download_url):
async def _download_parts(self, download_url):
"""Download file parts"""
for part_number, (start, stop) in enumerate(
get_ranges(file_size=self.file_size, part_size=self.config.part_size),
@@ -62,9 +66,10 @@ def _download_parts(self, download_url):
headers = {"Range": f"bytes={start}-{stop}"}
LOG.debug("Downloading part number %i. %s", part_number, headers)
try:
with httpx_client() as client:
response = client.get(download_url, headers=headers)
yield response.content
response: Response = await self.retry_handler(
fn=self._run_request, url=download_url, headers=headers
)
yield response.content
except (
Exception,
KeyboardInterrupt,
@@ -75,6 +80,12 @@ def _download_parts(self, download_url):
part_number=part_number,
) from exc

async def _run_request(self, *, url: str, headers: dict[str, str]) -> Response:
"""Request to be wrapped by retry handler."""
async with httpx_client() as client:
response = await client.get(url, headers=headers)
return response

async def download(self):
"""Download file in parts and validate checksums"""
LOG.info("(4/7) Downloading file %s for validation.", self.file_id)
@@ -86,7 +97,7 @@ async def download(self):
file_secret=self.file_secret, num_parts=num_parts, part_size=self.part_size
)
download_func = partial(self._download_parts, download_url=download_url)
decryptor.process_parts(download_func)
await decryptor.process_parts(download_func)
await self.validate_checksums(checkums=decryptor.checksums)

async def validate_checksums(self, checkums: models.Checksums):
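Note the call pattern introduced above: the handler returned by configure_retries wraps the request coroutine per invocation instead of decorating it, so the same handler can serve both downloads and uploads. A standalone sketch of that pattern (URL, headers, and retry settings here are illustrative assumptions, not values from this diff):

import asyncio

import httpx
from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt

retrying = AsyncRetrying(
    retry=retry_if_exception_type(httpx.RequestError),
    stop=stop_after_attempt(3),
    reraise=True,
)


async def fetch(*, url: str, headers: dict[str, str]) -> httpx.Response:
    # the wrapped coroutine is re-invoked from scratch on every attempt
    async with httpx.AsyncClient() as client:
        return await client.get(url, headers=headers)


async def main() -> None:
    # AsyncRetrying instances are callable: the function first, then its kwargs
    response = await retrying(fetch, url="https://example.org", headers={})
    print(response.status_code)


asyncio.run(main())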
14 changes: 5 additions & 9 deletions src/ghga_datasteward_kit/s3_upload/entrypoint.py
@@ -30,7 +30,7 @@
from ghga_datasteward_kit.s3_upload.uploader import ChunkedUploader
from ghga_datasteward_kit.s3_upload.utils import (
LOG,
HttpxClientConfig,
RequestConfigurator,
StorageCleaner,
check_adjust_part_size,
check_output_path,
@@ -109,9 +109,9 @@ async def exchange_secret_for_id(
payload = encrypt(data=file_secret, key=config.secret_ingest_pubkey)
encrypted_secret = models.EncryptedPayload(payload=payload)

with httpx_client() as client:
async with httpx_client() as client:
headers = {"Authorization": f"Bearer {token}"}
response = client.post(
response = await client.post(
url=endpoint_url, json=encrypted_secret.model_dump(), headers=headers
)

@@ -131,9 +131,7 @@ async def async_main(input_path: Path, alias: str, config: Config, token: str):
Run encryption, upload and validation.
Prints metadata to <alias>.json in the specified output directory
"""
HttpxClientConfig.configure(
num_retries=config.client_num_retries, timeout=config.client_timeout
)
RequestConfigurator.configure(config=config)

async with StorageCleaner(config=config) as storage_cleaner:
uploader, file_size = await validate_and_transfer_content(
@@ -189,9 +187,7 @@ async def legacy_async_main(input_path: Path, alias: str, config: LegacyConfig):
Run encryption, upload and validation.
Prints metadata to <alias>.json in the specified output directory
"""
HttpxClientConfig.configure(
num_retries=config.client_num_retries, timeout=config.client_timeout
)
RequestConfigurator.configure(config=config)

async with StorageCleaner(config=config) as storage_cleaner:
uploader, file_size = await validate_and_transfer_content(
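RequestConfigurator replaces the old HttpxClientConfig and is defined in s3_upload/utils.py, outside this diff. A plausible minimal sketch of the pattern, assuming it merely parks the validated config at class level so helpers such as httpx_client() and configure_retries() can read timeout and retry settings without the config being threaded through every call site:

from ghga_datasteward_kit.s3_upload.config import LegacyConfig


class RequestConfigurator:
    """Sketch (assumed shape): process-wide holder for request settings."""

    config: LegacyConfig | None = None

    @classmethod
    def configure(cls, *, config: LegacyConfig) -> None:
        # called once at startup; helpers read cls.config afterwards
        cls.config = config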
9 changes: 6 additions & 3 deletions src/ghga_datasteward_kit/s3_upload/file_decryption.py
@@ -15,9 +15,10 @@
#
"""Functionality to decrypt Crypt4GH encrypted files on-the-fly for validation purposes."""

from collections.abc import Generator
from collections.abc import AsyncGenerator
from functools import partial
from time import time
from typing import Any

import crypt4gh.lib # type: ignore

@@ -52,13 +53,14 @@ def _decrypt_segment(self, segment: bytes):
ciphersegment=segment, session_keys=[self.file_secret]
)

def process_parts(self, download_files: partial[Generator[bytes, None, None]]):
async def process_parts(self, download_files: partial[AsyncGenerator[bytes, Any]]):
"""Encrypt and upload file parts."""
unprocessed_bytes = b""
download_buffer = b""
start = time()

for part_number, file_part in enumerate(download_files()):
part_number = 0
async for file_part in download_files():
# process unencrypted
self.checksums.update_encrypted(file_part)
unprocessed_bytes += file_part
@@ -81,6 +83,7 @@ def process_parts(self, download_files: partial[Generator[bytes, None, None]]):
self.num_parts,
avg_speed,
)
part_number += 1

# process dangling bytes
if unprocessed_bytes:
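The switch above from enumerate(download_files()) to a manually incremented part_number is forced by the async rewrite: the built-in enumerate cannot consume an async generator. A minimal standalone illustration:

import asyncio
from collections.abc import AsyncGenerator


async def parts() -> AsyncGenerator[bytes, None]:
    for chunk in (b"first", b"second", b"third"):
        yield chunk


async def consume() -> None:
    part_number = 0
    async for part in parts():  # enumerate(parts()) would raise TypeError
        print(part_number, part)
        part_number += 1


asyncio.run(consume())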
53 changes: 18 additions & 35 deletions src/ghga_datasteward_kit/s3_upload/uploader.py
@@ -15,27 +15,28 @@
#
"""Functionality to upload encrypted file chunks using multipart upload."""

import asyncio
import math
from pathlib import Path
from time import time
from uuid import uuid4

import crypt4gh.lib # type: ignore
from httpx import TimeoutException
from httpx import Response

from ghga_datasteward_kit.s3_upload.config import LegacyConfig
from ghga_datasteward_kit.s3_upload.file_encryption import Encryptor
from ghga_datasteward_kit.s3_upload.utils import (
LOG,
HttpxClientConfig,
StorageCleaner,
configure_retries,
get_bucket_id,
get_object_storage,
httpx_client,
)

MAX_TIMEOUT_DEBUG = (
3600 # maximum timeout for upload request used for debugging purposes
600 # maximum timeout for upload request used for debugging purposes
)


@@ -122,6 +123,7 @@ def __init__( # noqa: PLR0913
self.upload_id = ""
self.storage_cleaner = storage_cleaner
self.debug_mode = debug_mode
self.retry_handler = configure_retries(config)

async def __aenter__(self):
"""Start multipart upload"""
@@ -148,7 +150,7 @@ async def __aexit__(self, exc_t, exc_v, exc_tb):
upload_id=self.upload_id,
) from exc

async def send_part(self, part: bytes, part_number: int):
async def send_part(self, *, part: bytes, part_number: int):
"""Handle upload of one file part"""
try:
upload_url = await self.storage.get_part_upload_url(
@@ -157,47 +159,28 @@
object_id=self.file_id,
part_number=part_number,
)

if self.debug_mode:
num_retries = HttpxClientConfig.num_retries
timeout = HttpxClientConfig.timeout

while True and timeout is not None:
LOG.info(
f"Attempting upload of part {part_number} ({len(part)} bytes) with a timeout of {
timeout} seconds."
)
HttpxClientConfig.configure(
num_retries=num_retries, timeout=timeout
)
with httpx_client() as client:
try:
response = client.put(url=upload_url, content=part)
break
except TimeoutException as error:
LOG.info(f"Encountered timeout for {
timeout} seconds. Details:\n{str(error)}")

# increase by a minute and reraise if we need to wait more than one hour
timeout += 60
if timeout > MAX_TIMEOUT_DEBUG:
raise error
LOG.info(f"Upload successful for a timeout of {
timeout} seconds")
else:
with httpx_client() as client:
response = client.put(url=upload_url, content=part)
# wait slightly before using the upload URL
await asyncio.sleep(0.1)
response: Response = await self.retry_handler(
fn=self._run_request, url=upload_url, part=part
)

status_code = response.status_code
if status_code != 200:
raise ValueError(f"Received unexpected status code {
status_code} when trying to upload file part {part_number}.")

except (Exception, KeyboardInterrupt, ValueError) as exc:
except (Exception, KeyboardInterrupt) as exc:
raise self.storage_cleaner.PartUploadError(
cause=str(exc),
bucket_id=get_bucket_id(self.config),
object_id=self.file_id,
part_number=part_number,
upload_id=self.upload_id,
) from exc

async def _run_request(self, *, url: str, part: bytes) -> Response:
"""Request to be wrapped by retry handler."""
async with httpx_client() as client:
response = await client.put(url=url, content=part)
return response
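The deleted debug branch escalated the request timeout by 60 seconds per attempt up to MAX_TIMEOUT_DEBUG; the retry handler instead sleeps between attempts with capped exponential backoff. A quick probe of how those waits grow, assuming the wait_exponential setup from the configure_retries sketch earlier (RetryCallState is only used here to feed attempt numbers into the wait strategy):

from tenacity import RetryCallState, wait_exponential

# assumed strategy: doubling waits, clamped at client_exponential_backoff_max=60
wait = wait_exponential(multiplier=1, max=60)
for attempt in range(1, 9):
    state = RetryCallState(retry_object=None, fn=None, args=(), kwargs={})
    state.attempt_number = attempt
    print(f"attempt {attempt}: wait {wait(state)}s")
# waits double per attempt (1 or 2 s at first, depending on tenacity version)
# and plateau at 60 s once they would exceed the cap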
