Skip to content

Commit 98399b6

Browse files
committed
Fixes from review:
* extra word typo in readme * initialize timestamp field in init(), update tests * always use string arrow type for tags
1 parent 4a840fd commit 98399b6

File tree

3 files changed

+15
-18
lines changed

3 files changed

+15
-18
lines changed

plugins/outputs/parquet/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Parquet Output Plugin
22

3-
This plugin sends writes metrics to parquet files. By default, the parquet
3+
This plugin writes metrics to parquet files. By default, the parquet
44
output groups metrics by metric name and write those metrics all to the same
55
file. If a metric schema does not match then metrics are dropped.
66

plugins/outputs/parquet/parquet.go

+8-11
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
//go:embed sample.conf
2323
var sampleConfig string
2424

25+
var defaultTimestampFieldName = "timestamp"
26+
2527
type metricGroup struct {
2628
filename string
2729
builder *array.RecordBuilder
@@ -47,11 +49,6 @@ func (p *Parquet) Init() error {
4749
p.Directory = "."
4850
}
4951

50-
if p.TimestampFieldName == nil {
51-
timestampFieldName := "timestamp"
52-
p.TimestampFieldName = &timestampFieldName
53-
}
54-
5552
stat, err := os.Stat(p.Directory)
5653
if os.IsNotExist(err) {
5754
if err := os.MkdirAll(p.Directory, 0750); err != nil {
@@ -255,11 +252,7 @@ func (p *Parquet) createSchema(metrics []telegraf.Metric) (*arrow.Schema, error)
255252
}
256253
for _, tag := range metric.TagList() {
257254
if _, ok := rawFields[tag.Key]; !ok {
258-
arrowType, err := goToArrowType(tag.Value)
259-
if err != nil {
260-
return nil, fmt.Errorf("error converting '%s=%s' tag to arrow type: %w", tag.Key, tag.Value, err)
261-
}
262-
rawFields[tag.Key] = arrowType
255+
rawFields[tag.Key] = arrow.BinaryTypes.String
263256
}
264257
}
265258
}
@@ -335,5 +328,9 @@ func goToArrowType(value interface{}) (arrow.DataType, error) {
335328
}
336329

337330
func init() {
338-
outputs.Add("parquet", func() telegraf.Output { return &Parquet{} })
331+
outputs.Add("parquet", func() telegraf.Output {
332+
return &Parquet{
333+
TimestampFieldName: &defaultTimestampFieldName,
334+
}
335+
})
339336
}

plugins/outputs/parquet/parquet_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ func TestCases(t *testing.T) {
125125
t.Run(tc.name, func(t *testing.T) {
126126
testDir := t.TempDir()
127127
plugin := &Parquet{
128-
Directory: testDir,
128+
Directory: testDir,
129+
TimestampFieldName: &defaultTimestampFieldName,
129130
}
130131
require.NoError(t, plugin.Init())
131132
require.NoError(t, plugin.Connect())
@@ -161,8 +162,9 @@ func TestRotation(t *testing.T) {
161162

162163
testDir := t.TempDir()
163164
plugin := &Parquet{
164-
Directory: testDir,
165-
RotationInterval: config.Duration(1 * time.Second),
165+
Directory: testDir,
166+
RotationInterval: config.Duration(1 * time.Second),
167+
TimestampFieldName: &defaultTimestampFieldName,
166168
}
167169

168170
require.NoError(t, plugin.Init())
@@ -188,11 +190,9 @@ func TestOmitTimestamp(t *testing.T) {
188190
),
189191
}
190192

191-
emptyTimestamp := ""
192193
testDir := t.TempDir()
193194
plugin := &Parquet{
194-
Directory: testDir,
195-
TimestampFieldName: &emptyTimestamp,
195+
Directory: testDir,
196196
}
197197
require.NoError(t, plugin.Init())
198198
require.NoError(t, plugin.Connect())

0 commit comments

Comments
 (0)