Postgres CDC to Snowflake

This is the supporting code for a blog post about streaming data from Postgres to Snowflake using Decodable.

It includes a Docker Compose stack for running Postgres locally, made accessible from the internet using ngrok.

To use this, you need to create an ngrok account and add a file called .env in this folder with the following entry:

NGROK_AUTH_TOKEN=<your_token>

Bring up the Postgres and ngrok stack with:

docker compose up

Once it's up, you can find out the Postgres server host/port that is available on the internet:

curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g'
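The tunnel address comes back as a single host:port string; the connection scan command further down splits it with cut. The same split can also be done with shell parameter expansion — a quick sketch using a made-up tunnel address:

```shell
# Made-up ngrok tunnel address for illustration only; in practice,
# substitute the output of the curl command above.
hostport="4.tcp.ngrok.io:12345"

host="${hostport%%:*}"   # strip everything from the first ':' onward
port="${hostport##*:}"   # keep only what follows the last ':'

echo "$host $port"   # prints: 4.tcp.ngrok.io 12345
```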

Source connection: Postgres CDC

Configure Postgres tables for replication

docker exec -it postgres psql -h localhost -U postgres -d postgres -f /data/replication-config.sql

The replication-config.sql script sets the replica identity of each table to FULL:

ALTER TABLE customers REPLICA IDENTITY FULL;
ALTER TABLE pets REPLICA IDENTITY FULL;
ALTER TABLE appointments REPLICA IDENTITY FULL;

Check the replica identity of each table; relreplident should show f (full) for all three:

SELECT oid::regclass, relreplident
  FROM pg_class
 WHERE oid IN ('customers'::regclass, 'pets'::regclass, 'appointments'::regclass);

Store the Postgres password as a Decodable secret


decodable apply decodable/pg-secret.yaml
---
kind: secret
name: omd-pg
id: ee94bd72
result: updated
• Wrote plaintext values for secret IDs: [ee94bd72]

Generate resource definitions

decodable connection scan \
          --name oh-my-dawg-pg \
          --connector postgres-cdc \
          --type source \
          --prop hostname=$(curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' | cut -d':' -f1) \
          --prop port=$(curl -s localhost:4040/api/tunnels | jq -r '.tunnels[0].public_url' | sed 's/tcp:\/\///g' | cut -d':' -f2) \
          --prop database-name=postgres \
          --prop username=postgres \
          --prop password=$(decodable query --name omd-pg --keep-ids | yq '.metadata.id') \
          --include-pattern schema-name=public \
          --output-resource-name-template stream-name="omd-{table-name}" \
          > omd-pg.yaml
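The generated omd-pg.yaml is a multi-document YAML file, one resource definition per document. Before applying it, you can get a quick overview by listing each resource's kind and name; this sketch runs against a made-up two-resource sample standing in for the real file:

```shell
# Made-up sample standing in for the scan output; the real omd-pg.yaml
# follows the same multi-document layout.
cat > sample.yaml <<'EOF'
---
kind: connection
name: oh-my-dawg-pg
---
kind: stream
name: omd-customers
EOF

# List the kind/name header lines of every resource definition.
grep -E '^(kind|name):' sample.yaml
```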

Apply resource definitions

decodable apply omd-pg.yaml
---
kind: connection
name: oh-my-dawg-pg
id: 6d02ba15
result: created
---
kind: stream
name: omd-appointments
id: 975bba8d
result: created
---
kind: stream
name: omd-customers
id: 7885b32e
result: created
---
kind: stream
name: omd-pets
id: 3cc8e060
result: created

Activate the Postgres connection

decodable query --name oh-my-dawg-pg -X activate --stabilize

Sink connection: Snowflake

Generate & create the Snowflake sink

Here the output of decodable connection scan is piped straight into decodable apply, streamlining the process into a single step.

decodable connection scan \
          --name oh-my-dawg-snowflake \
          --connector snowflake \
          --type sink \
          --prop snowflake.database=omd \
          --prop snowflake.schema=omd \
          --prop snowflake.user=decodable \
          --prop snowflake.private-key=$(decodable query --name omd-snowflake --keep-ids | yq '.metadata.id') \
          --prop snowflake.role=load_data \
          --prop snowflake.account-name=<org>-<account> \
          --prop snowflake.warehouse=stg \
          --prop snowflake.merge-interval="1 minute" \
          --include-pattern stream-name='^omd-' \
          | decodable apply -
Note
If you encounter errors, remove the pipe and the decodable apply stage, and first inspect the YAML generated by decodable connection scan (e.g. by redirecting it to a file, as was done for the Postgres source).

Activate the Snowflake connection

decodable query --name oh-my-dawg-snowflake -X activate --stabilize