Skip to content

Commit

Permalink
[s3-sqs] Log a Lambda event summary on exception (#860)
Browse files Browse the repository at this point in the history
ESF summarizes the lambda event and adds the summary as extra information to the exception log message if there is an exception when processing a lambda event from the `s3-sqs` input.

The summary contains the essential elements for the lambda event. The `s3-sqs` input includes the bucket ARN and the object key. The summary limits the number of records to `max_records`.

ESF currently supports lambda event summary for `s3-sqs` only. We want to hear users' feedback before making additional changes or extending the summary to other input types.
  • Loading branch information
zmoog authored Dec 13, 2024
1 parent 276560a commit 43ce2b3
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 3 deletions.
85 changes: 84 additions & 1 deletion handlers/aws/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,96 @@ def wrapper(lambda_event: dict[str, Any], lambda_context: context_.Context) -> s
if apm_client:
apm_client.capture_exception()

shared_logger.exception("exception raised", exc_info=e)
shared_logger.exception(
"exception raised",
exc_info=e,
extra={
"summary": summarize_lambda_event(lambda_event, max_records=20),
},
)

return f"exception raised: {e.__repr__()}"

return wrapper


def summarize_lambda_event(event: dict[str, Any], max_records: int = 10) -> dict[str, Any]:
    """
    Summarize the lambda event to include only the most relevant information.
    """
    summary: dict[str, Any] = {}

    try:
        first_records_key = f"first_{max_records}_records"

        for record in event.get("Records", []):
            # Only `aws:sqs` records are summarized for now; other event
            # sources are skipped entirely.
            if record.get("eventSource", "unknown") != "aws:sqs":
                continue

            # Fetch the running per-source summary, initializing it on the
            # first `aws:sqs` record we encounter.
            sqs_summary = summary.get(
                "aws:sqs",
                {
                    "total_records": 0,
                    first_records_key: [],
                },
            )

            # Each SQS record body is an S3 notification that carries its
            # own list of records, e.g.:
            #
            # {
            #     "Records": [
            #         {
            #             "awsRegion": "eu-west-1",
            #             "eventName": "ObjectCreated:Put",
            #             "eventSource": "aws:s3",
            #             "eventVersion": "2.1",
            #             "s3": {
            #                 "bucket": {
            #                     "arn": "arn:aws:s3:::mbranca-esf-data",
            #                     "name": "mbranca-esf-data"
            #                 },
            #                 "object": {
            #                     "key": "AWSLogs/1234567890/CloudTrail-Digest/"
            #                 }
            #             }
            #         }
            #     ]
            # }
            notification = json_parser(record["body"])
            inner_records = notification["Records"]

            # Always count every record, even past the truncation limit, so
            # users can tell when the summary is incomplete.
            sqs_summary["total_records"] += len(inner_records)

            collected = sqs_summary[first_records_key]
            for inner_record in inner_records:
                # Stop collecting details once `max_records` is reached;
                # only the s3 section (bucket + object key) is kept.
                if len(collected) == max_records:
                    break
                collected.append(inner_record.get("s3"))

            summary["aws:sqs"] = sqs_summary

    except Exception as exc:
        shared_logger.exception("error summarizing lambda event", exc_info=exc)
        # Surface the failure in the summary itself so users know the
        # summary is incomplete.
        summary["error"] = str(exc)

    return summary


def discover_integration_scope(s3_object_key: str) -> str:
if s3_object_key == "":
shared_logger.debug("s3 object key is empty, dataset set to `generic`")
Expand Down
98 changes: 96 additions & 2 deletions tests/handlers/aws/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ def test_get_trigger_type_and_config_source(self) -> None:
assert get_trigger_type_and_config_source(event=event) == ("cloudwatch-logs", CONFIG_FROM_S3FILE)

with self.subTest("no Records"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
with self.assertRaisesRegex(Exception, "Not supported trigger"):
event = {}

get_trigger_type_and_config_source(event=event)

with self.subTest("len(Records) < 1"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
with self.assertRaisesRegex(Exception, "Not supported trigger"):
event = {"Records": []}

get_trigger_type_and_config_source(event=event)
Expand Down Expand Up @@ -460,3 +460,97 @@ def test_without_variables(self) -> None:

with pytest.raises(ValueError):
get_lambda_region()


@pytest.mark.unit
class TestSummarizeLambdaEvent(TestCase):
    """Unit tests for summarize_lambda_event on s3-sqs style lambda events."""

    # Large enough that the default test cases never hit truncation.
    max_records = 42

    @staticmethod
    def _sqs_event(body: str) -> dict:
        """Wrap a raw SQS message body in a minimal lambda event envelope."""
        return {"Records": [{"body": body, "eventSource": "aws:sqs"}]}

    def test_with_single_s3_sqs_record(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        event = self._sqs_event(
            '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/627286350134/CloudTrail-Digest/"}}}]}'  # noqa: E501
        )

        expected = {
            "aws:sqs": {
                "total_records": 1,
                f"first_{self.max_records}_records": [
                    {
                        "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
                        "object": {"key": "AWSLogs/627286350134/CloudTrail-Digest/"},
                    }
                ],
            }
        }

        self.assertEqual(summarize_lambda_event(event=event, max_records=self.max_records), expected)

    def test_with_multiple_s3_sqs_records(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        event = self._sqs_event(
            '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/1.log"}}},{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/2.log"}}}]}'  # noqa: E501
        )

        first_s3 = {
            "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
            "object": {"key": "AWSLogs/123456789012/1.log"},
        }
        second_s3 = {
            "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
            "object": {"key": "AWSLogs/123456789012/2.log"},
        }

        with self.subTest("no limits"):
            # Both records fit under the limit, so the summary is complete.
            expected = {
                "aws:sqs": {
                    "total_records": 2,
                    f"first_{self.max_records}_records": [first_s3, second_s3],
                }
            }
            self.assertEqual(summarize_lambda_event(event=event, max_records=self.max_records), expected)

        with self.subTest("with limits"):
            # Only one record is collected, but total_records still reports
            # the full count so the truncation is visible.
            expected = {
                "aws:sqs": {
                    "total_records": 2,
                    "first_1_records": [first_s3],
                }
            }
            self.assertEqual(summarize_lambda_event(event=event, max_records=1), expected)

    def test_with_invalid_s3_sqs_notification(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        event = self._sqs_event("I am not a valid JSON string.")

        # The parse failure is caught and reported inside the summary
        # instead of propagating to the caller.
        self.assertEqual(
            summarize_lambda_event(event),
            {"error": "unexpected character: line 1 column 1 (char 0)"},
        )

0 comments on commit 43ce2b3

Please sign in to comment.