diff --git a/aws-iceberg-sink/app.yaml b/aws-iceberg-sink/app.yaml index 57f2c4c..b844475 100644 --- a/aws-iceberg-sink/app.yaml +++ b/aws-iceberg-sink/app.yaml @@ -33,4 +33,3 @@ variables: dockerfile: dockerfile runEntryPoint: main.py defaultFile: main.py -libraryItemId: s3-iceberg-destination diff --git a/aws-iceberg-sink/icon.png b/aws-iceberg-sink/icon.png index bb87c06..9dc565c 100644 Binary files a/aws-iceberg-sink/icon.png and b/aws-iceberg-sink/icon.png differ diff --git a/http-api-source/README.md b/http-api-source/README.md new file mode 100644 index 0000000..45a731f --- /dev/null +++ b/http-api-source/README.md @@ -0,0 +1,33 @@ +# HTTP API Source + +[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/http_source) demonstrates how to run a Flask HTTP API as a web gateway and use it to publish to a Kafka topic via HTTP POST requests. + +## How to run + +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the Samples to use this project + +Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying. + +You can test your endpoint by sending a message via curl: +`curl -X POST -H "Content-Type: application/json" -d '{"sessionId": "000001", "name": "Tony Hawk", "purchase": "skateboard" }' https:///data/ +` + +## Environment variables + +The code sample uses the following environment variables: + +- **output**: This is the output topic for hello world data. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. + +Please star us and mention us on social to show your appreciation. + + + +image: Flaticon.com \ No newline at end of file diff --git a/http-api-source/app.yaml b/http-api-source/app.yaml new file mode 100644 index 0000000..1f4b94b --- /dev/null +++ b/http-api-source/app.yaml @@ -0,0 +1,12 @@ +name: http-api-source +language: python +variables: + - name: output + inputType: OutputTopic + description: This is the output topic for hello world data + defaultValue: http-source + required: true +dockerfile: dockerfile +runEntryPoint: main.py +defaultFile: main.py +libraryItemId: web-api-gateway diff --git a/http-api-source/dockerfile b/http-api-source/dockerfile new file mode 100644 index 0000000..692316b --- /dev/null +++ b/http-api-source/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . 
+ +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/http-api-source/icon.png b/http-api-source/icon.png new file mode 100644 index 0000000..ae32bff Binary files /dev/null and b/http-api-source/icon.png differ diff --git a/http-api-source/main.py b/http-api-source/main.py new file mode 100644 index 0000000..990e764 --- /dev/null +++ b/http-api-source/main.py @@ -0,0 +1,110 @@ +import os +import datetime +import json +from flask import Flask, request, Response, redirect +from flasgger import Swagger +from waitress import serve +import time + +from flask_cors import CORS + +from setup_logging import get_logger +from quixstreams import Application + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + +service_url = os.environ["Quix__Deployment__Network__PublicUrl"] + +quix_app = Application() +topic = quix_app.topic(os.environ["output"]) +producer = quix_app.get_producer() + +logger = get_logger() + +app = Flask(__name__) + +# Enable CORS for all routes and origins by default +CORS(app) + +app.config['SWAGGER'] = { + 'title': 'HTTP API Source', + 'description': 'Test your HTTP API with this Swagger interface. Send data and see it arrive in Quix.', + 'uiversion': 3 +} + +swagger = Swagger(app) + +@app.route("/", methods=['GET']) +def redirect_to_swagger(): + return redirect("/apidocs/") + +@app.route("/data/", methods=['POST']) +def post_data_without_key(): + """ + Post data without key + --- + parameters: + - in: body + name: body + schema: + type: object + properties: + some_value: + type: string + responses: + 200: + description: Data received successfully + """ + data = request.json + logger.debug(f"{data}") + + producer.produce(topic.name, json.dumps(data)) + + # Return a normal 200 response; CORS headers are added automatically by Flask-CORS + return Response(status=200) + +@app.route("/data/", methods=['POST']) +def post_data_with_key(key: str): + """ + Post data with a key + --- + parameters: + - in: path + name: key + type: string + required: true + - in: body + name: body + schema: + type: object + properties: + some_value: + type: string + responses: + 200: + description: Data received successfully + """ + data = request.json + logger.debug(f"{data}") + + producer.produce(topic.name, json.dumps(data), key.encode()) + + return Response(status=200) + +if __name__ == '__main__': + print("=" * 60) + print(" " * 20 + "CURL EXAMPLE") + print("=" * 60) + print( + f""" +curl -L -X POST \\ + -H 'Content-Type: application/json' \\ + -d '{{"key": "value"}}' \\ + {service_url}/data + """ + ) + print("=" * 60) + + serve(app, host="0.0.0.0", port=80) \ No newline at end of file diff --git a/http-api-source/requirements.txt b/http-api-source/requirements.txt new file mode 100644 index 0000000..512c937 --- /dev/null +++ b/http-api-source/requirements.txt @@ -0,0 +1,6 @@ +quixstreams==3.2.1 +flask +flask_cors +flasgger==0.9.7b2 +waitress +python-dotenv \ No newline at end of file diff --git a/http-api-source/setup_logging.py b/http-api-source/setup_logging.py new file mode 100644 index 0000000..9356909 --- /dev/null +++ b/http-api-source/setup_logging.py @@ -0,0 +1,23 @@ +import logging + + +def get_logger(): + + logging.basicConfig( + level=logging.DEBUG, + format='[%(asctime)s] [%(levelname)s]: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Set up logging + logger = logging.getLogger('waitress') + 
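+    # Note (descriptive comment, not present in the sample): "waitress" is the logger
+    # used by the WSGI server that main.py starts via serve(app, host="0.0.0.0", port=80),
+    # so configuring it here keeps the server's request and error output visible
+    # alongside the application logs.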
logger.setLevel(logging.DEBUG) # Set to DEBUG for more detailed output + logger.propagate = False # Prevent the log messages from propagating to the root logger + + # Create handlers (console and file handler for example) + console_handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + return logger \ No newline at end of file diff --git a/http-transformation/README.md b/http-transformation/README.md new file mode 100644 index 0000000..83751bf --- /dev/null +++ b/http-transformation/README.md @@ -0,0 +1,30 @@ +# Event Detection + +[This code sample](https://github.com/quixio/quix-samples/tree/main/python/transformations/Event-Detection) demonstrates how to consume data from a Kafka topic, filter the data based on some criteria and publish the result to an output topic. + +It shows how to make real-time decisions on your data by looking at the `Brake` value and if a condition is met will output JSON message to the output topic. + +You can adapt this code to suit your needs. + +## How to run + +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the Samples to use this project. + +Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying. + +## Environment variables + +The code sample uses the following environment variables: + +- **input**: This is the input topic for f1 data. +- **output**: This is the output topic for hard braking events. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. + +Please star us and mention us on social to show your appreciation. \ No newline at end of file diff --git a/http-transformation/app.yaml b/http-transformation/app.yaml new file mode 100644 index 0000000..1de98fe --- /dev/null +++ b/http-transformation/app.yaml @@ -0,0 +1,18 @@ +name: HTTP Transformation +language: python +variables: + - name: input + inputType: InputTopic + description: This is the input topic for f1 data + defaultValue: http-source + required: true + - name: output + inputType: OutputTopic + multiline: false + description: This is the output topic for hard braking events + defaultValue: sensor-data-table + required: true +dockerfile: dockerfile +runEntryPoint: main.py +defaultFile: quix_function.py +libraryItemId: event-detection-transformation diff --git a/http-transformation/dockerfile b/http-transformation/dockerfile new file mode 100644 index 0000000..692316b --- /dev/null +++ b/http-transformation/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. 
+ +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . + +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/http-transformation/main.py b/http-transformation/main.py new file mode 100644 index 0000000..592393b --- /dev/null +++ b/http-transformation/main.py @@ -0,0 +1,45 @@ +import os +from quixstreams import Application + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + +app = Application(consumer_group="v1.1", + auto_offset_reset="earliest", + use_changelog_topics=False) + +input_topic = app.topic(os.environ["input"], key_deserializer="str") +output_topic = app.topic(os.environ["output"]) + +sdf = app.dataframe(input_topic) + +sdf = sdf.apply(lambda row: row["payload"], expand=True) + +# Calculate hopping window of 5 seconds with 10 second buffer delay. +sdf = sdf.tumbling_window(1000, 10000).collect().final() + +sdf = sdf.apply(lambda row: sorted(row["value"], key=lambda row: row["time"]), expand=True) + +def transpose(row, key, *_): + + for axis, value in row["values"].items(): + yield { + "device_id": key, + "sensor": row["name"], + "axis": axis, + "location": "na", + "timestamp": row["time"], + "value": value + } + +sdf = sdf.apply(transpose, metadata=True, expand=True) + +# Print JSON messages in console. +sdf.print_table(metadata=False) + +# Send the message to the output topic +sdf.to_topic(output_topic) + +if __name__ == "__main__": + app.run() \ No newline at end of file diff --git a/http-transformation/requirements.txt b/http-transformation/requirements.txt new file mode 100644 index 0000000..4de0a49 --- /dev/null +++ b/http-transformation/requirements.txt @@ -0,0 +1,2 @@ +quixstreams==3.13.1 +python-dotenv \ No newline at end of file diff --git a/influxdb-3.0-sink/README.md b/influxdb-3.0-sink/README.md new file mode 100644 index 0000000..834a3b5 --- /dev/null +++ b/influxdb-3.0-sink/README.md @@ -0,0 +1,40 @@ +# InfluxDB v3 + +[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/influxdb_3) demonstrates how to consume data from a Kafka topic in Quix and persist the data to an InfluxDB v3 database using the InfluxDB write API. + +## How to run + +Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* click `Test connection & deploy` to deploy the pre-built and configured container into Quix. + +* or click `Customise connector` to inspect or alter the code before deployment. + +## Environment Variables + +The connector uses the following environment variables: + +- **input**: This is the input topic (Default: `detection-result`, Required: `True`) +- **TIMESTAMP_COLUMN**: This is the column in your data that represents the timestamp in nanoseconds. Defaults to use the message timestamp received from the broker if not supplied. Case sensitive. (Default: ``, Required: `False`) +- **INFLUXDB_HOST**: Host address for the InfluxDB instance. 
(Default: `https://eu-central-1-1.aws.cloud2.influxdata.com`, Required: `True`) +- **INFLUXDB_TOKEN**: Authentication token to access InfluxDB. (Default: ``, Required: `True`) +- **INFLUXDB_ORG**: Organization name in InfluxDB. (Default: ``, Required: `False`) +- **INFLUXDB_DATABASE**: Database name in InfluxDB where data should be stored. (Default: ``, Required: `True`) +- **INFLUXDB_TAG_KEYS**: Keys to be used as tags when writing data to InfluxDB. These are columns that are available in the input topic. (Default: ``, Required: `False`) +- **INFLUXDB_FIELD_KEYS**: Keys to be used as fields when writing data to InfluxDB. These are columns that are available in the input topic. (Default: ``, Required: `True`) +- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to write data to. If not specified, the name of the input topic will be used. (Default: `measurement1`, Required: `False`) + +## Requirements / Prerequisites + +You will need to have an InfluxDB 3.0 instance available and an API authentication token. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open Source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. Please star us and mention us on social to show your appreciation. \ No newline at end of file diff --git a/influxdb-3.0-sink/app.yaml b/influxdb-3.0-sink/app.yaml new file mode 100644 index 0000000..18fd7a0 --- /dev/null +++ b/influxdb-3.0-sink/app.yaml @@ -0,0 +1,56 @@ +name: influxdb-3.0-sink +language: python +variables: + - name: input + inputType: InputTopic + description: This is the input topic + defaultValue: sensor-data-table + required: true + - name: INFLUXDB_HOST + inputType: FreeText + description: Host address for the InfluxDB instance. + defaultValue: https://eu-central-1-1.aws.cloud2.influxdata.com + required: true + - name: INFLUXDB_TOKEN + inputType: Secret + description: Authentication token to access InfluxDB. + defaultValue: basic_auth_password + required: true + - name: INFLUXDB_ORG + inputType: FreeText + description: Organization name in InfluxDB. + - name: INFLUXDB_MEASUREMENT_NAME + inputType: FreeText + description: The InfluxDB measurement to write data to. If not specified, the name of the input topic will be used + defaultValue: measurement1 + - name: INFLUXDB_DATABASE + inputType: FreeText + description: Database name in InfluxDB where data should be stored. + defaultValue: sdfsdf + required: true + - name: INFLUXDB_TAG_KEYS + inputType: FreeText + description: 'The tags to include when writing the measurement data. Example: Tag1,Tag2' + - name: INFLUXDB_FIELD_KEYS + inputType: FreeText + description: 'The fields to include when writing the measurement data. Example: Field1,Field2' + - name: CONSUMER_GROUP_NAME + inputType: FreeText + description: The name of the consumer group to use when consuming from Kafka + defaultValue: influxdb-sink + required: true + - name: TIMESTAMP_COLUMN + inputType: FreeText + description: 'The column containing the timestamp column. 
NOTE: Must be nanoseconds' + - name: BUFFER_SIZE + inputType: FreeText + description: The number of records that sink holds before flush data to the InfluxDb + defaultValue: 1000 + - name: BUFFER_TIMEOUT + inputType: FreeText + description: The number of seconds that sink holds before flush data to the InfluxDb + defaultValue: 1 +dockerfile: dockerfile +runEntryPoint: main.py +defaultFile: main.py +libraryItemId: influxdb-3-destination diff --git a/influxdb-3.0-sink/dockerfile b/influxdb-3.0-sink/dockerfile new file mode 100644 index 0000000..692316b --- /dev/null +++ b/influxdb-3.0-sink/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . + +# Set working directory to main app path +WORKDIR "/app/${MAINAPPPATH}" + +# Define the container's startup command +ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/influxdb2-source/icon.png b/influxdb-3.0-sink/icon.png similarity index 100% rename from influxdb2-source/icon.png rename to influxdb-3.0-sink/icon.png diff --git a/influxdb-3.0-sink/main.py b/influxdb-3.0-sink/main.py new file mode 100644 index 0000000..b7719ca --- /dev/null +++ b/influxdb-3.0-sink/main.py @@ -0,0 +1,58 @@ +# import Utility modules +import os +import logging + +# import vendor-specific modules +from quixstreams import Application +from quixstreams.sinks.core.influxdb3 import InfluxDB3Sink + +# for local dev, load env vars from a .env file +from dotenv import load_dotenv +load_dotenv() + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# read the consumer group from config +consumer_group_name = os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer") + +# read the timestamp column from config +timestamp_column = os.environ.get("TIMESTAMP_COLUMN") if os.environ.get("TIMESTAMP_COLUMN") else None + +buffer_size = int(os.environ.get("BUFFER_SIZE", "1000")) + +buffer_delay = float(os.environ.get("BUFFER_DELAY", "1")) + +# Create a Quix platform-specific application instead +app = Application( + consumer_group=consumer_group_name, + auto_offset_reset="earliest", + commit_every=buffer_size, + commit_interval=buffer_delay) + +input_topic = app.topic(os.environ["input"]) + +# Read the environment variable and convert it to a dictionary +tag_keys = os.environ.get("INFLUXDB_TAG_KEYS", "").split(",") if os.environ.get("INFLUXDB_TAG_KEYS") else [] +field_keys = os.environ.get("INFLUXDB_FIELD_KEYS", "").split(",")if os.environ.get("INFLUXDB_FIELD_KEYS") else [] +measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", "measurement1") + +influxdb_v3_sink = InfluxDB3Sink( + token=os.environ["INFLUXDB_TOKEN"], + host=os.environ["INFLUXDB_HOST"], + organization_id=os.environ["INFLUXDB_ORG"], + tags_keys=tag_keys, + fields_keys=field_keys, + time_key=timestamp_column, + database=os.environ["INFLUXDB_DATABASE"], + measurement=measurement_name) + +sdf = app.dataframe(input_topic) + +#sdf.print() 
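+# Illustrative mapping (an assumption drawn from the parameter names above, not
+# output produced by this script): with INFLUXDB_TAG_KEYS="device_id,sensor",
+# INFLUXDB_FIELD_KEYS="value" and TIMESTAMP_COLUMN="timestamp", a consumed record
+#   {"device_id": "dev-001", "sensor": "accel-x", "value": 9.8, "timestamp": 1700000000000000000}
+# would be written to measurement "measurement1" with device_id/sensor as tags,
+# value as a field, and the record's own nanosecond timestamp rather than the
+# Kafka message timestamp.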
+sdf.sink(influxdb_v3_sink) + +if __name__ == "__main__": + logger.info("Starting application") + app.run() + diff --git a/influxdb-3.0-sink/requirements.txt b/influxdb-3.0-sink/requirements.txt new file mode 100644 index 0000000..1135dd5 --- /dev/null +++ b/influxdb-3.0-sink/requirements.txt @@ -0,0 +1,2 @@ +quixstreams[influxdb3]==3.8.1 +python-dotenv \ No newline at end of file diff --git a/influxdb-cdc-normalisation/README.md b/influxdb-cdc-normalisation/README.md deleted file mode 100644 index 5c16696..0000000 --- a/influxdb-cdc-normalisation/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# InfluxDB CDC transformation - -This service converts InfluxDB CDC specific messages to unified format for Iceberg sink. - -## Environment variables - -The code sample uses the following environment variables: - -- **input**: Name of the input topic to listen to. -- **output**: Name of the output topic to write to. - diff --git a/influxdb-cdc-normalisation/app.yaml b/influxdb-cdc-normalisation/app.yaml deleted file mode 100644 index 3bfb7fb..0000000 --- a/influxdb-cdc-normalisation/app.yaml +++ /dev/null @@ -1,16 +0,0 @@ -name: influxdb-cdc-normalisation -language: Python -variables: - - name: input - inputType: InputTopic - description: Name of the input topic to listen to. - defaultValue: influxdbv2-data - required: false - - name: output - inputType: OutputTopic - description: Name of the output topic to write to. - defaultValue: sensor-data-table - required: false -dockerfile: dockerfile -runEntryPoint: main.py -defaultFile: main.py diff --git a/influxdb-cdc-normalisation/dockerfile b/influxdb-cdc-normalisation/dockerfile deleted file mode 100644 index 03ef4f7..0000000 --- a/influxdb-cdc-normalisation/dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM python:3.12.5-slim-bookworm - -# Set environment variables to non-interactive and unbuffered output -ENV DEBIAN_FRONTEND=noninteractive \ - PYTHONUNBUFFERED=1 \ - PYTHONIOENCODING=UTF-8 - -# Set the working directory inside the container -WORKDIR /app - -# Copy only the requirements file(s) to leverage Docker cache -# Assuming all requirements files are in the root or subdirectories -COPY ./requirements.txt ./ - -# Install dependencies -# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size -RUN pip install --no-cache-dir -r requirements.txt - -# Copy the rest of the application -COPY . . - -# Set the command to run your application -ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/influxdb-cdc-normalisation/main.py b/influxdb-cdc-normalisation/main.py deleted file mode 100644 index 8b5ddd0..0000000 --- a/influxdb-cdc-normalisation/main.py +++ /dev/null @@ -1,63 +0,0 @@ -import os -from quixstreams import Application -from datetime import datetime - -# for local dev, load env vars from a .env file -from dotenv import load_dotenv -load_dotenv() - -app = Application(consumer_group="transformation-v1", auto_offset_reset="earliest") - -input_topic = app.topic(os.environ["input"]) -output_topic = app.topic(os.environ["output"]) - -sdf = app.dataframe(input_topic) - -def expand_influx_row(row: dict): - - # Columns we want to keep as is. Rest we transpose into rows. - fixed_fields = ["result","table","_start","_stop","original_time","_measurement","deviceId","sessionId"] - - # Custom location field. - if "location-latitude" in row and "location-longitude" in row: - location = f"{row['location-latitude']},{row['location-longitude']}" - else: - location = "unknown" - - # Transpose columns into rows. 
- for key in row: - if key in fixed_fields: - continue - - key_parts = key.split("-") - - # Each columns should have two parts (-). If not, skip. - if len(key_parts) != 2: - continue - - output_row = { - "timestamp": datetime.strptime(row["original_time"], "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() * 1000, - "device_id": row["deviceId"], - "sensor": key_parts[0], - "location": location, - "axis": key_parts[1] - } - - # Check if the value is a number or a string and set it accordingly. - value = row[key] - if isinstance(value, (int, float)): # Check for number (integer or float) - output_row["value_float"] = float(value) - elif isinstance(value, str): # Check for string - output_row["value_str"] = str(value) - else: - print(f"{value} is neither a number nor a string") - - yield output_row - -sdf = sdf.apply(expand_influx_row, expand=True) - -sdf.print() -sdf.to_topic(output_topic) - -if __name__ == "__main__": - app.run() \ No newline at end of file diff --git a/influxdb-cdc-normalisation/requirements.txt b/influxdb-cdc-normalisation/requirements.txt deleted file mode 100644 index 365f9bd..0000000 --- a/influxdb-cdc-normalisation/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -quixstreams==3.2.1 -python-dotenv \ No newline at end of file diff --git a/influxdb2-source/README.md b/influxdb2-source/README.md deleted file mode 100644 index 2d7da9e..0000000 --- a/influxdb2-source/README.md +++ /dev/null @@ -1,35 +0,0 @@ -# InfluxDB 2.0 - -[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_2) demonstrates how to use the InfluxDB 2.0 query API to periodically query InfluxDB and publish the results to a Kafka topic. - -## How to run - -Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log in and visit the Code Samples to use this project. - -Clicking `Deploy` on the Sample, deploys a pre-built container in Quix. Complete the environment variables to configure the container. - -Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying. - -## Environment Variables - -The code sample uses the following environment variables: - -- **output**: This is the ouput topic that will receive the stream (Default: `influxdb`, Required: `True`) -- **task_interval**: Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1mo, 1y (Default: `5m`, Required: `True`) -- **INFLUXDB_HOST**: Host address for the InfluxDB instance. (Default: `eu-central-1-1.aws.cloud2.influxdata.com`, Required: `True`) -- **INFLUXDB_TOKEN**: Authentication token to access InfluxDB. (Default: ``, Required: `True`) -- **INFLUXDB_ORG**: Organization name in InfluxDB. (Default: ``, Required: `False`) -- **INFLUXDB_BUCKET**: Bucket name in InfluxDB where data is stored. (Default: ``, Required: `True`) -- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used (Default: ``, Required: `False`) - -## Requirements / Prerequisites - -You will need to have an InfluxDB 2.0 instance available and an API authentication token. - -## Contribute - -Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. - -## Open Source - -This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. 
Please star us and mention us on social to show your appreciation. \ No newline at end of file diff --git a/influxdb2-source/app.yaml b/influxdb2-source/app.yaml deleted file mode 100644 index 748fb53..0000000 --- a/influxdb2-source/app.yaml +++ /dev/null @@ -1,42 +0,0 @@ -name: InfluxDB 2.0 Source -language: python -variables: - - name: output - inputType: OutputTopic - description: This is the Kafka topic that will receive the query results - defaultValue: influxdbv2-data - required: true - - name: task_interval - inputType: FreeText - description: Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1y - defaultValue: 5m - required: true - - name: INFLUXDB_HOST - inputType: FreeText - description: Host address for the InfluxDB instance. - defaultValue: https://influxdb-tomas-crashdetection-telegfaf.deployments.quix.io - required: true - - name: INFLUXDB_TOKEN - inputType: FreeText - description: Authentication token to access InfluxDB. - defaultValue: rwJpQuwqTmVfXoj-6bYv - required: true - - name: INFLUXDB_ORG - inputType: FreeText - description: Organization name in InfluxDB. - defaultValue: quix - required: true - - name: INFLUXDB_BUCKET - inputType: FreeText - description: Bucket name in InfluxDB where data is stored. - defaultValue: iotdemo - required: true - - name: INFLUXDB_MEASUREMENT_NAME - inputType: FreeText - description: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used - defaultValue: sensordata - required: false -dockerfile: dockerfile -runEntryPoint: main.py -defaultFile: main.py -libraryItemId: influxdb-2-source diff --git a/influxdb2-source/dockerfile b/influxdb2-source/dockerfile deleted file mode 100644 index 03ef4f7..0000000 --- a/influxdb2-source/dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM python:3.12.5-slim-bookworm - -# Set environment variables to non-interactive and unbuffered output -ENV DEBIAN_FRONTEND=noninteractive \ - PYTHONUNBUFFERED=1 \ - PYTHONIOENCODING=UTF-8 - -# Set the working directory inside the container -WORKDIR /app - -# Copy only the requirements file(s) to leverage Docker cache -# Assuming all requirements files are in the root or subdirectories -COPY ./requirements.txt ./ - -# Install dependencies -# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size -RUN pip install --no-cache-dir -r requirements.txt - -# Copy the rest of the application -COPY . . 
- -# Set the command to run your application -ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/influxdb2-source/main.py b/influxdb2-source/main.py deleted file mode 100644 index 09dd665..0000000 --- a/influxdb2-source/main.py +++ /dev/null @@ -1,152 +0,0 @@ -# Import basic utilities -import os -import random -import json -import logging -from time import sleep - -# import vendor-specfic modules -from quixstreams import Application -from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext -import influxdb_client - -# for local dev, load env vars from a .env file -from dotenv import load_dotenv -load_dotenv() - -# Initialize logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Create a Quix Application -app = Application() - -# Define a serializer for messages, using JSON Serializer for ease -serializer = JSONSerializer() - -# Define the topic using the "output" environment variable -topic_name = os.environ["output"] -topic = app.topic(topic_name) - -influxdb2_client = influxdb_client.InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"], - org=os.environ["INFLUXDB_ORG"], - url=os.environ['INFLUXDB_HOST']) - -query_api = influxdb2_client.query_api() - -interval = os.environ.get("task_interval", "5m") -bucket = os.environ.get("INFLUXDB_BUCKET", "placeholder-bucket") - -# Global variable to control the main loop's execution -run = True - -# Helper function to convert time intervals (like 1h, 2m) into seconds for easier processing. -# This function is useful for determining the frequency of certain operations. -UNIT_SECONDS = { - "s": 1, - "m": 60, - "h": 3600, - "d": 86400, - "w": 604800, - "y": 31536000, -} - -def interval_to_seconds(interval: str) -> int: - try: - return int(interval[:-1]) * UNIT_SECONDS[interval[-1]] - except ValueError as e: - if "invalid literal" in str(e): - raise ValueError( - "interval format is {int}{unit} i.e. '10h'; " - f"valid units: {list(UNIT_SECONDS.keys())}") - except KeyError: - raise ValueError( - f"Unknown interval unit: {interval[-1]}; " - f"valid units: {list(UNIT_SECONDS.keys())}") - -interval_seconds = interval_to_seconds(interval) - - -def is_dataframe(result): - return type(result).__name__ == 'DataFrame' - -# Function to fetch data from InfluxDB and send it to Quix -# It runs in a continuous loop, periodically fetching data based on the interval. -def get_data(): - # Run in a loop until the main thread is terminated - while run: - try: - # Query InfluxDB 2.0 using flux - flux_query = f''' - from(bucket: "{bucket}") - |> range(start: -{interval}) - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") - ''' - logger.info(f"Sending query: {flux_query}") - - table = query_api.query_data_frame(query=flux_query,org=os.environ['INFLUXDB_ORG']) - - # Renaming time column to distinguish it from other timestamp types - # table.rename(columns={'_time': 'original_time'}, inplace=True) - - # If the query returns tables with different schemas, the result will be a list of dataframes. 
- if isinstance(table, list): - for item in table: - item.rename(columns={'_time': 'original_time'}, inplace=True) - json_result = item.to_json(orient='records', date_format='iso') - yield json_result - logger.info("Published multiple measurements to Quix") - elif is_dataframe(table) and len(table) > 0: - table.rename(columns={'_time': 'original_time'}, inplace=True) - json_result = table.to_json(orient='records', date_format='iso') - yield json_result - logger.info("Published single measurement to Quix") - elif is_dataframe(table) and len(table) < 1: - logger.info("No results.") - - logger.info(f"Trying again in {interval_seconds} seconds...") - sleep(interval_seconds) - - except Exception as e: - logger.info("query failed") - logger.info(f"error: {e}") - flush=True - sleep(1) - -def main(): - """ - Read data from the Query and publish it to Kafka - """ - - # Create a pre-configured Producer object. - # Producer is already setup to use Quix brokers. - # It will also ensure that the topics exist before producing to them if - # Application.Quix is initialized with "auto_create_topics=True". - producer = app.get_producer() - - with producer: - for res in get_data(): - # Parse the JSON string into a Python object - records = json.loads(res) - for index, obj in enumerate(records): - # Generate a unique message_key for each row - message_key = f"INFLUX2_DATA_{str(random.randint(1, 100)).zfill(3)}_{index}" - logger.info(f"Produced message with key:{message_key}, value:{obj}") - - # Serialize row value to bytes - serialized_value = serializer( - value=obj, ctx=SerializationContext(topic=topic.name) - ) - - # publish the data to the topic - producer.produce( - topic=topic.name, - key=message_key, - value=serialized_value, - ) - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - logger.info("Exiting.") \ No newline at end of file diff --git a/influxdb2-source/requirements.txt b/influxdb2-source/requirements.txt deleted file mode 100644 index 2a6071e..0000000 --- a/influxdb2-source/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -quixstreams==2.9.0 -influxdb-client==1.39.0 -pandas -python-dotenv \ No newline at end of file diff --git a/postgres-cdc-normalisation/README.md b/postgres-cdc-normalisation/README.md deleted file mode 100644 index 045479b..0000000 --- a/postgres-cdc-normalisation/README.md +++ /dev/null @@ -1,11 +0,0 @@ -# PostgreSQL CDC transformation - -This service converts PostgreSQL CDC specific messages to unified format for Iceberg sink. - -## Environment variables - -The code sample uses the following environment variables: - -- **input**: Name of the input topic to listen to. -- **output**: Name of the output topic to write to. - diff --git a/postgres-cdc-normalisation/app.yaml b/postgres-cdc-normalisation/app.yaml deleted file mode 100644 index b5377f1..0000000 --- a/postgres-cdc-normalisation/app.yaml +++ /dev/null @@ -1,17 +0,0 @@ -name: postgres-cdc-normalisation -language: Python -variables: - - name: input - inputType: InputTopic - description: Name of the input topic to listen to. - defaultValue: postgres-cdc - required: false - - name: output - inputType: OutputTopic - description: Name of the output topic to write to. 
- defaultValue: sensor-data-table - required: false -dockerfile: dockerfile -runEntryPoint: main.py -defaultFile: main.py -libraryItemId: starter-transformation diff --git a/postgres-cdc-normalisation/dockerfile b/postgres-cdc-normalisation/dockerfile deleted file mode 100644 index 03ef4f7..0000000 --- a/postgres-cdc-normalisation/dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM python:3.12.5-slim-bookworm - -# Set environment variables to non-interactive and unbuffered output -ENV DEBIAN_FRONTEND=noninteractive \ - PYTHONUNBUFFERED=1 \ - PYTHONIOENCODING=UTF-8 - -# Set the working directory inside the container -WORKDIR /app - -# Copy only the requirements file(s) to leverage Docker cache -# Assuming all requirements files are in the root or subdirectories -COPY ./requirements.txt ./ - -# Install dependencies -# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size -RUN pip install --no-cache-dir -r requirements.txt - -# Copy the rest of the application -COPY . . - -# Set the command to run your application -ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/postgres-cdc-normalisation/main.py b/postgres-cdc-normalisation/main.py deleted file mode 100644 index 4656df3..0000000 --- a/postgres-cdc-normalisation/main.py +++ /dev/null @@ -1,75 +0,0 @@ -import os -from quixstreams import Application -from datetime import datetime - -# for local dev, load env vars from a .env file -from dotenv import load_dotenv -load_dotenv() - -app = Application(consumer_group="postgres-norm-v1.1", auto_offset_reset="earliest") - -input_topic = app.topic(os.environ["input"]) -output_topic = app.topic(os.environ["output"]) - -sdf = app.dataframe(input_topic) - -def convert_to_sensor_table(row: dict): - - result = {} - for i in range(len(row["columnnames"])): - result[row["columnnames"][i]] = row["columnvalues"][i] - - return result - -def expand_row(row: dict): - - fixed_fields = ["timestamp","__key","sessionId","deviceId"] - - if "location-latitude" in row and "location-longitude" in row: - location = f"{row['location-latitude']},{row['location-longitude']}" - else: - location = "unknown" - - for key in row: - - if key in fixed_fields: - continue - - key_parts = key.split("-") - - if len(key_parts) != 2: - continue - - try: - parsed_time = datetime.strptime(row["timestamp"], '%Y-%m-%d %H:%M:%S.%f') - except ValueError: - parsed_time = datetime.strptime(row["timestamp"], '%Y-%m-%d %H:%M:%S') - - output_row = { - "timestamp": parsed_time.timestamp() * 1000, - "device_id": row["deviceId"], - "sensor": key_parts[0], - "location": location, - "axis": key_parts[1] - } - - value = row[key] - - if isinstance(value, (int, float)): # Check for number (integer or float) - output_row["value_float"] = float(value) - elif isinstance(value, str): # Check for string - output_row["value_str"] = str(value) - else: - print(f"{value} is neither a number nor a string") - - yield output_row - -sdf = sdf.apply(convert_to_sensor_table) - -sdf = sdf.apply(expand_row, expand=True) - -sdf.print() -sdf.to_topic(output_topic) - -if __name__ == "__main__": - app.run() \ No newline at end of file diff --git a/postgres-cdc-normalisation/requirements.txt b/postgres-cdc-normalisation/requirements.txt deleted file mode 100644 index 365f9bd..0000000 --- a/postgres-cdc-normalisation/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -quixstreams==3.2.1 -python-dotenv \ No newline at end of file diff --git a/postgres-cdc-source/README.md b/postgres-cdc-source/README.md deleted file mode 
100644 index 5ddf3b7..0000000 --- a/postgres-cdc-source/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# Postgres CDC - -[This code sample](https://github.com/quixio/quix-samples/tree/main/python/sources/postgres_cdc) demonstrates how to capture changes to a Postgres database table (using CDC) and publish the change events to a Kafka topic. - -## How to run - -Create a [Quix](https://portal.platform.quix.io/signup?xlink=github) account or log-in and visit the Samples to use this project. - -Clicking `Deploy` on the Sample, deploys a pre-built container in Quix. Complete the environment variables to configure the container. - -Clicking `Edit code` on the Sample, forks the project to your own Git repo so you can customize it before deploying. - -## Environment variables - -The code sample uses the following environment variables: - -- **output**: Name of the output topic to write into. -- **PG_HOST**: The IP address or fully qualified domain name of your server. -- **PG_PORT**: The Port number to use for communication with the server. -- **PG_DATABASE**: The name of the database for CDC. -- **PG_USER**: The username of the sink should use to interact with the database. -- **PG_PASSWORD**: The password for the user configured above. -- **PG_SCHEMA**: The name of the schema for CDC. -- **PG_TABLE**: The name of the table for CDC. - -## Requirements / Prerequisites - -- A Postgres Database. -- Set `wal_level = logical` in `postgresql.conf`. - -## Contribute - -Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. - -## Open source - -This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. - -Please star us and mention us on social to show your appreciation. 
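The prerequisite above (`wal_level = logical`, plus the `wal2json` plugin that the removed `postgres_helper.py` further down relies on) can be verified before deploying. A minimal pre-flight sketch, assuming the same `PG_*` environment variables and the `psycopg2` driver this sample itself uses:

```python
import os
import psycopg2

# Connect with the same PG_* settings the connector reads.
conn = psycopg2.connect(
    host=os.environ["PG_HOST"],
    port=os.environ["PG_PORT"],
    user=os.environ["PG_USER"],
    password=os.environ["PG_PASSWORD"],
    dbname=os.environ["PG_DATABASE"],
)

with conn.cursor() as cur:
    # Logical decoding (and therefore CDC) requires wal_level = logical.
    cur.execute("SHOW wal_level;")
    print("wal_level:", cur.fetchone()[0])

    # The sample creates one replication slot per workspace id; a stale,
    # inactive slot can be dropped here before redeploying.
    cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots;")
    for slot_name, plugin, active in cur.fetchall():
        print(f"slot={slot_name} plugin={plugin} active={active}")

conn.close()
```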
\ No newline at end of file diff --git a/postgres-cdc-source/app.yaml b/postgres-cdc-source/app.yaml deleted file mode 100644 index 9c8bde5..0000000 --- a/postgres-cdc-source/app.yaml +++ /dev/null @@ -1,47 +0,0 @@ -name: Postgres CDC Source 6 -language: python -variables: - - name: output - inputType: OutputTopic - description: This is the output topic - defaultValue: postgres-cdc - required: true - - name: PG_HOST - inputType: FreeText - description: Host name of Postgres - defaultValue: quixpostgresql.postgres.database.azure.com - required: true - - name: PG_PORT - inputType: FreeText - description: Port of Postgres - defaultValue: 5432 - required: true - - name: PG_USER - inputType: FreeText - description: Username of Postgres - defaultValue: postgres - required: true - - name: PG_PASSWORD - inputType: HiddenText - description: Password of Postgres - defaultValue: uFGpdjz3xyxzhUzoKp.o - required: true - - name: PG_DATABASE - inputType: FreeText - description: Database name of Postgres - defaultValue: postgres - required: true - - name: PG_SCHEMA - inputType: FreeText - description: Name of schema for CDC - defaultValue: public - required: true - - name: PG_TABLE - inputType: FreeText - description: Name of table for CDC - defaultValue: iceberg_tables - required: true -dockerfile: dockerfile -runEntryPoint: main.py -defaultFile: main.py -libraryItemId: postgres-cdc-source diff --git a/postgres-cdc-source/dockerfile b/postgres-cdc-source/dockerfile deleted file mode 100644 index 03ef4f7..0000000 --- a/postgres-cdc-source/dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM python:3.12.5-slim-bookworm - -# Set environment variables to non-interactive and unbuffered output -ENV DEBIAN_FRONTEND=noninteractive \ - PYTHONUNBUFFERED=1 \ - PYTHONIOENCODING=UTF-8 - -# Set the working directory inside the container -WORKDIR /app - -# Copy only the requirements file(s) to leverage Docker cache -# Assuming all requirements files are in the root or subdirectories -COPY ./requirements.txt ./ - -# Install dependencies -# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size -RUN pip install --no-cache-dir -r requirements.txt - -# Copy the rest of the application -COPY . . 
- -# Set the command to run your application -ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/postgres-cdc-source/icon.png b/postgres-cdc-source/icon.png deleted file mode 100644 index d765318..0000000 Binary files a/postgres-cdc-source/icon.png and /dev/null differ diff --git a/postgres-cdc-source/main.py b/postgres-cdc-source/main.py deleted file mode 100644 index 19bbf8f..0000000 --- a/postgres-cdc-source/main.py +++ /dev/null @@ -1,86 +0,0 @@ -from quixstreams import Application -import time -import os -import json -from setup_logger import logger -from postgres_helper import connect_postgres, create_logical_slot, create_publication_on_table, get_changes - -# Load environment variables (useful when working locally) -from dotenv import load_dotenv -load_dotenv() - -# Global Variables -PG_SLOT_NAME = os.environ["Quix__Workspace__Id"].lower().replace("-", "_") -PG_SCHEMA = os.environ["PG_SCHEMA"] -PG_TABLE = os.environ["PG_TABLE"] -PG_PUBLICATION_NAME = f"pub_{PG_SCHEMA}_{PG_TABLE}" -PG_TABLE_NAME = f"{PG_SCHEMA}.{PG_TABLE}" -WAIT_INTERVAL = 0.1 - -# Connect to postgres and set up table -try: - create_logical_slot(PG_SLOT_NAME) - create_publication_on_table(PG_PUBLICATION_NAME, PG_TABLE_NAME) - conn = connect_postgres() - logger.info("CONNECTED!") -except Exception as e: - logger.info(f"ERROR!: {e}") - raise - -# should the main loop run? -run = True - -# Create a Quix Application, this manages the connection to the Quix platform -app = Application() - -# Create the producer, this is used to write data to the output topic -producer = app.get_producer() - -# Check the output topic is configured -output_topic_name = os.getenv("output", "") -if output_topic_name == "": - raise ValueError("output_topic environment variable is required") -output_topic = app.topic(output_topic_name) - -# get data from postgres and publish it to kafka -# to reduce network traffic, we buffer the messages for 100ms -def main(): - buffer = [] - last_flush_time = time.time() - - while run: - records = get_changes(conn, PG_SLOT_NAME) - for record in records: - changes = json.loads(record[0]) - for change in changes["change"]: - buffer.append(change) - - # Check if 100 milliseconds have passed - current_time = time.time() - if (current_time - last_flush_time) >= 0.5 and len(buffer) > 0: - # If 500ms have passed, produce all buffered messages - for message in buffer: - producer.produce(topic=output_topic.name, - key=PG_TABLE_NAME, - value=json.dumps(message)) - print("Message sent to Kafka") - # Flush the producer to send the messages - - # Clear the buffer - buffer = [] - # Update the last flush time - last_flush_time = current_time - - time.sleep(WAIT_INTERVAL) - - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - logger.info("Exiting.") - run = False - finally: - conn.close() - logger.info("Connection to postgres closed") - logger.info("Exiting") \ No newline at end of file diff --git a/postgres-cdc-source/postgres_helper.py b/postgres-cdc-source/postgres_helper.py deleted file mode 100644 index 86d1859..0000000 --- a/postgres-cdc-source/postgres_helper.py +++ /dev/null @@ -1,72 +0,0 @@ -import psycopg2 -import os - -def connect_postgres(): - # Postgres Constants - PG_HOST = os.environ["PG_HOST"] - PG_PORT = os.environ["PG_PORT"] - PG_USER = os.environ["PG_USER"] - PG_PASSWORD = os.environ["PG_PASSWORD"] - PG_DATABASE = os.environ["PG_DATABASE"] - - conn = psycopg2.connect( - database = PG_DATABASE, user = PG_USER, password = PG_PASSWORD, host = PG_HOST, port = 
PG_PORT - ) - return conn - - -def run_query(conn, query: str): - cur = conn.cursor() - cur.execute(query) - conn.commit() - cur.close() - - -def create_logical_slot(slot_name: str): - conn = connect_postgres() - query = f''' - SELECT pg_create_logical_replication_slot('{slot_name}', 'wal2json'); - ''' - try: - run_query(conn, query) - conn.close() - - except psycopg2.errors.DuplicateObject: - print(f"Replication slot {slot_name} already exists.") - conn.close() - - else: - conn.close() - - -def create_publication_on_table(publication_name: str, table_name: str): - conn = connect_postgres() - query = f''' - CREATE PUBLICATION {publication_name} FOR TABLE {table_name}; - ''' - try: - run_query(conn, query) - conn.close() - - except psycopg2.errors.DuplicateObject: - print(f"Publication {publication_name} already exists.") - conn.close() - - except psycopg2.errors.UndefinedTable: - print(f"{table_name} not found.") - conn.close() - raise - - else: - conn.close() - raise - -def get_changes(conn, slot_name: str): - query = f''' - SELECT data FROM pg_logical_slot_get_changes('{slot_name}', NULL, NULL); - ''' - cur = conn.cursor() - cur.execute(query) - records = cur.fetchall() - cur.close() - return records diff --git a/postgres-cdc-source/requirements.txt b/postgres-cdc-source/requirements.txt deleted file mode 100644 index 0811d8c..0000000 --- a/postgres-cdc-source/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -quixstreams==2.9.0 -psycopg2-binary -python-dotenv \ No newline at end of file diff --git a/postgres-cdc-source/setup_logger.py b/postgres-cdc-source/setup_logger.py deleted file mode 100644 index 7d60585..0000000 --- a/postgres-cdc-source/setup_logger.py +++ /dev/null @@ -1,13 +0,0 @@ -import logging - -# Set up logger -PROD_ENV = False -logger = logging.getLogger("Postgres CDC") -logging.basicConfig() - -if PROD_ENV: - logger.setLevel(logging.INFO) - logger.info("Running in Production Mode...") -else: - logger.setLevel(logging.DEBUG) - logger.info("Running in Debug Mode...") \ No newline at end of file diff --git a/query/README.md b/query/README.md deleted file mode 100644 index cf916f9..0000000 --- a/query/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Iceberg Query -This is super simple frontend to query AWS Apache Iceberg using AWS Athena. - -## How to Run - -### Prerequisites - -- **Python Environment**: Ensure you have Python 3.7 or higher installed. -- **AWS Credentials**: Configure your AWS credentials with appropriate permissions to access S3 and AWS Glue. -- **Dependencies**: Install the required Python packages. - -### Install Dependencies - -```bash -pip install -r requirements.txt -``` - -### Set Environment Variables - -The application uses the following environment variables: - -- **input**: Name of the input topic to listen to. -- **basic_auth_password** Password for frontend basic auth -- **table_name**: The name of the Iceberg table. -- **AWS_S3_URI**: The S3 URI where the Iceberg table data will be stored (e.g., `s3://your-bucket/warehouse/`). -- **AWS_ACCESS_KEY_ID**: Your AWS Access Key ID. -- **AWS_SECRET_ACCESS_KEY**: Your AWS Secret Access Key. -- **AWS_DEFAULT_REGION**: The AWS region where your S3 bucket and AWS Glue catalog are located (e.g., `eu-north-1`). - -Create a `.env` file in the project root or set these environment variables in your system. 
- -**Example `.env` file:** - -```dotenv -input=your_input_topic -basic_auth_password= -table_name=your_database.your_table -AWS_S3_URI=s3://your-bucket/warehouse/ -AWS_ACCESS_KEY_ID=your_access_key_id -AWS_SECRET_ACCESS_KEY=your_secret_access_key -AWS_DEFAULT_REGION=eu-north-1 -``` - -**Important Security Note:** **Never** include your actual AWS credentials in code repositories or share them publicly. Always keep your credentials secure. - -### Run the Application - -```bash -python main.py -``` diff --git a/query/app.yaml b/query/app.yaml deleted file mode 100644 index 9da6e57..0000000 --- a/query/app.yaml +++ /dev/null @@ -1,40 +0,0 @@ -name: IcebergQuery -language: python -variables: - - name: basic_auth_password - inputType: Secret - description: '' - defaultValue: basic_auth_password - required: true - - name: consumer_group - inputType: FreeText - defaultValue: iceberg-sink-v3.9 - required: false - - name: leading_edge_ms - inputType: FreeText - defaultValue: 5000 - required: false - - name: AWS_SECRET_ACCESS_KEY - inputType: Secret - description: '' - defaultValue: AWS_SECRET_ACCESS_KEY - required: false - - name: AWS_S3_URI - inputType: FreeText - defaultValue: s3://data-integration-example/tomas-dataintegrationexample-dev - required: false - - name: AWS_ACCESS_KEY_ID - inputType: FreeText - defaultValue: AKIA5JJJFC76GRHTOTPS - required: false - - name: AWS_DEFAULT_REGION - inputType: FreeText - defaultValue: eu-north-1 - required: false - - name: table_name - inputType: FreeText - defaultValue: glue.sensordata3 - required: false -dockerfile: dockerfile -runEntryPoint: main.py -defaultFile: main.py diff --git a/query/dockerfile b/query/dockerfile deleted file mode 100644 index d3cd495..0000000 --- a/query/dockerfile +++ /dev/null @@ -1,23 +0,0 @@ -FROM python:3.11.1-slim-buster - -# Set environment variables to non-interactive and unbuffered output -ENV DEBIAN_FRONTEND=noninteractive \ - PYTHONUNBUFFERED=1 \ - PYTHONIOENCODING=UTF-8 - -# Set the working directory inside the container -WORKDIR /app - -# Copy only the requirements file(s) to leverage Docker cache -# Assuming all requirements files are in the root or subdirectories -COPY ./requirements.txt ./ - -# Install dependencies -# Adding `--no-cache-dir` to avoid storing unnecessary files and potentially reduce image size -RUN pip install --no-cache-dir -r requirements.txt - -# Copy the rest of the application -COPY . . 
- -# Set the command to run your application -ENTRYPOINT ["python3", "main.py"] \ No newline at end of file diff --git a/query/main.py b/query/main.py deleted file mode 100644 index 867b164..0000000 --- a/query/main.py +++ /dev/null @@ -1,81 +0,0 @@ -import os -from flask import Flask, render_template -from dotenv import load_dotenv -import pandas as pd -from requests import request -from flask import Flask, render_template, request -from flask_httpauth import HTTPBasicAuth -from pyathena import connect -import pandas as pd -import os -from werkzeug.security import generate_password_hash, check_password_hash - -# Load environment variables -load_dotenv() - -app = Flask(__name__) -auth = HTTPBasicAuth() - -# Basic auth configuration -users = { - "admin": generate_password_hash(os.environ["basic_auth_password"]) # Replace with your desired username and password -} - -@auth.verify_password -def verify_password(username, password): - if username in users and check_password_hash(users.get(username), password): - return username - return None - - - -# Connect to Athena -conn = connect( - s3_staging_dir=os.environ["AWS_S3_URI"], # Replace with your S3 bucket for Athena query results - region_name=os.environ["AWS_DEFAULT_REGION"] -) - -def query_athena(sql_query): - # Execute the query and load the results into a Pandas DataFrame - df = pd.read_sql(sql_query, conn) - return df - - - -@app.route('/', methods=['GET', 'POST']) -@auth.login_required -def index(): - - if request.method == "GET": - - # Set a default code snippet if custom_code is empty (first load) - default_code = f"""SELECT * FROM "glue"."{os.environ['table_name']}" LIMIT 100""" - - message = "Query data please." - - - return render_template('index.html', tables=pd.DataFrame().to_html(classes='data'), titles="Query data", message=message, custom_code=default_code) - - custom_code = default_code if request.method == 'GET' else request.form.get('custom_code', '') - - try: - - # Load and join tables - df = query_athena(custom_code) - - message = "Data loaded successfully." if df is not None else "No data found." - - # Render the result as an HTML table - if df is not None: - table_html = df[:500].to_html(classes='data') - else: - table_html = "" - except Exception as e: - df = None - message = str(e) - table_html = "" - - return render_template('index.html', tables=table_html, titles=df.columns.values if df is not None else [], message=message, custom_code=custom_code) - -if __name__ == '__main__': - app.run(debug=True, host="0.0.0.0", port=80) \ No newline at end of file diff --git a/query/requirements.txt b/query/requirements.txt deleted file mode 100644 index dc2df7e..0000000 --- a/query/requirements.txt +++ /dev/null @@ -1,12 +0,0 @@ -quixstreams==2.11.1 -python-dotenv -pyarrow -pyiceberg -boto3 -mypy_boto3_glue -sqlalchemy -pandas -flask -requests -flask_httpauth -pyathena \ No newline at end of file diff --git a/query/templates/index.html b/query/templates/index.html deleted file mode 100644 index 829941c..0000000 --- a/query/templates/index.html +++ /dev/null @@ -1,98 +0,0 @@ - - - - - AWS Iceberg query with AWS Athena - - - - - -
[deleted query/templates/index.html (98 lines): an HTML page titled "AWS Iceberg query with AWS Athena" with a "Loading, please wait..." overlay, a SQL query form bound to {{ custom_code }}, a {{ message }} status line, and a results table rendered from {{ tables | safe }}]
\ No newline at end of file
diff --git a/quix.yaml b/quix.yaml
index 82b2dcc..91b61d3 100644
--- a/quix.yaml
+++ b/quix.yaml
@@ -38,18 +38,16 @@ deployments:
       - name: mqtt_username
         inputType: FreeText
         description: Username of your MQTT user
-        required: false
       - name: mqtt_password
         inputType: HiddenText
         description: Password for the MQTT user
-        required: false
       - name: mqtt_version
         inputType: FreeText
         description: 'MQTT protocol version: 3.1, 3.1.1, 5'
         required: true
         value: 3.1.1
-  - name: Postgres CDC Source
-    application: postgres-cdc-source
+  - name: Telegraf Source
+    application: telegraf-source
     version: latest
     deploymentType: Service
     resources:
@@ -59,46 +57,48 @@
     variables:
       - name: output
         inputType: OutputTopic
-        description: This is the output topic
-        required: true
-        value: postgres-cdc
-      - name: PG_HOST
-        inputType: FreeText
-        description: Host name of Postgres
+        description: Name of the output topic to write into
         required: true
-        value: {{postgress_host}}
-      - name: PG_PORT
-        inputType: FreeText
-        description: Port of Postgres
+        value: telegraf
+  - name: Azure Blob Iceberg
+    application: aws-iceberg-sink
+    version: latest
+    deploymentType: Service
+    resources:
+      cpu: 200
+      memory: 4000
+      replicas: 1
+    variables:
+      - name: input
+        inputType: InputTopic
+        description: This is the input topic
         required: true
-        value: 5432
-      - name: PG_USER
+        value: sensor-data-table
+      - name: AWS_S3_URI
         inputType: FreeText
-        description: Username of Postgres
-        required: true
-        value: postgres
-      - name: PG_PASSWORD
-        inputType: HiddenText
-        description: Password of Postgres
+        description: The URI or URL to your S3 bucket
         required: true
-        value: uFGpdjz3xyxzhUzoKp.o
-      - name: PG_DATABASE
-        inputType: FreeText
-        description: Database name of Postgres
+        value: s3://data-integration-example/demo-dataintegrationtemplate-dev/sensordata
+      - name: AWS_SECRET_ACCESS_KEY
+        inputType: Secret
+        description: Your AWS secret
         required: true
-        value: postgres
-      - name: PG_SCHEMA
+        secretKey: AWS_SECRET_ACCESS_KEY
+      - name: AWS_ACCESS_KEY_ID
         inputType: FreeText
-        description: Name of schema for CDC
+        description: Your AWS Access Key
         required: true
-        value: public
-      - name: PG_TABLE
+        value: AKIA5JJJFC76GRHTOTPS
+      - name: table_name
         inputType: FreeText
-        description: Name of table for CDC
+        description: The table to publish data to
         required: true
         value: sensordata
-  - name: InfluxDB 2.0 Source
-    application: influxdb2-source
+      - name: AWS_REGION
+        inputType: FreeText
+        value: eu-north-1
+  - name: InfluxDB 3.0 Sink
+    application: influxdb-3.0-sink
     version: latest
     deploymentType: Service
     resources:
@@ -106,56 +106,77 @@
       memory: 500
       replicas: 1
     variables:
-      - name: output
-        inputType: OutputTopic
-        description: This is the Kafka topic that will receive the query results
-        required: true
-        value: influxdbv2-data
-      - name: task_interval
-        inputType: FreeText
-        description: Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1y
+      - name: input
+        inputType: InputTopic
+        description: This is the input topic
         required: true
-        value: 10s
+        value: sensor-data-table
       - name: INFLUXDB_HOST
         inputType: FreeText
         description: Host address for the InfluxDB instance.
         required: true
-        value: {{influxdb_host}}
+        value: https://eu-central-1-1.aws.cloud2.influxdata.com
       - name: INFLUXDB_TOKEN
-        inputType: FreeText
+        inputType: Secret
         description: Authentication token to access InfluxDB.
         required: true
-        value: rwJpQuwqTmVfXoj-6bYv
+        secretKey: influx_token
       - name: INFLUXDB_ORG
         inputType: FreeText
         description: Organization name in InfluxDB.
+        value: play_org
+      - name: INFLUXDB_MEASUREMENT_NAME
+        inputType: FreeText
+        description: The InfluxDB measurement to write data to. If not specified, the name of the input topic will be used
+        value: measurement1
+      - name: INFLUXDB_DATABASE
+        inputType: FreeText
+        description: Database name in InfluxDB where data should be stored.
         required: true
-        value: quix
-      - name: INFLUXDB_BUCKET
+        value: water
+      - name: INFLUXDB_TAG_KEYS
+        inputType: FreeText
+        description: 'The tags to include when writing the measurement data. Example: Tag1,Tag2'
+        value: ''
+      - name: INFLUXDB_FIELD_KEYS
         inputType: FreeText
-        description: Bucket name in InfluxDB where data is stored.
+        description: 'The fields to include when writing the measurement data. Example: Field1,Field2'
+        value: ''
+      - name: CONSUMER_GROUP_NAME
+        inputType: FreeText
+        description: The name of the consumer group to use when consuming from Kafka
         required: true
-        value: iotdemo
-      - name: INFLUXDB_MEASUREMENT_NAME
+        value: influxdb-sink
+      - name: TIMESTAMP_COLUMN
         inputType: FreeText
-        description: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used
-        required: false
-        value: sensordata
-  - name: Telegraf Source
-    application: telegraf-source
+        description: 'The column containing the timestamp column. NOTE: Must be nanoseconds'
+        value: ''
+      - name: BUFFER_SIZE
+        inputType: FreeText
+        description: The number of records that sink holds before flush data to the InfluxDb
+        value: 1000
+      - name: BUFFER_TIMEOUT
+        inputType: FreeText
+        description: The number of seconds that sink holds before flush data to the InfluxDb
+        value: 1
+  - name: HTTP API Source
+    application: http-api-source
     version: latest
     deploymentType: Service
     resources:
       cpu: 200
       memory: 500
       replicas: 1
+    publicAccess:
+      enabled: true
+      urlPrefix: gateway
     variables:
       - name: output
         inputType: OutputTopic
-        description: Name of the output topic to write into
+        description: This is the output topic for hello world data
         required: true
-        value: telegraf
-  - name: MQTT normalisation
+        value: http-source
+  - name: MQTT UNS Normalisation
     application: mqtt-normalisation
     version: latest
     deploymentType: Service
@@ -167,34 +188,13 @@ variables:
       - name: input
         inputType: InputTopic
         description: Name of the input topic to listen to.
-        required: false
         value: mqtt
       - name: output
         inputType: OutputTopic
         description: Name of the output topic to write to.
-        required: false
-        value: sensor-data-table
-  - name: InfluxDB CDC normalisation
-    application: influxdb-cdc-normalisation
-    version: latest
-    deploymentType: Service
-    resources:
-      cpu: 200
-      memory: 500
-      replicas: 1
-    variables:
-      - name: input
-        inputType: InputTopic
-        description: Name of the input topic to listen to.
-        required: false
-        value: influxdbv2-data
-      - name: output
-        inputType: OutputTopic
-        description: Name of the output topic to write to.
-        required: false
         value: sensor-data-table
-  - name: PostgreSQL CDC normalisation
-    application: postgres-cdc-normalisation
+  - name: Telegraf UNS Normalisation
+    application: telegraf-normalisation
     version: latest
     deploymentType: Service
    resources:
@@ -205,15 +205,13 @@ variables:
      - name: input
        inputType: InputTopic
        description: Name of the input topic to listen to.
-        required: false
-        value: postgres-cdc
+        value: telegraf
      - name: output
        inputType: OutputTopic
        description: Name of the output topic to write to.
-        required: false
        value: sensor-data-table
-  - name: Telegraf normalisation
-    application: telegraf-normalisation
+  - name: HTTP UNS Normalization
+    application: http-transformation
     version: latest
     deploymentType: Service
     resources:
@@ -223,98 +221,14 @@
     variables:
       - name: input
         inputType: InputTopic
-        description: Name of the input topic to listen to.
-        required: false
-        value: telegraf
+        description: This is the input topic for f1 data
+        required: true
+        value: http-source
       - name: output
         inputType: OutputTopic
-        description: Name of the output topic to write to.
-        required: false
-        value: sensor-data-table
-  - name: AWS Iceberg sink
-    application: aws-iceberg-sink
-    version: latest
-    deploymentType: Service
-    resources:
-      cpu: 200
-      memory: 4000
-      replicas: 1
-    variables:
-      - name: input
-        inputType: InputTopic
-        description: This is the input topic
+        description: This is the output topic for hard braking events
         required: true
         value: sensor-data-table
-      - name: AWS_S3_URI
-        inputType: FreeText
-        description: The URI or URL to your S3 bucket
-        required: true
-        value: {{s3_iceberg_path}}
-      - name: AWS_SECRET_ACCESS_KEY
-        inputType: Secret
-        description: Your AWS secret
-        required: true
-        secretKey: AWS_SECRET_ACCESS_KEY
-      - name: AWS_ACCESS_KEY_ID
-        inputType: FreeText
-        description: Your AWS Access Key
-        required: true
-        value: AKIA5JJJFC76GRHTOTPS
-      - name: table_name
-        inputType: FreeText
-        description: The table to publish data to
-        required: true
-        value: {{table_name}}
-      - name: AWS_REGION
-        inputType: FreeText
-        required: false
-        value: eu-north-1
-  - name: IcebergQuery
-    application: query
-    version: latest
-    deploymentType: Service
-    resources:
-      cpu: 500
-      memory: 1000
-      replicas: 1
-    publicAccess:
-      enabled: true
-      urlPrefix: query
-    variables:
-      - name: basic_auth_password
-        inputType: Secret
-        description: ''
-        required: true
-        secretKey: basic_auth_password
-      - name: consumer_group
-        inputType: FreeText
-        required: false
-        value: iceberg-sink-v3.9
-      - name: leading_edge_ms
-        inputType: FreeText
-        required: false
-        value: 5000
-      - name: AWS_SECRET_ACCESS_KEY
-        inputType: Secret
-        description: ''
-        required: false
-        secretKey: AWS_SECRET_ACCESS_KEY
-      - name: AWS_S3_URI
-        inputType: FreeText
-        required: false
-        value: s3://data-integration-example/tomas-dataintegrationexample-dev
-      - name: AWS_ACCESS_KEY_ID
-        inputType: FreeText
-        required: false
-        value: AKIA5JJJFC76GRHTOTPS
-      - name: AWS_DEFAULT_REGION
-        inputType: FreeText
-        required: false
-        value: eu-north-1
-      - name: table_name
-        inputType: FreeText
-        required: false
-        value: {{table_name}}
 
 # This section describes the Topics of the data pipeline
 topics:
@@ -346,3 +260,4 @@
     retentionInMinutes: 1440
     retentionInBytes: 52428800
     dataTier: Gold
+  - name: http-source