Advanced Task Automator Project
This is a guide to building a serverless task automator with a queue. It is similar to the basic Task Automator project, except an SQS queue has been added in front of the Lambda function for a more resilient architecture.
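Each job arrives as an SQS message whose body is a JSON document describing what to scan and where to load it. A sample message body, reusing the placeholder values from the docstrings in the code:

{
    "application": "hello-world-webapp",
    "table_name": "webapp_user_logs",
    "region": "us-east-1",
    "assume_role": "None"
}

Here is the full Lambda code: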
import json
import boto3
import decimal
from datetime import datetime
from boto3.dynamodb.conditions import Key as dynamoKey

class DatabaseScanner:
    """Collects raw user metrics from prod databases."""

    def _scan_pagination_wrapper(self, target, **kwargs):
        """Handles scans of data over 1MB.
        https://stackoverflow.com/a/38619425/"""
        response = target.scan(**kwargs)
        data = []
        if 'Items' in response:
            data.extend(self._replace_decimals(response['Items']))
        # DynamoDB truncates each scan at 1MB, so keep paging while LastEvaluatedKey is present
        while response.get('LastEvaluatedKey'):
            response = target.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
            if 'Items' in response:
                data.extend(self._replace_decimals(response['Items']))
        return data

    def _replace_decimals(self, obj):
        """Recursively replaces Decimal() objects with native int/float.
        https://github.com/boto/boto3/issues/369/"""
        if isinstance(obj, list):
            for i in range(len(obj)):
                obj[i] = self._replace_decimals(obj[i])
            return obj
        elif isinstance(obj, dict):
            for k in obj.keys():
                obj[k] = self._replace_decimals(obj[k])
            return obj
        elif isinstance(obj, decimal.Decimal):
            if obj % 1 == 0:
                return int(obj)
            else:
                return float(obj)
        else:
            return obj

    def get(self, date, job):
        """Scans for data from a given UTC date to now() in a given table.
        Example params: date = '1970-01-01 00:00:00',
        job = {"application": "hello-world-webapp", "table_name": "webapp_user_logs",
               "region": "us-east-1", "assume_role": "None"}"""
        # Parse params
        start_date_time = int(datetime.strptime(date, '%Y-%m-%d %H:%M:%S').timestamp())
        end_date_time = int(datetime.now().timestamp())
        table_name = job.get('table_name')
        region = job.get('region')
        # assume_role = job.get('assume_role')
        if not (table_name and region):
            return 'job params incorrect'

        # Define resources and filter key
        local_db = boto3.resource('dynamodb', region_name=region)
        table = local_db.Table(table_name)
        fe = dynamoKey('access_timestamp').between(start_date_time, end_date_time)
        return self._scan_pagination_wrapper(table, FilterExpression=fe)

class DataProcesser:
    """Processes data into JSON files and pushes them to S3."""

    def __init__(self, database_scanner, bucketname):
        self.database_scanner = database_scanner
        self.s3 = boto3.resource('s3')
        self.s3client = boto3.client('s3')
        self.bucketname = bucketname

    def _get_data(self, date, job):
        """Gets data using the database scanner."""
        return self.database_scanner.get(date, job)

    def _process_put(self, date, job):
        """Checks if there was an upload today.
        If not, retrieves the data, and uploads it to S3."""
        # Check application
        application = job.get('application')
        if not application:
            return 'no application name given'

        # Create timestamp pieces: readable time, date-partition path, epoch seconds
        nowdt = datetime.now()
        now = [nowdt.strftime('%Y-%m-%d %H:%M:%S') + 'Z',
               f"/year={nowdt.year}/month={nowdt.month}/day={nowdt.day}/",
               int(nowdt.timestamp())]

        # Create object path
        folder = 'usermetrics/application=' + application
        extension = application + '_userlogs_'
        objectpath = f"{folder}{now[1]}{extension}{now[0]}.json"
        objectpath_split = objectpath.split(extension)
        filename = extension + objectpath_split[1]
        prefix = objectpath_split[0]
        # prefix = 'usermetrics/application=hello-world-webapp/year=2019/month=11/day=9/'  # for testing

        # Skip the upload if a file already exists under today's prefix
        s3metadata = self.s3client.list_objects(Bucket=self.bucketname, Prefix=prefix)
        if 'Contents' in s3metadata:
            return 'exists - no new data uploaded: s3://' + self.bucketname + '/' + prefix

        # Retrieve and upload data
        data = self._get_data(date, job)
        if not data:
            return 'no new data was retrieved'
        jsonfile = json.dumps({'user_metrics': data}).encode('UTF-8')
        result = self.s3.Object(self.bucketname, objectpath).put(Body=jsonfile)
        return filename + ' uploaded to s3://' + self.bucketname if result else 'S3 upload error.'

    def load(self, job, pastdays=1):
        """Processes metrics data and loads it into S3 for
        the past number of days (default is 1 day) for a given job.
        Example params: pastdays = 1,
        job = {"application": "hello-world-webapp", "table_name": "webapp_user_logs",
               "region": "us-east-1", "assume_role": "None"}"""
        # Start the scan window `pastdays` days (86400 seconds each) before now
        getdate = int(datetime.now().timestamp()) - 86400 * int(pastdays)
        strfdate = datetime.fromtimestamp(getdate).strftime('%Y-%m-%d %H:%M:%S')
        return self._process_put(strfdate, job)

class SQSmsgCleaner:
    """Cleans up SQS messages to get the body."""

    def __init__(self, message):
        # Lambda delivers SQS messages under event['Records']; the job dict is the JSON body
        self.body = json.loads(message['Records'][0]['body'])

# ===Lambda Handler===
def lambda_handler(event, context):
    # Register classes
    database_scanner = DatabaseScanner()
    data_processer = DataProcesser(database_scanner, bucketname='metrics-bucket')
    sqsmsg = SQSmsgCleaner(event).body

    # Run processer
    result = data_processer.load(sqsmsg)
    print(result)
    return {'statusCode': 200, 'body': json.dumps(str(result))}
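To run a job, send a message to the queue that triggers this Lambda. A minimal sketch using boto3 (the queue name task-automator-queue is an assumption, not something defined by the code above):

import json
import boto3

sqs = boto3.client('sqs')
# Hypothetical queue name; use whichever queue is mapped to the Lambda
queue_url = sqs.get_queue_url(QueueName='task-automator-queue')['QueueUrl']

job = {
    "application": "hello-world-webapp",
    "table_name": "webapp_user_logs",
    "region": "us-east-1",
    "assume_role": "None"
}

# SQS delivers this body to the Lambda as event['Records'][0]['body']
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(job))

The handler can also be exercised locally by faking the SQS envelope, which is handy before the queue trigger is wired up:

event = {'Records': [{'body': json.dumps(job)}]}
lambda_handler(event, None)
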
This post is licensed under CC BY 4.0 by the author.