Skip to content

Commit

Permalink
feat: add support for downloading datasets (#53)
Browse files Browse the repository at this point in the history
* feat: add support for zenodo datasets

Co-authored-by: Fernando Aguilar <aguilarf@ifca.unican.es>

* feat: authenticate zenodo calls

* feat: proxify Zenodo route

* fix: use `POST` instead of `GET`, to allow body

* feat: use `frozenset` to allow for caching

* feat: add zenodo help message

* feat: add `zenodo_force_pull` option

* feat: use new docker images

* feat: start using generic DOIs

* feat: allow downloading multiple datasets

* fix: update allowed routes

---------

Co-authored-by: Fernando Aguilar <aguilarf@ifca.unican.es>
Co-authored-by: Marta Obregón <obregonm@ifca.unican.es>
Co-authored-by: Saúl <sftobias@ifca.unican.es>
  • Loading branch information
4 people authored Jun 27, 2024
1 parent fd00d14 commit 55f6b77
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 6 deletions.
6 changes: 5 additions & 1 deletion ai4papi/routers/v1/catalog/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import fastapi

from . import modules, tools
from . import modules, tools, datasets


app = fastapi.APIRouter()
Expand All @@ -12,3 +12,7 @@
router=tools.router,
prefix='/catalog',
)
app.include_router(
router=datasets.app,
prefix='/datasets',
)
9 changes: 9 additions & 0 deletions ai4papi/routers/v1/catalog/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import fastapi

from . import zenodo


# Aggregator router for the dataset-related catalog routes.
# Currently only Zenodo is supported; additional dataset providers can be
# mounted here alongside it.
app = fastapi.APIRouter()
app.include_router(
    router=zenodo.router,
)
129 changes: 129 additions & 0 deletions ai4papi/routers/v1/catalog/datasets/zenodo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""
This route mimics the Zenodo API and returns results as-is.
PAPI is acting as a proxy between the Dashboard and Zenodo because we want to
make *authenticated* calls with Zenodo. And we cannot have a Zenodo token in the
Dashboard because the calls are being run on the client side (ie. the client would see
the Zenodo token).
"""

import os
import re
import requests
from typing import Union


from cachetools import cached, TTLCache
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import HTTPBearer

from ai4papi import auth


# Router for all Zenodo proxy endpoints; mounted under the datasets catalog.
router = APIRouter(
    prefix="/zenodo",
    tags=["Zenodo datasets"],
    responses={404: {"description": "Not found"}},
)
# Bearer-token scheme: callers must present an EGI token (checked per-request).
security = HTTPBearer()

# If available, authenticate the call to Zenodo to increase rate limit.
# https://developers.zenodo.org/#rate-limiting
API_URL = 'https://zenodo.org'
# A single shared Session so the Authorization header (and connection pool)
# is reused across all proxied calls.
session = requests.Session()
zenodo_token = os.environ.get('ZENODO_TOKEN', None)
if zenodo_token:
    session.headers = {
        'Authorization': f'Bearer {zenodo_token}',
    }


@cached(cache=TTLCache(maxsize=1024, ttl=6*60*60))
def _zenodo_proxy(
    api_route: str,
    params: Union[frozenset, None] = None,
):
    """
    Perform the (cached) authenticated call to the Zenodo API.

    This hidden helper exists so responses can be cached. Caching the public
    route directly would raise, because its "authorization" argument is not
    hashable (`TypeError: unhashable type: 'types.SimpleNamespace'`).

    Parameters
    ----------
    * api_route: Zenodo API route, without the leading ``/api/`` part.
    * params: extra query parameters as a ``frozenset`` of ``(key, value)``
      pairs. We use ``frozenset`` instead of ``dict`` because dicts are
      mutable, hence unhashable, hence uncacheable. To convert back and forth:
      ``fset = frozenset(d.items())`` / ``d = dict(fset)``.

    Raises
    ------
    HTTPException: 400 if the route is not whitelisted, 500 if Zenodo fails.
    """
    # To avoid security issues, only allow a subset of the Zenodo API (to
    # avoid users using *our* Zenodo token to update any record).
    allowed_routes = [
        '^communities',
        '^communities/[a-zA-Z0-9-]+/records*$',
        '^records/[0-9]+',
        '^records/[0-9]+/versions*$',
    ]
    if not any(re.match(pattern, api_route) for pattern in allowed_routes):
        raise HTTPException(
            status_code=400,
            # FIX: space added after the period (it used to render as
            # "not allowed.Allowed routes").
            detail="Zenodo API route not allowed. "
                   f"Allowed routes: {allowed_routes}",
        )

    # Make the call. A timeout is mandatory: without it a stuck Zenodo would
    # hang the worker (and the cache entry slot) indefinitely.
    r = session.get(
        f"{API_URL}/api/{api_route}",
        params=params,
        timeout=30,
    )

    if not r.ok:
        raise HTTPException(
            status_code=500,
            detail="Failed to query Zenodo.",
        )

    return r.json()


@router.post("/")
def zenodo_proxy(
    api_route: str,
    params: Union[dict, None] = None,
    authorization=Depends(security),
):
    """
    Zenodo proxy.

    Parameters
    ----------
    * api_route:
      For example:
      - `communities/imagine-project/records`
      - `records/11195949/versions`
    * params:
      Any additional params the Zenodo call might need for that given route.
      For example, when calling `communities/*/records`:
      ```
      {"q": "resource_type.type:dataset"}
      ```

    **Notes**:
    The method is a POST because GET methods with a body are not supported
    in FastAPI [1,2]. The Zenodo API seems to support them, probably because
    it is using Elastic Search [3].
    [1]: https://github.com/tiangolo/fastapi/discussions/6450
    [2]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/GET
    [3]: https://github.com/whatwg/fetch/issues/551
    """
    # To avoid DDoS in Zenodo, only allow access to EGI authenticated users.
    _ = auth.get_user_info(token=authorization.credentials)

    # Convert params to a hashable frozenset so the cached helper accepts it.
    # FIX: guard against params=None (its declared default) — calling
    # `.items()` on None raised AttributeError before.
    fparams = frozenset(params.items()) if params else None

    return _zenodo_proxy(api_route, fparams)
24 changes: 21 additions & 3 deletions ai4papi/routers/v1/deployments/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ def create_deployment(
reference=user_conf,
)

# Utils validate conf
user_conf = utils.validate_conf(user_conf)

# Check if the provided configuration is within the job quotas
quotas.check_jobwise(
conf=user_conf,
Expand Down Expand Up @@ -270,9 +273,24 @@ def create_deployment(
if not user_conf['hardware']['gpu_type']:
usertask['Resources']['Devices'][0]['Constraints'] = None

# If storage credentials not provided, remove storage-related tasks
if not all(user_conf['storage'].values()):
tasks[:] = [t for t in tasks if t['Name'] not in {'storagetask', 'storagecleanup'}]
# If rclone storage credentials are not provided, remove all storage-related
# tasks (dataset downloads write to the rclone-mounted storage, so they are
# excluded too).
rclone = {k: v for k, v in user_conf['storage'].items() if k.startswith('rclone')}
if not all(rclone.values()):
    exclude_tasks = ['storagetask', 'storagecleanup', 'dataset_download']
else:
    # If datasets are provided, replicate the 'dataset_download' task as many
    # times as needed (one cloned task per requested dataset).
    if user_conf['storage']['datasets']:
        download_task = [t for t in tasks if t['Name'] == 'dataset_download'][0]
        for i, dataset in enumerate(user_conf['storage']['datasets']):
            t = deepcopy(download_task)
            t['Env']['DOI'] = dataset['doi']
            # FIX: this used to assign `dataset['doi']`, clobbering FORCE_PULL
            # with the DOI. Also stringify the bool: Nomad task Env is a
            # map[string]string, so non-string values are rejected.
            t['Env']['FORCE_PULL'] = str(dataset['force_pull'])
            t['Name'] = f'dataset_download_{i}'
            tasks.append(t)
    # Always exclude the initial 'dataset_download' task, as it is only used
    # as a template for the clones above.
    exclude_tasks = ['dataset_download']

tasks[:] = [t for t in tasks if t['Name'] not in exclude_tasks]

# Submit job
r = nomad.create_deployment(nomad_conf)
Expand Down
27 changes: 27 additions & 0 deletions ai4papi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,30 @@ def update_values_conf(submitted, reference):
reference[k].update(submitted[k])

return reference


def validate_conf(conf):
    """
    Validate the user configuration.

    Currently checks the `storage.datasets` list: each element must carry a
    syntactically valid `doi` and a boolean `force_pull`.

    Raises
    ------
    HTTPException: 400 on any invalid dataset entry.

    Returns
    -------
    The (unmodified) configuration, for call-chaining convenience.
    """
    # DOI pattern, ref: https://stackoverflow.com/a/48524047/18471590
    # FIX: the dot after "10" is now escaped (it previously matched any
    # character, accepting e.g. "10X1234/abc"). Compiled once, outside the loop.
    doi_pattern = re.compile(r"^10\.\d{4,9}/[-._;()/:A-Z0-9]+$", re.IGNORECASE)

    for d in conf['storage']['datasets']:

        # Validate DOI (missing key is reported as a 400, not a KeyError/500)
        if not doi_pattern.match(d.get('doi', '')):
            raise HTTPException(
                status_code=400,
                detail="Invalid DOI."
                )

        # Check force pull parameter (d.get: missing key is also a 400)
        if not isinstance(d.get('force_pull'), bool):
            raise HTTPException(
                status_code=400,
                detail="Force pull should be bool."
                )

    return conf
2 changes: 2 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
environment:
- NOMAD_ADDR=https://193.146.75.221:4646
- ACCOUNTING_PTH=/home/ai4-accounting
- ZENODO_TOKEN=*************************
volumes:
- /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs
- /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting
Expand All @@ -22,6 +23,7 @@ services:
environment:
- NOMAD_ADDR=https://193.146.75.221:4646
- ACCOUNTING_PTH=/home/ai4-accounting
- ZENODO_TOKEN=*************************
volumes:
- /home/ubuntu/nomad-certs/nomad-prod:/home/nomad-certs
- /mnt/ai4os-logs/ai4-accounting:/home/ai4-accounting
Expand Down
40 changes: 38 additions & 2 deletions etc/modules/nomad.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,18 @@ job "userjob-${JOB_UUID}" {
task "storagetask" {
// Running task in charge of mounting storage

lifecycle {
hook = "prestart"
sidecar = true
}

driver = "docker"

config {
image = "ignacioheredia/ai4-docker-storage"
force_pull = true
image = "registry.services.ai4os.eu/ai4os/docker-storage:latest"
privileged = true
volumes = [
volumes = [
"/nomad-storage/${JOB_UUID}:/storage:shared",
]
}
Expand All @@ -126,6 +132,36 @@ job "userjob-${JOB_UUID}" {
}
}

task "dataset_download" {
  // Download a dataset to the Nextcloud-mounted storage.
  // This task is a template: PAPI clones it once per requested dataset
  // (renaming the clone and filling DOI/FORCE_PULL), then removes it.

  lifecycle {
    // Runs before the user task; sidecar=false so it exits once the
    // download completes instead of staying up for the job's lifetime.
    hook = "prestart"
    sidecar = false
  }

  driver = "docker"

  config {
    force_pull = true
    image = "registry.services.ai4os.eu/ai4os/docker-zenodo:latest"
    // Shared mount: writes land on the same storage the user task sees.
    volumes = [
      "/nomad-storage/${JOB_UUID}:/storage:shared",
    ]
  }

  env {
    // Substituted by PAPI per cloned task.
    DOI = "${DATASET_DOI}"
    FORCE_PULL = "${DATASET_FORCE_PULL}"
  }

  resources {
    cpu = 50
    memory = 2000
  }

}

task "usertask" {
// Task configured by the user (deepaas, jupyter, vscode)

Expand Down
5 changes: 5 additions & 0 deletions etc/modules/user.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,8 @@ storage:
rclone_password:
name: RCLONE user password
value: ''

datasets:
name: Info of the datasets you want to download
value: []
description: Each element in the list should be a dict containing "doi" and "force_pull" keys. It requires the definition of all RCLONE variables.

0 comments on commit 55f6b77

Please # to comment.