Skip to content

Commit

Permalink
fix: config file and parsing bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Lakshay Kalbhor committed Apr 30, 2024
1 parent 6c79c2c commit 2c16961
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
48 changes: 28 additions & 20 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Server struct {
// based on a threshold. If a server is unhealthy, the weight is marked as -1.
Weight int64

Healthy bool

// This is only set when a new live Kafka consumer connection is established
// on demand via Get(), where a server{} is returned. Internally, no connections
// are maintained on SourcePool.[]servers and only the config, weight etc.
Expand Down Expand Up @@ -72,7 +74,6 @@ type SourcePool struct {

const (
unhealthyWeight int64 = -1
unhealthyID int = -1
)

var (
Expand All @@ -81,14 +82,15 @@ var (

// NewSourcePool returns a controller instance that manages the lifecycle of a pool of N source (consumer)
// servers. The pool always attempts to find one healthy node for the relay to consume from.
func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, of map[string]map[int32]kgo.Offset, topics map[string]Topic, log *slog.Logger) (*SourcePool, error) {
func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics map[string]Topic, log *slog.Logger) (*SourcePool, error) {
servers := make([]Server, 0, len(serverCfgs))
// Initially mark all nodes as unhealthy
for n, c := range serverCfgs {
servers = append(servers, Server{
ID: n,
Weight: unhealthyWeight,
Config: c,
ID: n,
Weight: unhealthyWeight,
Healthy: false,
Config: c,
})
}

Expand All @@ -97,6 +99,15 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, of map[stri
cfg.Topics = append(cfg.Topics, t)
}

return &SourcePool{
cfg: cfg,
servers: servers,
log: log,
backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
}, nil
}

func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
// Assign the current weight as initial target offset.
// This is done to resume if target already has messages published from src.
var w int64
Expand All @@ -106,20 +117,14 @@ func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, of map[stri
}
}

return &SourcePool{
cfg: cfg,
servers: servers,
offsets: of,

// Set the current candidate with initial weight and a placeholder ID. A real
// healthy node should replace this via background checks
curCandidate: Server{
ID: unhealthyID,
Weight: w,
},
backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
log: log,
}, nil
sp.offsets = of
// Set the current candidate with initial weight and a placeholder ID. This initial
// weight ensures we resume consuming from where last left off. A real
// healthy node should replace this via background checks
sp.curCandidate = Server{
Healthy: false,
Weight: w,
}
}

// Get attempts return a healthy source Kafka client connection.
Expand Down Expand Up @@ -467,7 +472,7 @@ func (sp *SourcePool) getCurCandidate() (Server, error) {

// If the weight (sum of all high watermarks of all topics on the source) is -1,
// the server is unhealthy.
if sp.curCandidate.Weight == unhealthyWeight || sp.curCandidate.ID == unhealthyID {
if sp.curCandidate.Weight == unhealthyWeight || !sp.curCandidate.Healthy {
return sp.curCandidate, ErrorNoHealthy
}

Expand All @@ -486,6 +491,9 @@ func (sp *SourcePool) setWeight(id int, weight int64) {
}

s.Weight = weight
if s.Weight != unhealthyWeight {
s.Healthy = true
}

// If the incoming server's weight is greater than the current candidate,
// promote that to the current candidate.
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ func main() {
}

// Initialize the source Kafka (consumer) relay.
srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, hOf.KOffsets(), topics, lo)
srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, lo)
if err != nil {
log.Fatalf("error initializing source pool controller: %v", err)
}

srcPool.SetInitialOffsets(hOf.KOffsets())

// Initialize the Relay which orchestrates consumption from the sourcePool
// and writing to the target pool.
relay, err := relay.NewRelay(initRelayConfig(ko), srcPool, target, topics, filters, lo)
Expand Down

0 comments on commit 2c16961

Please # to comment.