From b508d8523c142e57c2fd8d976cd0962ace82162c Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Fri, 5 Jul 2024 09:43:48 +0530 Subject: [PATCH] Add `randomize_initial` for randomizing the initial connection in source pool. --- config.sample.toml | 6 ++++++ init.go | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/config.sample.toml b/config.sample.toml index 7e191c4..d3bbd3b 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -42,6 +42,12 @@ backoff_max = "10s" # whether it's healthy or not. request_timeout = "100ms" +# Pick a random server from the [[sources]] list to connect first on boot +# instead of the first one from the list. This can be useful for testing +# servers in production environments that may never be consumed from except +# during rare failover events. +randomize_initial = true + [[sources]] name = "node1" diff --git a/init.go b/init.go index 3e94c39..6c15920 100644 --- a/init.go +++ b/init.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "log/slog" + "math/rand" "net/http" "os" "plugin" @@ -184,6 +185,15 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC log.Fatalf("error unmarshalling `sources` config: %v", err) } + log.Printf("read config for %d servers in the source pool", len(src.Sources)) + if ko.Bool("source_pool.randomize_initial") { + log.Println("randomizing source pool for initial connection") + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(src.Sources), func(i, j int) { + src.Sources[i], src.Sources[j] = src.Sources[j], src.Sources[i] + }) + } + // Read target Kafka config. var prod relay.ProducerCfg if err := ko.Unmarshal("target", &prod); err != nil {