Skip to content

Commit 1185c96

Browse files
srebhanjustinwwhuang
authored andcommitted
fix(outputs.remotefile): Handle tracking metrics correctly (influxdata#16289)
1 parent 49a4d62 commit 1185c96

File tree

2 files changed

+134
-1
lines changed

2 files changed

+134
-1
lines changed

plugins/outputs/remotefile/remotefile.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,12 @@ func (f *File) Write(metrics []telegraf.Metric) error {
177177

178178
// Group the metrics per output file
179179
groups := make(map[string][]telegraf.Metric)
180-
for _, m := range metrics {
180+
for _, raw := range metrics {
181+
m := raw
182+
if wm, ok := raw.(telegraf.UnwrappableMetric); ok {
183+
m = wm.Unwrap()
184+
}
185+
181186
for _, tmpl := range f.templates {
182187
buf.Reset()
183188
if err := tmpl.Execute(&buf, m); err != nil {

plugins/outputs/remotefile/remotefile_test.go

+128
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"os"
66
"path/filepath"
77
"strings"
8+
"sync"
89
"testing"
910
"time"
1011

@@ -393,3 +394,130 @@ func TestForgettingFiles(t *testing.T) {
393394
require.Len(t, plugin.serializers, 1)
394395
require.Contains(t, plugin.serializers, "test-b.csv")
395396
}
397+
398+
func TestTrackingMetrics(t *testing.T) {
399+
// see issue #16045
400+
inputRaw := []telegraf.Metric{
401+
metric.New(
402+
"test",
403+
map[string]string{"source": "localhost"},
404+
map[string]interface{}{"value": 23},
405+
time.Unix(1719410465, 0),
406+
),
407+
metric.New(
408+
"test",
409+
map[string]string{"source": "remotehost"},
410+
map[string]interface{}{"value": 21},
411+
time.Unix(1719410465, 0),
412+
),
413+
metric.New(
414+
"test",
415+
map[string]string{"source": "localhost"},
416+
map[string]interface{}{"value": 42},
417+
time.Unix(1719410485, 0),
418+
),
419+
metric.New(
420+
"test",
421+
map[string]string{"source": "remotehost"},
422+
map[string]interface{}{"value": 66},
423+
time.Unix(1719410485, 0),
424+
),
425+
metric.New(
426+
"test",
427+
map[string]string{"source": "remotehost"},
428+
map[string]interface{}{"value": 55},
429+
time.Unix(1716310124, 0),
430+
),
431+
metric.New(
432+
"test",
433+
map[string]string{"source": "remotehost"},
434+
map[string]interface{}{"value": 1},
435+
time.Unix(1716310174, 0),
436+
),
437+
}
438+
439+
// Create tracking metrics as inputs for the test
440+
var mu sync.Mutex
441+
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
442+
notify := func(di telegraf.DeliveryInfo) {
443+
mu.Lock()
444+
defer mu.Unlock()
445+
delivered = append(delivered, di)
446+
}
447+
input := make([]telegraf.Metric, 0, len(inputRaw))
448+
for _, m := range inputRaw {
449+
tm, _ := metric.WithTracking(m, notify)
450+
input = append(input, tm)
451+
}
452+
453+
// Create the expectations
454+
expected := map[string][]string{
455+
"localhost-2024-06-26": {
456+
"test,source=localhost value=23i 1719410465000000000\n",
457+
"test,source=localhost value=42i 1719410485000000000\n",
458+
},
459+
"remotehost-2024-06-26": {
460+
"test,source=remotehost value=21i 1719410465000000000\n",
461+
"test,source=remotehost value=66i 1719410485000000000\n",
462+
},
463+
"remotehost-2024-05-21": {
464+
"test,source=remotehost value=55i 1716310124000000000\n",
465+
"test,source=remotehost value=1i 1716310174000000000\n",
466+
},
467+
}
468+
469+
// Prepare the output filesystem
470+
tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
471+
require.NoError(t, err)
472+
defer os.RemoveAll(tmpdir)
473+
474+
// Setup the plugin including the serializer
475+
plugin := &File{
476+
Remote: config.NewSecret([]byte("local:" + tmpdir)),
477+
Files: []string{`{{.Tag "source"}}-{{.Time.Format "2006-01-02"}}`},
478+
WriteBackInterval: config.Duration(100 * time.Millisecond),
479+
Log: &testutil.Logger{},
480+
}
481+
482+
plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
483+
serializer := &influx.Serializer{}
484+
err := serializer.Init()
485+
return serializer, err
486+
})
487+
require.NoError(t, plugin.Init())
488+
require.NoError(t, plugin.Connect())
489+
defer plugin.Close()
490+
491+
// Write the metrics and wait for the data to settle to disk
492+
require.NoError(t, plugin.Write(input))
493+
require.Eventually(t, func() bool {
494+
ok := true
495+
for fn := range expected {
496+
_, err := os.Stat(filepath.Join(tmpdir, fn))
497+
ok = ok && err == nil
498+
}
499+
return ok
500+
}, 5*time.Second, 100*time.Millisecond)
501+
502+
// Check the result
503+
for fn, lines := range expected {
504+
tmpfn := filepath.Join(tmpdir, fn)
505+
require.FileExists(t, tmpfn)
506+
507+
actual, err := os.ReadFile(tmpfn)
508+
require.NoError(t, err)
509+
require.Equal(t, strings.Join(lines, ""), string(actual))
510+
}
511+
512+
// Simulate output acknowledging delivery
513+
for _, m := range input {
514+
m.Accept()
515+
}
516+
517+
// Check delivery
518+
require.Eventuallyf(t, func() bool {
519+
mu.Lock()
520+
defer mu.Unlock()
521+
return len(input) == len(delivered)
522+
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
523+
}

0 commit comments

Comments
 (0)