diff --git a/ai4papi/routers/v1/catalog/__init__.py b/ai4papi/routers/v1/catalog/__init__.py
index 605f3180..90336b10 100644
--- a/ai4papi/routers/v1/catalog/__init__.py
+++ b/ai4papi/routers/v1/catalog/__init__.py
@@ -1,6 +1,6 @@
 import fastapi
 
-from . import modules, tools
+from . import modules, tools, datasets
 
 
 app = fastapi.APIRouter()
@@ -12,3 +12,7 @@
     router=tools.router,
     prefix='/catalog',
     )
+app.include_router(
+    router=datasets.app,
+    prefix='/datasets',
+    )
diff --git a/ai4papi/routers/v1/catalog/datasets/__init__.py b/ai4papi/routers/v1/catalog/datasets/__init__.py
new file mode 100644
index 00000000..03cd6a45
--- /dev/null
+++ b/ai4papi/routers/v1/catalog/datasets/__init__.py
@@ -0,0 +1,9 @@
+import fastapi
+
+from . import zenodo
+
+
+app = fastapi.APIRouter()
+app.include_router(
+    router=zenodo.router,
+    )
diff --git a/ai4papi/routers/v1/catalog/datasets/zenodo.py b/ai4papi/routers/v1/catalog/datasets/zenodo.py
new file mode 100644
index 00000000..77d3ac61
--- /dev/null
+++ b/ai4papi/routers/v1/catalog/datasets/zenodo.py
@@ -0,0 +1,129 @@
+"""
+This route mimics the Zenodo API and returns results as-is.
+
+PAPI acts as a proxy between the Dashboard and Zenodo because we want to make
+*authenticated* calls to Zenodo, and we cannot keep a Zenodo token in the
+Dashboard because the calls are run on the client side (i.e. the client would
+see the Zenodo token).
+"""
+
+import os
+import re
+import requests
+from typing import Union
+
+
+from cachetools import cached, TTLCache
+from fastapi import APIRouter, Depends, HTTPException
+from fastapi.security import HTTPBearer
+
+from ai4papi import auth
+
+
+router = APIRouter(
+    prefix="/zenodo",
+    tags=["Zenodo datasets"],
+    responses={404: {"description": "Not found"}},
+)
+security = HTTPBearer()
+
+# If available, authenticate the calls to Zenodo to increase the rate limit.
+# https://developers.zenodo.org/#rate-limiting
+API_URL = 'https://zenodo.org'
+session = requests.Session()
+zenodo_token = os.environ.get('ZENODO_TOKEN', None)
+if zenodo_token:
+    session.headers = {
+        'Authorization': f'Bearer {zenodo_token}',
+    }
+
+
+@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60))
+def _zenodo_proxy(
+    api_route: str,
+    params: Union[frozenset, None] = None,
+    ):
+    """
+    We use this hidden function to allow caching responses. Caching the
+    endpoint directly would raise `TypeError: unhashable type: 'types.SimpleNamespace'`,
+    because the "authorization" param is not hashable.
+
+    **Note**:
+    - we use `frozenset` instead of `dict` because dicts are mutable and
+      therefore not hashable. To convert back and forth:
+      ```
+      fset = frozenset(d.items())  # to frozenset
+      d = dict(fset)  # to dict
+      ```
+    """
+    # To avoid security issues, only allow a subset of the Zenodo API (so that
+    # users cannot use *our* Zenodo token to update arbitrary records)
+    allowed_routes = [
+        '^communities',
+        '^communities/[a-zA-Z0-9-]+/records*$',
+        '^records/[0-9]+',
+        '^records/[0-9]+/versions*$',
+    ]
+    allowed = False
+    for pattern in allowed_routes:
+        if re.match(pattern, api_route):
+            allowed = True
+            break
+    if not allowed:
+        raise HTTPException(
+            status_code=400,
+            detail="Zenodo API route not allowed. "
+                   f"Allowed routes: {allowed_routes}",
+            )
+
+    # Make the call
+    r = session.get(
+        f"{API_URL}/api/{api_route}",
+        params=params,
+        )
+
+    if not r.ok:
+        raise HTTPException(
+            status_code=500,
+            detail="Failed to query Zenodo.",
+            )
+
+    return r.json()
+
+
+@router.post("/")
+def zenodo_proxy(
+    api_route: str,
+    params: Union[dict, None] = None,
+    authorization=Depends(security),
+    ):
+    """
+    Zenodo proxy
+
+    Parameters
+    ----------
+    * api_route:
+      For example:
+      - `communities/imagine-project/records`
+      - `records/11195949/versions`
+    * params:
+      Any additional params a given Zenodo route might need.
+      For example, when calling `communities/*/records`:
+      ```
+      {"q": "resource_type.type:dataset"}
+      ```
+
+    **Notes**:
+    The method is a POST because GET methods with a body are not supported in FastAPI [1,2].
+    The Zenodo API seems to support them, probably because it is using Elastic Search [3].
+    [1]: https://github.com/tiangolo/fastapi/discussions/6450
+    [2]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/GET
+    [3]: https://github.com/whatwg/fetch/issues/551
+    """
+    # To avoid DDoSing Zenodo, only allow access to EGI-authenticated users.
+    _ = auth.get_user_info(token=authorization.credentials)
+
+    # Convert params to a hashable frozenset (or None)
+    fparams = frozenset(params.items()) if params else None
+
+    return _zenodo_proxy(api_route, fparams)
diff --git a/ai4papi/routers/v1/deployments/modules.py b/ai4papi/routers/v1/deployments/modules.py
index ee7cfb77..10e2e9d5 100644
--- a/ai4papi/routers/v1/deployments/modules.py
+++ b/ai4papi/routers/v1/deployments/modules.py
@@ -177,6 +177,9 @@ def create_deployment(
         reference=user_conf,
     )
 
+    # Validate the user configuration
+    user_conf = utils.validate_conf(user_conf)
+
     # Check if the provided configuration is within the job quotas
     quotas.check_jobwise(
         conf=user_conf,
@@ -270,9 +273,24 @@
     if not user_conf['hardware']['gpu_type']:
         usertask['Resources']['Devices'][0]['Constraints'] = None
 
-    # If storage credentials not provided, remove storage-related tasks
-    if not all(user_conf['storage'].values()):
-        tasks[:] = [t for t in tasks if t['Name'] not in {'storagetask', 'storagecleanup'}]
+    # If storage credentials are not provided, remove all storage-related tasks
+    rclone = {k: v for k, v in user_conf['storage'].items() if k.startswith('rclone')}
+    if not all(rclone.values()):
+        exclude_tasks = ['storagetask', 'storagecleanup', 'dataset_download']
+    else:
+        # If datasets are provided, replicate the 'dataset_download' task as many times as needed
+        if user_conf['storage']['datasets']:
+            download_task = [t for t in tasks if t['Name'] == 'dataset_download'][0]
+            for i, dataset in enumerate(user_conf['storage']['datasets']):
+                t = deepcopy(download_task)
+                t['Env']['DOI'] = dataset['doi']
+                t['Env']['FORCE_PULL'] = str(dataset['force_pull'])
+                t['Name'] = f'dataset_download_{i}'
+                tasks.append(t)
+        # Always exclude the initial 'dataset_download' task, as it is only used as a template
+        exclude_tasks = ['dataset_download']
+
+    tasks[:] = [t for t in tasks if t['Name'] not in exclude_tasks]
 
     # Submit job
     r = nomad.create_deployment(nomad_conf)
diff --git a/ai4papi/utils.py b/ai4papi/utils.py
index 36ca27de..baabdd00 100644
--- a/ai4papi/utils.py
+++ b/ai4papi/utils.py
@@ -141,3 +141,30 @@ def update_values_conf(submitted, reference):
             reference[k].update(submitted[k])
 
     return reference
+
+
+def validate_conf(conf):
+    """
+    Validate the user configuration.
+    """
+
+    # Check the datasets list
+    for d in conf['storage']['datasets']:
+
+        # Validate the DOI
+        # ref: https://stackoverflow.com/a/48524047/18471590
+        pattern = r"^10.\d{4,9}/[-._;()/:A-Z0-9]+$"
+        if not re.match(pattern, d['doi'], re.IGNORECASE):
+            raise HTTPException(
+                status_code=400,
+                detail="Invalid DOI.",
+                )
+
+        # Check the force_pull parameter
+        if not isinstance(d['force_pull'], bool):
+            raise HTTPException(
+                status_code=400,
+                detail="'force_pull' must be a boolean.",
+                )
+
+    return conf
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 665775d1..13b70478 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -8,6 +8,7 @@ services:
     environment:
       - NOMAD_ADDR=https://193.146.75.221:4646
       - ACCOUNTING_PTH=/home/ai4-accounting
+      - ZENODO_TOKEN=*************************
     volumes:
       - /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs
       - /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting
@@ -22,6 +23,7 @@
     environment:
       - NOMAD_ADDR=https://193.146.75.221:4646
       - ACCOUNTING_PTH=/home/ai4-accounting
+      - ZENODO_TOKEN=*************************
     volumes:
       - /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs
       - /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting
diff --git a/etc/modules/nomad.hcl b/etc/modules/nomad.hcl
index 7ac87bc6..4c2426f0 100644
--- a/etc/modules/nomad.hcl
+++ b/etc/modules/nomad.hcl
@@ -99,12 +99,18 @@ job "userjob-${JOB_UUID}" {
     task "storagetask" {
       // Running task in charge of mounting storage
 
+      lifecycle {
+        hook = "prestart"
+        sidecar = true
+      }
+
       driver = "docker"
 
       config {
-        image = "ignacioheredia/ai4-docker-storage"
+        force_pull = true
+        image = "registry.services.ai4os.eu/ai4os/docker-storage:latest"
         privileged = true
-       volumes = [
+        volumes = [
           "/nomad-storage/${JOB_UUID}:/storage:shared",
         ]
       }
@@ -126,6 +132,36 @@
       }
     }
 
+    task "dataset_download" {
+      // Download a dataset to the Nextcloud-mounted storage
+
+      lifecycle {
+        hook = "prestart"
+        sidecar = false
+      }
+
+      driver = "docker"
+
+      config {
+        force_pull = true
+        image = "registry.services.ai4os.eu/ai4os/docker-zenodo:latest"
+        volumes = [
+          "/nomad-storage/${JOB_UUID}:/storage:shared",
+        ]
+      }
+
+      env {
+        DOI = "${DATASET_DOI}"
+        FORCE_PULL = "${DATASET_FORCE_PULL}"
+      }
+
+      resources {
+        cpu    = 50
+        memory = 2000
+      }
+
+    }
+
     task "usertask" {
       // Task configured by the user (deepaas, jupyter, vscode)
diff --git a/etc/modules/user.yaml b/etc/modules/user.yaml
index 6d0d83f0..b9196c10 100644
--- a/etc/modules/user.yaml
+++ b/etc/modules/user.yaml
@@ -102,3 +102,8 @@ storage:
   rclone_password:
     name: RCLONE user password
     value: ''
+
+  datasets:
+    name: Info of the datasets you want to download
+    value: []
+    description: Each element in the list should be a dict with "doi" and "force_pull" keys. Downloading datasets requires all RCLONE variables to be defined.
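Usage note (illustrative sketch, not part of the patch): with the routers above, a client could query the proxy as follows. The deployment URL and token are placeholders, and the `/v1/datasets/zenodo/` path assumes the catalog router is mounted under `/v1`, as elsewhere in PAPI.

```python
# Hypothetical client-side call to the new Zenodo proxy endpoint.
# `api_route` is sent as a query parameter; `params` is sent as the JSON body.
import requests

PAPI_URL = "https://api.example.org"  # placeholder PAPI deployment URL
EGI_TOKEN = "..."                     # placeholder EGI Check-in access token

r = requests.post(
    f"{PAPI_URL}/v1/datasets/zenodo/",
    headers={"Authorization": f"Bearer {EGI_TOKEN}"},
    params={"api_route": "communities/imagine-project/records"},
    json={"q": "resource_type.type:dataset"},  # forwarded to Zenodo
)
r.raise_for_status()
print(r.json())
```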
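Likewise, a sketch of the new `datasets` storage field as it would arrive in `user_conf`, matching what `utils.validate_conf()` checks and what the `dataset_download` task replication in `modules.py` consumes. Key names other than `rclone_password` and `datasets` are assumptions; the code only requires that every `rclone*` key is non-empty.

```python
# Hypothetical 'storage' section of a deployment configuration.
# Datasets are only downloaded when *all* rclone credentials are filled in.
user_conf = {
    "storage": {
        "rclone_user": "my-user",      # assumed key name (any 'rclone*' key)
        "rclone_password": "my-password",
        "datasets": [
            {"doi": "10.5281/zenodo.1234567", "force_pull": False},  # placeholder DOIs
            {"doi": "10.5281/zenodo.7654321", "force_pull": True},
        ],
    },
}
```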