-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdata.py
249 lines (220 loc) · 7.17 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
"""
WarriorBeatGraphQL
Data Handlers
"""
import os
import boto3
from boto3.dynamodb.conditions import Key, Attr
from botocore.exceptions import ClientError
import requests
# Connection Info
TABLES = {
'media': {
'table_name': os.environ.get("DB_MEDIA"),
},
'article': {
'table_name': os.environ.get("DB_ARTICLE"),
},
'article_likes': {
'table_name': os.environ.get("DB_ARTICLELIKES")
},
'category': {
'table_name': os.environ.get("DB_CATEGORY"),
},
'poll_votes': {
'table_name': os.environ.get("DB_POLLVOTES")
},
'poll_options': {
'table_name': os.environ.get("DB_POLLOPTIONS")
},
'users': {
'table_name': os.environ.get("DB_USERS")
},
'user_subs': {
'table_name': os.environ.get("DB_USERSUBSCRIPTIONS")
},
'author': {
'table_name': os.environ.get("DB_AUTHORS")
}
}
BUCKETS = {
'media': {
'bucket_name': os.environ.get("S3_MEDIA"),
'parent_key': 'media/'
}
}
class DynamoDB:
"""
AWS Boto3 DynamoDB
Handles transactions with database
params:
table_name: string
name of table to access
"""
def __init__(self, table):
self.client = boto3.client('dynamodb')
self.dynamodb = boto3.resource('dynamodb')
self.table = TABLES[table]
self.hash = self.table.get('primary_key', 'id')
self.db = self.dynamodb.Table(self.table['table_name'])
def clean_item(self, item):
"""prepare item for database insertion"""
clean = item.copy()
for k, v in item.items():
if type(v) is str and len(v) <= 0:
clean.pop(k)
return clean
def add_item(self, item):
"""adds item to database"""
item = self.clean_item(item)
self.db.put_item(Item=item)
return item
def delete_item(self, itemId):
"""deletes item from database"""
self.db.delete_item(Key={
self.hash: itemId
})
return itemId
def get_item(self, itemId):
"""retrieves an item from database"""
try:
resp = self.db.get_item(Key={
self.hash: itemId
})
return resp.get('Item')
except ClientError as e:
print(e)
return None
def update_list(self, item, update, append=False, id_key='id', key=None, range_key=None, return_val=None):
"""updates item list attribute"""
key_val = key or self.hash
return_val = return_val or "NONE"
item_id = item.get(id_key)
up_attr, up_val = update
up_val = up_val if type(up_val) is list else [up_val]
update_expr = f":value"
update_vals = {
":value": up_val
}
if append:
update_expr = f"list_append(if_not_exists({up_attr}, :emptyList), :value)"
update_vals[":emptyList"] = []
update = {
"Key": {
key_val: item_id
},
"ReturnValues": return_val,
"UpdateExpression": f"set {up_attr} = {update_expr}",
"ExpressionAttributeValues": update_vals
}
if range_key:
update["Key"][range_key[0]] = range_key[1]
resp = self.db.update_item(**update)
response = resp.get("Attributes", resp)
return response
def query(self, itemId, key=None, range_key=None, index=None, filters=None):
"""queries for item from database"""
key_val = key or self.hash
query = {
"KeyConditionExpression": Key(key_val).eq(itemId),
}
if range_key:
expr = query["KeyConditionExpression"]
query["KeyConditionExpression"] = expr & Key(
range_key[0]).eq(range_key[1])
if index:
query["IndexName"] = index
if filters:
filters = filters if type(filters) is list else [filters]
init_f = filters[0]
query["FilterExpression"] = Attr(init_f[0]).eq(init_f[1])
for f in filters[1:]:
query["FilterExpression"] = query["FilterExpression"] & Attr(
f[0]).eq(f[1])
try:
resp = self.db.query(**query)
return resp.get('Items')
except ClientError as e:
print(e)
return None
def exists(self, itemId):
"""checks if an item exists in the database"""
item = self.get_item(itemId)
return False if item is None else item
def scan(self, limit=20, next_token=None):
"""scans table with options"""
if next_token:
paginator = self.client.get_paginator('scan')
resp = paginator.paginate(
TableName=self.table['table_name'],
PaginationConfig={
'MaxItems': limit,
'PageSize': limit,
'StartingToken': next_token
}
)
return resp['Items']
resp = self.db.scan(
Limit=limit
)
return resp["Items"]
@property
def all(self):
"""list all items in the database table"""
resp = self.db.scan()
items = resp["Items"]
return items
class S3Storage:
"""
AWS S3 Bucket
Handles transactions with s3 Buckets
params:
bucket_name: string
name of bucket to access
"""
FILE_EXTENSIONS = {
"image/jpeg": ".jpeg",
"image/png": ".png"
}
def __init__(self, bucket):
self.s3bucket = boto3.resource('s3')
self.bucket = BUCKETS[bucket]
self.storage = self.s3bucket.Bucket(self.bucket['bucket_name'])
self.key = self.bucket['parent_key']
def get_url(self, key, **kwargs):
"""generates url where the image is hosted"""
aws_root = "https://s3.amazonaws.com"
url = f"{aws_root}/{self.bucket['bucket_name']}/{key}"
return url, key
def upload(self, path, key=None):
"""uploads a file to the s3 bucket"""
if key:
key = self.key + key
self.storage.upload_file(path, key, ACL='public-read')
return self.get_url(key)
else:
self.storage.upload_file(path, self.key, ACL='public-read')
return self.get_url(self.key)
def upload_obj(self, obj, key=''):
"""upload file object"""
_key = self.key + key + obj[1]
self.storage.put_object(Key=_key, Body=obj[0], ACL='public-read')
return self.get_url(_key)
def upload_from_url(self, url, **kwargs):
"""upload file from url"""
img_stream = requests.get(url, stream=True)
content_type = img_stream.headers.get('content-type', 'image/jpeg')
file_ext = self.FILE_EXTENSIONS[content_type]
img_obj = img_stream.raw
img_data = img_obj.read()
return self.upload_obj((img_data, file_ext), **kwargs)
def delete(self, keys):
"""deletes files from s3 bucket"""
key_objs = [{'Key': k} for k in keys] if type(keys) is list else [
{'Key': keys}]
self.storage.delete_objects(
Delete={
'Objects': key_objs,
},
)
return key_objs