diff --git a/diva/connector/Dockerfile b/diva/connector/Dockerfile
new file mode 100755
index 0000000..916912e
--- /dev/null
+++ b/diva/connector/Dockerfile
@@ -0,0 +1,21 @@
+FROM python:3.10
+
+LABEL maintainer="nicolo.bertozzi@linksfoundation.com"
+LABEL org.label-schema.schema-version="1.0"
+LABEL org.label-schema.vendor="LINKS Foundation, Turin, Italy"
+LABEL org.label-schema.description="S3 Connector Module"
+
+ENV PYTHONUNBUFFERED=1
+ENV POETRY_VERSION=1.1.11
+
+RUN pip install "poetry==$POETRY_VERSION"
+
+WORKDIR /app
+
+COPY pyproject.toml poetry.lock /app/
+
+# All dependencies (including aiokafka) come from the lock file; --no-root skips the app itself
+RUN poetry install --no-root
+COPY app /app
+
+ENTRYPOINT [ "poetry", "run", "python", "-m", "main" ]
\ No newline at end of file
diff --git a/diva/connector/README.md b/diva/connector/README.md
new file mode 100644
index 0000000..111032e
--- /dev/null
+++ b/diva/connector/README.md
@@ -0,0 +1,16 @@
+# DIVA S3 Connector
+
+## Overview
+Custom S3 connector created for the MODERATE project. It periodically polls the MODERATE asset API for new datasets, converts each parquet row into a Protobuf sample, and publishes the samples to the DIVA data-quality pipeline over Kafka.
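+
+## Usage
+A minimal sketch, assuming Docker Compose v2, valid MODERATE credentials, and the Kafka CA certificate at `./certs/ca.crt`:
+
+    docker compose -f docker-compose-build.yaml build
+    AUTH_USER=<user> AUTH_PASS=<password> docker compose -f docker-compose-dev.yaml up -d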
+
+## Acknowledgements
+This work acknowledges funding and support from the European Union’s Horizon Europe research and innovation programme.
+
+## Contact
+Maintained by Nicolò Bertozzi and Edoardo Pristeri, Links Foundation, Turin, Italy.
diff --git a/diva/connector/app/__init__.py b/diva/connector/app/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/diva/connector/app/main.py b/diva/connector/app/main.py
new file mode 100755
index 0000000..ae0e983
--- /dev/null
+++ b/diva/connector/app/main.py
@@ -0,0 +1,17 @@
+import asyncio
+
+from services import manager, inbound_connector, cron
+
+async def main():
+    # Run all services concurrently: Kafka producer, parquet converter, asset poller
+    await asyncio.gather(
+        manager.start(),
+        inbound_connector.start(),
+        cron.start()
+    )
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        pass
\ No newline at end of file
diff --git a/diva/connector/app/model/dataset.py b/diva/connector/app/model/dataset.py
new file mode 100755
index 0000000..207afd9
--- /dev/null
+++ b/diva/connector/app/model/dataset.py
@@ -0,0 +1,8 @@
+from pydantic import BaseModel
+from typing import Optional
+
+class Dataset(BaseModel):
+
+    # Explicit defaults keep both fields optional under Pydantic v2
+    key: Optional[str] = None
+    url: Optional[str] = None
\ No newline at end of file
diff --git a/diva/connector/app/model/message_key.py b/diva/connector/app/model/message_key.py
new file mode 100755
index 0000000..30def22
--- /dev/null
+++ b/diva/connector/app/model/message_key.py
@@ -0,0 +1,6 @@
+from pydantic import BaseModel
+
+class MessageKey(BaseModel):
+
+    dataset: str
+    timestamp: int
\ No newline at end of file
diff --git a/diva/connector/app/serde/__init__.py b/diva/connector/app/serde/__init__.py
new file mode 100755
index 0000000..e69de29
diff --git a/diva/connector/app/serde/proto/sample.proto b/diva/connector/app/serde/proto/sample.proto
new file mode 100755
index 0000000..b005425
--- /dev/null
+++ b/diva/connector/app/serde/proto/sample.proto
@@ -0,0 +1,31 @@
+syntax = "proto3";
+package dq;
+
+message Sample {
+    uint64 ts = 1;
+    States state = 2;
+    string key = 3;
+    map<string, FloatArray> float_data = 4;
+    map<string, StringArray> string_data = 5;
+    map<string, BoolArray> bool_data = 6;
+    string dataset = 7;
+    map<string, string> metadata = 8;
+
+    enum States {
+        RAW = 0;
+        NEW = 1;
+        VALID = 2;
+        FAIL = 3;
+        AGGREGATED = 4;
+        ANONYMIZED = 5;
+    }
+}
+message FloatArray {
+    repeated float element = 1;
+}
+message StringArray {
+    repeated string element = 1;
+}
+message BoolArray {
+    repeated bool element = 1;
+}
diff --git a/diva/connector/app/serde/sample_pb2.py b/diva/connector/app/serde/sample_pb2.py
new file mode 100755
index 0000000..fcdc5c2
--- /dev/null
+++ b/diva/connector/app/serde/sample_pb2.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: sample.proto
+# Protobuf Python Version: 4.25.0
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0csample.proto\x12\x02\x64q\"\xd5\x04\n\x06Sample\x12\n\n\x02ts\x18\x01 \x01(\x04\x12 \n\x05state\x18\x02 \x01(\x0e\x32\x11.dq.Sample.States\x12\x0b\n\x03key\x18\x03 \x01(\t\x12-\n\nfloat_data\x18\x04 \x03(\x0b\x32\x19.dq.Sample.FloatDataEntry\x12/\n\x0bstring_data\x18\x05 \x03(\x0b\x32\x1a.dq.Sample.StringDataEntry\x12+\n\tbool_data\x18\x06 \x03(\x0b\x32\x18.dq.Sample.BoolDataEntry\x12\x0f\n\x07\x64\x61taset\x18\x07 \x01(\t\x12*\n\x08metadata\x18\x08 \x03(\x0b\x32\x18.dq.Sample.MetadataEntry\x1a@\n\x0e\x46loatDataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1d\n\x05value\x18\x02 \x01(\x0b\x32\x0e.dq.FloatArray:\x02\x38\x01\x1a\x42\n\x0fStringDataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1e\n\x05value\x18\x02 \x01(\x0b\x32\x0f.dq.StringArray:\x02\x38\x01\x1a>\n\rBoolDataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x1c\n\x05value\x18\x02 \x01(\x0b\x32\r.dq.BoolArray:\x02\x38\x01\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"O\n\x06States\x12\x07\n\x03RAW\x10\x00\x12\x07\n\x03NEW\x10\x01\x12\t\n\x05VALID\x10\x02\x12\x08\n\x04\x46\x41IL\x10\x03\x12\x0e\n\nAGGREGATED\x10\x04\x12\x0e\n\nANONYMIZED\x10\x05\"\x1d\n\nFloatArray\x12\x0f\n\x07\x65lement\x18\x01 \x03(\x02\"\x1e\n\x0bStringArray\x12\x0f\n\x07\x65lement\x18\x01 \x03(\t\"\x1c\n\tBoolArray\x12\x0f\n\x07\x65lement\x18\x01 \x03(\x08\x62\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'sample_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+  DESCRIPTOR._options = None
+  _globals['_SAMPLE_FLOATDATAENTRY']._options = None
+  _globals['_SAMPLE_FLOATDATAENTRY']._serialized_options = b'8\001'
+  _globals['_SAMPLE_STRINGDATAENTRY']._options = None
+  _globals['_SAMPLE_STRINGDATAENTRY']._serialized_options = b'8\001'
+  _globals['_SAMPLE_BOOLDATAENTRY']._options = None
+  _globals['_SAMPLE_BOOLDATAENTRY']._serialized_options = b'8\001'
+  _globals['_SAMPLE_METADATAENTRY']._options = None
+  _globals['_SAMPLE_METADATAENTRY']._serialized_options = b'8\001'
+  _globals['_SAMPLE']._serialized_start=21
+  _globals['_SAMPLE']._serialized_end=618
+  _globals['_SAMPLE_FLOATDATAENTRY']._serialized_start=292
+  _globals['_SAMPLE_FLOATDATAENTRY']._serialized_end=356
+  _globals['_SAMPLE_STRINGDATAENTRY']._serialized_start=358
+  _globals['_SAMPLE_STRINGDATAENTRY']._serialized_end=424
+  _globals['_SAMPLE_BOOLDATAENTRY']._serialized_start=426
+  _globals['_SAMPLE_BOOLDATAENTRY']._serialized_end=488
+  _globals['_SAMPLE_METADATAENTRY']._serialized_start=490
+  _globals['_SAMPLE_METADATAENTRY']._serialized_end=537
+  _globals['_SAMPLE_STATES']._serialized_start=539
+  _globals['_SAMPLE_STATES']._serialized_end=618
+  _globals['_FLOATARRAY']._serialized_start=620
+  _globals['_FLOATARRAY']._serialized_end=649
+  _globals['_STRINGARRAY']._serialized_start=651
+  _globals['_STRINGARRAY']._serialized_end=681
+  _globals['_BOOLARRAY']._serialized_start=683
+  _globals['_BOOLARRAY']._serialized_end=711
+# @@protoc_insertion_point(module_scope)
diff --git a/diva/connector/app/services/__init__.py b/diva/connector/app/services/__init__.py
new file mode 100755
index 0000000..e69de29
diff --git a/diva/connector/app/services/cron.py b/diva/connector/app/services/cron.py
new file mode 100755
index 0000000..26342f1
--- /dev/null
+++ b/diva/connector/app/services/cron.py
@@ -0,0 +1,110 @@
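+# Pipeline overview (descriptive note): this service polls the MODERATE asset API
+# and puts new Dataset objects on cron_queue; inbound_connector.py converts each
+# parquet row into a Protobuf Sample and puts it on connector_queue; manager.py
+# publishes connector_queue to the Kafka connector topic.
+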
+import asyncio
+import aiohttp
+
+from utils.queues import cron_queue
+from utils.settings import settings
+
+from model.dataset import Dataset
+
+import json
+
+from aiologger import Logger
+from aiologger.formatters.base import Formatter
+formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
+logger = Logger.with_default_handlers(formatter=formatter)
+
+
+async def get_token():
+    """Gets the authentication JWT token
+
+    Returns:
+        token (str): the JWT just requested
+    """
+    data = {
+        'username': settings.auth_user,
+        'password': settings.auth_password
+    }
+
+    async with aiohttp.ClientSession() as session:
+        async with session.post(settings.auth_url, data=data) as response:
+            if response.status == 200:
+                logger.info("JWT Token Success")
+                text_response = await response.text()
+                return json.loads(text_response)['access_token']
+            else:
+                logger.error(f"Error: {response.status} - {await response.text()}")
+                return None
+
+async def get_assets(access_token):
+    """Gets the list of assets
+
+    Parameters:
+        access_token (str): the JWT token for authenticating the request
+    """
+    headers = {
+        'Authorization': f'Bearer {access_token}'
+    }
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(settings.asset_url, headers=headers) as response:
+            if response.status == 200:
+                logger.info("Assets List Success")
+                return await response.json()
+            else:
+                logger.error(f"Error: {response.status} - {await response.text()}")
+                return None
+
+async def get_urls(access_token, asset_id):
+    """Gets the URL of each asset object
+
+    Parameters:
+        access_token (str): the JWT token for authenticating the request
+        asset_id (int): the asset for which it is requested to get the download URLs
+    """
+    headers = {
+        'Authorization': f'Bearer {access_token}'
+    }
+
+    async with aiohttp.ClientSession() as session:
+        async with session.get(f"{settings.asset_url}/{asset_id}/download-urls", headers=headers) as response:
+            if response.status == 200:
+                logger.info(f"Download URLs for Asset {asset_id} Success")
+                return await response.json()
+            else:
+                logger.error(f"Error: {response.status} - {await response.text()}")
+                return None
+
+async def cron():
+    """Executes the assets polling loop"""
+    # Keys already seen, so every asset object is forwarded only once
+    past_keys = set()
+
+    while True:
+        token = await get_token()
+        if token is not None:
+            assets = await get_assets(token)
+
+            for asset in assets or []:
+                asset_objects = await get_urls(token, asset["id"])
+                for obj in asset_objects or []:
+
+                    if obj["key"] not in past_keys:
+                        dataset = Dataset(
+                            key=obj["key"],
+                            url=obj["download_url"]
+                        )
+
+                        past_keys.add(obj["key"])
+
+                        logger.info(f"Sent new asset url: {dataset}")
+                        await cron_queue.put(dataset)
+
+        await asyncio.sleep(settings.polling_time)
+
+async def start():
+    await cron()
\ No newline at end of file
diff --git a/diva/connector/app/services/inbound_connector.py b/diva/connector/app/services/inbound_connector.py
new file mode 100755
index 0000000..69241a0
--- /dev/null
+++ b/diva/connector/app/services/inbound_connector.py
@@ -0,0 +1,46 @@
+import time
+import json
+
+import serde.sample_pb2 as s
+from model.dataset import Dataset
+from utils.queues import cron_queue, connector_queue
+
+import pandas as pd
+
+from aiologger import Logger
+from aiologger.formatters.base import Formatter
+formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
+logger = Logger.with_default_handlers(formatter=formatter)
+
+async def convert_in():
+    """Converts each dataset coming from the cron service into Protobuf samples,
+    one per parquet row, and forwards them to the manager service.
+    """
+    while True:
+        dataset: Dataset = await cron_queue.get()
+
+        try:
+            df = pd.read_parquet(dataset.url, engine='pyarrow')
+
+            for i, d in df.iterrows():
+
+                string_data = {
+                    "topic": s.StringArray(element=[dataset.key]),
+                    "JSON": s.StringArray(element=[json.dumps(d.to_dict())])
+                }
+
+                sample = s.Sample(
+                    ts=time.time_ns()//1000,
+                    state=s.Sample.States.NEW,
+                    dataset=dataset.key,
+                    string_data=string_data
+                )
+
+                logger.info(f"Published new sample for {dataset.key}")
+                await connector_queue.put(sample)
+
+        except Exception as e:
+            logger.error(f"Invalid Dataset with key {dataset.key}: {e}")
+
+async def start():
+    await convert_in()
\ No newline at end of file
diff --git a/diva/connector/app/services/manager.py b/diva/connector/app/services/manager.py
new file mode 100755
index 0000000..f598dcf
--- /dev/null
+++ b/diva/connector/app/services/manager.py
@@ -0,0 +1,68 @@
+import asyncio
+import time
+
+from model.message_key import MessageKey
+
+from utils.kafka import create_producer
+from utils.queues import connector_queue
+from utils.settings import settings
+
+from serde.sample_pb2 import Sample
+
+from aiologger import Logger
+from aiologger.formatters.base import Formatter
+formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
+logger = Logger.with_default_handlers(formatter=formatter)
+
+async def create_key(item):
+    """Builds the message key for a sample.
+
+    Parameters:
+        item (Sample): the sample from which the method has to build the message key
+
+    Returns:
+        key (str): JSON formatted key, e.g. {"dataset": "...", "timestamp": ...}
+    """
+    if (item.state == Sample.States.NEW):
+        # Fresh samples get their microsecond timestamp assigned here
+        ns = time.time_ns()//10**3
+        item.ts = ns
+        key = MessageKey(
+            dataset=item.dataset,
+            timestamp=ns
+        )
+    else:
+        key = MessageKey(
+            dataset=item.dataset,
+            timestamp=item.ts
+        )
+
+    return key.model_dump_json()
+
+async def produce_connector():
+    """It publishes messages into the connector topic"""
+    producer = create_producer()
+    await producer.start()
+    await logger.info("Connector Producer STARTED")
+
+    try:
+        while True:
+            item = await connector_queue.get()
+            key = await create_key(item)
+            await producer.send_and_wait(
+                topic=settings.connector_topic,
+                key=key,
+                value=item.SerializeToString(),
+                partition=settings.connector_partition
+            )
+
+            await logger.info(f"Published Connector Message, ts: {item.ts}")
+    finally:
+        await producer.stop()
+
+async def start():
+    await logger.info("Manager STARTED")
+
+    await asyncio.gather(
+        produce_connector()
+    )
\ No newline at end of file
diff --git a/diva/connector/app/utils/__init__.py b/diva/connector/app/utils/__init__.py
new file mode 100755
index 0000000..e69de29
diff --git a/diva/connector/app/utils/checks.py b/diva/connector/app/utils/checks.py
new file mode 100755
index 0000000..9dc2499
--- /dev/null
+++ b/diva/connector/app/utils/checks.py
@@ -0,0 +1,90 @@
+import functools
+
+import json
+from aiokafka import ConsumerRecord
+
+from serde.sample_pb2 import Sample
+from model.message_key import MessageKey
+
+from aiologger import Logger
+from aiologger.formatters.base import Formatter
+from aiologger.levels import LogLevel
+formatter = Formatter('[%(asctime)s][%(module)s][%(levelname)s]: %(message)s')
+logger = Logger.with_default_handlers(formatter=formatter, level=LogLevel.DEBUG)
+
+
+def check_dataset(datasets: str):
+    """This decorator verifies whether the dataset of the message is allowed.
+
+    Parameters:
+        datasets (str): comma-separated list of allowed datasets
+    """
+    def wrapper(func):
+        @functools.wraps(func)
+        async def wrapped(msg: ConsumerRecord):
+            datasets_list = datasets.split(",")
+            key = MessageKey(**json.loads(msg.key))
+            if key.dataset in datasets_list:
+                logger.debug("Dataset: OK")
+                return await func(msg)
+            else:
+                logger.debug("Dataset: NO")
+                return await func(None)
+        return wrapped
+    return wrapper
+
+
+def check_state(states: str):
+    """This decorator verifies whether the state of the message is allowed.
+
+    Parameters:
+        states (str): comma-separated list of allowed states
+    """
+    def wrapper(func):
+        @functools.wraps(func)
+        async def wrapped(msg: Sample):
+            try:
+                states_list = [Sample.States.Value(state) for state in states.split(",")]
+                if msg.state in states_list:
+                    logger.debug("State: OK")
+                    return await func(msg)
+                else:
+                    logger.debug(f"State: NO, received {Sample.States.Name(msg.state)} requested {states}")
+                    return await func(None)
+
+            except ValueError:
+                logger.error("State: NOT FOUND")
+                return await func(None)
+
+        return wrapped
+    return wrapper
+
+
+def parse_message(serde):
+    """This decorator deserializes the incoming messages.
+
+    Parameters:
+        serde (Protobuf class): the protocol buffer supporting class
+    """
+    def wrapper(func):
+        @functools.wraps(func)
+        async def wrapped(msg: ConsumerRecord):
+            try:
+                message = serde()
+                message.ParseFromString(msg.value)
+                logger.debug("Parsing: OK")
+                return await func(message)
+            except Exception:
+                logger.debug("Parsing: ERROR")
+                return await func(None)
+        return wrapped
+    return wrapper
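+
+# Illustrative usage sketch (the decorators are not wired up anywhere in this
+# module; `handle` and the consumed record are hypothetical):
+#
+#     @check_dataset(settings.dataset_names)   # inspects the raw ConsumerRecord key
+#     @parse_message(Sample)                   # deserializes record.value into a Sample
+#     @check_state("VALID,ANONYMIZED")         # filters on the sample state
+#     async def handle(msg):
+#         ...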
diff --git a/diva/connector/app/utils/kafka.py b/diva/connector/app/utils/kafka.py
new file mode 100755
index 0000000..7b331d1
--- /dev/null
+++ b/diva/connector/app/utils/kafka.py
@@ -0,0 +1,50 @@
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from utils.settings import settings
+
+import ssl
+
+
+def create_producer() -> AIOKafkaProducer:
+    """It creates a Kafka producer
+
+    Returns:
+        AIOKafkaProducer: Kafka async producer object
+    """
+    context = ssl.create_default_context()
+    context.load_verify_locations(cafile=settings.ssl_cafile)
+
+    return AIOKafkaProducer(
+        bootstrap_servers=settings.bootstrap_servers,
+        security_protocol=settings.security_protocol,
+        sasl_mechanism=settings.sasl_mechanism,
+        sasl_plain_username=settings.sasl_plain_username,
+        sasl_plain_password=settings.sasl_plain_password,
+        ssl_context=context,
+        key_serializer=lambda v: v.encode(),
+        value_serializer=lambda v: v
+    )
+
+
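+# NOTE: create_consumer() was missing from this module even though
+# create_connector_consumer() calls it. The implementation below is a
+# reconstruction that mirrors the connection settings of create_producer();
+# treat it as a sketch rather than the original code.
+def create_consumer(topic: str) -> AIOKafkaConsumer:
+    context = ssl.create_default_context()
+    context.load_verify_locations(cafile=settings.ssl_cafile)
+
+    return AIOKafkaConsumer(
+        topic,
+        bootstrap_servers=settings.bootstrap_servers,
+        group_id=settings.group_id,
+        security_protocol=settings.security_protocol,
+        sasl_mechanism=settings.sasl_mechanism,
+        sasl_plain_username=settings.sasl_plain_username,
+        sasl_plain_password=settings.sasl_plain_password,
+        ssl_context=context
+    )
+
+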
+def create_connector_consumer() -> AIOKafkaConsumer:
+    """It creates a consumer for the connector topic"""
+    return create_consumer(settings.connector_topic)
\ No newline at end of file
diff --git a/diva/connector/app/utils/queues.py b/diva/connector/app/utils/queues.py
new file mode 100755
index 0000000..b4c5f9a
--- /dev/null
+++ b/diva/connector/app/utils/queues.py
@@ -0,0 +1,9 @@
+import asyncio
+
+# Queue for messages addressed to the inbound converter service (currently unused)
+inbound_queue = asyncio.Queue()
+# Queue of datasets discovered by the cron service, consumed by the inbound connector
+cron_queue = asyncio.Queue()
+
+# Queue of samples that the manager publishes into the DIVA connector topic
+connector_queue = asyncio.Queue()
\ No newline at end of file
diff --git a/diva/connector/app/utils/settings.py b/diva/connector/app/utils/settings.py
new file mode 100755
index 0000000..5d780dc
--- /dev/null
+++ b/diva/connector/app/utils/settings.py
@@ -0,0 +1,30 @@
+from dataclasses import dataclass
+import os
+
+@dataclass
+class Settings:
+
+    bootstrap_servers: str = os.getenv("KAFKA_BOOTSTRAP_SERVERS") or "csc-ai.csc-ai.csc:9092"
+    group_id: str = os.getenv("KAFKA_GROUP_ID") or "moderate"
+    security_protocol: str = "SASL_SSL"
+    sasl_mechanism: str = "PLAIN"
+    sasl_plain_username: str = os.getenv("KAFKA_SASL_USERNAME") or "kafka-dq"
+    # Secrets have no hardcoded fallback on purpose: provide them via the environment
+    sasl_plain_password: str = os.getenv("KAFKA_SASL_PASSWORD") or ""
+    ssl_cafile: str = os.getenv("KAFKA_SSL_CAFILE") or "utils/ca.crt"
+
+    connector_topic: str = os.getenv("CONNECTOR_TOPIC") or "connector"
+    connector_partition: int = int(os.getenv("CONNECTOR_PARTITION") or "0")
+    input_state: str = os.getenv("INPUT_STATE") or "RAW"
+    output_state: str = os.getenv("OUTPUT_STATE") or "VALID,ANONYMIZED"
+    dataset_names: str = os.getenv("DATASET_NAMES") or "moderate/lombardia"
+
+    auth_url: str = os.getenv("AUTH_URL") or "https://api.gw.moderate.cloud/api/token"
+    auth_user: str = os.getenv("AUTH_USER") or ""
+    auth_password: str = os.getenv("AUTH_PASS") or ""
+
+    asset_url: str = os.getenv("ASSET_URL") or "https://api.gw.moderate.cloud/asset"
+
+    polling_time: int = int(os.getenv("POLLING_TIME") or "60")
+
+settings = Settings()
\ No newline at end of file
diff --git a/diva/connector/docker-compose-build.yaml b/diva/connector/docker-compose-build.yaml
new file mode 100755
index 0000000..94a471e
--- /dev/null
+++ b/diva/connector/docker-compose-build.yaml
@@ -0,0 +1,7 @@
+services:
+  s3-connector:
+    image: maestri.ismb.it:5050/moderate/s3-connector:1.0
+    container_name: moderate-s3-connector
+    build:
+      context: .
+      dockerfile: ./Dockerfile
\ No newline at end of file
diff --git a/diva/connector/docker-compose-dev.yaml b/diva/connector/docker-compose-dev.yaml
new file mode 100755
index 0000000..980b14f
--- /dev/null
+++ b/diva/connector/docker-compose-dev.yaml
@@ -0,0 +1,16 @@
+services:
+  s3-connector:
+    image: maestri.ismb.it:5050/moderate/s3-connector:1.0
+    container_name: moderate-s3-connector
+    environment:
+      KAFKA_BOOTSTRAP_SERVERS: "csc-ai.csc-ai.csc:9092"
+      KAFKA_SASL_USERNAME: "kafka-dq"
+      KAFKA_SASL_PASSWORD: ${KAFKA_SASL_PASSWORD}
+      KAFKA_SSL_CAFILE: "/certs/ca.crt"
+      ASSET_URL: "https://api.gw.moderate.cloud/asset"
+      AUTH_URL: "https://api.gw.moderate.cloud/api/token"
+      AUTH_USER: ${AUTH_USER}
+      AUTH_PASS: ${AUTH_PASS}
+      POLLING_TIME: "60"
+    volumes:
+      - "./certs/ca.crt:/certs/ca.crt:ro"
\ No newline at end of file
diff --git a/diva/connector/poetry.lock b/diva/connector/poetry.lock
new file mode 100644
index 0000000..a7c89a3
--- /dev/null
+++ b/diva/connector/poetry.lock
@@ -0,0 +1,387 @@
+[[package]]
+name = "aiohttp"
+version = "3.9.5"
+description = "Async http client/server framework (asyncio)"
+category = "main"
+optional = false
+python-versions = ">=3.8"
+
+[package.dependencies]
+aiosignal = ">=1.1.2"
+async-timeout = {version = ">=4.0,<5.0", markers = "python_version < \"3.11\""}
+attrs = ">=17.3.0"
+frozenlist = ">=1.1.1"
+multidict = ">=4.5,<7.0"
+yarl = ">=1.0,<2.0"
+
+[package.extras]
+speedups = ["brotlicffi", "brotli", "aiodns"]
+
+[[package]]
+name = "aiokafka"
+version = "0.10.0"
+description = "Kafka integration with asyncio"
+category = "main"
+optional = false
+python-versions = ">=3.8"
+
+[package.dependencies]
+async-timeout = "*"
+packaging = "*"
+
+[package.extras]
+all = ["cramjam", "lz4 (>=3.1.3)", "gssapi"]
+gssapi = ["gssapi"]
+lz4 = ["lz4 (>=3.1.3)"]
+snappy = ["cramjam"]
+zstd = ["cramjam"]
+
+[[package]]
+name = "aiologger"
+version = "0.7.0"
+description = "Asynchronous logging for python and asyncio"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.extras]
+aiofiles = ["aiofiles (==0.4.0)"]
+
+[[package]]
+name = "aiosignal"
+version = "1.3.1"
+description = "aiosignal: a list of registered asynchronous callbacks"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+frozenlist = ">=1.1.0"
+
+[[package]]
+name = "annotated-types"
+version = "0.6.0"
+description = "Reusable constraint types to use with typing.Annotated"
+category = "main"
+optional = false
+python-versions = ">=3.8"
+
+[[package]]
+name = "async-timeout"
+version = "4.0.3"
+description = "Timeout context manager for asyncio programs"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[[package]]
+name = "asyncio"
+version = "3.4.3"
+description = "reference implementation of PEP 3156"
+category = "main"
+optional = false
+python-versions = "*"
+
+[[package]]
+name = "attrs"
+version = "23.2.0"
+description = "Classes Without Boilerplate"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.extras]
+cov = ["attrs", "coverage[toml] (>=5.3)"]
+dev = ["attrs", "pre-commit"]
+docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope-interface"]
+tests = ["attrs", "zope-interface"]
+tests-mypy = ["mypy (>=1.6)", "pytest-mypy-plugins"]
+tests-no-zope = ["attrs", "cloudpickle", "hypothesis", "pympler", "pytest-xdist", "pytest (>=4.3.0)"]
+
+[[package]]
+name = "certifi"
+version = "2024.2.2"
+description = "Python package for providing Mozilla's CA Bundle."
Bundle." +category = "main" +optional = false +python-versions = ">=3.6" + +[[package]] +name = "cramjam" +version = "2.8.3" +description = "Thin Python bindings to de/compression algorithms in Rust" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.extras] +dev = ["black (==22.3.0)", "numpy", "pytest (>=5.30)", "pytest-xdist", "hypothesis"] + +[[package]] +name = "fastparquet" +version = "2024.2.0" +description = "Python support for Parquet file format" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +cramjam = ">=2.3" +fsspec = "*" +numpy = ">=1.20.3" +packaging = "*" +pandas = ">=1.5.0" + +[package.extras] +lzo = ["python-lzo"] + +[[package]] +name = "frozenlist" +version = "1.4.1" +description = "A list-like structure which implements collections.abc.MutableSequence" +category = "main" +optional = false +python-versions = ">=3.8" + +[[package]] +name = "fsspec" +version = "2024.3.1" +description = "File-system specification" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.extras] +abfs = ["adlfs"] +adl = ["adlfs"] +arrow = ["pyarrow (>=1)"] +dask = ["dask", "distributed"] +devel = ["pytest", "pytest-cov"] +dropbox = ["dropboxdrivefs", "requests", "dropbox"] +full = ["adlfs", "aiohttp (!=4.0.0a0,!=4.0.0a1)", "dask", "distributed", "dropbox", "dropboxdrivefs", "fusepy", "gcsfs", "libarchive-c", "ocifs", "panel", "paramiko", "pyarrow (>=1)", "pygit2", "requests", "s3fs", "smbprotocol", "tqdm"] +fuse = ["fusepy"] +gcs = ["gcsfs"] +git = ["pygit2"] +github = ["requests"] +gs = ["gcsfs"] +gui = ["panel"] +hdfs = ["pyarrow (>=1)"] +http = ["aiohttp (!=4.0.0a0,!=4.0.0a1)"] +libarchive = ["libarchive-c"] +oci = ["ocifs"] +s3 = ["s3fs"] +sftp = ["paramiko"] +smb = ["smbprotocol"] +ssh = ["paramiko"] +tqdm = ["tqdm"] + +[[package]] +name = "idna" +version = "3.7" +description = "Internationalized Domain Names in Applications (IDNA)" +category = "main" +optional = false +python-versions = ">=3.5" + +[[package]] +name = "multidict" +version = "6.0.5" +description = "multidict implementation" +category = "main" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "numpy" +version = "1.26.4" +description = "Fundamental package for array computing in Python" +category = "main" +optional = false +python-versions = ">=3.9" + +[[package]] +name = "packaging" +version = "24.0" +description = "Core utilities for Python packages" +category = "main" +optional = false +python-versions = ">=3.7" + +[[package]] +name = "pandas" +version = "2.2.1" +description = "Powerful data structures for data analysis, time series, and statistics" +category = "main" +optional = false +python-versions = ">=3.9" + +[package.dependencies] +numpy = [ + {version = ">=1.22.4,<2", markers = "python_version < \"3.11\""}, + {version = ">=1.23.2,<2", markers = "python_version == \"3.11\""}, + {version = ">=1.26.0,<2", markers = "python_version >= \"3.12\""}, +] +python-dateutil = ">=2.8.2" +pytz = ">=2020.1" +tzdata = ">=2022.7" + +[package.extras] +test = ["hypothesis (>=6.46.1)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)"] +pyarrow = ["pyarrow (>=10.0.1)"] +performance = ["bottleneck (>=1.3.6)", "numba (>=0.56.4)", "numexpr (>=2.8.4)"] +computation = ["scipy (>=1.10.0)", "xarray (>=2022.12.0)"] +fss = ["fsspec (>=2022.11.0)"] +aws = ["s3fs (>=2022.11.0)"] +gcp = ["gcsfs (>=2022.11.0)", "pandas-gbq (>=0.19.0)"] +excel = ["odfpy (>=1.4.1)", "openpyxl (>=3.1.0)", "python-calamine (>=0.1.7)", "pyxlsb (>=1.0.10)", "xlrd 
(>=2.0.1)", "xlsxwriter (>=3.0.5)"] +parquet = ["pyarrow (>=10.0.1)"] +feather = ["pyarrow (>=10.0.1)"] +hdf5 = ["tables (>=3.8.0)"] +spss = ["pyreadstat (>=1.2.0)"] +postgresql = ["SQLAlchemy (>=2.0.0)", "psycopg2 (>=2.9.6)", "adbc-driver-postgresql (>=0.8.0)"] +mysql = ["SQLAlchemy (>=2.0.0)", "pymysql (>=1.0.2)"] +sql-other = ["SQLAlchemy (>=2.0.0)", "adbc-driver-postgresql (>=0.8.0)", "adbc-driver-sqlite (>=0.8.0)"] +html = ["beautifulsoup4 (>=4.11.2)", "html5lib (>=1.1)", "lxml (>=4.9.2)"] +xml = ["lxml (>=4.9.2)"] +plot = ["matplotlib (>=3.6.3)"] +output-formatting = ["jinja2 (>=3.1.2)", "tabulate (>=0.9.0)"] +clipboard = ["PyQt5 (>=5.15.9)", "qtpy (>=2.3.0)"] +compression = ["zstandard (>=0.19.0)"] +consortium-standard = ["dataframe-api-compat (>=0.1.7)"] +all = ["adbc-driver-postgresql (>=0.8.0)", "adbc-driver-sqlite (>=0.8.0)", "beautifulsoup4 (>=4.11.2)", "bottleneck (>=1.3.6)", "dataframe-api-compat (>=0.1.7)", "fastparquet (>=2022.12.0)", "fsspec (>=2022.11.0)", "gcsfs (>=2022.11.0)", "html5lib (>=1.1)", "hypothesis (>=6.46.1)", "jinja2 (>=3.1.2)", "lxml (>=4.9.2)", "matplotlib (>=3.6.3)", "numba (>=0.56.4)", "numexpr (>=2.8.4)", "odfpy (>=1.4.1)", "openpyxl (>=3.1.0)", "pandas-gbq (>=0.19.0)", "psycopg2 (>=2.9.6)", "pyarrow (>=10.0.1)", "pymysql (>=1.0.2)", "PyQt5 (>=5.15.9)", "pyreadstat (>=1.2.0)", "pytest (>=7.3.2)", "pytest-xdist (>=2.2.0)", "python-calamine (>=0.1.7)", "pyxlsb (>=1.0.10)", "qtpy (>=2.3.0)", "scipy (>=1.10.0)", "s3fs (>=2022.11.0)", "SQLAlchemy (>=2.0.0)", "tables (>=3.8.0)", "tabulate (>=0.9.0)", "xarray (>=2022.12.0)", "xlrd (>=2.0.1)", "xlsxwriter (>=3.0.5)", "zstandard (>=0.19.0)"] + +[[package]] +name = "protobuf" +version = "5.26.1" +description = "" +category = "main" +optional = false +python-versions = ">=3.8" + +[[package]] +name = "pyarrow" +version = "15.0.2" +description = "Python library for Apache Arrow" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +numpy = ">=1.16.6,<2" + +[[package]] +name = "pydantic" +version = "2.6.4" +description = "Data validation using Python type hints" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +annotated-types = ">=0.4.0" +pydantic-core = "2.16.3" +typing-extensions = ">=4.6.1" + +[package.extras] +email = ["email-validator (>=2.0.0)"] + +[[package]] +name = "pydantic-core" +version = "2.16.3" +description = "" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0" + +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +description = "Extensions to the standard Python datetime module" +category = "main" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" + +[package.dependencies] +six = ">=1.5" + +[[package]] +name = "pytz" +version = "2024.1" +description = "World timezone definitions, modern and historical" +category = "main" +optional = false +python-versions = "*" + +[[package]] +name = "six" +version = "1.16.0" +description = "Python 2 and 3 compatibility utilities" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" + +[[package]] +name = "typing-extensions" +version = "4.10.0" +description = "Backported and Experimental Type Hints for Python 3.8+" +category = "main" +optional = false +python-versions = ">=3.8" + +[[package]] +name = "tzdata" +version = "2024.1" +description = "Provider of IANA time zone data" +category = "main" +optional = false +python-versions = ">=2" 
+
+[[package]]
+name = "yarl"
+version = "1.9.4"
+description = "Yet another URL library"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+idna = ">=2.0"
+multidict = ">=4.0"
+
+[metadata]
+lock-version = "1.1"
+python-versions = "^3.10"
+content-hash = "3517f98ec5ec8a3cea77694c146c08d98290ba047add28881016e1af51d1e41d"
+
+[metadata.files]
+aiohttp = []
+aiokafka = []
+aiologger = []
+aiosignal = []
+annotated-types = []
+async-timeout = []
+asyncio = []
+attrs = []
+certifi = []
+cramjam = []
+fastparquet = []
+frozenlist = []
+fsspec = []
+idna = []
+multidict = []
+numpy = []
+packaging = []
+pandas = []
+protobuf = []
+pyarrow = []
+pydantic = []
+pydantic-core = []
+python-dateutil = []
+pytz = []
+six = []
+typing-extensions = []
+tzdata = []
+yarl = []
diff --git a/diva/connector/pyproject.toml b/diva/connector/pyproject.toml
new file mode 100644
index 0000000..e9066a9
--- /dev/null
+++ b/diva/connector/pyproject.toml
@@ -0,0 +1,25 @@
+[tool.poetry]
+name = "dq-s3-connector"
+version = "0.1.0"
+description = "Data Quality Connector for S3 Object Storages"
+authors = ["Nicolò Bertozzi <nicolo.bertozzi@linksfoundation.com>"]
+license = "Eclipse Public License - v 2.0"
+
+[tool.poetry.dependencies]
+python = "^3.10"
+pandas = "^2.2.1"
+asyncio = "^3.4.3"
+aiologger = "^0.7.0"
+aiokafka = "^0.10.0"
+pydantic = "^2.6.4"
+protobuf = "^5.26.1"
+aiohttp = "^3.9.5"
+pyarrow = "^15.0.2"
+fastparquet = "^2024.2.0"
+certifi = "^2024.2.2"
+
+[tool.poetry.dev-dependencies]
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"