From 58b266791651da5ae2d3b619dfeff29851a4dd62 Mon Sep 17 00:00:00 2001 From: Calin Don Date: Fri, 6 Sep 2024 22:34:43 +0300 Subject: [PATCH] feat(inputs.statsd): Allow reporting sets and timings count as floats (#15852) --- plugins/inputs/statsd/README.md | 7 +++ plugins/inputs/statsd/sample.conf | 7 +++ plugins/inputs/statsd/statsd.go | 14 ++++- plugins/inputs/statsd/statsd_test.go | 76 ++++++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/statsd/README.md b/plugins/inputs/statsd/README.md index a376484c24fd0..8c14c7a32247a 100644 --- a/plugins/inputs/statsd/README.md +++ b/plugins/inputs/statsd/README.md @@ -127,6 +127,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Enabling this would ensure that both counters and guages are both emitted ## as floats. # float_counters = false + + ## Emit timings `metric__count` field as float, the same as all other + ## histogram fields + # float_timings = false + + ## Emit sets as float + # float_sets = false ``` ## Description diff --git a/plugins/inputs/statsd/sample.conf b/plugins/inputs/statsd/sample.conf index 4f7fbb50a4c35..4feacce0d4e4f 100644 --- a/plugins/inputs/statsd/sample.conf +++ b/plugins/inputs/statsd/sample.conf @@ -100,3 +100,10 @@ ## Enabling this would ensure that both counters and guages are both emitted ## as floats. # float_counters = false + + ## Emit timings `metric__count` field as float, the same as all other + ## histogram fields + # float_timings = false + + ## Emit sets as float + # float_sets = false diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go index 7a1224cc7581c..15bf523928bc7 100644 --- a/plugins/inputs/statsd/statsd.go +++ b/plugins/inputs/statsd/statsd.go @@ -77,6 +77,8 @@ type Statsd struct { DeleteTimings bool `toml:"delete_timings"` ConvertNames bool `toml:"convert_names"` FloatCounters bool `toml:"float_counters"` + FloatTimings bool `toml:"float_timings"` + FloatSets bool `toml:"float_sets"` EnableAggregationTemporality bool `toml:"enable_aggregation_temporality"` @@ -260,7 +262,11 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { fields[prefix+"sum"] = stats.Sum() fields[prefix+"upper"] = stats.Upper() fields[prefix+"lower"] = stats.Lower() - fields[prefix+"count"] = stats.Count() + if s.FloatTimings { + fields[prefix+"count"] = float64(stats.Count()) + } else { + fields[prefix+"count"] = stats.Count() + } for _, percentile := range s.Percentiles { name := fmt.Sprintf("%s%v_percentile", prefix, percentile) fields[name] = stats.Percentile(float64(percentile)) @@ -306,7 +312,11 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error { for _, m := range s.sets { fields := make(map[string]interface{}) for field, set := range m.fields { - fields[field] = int64(len(set)) + if s.FloatSets { + fields[field] = float64(len(set)) + } else { + fields[field] = int64(len(set)) + } } if s.EnableAggregationTemporality { fields["start_time"] = s.lastGatherTime.Format(time.RFC3339) diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go index 3b5d10bb3922a..f76a8ded1fb9c 100644 --- a/plugins/inputs/statsd/statsd_test.go +++ b/plugins/inputs/statsd/statsd_test.go @@ -473,6 +473,51 @@ func TestParse_Sets(t *testing.T) { } } +func TestParse_Sets_SetsAsFloat(t *testing.T) { + s := NewTestStatsd() + s.FloatSets = true + + // Test that sets work + validLines := []string{ + "unique.user.ids:100|s", + "unique.user.ids:100|s", + "unique.user.ids:200|s", + } + + for _, line := range validLines { + require.NoErrorf(t, s.parseStatsdLine(line), "Parsing line %s should not have resulted in an error", line) + } + + validations := []struct { + name string + value int64 + }{ + { + "unique_user_ids", + 2, + }, + } + for _, test := range validations { + require.NoError(t, testValidateSet(test.name, test.value, s.sets)) + } + + expected := []telegraf.Metric{ + testutil.MustMetric( + "unique_user_ids", + map[string]string{"metric_type": "set"}, + map[string]interface{}{"value": 2.0}, + time.Now(), + telegraf.Untyped, + ), + } + + acc := &testutil.Accumulator{} + require.NoError(t, s.Gather(acc)) + metrics := acc.GetTelegrafMetrics() + testutil.PrintMetrics(metrics) + testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics()) +} + // Tests low-level functionality of counters func TestParse_Counters(t *testing.T) { s := NewTestStatsd() @@ -676,6 +721,37 @@ func TestParse_Timings(t *testing.T) { acc.AssertContainsFields(t, "test_timing", valid) } +func TestParse_Timings_TimingsAsFloat(t *testing.T) { + s := NewTestStatsd() + s.FloatTimings = true + s.Percentiles = []Number{90.0} + acc := &testutil.Accumulator{} + + // Test that timings work + validLines := []string{ + "test.timing:100|ms", + } + + for _, line := range validLines { + require.NoErrorf(t, s.parseStatsdLine(line), "Parsing line %s should not have resulted in an error", line) + } + + require.NoError(t, s.Gather(acc)) + + valid := map[string]interface{}{ + "90_percentile": float64(100), + "count": float64(1), + "lower": float64(100), + "mean": float64(100), + "median": float64(100), + "stddev": float64(0), + "sum": float64(100), + "upper": float64(100), + } + + acc.AssertContainsFields(t, "test_timing", valid) +} + // Tests low-level functionality of distributions func TestParse_Distributions(t *testing.T) { s := NewTestStatsd()