Skip to content

Dream #4

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 47 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
b254d9a
Update deployment Azure Blob Iceberg
May 6, 2025
ae475f4
Delete deployment InfluxDB 2.0 Source
May 6, 2025
f26cd98
Delete deployment Postgres CDC Source
May 6, 2025
f2abc8e
Delete deployment IcebergQuery
May 6, 2025
3dc816a
Delete deployment PostgreSQL CDC normalisation
May 6, 2025
471fa8f
Delete deployment InfluxDB CDC normalisation
May 6, 2025
4b225e1
Deleted application files from influxdb2-source
May 6, 2025
9f3f1ff
Deleted application files from query
May 6, 2025
33d6b01
Deleted application files from influxdb-cdc-normalisation
May 6, 2025
066d5c0
Deleted application files from postgres-cdc-source
May 6, 2025
7c8c4a1
Deleted application files from postgres-cdc-normalisation
May 6, 2025
3964974
Save file influxdb-3.0-sink/app.yaml
May 6, 2025
a1cf0f4
Created from InfluxDB 3.0 Sink
May 6, 2025
534789b
Add deployment InfluxDB 3.0 Sink
May 6, 2025
83da83a
Save file http-api-source/app.yaml
May 6, 2025
b3d064d
Created from HTTP API Source
May 6, 2025
5b48315
Add deployment HTTP API Source
May 6, 2025
6894ce5
Add topic http-source
May 6, 2025
38cd3d5
Save file http-transformation/app.yaml
May 6, 2025
11724a6
Created from Event Detection Transformation
May 6, 2025
67d8359
Add deployment HTTP Transformation
May 6, 2025
b4f9324
Update deployment HTTP Normalization
May 6, 2025
3c977c0
Update deployment MQTT UNS Normalisation
May 6, 2025
640f048
Update deployment Telegraf UNS Normalisation
May 6, 2025
5a5ecc5
Update deployment HTTP UNS Normalization
May 6, 2025
4f0b423
Edited 'main.py'
May 6, 2025
40b970d
Edited 'main.py'
May 6, 2025
76766f4
Edited 'main.py'
May 6, 2025
7d3cdc1
Edited 'main.py'
May 6, 2025
42ab74a
Edited 'main.py'
May 6, 2025
f2a1e9f
Edited 'main.py'
May 6, 2025
24653ef
Update deployment InfluxDB 3.0 Sink
May 6, 2025
e82805c
Delete 'icon.png'
May 6, 2025
512cb06
Upload 'icon.png'
May 6, 2025
4bdc902
Edited 'app.yaml'
May 6, 2025
a6c267f
'http-transformation/app.yaml' edited via Quix Portal
May 7, 2025
9027308
Edited 'main.py'
May 7, 2025
ce86330
Edited 'requirements.txt'
May 7, 2025
3c54e61
Edited 'main.py'
May 7, 2025
cfd22d2
Edited 'main.py'
May 7, 2025
7c6517b
Edited 'main.py'
May 7, 2025
1c8ff6f
Edited 'main.py'
May 7, 2025
f41fbea
Edited 'main.py'
May 7, 2025
7c1f56c
Edited 'main.py'
May 7, 2025
1802bef
Edited 'main.py'
May 7, 2025
08f2a1f
Edited 'main.py'
May 7, 2025
d437868
Edited 'main.py'
May 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion aws-iceberg-sink/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,3 @@ variables:
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: main.py
libraryItemId: s3-iceberg-destination
Binary file modified aws-iceberg-sink/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
33 changes: 33 additions & 0 deletions http-api-source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# HTTP API Source

[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/http_source) demonstrates how to run a Flask HTTP API as a web gateway and use it to publish to a Kafka topic via HTTP POST requests.

## How to run

Create a [Quix](https://portal.platform.quix.io/#?xlink=github) account or log-in and visit the Samples to use this project

Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying.

You can test your endpoint by sending a message via curl:
`curl -X POST -H "Content-Type: application/json" -d '{"sessionId": "000001", "name": "Tony Hawk", "purchase": "skateboard" }' https://<your-deployment-url>/data/
`

## Environment variables

The code sample uses the following environment variables:

- **output**: This is the output topic for hello world data.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.



image: Flaticon.com
12 changes: 12 additions & 0 deletions http-api-source/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: http-api-source
language: python
variables:
- name: output
inputType: OutputTopic
description: This is the output topic for hello world data
defaultValue: http-source
required: true
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: main.py
libraryItemId: web-api-gateway
28 changes: 28 additions & 0 deletions http-api-source/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
Binary file added http-api-source/icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
110 changes: 110 additions & 0 deletions http-api-source/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import os
import datetime
import json
from flask import Flask, request, Response, redirect
from flasgger import Swagger
from waitress import serve
import time

from flask_cors import CORS

from setup_logging import get_logger
from quixstreams import Application

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()

service_url = os.environ["Quix__Deployment__Network__PublicUrl"]

quix_app = Application()
topic = quix_app.topic(os.environ["output"])
producer = quix_app.get_producer()

logger = get_logger()

app = Flask(__name__)

# Enable CORS for all routes and origins by default
CORS(app)

app.config['SWAGGER'] = {
'title': 'HTTP API Source',
'description': 'Test your HTTP API with this Swagger interface. Send data and see it arrive in Quix.',
'uiversion': 3
}

swagger = Swagger(app)

@app.route("/", methods=['GET'])
def redirect_to_swagger():
return redirect("/apidocs/")

@app.route("/data/", methods=['POST'])
def post_data_without_key():
"""
Post data without key
---
parameters:
- in: body
name: body
schema:
type: object
properties:
some_value:
type: string
responses:
200:
description: Data received successfully
"""
data = request.json
logger.debug(f"{data}")

producer.produce(topic.name, json.dumps(data))

# Return a normal 200 response; CORS headers are added automatically by Flask-CORS
return Response(status=200)

@app.route("/data/<key>", methods=['POST'])
def post_data_with_key(key: str):
"""
Post data with a key
---
parameters:
- in: path
name: key
type: string
required: true
- in: body
name: body
schema:
type: object
properties:
some_value:
type: string
responses:
200:
description: Data received successfully
"""
data = request.json
logger.debug(f"{data}")

producer.produce(topic.name, json.dumps(data), key.encode())

return Response(status=200)

if __name__ == '__main__':
print("=" * 60)
print(" " * 20 + "CURL EXAMPLE")
print("=" * 60)
print(
f"""
curl -L -X POST \\
-H 'Content-Type: application/json' \\
-d '{{"key": "value"}}' \\
{service_url}/data
"""
)
print("=" * 60)

serve(app, host="0.0.0.0", port=80)
6 changes: 6 additions & 0 deletions http-api-source/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
quixstreams==3.2.1
flask
flask_cors
flasgger==0.9.7b2
waitress
python-dotenv
23 changes: 23 additions & 0 deletions http-api-source/setup_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging


def get_logger():

logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s] [%(levelname)s]: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)

# Set up logging
logger = logging.getLogger('waitress')
logger.setLevel(logging.DEBUG) # Set to DEBUG for more detailed output
logger.propagate = False # Prevent the log messages from propagating to the root logger

# Create handlers (console and file handler for example)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

return logger
30 changes: 30 additions & 0 deletions http-transformation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Event Detection

[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/Event-Detection) demonstrates how to consume data from a Kafka topic, filter the data based on some criteria and publish the result to an output topic.

It shows how to make real-time decisions on your data by looking at the `Brake` value and if a condition is met will output JSON message to the output topic.

You can adapt this code to suit your needs.

## How to run

Create a [Quix](https://portal.platform.quix.io/#?xlink=github) account or log-in and visit the Samples to use this project.

Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying.

## Environment variables

The code sample uses the following environment variables:

- **input**: This is the input topic for f1 data.
- **output**: This is the output topic for hard braking events.

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
18 changes: 18 additions & 0 deletions http-transformation/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: HTTP Transformation
language: python
variables:
- name: input
inputType: InputTopic
description: This is the input topic for f1 data
defaultValue: http-source
required: true
- name: output
inputType: OutputTopic
multiline: false
description: This is the output topic for hard braking events
defaultValue: sensor-data-table
required: true
dockerfile: dockerfile
runEntryPoint: main.py
defaultFile: quix_function.py
libraryItemId: event-detection-transformation
28 changes: 28 additions & 0 deletions http-transformation/dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
45 changes: 45 additions & 0 deletions http-transformation/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import os
from quixstreams import Application

# for local dev, load env vars from a .env file
from dotenv import load_dotenv
load_dotenv()

app = Application(consumer_group="v1.1",
auto_offset_reset="earliest",
use_changelog_topics=False)

input_topic = app.topic(os.environ["input"], key_deserializer="str")
output_topic = app.topic(os.environ["output"])

sdf = app.dataframe(input_topic)

sdf = sdf.apply(lambda row: row["payload"], expand=True)

# Calculate hopping window of 5 seconds with 10 second buffer delay.
sdf = sdf.tumbling_window(1000, 10000).collect().final()

sdf = sdf.apply(lambda row: sorted(row["value"], key=lambda row: row["time"]), expand=True)

def transpose(row, key, *_):

for axis, value in row["values"].items():
yield {
"device_id": key,
"sensor": row["name"],
"axis": axis,
"location": "na",
"timestamp": row["time"],
"value": value
}

sdf = sdf.apply(transpose, metadata=True, expand=True)

# Print JSON messages in console.
sdf.print_table(metadata=False)

# Send the message to the output topic
sdf.to_topic(output_topic)

if __name__ == "__main__":
app.run()
2 changes: 2 additions & 0 deletions http-transformation/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
quixstreams==3.13.1
python-dotenv
Loading