El challenge consiste en las siguientes tareas:
-
1. Cargar el siguiente set de datos en una base de datos en snowflake (abrir una cuenta de prueba) https://www.kaggle.com/megelon/meetup
-
2. Realizar creación de tablas físicas auxiliares, el archivo esta adjunto
-
3. Crear un DAG usando Apache Airflow (puede ser usando un servicio administrado google composer, aws mwaa, astronomer, local, otro)
El objetivo del DAG es automatizar las tareas en el punto 2. Es decir crear nuevas tablas con otros nombres para realizar el proceso de manera automática cada 15 minutos.
Usa tu creatividad para generar nuevos datos y que tenga más sentido la automatización, usar MERGES, CREATES AND REPLACES y otras funciones de snowflake.
-
Tomar las tablas procesadas y usando Apache NiFi mandarlas a un bucket de s3. (El envió a S3 fue realizado sin embargo con otro método)
-
Crear alertas para slack usando airflow.
-
Empezar el curso de airflow de Marc Lamberti y exponer conceptos clave
-
Hacer un ejercicio con un dataset diferente
- Snowflake
- Google Cloud Composer
- Google Cloud Storage
- Apache Airflow
- Apache NiFi
- Slack
- Dbeaver
- Docker
- AWS S3
- Crear un warehouse en Snowflake
- Crear una base de datos en Snowflake
- Levantar una instancia de Airflow en Google Cloud Composer
- Realizar la conexión de DBeaver con snowflake para la carga de la data
Como se puede ver se las ejecuciones son cada 15 minutos y de igual manera se ha implementado alertas para cuando hayan errores así cómo para cuando el DAG inicia y finaliza.
Envío de tablas como archivos a AWS S3 utilizando operadores en airflow, primero enviando de snowflake a Google Cloud Storage y luego de GCS a AWS S3
Se utilizaron 2 operadores principales:
- Primero para mandar de snowflake a GCS el cual corre una DML que replica las tablas en el Storage
pythonsnowflake_op_copy_snowflake_into_gcs = SnowflakeOperator(
task_id="snowflake_op_copy_snowflake_into_gcs",
sql= path_sql+"copy_snowflake_into_gcs.sql",
split_statements=True,
)
- Segundo se envia de GCS a S3 a través de otro operador "GCSToS3Operator"
gcs_to_s3 = GCSToS3Operator(
task_id="gcs_to_s3",
bucket='rappi_test_storage',
dest_s3_key='s3://rappi-bucket-aws/',
replace=True,
dest_aws_conn_id = S3_CONN_ID
)