diff --git a/preset/kafka/options.go b/preset/kafka/options.go index 090b5adf..9082c4fe 100644 --- a/preset/kafka/options.go +++ b/preset/kafka/options.go @@ -13,12 +13,22 @@ func WithVersion(version string) Option { // WithTopics makes sure that the provided topics are available when Kafka is // up and running. +// Both topics from WithTopics and WithTopicConfigs will be added to Kafka. func WithTopics(topics ...string) Option { return func(o *P) { o.Topics = append(o.Topics, topics...) } } +// WithTopicConfigs makes sure that the provided topics with the given configs are available when Kafka is +// up and running. Unlike WithTopics, this allows to also set partition count. +// Both topics from WithTopics and WithTopicConfigs will be added to Kafka. +func WithTopicConfigs(topics ...TopicConfig) Option { + return func(o *P) { + o.TopicConfigs = append(o.TopicConfigs, topics...) + } +} + // WithMessages makes sure that these messages can be consumed during the test // once the container is ready. func WithMessages(messages ...Message) Option { diff --git a/preset/kafka/preset.go b/preset/kafka/preset.go index ed65d317..f2999beb 100644 --- a/preset/kafka/preset.go +++ b/preset/kafka/preset.go @@ -63,6 +63,11 @@ func Preset(opts ...Option) gnomock.Preset { return p } +type TopicConfig struct { + Topic string + NumPartitions int +} + // P is a Gnomock Preset implementation of Kafka. type P struct { Version string `json:"version"` @@ -70,6 +75,8 @@ type P struct { Messages []Message `json:"messages"` MessagesFiles []string `json:"messages_files"` UseSchemaRegistry bool `json:"use_schema_registry"` + + TopicConfigs []TopicConfig `json:"topic_configs"` } // Image returns an image that should be pulled to create this container. @@ -106,7 +113,7 @@ func (p *P) Options() []gnomock.Option { gnomock.WithEnv("SAMPLEDATA=0"), } - if len(p.Topics) > 0 || len(p.Messages) > 0 { + if len(p.Topics) > 0 || len(p.TopicConfigs) > 0 || len(p.Messages) > 0 { opts = append(opts, gnomock.WithInit(p.initf)) } @@ -226,7 +233,7 @@ func (p *P) ingestMessageFiles(ctx context.Context, c *gnomock.Container, conn * p.Topics = append(p.Topics, topic) } - topics := make([]kafka.TopicConfig, 0, len(p.Topics)) + topics := make([]kafka.TopicConfig, 0, len(p.Topics)+len(p.TopicConfigs)) for _, topic := range p.Topics { topics = append(topics, kafka.TopicConfig{ @@ -236,6 +243,14 @@ func (p *P) ingestMessageFiles(ctx context.Context, c *gnomock.Container, conn * }) } + for _, topic := range p.TopicConfigs { + topics = append(topics, kafka.TopicConfig{ + Topic: topic.Topic, + ReplicationFactor: 1, // cannot set more; cluster has just 1 node + NumPartitions: topic.NumPartitions, + }) + } + if err := conn.CreateTopics(topics...); err != nil { return fmt.Errorf("can't create topics: %w", err) } diff --git a/preset/kafka/preset_test.go b/preset/kafka/preset_test.go index b7f1720d..146d873a 100644 --- a/preset/kafka/preset_test.go +++ b/preset/kafka/preset_test.go @@ -44,7 +44,11 @@ func testPreset(version string) func(t *testing.T) { } p := kafka.Preset( - kafka.WithTopics("topic-1", "topic-2"), + kafka.WithTopics("topic-1"), + kafka.WithTopicConfigs(kafka.TopicConfig{ + Topic: "topic-2", + NumPartitions: 3, + }), kafka.WithMessages(messages...), kafka.WithVersion(version), kafka.WithMessagesFile("./testdata/messages.json"), @@ -89,6 +93,26 @@ func testPreset(version string) func(t *testing.T) { c, err := kafkaclient.Dial("tcp", container.Address(kafka.BrokerPort)) require.NoError(t, err) + // Test that topic-1 exists, and topic-2 has all 3 partitions + topicReader := kafkaclient.NewReader(kafkaclient.ReaderConfig{ + Brokers: []string{container.Address(kafka.BrokerPort)}, + Topic: "topic-1", + }) + _, err = topicReader.ReadLag(ctx) + require.NoError(t, err) + require.NoError(t, topicReader.Close()) + + for i := 0; i < 3; i++ { + topicReader := kafkaclient.NewReader(kafkaclient.ReaderConfig{ + Brokers: []string{container.Address(kafka.BrokerPort)}, + Topic: "topic-2", + Partition: i, + }) + _, err = topicReader.ReadLag(ctx) + require.NoError(t, err) + require.NoError(t, topicReader.Close()) + } + require.NoError(t, c.DeleteTopics("topic-1", "topic-2")) require.Error(t, c.DeleteTopics("unknown-topic"))