refactor: reorient data_pipeline readme to favor cloud (#207)
Co-authored-by: Daniel Abraham <daniel@autokitteh.com>
pashafateev and daabr authored Feb 19, 2025
1 parent a9edaa2 commit e3a1fce
Showing 2 changed files with 44 additions and 125 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -18,7 +18,7 @@ demonstrate basic system features, integration APIs, and best practices.
| [Manage emergency AWS access requests via Slack](./break_glass/) | Submit emergency AWS access requests via Slack, which are then approved or denied based on a set of predefined conditions | aws, slack |
| [Email categorization and notification](./categorize_emails/) | Categorize incoming emails and notify relevant Slack channels | gmail, chatgpt, slack |
| [Slack notify on Confluence page created](./confluence_to_slack/) | When Confluence page is created the user will be notified on Slack | confluence, slack |
| [Parse a file in S3 and insert to database](./data_pipeline/) | Triggered by a new GPX file on an S3 bucket, the pipeline code will parse the GPX file and insert it into a database. | aws, http, sqlite3 |
| [ETL Pipeline From S3 to SQLite](./data_pipeline/) | Processes GPX files from S3 and inserts them into a SQLite database, creating a data pipeline from cloud to structured data | aws, http, sqlite3 |
| [GitHub issue alert](./devops/github_issue_alert/) | Send GitHub issue comments to Slack | github, slack |
| [GitHub workflow orchestration](./devops/github_workflows/) | Orchestrate GitHub workflows using advanced scenarios across multiple repositories | github |
| [Pull Request Review Reminder (Purrr)](./devops/purrr/) | Streamline code reviews and cut down turnaround time to merge pull requests | github, sheets, slack |
167 changes: 43 additions & 124 deletions data_pipeline/README.md
@@ -1,170 +1,89 @@
---
title: Parse a file in S3 and insert to database
description: Triggered by a new GPX file on an S3 bucket, the pipeline code will parse the GPX file and insert it into a database.
title: ETL Pipeline From S3 to SQLite
description: Processes GPX files from S3 and inserts them into a SQLite database, creating a data pipeline from cloud to structured data
integrations: ["aws", "http", "sqlite3"]
categories: ["DevOps"]
---

# Data Pipeline Workflow
# ETL Pipeline From S3 to SQLite

This is an example of a data pipeline workflow. The pipeline is triggered by a new GPX file on an S3 bucket.
The pipeline code will parse the GPX file and insert it into a database.
This project downloads and parses GPX files from an S3 bucket and inserts the resulting structured records into a SQLite database. It uses S3 event notifications, delivered via SNS, to trigger an AutoKitteh HTTP endpoint whenever a new GPX file is uploaded.

The event flow is:
## How It Works

- S3 sends a new item notification to an SNS topic
- SNS sends a notification to the AutoKitteh HTTP trigger
- The workflow extracts the bucket and file name from the S3 event
- The workflow reads the file from S3
- The workflow parses the GPX file and inserts the data into a SQLite database
1. Monitor S3 bucket for new GPX files
2. Receive SNS notification at AutoKitteh HTTP endpoint
3. Extract file details from notification
4. Download GPX file from S3 bucket
5. Parse file contents into structured data
6. Insert the parsed records into the SQLite database (see the sketch below)
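
Steps 5 and 6 are the core of the pipeline. Below is a minimal sketch of what they might look like using only Python's standard library; the actual `pipeline.py` may use a dedicated GPX library and a different table layout, so treat `parse_gpx`, `insert_records`, and the `points` table as illustrative assumptions.

```python
import sqlite3
import xml.etree.ElementTree as ET

GPX_NS = {"gpx": "http://www.topografix.com/GPX/1/1"}  # GPX 1.1 namespace


def parse_gpx(data: bytes):
    """Yield (lat, lon, elevation, time) tuples from raw GPX bytes."""
    root = ET.fromstring(data)
    for pt in root.findall(".//gpx:trkpt", GPX_NS):
        ele = pt.find("gpx:ele", GPX_NS)
        time = pt.find("gpx:time", GPX_NS)
        yield (
            float(pt.attrib["lat"]),
            float(pt.attrib["lon"]),
            float(ele.text) if ele is not None else None,
            time.text if time is not None else None,
        )


def insert_records(db_path: str, records) -> int:
    """Insert parsed track points into SQLite and return the row count."""
    with sqlite3.connect(db_path) as conn:
        cur = conn.executemany(
            "INSERT INTO points (lat, lon, ele, time) VALUES (?, ?, ?, ?)",
            records,
        )
        return cur.rowcount
```

The `inserted 358 records` line in the session log further down corresponds to this kind of bulk insert.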

## Setting Up
## Cloud Usage

### Starting `ak`
1. Initialize your AWS connection
2. Configure your SNS topic and S3 bucket notifications to trigger on new GPX file uploads (see the sketch after this list)
3. Deploy the project
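
Step 2 can be done in the AWS console or programmatically. Here is a hedged sketch using `boto3` — it is not part of this project's code, and the bucket name and topic ARN are placeholders:

```python
import boto3

s3 = boto3.client("s3")

# Ask S3 to publish an SNS message whenever a .gpx object is created.
s3.put_bucket_notification_configuration(
    Bucket="my-hikes-bucket",  # placeholder bucket name
    NotificationConfiguration={
        "TopicConfigurations": [
            {
                "TopicArn": "arn:aws:sns:eu-north-1:123456789012:hikes",  # placeholder topic
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": ".gpx"}]}
                },
            }
        ]
    },
)
```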

Start `ak`; you can use development mode:
## Trigger Workflow

```
ak up --mode dev
```
> [!IMPORTANT]
> Ensure all connections (AWS) are initialized; otherwise, the workflow will raise a `ConnectionInitError`.
You'll need to expose the `ak` web API to the outside world in order to receive events from S3.
In this example, we'll use [ngrok](https://ngrok.com/):

```
ngrok http 9980
```

You'll see a screen with a line like:

```
Forwarding https://e7c499cae8d9.ngrok.app -> http://localhost:9980
```

This means your `ak` server is exposed to the web at `https://e7c499cae8d9.ngrok.app`. Remember this URL; you'll need it later.

To trigger the workflow, upload a new GPX file to the S3 bucket (you can use `hike.gpx` from this directory).
After the file is loaded, look at the session log:

```json
[2024-06-27 14:15:44.990824306 +0000 UTC] [stdout] event: {'Type': 'Notification', 'MessageId': 'e199ce57-86f5-59ba-a38a-90a0f0e190aa', 'TopicArn': 'arn:aws:sns:eu-north-1:975050051518:hikes', 'Subject': 'Amazon S3 Notification', 'Message': '{"Records":[{"eventVersion":"2.1","eventSource":"aws:s3","awsRegion":"eu-north-1","eventTime":"2024-06-27T14:14:44.418Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AROA6GBMDB67DH6QBEE75:miki"},"requestParameters":{"sourceIPAddress":"147.235.211.162"},"responseElements":{"x-amz-request-id":"2593RVSRRERSMWG4","x-amz-id-2":"h+wcGUnQUN/uIMMybLf+mQj9k0xeAuUWN6GZw9P2fTNXWtpYY4v76wnvtQ5EZI+epG32f0OFGeB64mQScVkYMTVLatKGvn06nC71SQPTP2s="},"s3":{"s3SchemaVersion":"1.0","configurationId":"new","bucket":{"name":"ak-miki-hikes","ownerIdentity":{"principalId":"A3RBVIBHMVQI0T"},"arn":"arn:aws:s3:::ak-miki-hikes"},"object":{"key":"hike11.gpx","size":31683,"eTag":"07618ea3c6e04cb24c80007a10d91438","sequencer":"00667D73D45F53EA22"}}}]}', 'Timestamp': '2024-06-27T14:14:44.924Z', 'SignatureVersion': '1', 'Signature': 'fpXoBYMe3pvs74mtXy7vKCi9DDmh7kPeecoGuqgsEuyBHLK40yzWaZDb/v71WfsDH/UOLOAWE/LyqkAmOj3xNQVlH9NYh+rRYjAw6YcrzjRvmd2GvRqG6ZCQIxUgrUmXGSibFIGnJeTTEuLdKiP+FDU26ZjvGcAt9ogC6no9MT2+mkPd+9z1Czs+JDEGBV7IgWwDKKQ51Rkt48+CzjYl9EBeQesn4EjTpdIckss3p0324hc6IZneQhLcqopaPNVMLPX83hlAFmCEMSoUxuMp+dyGMaXVG4PsmpP2I3M5lbdnHBk5bueneJRft8xAsLMkFt+tfdwpHbIakm2I14vEZQ==', 'SigningCertURL': 'https://sns.eu-north-1.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem', 'UnsubscribeURL': 'https://sns.eu-north-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:eu-north-1:975050051518:hikes:18b9ba01-43f1-4a6f-a5a1-95c76a68f760'}
[2024-06-27 14:15:45.002167665 +0000 UTC] [stdout] getting ak-miki-hikes/hike11.gpx
[2024-06-27 14:15:46.075361184 +0000 UTC] [stdout] inserted 358 records
```

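For reference, the bucket name and object key the workflow needs are nested inside the SNS `Message` field, which is itself a JSON string. A minimal sketch of that extraction and of the S3 download, using `boto3` and the project variables described below (a simplification, not necessarily the exact code in `pipeline.py`):

```python
import json
import os

import boto3


def bucket_and_key(event: dict) -> tuple[str, str]:
    """Pull the S3 bucket name and object key out of an SNS notification."""
    message = json.loads(event["Message"])
    record = message["Records"][0]
    return record["s3"]["bucket"]["name"], record["s3"]["object"]["key"]


def download(bucket: str, key: str) -> bytes:
    """Read the object from S3 using the credentials set as project vars."""
    s3 = boto3.client(
        "s3",
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_KEY"],
    )
    return s3.get_object(Bucket=bucket, Key=key)["Body"].read()
```
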
## Self-Hosted Deployment

### Deploy the Workflow

Follow [these detailed instructions](https://docs.autokitteh.com/get_started/deployment) to deploy the project on a self-hosted server.

Before you deploy the workflow, edit the `AWS_REGION` value in the `vars` section of the manifest (`autokitteh.yaml`) to match your bucket region.

```
ak deploy --manifest autokitteh.yaml --file pipeline.py
```

## Known Limitations

- The current parsing mechanism is tailored for GPX files and might need adjustments for other formats
- Error handling is basic and may require enhancement for production environments

### AWS Keys
## AWS Setup

Make sure you have AWS keys with read access to the S3 bucket.

See [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) for more information on how to create AWS keys.

### Configuring S3 Bucket for Notifications

> [!WARNING]
> This project assumes that the AutoKitteh server is already configured with
> [HTTP tunneling](https://docs.autokitteh.com/config/http_tunneling/).

- Create an S3 bucket, or use an existing one
- [Create an SNS topic](https://docs.aws.amazon.com/sns/latest/dg/sns-create-topic.html)
- [Give the bucket permission to publish to the SNS topic](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html#step1-create-sns-topic-for-notification)
- Create a subscription to the SNS topic
  - Select the `HTTPS` protocol
  - In the `Endpoint`, enter the domain from ngrok plus the HTTP trigger path,
    for example `https://e7c499cae8d9.ngrok.app/<webhook-slug>`
  - Send a confirmation to the subscription.
    This will trigger the workflow, and you should see the subscription URL in the `ak` logs (the handling is sketched below):

```
ak session log --prints-only
...
[stdout] SNS Subscribe URL: https://...
```

Visit the URL to confirm the subscription.
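
SNS delivers that confirmation request as a message whose `Type` is `SubscriptionConfirmation`, so the handler has to distinguish it from regular notifications. A rough sketch of how the workflow might print the URL seen in the log above (an assumption about the handler's behavior, not a copy of `pipeline.py`):

```python
def handle_sns_message(event: dict) -> None:
    # First delivery after subscribing: surface the confirmation URL in the log.
    if event.get("Type") == "SubscriptionConfirmation":
        print("SNS Subscribe URL:", event["SubscribeURL"])
        return

    # Regular S3 notification: hand off to the pipeline logic.
    if event.get("Type") == "Notification":
        process_new_object(event)  # hypothetical downstream function
```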

### Configure the S3 Bucket to Send Notifications

See [here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html).

### Create the Database

(Self-hosted) Pick a location for the database, say `$PWD/hikes.db`, and create the database:

```
sqlite3 $PWD/hikes.db < schema.sql
```

> [!NOTE]
> If you run the workflow on autokitteh.com, you can set the workflow variable
> `CREATE_DB` to `yes` and the workflow will create the database on the first
> event. Note that since the database is created inside the workflow's Docker
> container, it will be deleted when the workflow session ends.
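
A minimal sketch of what that first-event creation might look like, assuming the schema is loaded from `schema.sql` next to the code (the actual behavior behind `CREATE_DB` may differ):

```python
import os
import sqlite3


def ensure_db(dsn: str) -> None:
    """Create the SQLite database from schema.sql if CREATE_DB is enabled."""
    # Assumes the project var is exposed via the environment.
    if os.getenv("CREATE_DB") != "yes":
        return
    with open("schema.sql") as fp:
        schema = fp.read()
    with sqlite3.connect(dsn) as conn:
        conn.executescript(schema)  # idempotent if the schema uses IF NOT EXISTS
```
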
### Set Up Project Vars and Secrets

Add the database location and AWS keys to the `ak` project vars and secrets:

```
ak var set --env pipeline/default DB_DSN $PWD/hikes.db
ak var set --secret --env pipeline/default AWS_ACCESS_KEY_ID <your aws access key ID>
ak var set --secret --env pipeline/default AWS_SECRET_KEY <your aws secret key>
```

Make sure that your AWS credentials give you access to read the bucket.


## Local Development

### Via `ak`

There's a `Makefile` for common operations.

You can run the pipeline locally and test it.

- Initialize the database
  - `make init-db`
- Run `ak up --mode dev` and set the AWS keys and database DSN (see above)
  - `make vars`
- Deploy the workflow
  - `make deploy`
- Trigger the workflow
  - Make sure there's a GPX file in your bucket. We'll assume the bucket name is `miki-hikes` and the file is `hike-1.gpx`.
  - Edit `example-sns-event.json` and set the bucket name and the file name in the embedded `Message` JSON.
  - `make trigger` (see the sketch after this list)
- View the `ak` logs in its console and the workflow logs
  - `make logs`
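
For reference, `make trigger` presumably just POSTs the edited `example-sns-event.json` to the local HTTP trigger, along these lines (the exact Makefile target and trigger path may differ):

```python
import json
import urllib.request

# Load the sample SNS event (with your bucket and file name filled in).
with open("example-sns-event.json") as fp:
    body = json.dumps(json.load(fp)).encode()

# POST it to the local ak HTTP trigger.
req = urllib.request.Request(
    "http://localhost:9980/http/pipeline/new_object",
    data=body,
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(resp.status)
```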

### Debugging

You can set `AWS_ACCESS_KEY_ID`, `AWS_SECRET_KEY`, and `DB_DSN` in the environment (or in your IDE).
Then you can import `pipeline` and call `pipeline.on_new_s3_object` with the event JSON.

For example:

```python
from os import environ
import json

import pipeline

# Make sure to initialize hikes.db

# Provide the credentials and database location the pipeline expects.
environ.update({
    'AWS_ACCESS_KEY_ID': '...',
    'AWS_SECRET_KEY': '...',
    'DB_DSN': 'hikes.db',
})

# Load a sample SNS event and run the handler directly.
with open('example-sns-event.json') as fp:
    event = json.load(fp)

pipeline.on_new_s3_object(event)
```
