From 8d77fab24275a7eb919b88fccae6204926a2ad3c Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Fri, 5 Jul 2024 09:43:48 +0530 Subject: [PATCH 1/2] Add `randomize_initial` for randomizing the initial connection in source pool. --- config.sample.toml | 6 ++++++ init.go | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/config.sample.toml b/config.sample.toml index 7e191c4..4031379 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -42,6 +42,12 @@ backoff_max = "10s" # whether it's healthy or not. request_timeout = "100ms" +# Pick a random server from the [[sources]] list to connect first on boot +# instead of the first one from the list. This can be useful for testing +# servers in production environments that may never be consumed from except +# during rare failover events. +randomize_initial = false + [[sources]] name = "node1" diff --git a/init.go b/init.go index 3e94c39..a2d59dc 100644 --- a/init.go +++ b/init.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "log/slog" + "math/rand" "net/http" "os" "plugin" @@ -184,6 +185,15 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC log.Fatalf("error unmarshalling `sources` config: %v", err) } + log.Printf("read config for %d servers in the source pool", len(src.Sources)) + if ko.Bool("source_pool.randomize_initial") { + log.Println("randomizing source pool for initial connection") + r := rand.New(rand.NewSource(time.Now().UnixNano())) + r.Shuffle(len(src.Sources), func(i, j int) { + src.Sources[i], src.Sources[j] = src.Sources[j], src.Sources[i] + }) + } + // Read target Kafka config. var prod relay.ProducerCfg if err := ko.Unmarshal("target", &prod); err != nil { From 0febbc42b3161c19caca5728506e57d780dba23f Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Fri, 5 Jul 2024 10:43:18 +0530 Subject: [PATCH 2/2] Remove multiple servers in `single` relay mode. --- init.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/init.go b/init.go index a2d59dc..7bd7d4b 100644 --- a/init.go +++ b/init.go @@ -194,6 +194,12 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC }) } + // If it's single mode, eliminate all servers in the pool except one + // to disable healthcehcks and failover. + if ko.String("mode") == relay.ModeSingle { + src.Sources = src.Sources[:1] + } + // Read target Kafka config. var prod relay.ProducerCfg if err := ko.Unmarshal("target", &prod); err != nil {