From 0f16f3aee7c750e6bdc5986350afc1316f276b22 Mon Sep 17 00:00:00 2001
From: "Thomas J. Zajac"
Date: Tue, 25 Apr 2023 09:48:16 +0000
Subject: [PATCH 01/11] Updated dependencies

---
 .pre-commit-config.yaml | 18 ++++++++++--------
 requirements.txt        | 42 ++++++++++++++++++++---------------------
 2 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index d40c5d1..31f8a5b 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -4,11 +4,11 @@
 default_language_version:
   python: python3.9

-minimum_pre_commit_version: 2.13.0
+minimum_pre_commit_version: 3.0.0

 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.0.1
+    rev: v4.4.0
     hooks:
       - id: trailing-whitespace
       - id: end-of-file-fixer
@@ -31,31 +31,33 @@ repos:
       - id: detect-private-key
       - id: mixed-line-ending
        args: [--fix=lf]
+      - id: no-commit-to-branch
+        args: [--branch, dev, --branch, int, --branch, main]
       - id: debug-statements
   - repo: https://github.com/psf/black
-    rev: 22.3.0
+    rev: 23.1.0
     hooks:
       - id: black
   - repo: https://github.com/pycqa/isort
-    rev: 5.10.1
+    rev: 5.12.0
     hooks:
       - id: isort
        args: [--profile, black]
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v0.981
+    rev: v1.0.0
     hooks:
       - id: mypy
        args: [--no-warn-unused-ignores]
   - repo: https://github.com/PyCQA/pylint
-    rev: v2.13.3
+    rev: v2.16.4
     hooks:
       - id: pylint
        args: [--disable=E0401]
-        exclude: db_migration|tests|.devcontainer
+        exclude: tests|.devcontainer
   - repo: https://github.com/PyCQA/flake8
-    rev: 4.0.1
+    rev: 6.0.0
     hooks:
       - id: flake8
        args: [--config, .flake8]

diff --git a/requirements.txt b/requirements.txt
index 431045e..a53abae 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,28 +1,26 @@
 # All your requirements go here. Please adapt as needed:
-pydantic==1.10.2
-PyYAML==5.4.1
-pytest==7.1.1
-pytest-asyncio==0.18.3
-pytest-cov==3.0.0
-mypy==0.981
-mypy-extensions==0.4.3
-pylint==2.13.3
-click==8.1.2
-black==22.3.0
-flake8==4.0.1
-isort==5.10.1
+pytest==7.2.0
+pytest-asyncio==0.20.3
+pytest-cov==4.0.0
+mypy==1.0.0
+mypy-extensions==1.0.0
+pylint==2.16.4
+click==8.1.3
+black==23.1.0
+flake8==6.0.0
+isort==5.12.0
 bandit==1.7.4
-pre-commit==2.17.0
-mkdocs==1.3.0
+pre-commit==3.1.1
+mkdocs==1.4.2
 mkdocs-autorefs==0.4.1
-mkdocs-material==8.2.8
-mkdocs-material-extensions==1.0.3
-mkdocstrings==0.18.1
-mkdocstrings-python-legacy==0.2.2
-testcontainers[kafka,rabbitmq,mongo,postgresql]==3.4.2
-typer==0.4.1
+mkdocs-material==9.0.3
+mkdocs-material-extensions==1.1.1
+mkdocstrings==0.19.1
+mkdocstrings-python-legacy==0.2.3
+testcontainers[kafka,mongo,postgresql]==3.4.2
+typer==0.7.0
+httpx==0.23.3
 boto3==1.24.93
 crypt4gh==1.5
-ghga-service-chassis-lib==0.15.0
-hexkit==0.4.0
+hexkit==0.9.2
 pycurl-requests==0.5.0
Zajac" Date: Wed, 26 Apr 2023 15:52:29 +0000 Subject: [PATCH 02/11] Intermediate state, new upload path finished --- requirements.txt | 3 +- src/s3_upload.py | 420 +++++++++++++++++++++++------------------------ 2 files changed, 208 insertions(+), 215 deletions(-) diff --git a/requirements.txt b/requirements.txt index a53abae..3114292 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,6 +21,5 @@ testcontainers[kafka,mongo,postgresql]==3.4.2 typer==0.7.0 httpx==0.23.3 boto3==1.24.93 -crypt4gh==1.5 hexkit==0.9.2 -pycurl-requests==0.5.0 +ghga-connector==0.2.12 diff --git a/src/s3_upload.py b/src/s3_upload.py index aa91d74..4697333 100755 --- a/src/s3_upload.py +++ b/src/s3_upload.py @@ -27,10 +27,8 @@ import logging import math import os -import shutil import subprocess # nosec import sys -from dataclasses import dataclass from io import BufferedReader from pathlib import Path from tempfile import mkstemp @@ -40,41 +38,19 @@ import crypt4gh.header # type: ignore import crypt4gh.keys # type: ignore import crypt4gh.lib # type: ignore -import pycurl # type: ignore import requests # type: ignore import typer import yaml +from ghga_connector.core.file_operations import read_file_parts +from ghga_connector.core.session import RequestsSession from hexkit.providers.s3 import S3Config, S3ObjectStorage # type: ignore -from pycurl_requests.adapters import PyCurlHttpAdapter # type: ignore +from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt from pydantic import BaseSettings, Field, SecretStr, validator -from requests.adapters import HTTPAdapter, Retry # type: ignore - - -def configure_pycurl_session() -> requests.Session: - """Configure session with pycurl requests adapter""" - with requests.session() as session: - curl = pycurl.Curl() - - adapter = PyCurlHttpAdapter(curl) - - session.mount("http://", adapter=adapter) - session.mount("https://", adapter=adapter) - - return session - def configure_session() -> requests.Session: """Configure session with exponential backoff retry""" - with requests.session() as session: - - retries = Retry(total=7, backoff_factor=1) - adapter = HTTPAdapter(max_retries=retries) - - session.mount("http://", adapter=adapter) - session.mount("https://", adapter=adapter) - - return session - + RequestsSession.configure(6) + return RequestsSession.session LOGGER = logging.getLogger("s3_upload") PART_SIZE = 16 * 1024**2 @@ -101,204 +77,252 @@ class Config(BaseSettings): the current working dir or the home dir. """ - s3_endpoint_url: SecretStr = Field(..., description=("URL of the S3 server")) + s3_endpoint_url: SecretStr = Field(..., description="URL of the S3 server") s3_access_key_id: SecretStr = Field( - ..., description=("Access key ID for the S3 server") + ..., description="Access key ID for the S3 server" ) s3_secret_access_key: SecretStr = Field( - ..., description=("Secret access key for the S3 server") + ..., description="Secret access key for the S3 server" ) bucket_id: str = Field( - ..., description=("Bucket id where the encrypted, uploaded file is stored") + ..., description="Bucket id where the encrypted, uploaded file is stored" ) part_size: int = Field( - 16, description=("Upload part size in MiB. Has to be between 5 and 5120.") + 16, description="Upload part size in MiB. Has to be between 5 and 5120." 
) - tmp_dir: Path = Field(..., description=("Directory for temporary output files")) output_dir: Path = Field( ..., - description=("Directory for the output metadata file"), + description="Directory for the output metadata file", ) - @validator("tmp_dir") - def expand_env_vars_temp_dir( - cls, tmp_dir: Path - ): # pylint: disable=no-self-argument,no-self-use - """Expand vars in path""" - return expand_env_vars_in_path(tmp_dir) - @validator("output_dir") def expand_env_vars_output_dir( cls, output_dir: Path - ): # pylint: disable=no-self-argument,no-self-use + ): # pylint: disable=no-self-argument """Expand vars in path""" return expand_env_vars_in_path(output_dir) -@dataclass class Keypair: """Crypt4GH keypair""" - public_key: bytes - private_key: bytes + def __init__(self): + """Creates a keypair using crypt4gh""" + LOGGER.info("(1/7) Generating keypair") + # Crypt4GH always writes to file and tmp_path fixture causes permission issues + sk_file, sk_path = mkstemp(prefix="private", suffix=".key") + pk_file, pk_path = mkstemp(prefix="public", suffix=".key") -class Upload: - """Handler class dealing with most of the upload functionality""" + # Crypt4GH does not reset the umask it sets, so we need to deal with it + original_umask = os.umask(0o022) - def __init__(self, input_path: Path, alias: str, config: Config) -> None: + crypt4gh.keys.c4gh.generate(seckey=sk_file, pubkey=pk_file) + self.public_key = crypt4gh.keys.get_public_key(pk_path) + self.private_key = crypt4gh.keys.get_private_key(sk_path, lambda: None) + + # remove unneeded files + os.umask(original_umask) + Path(pk_path).unlink() + Path(sk_path).unlink() + + +class MultipartUpload: + """Context manager to handle init + complete/abort for S3 multipart upload""" + + def __init__(self, config:Config, file_id: str, file_size: int) -> None: self.config = config - self.file_id = str(uuid4()) - self.alias = alias - self.input_path = input_path - self.checksum = get_checksum_unencrypted(input_path) - self.keypair = generate_crypt4gh_keypair() + self.storage = objectstorage(config=self.config) + self.file_id = file_id + self.file_size = file_size + + self.upload_id = "" - async def process_file(self): - """Run upload/download/validation flow""" - # upload related, clean up encrypted file on exception + async def __aenter__(self): + """Start multipart upload""" + self.upload_id = await self.storage.init_multipart_upload( + bucket_id=self.config.bucket_id, object_id=self.file_id + ) + return self + + async def __aexit__(self, exc_t, exc_v, exc_tb): + """Complete or clean up multipart upload""" try: - encrypted_file_loc = self._encrypt_file() - file_secret, offset = self._read_envelope( - encrypted_file_loc=encrypted_file_loc + await self.storage.complete_multipart_upload( + upload_id=self.upload_id, + bucket_id=self.config.bucket_id, + object_id=self.file_id, + anticipated_part_quantity=math.ceil(self.file_size / PART_SIZE), + anticipated_part_size=PART_SIZE, ) - file_size = encrypted_file_loc.stat().st_size - offset - enc_md5sums, enc_sha256sums = await self._upload_file( - encrypted_file_loc=encrypted_file_loc, - file_size=file_size, - offset=offset, + except (Exception, KeyboardInterrupt) as exc: # pylint: disable=broad-except + await self.storage.abort_multipart_upload( + upload_id=self.upload_id, + bucket_id=self.config.bucket_id, + object_id=self.file_id, ) - except (Exception, KeyboardInterrupt) as exc: - # cleanup tmp dir in case of failure - shutil.rmtree(self.config.tmp_dir / self.alias) raise exc - finally: - # cleanup local 
encrypted file. _upload_file takes care of multipart upload - encrypted_file_loc.unlink(missing_ok=True) - - # upload related, clean redwonloaded, encrypted file on exception, clean up - # object in bucket, remove temporary unencrypted file - destination = encrypted_file_loc - destination_decrypted = destination.with_name(destination.name + "_decrypted") + + async def send_part(self, part: bytes, part_number: int): + """Handle upload of one file part""" try: - await self._download( - file_size=file_size, - destination=destination, - file_secret=file_secret, - ) - # only calculate the checksum after we have the complete file - self._validate_checksum( - destination=destination, destination_decrypted=destination_decrypted + upload_url = await self.storage.get_part_upload_url( + upload_id=self.upload_id, + bucket_id=self.config.bucket_id, + object_id=self.file_id, + part_number=part_number, ) - except (Exception, KeyboardInterrupt) as exc: - storage = objectstorage(config=self.config) - await storage.delete_object( - bucket_id=self.config.bucket_id, object_id=self.file_id + SESSION.put(url=upload_url, data=part) + except ( # pylint: disable=broad-except + Exception, + KeyboardInterrupt, + ) as exc: + await self.storage.abort_multipart_upload( + upload_id=self.upload_id, + bucket_id=self.config.bucket_id, + object_id=self.file_id, ) raise exc - finally: - # cleanup downloaded and unencrypted file and tmp subdir - destination.unlink(missing_ok=True) - destination_decrypted.unlink(missing_ok=True) - shutil.rmtree(self.config.tmp_dir / self.alias) - - self._write_metadata( - enc_md5sums=enc_md5sums, - enc_sha256sums=enc_sha256sums, - file_secret=file_secret, - ) +class Checksums: + """Container for checksum calculation""" - def _encrypt_file(self): - """Encrypt file using Crypt4GH""" - LOGGER.info("(2/7) Encrypting file %s", self.input_path.resolve()) - tmp_dir = self.config.tmp_dir / self.alias - if not tmp_dir.exists(): - tmp_dir.mkdir(parents=True) - output_path = tmp_dir / self.file_id + def __init__(self): + self.decrypted_sha256= hashlib.sha256() + self.encrypted_md5: list[str] = [] + self.encrypted_sha256: list[str] = [] - keys = [(0, self.keypair.private_key, self.keypair.public_key)] + def get(self): + """Return all checksums at the end of processing""" + return self.decrypted_sha256.hexdigest(), self.encrypted_md5, self.encrypted_sha256 - with self.input_path.open("rb") as infile: - with output_path.open("wb") as outfile: - crypt4gh.lib.encrypt(keys=keys, infile=infile, outfile=outfile) - return output_path + def update_unencrypted(self, part: bytes): + """Update checksum for unencrypted file""" + self.decrypted_sha256.update(part) - def _read_envelope(self, *, encrypted_file_loc: Path): - """Get file encryption/decryption secret and file content offset""" - LOGGER.info("(3/7) Extracting file secret and content offset") - with encrypted_file_loc.open("rb") as file: - keys = [(0, self.keypair.private_key, None)] - session_keys, _ = crypt4gh.header.deconstruct(infile=file, keys=keys) + def update_encrypted(self, part: bytes): + """Update encrypted part checksums""" + self.encrypted_md5.append(hashlib.md5(part, usedforsecurity=False).hexdigest()) + self.encrypted_sha256.append(hashlib.sha256(part).hexdigest()) - file_secret = session_keys[0] - offset = file.tell() - return file_secret, offset - async def _upload_file( - self, *, encrypted_file_loc: Path, file_size: int, offset: int - ): - """Perform multipart upload and compute encrypted part checksums""" - storage = 
objectstorage(config=self.config) - upload_id = await storage.init_multipart_upload( - bucket_id=self.config.bucket_id, object_id=self.file_id - ) +class Encryptor: + """Handles on the fly encryption and checksum calculation""" + + def __init__(self, part_size: int): + self.part_size = part_size + self.checksums = Checksums() + self.keypair = Keypair() + self.file_secret = os.urandom(32) - enc_md5sums = [] - enc_sha256sums = [] - sum_bytes = 0 - - with encrypted_file_loc.open("rb") as file: - for part_number, part in enumerate( - read_file(file=file, part_size=PART_SIZE, offset=offset), start=1 - ): - try: - sum_bytes += len(part) - LOGGER.info( - "(4/7) Uploading part no. %i (%.2f%%)", - part_number, - sum_bytes / file_size * 100, - ) - enc_md5sums.append( - hashlib.md5(part, usedforsecurity=False).hexdigest() - ) - enc_sha256sums.append(hashlib.sha256(part).hexdigest()) - - upload_url = await storage.get_part_upload_url( - upload_id=upload_id, - bucket_id=self.config.bucket_id, - object_id=self.file_id, - part_number=part_number, - ) - SESSION.put(url=upload_url, data=part) - except ( # pylint: disable=broad-except - Exception, - KeyboardInterrupt, - ) as exc: - await storage.abort_multipart_upload( - upload_id=upload_id, - bucket_id=self.config.bucket_id, - object_id=self.file_id, - ) - raise exc + def _prepare_envelope(self): + """Create personalized envelope""" + keys = [(0, self.keypair.private_key, self.keypair.public_key)] + header_content = crypt4gh.header.make_packet_data_enc(0, self.file_secret) + header_packets = crypt4gh.header.encrypt(header_content, keys) + header_bytes = crypt4gh.header.serialize(header_packets) + return header_bytes + + def _encrypt(self, part: bytes): + """Encrypt file part using secret""" + segments, incomplete_segment = self._get_segments(part) + + encrypted_segments = [] + for segment in segments: + + nonce = os.urandom(12) + encrypted_data = crypto_aead_chacha20poly1305_ietf_encrypt( + segment, None, nonce, self.file_secret + ) # no aad + + encrypted_segments.append(nonce+encrypted_data) + + return b"".join(encrypted_segments), incomplete_segment + + + def _get_segments(self, part: bytes): + """Chunk part into cipher segments""" + segment_size = crypt4gh.lib.SEGMENT_SIZE + num_segments = len(part) / segment_size + full_segments = int(num_segments) + segments = [ + part[i * segment_size : (i + 1) * segment_size] + for i in range(full_segments) + ] + + # check if we have a remainder of bytes that we need to handle, + # i.e. non-matching boundaries between part and cipher segment size + incomplete_segment = b"" + partial_segment_idx = math.ceil(num_segments) + if partial_segment_idx != full_segments: + incomplete_segment = part[full_segments * segment_size :] + return segments, incomplete_segment + + # type annotation for file parts, should be generator + def process_file(self, file: BufferedReader): + """ + Encrypt and upload file parts. 
+ This is a bit involved for a few reasons: + - The first part sent needs to include the envelope + - Part size alignment does not necessary fit chunk size for encryption + - Check sums need correct parts + -> A few different buffers are needed + """ + upload_buffer = self._prepare_envelope() + unprocessed_bytes = b"" + encrypted_buffer = b"" + + for file_part in read_file_parts(file=file, part_size=self.part_size): + # process unencrypted + self.checksums.update_unencrypted(file_part) + unprocessed_bytes += file_part + + # encrypt in chunks + encrypted_bytes, unprocessed_bytes = self._encrypt(unprocessed_bytes) + + encrypted_buffer += encrypted_bytes + # checksum encrypted file parts if part size + if len(encrypted_buffer) >= self.part_size: + self.checksums.update_encrypted(encrypted_buffer[:self.part_size]) + encrypted_buffer = encrypted_buffer[self.part_size:] + + upload_buffer += encrypted_bytes + # yield if part size + if len(upload_buffer) >= self.part_size: + yield upload_buffer[:self.part_size] + upload_buffer = upload_buffer[self.part_size:] + + self.checksums.update_encrypted(encrypted_buffer) + yield upload_buffer + +class ChunkedUploader: + """Handler class dealing with upload functionality""" - try: - await storage.complete_multipart_upload( - upload_id=upload_id, - bucket_id=self.config.bucket_id, - object_id=self.file_id, - anticipated_part_quantity=math.ceil(file_size / PART_SIZE), - anticipated_part_size=PART_SIZE, - ) - except (Exception, KeyboardInterrupt) as exc: # pylint: disable=broad-except - await storage.abort_multipart_upload( - upload_id=upload_id, - bucket_id=self.config.bucket_id, - object_id=self.file_id, - ) - raise exc + def __init__(self, input_path: Path, alias: str, config: Config) -> None: + self.alias = alias + self.config = config + self.input_path = input_path + self.encryptor = Encryptor(self.config.part_size) + self.file_id = str(uuid4()) + + async def encrypt_and_upload(self): + """Delegate encryption and perform multipart upload""" + file_size = self.input_path.stat().st_size + + with open(self.input_path, "rb") as file: + async with MultipartUpload(config=self.config, file_id=self.file_id, file_size=file_size) as upload: + for part_number, part in enumerate(self.encryptor.process_file(file=file)): + await upload.send_part(part_number=part_number, part=part) +class Upload: + """Handler class dealing with most of the upload functionality""" + + def __init__(self, input_path: Path, alias: str, config: Config) -> None: + self.config = config + self.file_id = str(uuid4()) + self.alias = alias + self.input_path = input_path + self.keypair = Keypair() - return enc_md5sums, enc_sha256sums async def _download( self, @@ -385,25 +409,6 @@ def check_output_path(alias: str, output_dir: Path): handle_superficial_error(msg=msg) -def generate_crypt4gh_keypair() -> Keypair: - """Creates a keypair using crypt4gh""" - LOGGER.info("(1/7) Generating keypair") - # Crypt4GH always writes to file and tmp_path fixture causes permission issues - - sk_file, sk_path = mkstemp(prefix="private", suffix=".key") - pk_file, pk_path = mkstemp(prefix="public", suffix=".key") - - # Crypt4GH does not reset the umask it sets, so we need to deal with it - original_umask = os.umask(0o022) - crypt4gh.keys.c4gh.generate(seckey=sk_file, pubkey=pk_file) - public_key = crypt4gh.keys.get_public_key(pk_path) - private_key = crypt4gh.keys.get_private_key(sk_path, lambda: None) - os.umask(original_umask) - Path(pk_path).unlink() - Path(sk_path).unlink() - return Keypair(public_key=public_key, 
private_key=private_key) - - def get_checksum_unencrypted(file_location: Path) -> str: """Compute SHA256 checksum over unencrypted file content""" @@ -442,17 +447,6 @@ def read_file(*, file: BufferedReader, part_size: int, offset: int = 0): yield file_part -def prepare_envelope(keypair: Keypair, file_secret: bytes): - """ - Create personalized envelope - """ - keys = [(0, keypair.private_key, keypair.public_key)] - header_content = crypt4gh.header.make_packet_data_enc(0, file_secret) - header_packets = crypt4gh.header.encrypt(header_content, keys) - header_bytes = crypt4gh.header.serialize(header_packets) - return header_bytes - - def get_ranges(file_size: int): """Calculate part ranges""" num_parts = file_size / PART_SIZE @@ -544,7 +538,7 @@ def load_config_yaml(path: Path) -> Config: def main( input_path: Path = typer.Option(..., help="Local path of the input file"), alias: str = typer.Option(..., help="A human readable file alias"), - config_path: Path = typer.Option(..., help=("Path to a config YAML.")), + config_path: Path = typer.Option(..., help="Path to a config YAML."), ): """ Custom script to encrypt data using Crypt4GH and directly uploading it to S3 From 8a36242a1e45f52ede369b29e91bed8d447263af Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Thu, 27 Apr 2023 13:05:54 +0000 Subject: [PATCH 03/11] Finished upload/download changes --- .flake8 | 9 +- .pylintrc | 12 +- src/otp_tsv_to_upload.py | 1 - src/s3_upload.py | 421 +++++++++++++++++++++------------------ 4 files changed, 238 insertions(+), 205 deletions(-) diff --git a/.flake8 b/.flake8 index 526f1ff..bfd9ba4 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,4 @@ -; Copyright 2021 Universität Tübingen, DKFZ and EMBL +; Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln ; for the German Human Genome-Phenome Archive (GHGA) ; ; Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,7 +14,8 @@ ; limitations under the License. [flake8] -ignore = E, W # ignore all style checks from pycodestyle - # as they are already checked by black -exclude = .git,__pycache__,db_migration,build,dist +ignore = E, W + # ignore all style checks from pycodestyle + # as they are already checked by black +exclude = .git,__pycache__,build,dist max-complexity = 10 diff --git a/.pylintrc b/.pylintrc index 4cfa455..cfb50db 100644 --- a/.pylintrc +++ b/.pylintrc @@ -70,9 +70,12 @@ disable= duplicate-code, # is behaving strangely sometimes and cannot # be disabled on an individual basis: # https://github.com/PyCQA/pylint/issues/214 - too-few-public-methods # says that classes should always have methods + + too-few-public-methods, # says that classes should always have methods # but that is not true anymore (e.g. dataclasses) + unnecessary-ellipsis, # often used for interfaces/protocols + # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option # multiple time (only on the command line, not in the configuration file where @@ -235,6 +238,7 @@ good-names=i, j, k, ex, + ok, Run, _, __, @@ -537,6 +541,6 @@ valid-metaclass-classmethod-first-arg=cls [EXCEPTIONS] # Exceptions that will emit a warning when being caught. Defaults to -# "BaseException, Exception". -overgeneral-exceptions=BaseException, - Exception +# "builtins.BaseException, builtins.Exception". 
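Background for the patch above: each Crypt4GH plaintext segment (64 KiB) is encrypted independently with ChaCha20-Poly1305 under a fresh random nonce, so an encrypted full segment is exactly 12 bytes nonce + ciphertext + 16 bytes MAC. The sketch below is illustrative only and not part of the patch; it assumes crypt4gh's 64 KiB segment size and the PyNaCl binding the patch itself imports:

import os
from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt

SEGMENT_SIZE = 64 * 1024  # crypt4gh.lib.SEGMENT_SIZE

def encrypt_part(part: bytes, file_secret: bytes) -> tuple[bytes, bytes]:
    """Encrypt all full segments of a part; return ciphertext and leftover bytes."""
    full_segments = len(part) // SEGMENT_SIZE
    encrypted = b""
    for idx in range(full_segments):
        segment = part[idx * SEGMENT_SIZE : (idx + 1) * SEGMENT_SIZE]
        nonce = os.urandom(12)
        # one cipher segment = 12 B nonce + ciphertext + 16 B MAC = SEGMENT_SIZE + 28
        encrypted += nonce + crypto_aead_chacha20poly1305_ietf_encrypt(
            segment, None, nonce, file_secret
        )
    return encrypted, part[full_segments * SEGMENT_SIZE :]

file_secret = os.urandom(32)
ciphertext, leftover = encrypt_part(os.urandom(100_000), file_secret)
assert len(ciphertext) == SEGMENT_SIZE + 28          # one full cipher segment
assert len(leftover) == 100_000 - SEGMENT_SIZE       # incomplete segment carried over

This 28-byte-per-segment overhead is why the next patch distinguishes SEGMENT_SIZE from CIPHER_SEGMENT_SIZE when decrypting downloaded parts.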
From 8a36242a1e45f52ede369b29e91bed8d447263af Mon Sep 17 00:00:00 2001
From: "Thomas J. Zajac"
Date: Thu, 27 Apr 2023 13:05:54 +0000
Subject: [PATCH 03/11] Finished upload/download changes

---
 .flake8                  |   9 +-
 .pylintrc                |  12 +-
 src/otp_tsv_to_upload.py |   1 -
 src/s3_upload.py         | 421 +++++++++++++++++++++------------------
 4 files changed, 238 insertions(+), 205 deletions(-)

diff --git a/.flake8 b/.flake8
index 526f1ff..bfd9ba4 100644
--- a/.flake8
+++ b/.flake8
@@ -1,4 +1,4 @@
-; Copyright 2021 Universität Tübingen, DKFZ and EMBL
+; Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
 ; for the German Human Genome-Phenome Archive (GHGA)
 ;
 ; Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,7 +14,8 @@
 ; limitations under the License.

 [flake8]
-ignore = E, W # ignore all style checks from pycodestyle
-             # as they are already checked by black
-exclude = .git,__pycache__,db_migration,build,dist
+ignore = E, W
+    # ignore all style checks from pycodestyle
+    # as they are already checked by black
+exclude = .git,__pycache__,build,dist
 max-complexity = 10

diff --git a/.pylintrc b/.pylintrc
index 4cfa455..cfb50db 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -70,9 +70,12 @@ disable=
     duplicate-code, # is behaving strangely sometimes and cannot
                     # be disabled on an individual basis:
                     # https://github.com/PyCQA/pylint/issues/214
-    too-few-public-methods # says that classes should always have methods
+
+    too-few-public-methods, # says that classes should always have methods
                             # but that is not true anymore (e.g. dataclasses)

+    unnecessary-ellipsis, # often used for interfaces/protocols
+
 # Enable the message, report, category or checker with the given id(s). You can
 # either give multiple identifier separated by comma (,) or put this option
 # multiple time (only on the command line, not in the configuration file where
@@ -235,6 +238,7 @@ good-names=i,
            j,
            k,
            ex,
+           ok,
            Run,
            _,
            __,
@@ -537,6 +541,6 @@ valid-metaclass-classmethod-first-arg=cls
 [EXCEPTIONS]

 # Exceptions that will emit a warning when being caught. Defaults to
-# "BaseException, Exception".
-overgeneral-exceptions=BaseException,
-                       Exception
+# "builtins.BaseException, builtins.Exception".
+overgeneral-exceptions=builtins.BaseException,
+                       builtins.Exception

diff --git a/src/otp_tsv_to_upload.py b/src/otp_tsv_to_upload.py
index 12348ff..72a31c2 100755
--- a/src/otp_tsv_to_upload.py
+++ b/src/otp_tsv_to_upload.py
@@ -133,7 +133,6 @@ def handle_file_uploads(  # noqa: C901

     try:
         while files_to_do or in_progress:
-
             # start new processes:
             while len(in_progress) < parallel_processes and files_to_do:
                 next_file = files_to_do.pop()

diff --git a/src/s3_upload.py b/src/s3_upload.py
index 4697333..46bddaa 100755
--- a/src/s3_upload.py
+++ b/src/s3_upload.py
@@ -21,7 +21,6 @@
 import asyncio
 import base64
-import codecs
 import hashlib
 import json
 import logging
@@ -29,10 +28,12 @@
 import os
 import subprocess  # nosec
 import sys
+from dataclasses import dataclass
+from functools import partial
 from io import BufferedReader
 from pathlib import Path
 from tempfile import mkstemp
-from typing import Any
+from typing import Any, Generator
 from uuid import uuid4

 import crypt4gh.header  # type: ignore
@@ -47,11 +48,13 @@
 from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
 from pydantic import BaseSettings, Field, SecretStr, validator

+
 def configure_session() -> requests.Session:
     """Configure session with exponential backoff retry"""
     RequestsSession.configure(6)
     return RequestsSession.session

+
 LOGGER = logging.getLogger("s3_upload")
 PART_SIZE = 16 * 1024**2
 SESSION = configure_session()
@@ -120,7 +123,7 @@ def __init__(self):
         crypt4gh.keys.c4gh.generate(seckey=sk_file, pubkey=pk_file)
         self.public_key = crypt4gh.keys.get_public_key(pk_path)
         self.private_key = crypt4gh.keys.get_private_key(sk_path, lambda: None)
-
+
         # remove unneeded files
         os.umask(original_umask)
         Path(pk_path).unlink()
@@ -130,16 +133,18 @@ def __init__(self):
 class MultipartUpload:
     """Context manager to handle init + complete/abort for S3 multipart upload"""

-    def __init__(self, config:Config, file_id: str, file_size: int) -> None:
+    def __init__(
+        self, config: Config, file_id: str, file_size: int, part_size: int
+    ) -> None:
         self.config = config
         self.storage = objectstorage(config=self.config)
         self.file_id = file_id
         self.file_size = file_size
-
+        self.part_size = part_size
         self.upload_id = ""

     async def __aenter__(self):
-        """Start multipart upload"""
+        """Start multipart upload"""
         self.upload_id = await self.storage.init_multipart_upload(
             bucket_id=self.config.bucket_id, object_id=self.file_id
         )
@@ -152,8 +157,8 @@ async def __aexit__(self, exc_t, exc_v, exc_tb):
                 upload_id=self.upload_id,
                 bucket_id=self.config.bucket_id,
                 object_id=self.file_id,
-                anticipated_part_quantity=math.ceil(self.file_size / PART_SIZE),
-                anticipated_part_size=PART_SIZE,
+                anticipated_part_quantity=math.ceil(self.file_size / self.part_size),
+                anticipated_part_size=self.part_size,
             )
         except (Exception, KeyboardInterrupt) as exc:  # pylint: disable=broad-except
             await self.storage.abort_multipart_upload(
@@ -183,21 +188,27 @@ async def send_part(self, part: bytes, part_number: int):
                 object_id=self.file_id,
             )
             raise exc
+
+
 class Checksums:
     """Container for checksum calculation"""

     def __init__(self):
-        self.decrypted_sha256= hashlib.sha256()
+        self.unencrypted_sha256 = hashlib.sha256()
         self.encrypted_md5: list[str] = []
         self.encrypted_sha256: list[str] = []

     def get(self):
         """Return all checksums at the end of processing"""
-        return self.decrypted_sha256.hexdigest(), self.encrypted_md5, self.encrypted_sha256
+        return (
+            self.unencrypted_sha256.hexdigest(),
+            self.encrypted_md5,
+            self.encrypted_sha256,
+        )

     def update_unencrypted(self, part: bytes):
         """Update checksum for unencrypted file"""
-        self.decrypted_sha256.update(part)
+        self.unencrypted_sha256.update(part)

     def update_encrypted(self, part: bytes):
         """Update encrypted part checksums"""
@@ -205,72 +216,36 @@ def update_encrypted(self, part: bytes):
         self.encrypted_sha256.append(hashlib.sha256(part).hexdigest())


-
 class Encryptor:
     """Handles on the fly encryption and checksum calculation"""
-
+
     def __init__(self, part_size: int):
         self.part_size = part_size
         self.checksums = Checksums()
-        self.keypair = Keypair()
         self.file_secret = os.urandom(32)

-    def _prepare_envelope(self):
-        """Create personalized envelope"""
-        keys = [(0, self.keypair.private_key, self.keypair.public_key)]
-        header_content = crypt4gh.header.make_packet_data_enc(0, self.file_secret)
-        header_packets = crypt4gh.header.encrypt(header_content, keys)
-        header_bytes = crypt4gh.header.serialize(header_packets)
-        return header_bytes
-
     def _encrypt(self, part: bytes):
         """Encrypt file part using secret"""
-        segments, incomplete_segment = self._get_segments(part)
+        segments, incomplete_segment = get_segments(
+            part=part, segment_size=crypt4gh.lib.SEGMENT_SIZE
+        )

         encrypted_segments = []
         for segment in segments:
-
             nonce = os.urandom(12)
             encrypted_data = crypto_aead_chacha20poly1305_ietf_encrypt(
                 segment, None, nonce, self.file_secret
             )  # no aad
-
-            encrypted_segments.append(nonce+encrypted_data)
+            encrypted_segments.append(nonce + encrypted_data)

         return b"".join(encrypted_segments), incomplete_segment
-
-
-    def _get_segments(self, part: bytes):
-        """Chunk part into cipher segments"""
-        segment_size = crypt4gh.lib.SEGMENT_SIZE
-        num_segments = len(part) / segment_size
-        full_segments = int(num_segments)
-        segments = [
-            part[i * segment_size : (i + 1) * segment_size]
-            for i in range(full_segments)
-        ]
-
-        # check if we have a remainder of bytes that we need to handle,
-        # i.e. non-matching boundaries between part and cipher segment size
-        incomplete_segment = b""
-        partial_segment_idx = math.ceil(num_segments)
-        if partial_segment_idx != full_segments:
-            incomplete_segment = part[full_segments * segment_size :]
-        return segments, incomplete_segment
-
+
     # type annotation for file parts, should be generator
     def process_file(self, file: BufferedReader):
-        """
-        Encrypt and upload file parts.
-        This is a bit involved for a few reasons:
-        - The first part sent needs to include the envelope
-        - Part size alignment does not necessarily fit chunk size for encryption
-        - Checksums need correct parts
-        -> A few different buffers are needed
-        """
-        upload_buffer = self._prepare_envelope()
+        """Encrypt and upload file parts."""
         unprocessed_bytes = b""
-        encrypted_buffer = b""
+        upload_buffer = b""

         for file_part in read_file_parts(file=file, part_size=self.part_size):
             # process unencrypted
@@ -263,22 +254,19 @@ def process_file(self, file: BufferedReader):

             # encrypt in chunks
             encrypted_bytes, unprocessed_bytes = self._encrypt(unprocessed_bytes)
-
-            encrypted_buffer += encrypted_bytes
-            # checksum encrypted file parts if part size
-            if len(encrypted_buffer) >= self.part_size:
-                self.checksums.update_encrypted(encrypted_buffer[:self.part_size])
-                encrypted_buffer = encrypted_buffer[self.part_size:]
-
             upload_buffer += encrypted_bytes
-            # yield if part size
+
+            # update checksums and yield if part size
             if len(upload_buffer) >= self.part_size:
-                yield upload_buffer[:self.part_size]
-                upload_buffer = upload_buffer[self.part_size:]
+                current_part = upload_buffer[: self.part_size]
+                self.checksums.update_encrypted(current_part)
+                yield current_part
+                upload_buffer = upload_buffer[self.part_size :]

-        self.checksums.update_encrypted(encrypted_buffer)
+        self.checksums.update_encrypted(upload_buffer)
         yield upload_buffer

+
 class ChunkedUploader:
     """Handler class dealing with upload functionality"""

@@ -304,125 +276,168 @@ def __init__(self, input_path: Path, alias: str, config: Config) -> None:
         self.input_path = input_path
         self.encryptor = Encryptor(self.config.part_size)
         self.file_id = str(uuid4())
-
+
     async def encrypt_and_upload(self):
         """Delegate encryption and perform multipart upload"""
         file_size = self.input_path.stat().st_size
-
+
         with open(self.input_path, "rb") as file:
-            async with MultipartUpload(config=self.config, file_id=self.file_id, file_size=file_size) as upload:
-                for part_number, part in enumerate(self.encryptor.process_file(file=file)):
+            async with MultipartUpload(
+                config=self.config,
+                file_id=self.file_id,
+                file_size=file_size,
+                part_size=self.config.part_size,
+            ) as upload:
+                for part_number, part in enumerate(
+                    self.encryptor.process_file(file=file)
+                ):
                     await upload.send_part(part_number=part_number, part=part)

-class Upload:
-    """Handler class dealing with most of the upload functionality"""

-    def __init__(self, input_path: Path, alias: str, config: Config) -> None:
-        self.config = config
-        self.file_id = str(uuid4())
-        self.alias = alias
-        self.input_path = input_path
-        self.keypair = Keypair()
+@dataclass
+class Metadata:  # pylint: disable=too-many-instance-attributes
+    """Container class for output metadata"""
+
+    alias: str
+    file_uuid: str
+    original_path: Path
+    part_size: int
+    file_secret: bytes
+    checksums: Checksums
+    unencrypted_size: int
+    encrypted_size: int
+
+    def serialize(self, output_dir: Path):
+        """Serialize metadata to file"""
+
+        output: dict[str, Any] = {}
+        output["Alias"] = self.alias
+        output["File UUID"] = self.file_uuid
+        output["Original filesystem path"] = str(self.original_path.resolve())
+        output["Part Size"] = f"{self.part_size // 1024**2} MiB"
+        output["Unencrypted file size"] = self.unencrypted_size
+        output["Encrypted file size"] = self.encrypted_size
+        output["Symmetric file encryption secret"] = base64.b64encode(
+            self.file_secret
+        ).decode("utf-8")
+        (
+            unencrypted_checksum,
+            encrypted_md5_checksums,
+            encrypted_sha256_checksums,
+        ) = self.checksums.get()
+        output["Unencrypted file checksum"] = unencrypted_checksum
+        output["Encrypted file part checksums (MD5)"] = encrypted_md5_checksums
+        output["Encrypted file part checksums (SHA256)"] = encrypted_sha256_checksums
+
+        if not output_dir.exists():
+            output_dir.mkdir(parents=True)
+
+        output_path = output_dir / f"{self.alias}.json"
+        self._check_output_path(output_path)
+
+        # owner read-only
+        with output_path.open("w") as file:
+            json.dump(output, file, indent=2)
+        os.chmod(path=output_path, mode=0o400)

-    async def _download(
+    def _check_output_path(self, output_path: Path):
+        """Check if we accidentally try to overwrite an already existing metadata file"""
+        if output_path.exists():
+            msg = f"Output file {output_path.resolve()} already exists and cannot be overwritten."
+            handle_superficial_error(msg=msg)
+
+
+class ChunkedDownloader:
+    """Handler class dealing with download functionality"""
+
+    def __init__(  # pylint: disable=too-many-arguments
         self,
-        *,
+        config: Config,
+        file_id: str,
         file_size: int,
-        destination: Path,
         file_secret: bytes,
-    ):  # pylint: disable=too-many-arguments
-        """Download uploaded file"""
-        storage = objectstorage(config=self.config)
-        download_url = await storage.get_object_download_url(
+        part_size: int,
+        target_checksums: Checksums,
+    ) -> None:
+        self.config = config
+        self.storage = objectstorage(self.config)
+        self.file_id = file_id
+        self.file_size = file_size
+        self.file_secret = file_secret
+        self.part_size = part_size
+        self.target_checksums = target_checksums
+
+    def _download_parts(self, download_url):
+        """Download file parts"""
+        for start, stop in get_ranges(
+            file_size=self.file_size, part_size=self.config.part_size
+        ):
+            headers = {"Range": f"bytes={start}-{stop}"}
+            response = SESSION.get(download_url, timeout=60, headers=headers)
+            yield response.content
+
+    async def download(self):
+        """Download file in parts and validate checksums"""
+        download_url = await self.storage.get_object_download_url(
             bucket_id=self.config.bucket_id, object_id=self.file_id
         )
-        with destination.open("wb") as local_file:
-            envelope = prepare_envelope(keypair=self.keypair, file_secret=file_secret)
-            local_file.write(envelope)
-
-            for start, stop in get_ranges(file_size=file_size):
-                headers = {"Range": f"bytes={start}-{stop}"}
-                response = SESSION.get(download_url, timeout=60, headers=headers)
-                chunk = response.content
-                LOGGER.info(
-                    "(5/7) Downloading file for validation (%.2f%%)",
-                    stop / file_size * 100,
-                )
-                local_file.write(chunk)
-
-    def _validate_checksum(self, destination: Path, destination_decrypted: Path):
-        """Decrypt downloaded file and compare checksum with original"""
-
-        LOGGER.info("(6/7) Decrypting and validating checksum")
-        keys = [(0, self.keypair.private_key, None)]
-
-        with destination.open("rb") as infile:
-            with destination_decrypted.open("wb") as outfile:
-                crypt4gh.lib.decrypt(
-                    keys=keys,
-                    infile=infile,
-                    outfile=outfile,
-                    sender_pubkey=self.keypair.public_key,
-                )
-        dl_checksum = get_checksum_unencrypted(destination_decrypted)
-        if dl_checksum != self.checksum:
+        decryptor = Decryptor(file_secret=self.file_secret, part_size=self.part_size)
+        download_func = partial(self._download_parts, download_url=download_url)
+        decryptor.process_parts(download_func)
+        self.validate_checksums(checksums=decryptor.checksums)
+
+    def validate_checksums(self, checksums: Checksums):
+        """Confirm checksums for upload and download match"""
+        if not self.target_checksums.get() == checksums.get():
             raise ValueError(
-                f"Checksum mismatch:\nExpected: {self.checksum}\nActual: {dl_checksum}"
+                f"Checksum mismatch:\nUpload:\n{self.target_checksums}\nDownload:\n{checksums}"
             )

-    def _write_metadata(
-        self,
-        *,
-        enc_md5sums: list[str],
-        enc_sha256sums: list[str],
-        file_secret: bytes,
-    ):  # pylint: disable=too-many-arguments
-        """Write all necessary data about the uploaded file"""
-        output: dict[str, Any] = {}
-        output["Alias"] = self.alias
-        output["File UUID"] = self.file_id
-        output["Original filesystem path"] = str(self.input_path.resolve())
-        output["Part Size"] = f"{self.config.part_size // 1024**2} MiB"
-        output["Symmetric file encryption secret"] = codecs.decode(
-            base64.b64encode(file_secret), encoding="utf-8"
-        )
-        output["Unencrypted file checksum"] = self.checksum
-        output["Encrypted file part checksums (MD5)"] = enc_md5sums
-        output["Encrypted file part checksums (SHA256)"] = enc_sha256sums

-        if not self.config.output_dir.exists():
-            self.config.output_dir.mkdir(parents=True)
+class Decryptor:
+    """Handles on the fly decryption and checksum calculation"""

-        output_path = self.config.output_dir / f"{self.alias}.json"
-        LOGGER.info("(7/7) Writing file metadata to %s", output_path)
-        # owner read-only
-        with output_path.open("w") as file:
-            json.dump(output, file, indent=2)
-        os.chmod(path=output_path, mode=0o400)
+    def __init__(self, file_secret: bytes, part_size: int) -> None:
+        self.checksums = Checksums()
+        self.file_secret = file_secret
+        self.part_size = part_size

+    def process_parts(self, download_files: partial[Generator[bytes, None, None]]):
+        """Decrypt downloaded file parts and update checksums."""
+        unprocessed_bytes = b""
+        download_buffer = b""

-def check_output_path(alias: str, output_dir: Path):
-    """Check if we accidentally try to overwrite an already existing metadata file"""
-    output_path = output_dir / f"{alias}.json"
-    if output_path.exists():
-        msg = f"Output file {output_path.resolve()} already exists and cannot be overwritten."
-        handle_superficial_error(msg=msg)
+        for file_part in download_files():
+            # checksum encrypted part
+            self.checksums.update_encrypted(file_part)
+            unprocessed_bytes += file_part

+            # decrypt in chunks
+            decrypted_bytes, unprocessed_bytes = self._decrypt(unprocessed_bytes)
+            download_buffer += decrypted_bytes

-def get_checksum_unencrypted(file_location: Path) -> str:
-    """Compute SHA256 checksum over unencrypted file content"""
+            # update checksums if part size
+            if len(download_buffer) >= self.part_size:
+                current_part = download_buffer[: self.part_size]
+                self.checksums.update_unencrypted(current_part)
+                download_buffer = download_buffer[self.part_size :]

-    LOGGER.info("Computing checksum...\tThis might take a moment")
-    sha256sum = hashlib.sha256()
-    file_size = file_location.stat().st_size
-    sum_bytes = 0
-    with file_location.open("rb") as file:
-        for part in read_file(file=file, part_size=PART_SIZE):
-            sum_bytes += len(part)
-            LOGGER.info("Computing checksum (%.2f%%)", sum_bytes / file_size * 100)
-            sha256sum.update(part)
+        self.checksums.update_unencrypted(download_buffer)

-    return sha256sum.hexdigest()
+    def _decrypt(self, part: bytes):
+        """Decrypt file part"""
+        segments, incomplete_segment = get_segments(
+            part=part, segment_size=crypt4gh.lib.CIPHER_SEGMENT_SIZE
+        )
+
+        decrypted_segments = []
+        for segment in segments:
+            decrypted = crypt4gh.lib.decrypt_block(
+                ciphersegment=segment, session_keys=[self.file_secret]
+            )
+            decrypted_segments.append(decrypted)
+
+        return b"".join(decrypted_segments), incomplete_segment


 def objectstorage(config: Config):
@@ -435,27 +450,32 @@ def objectstorage(config: Config):
     return S3ObjectStorage(config=s3_config)


-def read_file(*, file: BufferedReader, part_size: int, offset: int = 0):
-    """Read file content from offset in chunks"""
-    file.seek(offset)
-    while True:
-        file_part = file.read(part_size)
-
-        if len(file_part) == 0:
-            return
+def get_segments(part: bytes, segment_size: int):
+    """Chunk part into cipher segments"""
+    num_segments = len(part) / segment_size
+    full_segments = int(num_segments)
+    segments = [
+        part[i * segment_size : (i + 1) * segment_size] for i in range(full_segments)
+    ]

-        yield file_part
+    # check if we have a remainder of bytes that we need to handle,
+    # i.e. non-matching boundaries between part and cipher segment size
+    incomplete_segment = b""
+    partial_segment_idx = math.ceil(num_segments)
+    if partial_segment_idx != full_segments:
+        incomplete_segment = part[full_segments * segment_size :]
+    return segments, incomplete_segment


-def get_ranges(file_size: int):
+def get_ranges(file_size: int, part_size: int):
     """Calculate part ranges"""
-    num_parts = file_size / PART_SIZE
+    num_parts = file_size / part_size
     byte_ranges = [
-        (PART_SIZE * part_no, PART_SIZE * (part_no + 1) - 1)
+        (part_size * part_no, part_size * (part_no + 1) - 1)
         for part_no in range(int(num_parts))
     ]
     if math.ceil(num_parts) != int(num_parts):
-        byte_ranges.append((PART_SIZE * int(num_parts), file_size - 1))
+        byte_ranges.append((part_size * int(num_parts), file_size - 1))

     return byte_ranges

@@ -508,23 +528,19 @@ def check_adjust_part_size(config: Config, file_size: int):
     config.part_size = part_size


-async def async_main(input_path: Path, alias: str, config: Config):
+def main(
+    input_path: Path = typer.Option(..., help="Local path of the input file"),
+    alias: str = typer.Option(..., help="A human readable file alias"),
+    config_path: Path = typer.Option(..., help="Path to a config YAML."),
+):
     """
-    Run encryption, upload and validation.
-    Prints metadata to .json in the specified output directory
+    Custom script to encrypt data using Crypt4GH and directly uploading it to S3
+    objectstorage.
     """
-    if not input_path.exists():
-        msg = f"No such file: {input_path.resolve()}"
-        handle_superficial_error(msg=msg)

-    if input_path.is_dir():
-        msg = f"File location points to a directory: {input_path.resolve()}"
-        handle_superficial_error(msg=msg)
+    config = load_config_yaml(config_path)

-    check_adjust_part_size(config=config, file_size=input_path.stat().st_size)
-    check_output_path(alias=alias, output_dir=config.output_dir)
-    upload = Upload(input_path=input_path, alias=alias, config=config)
-    await upload.process_file()
+    asyncio.run(async_main(input_path=input_path, alias=alias, config=config))


 def load_config_yaml(path: Path) -> Config:
@@ -535,19 +551,32 @@ def load_config_yaml(path: Path) -> Config:
     return Config(**config_dict)


-def main(
-    input_path: Path = typer.Option(..., help="Local path of the input file"),
-    alias: str = typer.Option(..., help="A human readable file alias"),
-    config_path: Path = typer.Option(..., help="Path to a config YAML."),
-):
+async def async_main(input_path: Path, alias: str, config: Config):
     """
-    Custom script to encrypt data using Crypt4GH and directly uploading it to S3
-    objectstorage.
+    Run encryption, upload and validation.
+    Prints metadata to .json in the specified output directory
     """
+    if not input_path.exists():
+        msg = f"No such file: {input_path.resolve()}"
+        handle_superficial_error(msg=msg)

-    config = load_config_yaml(config_path)
+    if input_path.is_dir():
+        msg = f"File location points to a directory: {input_path.resolve()}"
+        handle_superficial_error(msg=msg)

-    asyncio.run(async_main(input_path=input_path, alias=alias, config=config))
+    file_size = input_path.stat().st_size
+    check_adjust_part_size(config=config, file_size=file_size)
+    uploader = ChunkedUploader(input_path=input_path, alias=alias, config=config)
+    await uploader.encrypt_and_upload()
+    downloader = ChunkedDownloader(
+        config=config,
+        file_id=uploader.file_id,
+        file_size=file_size,
+        file_secret=uploader.encryptor.file_secret,
+        part_size=config.part_size,
+        target_checksums=uploader.encryptor.checksums,
+    )
+    await downloader.download()


 if __name__ == "__main__":
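The downloader added above validates the upload by re-fetching the object with HTTP Range requests, one request per part. The ranges are inclusive on both ends, with a shorter trailing range when the file size is not a multiple of the part size. The function body below is copied from the patch; the assert is a worked example added for illustration:

import math

def get_ranges(file_size: int, part_size: int):
    """Calculate inclusive byte ranges for ranged GET requests."""
    num_parts = file_size / part_size
    byte_ranges = [
        (part_size * part_no, part_size * (part_no + 1) - 1)
        for part_no in range(int(num_parts))
    ]
    if math.ceil(num_parts) != int(num_parts):
        byte_ranges.append((part_size * int(num_parts), file_size - 1))
    return byte_ranges

# a 10-byte object fetched in 4-byte parts: two full ranges plus a short tail,
# sent as headers like {"Range": "bytes=0-3"}
assert get_ranges(10, 4) == [(0, 3), (4, 7), (8, 9)]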
From 1fdd6fb0aa5aea59af9fecb0b7eec834aafa9391 Mon Sep 17 00:00:00 2001
From: "Thomas J. Zajac"
Date: Thu, 27 Apr 2023 14:03:52 +0000
Subject: [PATCH 04/11] Debugging issues...

---
 src/__init__.py                     |  14 +
 src/s3_upload.py                    | 397 ++++++++++++++--------------
 tests/fixtures/config.py            |  18 +-
 tests/integration/test_s3_upload.py |   3 +-
 4 files changed, 227 insertions(+), 205 deletions(-)
 create mode 100644 src/__init__.py

diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..149a721
--- /dev/null
+++ b/src/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2023 Universität Tübingen, DKFZ and EMBL
+# for the German Human Genome-Phenome Archive (GHGA)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

diff --git a/src/s3_upload.py b/src/s3_upload.py
index 46bddaa..6b63e7f 100755
--- a/src/s3_upload.py
+++ b/src/s3_upload.py
@@ -32,7 +32,6 @@
 from functools import partial
 from io import BufferedReader
 from pathlib import Path
-from tempfile import mkstemp
 from typing import Any, Generator
 from uuid import uuid4

@@ -106,114 +105,173 @@ def expand_env_vars_output_dir(
     return expand_env_vars_in_path(output_dir)


-class Keypair:
-    """Crypt4GH keypair"""
+class Checksums:
+    """Container for checksum calculation"""

     def __init__(self):
-        """Creates a keypair using crypt4gh"""
-        LOGGER.info("(1/7) Generating keypair")
-        # Crypt4GH always writes to file and tmp_path fixture causes permission issues
+        self.unencrypted_sha256 = hashlib.sha256()
+        self.encrypted_md5: list[str] = []
+        self.encrypted_sha256: list[str] = []

-        sk_file, sk_path = mkstemp(prefix="private", suffix=".key")
-        pk_file, pk_path = mkstemp(prefix="public", suffix=".key")
+    def __repr__(self) -> str:
+        return (
+            f"Unencrypted: {self.unencrypted_sha256.hexdigest()}\n"
+            + f"Encrypted MD5: {self.encrypted_md5}\n"
+            + f"Encrypted SHA256: {self.encrypted_sha256}"
+        )

-        # Crypt4GH does not reset the umask it sets, so we need to deal with it
-        original_umask = os.umask(0o022)
+    def get(self):
+        """Return all checksums at the end of processing"""
+        return (
+            self.unencrypted_sha256.hexdigest(),
+            self.encrypted_md5,
+            self.encrypted_sha256,
+        )
+
+    def update_unencrypted(self, part: bytes):
+        """Update checksum for unencrypted file"""
+        self.unencrypted_sha256.update(part)
+
+    def update_encrypted(self, part: bytes):
+        """Update encrypted part checksums"""
+        self.encrypted_md5.append(hashlib.md5(part, usedforsecurity=False).hexdigest())
+        self.encrypted_sha256.append(hashlib.sha256(part).hexdigest())

-        crypt4gh.keys.c4gh.generate(seckey=sk_file, pubkey=pk_file)
-        self.public_key = crypt4gh.keys.get_public_key(pk_path)
-        self.private_key = crypt4gh.keys.get_private_key(sk_path, lambda: None)

-        # remove unneeded files
-        os.umask(original_umask)
-        Path(pk_path).unlink()
-        Path(sk_path).unlink()
+class ChunkedUploader:
+    """Handler class dealing with upload functionality"""

+    def __init__(self, input_path: Path, alias: str, config: Config) -> None:
+        self.alias = alias
+        self.config = config
+        self.input_path = input_path
+        self.encryptor = Encryptor(self.config.part_size)
+        self.file_id = str(uuid4())

-class MultipartUpload:
-    """Context manager to handle init + complete/abort for S3 multipart upload"""
+    async def encrypt_and_upload(self):
+        """Delegate encryption and perform multipart upload"""
+        file_size = self.input_path.stat().st_size

-    def __init__(
-        self, config: Config, file_id: str, file_size: int, part_size: int
+        with open(self.input_path, "rb") as file:
+            async with MultipartUpload(
+                config=self.config,
+                file_id=self.file_id,
+                file_size=file_size,
+                part_size=self.config.part_size,
+            ) as upload:
+                LOGGER.info("(1/*) Initialized file upload for %s\n", upload.file_id)
+                for part_number, part in enumerate(
+                    self.encryptor.process_file(file=file), start=1
+                ):
+                    LOGGER.info(
+                        "\r(2/*) Processing upload for file part %i", part_number
+                    )
+                    await upload.send_part(part_number=part_number, part=part)
+
+
+class ChunkedDownloader:
+    """Handler class dealing with download functionality"""
+
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        config: Config,
+        file_id: str,
+        file_size: int,
+        file_secret: bytes,
+        part_size: int,
+        target_checksums: Checksums,
     ) -> None:
         self.config = config
-        self.storage = objectstorage(config=self.config)
+        self.storage = objectstorage(self.config)
         self.file_id = file_id
         self.file_size = file_size
+        self.file_secret = file_secret
         self.part_size = part_size
-        self.upload_id = ""
+        self.target_checksums = target_checksums

-    async def __aenter__(self):
-        """Start multipart upload"""
-        self.upload_id = await self.storage.init_multipart_upload(
+    def _download_parts(self, download_url):
+        """Download file parts"""
+        for start, stop in get_ranges(
+            file_size=self.file_size, part_size=self.config.part_size
+        ):
+            headers = {"Range": f"bytes={start}-{stop}"}
+            response = SESSION.get(download_url, timeout=60, headers=headers)
+            yield response.content
+
+    async def download(self):
+        """Download file in parts and validate checksums"""
+        download_url = await self.storage.get_object_download_url(
             bucket_id=self.config.bucket_id, object_id=self.file_id
         )
-        return self
+        decryptor = Decryptor(file_secret=self.file_secret, part_size=self.part_size)
+        download_func = partial(self._download_parts, download_url=download_url)
+        decryptor.process_parts(download_func)
+        self.validate_checksums(checksums=decryptor.checksums)

-    async def __aexit__(self, exc_t, exc_v, exc_tb):
-        """Complete or clean up multipart upload"""
-        try:
-            await self.storage.complete_multipart_upload(
-                upload_id=self.upload_id,
-                bucket_id=self.config.bucket_id,
-                object_id=self.file_id,
-                anticipated_part_quantity=math.ceil(self.file_size / self.part_size),
-                anticipated_part_size=self.part_size,
-            )
-        except (Exception, KeyboardInterrupt) as exc:  # pylint: disable=broad-except
-            await self.storage.abort_multipart_upload(
-                upload_id=self.upload_id,
-                bucket_id=self.config.bucket_id,
-                object_id=self.file_id,
+    def validate_checksums(self, checksums: Checksums):
+        """Confirm checksums for upload and download match"""
+        if not self.target_checksums.get() == checksums.get():
+            raise ValueError(
+                f"Checksum mismatch:\nUpload:\n{self.target_checksums}\nDownload:\n{checksums}"
             )
-            raise exc

-    async def send_part(self, part: bytes, part_number: int):
-        """Handle upload of one file part"""
-        try:
-            upload_url = await self.storage.get_part_upload_url(
-                upload_id=self.upload_id,
-                bucket_id=self.config.bucket_id,
-                object_id=self.file_id,
-                part_number=part_number,
-            )
-            SESSION.put(url=upload_url, data=part)
-        except (  # pylint: disable=broad-except
-            Exception,
-            KeyboardInterrupt,
-        ) as exc:
-            await self.storage.abort_multipart_upload(
-                upload_id=self.upload_id,
-                bucket_id=self.config.bucket_id,
-                object_id=self.file_id,
-            )
-            raise exc

-class Checksums:
-    """Container for checksum calculation"""
+class Decryptor:
+    """Handles on the fly decryption and checksum calculation"""

-    def __init__(self):
-        self.unencrypted_sha256 = hashlib.sha256()
-        self.encrypted_md5: list[str] = []
-        self.encrypted_sha256: list[str] = []
+    def __init__(self, file_secret: bytes, part_size: int) -> None:
+        self.checksums = Checksums()
+        self.file_secret = file_secret
+        self.part_size = part_size

-    def get(self):
-        """Return all checksums at the end of processing"""
-        return (
-            self.unencrypted_sha256.hexdigest(),
-            self.encrypted_md5,
-            self.encrypted_sha256,
+    def _decrypt(self, part: bytes):
+        """Decrypt file part"""
+        segments, incomplete_segment = get_segments(
+            part=part, segment_size=crypt4gh.lib.CIPHER_SEGMENT_SIZE
         )

-    def update_unencrypted(self, part: bytes):
-        """Update checksum for unencrypted file"""
-        self.unencrypted_sha256.update(part)
+        decrypted_segments = []
+        for segment in segments:
+            decrypted_segments.append(self._decrypt_segment(segment))
+
+        return b"".join(decrypted_segments), incomplete_segment
+
+    def _decrypt_segment(self, segment: bytes):
+        """Decrypt single ciphersegment"""
+        return crypt4gh.lib.decrypt_block(
+            ciphersegment=segment, session_keys=[self.file_secret]
+        )

-    def update_encrypted(self, part: bytes):
-        """Update encrypted part checksums"""
-        self.encrypted_md5.append(hashlib.md5(part, usedforsecurity=False).hexdigest())
-        self.encrypted_sha256.append(hashlib.sha256(part).hexdigest())
+    def process_parts(self, download_files: partial[Generator[bytes, None, None]]):
+        """Decrypt downloaded file parts and update checksums."""
+        unprocessed_bytes = b""
+        download_buffer = b""
+
+        for file_part in download_files():
+            # checksum encrypted part
+            self.checksums.update_encrypted(file_part)
+            unprocessed_bytes += file_part
+
+            # decrypt in chunks
+            decrypted_bytes, unprocessed_bytes = self._decrypt(unprocessed_bytes)
+            download_buffer += decrypted_bytes
+
+            # update checksums if part size
+            if len(download_buffer) >= self.part_size:
+                current_part = download_buffer[: self.part_size]
+                self.checksums.update_unencrypted(current_part)
+                download_buffer = download_buffer[self.part_size :]
+
+        # process dangling bytes
+        if unprocessed_bytes:
+            download_buffer += self._decrypt_segment(unprocessed_bytes)
+
+        if len(download_buffer) >= self.part_size:
+            current_part = download_buffer[: self.part_size]
+            self.checksums.update_unencrypted(current_part)
+            download_buffer = download_buffer[self.part_size :]
+
+        self.checksums.update_unencrypted(download_buffer)


 class Encryptor:
@@ -232,15 +290,18 @@ def _encrypt(self, part: bytes):

         encrypted_segments = []
         for segment in segments:
-            nonce = os.urandom(12)
-            encrypted_data = crypto_aead_chacha20poly1305_ietf_encrypt(
-                segment, None, nonce, self.file_secret
-            )  # no aad
-
-            encrypted_segments.append(nonce + encrypted_data)
+            encrypted_segments.append(self._encrypt_segment(segment))

         return b"".join(encrypted_segments), incomplete_segment

+    def _encrypt_segment(self, segment: bytes):
+        """Encrypt one single segment"""
+        nonce = os.urandom(12)
+        encrypted_data = crypto_aead_chacha20poly1305_ietf_encrypt(
+            segment, None, nonce, self.file_secret
+        )  # no aad
+        return nonce + encrypted_data
+
     # type annotation for file parts, should be generator
     def process_file(self, file: BufferedReader):
         """Encrypt and upload file parts."""
@@ -263,35 +324,18 @@ def process_file(self, file: BufferedReader):
                 yield current_part
                 upload_buffer = upload_buffer[self.part_size :]

-        self.checksums.update_encrypted(upload_buffer)
-        yield upload_buffer
-
+        # process dangling bytes
+        if unprocessed_bytes:
+            upload_buffer += self._encrypt_segment(unprocessed_bytes)

-class ChunkedUploader:
-    """Handler class dealing with upload functionality"""
+        if len(upload_buffer) >= self.part_size:
+            current_part = upload_buffer[: self.part_size]
+            self.checksums.update_encrypted(current_part)
+            yield current_part
+            upload_buffer = upload_buffer[self.part_size :]

-    def __init__(self, input_path: Path, alias: str, config: Config) -> None:
-        self.alias = alias
-        self.config = config
-        self.input_path = input_path
-        self.encryptor = Encryptor(self.config.part_size)
-        self.file_id = str(uuid4())
-
-    async def encrypt_and_upload(self):
-        """Delegate encryption and perform multipart upload"""
-        file_size = self.input_path.stat().st_size
-
-        with open(self.input_path, "rb") as file:
-            async with MultipartUpload(
-                config=self.config,
-                file_id=self.file_id,
-                file_size=file_size,
-                part_size=self.config.part_size,
-            ) as upload:
-                for part_number, part in enumerate(
-                    self.encryptor.process_file(file=file)
-                ):
-                    await upload.send_part(part_number=part_number, part=part)
+        self.checksums.update_encrypted(upload_buffer)
+        yield upload_buffer


 @dataclass
@@ -347,97 +391,64 @@ def _check_output_path(self, output_path: Path):
         handle_superficial_error(msg=msg)


-class ChunkedDownloader:
-    """Handler class dealing with download functionality"""
+class MultipartUpload:
+    """Context manager to handle init + complete/abort for S3 multipart upload"""

-    def __init__(  # pylint: disable=too-many-arguments
-        self,
-        config: Config,
-        file_id: str,
-        file_size: int,
-        file_secret: bytes,
-        part_size: int,
-        target_checksums: Checksums,
+    def __init__(
+        self, config: Config, file_id: str, file_size: int, part_size: int
     ) -> None:
         self.config = config
-        self.storage = objectstorage(self.config)
+        self.storage = objectstorage(config=self.config)
         self.file_id = file_id
         self.file_size = file_size
-        self.file_secret = file_secret
         self.part_size = part_size
-        self.target_checksums = target_checksums
-
-    def _download_parts(self, download_url):
-        """Download file parts"""
-        for start, stop in get_ranges(
-            file_size=self.file_size, part_size=self.config.part_size
-        ):
-            headers = {"Range": f"bytes={start}-{stop}"}
-            response = SESSION.get(download_url, timeout=60, headers=headers)
-            yield response.content
+        self.upload_id = ""

-    async def download(self):
-        """Download file in parts and validate checksums"""
-        download_url = await self.storage.get_object_download_url(
+    async def __aenter__(self):
+        """Start multipart upload"""
+        self.upload_id = await self.storage.init_multipart_upload(
             bucket_id=self.config.bucket_id, object_id=self.file_id
         )
-        decryptor = Decryptor(file_secret=self.file_secret, part_size=self.part_size)
-        download_func = partial(self._download_parts, download_url=download_url)
-        decryptor.process_parts(download_func)
-        self.validate_checksums(checksums=decryptor.checksums)
+        return self

-    def validate_checksums(self, checksums: Checksums):
-        """Confirm checksums for upload and download match"""
-        if not self.target_checksums.get() == checksums.get():
-            raise ValueError(
-                f"Checksum mismatch:\nUpload:\n{self.target_checksums}\nDownload:\n{checksums}"
+    async def __aexit__(self, exc_t, exc_v, exc_tb):
+        """Complete or clean up multipart upload"""
+        try:
+            await self.storage.complete_multipart_upload(
+                upload_id=self.upload_id,
+                bucket_id=self.config.bucket_id,
+                object_id=self.file_id,
+                anticipated_part_quantity=math.ceil(self.file_size / self.part_size),
+                anticipated_part_size=self.part_size,
             )
-
-
-class Decryptor:
-    """Handles on the fly decryption and checksum calculation"""
-
-    def __init__(self, file_secret: bytes, part_size: int) -> None:
-        self.checksums = Checksums()
-        self.file_secret = file_secret
-        self.part_size = part_size
-
-    def process_parts(self, download_files: partial[Generator[bytes, None, None]]):
-        """Decrypt downloaded file parts and update checksums."""
-        unprocessed_bytes = b""
-        download_buffer = b""
-
-        for file_part in download_files():
-            # checksum encrypted part
-            self.checksums.update_encrypted(file_part)
-            unprocessed_bytes += file_part
-
-            # decrypt in chunks
-            decrypted_bytes, unprocessed_bytes = self._decrypt(unprocessed_bytes)
-            download_buffer += decrypted_bytes
-
-            # update checksums if part size
-            if len(download_buffer) >= self.part_size:
-                current_part = download_buffer[: self.part_size]
-                self.checksums.update_unencrypted(current_part)
-                download_buffer = download_buffer[self.part_size :]
-
-        self.checksums.update_unencrypted(download_buffer)
-
-    def _decrypt(self, part: bytes):
-        """Decrypt file part"""
-        segments, incomplete_segment = get_segments(
-            part=part, segment_size=crypt4gh.lib.CIPHER_SEGMENT_SIZE
-        )
-
-        decrypted_segments = []
-        for segment in segments:
-            decrypted = crypt4gh.lib.decrypt_block(
-                ciphersegment=segment, session_keys=[self.file_secret]
+        except (Exception, KeyboardInterrupt) as exc:  # pylint: disable=broad-except
+            await self.storage.abort_multipart_upload(
+                upload_id=self.upload_id,
+                bucket_id=self.config.bucket_id,
+                object_id=self.file_id,
             )
-            decrypted_segments.append(decrypted)
+            raise exc

-        return b"".join(decrypted_segments), incomplete_segment
+    async def send_part(self, part: bytes, part_number: int):
+        """Handle upload of one file part"""
+        try:
+            upload_url = await self.storage.get_part_upload_url(
+                upload_id=self.upload_id,
+                bucket_id=self.config.bucket_id,
+                object_id=self.file_id,
+                part_number=part_number,
+            )
+            SESSION.put(url=upload_url, data=part)
+        except (  # pylint: disable=broad-except
+            Exception,
+            KeyboardInterrupt,
+        ) as exc:
+            await self.storage.abort_multipart_upload(
+                upload_id=self.upload_id,
+                bucket_id=self.config.bucket_id,
+                object_id=self.file_id,
+            )
+            raise exc


 def objectstorage(config: Config):

diff --git a/tests/fixtures/config.py b/tests/fixtures/config.py
index c738381..afe2436 100644
--- a/tests/fixtures/config.py
+++ b/tests/fixtures/config.py
@@ -27,13 +27,11 @@
 def config_fixture() -> Generator[Config, None, None]:
     """Generate a test Config file."""

-    with TemporaryDirectory() as tmp_dir:
-        with TemporaryDirectory() as output_dir:
-            yield Config(
-                s3_endpoint_url="s3://test_url",
-                s3_access_key_id="test_access_key",
-                s3_secret_access_key="test_secret_key",
-                bucket_id="test_bucket",
-                tmp_dir=tmp_dir,
-                output_dir=output_dir,
-            )
+    with TemporaryDirectory() as output_dir:
+        yield Config(
+            s3_endpoint_url="s3://test_url",
+            s3_access_key_id="test_access_key",
+            s3_secret_access_key="test_secret_key",
+            bucket_id="test_bucket",
+            output_dir=output_dir,
+        )

diff --git a/tests/integration/test_s3_upload.py b/tests/integration/test_s3_upload.py
index 57511d8..0789c24 100644
--- a/tests/integration/test_s3_upload.py
+++ b/tests/integration/test_s3_upload.py
@@ -54,6 +54,5 @@ async def test_process(config_fixture: Config):  # noqa: F811
     sys.set_int_max_str_digits(50 * 1024**2)  # type: ignore
     with big_temp_file(50 * 1024**2) as file:
         await async_main(input_path=Path(file.name), alias=ALIAS, config=config)
-    # tmp dir empty and output file exists?
-    assert not any(config.tmp_dir.iterdir())
+    # output file exists?
     assert (config.output_dir / ALIAS).with_suffix(".json").exists()
Zajac" Date: Thu, 27 Apr 2023 14:24:16 +0000 Subject: [PATCH 05/11] Tests now working --- src/s3_upload.py | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/s3_upload.py b/src/s3_upload.py index 6b63e7f..54863d5 100755 --- a/src/s3_upload.py +++ b/src/s3_upload.py @@ -141,22 +141,25 @@ def update_encrypted(self, part: bytes): class ChunkedUploader: """Handler class dealing with upload functionality""" - def __init__(self, input_path: Path, alias: str, config: Config) -> None: + def __init__( + self, input_path: Path, alias: str, config: Config, unencrypted_file_size: int + ) -> None: self.alias = alias self.config = config self.input_path = input_path self.encryptor = Encryptor(self.config.part_size) self.file_id = str(uuid4()) + self.unencrypted_file_size = unencrypted_file_size + self.encrypted_file_size = 0 async def encrypt_and_upload(self): """Delegate encryption and perform multipart upload""" - file_size = self.input_path.stat().st_size with open(self.input_path, "rb") as file: async with MultipartUpload( config=self.config, file_id=self.file_id, - file_size=file_size, + file_size=self.unencrypted_file_size, part_size=self.config.part_size, ) as upload: LOGGER.info("(1/*) Initialized file uplod for %s\n", upload.file_id) @@ -176,7 +179,7 @@ def __init__( # pylint: disable=too-many-arguments self, config: Config, file_id: str, - file_size: int, + encrypted_file_size: int, file_secret: bytes, part_size: int, target_checksums: Checksums, @@ -184,7 +187,7 @@ def __init__( # pylint: disable=too-many-arguments self.config = config self.storage = objectstorage(self.config) self.file_id = file_id - self.file_size = file_size + self.file_size = encrypted_file_size self.file_secret = file_secret self.part_size = part_size self.target_checksums = target_checksums @@ -281,6 +284,7 @@ def __init__(self, part_size: int): self.part_size = part_size self.checksums = Checksums() self.file_secret = os.urandom(32) + self.encrypted_file_size = 0 def _encrypt(self, part: bytes): """Encrypt file part using secret""" @@ -321,6 +325,7 @@ def process_file(self, file: BufferedReader): if len(upload_buffer) >= self.part_size: current_part = upload_buffer[: self.part_size] self.checksums.update_encrypted(current_part) + self.encrypted_file_size += self.part_size yield current_part upload_buffer = upload_buffer[self.part_size :] @@ -331,10 +336,12 @@ def process_file(self, file: BufferedReader): if len(upload_buffer) >= self.part_size: current_part = upload_buffer[: self.part_size] self.checksums.update_encrypted(current_part) + self.encrypted_file_size += self.part_size yield current_part upload_buffer = upload_buffer[self.part_size :] self.checksums.update_encrypted(upload_buffer) + self.encrypted_file_size += len(upload_buffer) yield upload_buffer @@ -577,18 +584,37 @@ async def async_main(input_path: Path, alias: str, config: Config): file_size = input_path.stat().st_size check_adjust_part_size(config=config, file_size=file_size) - uploader = ChunkedUploader(input_path=input_path, alias=alias, config=config) + + uploader = ChunkedUploader( + input_path=input_path, + alias=alias, + config=config, + unencrypted_file_size=file_size, + ) await uploader.encrypt_and_upload() + downloader = ChunkedDownloader( config=config, file_id=uploader.file_id, - file_size=file_size, + encrypted_file_size=uploader.encryptor.encrypted_file_size, file_secret=uploader.encryptor.file_secret, part_size=config.part_size, target_checksums=uploader.encryptor.checksums, ) 
await downloader.download() + metadata = Metadata( + alias=uploader.alias, + file_uuid=uploader.file_id, + original_path=input_path, + part_size=config.part_size, + file_secret=uploader.encryptor.file_secret, + checksums=uploader.encryptor.checksums, + unencrypted_size=file_size, + encrypted_size=uploader.encryptor.encrypted_file_size, + ) + metadata.serialize(config.output_dir) + if __name__ == "__main__": logging.basicConfig(level=logging.INFO) From 45778a490a50bd30516df6daa8cd533c9bfbfb03 Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Thu, 27 Apr 2023 17:17:50 +0000 Subject: [PATCH 06/11] Fixed SecretStr issue, readded progress information --- src/s3_upload.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/s3_upload.py b/src/s3_upload.py index 54863d5..1494e0f 100755 --- a/src/s3_upload.py +++ b/src/s3_upload.py @@ -155,6 +155,8 @@ def __init__( async def encrypt_and_upload(self): """Delegate encryption and perform multipart upload""" + num_parts = math.ceil(self.unencrypted_file_size / self.config.part_size) + with open(self.input_path, "rb") as file: async with MultipartUpload( config=self.config, @@ -162,15 +164,19 @@ async def encrypt_and_upload(self): file_size=self.unencrypted_file_size, part_size=self.config.part_size, ) as upload: - LOGGER.info("(1/*) Initialized file uplod for %s\n", upload.file_id) + LOGGER.info("(1/7) Initialized file uplod for %s.", upload.file_id) for part_number, part in enumerate( self.encryptor.process_file(file=file), start=1 ): LOGGER.info( - "\r(2/*) Processing upload for file part %i", part_number + "(2/7) Processing upload for file part (%i/%i)", + part_number, + num_parts, ) await upload.send_part(part_number=part_number, part=part) + LOGGER.info("(3/7) Finished upload for %s.", upload.file_id) + class ChunkedDownloader: """Handler class dealing with download functionality""" @@ -203,10 +209,14 @@ def _download_parts(self, download_url): async def download(self): """Download file in parts and validate checksums""" + LOGGER.info("(4/7) Downloading file %s for validation.", self.file_id) download_url = await self.storage.get_object_download_url( bucket_id=self.config.bucket_id, object_id=self.file_id ) - decryptor = Decryptor(file_secret=self.file_secret, part_size=self.part_size) + num_parts = math.ceil(self.file_size / self.part_size) + decryptor = Decryptor( + file_secret=self.file_secret, num_parts=num_parts, part_size=self.part_size + ) download_func = partial(self._download_parts, download_url=download_url) decryptor.process_parts(download_func) self.validate_checksums(checkums=decryptor.checksums) @@ -217,14 +227,16 @@ def validate_checksums(self, checkums: Checksums): raise ValueError( f"Checksum mismatch:\nUpload:\n{checkums}\nDownload:\n{self.target_checksums}" ) + LOGGER.info("(6/7) Succesfully validated checksums for %s.", self.file_id) class Decryptor: """Handles on the fly decryption and checksum calculation""" - def __init__(self, file_secret: bytes, part_size: int) -> None: + def __init__(self, file_secret: bytes, num_parts: int, part_size: int) -> None: self.checksums = Checksums() self.file_secret = file_secret + self.num_parts = num_parts self.part_size = part_size def _decrypt(self, part: bytes): @@ -250,7 +262,8 @@ def process_parts(self, download_files: partial[Generator[bytes, None, None]]): unprocessed_bytes = b"" download_buffer = b"" - for file_part in download_files(): + for part_number, file_part in enumerate(download_files()): + LOGGER.info("(5/7) Downloading 
part (%i/%i)", part_number, self.num_parts) # process unencrypted self.checksums.update_encrypted(file_part) unprocessed_bytes += file_part @@ -386,6 +399,7 @@ def serialize(self, output_dir: Path): output_path = output_dir / f"{self.alias}.json" self._check_output_path(output_path) + LOGGER.info("(7/7) Writing metadata to %s.", output_path) # owner read-only with output_path.open("w") as file: json.dump(output, file, indent=2) @@ -461,9 +475,9 @@ async def send_part(self, part: bytes, part_number: int): def objectstorage(config: Config): """Configure S3 and return S3 DAO""" s3_config = S3Config( - s3_endpoint_url=config.s3_endpoint_url, - s3_access_key_id=config.s3_access_key_id, - s3_secret_access_key=config.s3_secret_access_key, + s3_endpoint_url=config.s3_endpoint_url.get_secret_value(), + s3_access_key_id=config.s3_access_key_id.get_secret_value(), + s3_secret_access_key=config.s3_secret_access_key.get_secret_value(), ) return S3ObjectStorage(config=s3_config) From 5fef6e2d400c3363ac784cea55c200fca6d0938f Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Thu, 27 Apr 2023 20:13:58 +0000 Subject: [PATCH 07/11] Added average speed, fixed metadata check --- src/otp_tsv_to_upload.py | 2 +- src/s3_upload.py | 41 ++++++++++++++++++++++++++++------------ 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/src/otp_tsv_to_upload.py b/src/otp_tsv_to_upload.py index 72a31c2..f4938f3 100755 --- a/src/otp_tsv_to_upload.py +++ b/src/otp_tsv_to_upload.py @@ -27,7 +27,7 @@ import typer from pydantic import BaseModel -from s3_upload import load_config_yaml +from .s3_upload import load_config_yaml HERE = Path(__file__).parent diff --git a/src/s3_upload.py b/src/s3_upload.py index 1494e0f..47731a9 100755 --- a/src/s3_upload.py +++ b/src/s3_upload.py @@ -32,6 +32,7 @@ from functools import partial from io import BufferedReader from pathlib import Path +from time import time from typing import Any, Generator from uuid import uuid4 @@ -156,6 +157,7 @@ async def encrypt_and_upload(self): """Delegate encryption and perform multipart upload""" num_parts = math.ceil(self.unencrypted_file_size / self.config.part_size) + start = time() with open(self.input_path, "rb") as file: async with MultipartUpload( @@ -168,13 +170,18 @@ async def encrypt_and_upload(self): for part_number, part in enumerate( self.encryptor.process_file(file=file), start=1 ): + await upload.send_part(part_number=part_number, part=part) + + delta = time() - start + avg_speed = ( + part_number * (self.config.part_size / 1024**2) / delta + ) LOGGER.info( - "(2/7) Processing upload for file part (%i/%i)", + "(2/7) Processing upload for file part %i/%i (%.2f MiB/s)", part_number, num_parts, + avg_speed, ) - await upload.send_part(part_number=part_number, part=part) - LOGGER.info("(3/7) Finished upload for %s.", upload.file_id) @@ -261,9 +268,9 @@ def process_parts(self, download_files: partial[Generator[bytes, None, None]]): """Encrypt and upload file parts.""" unprocessed_bytes = b"" download_buffer = b"" + start = time() for part_number, file_part in enumerate(download_files()): - LOGGER.info("(5/7) Downloading part (%i/%i)", part_number, self.num_parts) # process unencrypted self.checksums.update_encrypted(file_part) unprocessed_bytes += file_part @@ -278,6 +285,15 @@ def process_parts(self, download_files: partial[Generator[bytes, None, None]]): self.checksums.update_unencrypted(current_part) download_buffer = download_buffer[self.part_size :] + delta = time() - start + avg_speed = (part_number * (self.part_size / 1024**2)) / 
delta + LOGGER.info( + "(5/7) Downloading part %i/%i (%.2f MiB/s)", + part_number, + self.num_parts, + avg_speed, + ) + # process dangling bytes if unprocessed_bytes: download_buffer += self._decrypt_segment(unprocessed_bytes) @@ -397,7 +413,6 @@ def serialize(self, output_dir: Path): output_dir.mkdir(parents=True) output_path = output_dir / f"{self.alias}.json" - self._check_output_path(output_path) LOGGER.info("(7/7) Writing metadata to %s.", output_path) # owner read-only @@ -405,12 +420,6 @@ def serialize(self, output_dir: Path): json.dump(output, file, indent=2) os.chmod(path=output_path, mode=0o400) - def _check_output_path(self, output_path: Path): - """Check if we accidentally try to overwrite an alread existing metadata file""" - if output_path.exists(): - msg = f"Output file {output_path.resolve()} already exists and cannot be overwritten." - handle_superficial_error(msg=msg) - class MultipartUpload: """Context manager to handle init + complete/abort for S3 multipart upload""" @@ -560,6 +569,13 @@ def check_adjust_part_size(config: Config, file_size: int): config.part_size = part_size +def check_output_path(output_path: Path): + """Check if we accidentally try to overwrite an already existing metadata file""" + if output_path.exists(): + msg = f"Output file {output_path.resolve()} already exists and cannot be overwritten." + handle_superficial_error(msg=msg) + + def main( input_path: Path = typer.Option(..., help="Local path of the input file"), alias: str = typer.Option(..., help="A human readable file alias"), @@ -571,7 +587,6 @@ def main( """ config = load_config_yaml(config_path) - asyncio.run(async_main(input_path=input_path, alias=alias, config=config)) @@ -596,6 +611,8 @@ async def async_main(input_path: Path, alias: str, config: Config): msg = f"File location points to a directory: {input_path.resolve()}" handle_superficial_error(msg=msg) + check_output_path(config.output_dir / f"{alias}.json") + file_size = input_path.stat().st_size check_adjust_part_size(config=config, file_size=file_size) From 708e65cade248af5e7355536bad21a20508a5ddf Mon Sep 17 00:00:00 2001 From: "Thomas J.
Zajac" Date: Thu, 27 Apr 2023 20:32:16 +0000 Subject: [PATCH 08/11] Fixed test issue --- tests/fixtures/config.py | 7 ++++--- tests/integration/test_s3_upload.py | 5 +++-- tests/unit/test_size_adjustment.py | 6 +++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/tests/fixtures/config.py b/tests/fixtures/config.py index afe2436..1711ea0 100644 --- a/tests/fixtures/config.py +++ b/tests/fixtures/config.py @@ -19,6 +19,7 @@ from typing import Generator import pytest +from pydantic import SecretStr from src.s3_upload import Config @@ -29,9 +30,9 @@ def config_fixture() -> Generator[Config, None, None]: with TemporaryDirectory() as output_dir: yield Config( - s3_endpoint_url="s3://test_url", - s3_access_key_id="test_access_key", - s3_secret_access_key="test_secret_key", + s3_endpoint_url=SecretStr("s3://test_url"), + s3_access_key_id=SecretStr("test_access_key"), + s3_secret_access_key=SecretStr("test_secret_key"), bucket_id="test_bucket", output_dir=output_dir, ) diff --git a/tests/integration/test_s3_upload.py b/tests/integration/test_s3_upload.py index 0789c24..a39b3de 100644 --- a/tests/integration/test_s3_upload.py +++ b/tests/integration/test_s3_upload.py @@ -23,6 +23,7 @@ from hexkit.providers.s3.testutils import ( # type: ignore config_from_localstack_container, ) +from pydantic import SecretStr from testcontainers.localstack import LocalStackContainer # type: ignore from src.s3_upload import Config, async_main, objectstorage @@ -43,8 +44,8 @@ async def test_process(config_fixture: Config): # noqa: F811 config = config_fixture.copy( update={ - "s3_endpoint_url": s3_config.s3_endpoint_url, - "s3_access_key_id": s3_config.s3_access_key_id, + "s3_endpoint_url": SecretStr(s3_config.s3_endpoint_url), + "s3_access_key_id": SecretStr(s3_config.s3_access_key_id), "s3_secret_access_key": s3_config.s3_secret_access_key, "bucket_id": BUCKET_ID, } diff --git a/tests/unit/test_size_adjustment.py b/tests/unit/test_size_adjustment.py index 1bf1e4e..4d9bde8 100644 --- a/tests/unit/test_size_adjustment.py +++ b/tests/unit/test_size_adjustment.py @@ -19,7 +19,7 @@ from ..fixtures.config import config_fixture # noqa: F401 -def test_check_adjust_part_size(config_fixture: Config): # noqa: F811 +def test_check_adjust_part_size(config: Config): # noqa: F811 """Test adaptive adjustment""" config_fixture.part_size = 16 file_size = 16 * 80_000 * 1024**2 @@ -28,7 +28,7 @@ def test_check_adjust_part_size(config_fixture: Config): # noqa: F811 assert adjusted_part_size == 256 -def test_check_adjust_part_size_lower_bound(config_fixture: Config): # noqa: F811 +def test_check_adjust_part_size_lower_bound(config: Config): # noqa: F811 """Test lower bound""" lower_expect = 5 * 1024**2 config_fixture.part_size = 4 @@ -36,7 +36,7 @@ def test_check_adjust_part_size_lower_bound(config_fixture: Config): # noqa: F8 assert config_fixture.part_size == lower_expect -def test_check_adjust_part_size_upper_bound(config_fixture: Config): # noqa: F811 +def test_check_adjust_part_size_upper_bound(config: Config): # noqa: F811 """Test upper bound""" upper_expect = 5 * 1024**3 config_fixture.part_size = int(5.1 * 1024) From 4ebb8f55f25f05651d96df4eef3f15f160a862c5 Mon Sep 17 00:00:00 2001 From: "Thomas J. 
Zajac" Date: Thu, 27 Apr 2023 20:37:52 +0000 Subject: [PATCH 09/11] Unbroke broken stuff --- tests/unit/test_size_adjustment.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_size_adjustment.py b/tests/unit/test_size_adjustment.py index 4d9bde8..1bf1e4e 100644 --- a/tests/unit/test_size_adjustment.py +++ b/tests/unit/test_size_adjustment.py @@ -19,7 +19,7 @@ from ..fixtures.config import config_fixture # noqa: F401 -def test_check_adjust_part_size(config: Config): # noqa: F811 +def test_check_adjust_part_size(config_fixture: Config): # noqa: F811 """Test adaptive adjustment""" config_fixture.part_size = 16 file_size = 16 * 80_000 * 1024**2 @@ -28,7 +28,7 @@ def test_check_adjust_part_size(config: Config): # noqa: F811 assert adjusted_part_size == 256 -def test_check_adjust_part_size_lower_bound(config: Config): # noqa: F811 +def test_check_adjust_part_size_lower_bound(config_fixture: Config): # noqa: F811 """Test lower bound""" lower_expect = 5 * 1024**2 config_fixture.part_size = 4 @@ -36,7 +36,7 @@ def test_check_adjust_part_size_lower_bound(config: Config): # noqa: F811 assert config_fixture.part_size == lower_expect -def test_check_adjust_part_size_upper_bound(config: Config): # noqa: F811 +def test_check_adjust_part_size_upper_bound(config_fixture: Config): # noqa: F811 """Test upper bound""" upper_expect = 5 * 1024**3 config_fixture.part_size = int(5.1 * 1024) From d53c4e5f7373be7602bdf3e39b5b0bce1e3ac8eb Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Fri, 28 Apr 2023 07:21:50 +0000 Subject: [PATCH 10/11] Hopefully fixed part size calculation --- src/s3_upload.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/s3_upload.py b/src/s3_upload.py index 47731a9..cf07742 100755 --- a/src/s3_upload.py +++ b/src/s3_upload.py @@ -157,13 +157,17 @@ async def encrypt_and_upload(self): """Delegate encryption and perform multipart upload""" num_parts = math.ceil(self.unencrypted_file_size / self.config.part_size) + # compute encrypted_file_size + num_segments = math.ceil(self.unencrypted_file_size / crypt4gh.lib.SEGMENT_SIZE) + encrypted_file_size = self.unencrypted_file_size + num_segments * 28 + start = time() with open(self.input_path, "rb") as file: async with MultipartUpload( config=self.config, file_id=self.file_id, - file_size=self.unencrypted_file_size, + encrypted_file_size=encrypted_file_size, part_size=self.config.part_size, ) as upload: LOGGER.info("(1/7) Initialized file uplod for %s.", upload.file_id) @@ -182,6 +186,12 @@ async def encrypt_and_upload(self): num_parts, avg_speed, ) + if encrypted_file_size != self.encryptor.encrypted_file_size: + raise ValueError( + "Mismatch between actual and theoretical encrypted part size:\n" + + f"Is: {self.encryptor.encrypted_file_size}\n" + + "Should be: {encrypted_file_size}" + ) LOGGER.info("(3/7) Finished upload for %s.", upload.file_id) @@ -425,12 +435,12 @@ class MultipartUpload: """Context manager to handle init + complete/abort for S3 multipart upload""" def __init__( - self, config: Config, file_id: str, file_size: int, part_size: int + self, config: Config, file_id: str, encrypted_file_size: int, part_size: int ) -> None: self.config = config self.storage = objectstorage(config=self.config) self.file_id = file_id - self.file_size = file_size + self.file_size = encrypted_file_size self.part_size = part_size self.upload_id = "" From 92409f225b908343524adcb814b4b8845cf38dc1 Mon Sep 17 00:00:00 2001 From: "Thomas J. 
Zajac" Date: Tue, 2 May 2023 09:07:35 +0000 Subject: [PATCH 11/11] Fixed some smaller issues --- src/s3_upload.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/src/s3_upload.py b/src/s3_upload.py index cf07742..11889a2 100755 --- a/src/s3_upload.py +++ b/src/s3_upload.py @@ -156,10 +156,10 @@ def __init__( async def encrypt_and_upload(self): """Delegate encryption and perform multipart upload""" - num_parts = math.ceil(self.unencrypted_file_size / self.config.part_size) # compute encrypted_file_size num_segments = math.ceil(self.unencrypted_file_size / crypt4gh.lib.SEGMENT_SIZE) encrypted_file_size = self.unencrypted_file_size + num_segments * 28 + num_parts = math.ceil(encrypted_file_size / self.config.part_size) start = time() @@ -190,7 +190,7 @@ async def encrypt_and_upload(self): raise ValueError( "Mismatch between actual and theoretical encrypted part size:\n" + f"Is: {self.encryptor.encrypted_file_size}\n" - + "Should be: {encrypted_file_size}" + + f"Should be: {encrypted_file_size}" ) LOGGER.info("(3/7) Finished upload for %s.", upload.file_id) @@ -217,10 +217,13 @@ def __init__( # pylint: disable=too-many-arguments def _download_parts(self, download_url): """Download file parts""" - for start, stop in get_ranges( - file_size=self.file_size, part_size=self.config.part_size + + for part_no, (start, stop) in enumerate( + get_ranges(file_size=self.file_size, part_size=self.config.part_size), + start=1, ): headers = {"Range": f"bytes={start}-{stop}"} + LOGGER.debug("Downloading part number %i. %s", part_no, headers) response = SESSION.get(download_url, timeout=60, headers=headers) yield response.content @@ -308,12 +311,13 @@ def process_parts(self, download_files: partial[Generator[bytes, None, None]]): if unprocessed_bytes: download_buffer += self._decrypt_segment(unprocessed_bytes) - if len(download_buffer) >= self.part_size: + while len(download_buffer) >= self.part_size: current_part = download_buffer[: self.part_size] self.checksums.update_unencrypted(current_part) download_buffer = download_buffer[self.part_size :] - self.checksums.update_unencrypted(download_buffer) + if download_buffer: + self.checksums.update_unencrypted(download_buffer) class Encryptor: @@ -372,16 +376,17 @@ def process_file(self, file: BufferedReader): if unprocessed_bytes: upload_buffer += self._encrypt_segment(unprocessed_bytes) - if len(upload_buffer) >= self.part_size: + while len(upload_buffer) >= self.part_size: current_part = upload_buffer[: self.part_size] self.checksums.update_encrypted(current_part) self.encrypted_file_size += self.part_size yield current_part upload_buffer = upload_buffer[self.part_size :] - self.checksums.update_encrypted(upload_buffer) - self.encrypted_file_size += len(upload_buffer) - yield upload_buffer + if upload_buffer: + self.checksums.update_encrypted(upload_buffer) + self.encrypted_file_size += len(upload_buffer) + yield upload_buffer @dataclass @@ -521,12 +526,14 @@ def get_segments(part: bytes, segment_size: int): def get_ranges(file_size: int, part_size: int): """Calculate part ranges""" num_parts = file_size / part_size + num_parts_floor = int(num_parts) + byte_ranges = [ (part_size * part_no, part_size * (part_no + 1) - 1) - for part_no in range(int(num_parts)) + for part_no in range(num_parts_floor) ] - if math.ceil(num_parts) != int(num_parts): - byte_ranges.append((part_size * int(num_parts), file_size - 1)) + if math.ceil(num_parts) != num_parts_floor: + byte_ranges.append((part_size * num_parts_floor, 
file_size - 1)) return byte_ranges
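
For reference, the byte-range computation that PATCH 11/11 arrives at is easiest to verify in isolation. The sketch below is illustrative rather than part of the patch series; it mirrors the final get_ranges and assumes the inclusive HTTP Range semantics used by the download headers above.

import math


def get_ranges(file_size: int, part_size: int) -> list[tuple[int, int]]:
    """Calculate inclusive (start, stop) byte ranges covering the whole file."""
    num_parts = file_size / part_size
    num_parts_floor = int(num_parts)

    # One range per complete part.
    byte_ranges = [
        (part_size * part_no, part_size * (part_no + 1) - 1)
        for part_no in range(num_parts_floor)
    ]
    # A trailing partial range only when the division leaves a remainder.
    if math.ceil(num_parts) != num_parts_floor:
        byte_ranges.append((part_size * num_parts_floor, file_size - 1))
    return byte_ranges


# 10 bytes in 4-byte parts: two full ranges plus a 2-byte remainder.
assert get_ranges(10, 4) == [(0, 3), (4, 7), (8, 9)]
# An exact multiple appends no dangling range.
assert get_ranges(8, 4) == [(0, 3), (4, 7)]

An integer divmod would sidestep the float division entirely; for the file and part sizes handled by this script, the ceil-versus-floor comparison above behaves identically.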
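The if-to-while change in the buffer flushing of PATCH 11/11 makes the draining step independent of the relation between part size and segment size: every full part still sitting in the buffer is yielded, and the final partial part is only emitted when it is non-empty. The pattern in isolation, as an illustrative sketch (drain_buffer is a name introduced here, not a function from the script):

from typing import Iterator


def drain_buffer(buffer: bytes, part_size: int) -> Iterator[bytes]:
    """Yield all full parts from the buffer, then any non-empty remainder."""
    while len(buffer) >= part_size:  # "while", not "if": flush every full part
        yield buffer[:part_size]
        buffer = buffer[part_size:]
    if buffer:  # the guard avoids yielding a zero-length final part
        yield buffer


# Two full parts and a remainder:
assert [len(part) for part in drain_buffer(b"x" * 21, part_size=8)] == [8, 8, 5]
# No empty trailing part when the buffer divides evenly:
assert [len(part) for part in drain_buffer(b"x" * 16, part_size=8)] == [8, 8]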
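The 28-byte constant in the encrypted-size calculation of PATCH 10/11 follows from the Crypt4GH format: each plaintext segment of 65536 bytes is sealed with ChaCha20-Poly1305, which adds a 12-byte nonce and a 16-byte MAC. A minimal sketch of that arithmetic, with SEGMENT_SIZE standing in for crypt4gh.lib.SEGMENT_SIZE and counting only the data portion (the size formula in the patches includes no header term, since the file secret travels in the metadata instead):

import math

SEGMENT_SIZE = 65536  # plaintext bytes per Crypt4GH segment
SEGMENT_OVERHEAD = 12 + 16  # nonce + Poly1305 MAC added per segment


def encrypted_file_size(unencrypted_file_size: int) -> int:
    """Expected ciphertext size for a given plaintext size."""
    num_segments = math.ceil(unencrypted_file_size / SEGMENT_SIZE)
    return unencrypted_file_size + num_segments * SEGMENT_OVERHEAD


# A 100 MiB input divides into exactly 1600 segments:
assert encrypted_file_size(100 * 1024**2) == 100 * 1024**2 + 1600 * 28

This is the same figure the upload path compares against Encryptor.encrypted_file_size after all parts have been streamed.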