diff --git a/e2e/service.go b/e2e/service.go index 1a09309..79082f6 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -157,6 +157,7 @@ func (s *Service) initKafka(ctx context.Context) error { func (s *Service) initReconcile(ctx context.Context) error { s.logger.Info("Starting reconcile") + // Create the KafkaAdmin client used for topic partition and leader reconciliation adminClient, err := s.kafkaSvc.CreateAndTestClient(ctx, s.logger, []kgo.Opt{}) if err != nil { return fmt.Errorf("failed to create kafka client for e2e: %w", err) @@ -169,7 +170,7 @@ func (s *Service) initReconcile(ctx context.Context) error { return fmt.Errorf("could not validate end-to-end topic: %w", err) } - // finally start everything else (producing, consuming, continuous validation, consumer group tracking) + // start topic creation/partition/leader reconciliation loop go s.startReconciliation(ctx) return nil diff --git a/e2e/topic.go b/e2e/topic.go index f5958ac..0e06f31 100644 --- a/e2e/topic.go +++ b/e2e/topic.go @@ -13,6 +13,8 @@ import ( // Check our end-to-end test topic and adapt accordingly if something does not match our expectations. // - does it exist? +// - does configuration allow topic management? +// // - is it configured correctly? // - does it have enough partitions? // - is the replicationFactor correct? @@ -37,6 +39,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { case kerr.UnknownTopicOrPartition: // UnknownTopicOrPartition (Error code 3) means that the topic does not exist. // When the topic doesn't exist, continue to create it further down in the code. + if !s.config.TopicManagement.Enabled { + return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " + + "because topic management is disabled") + } topicExists = false default: // If the topic (possibly) exists, but there's an error, then this should result in a fail @@ -45,24 +51,13 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { // Create topic if it doesn't exist if !topicExists { - if !s.config.TopicManagement.Enabled { - return fmt.Errorf("the configured end to end topic does not exist. The topic will not be created " + - "because topic management is disabled") - } - if err = s.createManagementTopic(ctx, meta); err != nil { return err } - } - - if !s.config.TopicManagement.Enabled { - topicMetadata, err := s.getTopicMetadata(ctx) + meta, err = s.getTopicMetadata(ctx) if err != nil { return fmt.Errorf("could not get topic metadata after validation: %w", err) } - partitions := len(topicMetadata.Topics[0].Partitions) - s.partitionCount = partitions - return nil } alterReq, createReq, pleReq, err := s.calculatePartitionReassignments(meta) @@ -85,13 +80,6 @@ func (s *Service) validateManagementTopic(ctx context.Context) error { return fmt.Errorf("failed to elect partitions: %w", err) } - topicMetadata, err := s.getTopicMetadata(ctx) - if err != nil { - return fmt.Errorf("could not get topic metadata after validation: %w", err) - } - partitions := len(topicMetadata.Topics[0].Partitions) - s.partitionCount = partitions - logger.Info("end-to-end topic is valid.") return nil