BigQuery Source Connector Code #599

Open · wants to merge 2 commits into main
85 changes: 85 additions & 0 deletions python/sources/BigQuery_source/README.md
@@ -0,0 +1,85 @@
# BigQuery to Kafka Connector

A Python utility that extracts data from Google BigQuery and publishes it to a Kafka topic using Quix Streams.

## Overview

This connector reads data from a BigQuery table and streams it to a Kafka topic, handling various data types and serialization challenges. It's built with Quix Streams for Kafka integration and includes custom serialization to properly handle all BigQuery data types.

## Features

- Connects to BigQuery using service account authentication
- Extracts data using custom SQL queries
- Handles complex data types including:
- Datetime objects
- Decimal values
- Binary data
- NULL/NA values
- Time objects
- Publishes data to Kafka topics with proper serialization
- Error handling for serialization issues

## Prerequisites

- Python 3.7+
- Access to a Google Cloud project with BigQuery
- A Kafka cluster configured with Quix
- Service account credentials with BigQuery access

## Installation

1. Clone this repository
2. Install dependencies:

```bash
pip install -r requirements.txt
```

## Configuration

The script uses environment variables for configuration:

- `output`: Kafka topic name to publish data to
- `service_account`: JSON string containing the Google Cloud service account credentials

For local development, you can use a `.env` file with these variables.
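
A minimal `.env` example (the topic name and service account JSON below are placeholders, not real values; keep the JSON on a single line so `json.loads` can parse it):

```
output=bigquery-data
service_account={"type": "service_account", "project_id": "my-gcp-project", "private_key": "...", "client_email": "bq-reader@my-gcp-project.iam.gserviceaccount.com"}
```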

## Usage

1. Set the environment variables or create a `.env` file
2. Modify the SQL query in the `read_data()` function to match your BigQuery table (see the example query below)
3. Run the script:

```bash
python main.py
```
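
The query inside `read_data()` ships as a placeholder. A hypothetical replacement, assuming an example table (`my-project.my_dataset.my_table` is not a real resource):

```python
# In read_data() in main.py: replace the placeholder with your own SQL.
# The fully qualified table name below is an example only.
query = """
    SELECT *
    FROM `my-project.my_dataset.my_table`
    LIMIT 1000
"""
```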

## How It Works

1. The script establishes a connection to BigQuery using the provided service account credentials
2. It executes the SQL query to fetch data from the specified table
3. Each row is converted to a JSON-serializable format with custom handling for special data types
4. The data is serialized and published to the Kafka topic
5. Error handling captures and logs any serialization issues without stopping the entire process
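
A condensed sketch of steps 3-5, reusing the `app`, `topic`, `serializer`, and `read_data()` objects defined in `main.py` (a simplified view, not a replacement for that script):

```python
from quixstreams.models.serializers.quix import SerializationContext

# Produce every row yielded by read_data(); a row that fails to serialize
# is logged and skipped so one bad value does not stop the whole export.
with app.get_producer() as producer:
    for message_key, row_data in read_data():
        try:
            value = serializer(
                value=row_data,
                ctx=SerializationContext(topic=topic.name, field="value"),
            )
            producer.produce(topic=topic.name, key=message_key, value=value)
        except Exception as exc:
            print(f"Skipping row after serialization error: {exc}")
```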

## Custom Data Type Handling

The script includes two custom classes for data serialization:

- `CustomJSONEncoder`: Extends the standard JSON encoder to handle BigQuery-specific data types
- `BigQuerySerializer`: Handles pre-processing of data before serialization
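
A usage sketch of `CustomJSONEncoder` on a row of mixed BigQuery types (the row is invented for illustration, and the encoder class is assumed to be in scope from `main.py`):

```python
import json
from datetime import datetime, time
from decimal import Decimal

# A hypothetical row mixing types that BigQuery commonly returns.
row = {
    "id": Decimal("1234.5600"),                   # NUMERIC   -> string, precision kept
    "created_at": datetime(2024, 1, 2, 3, 4, 5),  # TIMESTAMP -> ISO 8601 string
    "opening_time": time(9, 30),                  # TIME      -> ISO 8601 string
    "payload": b"\x00\x01",                       # BYTES     -> base64 string
}

# CustomJSONEncoder is the encoder class defined in main.py.
print(json.dumps(row, cls=CustomJSONEncoder))
# {"id": "1234.5600", "created_at": "2024-01-02T03:04:05", "opening_time": "09:30:00", "payload": "AAE="}
```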

## Error Handling

The script includes comprehensive error handling that:
- Catches exceptions during serialization
- Logs problematic data with detailed information
- Continues processing other rows when errors occur

## Contributing

Feel free to submit issues or pull requests for improvements to the connector.

## License

This project is open source under the Apache 2.0 license.
28 changes: 28 additions & 0 deletions python/sources/BigQuery_source/dockerfile
@@ -0,0 +1,28 @@
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONIOENCODING=UTF-8 \
PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=./Enter app name here/

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
48 changes: 48 additions & 0 deletions python/sources/BigQuery_source/library.json
@@ -0,0 +1,48 @@
{
"libraryItemId": "big-query-source",
"name": "BigQuery Source",
"language": "Python",
"tags": {
"Type": ["Connectors"],
"Pipeline Stage": ["Source"],
"Category": ["Data warehouse"]
},
"shortDescription": "Persist data from BigQuery to Quix",
"EntryPoint": "dockerfile",
"RunEntryPoint": "main.py",
"DefaultFile": "main.py",
"Variables": [
{
"Name": "query",
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "Topic to store the Query that is Executed in BigQuery",
"DefaultValue": "",
"Required": true
},
{
"Name": "output",
"Type": "EnvironmentVariable",
"InputType": "InputTopic",
"Description": "Topic to store the result of BigQuery query",
"DefaultValue": "",
"Required": true
},
{
"Name": "SERVICE_ACCOUNT_JSON",
"Type": "EnvironmentVariable",
"InputType": "Secret",
"Description": "JSON string of the service account file for the BigQuery GCP project",
"DefaultValue": "",
"Required": true
}
],
"DeploySettings": {
"DeploymentType": "Job",
"CpuMillicores": 200,
"MemoryInMb": 200,
"Replicas": 1,
"PublicAccess": false,
"ValidateConnection": true
}
}
204 changes: 204 additions & 0 deletions python/sources/BigQuery_source/main.py
@@ -0,0 +1,204 @@
# import the Quix Streams modules for interacting with Kafka:
from quixstreams import Application
from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext

# import additional modules as needed
import os
import json
import time as tm
import base64
import random
import pandas as pd
from decimal import Decimal
from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import datetime, date, timezone, time
from typing import Dict, Any, List, Generator, Optional



# for local dev, load env vars from a .env file
from dotenv import load_dotenv

load_dotenv()


class CustomJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder that handles all BigQuery data types"""

def default(self, obj):
# Handle pandas NA type explicitly
if isinstance(obj, pd._libs.missing.NAType):
return None

        # Handle datetime.time objects (time is imported from the datetime module)
        if isinstance(obj, time):
            return obj.isoformat()

# Handle datetime objects
if isinstance(obj, datetime):
return obj.isoformat()

# Handle date objects
if isinstance(obj, date):
return obj.isoformat()

# Handle Decimal objects
if isinstance(obj, Decimal):
return str(obj)

# Handle bytes
if isinstance(obj, bytes):
return base64.b64encode(obj).decode('utf-8')

# Handle NaN and NA values
if pd.isna(obj):
return None

# Let the base class handle anything else
return super().default(obj)


class BigQuerySerializer(JSONSerializer):
"""Serializer for BigQuery data that handles all data types"""

def __call__(self, value, ctx):
"""Serialize a value to JSON string"""
if value is None:
return None
return self._dumps(value)

def _dumps(self, value):
# For dictionaries, pre-process to explicitly handle problematic types
if isinstance(value, dict):
processed_dict = {}
for k, v in value.items():
# Handle pandas NA type explicitly
if isinstance(v, pd._libs.missing.NAType):
processed_dict[k] = None
# Handle NaT strings
elif v == 'NaT':
processed_dict[k] = None
else:
processed_dict[k] = v

# Use our custom encoder to handle all other special types
return json.dumps(processed_dict, cls=CustomJSONEncoder)

# Use our custom encoder for any value
return json.dumps(value, cls=CustomJSONEncoder)


app = Application(consumer_group="data_source", auto_create_topics=True)
serializer = BigQuerySerializer()

# define the topic using the "output" environment variable
topic_name = os.environ["output"]
topic = app.topic(topic_name)

# read the service account credentials from the environment and parse the JSON string
service_account_json = os.environ["service_account"]
service_account_info = json.loads(service_account_json, strict=False)


def read_data():
# Connect to BigQuery
credentials = service_account.Credentials.from_service_account_info(
service_account_info,
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# Query to fetch data from BigQuery with all supported data types
# This query should include all BQ data types you want to support
query = """
Enter your Query here
"""

# Adjusted query to match existing table schema
# Add other columns only if they exist in your table

print("Table Rows loading.")
df = client.query_and_wait(query).to_dataframe()
print("Table Rows loaded.")

row_count = len(df)
stream_id = f"BQ_DATA_{str(random.randint(1, 100)).zfill(3)}"
headers = df.columns.tolist()

print(f"Publishing {row_count} rows.")

for _, row in df.iterrows():
# Convert row to dictionary while properly handling all data types
row_data = {}
for header in headers:
value = row[header]

# Handle special data types
if isinstance(value, (datetime, date)):
row_data[header] = value.isoformat()
elif isinstance(value, time): # Handle datetime.time here
row_data[header] = value.strftime("%H:%M:%S") # Convert time to string
elif isinstance(value, bytes):
row_data[header] = base64.b64encode(value).decode('utf-8')
elif isinstance(value, Decimal):
# Store Decimal as string to preserve precision
row_data[header] = str(value)
else:
row_data[header] = value

# add current timestamp
row_data["Timestamp"] = tm.time_ns()
yield stream_id, row_data


def main():
"""
Read data from the BigQuery table and publish it to Kafka
"""

def debug_type(obj):
print(f"Type: {type(obj)}, Value: {obj}, repr: {repr(obj)}")
if hasattr(obj, '__dict__'):
print(f"Object attributes: {obj.__dict__}")
return str(obj)

producer = app.get_producer()

with producer:
for message_key, row_data in read_data():
try:
serialized_value = serializer(
value=row_data,
ctx=SerializationContext(topic=topic.name, field='value')
)

producer.produce(
topic=topic.name,
key=message_key,
value=serialized_value,
)
except Exception as e:
print(f"Error serializing row: {e}")
print(f"Problematic row data: {row_data}")
for k, v in row_data.items():
print(f"Key: {k}, Value type: {type(v)}")
if pd.isna(v):
print(f" This is a pandas NA value: {repr(v)}")
# Continue with next row instead of stopping completely
continue

print("All rows published")


if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("Exiting.")
except Exception as e:
print(f"Error: {e}")
5 changes: 5 additions & 0 deletions python/sources/BigQuery_source/requirements.txt
@@ -0,0 +1,5 @@
quixstreams==3.2.1
python-dotenv
google-auth
pandas
google-cloud-bigquery[pandas]