Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add support for non-unique keys in Kafka output headers #30369

Merged
merged 2 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Add support for kafka message headers. {pull}29940[29940]
- Add FIPS configuration option for all AWS API calls. {pull}[28899]
- Add metadata change support for some processors {pull}30183[30183]
- Add support for non-unique Kafka headers for output messages. {pull}30369[30369]

*Auditbeat*

Expand Down
11 changes: 7 additions & 4 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func newKafkaClient(
index string,
key *fmtstr.EventFormatString,
topic outil.Selector,
headers map[string]string,
headers []header,
writer codec.Codec,
cfg *sarama.Config,
) (*client, error) {
Expand All @@ -96,10 +96,13 @@ func newKafkaClient(

if len(headers) != 0 {
recordHeaders := make([]sarama.RecordHeader, 0, len(headers))
for k, v := range headers {
for _, h := range headers {
if h.Key == "" {
continue
}
recordHeader := sarama.RecordHeader{
Key: []byte(k),
Value: []byte(v),
Key: []byte(h.Key),
Value: []byte(h.Value),
}

recordHeaders = append(recordHeaders, recordHeader)
Expand Down
7 changes: 6 additions & 1 deletion libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type backoffConfig struct {
Max time.Duration `config:"max"`
}

type header struct {
Key string `config:"key"`
Value string `config:"value"`
}

type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Expand All @@ -62,7 +67,7 @@ type kafkaConfig struct {
BulkMaxSize int `config:"bulk_max_size"`
BulkFlushFrequency time.Duration `config:"bulk_flush_frequency"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
Headers map[string]string `config:"headers"`
Headers []header `config:"headers"`
Backoff backoffConfig `config:"backoff"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
Expand Down
16 changes: 16 additions & 0 deletions libbeat/outputs/kafka/docs/kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ available partitions only.

NOTE: Publishing to a subset of available partitions potentially increases resource usage because events may become unevenly distributed.

===== `headers`

A header is a key-value pair, and multiple headers can be included with the same `key`. Only string values are supported. These headers will be included in each produced Kafka message.

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
output.kafka:
hosts: ["localhost:9092"]
topic: "logs-%{[agent.version]}"
headers:
- key: "some-key"
value: "some value"
- key: "another-key"
value: "another value"
------------------------------------------------------------------------------

===== `client_id`

The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats".
Expand Down
31 changes: 26 additions & 5 deletions libbeat/outputs/kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,19 @@ func TestKafkaPublish(t *testing.T) {
{
"publish message with kafka headers to test topic",
map[string]interface{}{
"headers": map[string]string{
"app": "test-app",
"host": "test-host",
"headers": []map[string]string{
{
"key": "app",
"value": "test-app",
},
{
"key": "app",
"value": "test-app2",
},
{
"key": "host",
"value": "test-host",
},
},
},
testTopic,
Expand Down Expand Up @@ -290,10 +300,21 @@ func TestKafkaPublish(t *testing.T) {
validate = makeValidateFmtStr(fmt.(string))
}

cfgHeaders, headersSet := test.config["headers"]

seenMsgs := map[string]struct{}{}
for _, s := range stored {
if headers, exists := test.config["headers"]; exists {
assert.Equal(t, len(headers.(map[string]string)), len(s.Headers))
if headersSet {
expectedHeaders, ok := cfgHeaders.([]map[string]string)
assert.True(t, ok)
assert.Len(t, s.Headers, len(expectedHeaders))
for i, h := range s.Headers {
expectedHeader := expectedHeaders[i]
key := string(h.Key)
value := string(h.Value)
assert.Equal(t, expectedHeader["key"], key)
assert.Equal(t, expectedHeader["value"], value)
}
}

msg := validate(t, s.Value, expected)
Expand Down