From 7ba5b5c9bafeac0b041f3e6e1eeaf8f44a460805 Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 20 Jul 2023 08:25:26 -0700 Subject: [PATCH] fix(awscloudwatchreceiver): emit logs from one stream in one resource (#22976) The receiver treated every event as a separate log. However, it makes a lot sense to group logs into common resource by their group and stream. Logs without a stream are grouped into the same resource. I'm not sure how should `scope` be treated here, but my intuition tells me that it makes sense to group them also into the same scope. --- ...loudwatch-emit-stream-in-one-resource.yaml | 19 ++++++++ receiver/awscloudwatchreceiver/logs.go | 39 +++++++++++++--- receiver/awscloudwatchreceiver/logs_test.go | 37 +++++++++++++--- .../testdata/processed/prefixed.yaml | 44 +++++++++++++++++++ 4 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 .chloggen/awscloudwatch-emit-stream-in-one-resource.yaml diff --git a/.chloggen/awscloudwatch-emit-stream-in-one-resource.yaml b/.chloggen/awscloudwatch-emit-stream-in-one-resource.yaml new file mode 100644 index 000000000000..3888f01da56d --- /dev/null +++ b/.chloggen/awscloudwatch-emit-stream-in-one-resource.yaml @@ -0,0 +1,19 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awscloudwatchreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: emit logs from one log stream in the same resource + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [22145] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. diff --git a/receiver/awscloudwatchreceiver/logs.go b/receiver/awscloudwatchreceiver/logs.go index c5ddda8898e2..16f1ead9f0bc 100644 --- a/receiver/awscloudwatchreceiver/logs.go +++ b/receiver/awscloudwatchreceiver/logs.go @@ -21,6 +21,10 @@ import ( "go.uber.org/zap" ) +const ( + noStreamName = "THIS IS INVALID STREAM" +) + type logsReceiver struct { region string profile string @@ -222,6 +226,9 @@ func (l *logsReceiver) pollForLogs(ctx context.Context, pc groupRequest, startTi func (l *logsReceiver) processEvents(now pcommon.Timestamp, logGroupName string, output *cloudwatchlogs.FilterLogEventsOutput) plog.Logs { logs := plog.NewLogs() + + resourceMap := map[string](map[string]*plog.ResourceLogs){} + for _, e := range output.Events { if e.Timestamp == nil { l.logger.Error("unable to determine timestamp of event as the timestamp is nil") @@ -238,15 +245,35 @@ func (l *logsReceiver) processEvents(now pcommon.Timestamp, logGroupName string, continue } - rl := logs.ResourceLogs().AppendEmpty() - resourceAttributes := rl.Resource().Attributes() - resourceAttributes.PutStr("aws.region", l.region) - resourceAttributes.PutStr("cloudwatch.log.group.name", logGroupName) + group, ok := resourceMap[logGroupName] + if !ok { + group = map[string]*plog.ResourceLogs{} + resourceMap[logGroupName] = group + } + + logStreamName := noStreamName if e.LogStreamName != nil { - resourceAttributes.PutStr("cloudwatch.log.stream", *e.LogStreamName) + logStreamName = *e.LogStreamName + } + + resourceLogs, ok := group[logStreamName] + if !ok { + rl := logs.ResourceLogs().AppendEmpty() + resourceLogs = &rl + resourceAttributes := resourceLogs.Resource().Attributes() + resourceAttributes.PutStr("aws.region", l.region) + resourceAttributes.PutStr("cloudwatch.log.group.name", logGroupName) + resourceAttributes.PutStr("cloudwatch.log.stream", logStreamName) + group[logStreamName] = resourceLogs + + // Ensure one scopeLogs is initialized so we can handle in standardized way going forward. + _ = resourceLogs.ScopeLogs().AppendEmpty() } - logRecord := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + // Now we know resourceLogs is initialized and has one scopeLogs so we don't have to handle any special cases. + + logRecord := resourceLogs.ScopeLogs().At(0).LogRecords().AppendEmpty() + logRecord.SetObservedTimestamp(now) ts := time.UnixMilli(*e.Timestamp) logRecord.SetTimestamp(pcommon.NewTimestampFromTime(ts)) diff --git a/receiver/awscloudwatchreceiver/logs_test.go b/receiver/awscloudwatchreceiver/logs_test.go index 77b12a66d6f8..236a0e6907e1 100644 --- a/receiver/awscloudwatchreceiver/logs_test.go +++ b/receiver/awscloudwatchreceiver/logs_test.go @@ -225,12 +225,33 @@ func defaultMockClient() client { &cloudwatchlogs.FilterLogEventsOutput{ Events: []*cloudwatchlogs.FilteredLogEvent{ { - EventId: &testEventID, + EventId: &testEventIDs[0], IngestionTime: aws.Int64(testIngestionTime), LogStreamName: aws.String(testLogStreamName), Message: aws.String(testLogStreamMessage), Timestamp: aws.Int64(testTimeStamp), }, + { + EventId: &testEventIDs[1], + IngestionTime: aws.Int64(testIngestionTime), + LogStreamName: aws.String(testLogStreamName), + Message: aws.String(testLogStreamMessage), + Timestamp: aws.Int64(testTimeStamp), + }, + { + EventId: &testEventIDs[2], + IngestionTime: aws.Int64(testIngestionTime), + LogStreamName: aws.String(testLogStreamName2), + Message: aws.String(testLogStreamMessage), + Timestamp: aws.Int64(testTimeStamp), + }, + { + EventId: &testEventIDs[3], + IngestionTime: aws.Int64(testIngestionTime), + LogStreamName: aws.String(testLogStreamName2), + Message: aws.String(testLogStreamMessage), + Timestamp: aws.Int64(testTimeStamp), + }, }, NextToken: nil, }, nil) @@ -238,10 +259,16 @@ func defaultMockClient() client { } var ( - testLogGroupName = "test-log-group-name" - testLogStreamName = "test-log-stream-name" - testLogStreamPrefix = "test-log-stream" - testEventID = "37134448277055698880077365577645869800162629528367333379" + testLogGroupName = "test-log-group-name" + testLogStreamName = "test-log-stream-name" + testLogStreamName2 = "test-log-stream-name-2" + testLogStreamPrefix = "test-log-stream" + testEventIDs = []string{ + "37134448277055698880077365577645869800162629528367333379", + "37134448277055698880077365577645869800162629528367333380", + "37134448277055698880077365577645869800162629528367333381", + "37134448277055698880077365577645869800162629528367333382", + } testIngestionTime = int64(1665166252124) testTimeStamp = int64(1665166251014) testLogStreamMessage = `"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""` diff --git a/receiver/awscloudwatchreceiver/testdata/processed/prefixed.yaml b/receiver/awscloudwatchreceiver/testdata/processed/prefixed.yaml index 19ad76fe8656..c35389b9815a 100644 --- a/receiver/awscloudwatchreceiver/testdata/processed/prefixed.yaml +++ b/receiver/awscloudwatchreceiver/testdata/processed/prefixed.yaml @@ -22,4 +22,48 @@ resourceLogs: spanId: "" timeUnixNano: "1665166251014000000" traceId: "" + - attributes: + - key: id + value: + stringValue: "37134448277055698880077365577645869800162629528367333380" + body: + stringValue: '"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""' + observedTimeUnixNano: "1665166998995360000" + spanId: "" + timeUnixNano: "1665166251014000000" + traceId: "" + scope: {} + - resource: + attributes: + - key: aws.region + value: + stringValue: us-west-1 + - key: cloudwatch.log.group.name + value: + stringValue: test-log-group-name + - key: cloudwatch.log.stream + value: + stringValue: test-log-stream-name-2 + scopeLogs: + - logRecords: + - attributes: + - key: id + value: + stringValue: "37134448277055698880077365577645869800162629528367333381" + body: + stringValue: '"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""' + observedTimeUnixNano: "1665166998995360000" + spanId: "" + timeUnixNano: "1665166251014000000" + traceId: "" + - attributes: + - key: id + value: + stringValue: "37134448277055698880077365577645869800162629528367333382" + body: + stringValue: '"time=\"2022-10-07T18:10:46Z\" level=info msg=\"access granted\" arn=\"arn:aws:iam::892146088969:role/AWSWesleyClusterManagerLambda-NodeManagerRole-16UPVDKA1KBGI\" client=\"127.0.0.1:50252\" groups=\"[]\" method=POST path=/authenticate uid=\"aws-iam-authenticator:892146088969:AROA47OAM7QE2NWPDFDCW\" username=\"eks:node-manager\""' + observedTimeUnixNano: "1665166998995360000" + spanId: "" + timeUnixNano: "1665166251014000000" + traceId: "" scope: {}