From 291c7b1c65c1ed258449b45c5ef2eeaeda7ec15d Mon Sep 17 00:00:00 2001 From: Bharat Kathi Date: Sat, 10 Aug 2024 00:12:21 -0700 Subject: [PATCH] switched to using json encoding for kafka topics --- kafka/trip-connector.sh | 10 +++---- simulator/config/config.go | 4 +++ simulator/data/trips.csv | 2 ++ simulator/exporter/avro.go | 5 ++-- simulator/exporter/kafka.go | 56 ++++++++++++++++++++++++++++--------- simulator/go.mod | 2 +- simulator/main.go | 2 -- simulator/models/trip.go | 30 ++++++++++---------- 8 files changed, 73 insertions(+), 38 deletions(-) diff --git a/kafka/trip-connector.sh b/kafka/trip-connector.sh index 3bc2343..f459f41 100755 --- a/kafka/trip-connector.sh +++ b/kafka/trip-connector.sh @@ -1,8 +1,8 @@ -KAFKA_TOPIC="ridesharing-sim.trips" +KAFKA_TOPIC="ridesharing-sim-trips" eval $(cat .env) URL="https://$SNOWFLAKE_ACCOUNT.snowflakecomputing.com" -NAME="ridesharing-sim.trips" +NAME="ridesharing-sim-trips-snowflake" DB_NAME="RIDESHARE_INGEST" TABLE_NAME="TRIPS" @@ -22,10 +22,10 @@ curl -i -X PUT -H "Content-Type:application/json" \ "topics":"'$KAFKA_TOPIC'", "name":"'$NAME'", "key.converter":"org.apache.kafka.connect.storage.StringConverter", - "value.converter":"io.confluent.connect.avro.AvroConverter", - "value.converter.schema.registry.url": "http://host.docker.internal:18081", + "value.converter":"org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable":"false", "buffer.count.records":"1000000", - "buffer.flush.time":"10", + "buffer.flush.time":"1", "buffer.size.bytes":"250000000", "snowflake.topic2table.map":"'$KAFKA_TOPIC:$TABLE_NAME'" }' \ No newline at end of file diff --git a/simulator/config/config.go b/simulator/config/config.go index 590f1cd..9d59d9a 100644 --- a/simulator/config/config.go +++ b/simulator/config/config.go @@ -24,9 +24,13 @@ var Faker = faker.New() var Kafka = struct { Broker string SchemaRegistry string + SASLUsername string + SASLPassword string }{ Broker: os.Getenv("KAFKA_BROKER"), SchemaRegistry: os.Getenv("KAFKA_SCHEMA_REGISTRY"), + SASLUsername: os.Getenv("KAFKA_SASL_USERNAME"), + SASLPassword: os.Getenv("KAFKA_SASL_PASSWORD"), } func Verify() { diff --git a/simulator/data/trips.csv b/simulator/data/trips.csv index 48d23c1..0d0741c 100644 --- a/simulator/data/trips.csv +++ b/simulator/data/trips.csv @@ -1 +1,3 @@ id,driver_id,rider_id,status,request_time,accept_time,pickup_time,dropoff_time,fare,distance,pickup_lat,pickup_long,dropoff_lat,dropoff_long,city +ac04fb13-a814-44de-b195-e298f1ca145f,69285dc2-06eb-431b-9c80-ca8e623f78b2,cbc77039-7f16-4c82-936d-9758c6dc7488,completed,2024-08-10T00:07:15-07:00,2024-08-10T00:07:15-07:00,2024-08-10T00:07:44-07:00,2024-08-10T00:09:43-07:00,0,11542.20,37.78514484,-122.48257583,37.71204539,-122.38937181,San Francisco +5eb041dc-f51c-4112-aad1-f2d4beef8ba1,69285dc2-06eb-431b-9c80-ca8e623f78b2,cbc77039-7f16-4c82-936d-9758c6dc7488,completed,2024-08-10T00:09:55-07:00,2024-08-10T00:09:55-07:00,2024-08-10T00:10:34-07:00,2024-08-10T00:11:32-07:00,0,5523.11,37.73182799,-122.42274003,37.76584840,-122.46851047,San Francisco diff --git a/simulator/exporter/avro.go b/simulator/exporter/avro.go index eacb333..10be786 100644 --- a/simulator/exporter/avro.go +++ b/simulator/exporter/avro.go @@ -13,7 +13,7 @@ import ( var tripSchema = avro.MustParse(` { "type": "record", - "name": "Trip", + "name": "trip", "fields": [ {"name": "id", "type": "string"}, {"name": "driver_id", "type": "string"}, @@ -36,7 +36,8 @@ var tripSchema = avro.MustParse(` func CreateAvroSchemas() { // Create the trip schema - ss, err := SchemaRegistryClient.CreateSchema(context.Background(), "ridesharing-sim.trips-value", sr.Schema{ + topic := "ridesharing-sim-trips" + ss, err := SchemaRegistryClient.CreateSchema(context.Background(), fmt.Sprintf("%s-value", topic), sr.Schema{ Schema: tripSchema.String(), Type: sr.TypeAvro, }) diff --git a/simulator/exporter/kafka.go b/simulator/exporter/kafka.go index 153d921..2e5676d 100644 --- a/simulator/exporter/kafka.go +++ b/simulator/exporter/kafka.go @@ -2,15 +2,19 @@ package exporter import ( "context" + "crypto/tls" "fmt" "log" + "net" "simulator/config" "simulator/models" "time" + "github.com/goccy/go-json" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/pkg/sr" ) @@ -24,14 +28,34 @@ func InitializeKafkaClient() { return } seeds := []string{config.Kafka.Broker} - cl, err := kgo.NewClient( - kgo.SeedBrokers(seeds...), - kgo.ConsumerGroup("simulator"), - ) - if err != nil { - log.Fatalf("unable to create kafka client: %v", err) + + if config.Kafka.SASLUsername != "" && config.Kafka.SASLPassword != "" { + tlsDialer := &tls.Dialer{NetDialer: &net.Dialer{Timeout: 10 * time.Second}} + opts := []kgo.Opt{ + kgo.SeedBrokers(seeds...), + kgo.ConsumerGroup("simulator"), + kgo.SASL(plain.Auth{ + User: config.Kafka.SASLUsername, + Pass: config.Kafka.SASLPassword, + }.AsMechanism()), + kgo.Dialer(tlsDialer.DialContext), + } + cl, err := kgo.NewClient(opts...) + if err != nil { + log.Fatalf("unable to create kafka client: %v", err) + } + KafkaClient = cl + } else { + cl, err := kgo.NewClient( + kgo.SeedBrokers(seeds...), + kgo.ConsumerGroup("simulator"), + ) + if err != nil { + log.Fatalf("unable to create kafka client: %v", err) + } + KafkaClient = cl } - KafkaClient = cl + log.Printf("kafka client connected successfully!\n") rcl, err := sr.NewClient(sr.URLs(config.Kafka.SchemaRegistry)) if err != nil { @@ -39,11 +63,11 @@ func InitializeKafkaClient() { } SchemaRegistryClient = rcl - for _, topic := range []string{"ridesharing-sim.trips"} { + for _, topic := range []string{"ridesharing-sim-trips"} { CreateTopic(topic) } - CreateAvroSchemas() + // CreateAvroSchemas() } func CreateTopic(topic string) { @@ -67,12 +91,17 @@ func CreateTopic(topic string) { } func KafkaProduceTrip(trip models.Trip) { + jsonTrip, err := json.Marshal(trip) + if err != nil { + log.Fatalf("unable to marshal trip: %v", err) + } KafkaClient.Produce( context.Background(), &kgo.Record{ Key: []byte(trip.ID), - Topic: "ridesharing-sim.trips", - Value: Serde.MustEncode(trip), + Topic: "ridesharing-sim-trips", + // Value: Serde.MustEncode(trip), + Value: jsonTrip, }, func(r *kgo.Record, err error) { if err != nil { @@ -83,12 +112,13 @@ func KafkaProduceTrip(trip models.Trip) { } func KafkaDebugTripConsumer() { - KafkaClient.AddConsumeTopics("ridesharing-sim.trips") + KafkaClient.AddConsumeTopics("ridesharing-sim-trips") for { fs := KafkaClient.PollFetches(context.Background()) fs.EachRecord(func(r *kgo.Record) { var trip models.Trip - err := Serde.Decode(r.Value, &trip) + // err := Serde.Decode(r.Value, &trip) + err := json.Unmarshal(r.Value, &trip) if err != nil { fmt.Printf("unable to decode: %v", err) } diff --git a/simulator/go.mod b/simulator/go.mod index 3efe07b..148a8f5 100644 --- a/simulator/go.mod +++ b/simulator/go.mod @@ -5,6 +5,7 @@ go 1.22.5 require ( github.com/gin-contrib/cors v1.7.2 github.com/gin-gonic/gin v1.10.0 + github.com/goccy/go-json v0.10.2 github.com/hamba/avro v1.8.0 github.com/jaswdr/faker/v2 v2.3.0 github.com/orcaman/concurrent-map/v2 v2.0.1 @@ -50,7 +51,6 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect - github.com/goccy/go-json v0.10.2 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/golang-jwt/jwt/v5 v5.2.1 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect diff --git a/simulator/main.go b/simulator/main.go index d39a51b..af63283 100644 --- a/simulator/main.go +++ b/simulator/main.go @@ -61,8 +61,6 @@ func main() { } }() - go exporter.KafkaDebugTripConsumer() - // Dump completed trips to CSV every minute // Also acts as a way to keep the main thread alive for { diff --git a/simulator/models/trip.go b/simulator/models/trip.go index af4b1ff..63645c0 100644 --- a/simulator/models/trip.go +++ b/simulator/models/trip.go @@ -4,20 +4,20 @@ import "time" // Trip represents a single trip in the ridesharing simulation type Trip struct { - ID string `avro:"id"` - DriverID string `avro:"driver_id"` - RiderID string `avro:"rider_id"` + ID string `avro:"id" json:"id"` + DriverID string `avro:"driver_id" json:"driver_id"` + RiderID string `avro:"rider_id" json:"rider_id"` // Status can be "requested", "accepted", "en_route", "completed" - Status string `avro:"status"` - RequestTime time.Time `avro:"request_time"` - AcceptTime time.Time `avro:"accept_time"` - PickupTime time.Time `avro:"pickup_time"` - DropoffTime time.Time `avro:"dropoff_time"` - Fare int `avro:"fare"` - Distance float64 `avro:"distance"` - PickupLat float64 `avro:"pickup_lat"` - PickupLong float64 `avro:"pickup_long"` - DropoffLat float64 `avro:"dropoff_lat"` - DropoffLong float64 `avro:"dropoff_long"` - City string `avro:"city"` + Status string `avro:"status" json:"status"` + RequestTime time.Time `avro:"request_time" json:"request_time"` + AcceptTime time.Time `avro:"accept_time" json:"accept_time"` + PickupTime time.Time `avro:"pickup_time" json:"pickup_time"` + DropoffTime time.Time `avro:"dropoff_time" json:"dropoff_time"` + Fare int `avro:"fare" json:"fare"` + Distance float64 `avro:"distance" json:"distance"` + PickupLat float64 `avro:"pickup_lat" json:"pickup_lat"` + PickupLong float64 `avro:"pickup_long" json:"pickup_long"` + DropoffLat float64 `avro:"dropoff_lat" json:"dropoff_lat"` + DropoffLong float64 `avro:"dropoff_long" json:"dropoff_long"` + City string `avro:"city" json:"city"` }