Skip to content

Commit

Permalink
Add randomize_initial for randomizing the initial connection in sou…
Browse files Browse the repository at this point in the history
…rce pool.
  • Loading branch information
knadh committed Jul 5, 2024
1 parent 43e191d commit b508d85
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ backoff_max = "10s"
# whether it's healthy or not.
request_timeout = "100ms"

# Pick a random server from the [[sources]] list to connect first on boot
# instead of the first one from the list. This can be useful for testing
# servers in production environments that may never be consumed from except
# during rare failover events.
randomize_initial = true


[[sources]]
name = "node1"
Expand Down
10 changes: 10 additions & 0 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"log/slog"
"math/rand"
"net/http"
"os"
"plugin"
Expand Down Expand Up @@ -184,6 +185,15 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC
log.Fatalf("error unmarshalling `sources` config: %v", err)
}

log.Printf("read config for %d servers in the source pool", len(src.Sources))
if ko.Bool("source_pool.randomize_initial") {
log.Println("randomizing source pool for initial connection")
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(src.Sources), func(i, j int) {
src.Sources[i], src.Sources[j] = src.Sources[j], src.Sources[i]
})
}

// Read target Kafka config.
var prod relay.ProducerCfg
if err := ko.Unmarshal("target", &prod); err != nil {
Expand Down

0 comments on commit b508d85

Please # to comment.