Skip to content

Commit afac9eb

Browse files
authored
feat(inputs.netflow): Add support for sFlow drop notification packets (#15396)
1 parent f11ead9 commit afac9eb

File tree

6 files changed

+59
-6
lines changed

6 files changed

+59
-6
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ require (
147147
github.com/multiplay/go-ts3 v1.1.0
148148
github.com/nats-io/nats-server/v2 v2.10.17
149149
github.com/nats-io/nats.go v1.36.0
150-
github.com/netsampler/goflow2/v2 v2.1.3
150+
github.com/netsampler/goflow2/v2 v2.1.5
151151
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1
152152
github.com/nsqio/go-nsq v1.1.0
153153
github.com/nwaples/tacplus v0.0.3

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -1983,8 +1983,8 @@ github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdh
19831983
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
19841984
github.com/ncw/swift/v2 v2.0.2 h1:jx282pcAKFhmoZBSdMcCRFn9VWkoBIRsCpe+yZq7vEk=
19851985
github.com/ncw/swift/v2 v2.0.2/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItcb+Kg=
1986-
github.com/netsampler/goflow2/v2 v2.1.3 h1:glfeG2hIzFlAmEz236nZIAWi848AB+p1pJnuQqiyLkI=
1987-
github.com/netsampler/goflow2/v2 v2.1.3/go.mod h1:94ZaxfHuUwG6KviCxMWuZIvz0UGu657StE/g83hlS8A=
1986+
github.com/netsampler/goflow2/v2 v2.1.5 h1:xW9xkBBNmSWaDjC5VsV7wK556pJB8dB9FsuthmcXKDA=
1987+
github.com/netsampler/goflow2/v2 v2.1.5/go.mod h1:DnkDq99+sHUMUkR8PaN5Z4hLZALyrQObVhtz8zPSj8g=
19881988
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1 h1:6OX5VXMuj2salqNBc41eXKz6K+nV6OB/hhlGnAKCbwU=
19891989
github.com/newrelic/newrelic-telemetry-sdk-go v0.8.1/go.mod h1:2kY6OeOxrJ+RIQlVjWDc/pZlT3MIf30prs6drzMfJ6E=
19901990
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=

plugins/inputs/netflow/sflow_v5.go

+52-3
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,43 @@ func (d *sflowv5Decoder) Decode(srcIP net.IP, payload []byte) ([]telegraf.Metric
159159
fields[k] = v
160160
}
161161
metrics = append(metrics, metric.New("netflow", tags, fields, t))
162+
case sflow.DropSample:
163+
fields := map[string]interface{}{
164+
"ip_version": decodeSflowIPVersion(msg.IPVersion),
165+
"sys_uptime": msg.Uptime,
166+
"agent_subid": msg.SubAgentId,
167+
"seq_number": sample.Header.SampleSequenceNumber,
168+
"sampling_drops": sample.Drops,
169+
"in_snmp": sample.Input,
170+
"out_snmp": sample.Output,
171+
"reason": sample.Reason,
172+
}
173+
174+
var err error
175+
fields["agent_ip"], err = decodeIP(msg.AgentIP)
176+
if err != nil {
177+
return nil, fmt.Errorf("decoding 'agent_ip' failed: %w", err)
178+
}
179+
180+
// Decode the source information
181+
if name := decodeSflowSourceInterface(sample.Header.SourceIdType); name != "" {
182+
fields[name] = sample.Header.SourceIdValue
183+
}
184+
// Decode the sampling direction
185+
if sample.Header.SourceIdValue == sample.Input {
186+
fields["direction"] = "ingress"
187+
} else {
188+
fields["direction"] = "egress"
189+
}
190+
recordFields, err := d.decodeFlowRecords(sample.Records)
191+
if err != nil {
192+
return nil, err
193+
}
194+
for k, v := range recordFields {
195+
fields[k] = v
196+
}
197+
metrics = append(metrics, metric.New("netflow", tags, fields, t))
198+
162199
default:
163200
return nil, fmt.Errorf("unknown record type %T", s)
164201
}
@@ -249,9 +286,6 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri
249286
case sflow.ExtendedGateway:
250287
var err error
251288
fields["next_hop_ip_version"] = record.NextHopIPVersion
252-
if err != nil {
253-
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
254-
}
255289
fields["next_hop"], err = decodeIP(record.NextHop)
256290
if err != nil {
257291
return nil, fmt.Errorf("decoding 'next_hop' failed: %w", err)
@@ -276,6 +310,21 @@ func (d *sflowv5Decoder) decodeFlowRecords(records []sflow.FlowRecord) (map[stri
276310
}
277311
fields["communities"] = strings.Join(parts, ",")
278312
fields["local_pref"] = record.LocalPref
313+
case sflow.EgressQueue:
314+
fields["out_queue"] = record.Queue
315+
case sflow.ExtendedACL:
316+
fields["acl_id"] = record.Number
317+
fields["acl_name"] = record.Name
318+
switch record.Direction {
319+
case 1:
320+
fields["direction"] = "ingress"
321+
case 2:
322+
fields["direction"] = "egress"
323+
default:
324+
fields["direction"] = "unknown"
325+
}
326+
case sflow.ExtendedFunction:
327+
fields["function"] = record.Symbol
279328
default:
280329
return nil, fmt.Errorf("unhandled flow record type %T", r.Data)
281330
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
netflow,source=127.0.0.1,version=sFlowV5 sys_uptime=12414u,agent_ip="192.168.119.184",agent_subid=100000u,seq_number=2u,direction="ingress",in_snmp=1u,out_snmp=2u,out_queue=42u,reason=1u,sampling_drops=256u,ip_version="IPv4" 12414000000
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[[inputs.netflow]]
2+
service_address = "udp://127.0.0.1:0"
3+
protocol = "sflow v5"

0 commit comments

Comments
 (0)