
Commit

Merge commit '5f375f698b57ad735b1ae56ef7bbdeceaf1371fe' as 'diva/connector'
Nicolò Bertozzi committed Apr 19, 2024
2 parents b31500e + 5f375f6 commit fa13a52
Showing 22 changed files with 952 additions and 0 deletions.
21 changes: 21 additions & 0 deletions diva/connector/Dockerfile
@@ -0,0 +1,21 @@
FROM python:3.10

LABEL maintainer="nicolo.bertozzi@linksfoundation.com"
LABEL org.label-schema.schema-version="1.0"
LABEL org.label-schema.vendor="LINKS Foundation, Turin, Italy"
LABEL org.label-schema.description="S3 Connector Module"

# Unbuffered stdout/stderr so logs reach the container runtime immediately
ENV PYTHONUNBUFFERED=1
ENV POETRY_VERSION=1.1.11

RUN pip install "poetry==$POETRY_VERSION"

WORKDIR /app

# Copy only the dependency manifests first so this layer is cached across code changes
COPY pyproject.toml poetry.lock /app/

# --no-root: only the dependencies are installed here; the source is copied below
RUN poetry install --no-root
# Install aiokafka into the same environment Poetry manages
RUN poetry run pip install aiokafka

COPY ./app /app

ENTRYPOINT [ "poetry", "run", "python", "-m", "main" ]
10 changes: 10 additions & 0 deletions diva/connector/README.md
@@ -0,0 +1,10 @@
# DIVA S3 Connector

## Overview
A custom connector created for the MODERATE project. It polls the platform for newly available S3 assets and republishes their contents to a Kafka topic.

## Acknowledgements
This work has received funding and support from the European Union's Horizon Europe research and innovation programme.

## Contact
Maintained by Nicolò Bertozzi and Edoardo Pristeri, Links Foundation, Turin, Italy.
Empty file added diva/connector/app/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions diva/connector/app/main.py
@@ -0,0 +1,18 @@
import asyncio

from services import manager, inbound_connector, cron


async def main():
    # Run the Kafka producer, the inbound converter, and the polling job concurrently
    await asyncio.gather(
        manager.start(),
        inbound_connector.start(),
        cron.start()
    )


if __name__ == "__main__":
    try:
        # asyncio.run() creates and closes its own event loop
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
7 changes: 7 additions & 0 deletions diva/connector/app/model/dataset.py
@@ -0,0 +1,7 @@
from typing import Optional

from pydantic import BaseModel


class Dataset(BaseModel):
    """An asset object to ingest: its S3 key and the URL it can be downloaded from."""

    key: Optional[str] = None
    url: Optional[str] = None
6 changes: 6 additions & 0 deletions diva/connector/app/model/message_key.py
@@ -0,0 +1,6 @@
from pydantic import BaseModel


class MessageKey(BaseModel):
    """Kafka message key: the dataset identifier plus a microsecond timestamp."""

    dataset: str
    timestamp: int
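For reference, the producer serializes this model with .json(), so a key on the wire looks like the following sketch (the dataset name and timestamp are made-up values):

from model.message_key import MessageKey

key = MessageKey(dataset="my-dataset.parquet", timestamp=1713523200000000)
print(key.json())  # {"dataset": "my-dataset.parquet", "timestamp": 1713523200000000}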
Empty file.
31 changes: 31 additions & 0 deletions diva/connector/app/serde/proto/sample.proto
@@ -0,0 +1,31 @@
syntax = "proto3";
package dq;

message Sample {
uint64 ts = 1;
States state = 2;
string key = 3;
map<string, FloatArray> float_data = 4;
map<string, StringArray> string_data = 5;
map<string, BoolArray> bool_data = 6;
string dataset = 7;
map<string, string> metadata = 8;

enum States {
RAW = 0;
NEW = 1;
VALID = 2;
FAIL = 3;
AGGREGATED = 4;
ANONYMIZED = 5;
}
}
message FloatArray {
repeated float element = 1;
}
message StringArray {
repeated string element = 1;
}
message BoolArray {
repeated bool element = 1;
}
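For reference, a Sample built with the generated sample_pb2 module might look like the minimal sketch below, mirroring what inbound_connector.py produces further down (the dataset key and JSON payload are made-up values):

import time

from serde import sample_pb2 as s

# One JSON-encoded row shipped inside the Sample's string_data map
sample = s.Sample(
    ts=time.time_ns() // 1000,  # microseconds
    state=s.Sample.States.NEW,
    dataset="my-dataset.parquet",
    string_data={
        "topic": s.StringArray(element=["my-dataset.parquet"]),
        "JSON": s.StringArray(element=['{"col": 1}']),
    },
)
payload = sample.SerializeToString()  # bytes ready to publish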
50 changes: 50 additions & 0 deletions diva/connector/app/serde/sample_pb2.py

(Generated protobuf module; diff not rendered.)

Empty file.
107 changes: 107 additions & 0 deletions diva/connector/app/services/cron.py
@@ -0,0 +1,107 @@
import asyncio
import json

import aiohttp

from model.dataset import Dataset
from utils.queues import cron_queue
from utils.settings import settings

from aiologger import Logger
from aiologger.formatters.base import Formatter

formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
logger = Logger.with_default_handlers(formatter=formatter)


async def get_token():
    """Gets the authentication JWT token.

    Returns:
        token (str): the JWT just requested, or None on failure
    """
    data = {
        'username': settings.auth_user,
        'password': settings.auth_password
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(settings.auth_url, data=data) as response:
            if response.status == 200:
                logger.info("JWT Token Success")
                text_response = await response.text()
                return json.loads(text_response)['access_token']
            else:
                logger.error(f"Error: {response.status} - {await response.text()}")
                return None


async def get_assets(access_token):
    """Gets the list of assets.

    Parameters:
        access_token (str): the JWT token for authenticating the request
    """
    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(settings.asset_url, headers=headers) as response:
            if response.status == 200:
                logger.info("Assets List Success")
                return await response.json()
            else:
                logger.error(f"Error: {response.status} - {await response.text()}")
                return None


async def get_urls(access_token, asset_id):
    """Gets the download URL of each object belonging to an asset.

    Parameters:
        access_token (str): the JWT token for authenticating the request
        asset_id (int): the asset whose download URLs are requested
    """
    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{settings.asset_url}/{asset_id}/download-urls", headers=headers) as response:
            if response.status == 200:
                logger.info(f"Download URLs for Asset {asset_id} Success")
                return await response.json()
            else:
                logger.error(f"Error: {response.status} - {await response.text()}")
                return None


async def cron():
    """Polls the assets API and enqueues every object not seen before."""
    past_keys = []

    while True:
        token = await get_token()
        if token is not None:
            # Guard against failed API calls, which return None
            assets = await get_assets(token)

            for asset in assets or []:
                asset_objects = await get_urls(token, asset["id"])

                for obj in asset_objects or []:
                    if obj["key"] not in past_keys:
                        dataset = Dataset(
                            key=obj["key"],
                            url=obj["download_url"]
                        )

                        past_keys.append(obj["key"])

                        logger.info(f"Sent new asset url: {dataset}")
                        await cron_queue.put(dataset)

        # Sleep on every iteration, even when authentication fails
        await asyncio.sleep(settings.polling_time)


async def start():
    await cron()
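Neither utils/queues.py nor utils/settings.py appears in this diff; below is a minimal sketch of what they plausibly contain, inferred only from how they are used above (everything beyond the two queue names and the settings fields referenced in the code is an assumption):

# utils/queues.py (sketch): plain asyncio queues shared between the services
import asyncio

cron_queue: asyncio.Queue = asyncio.Queue()
connector_queue: asyncio.Queue = asyncio.Queue()

# utils/settings.py (sketch): env-driven configuration via pydantic v1 BaseSettings
from pydantic import BaseSettings

class Settings(BaseSettings):
    auth_url: str
    auth_user: str
    auth_password: str
    asset_url: str
    polling_time: int = 60  # seconds between polls; default is an assumption
    connector_topic: str
    connector_partition: int = 0  # default is an assumption

settings = Settings()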
46 changes: 46 additions & 0 deletions diva/connector/app/services/inbound_connector.py
@@ -0,0 +1,46 @@
import json
import time

import pandas as pd

import serde.sample_pb2 as s
from model.dataset import Dataset
from utils.queues import cron_queue, connector_queue

from aiologger import Logger
from aiologger.formatters.base import Formatter

formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
logger = Logger.with_default_handlers(formatter=formatter)


async def convert_in():
    """Converts each row of an incoming Parquet dataset into a protobuf Sample."""
    while True:
        dataset: Dataset = await cron_queue.get()

        try:
            df = pd.read_parquet(dataset.url, engine='pyarrow')

            for _, row in df.iterrows():
                # Each row travels as a JSON string inside the Sample's string_data map
                string_data = {
                    "topic": s.StringArray(element=[dataset.key]),
                    "JSON": s.StringArray(element=[json.dumps(row.to_dict())])
                }

                sample = s.Sample(
                    ts=time.time_ns() // 1000,  # microseconds
                    state=s.Sample.States.NEW,
                    dataset=dataset.key,
                    string_data=string_data
                )

                logger.info(f"Published new sample for {dataset.key}")
                await connector_queue.put(sample)

        except Exception as e:
            logger.error(f"Invalid Dataset with key {dataset.key}: {e}")


async def start():
    await convert_in()
72 changes: 72 additions & 0 deletions diva/connector/app/services/manager.py
@@ -0,0 +1,72 @@
import asyncio
import time

from model.message_key import MessageKey
from serde.sample_pb2 import Sample
from utils.kafka import create_producer
from utils.queues import connector_queue
from utils.settings import settings

from aiologger import Logger
from aiologger.formatters.base import Formatter

formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
logger = Logger.with_default_handlers(formatter=formatter)


async def create_key(item):
    """Builds the Kafka message key for a sample.

    Parameters:
        item (Sample): the sample from which the method has to build the message key
    Returns:
        key (str): JSON formatted key
    """
    if item.state == Sample.States.NEW:
        # Stamp new samples with the current time in microseconds
        ns = time.time_ns() // 10**3
        item.ts = ns
        key = MessageKey(
            dataset=item.dataset,
            timestamp=ns
        )
    else:
        key = MessageKey(
            dataset=item.dataset,
            timestamp=item.ts
        )

    return key.json()


async def produce_connector():
    """Publishes the queued samples into the connector topic."""
    producer = create_producer()
    await producer.start()
    await logger.info("Connector Producer STARTED")

    try:
        while True:
            item = await connector_queue.get()
            key = await create_key(item)
            await producer.send_and_wait(
                topic=settings.connector_topic,
                key=key,
                value=item.SerializeToString(),
                partition=settings.connector_partition
            )

            await logger.info(f"Published Connector Message, ts: {item.ts}")
    finally:
        await producer.stop()


async def start():
    await logger.info("Manager STARTED")

    await asyncio.gather(
        produce_connector()
    )
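utils/kafka.py is also not shown in this diff; below is a plausible sketch of create_producer, based only on how it is called above (the bootstrap-servers setting name and the key encoding are assumptions):

# utils/kafka.py (sketch), not the file from this commit
from aiokafka import AIOKafkaProducer

from utils.settings import settings

def create_producer() -> AIOKafkaProducer:
    # manager.create_key passes the key as a JSON string, so encode it here;
    # the value is already protobuf-serialized bytes
    return AIOKafkaProducer(
        bootstrap_servers=settings.kafka_url,  # assumed setting name
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: v,
    )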
Empty file.