diff --git a/python/sources/BigQuery_source/README.md b/python/sources/BigQuery_source/README.md
new file mode 100644
index 00000000..b5483d29
--- /dev/null
+++ b/python/sources/BigQuery_source/README.md
@@ -0,0 +1,85 @@
+# BigQuery to Kafka Connector
+
+A Python utility that extracts data from Google BigQuery and publishes it to a Kafka topic using Quix Streams.
+
+## Overview
+
+This connector reads data from a BigQuery table and streams it to a Kafka topic, handling the data types and serialization challenges that come with BigQuery results. It is built with Quix Streams for Kafka integration and includes custom serialization so that all BigQuery data types are handled correctly.
+
+## Features
+
+- Connects to BigQuery using service account authentication
+- Extracts data using a custom SQL query
+- Handles complex data types, including:
+  - Datetime objects
+  - Decimal values
+  - Binary data
+  - NULL/NA values
+  - Time objects
+- Publishes data to a Kafka topic with proper serialization
+- Error handling for serialization issues
+
+## Prerequisites
+
+- Python 3.9+ (the provided Dockerfile uses Python 3.12)
+- Access to a Google Cloud project with BigQuery
+- A Kafka cluster configured with Quix
+- Service account credentials with BigQuery access
+
+## Installation
+
+1. Clone this repository
+2. Install dependencies:
+
+```bash
+pip install -r requirements.txt
+```
+
+## Configuration
+
+The script uses environment variables for configuration:
+
+- `query`: SQL query to execute against BigQuery (you can also edit the query in `read_data()`)
+- `output`: Kafka topic name to publish data to
+- `service_account`: JSON string containing the Google Cloud service account credentials
+
+For local development, you can use a `.env` file with these variables.
+
+## Usage
+
+1. Set the environment variables or create a `.env` file
+2. Set the `query` environment variable, or modify the SQL query in the `read_data()` function to match your BigQuery table
+3. Run the script:
+
+```bash
+python main.py
+```
+
+## How It Works
+
+1. The script establishes a connection to BigQuery using the provided service account credentials
+2. It executes the SQL query to fetch data from the specified table
+3. Each row is converted to a JSON-serializable format with custom handling for special data types
+4. The data is serialized and published to the Kafka topic
+5. Error handling captures and logs any serialization issues without stopping the entire process
+
+## Custom Data Type Handling
+
+The script includes two custom classes for data serialization:
+
+- `CustomJSONEncoder`: Extends the standard JSON encoder to handle BigQuery-specific data types
+- `BigQuerySerializer`: Handles pre-processing of data before serialization
+
+The serialization example below shows how these types are converted.
+
+## Error Handling
+
+The script includes comprehensive error handling that:
+
+- Catches exceptions during serialization
+- Logs problematic data with detailed information
+- Continues processing other rows when errors occur
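+
+## Serialization Example
+
+As a rough illustration of the custom type handling described above (this snippet is standalone, uses made-up values, and is not part of the connector's code), an encoder equivalent to `CustomJSONEncoder` turns BigQuery-style values into JSON like this:
+
+```python
+import base64
+import json
+from datetime import datetime
+from decimal import Decimal
+
+import pandas as pd
+
+
+class ExampleEncoder(json.JSONEncoder):
+    """Minimal stand-in for CustomJSONEncoder, for illustration only."""
+
+    def default(self, obj):
+        if obj is pd.NA:                      # pandas missing value -> null
+            return None
+        if isinstance(obj, datetime):         # datetimes -> ISO-8601 strings
+            return obj.isoformat()
+        if isinstance(obj, Decimal):          # decimals -> strings, preserving precision
+            return str(obj)
+        if isinstance(obj, bytes):            # binary -> base64 text
+            return base64.b64encode(obj).decode("utf-8")
+        return super().default(obj)
+
+
+row = {
+    "created_at": datetime(2024, 1, 1, 12, 30),
+    "price": Decimal("19.99"),
+    "payload": b"\x00\x01",
+    "note": pd.NA,
+}
+print(json.dumps(row, cls=ExampleEncoder))
+# {"created_at": "2024-01-01T12:30:00", "price": "19.99", "payload": "AAE=", "note": null}
+```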
+
+## Contributing
+
+Feel free to submit issues or pull requests for improvements to the connector.
+
+## License
+
+This project is open source under the Apache 2.0 license.
diff --git a/python/sources/BigQuery_source/dockerfile b/python/sources/BigQuery_source/dockerfile
new file mode 100644
index 00000000..47bab8df
--- /dev/null
+++ b/python/sources/BigQuery_source/dockerfile
@@ -0,0 +1,28 @@
+FROM python:3.12.5-slim-bookworm
+
+# Set environment variables for non-interactive setup and unbuffered output
+ENV DEBIAN_FRONTEND=noninteractive \
+    PYTHONUNBUFFERED=1 \
+    PYTHONIOENCODING=UTF-8 \
+    PYTHONPATH="/app"
+
+# Build argument for setting the main app path (set this to the application folder, e.g. BigQuery_source)
+ARG MAINAPPPATH=./Enter app name here/
+
+# Set working directory inside the container
+WORKDIR /app
+
+# Copy requirements first to leverage the Docker layer cache
+COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"
+
+# Install dependencies without caching
+RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"
+
+# Copy the entire application into the container
+COPY . .
+
+# Set the working directory to the main app path
+WORKDIR "/app/${MAINAPPPATH}"
+
+# Define the container's startup command
+ENTRYPOINT ["python3", "main.py"]
\ No newline at end of file
diff --git a/python/sources/BigQuery_source/library.json b/python/sources/BigQuery_source/library.json
new file mode 100644
index 00000000..88bc1913
--- /dev/null
+++ b/python/sources/BigQuery_source/library.json
@@ -0,0 +1,48 @@
+{
+  "libraryItemId": "big-query-source",
+  "name": "BigQuery Source",
+  "language": "Python",
+  "tags": {
+    "Type": ["Connectors"],
+    "Pipeline Stage": ["Source"],
+    "Category": ["Data warehouse"]
+  },
+  "shortDescription": "Publish data from a Google BigQuery table to a Kafka topic",
+  "EntryPoint": "dockerfile",
+  "RunEntryPoint": "main.py",
+  "DefaultFile": "main.py",
+  "Variables": [
+    {
+      "Name": "query",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The SQL query to execute in BigQuery",
+      "DefaultValue": "",
+      "Required": true
+    },
+    {
+      "Name": "output",
+      "Type": "EnvironmentVariable",
+      "InputType": "OutputTopic",
+      "Description": "Topic to publish the BigQuery query results to",
+      "DefaultValue": "",
+      "Required": true
+    },
+    {
+      "Name": "service_account",
+      "Type": "EnvironmentVariable",
+      "InputType": "Secret",
+      "Description": "JSON string of the service account file for the BigQuery GCP project",
+      "DefaultValue": "",
+      "Required": true
+    }
+  ],
+  "DeploySettings": {
+    "DeploymentType": "Job",
+    "CpuMillicores": 200,
+    "MemoryInMb": 200,
+    "Replicas": 1,
+    "PublicAccess": false,
+    "ValidateConnection": true
+  }
+}
diff --git a/python/sources/BigQuery_source/main.py b/python/sources/BigQuery_source/main.py
new file mode 100644
index 00000000..c4206209
--- /dev/null
+++ b/python/sources/BigQuery_source/main.py
@@ -0,0 +1,204 @@
+# import the Quix Streams modules for interacting with Kafka:
+from quixstreams import Application
+from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext
+
+# import additional modules as needed
+import os
+import json
+import time as tm
+import base64
+import random
+import pandas as pd
+from decimal import Decimal
+from google.cloud import bigquery
+from google.oauth2 import service_account
+from datetime import datetime, date, time
+
+# for local dev, load env vars from a .env file
+from dotenv import load_dotenv
+
+load_dotenv()
+
+
+class CustomJSONEncoder(json.JSONEncoder):
+    """Custom JSON encoder that handles all BigQuery data types"""
+
+    def default(self, obj):
+        # Handle pandas NA type explicitly
+        if isinstance(obj, pd._libs.missing.NAType):
+            return None
+
+        # Handle datetime.time objects (checked before datetime/date)
+        if isinstance(obj, time):
+            return obj.isoformat()
+
+        # Handle datetime objects
+        if isinstance(obj, datetime):
+            return obj.isoformat()
+
+        # Handle date objects
+        if isinstance(obj, date):
+            return obj.isoformat()
+
+        # Handle Decimal objects
+        if isinstance(obj, Decimal):
+            return str(obj)
+
+        # Handle bytes
+        if isinstance(obj, bytes):
+            return base64.b64encode(obj).decode('utf-8')
+
+        # Handle NaN and NA values
+        if pd.isna(obj):
+            return None
+
+        # Let the base class handle anything else
+        return super().default(obj)
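+
+
+# Illustrative note (values are made up): with the encoder above,
+#   json.dumps({"ts": datetime(2024, 1, 1), "amount": Decimal("1.5"), "blob": b"\x00"}, cls=CustomJSONEncoder)
+# produces '{"ts": "2024-01-01T00:00:00", "amount": "1.5", "blob": "AA=="}' -- datetimes become
+# ISO-8601 strings, Decimals become strings, and bytes become base64 text.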
+
+
+class BigQuerySerializer(JSONSerializer):
+    """Serializer for BigQuery data that handles all data types"""
+
+    def __call__(self, value, ctx):
+        """Serialize a value to a JSON string"""
+        if value is None:
+            return None
+        return self._dumps(value)
+
+    def _dumps(self, value):
+        # For dictionaries, pre-process to explicitly handle problematic types
+        if isinstance(value, dict):
+            processed_dict = {}
+            for k, v in value.items():
+                # Handle pandas NA type explicitly
+                if isinstance(v, pd._libs.missing.NAType):
+                    processed_dict[k] = None
+                # Handle NaT strings
+                elif v == 'NaT':
+                    processed_dict[k] = None
+                else:
+                    processed_dict[k] = v
+
+            # Use our custom encoder to handle all other special types
+            return json.dumps(processed_dict, cls=CustomJSONEncoder)
+
+        # Use our custom encoder for any other value
+        return json.dumps(value, cls=CustomJSONEncoder)
+
+
+app = Application(consumer_group="data_source", auto_create_topics=True)
+serializer = BigQuerySerializer()
+
+# define the topic using the "output" environment variable
+topic_name = os.environ["output"]
+topic = app.topic(topic_name)
+
+# read the service account credentials from the "service_account" secret and parse the JSON
+service_account_json = os.environ["service_account"]
+service_account_info = json.loads(service_account_json, strict=False)
+
+
+def read_data():
+    # Connect to BigQuery
+    credentials = service_account.Credentials.from_service_account_info(
+        service_account_info,
+        scopes=["https://www.googleapis.com/auth/cloud-platform"]
+    )
+    client = bigquery.Client(credentials=credentials, project=credentials.project_id)
+
+    # SQL query to fetch data from BigQuery; include all the columns and data types
+    # you want to publish. The query is read from the "query" environment variable
+    # when set; otherwise, replace the placeholder below.
+    query = os.environ.get("query") or """
+    Enter your Query here
+    """
+
+    print("Table Rows loading.")
+    df = client.query_and_wait(query).to_dataframe()
+    print("Table Rows loaded.")
+
+    row_count = len(df)
+    stream_id = f"BQ_DATA_{str(random.randint(1, 100)).zfill(3)}"
+    headers = df.columns.tolist()
+
+    print(f"Publishing {row_count} rows.")
+
+    for _, row in df.iterrows():
+        # Convert the row to a dictionary while properly handling all data types
+        row_data = {}
+        for header in headers:
+            value = row[header]
+
+            # Handle special data types
+            if isinstance(value, (datetime, date)):
+                row_data[header] = value.isoformat()
+            elif isinstance(value, time):  # Handle datetime.time here
+                row_data[header] = value.strftime("%H:%M:%S")  # Convert time to string
+            elif isinstance(value, bytes):
+                row_data[header] = base64.b64encode(value).decode('utf-8')
+            elif isinstance(value, Decimal):
+                # Store Decimal as string to preserve precision
+                row_data[header] = str(value)
+            else:
+                row_data[header] = value
+
+        # add the current timestamp
+        row_data["Timestamp"] = tm.time_ns()
+        yield stream_id, row_data
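+
+
+# Each yielded record is a (message key, dict) pair, roughly of the form (values invented):
+#   ("BQ_DATA_042", {"some_column": "value", "created_at": "2024-01-01T00:00:00",
+#                    "Timestamp": 1700000000000000000})
+# The key groups all rows of one extraction run, and "Timestamp" is the extraction
+# time in nanoseconds.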
+
+
+def main():
+    """
+    Read data from the BigQuery table and publish it to Kafka
+    """
+
+    def debug_type(obj):
+        """Debugging helper (currently unused): print detailed type information for a value."""
+        print(f"Type: {type(obj)}, Value: {obj}, repr: {repr(obj)}")
+        if hasattr(obj, '__dict__'):
+            print(f"Object attributes: {obj.__dict__}")
+        return str(obj)
+
+    producer = app.get_producer()
+
+    with producer:
+        # Serialize each row and publish it; serialization failures are logged and skipped
+        for message_key, row_data in read_data():
+            try:
+                serialized_value = serializer(
+                    value=row_data,
+                    ctx=SerializationContext(topic=topic.name, field='value')
+                )
+
+                producer.produce(
+                    topic=topic.name,
+                    key=message_key,
+                    value=serialized_value,
+                )
+            except Exception as e:
+                print(f"Error serializing row: {e}")
+                print(f"Problematic row data: {row_data}")
+                for k, v in row_data.items():
+                    print(f"Key: {k}, Value type: {type(v)}")
+                    if pd.isna(v):
+                        print(f"  This is a pandas NA value: {repr(v)}")
+                # Continue with the next row instead of stopping completely
+                continue
+
+    print("All rows published")
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        print("Exiting.")
+    except Exception as e:
+        print(f"Error: {e}")
\ No newline at end of file
diff --git a/python/sources/BigQuery_source/requirements.txt b/python/sources/BigQuery_source/requirements.txt
new file mode 100644
index 00000000..28d4025c
--- /dev/null
+++ b/python/sources/BigQuery_source/requirements.txt
@@ -0,0 +1,4 @@
+quixstreams==3.2.1
+python-dotenv
+google-auth
+google-cloud-bigquery[pandas]
\ No newline at end of file