From 43ce2b337c55974cbec44a23c9bed0372867ce14 Mon Sep 17 00:00:00 2001
From: Maurizio Branca
Date: Fri, 13 Dec 2024 10:55:40 +0100
Subject: [PATCH] [s3-sqs] Log a Lambda event summary on exception (#860)

ESF summarizes the lambda event and adds the summary as extra
information to the exception log message if there is an exception when
processing a lambda event from the `s3-sqs` input.

The summary contains the essential elements for the lambda event. The
`s3-sqs` input includes the bucket ARN and the object key. The summary
limits the number of records to `max_records`.

ESF currently supports lambda event summary for `s3-sqs` only. We want
to hear users' feedback before making additional changes or extending
the summary to other input types.
---
 handlers/aws/utils.py            | 85 ++++++++++++++++++++++++++-
 tests/handlers/aws/test_utils.py | 98 +++++++++++++++++++++++++++++++-
 2 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py
index f973b657..eb9a2105 100644
--- a/handlers/aws/utils.py
+++ b/handlers/aws/utils.py
@@ -125,13 +125,96 @@ def wrapper(lambda_event: dict[str, Any], lambda_context: context_.Context) -> s
             if apm_client:
                 apm_client.capture_exception()
 
-            shared_logger.exception("exception raised", exc_info=e)
+            shared_logger.exception(
+                "exception raised",
+                exc_info=e,
+                extra={
+                    "summary": summarize_lambda_event(lambda_event, max_records=20),
+                },
+            )
 
             return f"exception raised: {e.__repr__()}"
 
     return wrapper
 
 
+def summarize_lambda_event(event: dict[str, Any], max_records: int = 10) -> dict[str, Any]:
+    """
+    Summarize the lambda event to include only the most relevant information.
+    """
+    summary: dict[str, Any] = {}
+
+    try:
+        first_records_key = f"first_{max_records}_records"
+        records = event.get("Records", [])
+
+        for record in records:
+            event_source = record.get("eventSource", "unknown")
+
+            if event_source == "aws:sqs":
+                aws_sqs_summary = summary.get(
+                    "aws:sqs",
+                    # if `aws:sqs` key does not exist yet,
+                    # we initialize the summary.
+                    {
+                        "total_records": 0,
+                        first_records_key: [],
+                    },
+                )
+
+                # We keep track of the total number of notifications in the
+                # lambda event, so users know if the summary is incomplete.
+                notifications = json_parser(record["body"])
+
+                # So users know if we included only a
+                # subset of the records.
+                aws_sqs_summary["total_records"] += len(notifications["Records"])
+
+                for r in notifications["Records"]:
+                    # we only include the s3 object key in the summary.
+                    #
+                    # Here is an example of a notification record:
+                    #
+                    # {
+                    #   "Records": [
+                    #     {
+                    #       "awsRegion": "eu-west-1",
+                    #       "eventName": "ObjectCreated:Put",
+                    #       "eventSource": "aws:s3",
+                    #       "eventVersion": "2.1",
+                    #       "s3": {
+                    #         "bucket": {
+                    #           "arn": "arn:aws:s3:::mbranca-esf-data",
+                    #           "name": "mbranca-esf-data"
+                    #         },
+                    #         "object": {
+                    #           "key": "AWSLogs/1234567890/CloudTrail-Digest/"
+                    #         }
+                    #       }
+                    #     }
+                    #   ]
+                    # }
+
+                    # We stop adding records to the summary once we reach
+                    # the `max_records` limit.
+                    if len(aws_sqs_summary[first_records_key]) == max_records:
+                        break
+
+                    # Add the s3 object key to the summary.
+                    aws_sqs_summary[first_records_key].append(r.get("s3"))
+
+                # Update the summary with the new information.
+                summary["aws:sqs"] = aws_sqs_summary
+
+    except Exception as exc:
+        shared_logger.exception("error summarizing lambda event", exc_info=exc)
+        # We add an error message to the summary so users know if the summary
+        # is incomplete.
+        summary["error"] = str(exc)
+
+    return summary
+
+
 def discover_integration_scope(s3_object_key: str) -> str:
     if s3_object_key == "":
         shared_logger.debug("s3 object key is empty, dataset set to `generic`")
diff --git a/tests/handlers/aws/test_utils.py b/tests/handlers/aws/test_utils.py
index e2bf845c..8166961b 100644
--- a/tests/handlers/aws/test_utils.py
+++ b/tests/handlers/aws/test_utils.py
@@ -93,13 +93,13 @@ def test_get_trigger_type_and_config_source(self) -> None:
             assert get_trigger_type_and_config_source(event=event) == ("cloudwatch-logs", CONFIG_FROM_S3FILE)
 
         with self.subTest("no Records"):
-            with self.assertRaisesRegexp(Exception, "Not supported trigger"):
+            with self.assertRaisesRegex(Exception, "Not supported trigger"):
                 event = {}
 
                 get_trigger_type_and_config_source(event=event)
 
         with self.subTest("len(Records) < 1"):
-            with self.assertRaisesRegexp(Exception, "Not supported trigger"):
+            with self.assertRaisesRegex(Exception, "Not supported trigger"):
                 event = {"Records": []}
 
                 get_trigger_type_and_config_source(event=event)
@@ -460,3 +460,97 @@ def test_without_variables(self) -> None:
 
             with pytest.raises(ValueError):
                 get_lambda_region()
+
+
+@pytest.mark.unit
+class TestSummarizeLambdaEvent(TestCase):
+
+    max_records = 42
+
+    def test_with_single_s3_sqs_record(self) -> None:
+        from handlers.aws.utils import summarize_lambda_event
+
+        event = {
+            "Records": [
+                {
+                    "body": '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/627286350134/CloudTrail-Digest/"}}}]}',  # noqa: E501
+                    "eventSource": "aws:sqs",
+                }
+            ]
+        }
+
+        summary = summarize_lambda_event(event=event, max_records=self.max_records)
+
+        assert summary == {
+            "aws:sqs": {
+                "total_records": 1,
+                f"first_{self.max_records}_records": [
+                    {
+                        "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
+                        "object": {"key": "AWSLogs/627286350134/CloudTrail-Digest/"},
+                    }
+                ],
+            }
+        }
+
+    def test_with_multiple_s3_sqs_records(self) -> None:
+        from handlers.aws.utils import summarize_lambda_event
+
+        event = {
+            "Records": [
+                {
+                    "body": '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/1.log"}}},{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/2.log"}}}]}',  # noqa: E501
+                    "eventSource": "aws:sqs",
+                }
+            ]
+        }
+
+        with self.subTest("no limits"):
+            summary = summarize_lambda_event(event=event, max_records=self.max_records)
+
+            assert summary == {
+                "aws:sqs": {
+                    "total_records": 2,
+                    f"first_{self.max_records}_records": [
+                        {
+                            "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
+                            "object": {"key": "AWSLogs/123456789012/1.log"},
+                        },
+                        {
+                            "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
+                            "object": {"key": "AWSLogs/123456789012/2.log"},
+                        },
+                    ],
+                }
+            }
+
+        with self.subTest("with limits"):
+            summary = summarize_lambda_event(event=event, max_records=1)
+
+            assert summary == {
+                "aws:sqs": {
+                    "total_records": 2,
+                    "first_1_records": [
+                        {
+                            "bucket": {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"},
+                            "object": {"key": "AWSLogs/123456789012/1.log"},
+                        }
+                    ],
+                }
+            }
+
+    def test_with_invalid_s3_sqs_notification(self) -> None:
+        from handlers.aws.utils import summarize_lambda_event
+
+        event = {
+            "Records": [
+                {
+                    "body": "I am not a valid JSON string.",
+                    "eventSource": "aws:sqs",
+                }
+            ]
+        }
+
+        summary = summarize_lambda_event(event)
+
+        assert summary == {"error": "unexpected character: line 1 column 1 (char 0)"}