diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go index f2fcb81..c11459d 100644 --- a/internal/relay/source_pool.go +++ b/internal/relay/source_pool.go @@ -38,6 +38,8 @@ type Server struct { // based on a threshold. If a server is unhealthy, the weight is marked as -1. Weight int64 + Healthy bool + // This is only set when a new live Kafka consumer connection is established // on demand via Get(), where a server{} is returned. Internally, no connections // are maintained on SourcePool.[]servers and only the config, weight etc. @@ -72,7 +74,6 @@ type SourcePool struct { const ( unhealthyWeight int64 = -1 - unhealthyID int = -1 ) var ( @@ -81,14 +82,15 @@ var ( // NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer) // servers. The pool always attempts to find one healthy node for the relay to consume from. -func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, of map[string]map[int32]kgo.Offset, topics map[string]Topic, log *slog.Logger) (*SourcePool, error) { +func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics map[string]Topic, log *slog.Logger) (*SourcePool, error) { servers := make([]Server, 0, len(serverCfgs)) // Initially mark all nodes as unhealthy for n, c := range serverCfgs { servers = append(servers, Server{ - ID: n, - Weight: unhealthyWeight, - Config: c, + ID: n, + Weight: unhealthyWeight, + Healthy: false, + Config: c, }) } @@ -97,6 +99,15 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, of map[stri cfg.Topics = append(cfg.Topics, t) } + return &SourcePool{ + cfg: cfg, + servers: servers, + log: log, + backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax), + }, nil +} + +func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) { // Assign the current weight as initial target offset. // This is done to resume if target already has messages published from src. var w int64 @@ -106,20 +117,14 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, of map[stri } } - return &SourcePool{ - cfg: cfg, - servers: servers, - offsets: of, - - // Set the current candidate with initial weight and a placeholder ID. A real - // healthy node should replace this via background checks - curCandidate: Server{ - ID: unhealthyID, - Weight: w, - }, - backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax), - log: log, - }, nil + sp.offsets = of + // Set the current candidate with initial weight and a placeholder ID. This initial + // weight ensures we resume consuming from where last left off. A real + // healthy node should replace this via background checks + sp.curCandidate = Server{ + Healthy: false, + Weight: w, + } } // Get attempts return a healthy source Kafka client connection. @@ -467,7 +472,7 @@ func (sp *SourcePool) getCurCandidate() (Server, error) { // If the weight (sum of all high watermarks of all topics on the source) is -1, // the server is unhealthy. - if sp.curCandidate.Weight == unhealthyWeight || sp.curCandidate.ID == unhealthyID { + if sp.curCandidate.Weight == unhealthyWeight || !sp.curCandidate.Healthy { return sp.curCandidate, ErrorNoHealthy } @@ -486,6 +491,9 @@ func (sp *SourcePool) setWeight(id int, weight int64) { } s.Weight = weight + if s.Weight != unhealthyWeight { + s.Healthy = true + } // If the incoming server's weight is greater than the current candidate, // promote that to the current candidate. diff --git a/main.go b/main.go index 1cbce3e..cb875d3 100644 --- a/main.go +++ b/main.go @@ -64,11 +64,13 @@ func main() { } // Initialize the source Kafka (consumer) relay. - srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, hOf.KOffsets(), topics, lo) + srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, lo) if err != nil { log.Fatalf("error initializing source pool controller: %v", err) } + srcPool.SetInitialOffsets(hOf.KOffsets()) + // Initialize the Relay which orchestrates consumption from the sourcePool // and writing to the target pool. relay, err := relay.NewRelay(initRelayConfig(ko), srcPool, target, topics, filters, lo)