From 22b8ec4709a26f396bc8bd1158b0c9e84aa4c5c3 Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Mon, 3 Jul 2023 15:12:49 +0000 Subject: [PATCH 1/4] Updated dependencies Added alias->accesion mapping to get meaningful ID --- ghga_datasteward_kit/cli/file.py | 10 +-- ghga_datasteward_kit/file_ingest.py | 62 +++++++++++------ ghga_datasteward_kit/s3_upload.py | 15 ++--- setup.cfg | 4 +- tests/fixtures/ingest.py | 100 +++++++++++++++++++--------- tests/test_file_ingest.py | 33 ++++++--- tests/test_s3_upload.py | 4 ++ 7 files changed, 146 insertions(+), 82 deletions(-) diff --git a/ghga_datasteward_kit/cli/file.py b/ghga_datasteward_kit/cli/file.py index 4241275..522d5ad 100644 --- a/ghga_datasteward_kit/cli/file.py +++ b/ghga_datasteward_kit/cli/file.py @@ -66,15 +66,7 @@ def ingest_upload_metadata( ): """Upload all output metdata files from the given directory to the file ingest service""" - def dummy_generator(): - """Placeholder, needs replacement with actual implementation""" - while True: - yield "test_id" - - errors = file_ingest.main( - config_path=config_path, - id_generator=dummy_generator, - ) + errors = file_ingest.main(config_path=config_path) if errors: print(f"Encountered {len(errors)} errors during processing.") diff --git a/ghga_datasteward_kit/file_ingest.py b/ghga_datasteward_kit/file_ingest.py index 4162437..ff96f23 100644 --- a/ghga_datasteward_kit/file_ingest.py +++ b/ghga_datasteward_kit/file_ingest.py @@ -14,17 +14,20 @@ # limitations under the License. """Interaction with file ingest service""" -from itertools import islice from pathlib import Path from typing import Callable import httpx -from pydantic import BaseSettings, Field, ValidationError +from metldata.submission_registry.submission_store import ( + SubmissionStore, + SubmissionStoreConfig, +) +from pydantic import Field, ValidationError from ghga_datasteward_kit import models, utils -class IngestConfig(BaseSettings): +class IngestConfig(SubmissionStoreConfig): """Config options for calling the file ingest endpoint""" file_ingest_url: str = Field( @@ -40,9 +43,30 @@ class IngestConfig(BaseSettings): ) +def alias_to_accession(alias: str, submission_store: SubmissionStore) -> str: + """Get all submissions to retrieve valid accessions from corresponding file aliases""" + + submission_ids = submission_store.get_all_submission_ids() + + all_submission_map = {} + + for submission_id in submission_ids: + all_submission_map.update( + submission_store.get_by_id(submission_id=submission_id).accession_map[ + "files" + ] + ) + + accession = all_submission_map.get(alias) + + if accession is None: + raise ValueError(f"No accession exists for file alias {alias}") + + return accession + + def main( config_path: Path, - id_generator: Callable[[], str], ): """Handle ingestion of a folder of s3 upload file metadata""" @@ -51,23 +75,11 @@ def main( errors = {} - # pre generate paths/ids to make sure generator procudes a sufficient amount of ids - file_paths = [ - file_path - for file_path in config.input_dir.iterdir() - if file_path.suffix == ".json" - ] - file_ids = list(islice(id_generator(), len(file_paths))) - - if len(file_paths) != len(file_ids): - raise ValueError( - "Provided ID generator function does not create the correct amount of IDs." - + f"\nRequired: {len(file_paths)}, generated {len(file_ids)}" - ) - - for in_path, file_id in zip(file_paths, file_ids): + for in_path in config.input_dir.iterdir(): + if in_path.suffix != ".json": + continue try: - file_ingest(in_path=in_path, file_id=file_id, token=token, config=config) + file_ingest(in_path=in_path, token=token, config=config) except (ValidationError, ValueError) as error: errors[in_path.resolve()] = str(error) continue @@ -75,13 +87,21 @@ def main( return errors -def file_ingest(in_path: Path, file_id: str, token: str, config: IngestConfig): +def file_ingest( + in_path: Path, + token: str, + config: IngestConfig, + alias_to_id: Callable[[str, SubmissionStore], str] = alias_to_accession, +): """ Transform from s3 upload output representation to what the file ingest service expects. Then call the ingest endpoint """ + submission_store = SubmissionStore(config=config) + output_metadata = models.OutputMetadata.load(input_path=in_path) + file_id = alias_to_id(output_metadata.alias, submission_store) upload_metadata = output_metadata.to_upload_metadata(file_id=file_id) encrypted = upload_metadata.encrypt_metadata(pubkey=config.file_ingest_pubkey) diff --git a/ghga_datasteward_kit/s3_upload.py b/ghga_datasteward_kit/s3_upload.py index 28cc506..44a2742 100755 --- a/ghga_datasteward_kit/s3_upload.py +++ b/ghga_datasteward_kit/s3_upload.py @@ -35,9 +35,8 @@ import crypt4gh.header # type: ignore import crypt4gh.keys # type: ignore import crypt4gh.lib # type: ignore -import requests # type: ignore +from ghga_connector.core.client import HttpxClientState, httpx_client from ghga_connector.core.file_operations import read_file_parts -from ghga_connector.core.session import RequestsSession from hexkit.providers.s3 import S3Config, S3ObjectStorage # type: ignore from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt from pydantic import BaseSettings, Field, SecretStr, validator @@ -46,15 +45,13 @@ from ghga_datasteward_kit.utils import load_config_yaml -def configure_session() -> requests.Session: +def configure_session(): """Configure session with exponential backoff retry""" - RequestsSession.configure(6) - return RequestsSession.session + HttpxClientState.configure(6) LOGGER = logging.getLogger("s3_upload") PART_SIZE = 16 * 1024**2 -SESSION = configure_session() def expand_env_vars_in_path(path: Path) -> Path: @@ -187,7 +184,8 @@ def _download_parts(self, download_url): ): headers = {"Range": f"bytes={start}-{stop}"} LOGGER.debug("Downloading part number %i. %s", part_no, headers) - response = SESSION.get(download_url, timeout=60, headers=headers) + with httpx_client() as client: + response = client.get(download_url, timeout=60, headers=headers) yield response.content async def download(self): @@ -399,7 +397,8 @@ async def send_part(self, part: bytes, part_number: int): object_id=self.file_id, part_number=part_number, ) - SESSION.put(url=upload_url, data=part) + with httpx_client() as client: + client.put(url=upload_url, data=part) except ( # pylint: disable=broad-except Exception, KeyboardInterrupt, diff --git a/setup.cfg b/setup.cfg index b686726..bfc04e5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,9 +36,9 @@ include_package_data = True packages = find: install_requires = hexkit[mongodb,s3]==0.10.0 - ghga-connector==0.3.3 + ghga-connector==0.3.5 ghga-transpiler==1.0.3 - metldata==0.2.3 + metldata==0.3.6 python_requires = >= 3.9 diff --git a/tests/fixtures/ingest.py b/tests/fixtures/ingest.py index 80d39d4..10c3c1e 100644 --- a/tests/fixtures/ingest.py +++ b/tests/fixtures/ingest.py @@ -23,10 +23,31 @@ import pytest from ghga_service_commons.utils.crypt import KeyPair, encode_key, generate_key_pair from ghga_service_commons.utils.simple_token import generate_token_and_hash +from ghga_service_commons.utils.utc_dates import now_as_utc +from metldata.submission_registry.models import ( + StatusChange, + Submission, + SubmissionStatus, +) +from metldata.submission_registry.submission_store import SubmissionStore from ghga_datasteward_kit.file_ingest import IngestConfig from ghga_datasteward_kit.models import OutputMetadata +EXAMPLE_SUBMISSION = Submission( + title="test", + description="test", + content={"test_class": [{"alias": "test_alias"}]}, + accession_map={"files": {"test_alias": "test_accession"}}, + id="testsubmission001", + status_history=( + StatusChange( + timestamp=now_as_utc(), + new_status=SubmissionStatus.COMPLETED, + ), + ), +) + @dataclass class IngestFixture: @@ -44,35 +65,50 @@ def ingest_fixture() -> Generator[IngestFixture, None, None]: """Generate necessary data for file ingest.""" with TemporaryDirectory() as input_dir: - token, token_hash = generate_token_and_hash() - keypair = generate_key_pair() - - file_path = Path(input_dir) / "test.json" - - metadata = OutputMetadata( - alias="test", - file_uuid="happy_little_object", - original_path=file_path, - part_size=16 * 1024**2, - unencrypted_size=50 * 1024**2, - encrypted_size=50 * 1024**2 + 128, - file_secret=os.urandom(32), - unencrypted_checksum="def", - encrypted_md5_checksums=["a", "b", "c"], - encrypted_sha256_checksums=["a", "b", "c"], - ) - - metadata.serialize(file_path) - - config = IngestConfig( - file_ingest_url="https://not-a-valid-url", - file_ingest_pubkey=encode_key(keypair.public), - input_dir=Path(input_dir), - ) - yield IngestFixture( - config=config, - file_path=file_path, - token=token, - token_hash=token_hash, - keypair=keypair, - ) + with TemporaryDirectory() as submission_store_dir: + token, token_hash = generate_token_and_hash() + keypair = generate_key_pair() + + file_path = Path(input_dir) / "test.json" + + metadata = OutputMetadata( + alias="test_alias", + file_uuid="happy_little_object", + original_path=file_path, + part_size=16 * 1024**2, + unencrypted_size=50 * 1024**2, + encrypted_size=50 * 1024**2 + 128, + file_secret=os.urandom(32), + unencrypted_checksum="def", + encrypted_md5_checksums=["a", "b", "c"], + encrypted_sha256_checksums=["a", "b", "c"], + ) + + metadata.serialize(file_path) + + config = IngestConfig( + file_ingest_url="https://not-a-valid-url", + file_ingest_pubkey=encode_key(keypair.public), + input_dir=Path(input_dir), + submission_store_dir=Path(submission_store_dir), + ) + + submission_store = SubmissionStore(config=config) + submission_store.insert_new(submission=EXAMPLE_SUBMISSION) + submission_2 = EXAMPLE_SUBMISSION.copy( + update={ + "title": "test2", + "content": {"test_class": [{"alias": "test_alias2"}]}, + "accession_map": {"files": {"test_alias2": "test_accession2"}}, + "id": "testsubmission002", + } + ) + submission_store.insert_new(submission=submission_2) + + yield IngestFixture( + config=config, + file_path=file_path, + token=token, + token_hash=token_hash, + keypair=keypair, + ) diff --git a/tests/test_file_ingest.py b/tests/test_file_ingest.py index 5d60d1a..9ee248c 100644 --- a/tests/test_file_ingest.py +++ b/tests/test_file_ingest.py @@ -16,17 +16,32 @@ import pytest import yaml from ghga_service_commons.utils.simple_token import generate_token +from metldata.submission_registry.submission_store import SubmissionStore from pytest_httpx import HTTPXMock +from ghga_datasteward_kit import models from ghga_datasteward_kit.cli.file import ingest_upload_metadata -from ghga_datasteward_kit.file_ingest import file_ingest -from tests.fixtures.ingest import IngestFixture, ingest_fixture # noqa: F401 +from ghga_datasteward_kit.file_ingest import alias_to_accession, file_ingest +from tests.fixtures.ingest import ( # noqa: F401 + EXAMPLE_SUBMISSION, + IngestFixture, + ingest_fixture, +) -def id_generator(): - """Generate dummy IDs.""" - for i in [1, 2]: - yield f"test_{i}" +@pytest.mark.asyncio +async def test_alias_to_accession(ingest_fixture: IngestFixture): # noqa: F811 + """Test alias->accession mapping""" + + submission_store = SubmissionStore(config=ingest_fixture.config) + metadata = models.OutputMetadata.load(input_path=ingest_fixture.file_path) + + accession = alias_to_accession(metadata.alias, submission_store=submission_store) + example_accession = list(EXAMPLE_SUBMISSION.accession_map["files"].values())[0] + assert accession == example_accession + + with pytest.raises(ValueError): + alias_to_accession("invalid_alias", submission_store=submission_store) @pytest.mark.asyncio @@ -40,7 +55,6 @@ async def test_ingest_directly( httpx_mock.add_response(url=ingest_fixture.config.file_ingest_url, status_code=202) file_ingest( in_path=ingest_fixture.file_path, - file_id="test", token=token, config=ingest_fixture.config, ) @@ -53,7 +67,6 @@ async def test_ingest_directly( with pytest.raises(ValueError, match="Unauthorized"): file_ingest( in_path=ingest_fixture.file_path, - file_id="test", token=token, config=ingest_fixture.config, ) @@ -68,7 +81,6 @@ async def test_ingest_directly( ): file_ingest( in_path=ingest_fixture.file_path, - file_id="test", token=token, config=ingest_fixture.config, ) @@ -87,9 +99,10 @@ async def test_main( config = ingest_fixture.config.dict() config["input_dir"] = str(config["input_dir"]) + config["submission_store_dir"] = str(config["submission_store_dir"]) with config_path.open("w") as config_file: - yaml.dump(config, config_file) + yaml.safe_dump(config, config_file) monkeypatch.setattr("ghga_datasteward_kit.utils.read_token", generate_token) diff --git a/tests/test_s3_upload.py b/tests/test_s3_upload.py index b8e7ebd..c62005a 100644 --- a/tests/test_s3_upload.py +++ b/tests/test_s3_upload.py @@ -19,6 +19,7 @@ from pathlib import Path import pytest +from ghga_connector.core.client import HttpxClientState from ghga_service_commons.utils.temp_files import big_temp_file # type: ignore from hexkit.providers.s3.testutils import ( # type: ignore config_from_localstack_container, @@ -36,6 +37,9 @@ @pytest.mark.asyncio async def test_process(config_fixture: Config): # noqa: F811 """Test whole upload/download process for s3_upload script""" + + HttpxClientState.configure(3) + with LocalStackContainer(image="localstack/localstack:0.14.2").with_services( "s3" ) as localstack: From 568d168b0fe7e4601e70bf4626838e3cc2b7cede Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Mon, 3 Jul 2023 15:15:53 +0000 Subject: [PATCH 2/4] Updated config --- ingest_config.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ingest_config.md b/ingest_config.md index d58b649..f746fe5 100644 --- a/ingest_config.md +++ b/ingest_config.md @@ -7,6 +7,8 @@ ## Properties +- **`submission_store_dir`** *(string)*: The directory where the submission JSONs will be stored. + - **`file_ingest_url`** *(string)*: Base URL under which the /ingest endpoint is available. - **`file_ingest_pubkey`** *(string)*: Public key used for encryption of the payload. From 3d390f217637129827ceeb48744d9394ee02b015 Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Mon, 3 Jul 2023 15:23:14 +0000 Subject: [PATCH 3/4] Version bump --- ghga_datasteward_kit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ghga_datasteward_kit/__init__.py b/ghga_datasteward_kit/__init__.py index 56dbb82..e62b876 100644 --- a/ghga_datasteward_kit/__init__.py +++ b/ghga_datasteward_kit/__init__.py @@ -15,4 +15,4 @@ """A utils package for GHGA data stewards.""" -__version__ = "0.4.0" +__version__ = "0.4.1" From 540b85b5b61e4a06c42dc0b6c3f14b160db43619 Mon Sep 17 00:00:00 2001 From: "Thomas J. Zajac" Date: Mon, 3 Jul 2023 15:26:35 +0000 Subject: [PATCH 4/4] Removed unneeded test setup --- tests/fixtures/ingest.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/fixtures/ingest.py b/tests/fixtures/ingest.py index 10c3c1e..58310ce 100644 --- a/tests/fixtures/ingest.py +++ b/tests/fixtures/ingest.py @@ -95,15 +95,6 @@ def ingest_fixture() -> Generator[IngestFixture, None, None]: submission_store = SubmissionStore(config=config) submission_store.insert_new(submission=EXAMPLE_SUBMISSION) - submission_2 = EXAMPLE_SUBMISSION.copy( - update={ - "title": "test2", - "content": {"test_class": [{"alias": "test_alias2"}]}, - "accession_map": {"files": {"test_alias2": "test_accession2"}}, - "id": "testsubmission002", - } - ) - submission_store.insert_new(submission=submission_2) yield IngestFixture( config=config,