User's Guide

Basic terms

Composition is the highest-level unit of work in Debussy; it represents an entire data pipeline (a DAG in Airflow). In music theory, a composition refers to an original piece or work of music. A list of compositions by Claude Debussy can be found here. Debussy Concert compositions are classified into three groups, according to their end goal: data ingestion, transformation, and reverse ETL.

Movement is a major part of a composition, responsible for a group of actions, such as the ingestion of tables from an RDBMS into the Data Lakehouse (e.g. DataIngestionMovement). In music theory, a movement is a self-contained part of a composition. In Airflow, Debussy Concert implements Movements as TaskGroups, which act as a “parent” of the whole group.

Phrase is a logical unit within a Movement (e.g. IngestionSourceToRawVaultStoragePhrase). In music theory, a phrase is "a unit of musical meter that has a complete musical sense of its own, built from figures, motifs, and cells, and combining to form melodies, periods and larger sections". In Airflow, Debussy Concert also implements Phrases through the use of TaskGroups.

Motif is the smallest unit of work, consisting of an operator or small set of cohesive operators, that (together) executes a single operation (e.g. CreateDataprocClusterMotif). In music theory, a motif (also motive) is "a short musical phrase, a salient recurring figure, musical fragment or succession of notes that has some special importance in or is characteristic of a composition (...) considered the smallest structural unit possessing identity".
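
These terms map directly onto native Airflow concepts. The sketch below is illustrative only, assuming Airflow 2.3 or later; the group and task names are hypothetical placeholders rather than Debussy Concert APIs:

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Composition -> an entire DAG
with DAG(
    dag_id="composition_example",
    start_date=pendulum.datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    # Movement -> a top-level TaskGroup
    with TaskGroup(group_id="data_ingestion_movement"):
        # Phrase -> a nested TaskGroup inside the Movement
        with TaskGroup(group_id="ingestion_source_to_raw_vault_phrase"):
            # Motif -> an operator (or a small set of cohesive operators)
            EmptyOperator(task_id="create_dataproc_cluster_motif")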

Installation

Debussy Concert requires an Apache Airflow instance, whether in a local (e.g. Airflow running on Docker, Astro CLI), hosted (e.g. VM, Kubernetes), or managed (e.g. Cloud Composer, Astro) environment.

Debussy Concert is available on PyPI and automatically manages all its dependencies, including Debussy Airflow. Follow the specific steps of your Airflow environment to install additional packages. To install the latest Debussy Concert version from PyPI, use the command below:

pip install debussy-concert

Alternatively, you can install the project manually by installing all the dependencies of Debussy Concert and Debussy Airflow and uploading both projects to the dags/plugins folder of your Airflow instance. Using Astro CLI, you can clone both repositories and mount them, as discussed here. Be aware that you may need to change the import path of your Compositions (Python DAGs) according to the project's location in your environment.

Environment definition

In order to develop a data pipeline using Debussy Concert, the first step is to define an environment file that holds configuration parameters for your Google Cloud (GCP) project. Debussy Concert currently supports only GCP, which is used for both deployment (i.e. Cloud Composer) and the Data Lakehouse (i.e. Cloud Storage, BigQuery). Support for other clouds and data stacks is on the roadmap, and we encourage contributions; check out our Contributing Guide.

The environment configuration is done through a YAML file that should ideally be reused by all data pipelines of the same Airflow instance, unless a specific data pipeline has distinct environment requirements. We recommend placing this file in the DAG root folder of your Airflow instance, like this:

dags/
└──  environment.yaml

The environment file schema is as follows:

project: gcp-project-name                            # Update with your GCP project ID
region: us-central1                                  # Update with your GCP project region
zone: us-central1-a                                  # Update with your GCP project zone
reverse_etl_bucket: reverse-etl-bucket               # Update with your Cloud Storage bucket for reverse ETL data versioning
raw_vault_bucket: raw-vault-bucket                   # Update with your Cloud Storage bucket for the raw vault layer
artifact_bucket: artifacts-bucket                    # Update with your Cloud Storage bucket for the framework artifacts
staging_bucket: staging-bucket                       # Update with your Cloud Storage bucket for Dataproc (Spark) staging
raw_vault_dataset: raw_vault                         # Recommended BigQuery dataset for the Raw Vault layer of the Data Lakehouse
raw_dataset: raw                                     # Recommended BigQuery dataset for the Raw layer of the Data Lakehouse
trusted_dataset: trusted                             # Recommended BigQuery dataset for the Trusted layer of the Data Lakehouse
reverse_etl_dataset: reverse_etl                     # Recommended BigQuery dataset for the Reverse ETL layer of the Data Lakehouse
temp_dataset: temp                                   # Recommended BigQuery dataset for temporary tables
data_lakehouse_connection_id: google_cloud_debussy   # Recommended Airflow GCP connection for the framework
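
As a convenience check (illustrative only, not part of Debussy Concert), the environment file can be loaded with PyYAML to verify that the keys listed above are present before your DAG code consumes it:

import yaml

# Keys expected by the framework, mirroring the schema shown above
REQUIRED_KEYS = {
    "project", "region", "zone",
    "reverse_etl_bucket", "raw_vault_bucket", "artifact_bucket", "staging_bucket",
    "raw_vault_dataset", "raw_dataset", "trusted_dataset",
    "reverse_etl_dataset", "temp_dataset", "data_lakehouse_connection_id",
}

with open("dags/environment.yaml") as f:
    environment = yaml.safe_load(f)

missing = REQUIRED_KEYS - environment.keys()
assert not missing, f"environment.yaml is missing keys: {missing}"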

Composition definition

Once you've defined your environment, you have to select the desired composition, or develop a new custom composition for your use case. As mentioned earlier, we currently support data ingestion, data transformation, and reverse ETL pipelines. Take a look at our data pipeline examples.

Let's consider we want to develop a data ingestion from a MySQL database (e.g. mydb) into a Data Lakehouse on GCP. The first step is to define a directory (e.g. mysql_mydb_ingestion) for the project and create a .py script (ideally with the same name) which we'll use to instantiate our Composition. We'll also need some additional YAML files: one for the definition of our composition parameters (composition.yaml), and schema definition files for each table we want to ingest; we suggest defining these files in a subfolder named table_schemas for organization purposes. In the future, the creation of the project folder according to the desired pipeline will be handled by a command-line interface (CLI) currently under development.

The project structure is going to look like this:

mysql_mydb_ingestion/
├──  mysql_mydb_ingestion.py
├──  composition.yaml
└──  table_schemas
     └── user.yaml

Once we have our project structure in place, let's define our DAG code:

from airflow.configuration import conf

from debussy_concert.data_ingestion.config.rdbms_data_ingestion import ConfigRdbmsDataIngestion
from debussy_concert.data_ingestion.composition.rdbms_ingestion import RdbmsIngestionComposition
from debussy_concert.core.service.injection import inject_dependencies
from debussy_concert.core.service.workflow.airflow import AirflowService

dags_folder = conf.get('core', 'dags_folder')

env_file = f'{dags_folder}/environment.yaml'                      # Path to the default environment configuration yaml file at dags root folder
composition_file = f'{dags_folder}/mysql_mydb_ingestion/composition.yaml'

workflow_service = AirflowService()                               # Create a definition of workflow service
config_composition = ConfigRdbmsDataIngestion.load_from_file(     # Create a definition of composition configuration
    composition_config_file_path=composition_file,                # that concatenates its parameters with
    env_file_path=env_file                                        # the environment (GCP project) parameters
)

inject_dependencies(workflow_service, config_composition)         # Binds the workflow service with the composition configuration

debussy_composition = RdbmsIngestionComposition()                 # Creates the Composition
debussy_composition.dataproc_main_python_file_uri = (             # Composition setup
    f"gs://{config_composition.environment.artifact_bucket}/"
    "pyspark-scripts/jdbc-to-gcs/jdbc_to_gcs_hash_key.py"         # Specify the desired PySpark template that will be used for ingestion
)

dag = debussy_composition.auto_play()                             # Calls the auto_play method to run the Composition
                                                                  # It's a factory for the play method that all Compositions inherit

Now we need to define the Composition parameters in the composition.yaml file. Each type of Composition has a specific set of parameters, although they all share some common parameters, namely:

  • name: name of the composition
  • source_name: name of the source system
  • source_type: source type (e.g. MySQL, PostgreSQL, etc).
  • description: short description of the pipeline
  • secret_manager_uri: path to a secret of the database on GCP Secret Manager
  • dataproc_config: Dataproc configurations, used only for data ingestion pipelines
  • dag_parameters: Airflow dag parameters

For our example pipeline, as the previous list shows, we'll need to provide a secret with the information (hostname, username, etc.) needed to connect to our source database. This secret will be used by the PySpark script. The required structure of the secret value follows below (replace the "host" key value with your database's external IP address, and do the same for each field):

{
  "host": "XXX.XXX.XXX.XXX",
  "port": "3306",
  "user": "username",
  "password": "mypassword"
}
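
As an illustration (not a Debussy Concert API), the secret can be created with the google-cloud-secret-manager client library. The project and secret names below match the sample composition further down; adjust them to your setup:

import json

from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()
parent = "projects/my-gcp-project"

# Create the secret container; its name must match secret_manager_uri in composition.yaml
secret = client.create_secret(
    request={
        "parent": parent,
        "secret_id": "mydb-secret",
        "secret": {"replication": {"automatic": {}}},
    }
)

# Store the connection payload shown above as the first secret version
payload = json.dumps(
    {"host": "XXX.XXX.XXX.XXX", "port": "3306", "user": "username", "password": "mypassword"}
).encode("utf-8")
client.add_secret_version(request={"parent": secret.name, "payload": {"data": payload}})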

Finally, we also have the specific parameters for each type of pipeline:

  • ingestion_parameters: definition of data ingestion movements, used for data ingestion compositions. Example: mysql_sakila_ingestion_daily
  • extraction_movements: definition of data extraction movements, used for reverse ETL compositions. Example: reverse_etl
  • transformation_movements: definition of transformation movements, used for data transformation compositions. Used to schedule dbt projects, where the transformation is defined. Example: sakila_transformation

Below, we present a sample composition file for our ingestion example:

name: mysql_mydb_ingestion
source_name: mydb
source_type: mysql
description: mysql mydb ingestion
secret_manager_uri: projects/my-gcp-project/secrets/mydb-secret
dataproc_config:
  machine_type: n1-standard-2
  num_workers: 0
  subnet: subnet-cluster-services
  parallelism: 60
  pip_packages:
    - google-cloud-secret-manager
dag_parameters:
  dag_id: mysql_mydb_ingestion
  description: Mysql ingestion for mydb database.
  catchup: true
  schedule_interval: "@daily"
  max_active_runs: 1
  tags:
    - framework:debussy_concert
    - project:example
    - source:mydb
    - type:ingestion
    - load:incremental
    - tier:5
  default_args:
    owner: debussy
ingestion_parameters:
  - name: user
    extraction_query: >
      SELECT id, name, updated
      FROM mydb.user
      WHERE updated >= '{{ "execution_date.strftime('%Y-%m-%d 00:00:00')" }}'
        AND updated <  '{{ "next_execution_date.strftime('%Y-%m-%d 00:00:00')" }}'

Alternatively, the whole composition could be defined directly in your DAG code: the YAML file is just an interface to simplify the definition, and more technically advanced users may prefer to use Python. Take a look at our yamless example.

Table schema definition

Finally, we need to define the schema of the tables we want to ingest into our Data Lakehouse. The schema must be compliant with BigQuery data types. All fields that start with an underscore are metadata used for partitioning and required by the jdbc_to_gcs_hash_key PySpark script. You can check our PySpark scripts here.

fields:
  - name: id
    data_type: INT64
    description: ID of the user
  - name: name
    data_type: STRING
    description: Name of the user
  - name: updated
    data_type: TIMESTAMP
    description: The last time the user record was updated
  - name: _load_flag
    data_type: STRING
    description: incr = incremental data ingestion; full = full data ingestion
  - name: _ts_window_start
    data_type: TIMESTAMP
    description: Ingestion window start at source timezone
  - name: _ts_window_end
    data_type: TIMESTAMP
    description: Ingestion window end at source timezone
  - name: _ts_logical
    data_type: TIMESTAMP
    description: Airflow logical date
  - name: _ts_ingestion
    data_type: TIMESTAMP
    description: Clock time at Airflow when the ingestion was executed
  - name: _hash_key
    data_type: STRING
    description: An MD5 surrogate hash key used to uniquely identify each record of the source
partitioning:
  field: _ts_logical
  type: time
  granularity: DAY
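
As an illustration only (not part of Debussy Concert), the schema file can be loaded with PyYAML and mapped to BigQuery SchemaField objects, for example to create or inspect the destination table outside the framework:

import yaml
from google.cloud import bigquery

# Load the table schema defined above (path assumes the project structure shown earlier)
with open("mysql_mydb_ingestion/table_schemas/user.yaml") as f:
    schema_def = yaml.safe_load(f)

# Map each declared field to a BigQuery SchemaField; invalid data types will be
# rejected by BigQuery when the table is created
bq_schema = [
    bigquery.SchemaField(
        name=field["name"],
        field_type=field["data_type"],
        description=field.get("description", ""),
    )
    for field in schema_def["fields"]
]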

Deployment

Once development is done, and considering you've already installed Debussy Concert in your Airflow instance and uploaded your environment file, you just need to upload your project to the dags folder. It will look like this:

dags/
├──  environment.yaml
└──  mysql_mydb_ingestion/
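
If your Airflow instance runs on Cloud Composer, a minimal upload sketch using the google-cloud-storage client looks like this (the bucket name is a placeholder; find yours in the Composer environment details):

from pathlib import Path

from google.cloud import storage

# Placeholder bucket name; replace with your Composer environment's DAGs bucket
bucket = storage.Client().bucket("us-central1-my-composer-env-bucket")

# Upload every file of the project under the dags/ prefix of the bucket
for path in Path("mysql_mydb_ingestion").rglob("*"):
    if path.is_file():
        bucket.blob(f"dags/{path.as_posix()}").upload_from_filename(str(path))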

As soon as the Airflow scheduler runs again, it will parse your DAG file, and Debussy Concert will automatically generate the DAG, expanding the movements defined in your composition.
