Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix: config file and parsing bugfixes #23

Merged
merged 3 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ go.work
config.toml
/dist
kaf-relay.bin
kaf-relay
2 changes: 1 addition & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func initSourcePoolConfig(ko *koanf.Koanf) relay.SourcePoolCfg {
ReqTimeout: ko.MustDuration("source_pool.request_timeout"),
LagThreshold: ko.MustInt64("source_pool.offset_lag_threshold"),
MaxRetries: ko.MustInt("source_pool.max_retries"),
EnableBackoff: ko.Bool("source_pool._backoff_enable"),
EnableBackoff: ko.Bool("source_pool.backoff_enable"),
BackoffMin: ko.MustDuration("source_pool.backoff_min"),
BackoffMax: ko.MustDuration("source_pool.backoff_max"),
GroupID: ko.MustString("source_pool.group_id"),
Expand Down
2 changes: 0 additions & 2 deletions internal/relay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ type KafkaCfg struct {
// ConsumerGroupCfg is the consumer group specific config.
type ConsumerGroupCfg struct {
KafkaCfg `koanf:",squash"`

Topics []string
}

// ProducerCfg is the Kafka producer config.
Expand Down
1 change: 1 addition & 0 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ loop:
continue
}

re.log.Info("poll loop got new healthy node", "node_id", s.ID, "brokers", s.Config.BootstrapBrokers)
server = s
break
}
Expand Down
124 changes: 74 additions & 50 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type SourcePoolCfg struct {

GroupID string
InstanceID string
Topics []string
}

// Server represents a source Server's config with health and weight
Expand All @@ -37,6 +38,8 @@ type Server struct {
// based on a threshold. If a server is unhealthy, the weight is marked as -1.
Weight int64

Healthy bool

// This is only set when a new live Kafka consumer connection is established
// on demand via Get(), where a server{} is returned. Internally, no connections
// are maintained on SourcePool.[]servers and only the config, weight etc.
Expand All @@ -50,7 +53,6 @@ type SourcePool struct {
client *kgo.Client
log *slog.Logger

topics []string
offsets map[string]map[int32]kgo.Offset

// List of all source servers.
Expand Down Expand Up @@ -82,77 +84,98 @@ var (
// servers. The pool always attempts to find one healthy node for the relay to consume from.
func NewSourcePool(cfg SourcePoolCfg, serverCfgs []ConsumerGroupCfg, topics map[string]Topic, log *slog.Logger) (*SourcePool, error) {
servers := make([]Server, 0, len(serverCfgs))
// Initially mark all nodes as unhealthy
for n, c := range serverCfgs {
servers = append(servers, Server{
ID: n,
Weight: -1,
Config: c,
ID: n,
Weight: unhealthyWeight,
Healthy: false,
Config: c,
})
}

topicNames := make([]string, 0, len(topics))
for _, t := range topicNames {
topicNames = append(topicNames, t)
cfg.Topics = make([]string, 0, len(topics))
for t := range topics {
cfg.Topics = append(cfg.Topics, t)
}

return &SourcePool{
cfg: cfg,
servers: servers,
topics: topicNames,

// Set the "healthiest" node as the first node. This will fail as the program
// boots until the health tracker updates weights to indicate a real healthy
// node at which point the pool can return a real healthy node.
curCandidate: servers[0],
backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
log: log,
cfg: cfg,
servers: servers,
log: log,
backoffFn: getBackoffFn(cfg.EnableBackoff, cfg.BackoffMin, cfg.BackoffMax),
}, nil
}

func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
// Assign the current weight as initial target offset.
// This is done to resume if target already has messages published from src.
var w int64
for _, p := range of {
for _, o := range p {
w += o.EpochOffset().Offset
}
}

sp.offsets = of
// Set the current candidate with initial weight and a placeholder ID. This initial
// weight ensures we resume consuming from where last left off. A real
// healthy node should replace this via background checks
sp.curCandidate = Server{
Healthy: false,
Weight: w,
}
}

// Get attempts return a healthy source Kafka client connection.
// It internally applies backoff/retries between connection attempts and thus can take
// indefinitely long to return based on the config.
func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
retries := 0
for {
if sp.cfg.MaxRetries != IndefiniteRetry && retries >= sp.cfg.MaxRetries {
return nil, fmt.Errorf("`max_retries`(%d) exhausted; exiting relay", sp.cfg.MaxRetries)
}

// Get the config for a healthy node.
if s, err := sp.getCurCandidate(); err == nil {
sp.log.Debug("attempting new source connection", "id", s.ID, "broker", s.Config.BootstrapBrokers, "retries", retries)
conn, err := sp.newConn(globalCtx, s)
if err != nil {
retries++
sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
waitTries(globalCtx, sp.backoffFn(retries))
select {
case <-globalCtx.Done():
return nil, globalCtx.Err()
default:
if sp.cfg.MaxRetries != IndefiniteRetry && retries >= sp.cfg.MaxRetries {
return nil, fmt.Errorf("`max_retries`(%d) exhausted; exiting relay", sp.cfg.MaxRetries)
}

// Cache the current live connection internally.
sp.client = conn
// Get the config for a healthy node.
if s, err := sp.getCurCandidate(); err == nil {
sp.log.Debug("attempting new source connection", "id", s.ID, "broker", s.Config.BootstrapBrokers, "retries", retries)
conn, err := sp.newConn(globalCtx, s)
if err != nil {
retries++
sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
waitTries(globalCtx, sp.backoffFn(retries))
}

out := s
out.Client = conn
// Cache the current live connection internally.
sp.client = conn

sp.fetchCtx, sp.fetchCancel = context.WithCancel(context.Background())
return &out, nil
}
out := s
out.Client = conn

sp.fetchCtx, sp.fetchCancel = context.WithCancel(globalCtx)
return &out, nil
}

retries++
sp.log.Error("no healthy server found. waiting and retrying", "retries", retries)
waitTries(globalCtx, sp.backoffFn(retries))
retries++
sp.log.Error("no healthy server found. waiting and retrying", "retries", retries)
waitTries(globalCtx, sp.backoffFn(retries))
}
}
}

// GetFetches retrieves a Kafka fetch iterator to retrieve individual messages from.
func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
sp.log.Debug("rerieving fetches from source", "id", s.ID, "broker", s.Config.BootstrapBrokers)
sp.log.Debug("retrieving fetches from source", "id", s.ID, "broker", s.Config.BootstrapBrokers)
fetches := s.Client.PollFetches(sp.fetchCtx)

// There's no connection.
if fetches.IsClientClosed() {
sp.log.Debug("rerieving fetches failed. client closed.", "id", s.ID, "broker", s.Config.BootstrapBrokers)
sp.log.Debug("retrieving fetches failed. client closed.", "id", s.ID, "broker", s.Config.BootstrapBrokers)
sp.setWeight(s.ID, unhealthyWeight)

return nil, errors.New("fetch failed")
Expand Down Expand Up @@ -188,7 +211,7 @@ func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
}

func (sp *SourcePool) GetHighWatermark(ctx context.Context, cl *kgo.Client) (kadm.ListedOffsets, error) {
return getHighWatermark(ctx, cl, sp.topics, sp.cfg.ReqTimeout)
return getHighWatermark(ctx, cl, sp.cfg.Topics, sp.cfg.ReqTimeout)
}

// Close closes the active source Kafka client.
Expand All @@ -214,7 +237,7 @@ func (sp *SourcePool) newConn(ctx context.Context, s Server) (*kgo.Client, error
}

if sp.offsets != nil {
sp.log.Debug("resetting cached offsets", "id", s.ID, "server", s.Config.BootstrapBrokers)
sp.log.Debug("resetting cached offsets", "id", s.ID, "server", s.Config.BootstrapBrokers, "offsets", sp.offsets)
if err := sp.leaveAndResetOffsets(ctx, cl, s); err != nil {
sp.log.Error("error resetting cached offsets", "id", s.ID, "server", s.Config.BootstrapBrokers, "error", err)
return nil, err
Expand All @@ -241,9 +264,7 @@ func (sp *SourcePool) healthcheck(ctx context.Context, signal chan struct{}) err
// of servers don't have to be locked.
sp.Lock()
servers := make([]Server, 0, len(sp.servers))
for _, s := range sp.servers {
servers = append(servers, s)
}
servers = append(servers, sp.servers...)
sp.Unlock()

clients := make([]*kgo.Client, len(servers))
Expand Down Expand Up @@ -355,7 +376,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.BootstrapBrokers...),
kgo.FetchMaxWait(sp.cfg.ReqTimeout),
kgo.ConsumeTopics(cfg.Topics...),
kgo.ConsumeTopics(sp.cfg.Topics...),
kgo.ConsumerGroup(sp.cfg.GroupID),
kgo.InstanceID(sp.cfg.InstanceID),
kgo.SessionTimeout(cfg.SessionTimeout),
Expand Down Expand Up @@ -391,7 +412,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
return nil, err
}

if err := testConnection(cl, cfg.SessionTimeout, cfg.Topics, nil); err != nil {
if err := testConnection(cl, cfg.SessionTimeout, sp.cfg.Topics, nil); err != nil {
return nil, err
}

Expand Down Expand Up @@ -451,7 +472,7 @@ func (sp *SourcePool) getCurCandidate() (Server, error) {

// If the weight (sum of all high watermarks of all topics on the source) is -1,
// the server is unhealthy.
if sp.curCandidate.Weight == -1 {
if sp.curCandidate.Weight == unhealthyWeight || !sp.curCandidate.Healthy {
return sp.curCandidate, ErrorNoHealthy
}

Expand All @@ -470,6 +491,9 @@ func (sp *SourcePool) setWeight(id int, weight int64) {
}

s.Weight = weight
if s.Weight != unhealthyWeight {
s.Healthy = true
}

// If the incoming server's weight is greater than the current candidate,
// promote that to the current candidate.
Expand Down Expand Up @@ -522,7 +546,7 @@ waitForTopicLag:
}

// Get end offsets of the topics
topicOffsets, err := admCl.ListEndOffsets(ctx, s.Config.Topics...)
topicOffsets, err := admCl.ListEndOffsets(ctx, sp.cfg.Topics...)
if err != nil {
sp.log.Error("error fetching offsets", "err", err)
return err
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,19 @@ func main() {
log.Fatalf("error initializing target controller: %v", err)
}

hOf, err := target.GetHighWatermark()
if err != nil {
log.Fatalf("error getting destination high watermark: %v", err)
}

// Initialize the source Kafka (consumer) relay.
srcPool, err := relay.NewSourcePool(initSourcePoolConfig(ko), consumerCfgs, topics, lo)
if err != nil {
log.Fatalf("error initializing source pool controller: %v", err)
}

srcPool.SetInitialOffsets(hOf.KOffsets())

// Initialize the Relay which orchestrates consumption from the sourcePool
// and writing to the target pool.
relay, err := relay.NewRelay(initRelayConfig(ko), srcPool, target, topics, filters, lo)
Expand Down