Skip to content

Commit

Permalink
Comments and refactored topic management logic
Browse files Browse the repository at this point in the history
  • Loading branch information
azun committed Oct 10, 2023
1 parent 2a2054c commit ee72349
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 20 deletions.
3 changes: 2 additions & 1 deletion e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s *Service) initKafka(ctx context.Context) error {

func (s *Service) initReconcile(ctx context.Context) error {
s.logger.Info("Starting reconcile")
// Create the KafkaAdmin client used for topic partition and leader reconciliation
adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{})
if err != nil {
return fmt.Errorf("failed to create kafka client for e2e: %w", err)
Expand All @@ -169,7 +170,7 @@ func (s *Service) initReconcile(ctx context.Context) error {
return fmt.Errorf("could not validate end-to-end topic: %w", err)
}

// finally start everything else (producing, consuming, continuous validation, consumer group tracking)
// start topic creation/partition/leader reconciliation loop
go s.startReconciliation(ctx)

return nil
Expand Down
26 changes: 7 additions & 19 deletions e2e/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

// Check our end-to-end test topic and adapt accordingly if something does not match our expectations.
// - does it exist?
// - does configuration allow topic management?
//
// - is it configured correctly?
// - does it have enough partitions?
// - is the replicationFactor correct?
Expand All @@ -37,6 +39,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
case kerr.UnknownTopicOrPartition:
// UnknownTopicOrPartition (Error code 3) means that the topic does not exist.
// When the topic doesn't exist, continue to create it further down in the code.
if !s.config.TopicManagement.Enabled {
return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " +
"because topic management is disabled")
}
topicExists = false
default:
// If the topic (possibly) exists, but there's an error, then this should result in a fail
Expand All @@ -45,24 +51,13 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {

// Create topic if it doesn't exist
if !topicExists {
if !s.config.TopicManagement.Enabled {
return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " +
"because topic management is disabled")
}

if err = s.createManagementTopic(ctx, meta); err != nil {
return err
}
}

if !s.config.TopicManagement.Enabled {
topicMetadata, err := s.getTopicMetadata(ctx)
meta, err = s.getTopicMetadata(ctx)
if err != nil {
return fmt.Errorf("could not get topic metadata after validation: %w", err)
}
partitions := len(topicMetadata.Topics[0].Partitions)
s.partitionCount = partitions
return nil
}

alterReq, createReq, pleReq, err := s.calculatePartitionReassignments(meta)
Expand All @@ -85,13 +80,6 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
return fmt.Errorf("failed to elect partitions: %w", err)
}

topicMetadata, err := s.getTopicMetadata(ctx)
if err != nil {
return fmt.Errorf("could not get topic metadata after validation: %w", err)
}
partitions := len(topicMetadata.Topics[0].Partitions)
s.partitionCount = partitions

logger.Info("end-to-end topic is valid.")

return nil
Expand Down

0 comments on commit ee72349

Please # to comment.