diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 47341385489c..f30f71ad13fe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Add support for kafka message headers. {pull}29940[29940] - Add FIPS configuration option for all AWS API calls. {pull}[28899] - Add metadata change support for some processors {pull}30183[30183] +- Add support for non-unique Kafka headers for output messages. {pull}30369[30369] *Auditbeat* diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 149067cde340..de3a3a905a98 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -78,7 +78,7 @@ func newKafkaClient( index string, key *fmtstr.EventFormatString, topic outil.Selector, - headers map[string]string, + headers []header, writer codec.Codec, cfg *sarama.Config, ) (*client, error) { @@ -96,10 +96,13 @@ func newKafkaClient( if len(headers) != 0 { recordHeaders := make([]sarama.RecordHeader, 0, len(headers)) - for k, v := range headers { + for _, h := range headers { + if h.Key == "" { + continue + } recordHeader := sarama.RecordHeader{ - Key: []byte(k), - Value: []byte(v), + Key: []byte(h.Key), + Value: []byte(h.Value), } recordHeaders = append(recordHeaders, recordHeader) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 0d3d483840a9..4a63834ea965 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -44,6 +44,11 @@ type backoffConfig struct { Max time.Duration `config:"max"` } +type header struct { + Key string `config:"key"` + Value string `config:"value"` +} + type kafkaConfig struct { Hosts []string `config:"hosts" validate:"required"` TLS *tlscommon.Config `config:"ssl"` @@ -62,7 +67,7 @@ type kafkaConfig struct { BulkMaxSize int `config:"bulk_max_size"` BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` - Headers map[string]string `config:"headers"` + Headers []header `config:"headers"` Backoff backoffConfig `config:"backoff"` ClientID string `config:"client_id"` ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` diff --git a/libbeat/outputs/kafka/docs/kafka.asciidoc b/libbeat/outputs/kafka/docs/kafka.asciidoc index a9b517d94903..b42b328faa8d 100644 --- a/libbeat/outputs/kafka/docs/kafka.asciidoc +++ b/libbeat/outputs/kafka/docs/kafka.asciidoc @@ -195,6 +195,22 @@ available partitions only. NOTE: Publishing to a subset of available partitions potentially increases resource usage because events may become unevenly distributed. +===== `headers` + +A header is a key-value pair, and multiple headers can be included with the same `key`. Only string values are supported. These headers will be included in each produced Kafka message. + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------ +output.kafka: + hosts: ["localhost:9092"] + topic: "logs-%{[agent.version]}" + headers: + - key: "some-key" + value: "some value" + - key: "another-key" + value: "another value" +------------------------------------------------------------------------------ + ===== `client_id` The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats". diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 412451a38163..1f75c2710867 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -218,9 +218,19 @@ func TestKafkaPublish(t *testing.T) { { "publish message with kafka headers to test topic", map[string]interface{}{ - "headers": map[string]string{ - "app": "test-app", - "host": "test-host", + "headers": []map[string]string{ + { + "key": "app", + "value": "test-app", + }, + { + "key": "app", + "value": "test-app2", + }, + { + "key": "host", + "value": "test-host", + }, }, }, testTopic, @@ -290,10 +300,21 @@ func TestKafkaPublish(t *testing.T) { validate = makeValidateFmtStr(fmt.(string)) } + cfgHeaders, headersSet := test.config["headers"] + seenMsgs := map[string]struct{}{} for _, s := range stored { - if headers, exists := test.config["headers"]; exists { - assert.Equal(t, len(headers.(map[string]string)), len(s.Headers)) + if headersSet { + expectedHeaders, ok := cfgHeaders.([]map[string]string) + assert.True(t, ok) + assert.Len(t, s.Headers, len(expectedHeaders)) + for i, h := range s.Headers { + expectedHeader := expectedHeaders[i] + key := string(h.Key) + value := string(h.Value) + assert.Equal(t, expectedHeader["key"], key) + assert.Equal(t, expectedHeader["value"], value) + } } msg := validate(t, s.Value, expected)