Skip to content

Commit

Permalink
kafka receiver exposes some metrics by partition (fixes open-telemetry#30177)
Browse files Browse the repository at this point in the history

add changelog notes
  • Loading branch information
spadger committed Jan 16, 2024
1 parent a900a7c commit 9625c43
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 23 deletions.
31 changes: 31 additions & 0 deletions .chloggen/fix_kafka_receiver_metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The Kafka receiver now exports some partition-specific metrics on a per-partition basis, with a `partition` tag

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following metrics are now reported per partition:
- kafka_receiver_messages
- kafka_receiver_current_offset
- kafka_receiver_offset_lag
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 14 additions & 3 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -446,7 +447,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}

ctx := c.obsrecv.StartTracesOp(session.Context())
statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
statsTags := []tag.Mutator{
tag.Upsert(tagInstanceName, c.id.String()),
tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))),
}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
Expand Down Expand Up @@ -526,7 +530,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
}

ctx := c.obsrecv.StartMetricsOp(session.Context())
statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
statsTags := []tag.Mutator{
tag.Upsert(tagInstanceName, c.id.String()),
tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))),
}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
Expand Down Expand Up @@ -610,9 +617,13 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}

ctx := c.obsrecv.StartLogsOp(session.Context())
statsTags := []tag.Mutator{
tag.Upsert(tagInstanceName, c.id.String()),
tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))),
}
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())},
statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
Expand Down
20 changes: 11 additions & 9 deletions receiver/kafkareceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

var (
tagInstanceName, _ = tag.NewKey("name")
tagPartition, _ = tag.NewKey("partition")

statMessageCount = stats.Int64("kafka_receiver_messages", "Number of received messages", stats.UnitDimensionless)
statMessageOffset = stats.Int64("kafka_receiver_current_offset", "Current message offset", stats.UnitDimensionless)
Expand All @@ -26,69 +27,70 @@ var (

// metricViews return metric views for Kafka receiver.
func metricViews() []*view.View {
tagKeys := []tag.Key{tagInstanceName}
partitionAgnosticTagKeys := []tag.Key{tagInstanceName}
partitionSpecificTagKeys := []tag.Key{tagInstanceName, tagPartition}

countMessages := &view.View{
Name: statMessageCount.Name(),
Measure: statMessageCount,
Description: statMessageCount.Description(),
TagKeys: tagKeys,
TagKeys: partitionSpecificTagKeys,
Aggregation: view.Sum(),
}

lastValueOffset := &view.View{
Name: statMessageOffset.Name(),
Measure: statMessageOffset,
Description: statMessageOffset.Description(),
TagKeys: tagKeys,
TagKeys: partitionSpecificTagKeys,
Aggregation: view.LastValue(),
}

lastValueOffsetLag := &view.View{
Name: statMessageOffsetLag.Name(),
Measure: statMessageOffsetLag,
Description: statMessageOffsetLag.Description(),
TagKeys: tagKeys,
TagKeys: partitionSpecificTagKeys,
Aggregation: view.LastValue(),
}

countPartitionStart := &view.View{
Name: statPartitionStart.Name(),
Measure: statPartitionStart,
Description: statPartitionStart.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countPartitionClose := &view.View{
Name: statPartitionClose.Name(),
Measure: statPartitionClose,
Description: statPartitionClose.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedMetricPoints := &view.View{
Name: statUnmarshalFailedMetricPoints.Name(),
Measure: statUnmarshalFailedMetricPoints,
Description: statUnmarshalFailedMetricPoints.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedLogRecords := &view.View{
Name: statUnmarshalFailedLogRecords.Name(),
Measure: statUnmarshalFailedLogRecords,
Description: statUnmarshalFailedLogRecords.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedSpans := &view.View{
Name: statUnmarshalFailedSpans.Name(),
Measure: statUnmarshalFailedSpans,
Description: statUnmarshalFailedSpans.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

Expand Down
29 changes: 18 additions & 11 deletions receiver/kafkareceiver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@ import (
"github.com/stretchr/testify/assert"
)

// expectedView describes one metric view expected from metricViews():
// the view's registered name and how many tag keys it should carry
// (partition-specific views carry two keys, the rest one).
type expectedView struct {
name string // view name, e.g. "kafka_receiver_messages"
tagCount int // expected len(view.TagKeys)
}

func TestMetrics(t *testing.T) {
metricViews := metricViews()
viewNames := []string{
"kafka_receiver_messages",
"kafka_receiver_current_offset",
"kafka_receiver_offset_lag",
"kafka_receiver_partition_start",
"kafka_receiver_partition_close",
"kafka_receiver_unmarshal_failed_metric_points",
"kafka_receiver_unmarshal_failed_log_records",
"kafka_receiver_unmarshal_failed_spans",
viewNames := []expectedView{
{name: "kafka_receiver_messages", tagCount: 2},
{name: "kafka_receiver_current_offset", tagCount: 2},
{name: "kafka_receiver_offset_lag", tagCount: 2},
{name: "kafka_receiver_partition_start", tagCount: 1},
{name: "kafka_receiver_partition_close", tagCount: 1},
{name: "kafka_receiver_unmarshal_failed_metric_points", tagCount: 1},
{name: "kafka_receiver_unmarshal_failed_log_records", tagCount: 1},
{name: "kafka_receiver_unmarshal_failed_spans", tagCount: 1},
}
for i, viewName := range viewNames {
assert.Equal(t, viewName, metricViews[i].Name)

for i, expectedView := range viewNames {
assert.Equal(t, expectedView.name, metricViews[i].Name)
assert.Equal(t, expectedView.tagCount, len(metricViews[i].TagKeys))
}
}

0 comments on commit 9625c43

Please # to comment.