# Copyright 2010-2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# This file is licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
This lambda reads a payload received from SQS. It expects to find a JSON payload in the Message.
The payload content is checked for specific keys.
Metrics on Lambda execution are sent to CloudWatch.
Configuration:
Declare the following environment variables:
:param bool TRACE: True for additional logs
:param str BUCKET_NAME: Destination bucket name
:param str INSPECT: if notTrue, the inspection of the payload is disabled.
The Role allocated to the Lambda for execution must have the following policies (or less permissive equivalent):
* AWSLambdaBasicExecutionRole
* AWSLambdaSQSQueueExecutionRole
* AmazonS3FullAccess
* [TODO]: Figure out policy for sending CloudWatch metrics
"""
import datetime as dt
import json
import os

import boto3
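
# Example configuration, with illustrative values only (the variable names come from
# the docstring above; the values are assumptions, not taken from any deployment):
#   BUCKET_NAME = my-telemetry-bucket
#   TRACE       = true
#   INSPECT     = true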
# Recover & check environment variables
def env_flag(value):
    """Interpret an environment variable (or its default) as a boolean flag."""
    return value in ("true", "True", "TRUE", "1", 1, "Yes", "YES", True)


bucket = os.environ.get("BUCKET_NAME")
trace = env_flag(os.environ.get("TRACE", True))
inspect = env_flag(os.environ.get("INSPECT", False))
if not bucket:
    raise Exception("Environment variable BUCKET_NAME missing")

s3 = boto3.client('s3')
cw = boto3.client('cloudwatch')
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
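# For reference, a timestamp matching TIME_FORMAT looks like "2022-05-01T12:34:56+0000"
# (the %z directive expects a UTC offset such as +0000).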


def log_me(msg):
    if trace is True:
        print(msg)


def send_cw_metric(name, dims, unit, value, namespace):
    """Publish a single custom metric data point to CloudWatch."""
    response = cw.put_metric_data(
        MetricData=[
            {
                'MetricName': name,
                'Dimensions': dims,
                'Unit': unit,
                'Value': value,
                'Timestamp': dt.datetime.utcnow()
            },
        ],
        Namespace=namespace
    )
    log_me(response)
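
# A note on the [TODO] in the docstring: put_metric_data calls such as the one in
# send_cw_metric are authorized by the 'cloudwatch:PutMetricData' IAM action, so a
# policy scoped to that single action would be one option. This is a suggestion,
# not a statement of the policy the author intended.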

# noinspection PyUnusedLocal
def lambda_handler(event, context):
    message_ids = []
    if 'Records' in event:
        print("Found {} records to store to S3.".format(len(event['Records'])))
        dims = [{'Name': 'Function', 'Value': 'SQS_to_S3'}]
        send_cw_metric(name='Batch Size', dims=dims, unit='Count', value=len(event['Records']),
                       namespace='Custom Lambda Metrics')
        # First build a list of all the message IDs to process. The list is depopulated
        # as records are processed successfully.
        for record in event.get('Records'):
            message_ids.append(record['messageId'])
        log_me("Message IDs to process: {}".format(message_ids))
        # Process each message in the Records
        for record in event.get('Records'):
            body_str = record.get('body')
            try:
                # Make sure the record is properly structured and the payload exists
                if not body_str:
                    raise Exception("No body found in Record")
                body = json.loads(body_str)
                msg = body.get('Message')
                if not msg:
                    raise Exception("No payload found in Record body")
                payload = json.loads(msg)
                log_me("The payload is: {}".format(payload))
                if inspect is True:
                    timestring = payload.get('timestamp')
                    if not timestring:
                        raise Exception('Malformed payload: timestamp key missing')
                    thing = payload.get('gateway')
                    if not thing:
                        raise Exception('Malformed payload: gateway key missing')
                    device = payload.get('deviceName')
                    if not device:
                        raise Exception('Malformed payload: deviceName key missing')
                    epoch = payload.get('epoch_ms')
                    if not epoch:
                        raise Exception('Malformed payload: epoch_ms key missing')
                    value = payload.get('values')
                    log_me("values in payload: {}".format(value))
                    if not value:
                        raise Exception("Empty payload found")
                    # Check that the timestamp is in the expected format (raises ValueError otherwise)
                    tstamp = dt.datetime.strptime(timestring, TIME_FORMAT)
                else:
                    # Do not inspect the payload - retrieve the timestamp in ms or generate it
                    epoch = payload.get('epoch_ms', int(dt.datetime.utcnow().timestamp() * 1000))
                    thing = payload.get('gateway', payload.get('device_name', 'unknown_gateway'))
                    device = payload.get('deviceName', payload.get('site_name', 'unknown_device'))
                    tstamp = dt.datetime.fromtimestamp(epoch / 1000, dt.timezone.utc)
                # Save the payload to S3 under a year/month/day/gateway/device prefix
                key = "{:02d}/{:02d}/{:02d}/{}/{}/{}.json".format(tstamp.year, tstamp.month, tstamp.day,
                                                                  thing, device, epoch)
                s3.put_object(
                    Body=json.dumps(payload),
                    Bucket=bucket,
                    Key=key
                )
                log_me("Object stored: {}".format(key))
                # Finally remove the item from the list of unprocessed messages
                log_me("Message ID {} processed successfully".format(record['messageId']))
                message_ids.remove(record['messageId'])
            except Exception as e:
                print("Error when processing a Record: {}".format(e))
    # Report only the failed messages back to SQS; this partial batch response requires
    # the event source mapping to be configured with 'ReportBatchItemFailures'.
    r = {"batchItemFailures": [{"itemIdentifier": x} for x in message_ids]}
    log_me("Returning unprocessed message IDs: {}".format(r))
    return r
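
# A minimal local smoke test, included as an illustrative sketch rather than part of
# the deployed function: the event shape below (an SQS record whose body wraps a JSON
# 'Message') is inferred from the parsing logic above, and all names and values in it
# are hypothetical. Running it requires AWS credentials and the BUCKET_NAME variable.
if __name__ == "__main__":
    sample_payload = {
        "timestamp": "2022-05-01T12:34:56+0000",   # matches TIME_FORMAT
        "gateway": "gw-01",                        # hypothetical gateway name
        "deviceName": "sensor-42",                 # hypothetical device name
        "epoch_ms": 1651408496000,
        "values": {"temperature": 21.5}
    }
    sample_event = {
        "Records": [
            {
                "messageId": "00000000-0000-0000-0000-000000000000",
                "body": json.dumps({"Message": json.dumps(sample_payload)})
            }
        ]
    }
    # An empty batchItemFailures list in the result means every record was stored.
    print(lambda_handler(sample_event, None))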