Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added alias->accession mapping to get meaningful ID #18

Merged
merged 4 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ghga_datasteward_kit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

"""A utils package for GHGA data stewards."""

__version__ = "0.4.0"
__version__ = "0.4.1"
10 changes: 1 addition & 9 deletions ghga_datasteward_kit/cli/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,7 @@ def ingest_upload_metadata(
):
"""Upload all output metdata files from the given directory to the file ingest service"""

def dummy_generator():
"""Placeholder, needs replacement with actual implementation"""
while True:
yield "test_id"

errors = file_ingest.main(
config_path=config_path,
id_generator=dummy_generator,
)
errors = file_ingest.main(config_path=config_path)

if errors:
print(f"Encountered {len(errors)} errors during processing.")
Expand Down
62 changes: 41 additions & 21 deletions ghga_datasteward_kit/file_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@
# limitations under the License.
"""Interaction with file ingest service"""

from itertools import islice
from pathlib import Path
from typing import Callable

import httpx
from pydantic import BaseSettings, Field, ValidationError
from metldata.submission_registry.submission_store import (
SubmissionStore,
SubmissionStoreConfig,
)
from pydantic import Field, ValidationError

from ghga_datasteward_kit import models, utils


class IngestConfig(BaseSettings):
class IngestConfig(SubmissionStoreConfig):
"""Config options for calling the file ingest endpoint"""

file_ingest_url: str = Field(
Expand All @@ -40,9 +43,30 @@ class IngestConfig(BaseSettings):
)


def alias_to_accession(alias: str, submission_store: SubmissionStore) -> str:
"""Get all submissions to retrieve valid accessions from corresponding file aliases"""

submission_ids = submission_store.get_all_submission_ids()

all_submission_map = {}

for submission_id in submission_ids:
all_submission_map.update(
submission_store.get_by_id(submission_id=submission_id).accession_map[
"files"
]
)

accession = all_submission_map.get(alias)

if accession is None:
raise ValueError(f"No accession exists for file alias {alias}")

return accession


def main(
config_path: Path,
id_generator: Callable[[], str],
):
"""Handle ingestion of a folder of s3 upload file metadata"""

Expand All @@ -51,37 +75,33 @@ def main(

errors = {}

# pre generate paths/ids to make sure generator procudes a sufficient amount of ids
file_paths = [
file_path
for file_path in config.input_dir.iterdir()
if file_path.suffix == ".json"
]
file_ids = list(islice(id_generator(), len(file_paths)))

if len(file_paths) != len(file_ids):
raise ValueError(
"Provided ID generator function does not create the correct amount of IDs."
+ f"\nRequired: {len(file_paths)}, generated {len(file_ids)}"
)

for in_path, file_id in zip(file_paths, file_ids):
for in_path in config.input_dir.iterdir():
if in_path.suffix != ".json":
continue
try:
file_ingest(in_path=in_path, file_id=file_id, token=token, config=config)
file_ingest(in_path=in_path, token=token, config=config)
except (ValidationError, ValueError) as error:
errors[in_path.resolve()] = str(error)
continue

return errors


def file_ingest(in_path: Path, file_id: str, token: str, config: IngestConfig):
def file_ingest(
in_path: Path,
token: str,
config: IngestConfig,
alias_to_id: Callable[[str, SubmissionStore], str] = alias_to_accession,
):
"""
Transform from s3 upload output representation to what the file ingest service expects.
Then call the ingest endpoint
"""

submission_store = SubmissionStore(config=config)

output_metadata = models.OutputMetadata.load(input_path=in_path)
file_id = alias_to_id(output_metadata.alias, submission_store)
upload_metadata = output_metadata.to_upload_metadata(file_id=file_id)
encrypted = upload_metadata.encrypt_metadata(pubkey=config.file_ingest_pubkey)

Expand Down
15 changes: 7 additions & 8 deletions ghga_datasteward_kit/s3_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
import crypt4gh.header # type: ignore
import crypt4gh.keys # type: ignore
import crypt4gh.lib # type: ignore
import requests # type: ignore
from ghga_connector.core.client import HttpxClientState, httpx_client
from ghga_connector.core.file_operations import read_file_parts
from ghga_connector.core.session import RequestsSession
from hexkit.providers.s3 import S3Config, S3ObjectStorage # type: ignore
from nacl.bindings import crypto_aead_chacha20poly1305_ietf_encrypt
from pydantic import BaseSettings, Field, SecretStr, validator
Expand All @@ -46,15 +45,13 @@
from ghga_datasteward_kit.utils import load_config_yaml


def configure_session() -> requests.Session:
def configure_session():
"""Configure session with exponential backoff retry"""
RequestsSession.configure(6)
return RequestsSession.session
HttpxClientState.configure(6)


LOGGER = logging.getLogger("s3_upload")
PART_SIZE = 16 * 1024**2
SESSION = configure_session()


def expand_env_vars_in_path(path: Path) -> Path:
Expand Down Expand Up @@ -187,7 +184,8 @@ def _download_parts(self, download_url):
):
headers = {"Range": f"bytes={start}-{stop}"}
LOGGER.debug("Downloading part number %i. %s", part_no, headers)
response = SESSION.get(download_url, timeout=60, headers=headers)
with httpx_client() as client:
response = client.get(download_url, timeout=60, headers=headers)
yield response.content

async def download(self):
Expand Down Expand Up @@ -399,7 +397,8 @@ async def send_part(self, part: bytes, part_number: int):
object_id=self.file_id,
part_number=part_number,
)
SESSION.put(url=upload_url, data=part)
with httpx_client() as client:
client.put(url=upload_url, data=part)
except ( # pylint: disable=broad-except
Exception,
KeyboardInterrupt,
Expand Down
2 changes: 2 additions & 0 deletions ingest_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
## Properties


- **`submission_store_dir`** *(string)*: The directory where the submission JSONs will be stored.

- **`file_ingest_url`** *(string)*: Base URL under which the /ingest endpoint is available.

- **`file_ingest_pubkey`** *(string)*: Public key used for encryption of the payload.
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ include_package_data = True
packages = find:
install_requires =
hexkit[mongodb,s3]==0.10.0
ghga-connector==0.3.3
ghga-connector==0.3.5
ghga-transpiler==1.0.3
metldata==0.2.3
metldata==0.3.6

python_requires = >= 3.9

Expand Down
91 changes: 59 additions & 32 deletions tests/fixtures/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,31 @@
import pytest
from ghga_service_commons.utils.crypt import KeyPair, encode_key, generate_key_pair
from ghga_service_commons.utils.simple_token import generate_token_and_hash
from ghga_service_commons.utils.utc_dates import now_as_utc
from metldata.submission_registry.models import (
StatusChange,
Submission,
SubmissionStatus,
)
from metldata.submission_registry.submission_store import SubmissionStore

from ghga_datasteward_kit.file_ingest import IngestConfig
from ghga_datasteward_kit.models import OutputMetadata

EXAMPLE_SUBMISSION = Submission(
title="test",
description="test",
content={"test_class": [{"alias": "test_alias"}]},
accession_map={"files": {"test_alias": "test_accession"}},
id="testsubmission001",
status_history=(
StatusChange(
timestamp=now_as_utc(),
new_status=SubmissionStatus.COMPLETED,
),
),
)


@dataclass
class IngestFixture:
Expand All @@ -44,35 +65,41 @@ def ingest_fixture() -> Generator[IngestFixture, None, None]:
"""Generate necessary data for file ingest."""

with TemporaryDirectory() as input_dir:
token, token_hash = generate_token_and_hash()
keypair = generate_key_pair()

file_path = Path(input_dir) / "test.json"

metadata = OutputMetadata(
alias="test",
file_uuid="happy_little_object",
original_path=file_path,
part_size=16 * 1024**2,
unencrypted_size=50 * 1024**2,
encrypted_size=50 * 1024**2 + 128,
file_secret=os.urandom(32),
unencrypted_checksum="def",
encrypted_md5_checksums=["a", "b", "c"],
encrypted_sha256_checksums=["a", "b", "c"],
)

metadata.serialize(file_path)

config = IngestConfig(
file_ingest_url="https://not-a-valid-url",
file_ingest_pubkey=encode_key(keypair.public),
input_dir=Path(input_dir),
)
yield IngestFixture(
config=config,
file_path=file_path,
token=token,
token_hash=token_hash,
keypair=keypair,
)
with TemporaryDirectory() as submission_store_dir:
token, token_hash = generate_token_and_hash()
keypair = generate_key_pair()

file_path = Path(input_dir) / "test.json"

metadata = OutputMetadata(
alias="test_alias",
file_uuid="happy_little_object",
original_path=file_path,
part_size=16 * 1024**2,
unencrypted_size=50 * 1024**2,
encrypted_size=50 * 1024**2 + 128,
file_secret=os.urandom(32),
unencrypted_checksum="def",
encrypted_md5_checksums=["a", "b", "c"],
encrypted_sha256_checksums=["a", "b", "c"],
)

metadata.serialize(file_path)

config = IngestConfig(
file_ingest_url="https://not-a-valid-url",
file_ingest_pubkey=encode_key(keypair.public),
input_dir=Path(input_dir),
submission_store_dir=Path(submission_store_dir),
)

submission_store = SubmissionStore(config=config)
submission_store.insert_new(submission=EXAMPLE_SUBMISSION)

yield IngestFixture(
config=config,
file_path=file_path,
token=token,
token_hash=token_hash,
keypair=keypair,
)
33 changes: 23 additions & 10 deletions tests/test_file_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,32 @@
import pytest
import yaml
from ghga_service_commons.utils.simple_token import generate_token
from metldata.submission_registry.submission_store import SubmissionStore
from pytest_httpx import HTTPXMock

from ghga_datasteward_kit import models
from ghga_datasteward_kit.cli.file import ingest_upload_metadata
from ghga_datasteward_kit.file_ingest import file_ingest
from tests.fixtures.ingest import IngestFixture, ingest_fixture # noqa: F401
from ghga_datasteward_kit.file_ingest import alias_to_accession, file_ingest
from tests.fixtures.ingest import ( # noqa: F401
EXAMPLE_SUBMISSION,
IngestFixture,
ingest_fixture,
)


def id_generator():
"""Generate dummy IDs."""
for i in [1, 2]:
yield f"test_{i}"
@pytest.mark.asyncio
async def test_alias_to_accession(ingest_fixture: IngestFixture): # noqa: F811
"""Test alias->accession mapping"""

submission_store = SubmissionStore(config=ingest_fixture.config)
metadata = models.OutputMetadata.load(input_path=ingest_fixture.file_path)

accession = alias_to_accession(metadata.alias, submission_store=submission_store)
example_accession = list(EXAMPLE_SUBMISSION.accession_map["files"].values())[0]
assert accession == example_accession

with pytest.raises(ValueError):
alias_to_accession("invalid_alias", submission_store=submission_store)


@pytest.mark.asyncio
Expand All @@ -40,7 +55,6 @@ async def test_ingest_directly(
httpx_mock.add_response(url=ingest_fixture.config.file_ingest_url, status_code=202)
file_ingest(
in_path=ingest_fixture.file_path,
file_id="test",
token=token,
config=ingest_fixture.config,
)
Expand All @@ -53,7 +67,6 @@ async def test_ingest_directly(
with pytest.raises(ValueError, match="Unauthorized"):
file_ingest(
in_path=ingest_fixture.file_path,
file_id="test",
token=token,
config=ingest_fixture.config,
)
Expand All @@ -68,7 +81,6 @@ async def test_ingest_directly(
):
file_ingest(
in_path=ingest_fixture.file_path,
file_id="test",
token=token,
config=ingest_fixture.config,
)
Expand All @@ -87,9 +99,10 @@ async def test_main(

config = ingest_fixture.config.dict()
config["input_dir"] = str(config["input_dir"])
config["submission_store_dir"] = str(config["submission_store_dir"])

with config_path.open("w") as config_file:
yaml.dump(config, config_file)
yaml.safe_dump(config, config_file)

monkeypatch.setattr("ghga_datasteward_kit.utils.read_token", generate_token)

Expand Down
4 changes: 4 additions & 0 deletions tests/test_s3_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pathlib import Path

import pytest
from ghga_connector.core.client import HttpxClientState
from ghga_service_commons.utils.temp_files import big_temp_file # type: ignore
from hexkit.providers.s3.testutils import ( # type: ignore
config_from_localstack_container,
Expand All @@ -36,6 +37,9 @@
@pytest.mark.asyncio
async def test_process(config_fixture: Config): # noqa: F811
"""Test whole upload/download process for s3_upload script"""

HttpxClientState.configure(3)

with LocalStackContainer(image="localstack/localstack:0.14.2").with_services(
"s3"
) as localstack:
Expand Down