Skip to content

Commit

Permalink
switched to using json encoding for kafka topics
Browse files Browse the repository at this point in the history
  • Loading branch information
BK1031 committed Aug 10, 2024
1 parent 6cc9297 commit 291c7b1
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 38 deletions.
10 changes: 5 additions & 5 deletions kafka/trip-connector.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
KAFKA_TOPIC="ridesharing-sim.trips"
KAFKA_TOPIC="ridesharing-sim-trips"
eval $(cat .env)

URL="https://$SNOWFLAKE_ACCOUNT.snowflakecomputing.com"
NAME="ridesharing-sim.trips"
NAME="ridesharing-sim-trips-snowflake"
DB_NAME="RIDESHARE_INGEST"
TABLE_NAME="TRIPS"

Expand All @@ -22,10 +22,10 @@ curl -i -X PUT -H "Content-Type:application/json" \
"topics":"'$KAFKA_TOPIC'",
"name":"'$NAME'",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://host.docker.internal:18081",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"buffer.count.records":"1000000",
"buffer.flush.time":"10",
"buffer.flush.time":"1",
"buffer.size.bytes":"250000000",
"snowflake.topic2table.map":"'$KAFKA_TOPIC:$TABLE_NAME'"
}'
4 changes: 4 additions & 0 deletions simulator/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ var Faker = faker.New()
// Kafka holds the Kafka connection settings, read from environment
// variables when the package is initialized. A variable that is not set is
// stored as the empty string.
var Kafka = struct {
// Broker is the seed broker address handed to the Kafka client.
Broker string
// SchemaRegistry is the URL handed to the schema registry client.
SchemaRegistry string
// SASLUsername and SASLPassword are optional SASL/PLAIN credentials. The
// exporter only switches to an authenticated TLS connection when both are
// non-empty; otherwise it connects without SASL.
SASLUsername string
SASLPassword string
}{
Broker: os.Getenv("KAFKA_BROKER"),
SchemaRegistry: os.Getenv("KAFKA_SCHEMA_REGISTRY"),
SASLUsername: os.Getenv("KAFKA_SASL_USERNAME"),
SASLPassword: os.Getenv("KAFKA_SASL_PASSWORD"),
}

func Verify() {
Expand Down
2 changes: 2 additions & 0 deletions simulator/data/trips.csv
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
id,driver_id,rider_id,status,request_time,accept_time,pickup_time,dropoff_time,fare,distance,pickup_lat,pickup_long,dropoff_lat,dropoff_long,city
ac04fb13-a814-44de-b195-e298f1ca145f,69285dc2-06eb-431b-9c80-ca8e623f78b2,cbc77039-7f16-4c82-936d-9758c6dc7488,completed,2024-08-10T00:07:15-07:00,2024-08-10T00:07:15-07:00,2024-08-10T00:07:44-07:00,2024-08-10T00:09:43-07:00,0,11542.20,37.78514484,-122.48257583,37.71204539,-122.38937181,San Francisco
5eb041dc-f51c-4112-aad1-f2d4beef8ba1,69285dc2-06eb-431b-9c80-ca8e623f78b2,cbc77039-7f16-4c82-936d-9758c6dc7488,completed,2024-08-10T00:09:55-07:00,2024-08-10T00:09:55-07:00,2024-08-10T00:10:34-07:00,2024-08-10T00:11:32-07:00,0,5523.11,37.73182799,-122.42274003,37.76584840,-122.46851047,San Francisco
5 changes: 3 additions & 2 deletions simulator/exporter/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
var tripSchema = avro.MustParse(`
{
"type": "record",
"name": "Trip",
"name": "trip",
"fields": [
{"name": "id", "type": "string"},
{"name": "driver_id", "type": "string"},
Expand All @@ -36,7 +36,8 @@ var tripSchema = avro.MustParse(`

func CreateAvroSchemas() {
// Create the trip schema
ss, err := SchemaRegistryClient.CreateSchema(context.Background(), "ridesharing-sim.trips-value", sr.Schema{
topic := "ridesharing-sim-trips"
ss, err := SchemaRegistryClient.CreateSchema(context.Background(), fmt.Sprintf("%s-value", topic), sr.Schema{
Schema: tripSchema.String(),
Type: sr.TypeAvro,
})
Expand Down
56 changes: 43 additions & 13 deletions simulator/exporter/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package exporter

import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"simulator/config"
"simulator/models"
"time"

"github.com/goccy/go-json"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sr"
)

Expand All @@ -24,26 +28,46 @@ func InitializeKafkaClient() {
return
}
seeds := []string{config.Kafka.Broker}
cl, err := kgo.NewClient(
kgo.SeedBrokers(seeds...),
kgo.ConsumerGroup("simulator"),
)
if err != nil {
log.Fatalf("unable to create kafka client: %v", err)

if config.Kafka.SASLUsername != "" && config.Kafka.SASLPassword != "" {
tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}}
opts := []kgo.Opt{
kgo.SeedBrokers(seeds...),
kgo.ConsumerGroup("simulator"),
kgo.SASL(plain.Auth{
User: config.Kafka.SASLUsername,
Pass: config.Kafka.SASLPassword,
}.AsMechanism()),
kgo.Dialer(tlsDialer.DialContext),
}
cl, err := kgo.NewClient(opts...)
if err != nil {
log.Fatalf("unable to create kafka client: %v", err)
}
KafkaClient = cl
} else {
cl, err := kgo.NewClient(
kgo.SeedBrokers(seeds...),
kgo.ConsumerGroup("simulator"),
)
if err != nil {
log.Fatalf("unable to create kafka client: %v", err)
}
KafkaClient = cl
}
KafkaClient = cl
log.Printf("kafka client connected successfully!\n")

rcl, err := sr.NewClient(sr.URLs(config.Kafka.SchemaRegistry))
if err != nil {
log.Fatalf("unable to create schema registry client: %v", err)
}
SchemaRegistryClient = rcl

for _, topic := range []string{"ridesharing-sim.trips"} {
for _, topic := range []string{"ridesharing-sim-trips"} {
CreateTopic(topic)
}

CreateAvroSchemas()
// CreateAvroSchemas()
}

func CreateTopic(topic string) {
Expand All @@ -67,12 +91,17 @@ func CreateTopic(topic string) {
}

func KafkaProduceTrip(trip models.Trip) {
jsonTrip, err := json.Marshal(trip)
if err != nil {
log.Fatalf("unable to marshal trip: %v", err)
}
KafkaClient.Produce(
context.Background(),
&kgo.Record{
Key: []byte(trip.ID),
Topic: "ridesharing-sim.trips",
Value: Serde.MustEncode(trip),
Topic: "ridesharing-sim-trips",
// Value: Serde.MustEncode(trip),
Value: jsonTrip,
},
func(r *kgo.Record, err error) {
if err != nil {
Expand All @@ -83,12 +112,13 @@ func KafkaProduceTrip(trip models.Trip) {
}

func KafkaDebugTripConsumer() {
KafkaClient.AddConsumeTopics("ridesharing-sim.trips")
KafkaClient.AddConsumeTopics("ridesharing-sim-trips")
for {
fs := KafkaClient.PollFetches(context.Background())
fs.EachRecord(func(r *kgo.Record) {
var trip models.Trip
err := Serde.Decode(r.Value, &trip)
// err := Serde.Decode(r.Value, &trip)
err := json.Unmarshal(r.Value, &trip)
if err != nil {
fmt.Printf("unable to decode: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion simulator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.5
require (
github.com/gin-contrib/cors v1.7.2
github.com/gin-gonic/gin v1.10.0
github.com/goccy/go-json v0.10.2
github.com/hamba/avro v1.8.0
github.com/jaswdr/faker/v2 v2.3.0
github.com/orcaman/concurrent-map/v2 v2.0.1
Expand Down Expand Up @@ -50,7 +51,6 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
Expand Down
2 changes: 0 additions & 2 deletions simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ func main() {
}
}()

go exporter.KafkaDebugTripConsumer()

// Dump completed trips to CSV every minute
// Also acts as a way to keep the main thread alive
for {
Expand Down
30 changes: 15 additions & 15 deletions simulator/models/trip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import "time"

// Trip represents a single trip in the ridesharing simulation
type Trip struct {
ID string `avro:"id"`
DriverID string `avro:"driver_id"`
RiderID string `avro:"rider_id"`
ID string `avro:"id" json:"id"`
DriverID string `avro:"driver_id" json:"driver_id"`
RiderID string `avro:"rider_id" json:"rider_id"`
// Status can be "requested", "accepted", "en_route", "completed"
Status string `avro:"status"`
RequestTime time.Time `avro:"request_time"`
AcceptTime time.Time `avro:"accept_time"`
PickupTime time.Time `avro:"pickup_time"`
DropoffTime time.Time `avro:"dropoff_time"`
Fare int `avro:"fare"`
Distance float64 `avro:"distance"`
PickupLat float64 `avro:"pickup_lat"`
PickupLong float64 `avro:"pickup_long"`
DropoffLat float64 `avro:"dropoff_lat"`
DropoffLong float64 `avro:"dropoff_long"`
City string `avro:"city"`
Status string `avro:"status" json:"status"`
RequestTime time.Time `avro:"request_time" json:"request_time"`
AcceptTime time.Time `avro:"accept_time" json:"accept_time"`
PickupTime time.Time `avro:"pickup_time" json:"pickup_time"`
DropoffTime time.Time `avro:"dropoff_time" json:"dropoff_time"`
Fare int `avro:"fare" json:"fare"`
Distance float64 `avro:"distance" json:"distance"`
PickupLat float64 `avro:"pickup_lat" json:"pickup_lat"`
PickupLong float64 `avro:"pickup_long" json:"pickup_long"`
DropoffLat float64 `avro:"dropoff_lat" json:"dropoff_lat"`
DropoffLong float64 `avro:"dropoff_long" json:"dropoff_long"`
City string `avro:"city" json:"city"`
}

0 comments on commit 291c7b1

Please sign in to comment.