
Commit b0fd362

Master FaaS
1 parent 9dc08c1 commit b0fd362

File tree

3 files changed: +272 -0 lines changed


master/main.py

+255
@@ -0,0 +1,255 @@
import functions_framework
import os
import re
import json
import hashlib
import csv
import threading
import requests
from google.cloud import storage


def read_file_from_gcs(bucket, folder_name, file_name):
    blob = bucket.blob(f"{folder_name}/{file_name}")
    content = blob.download_as_text()
    return content


def write_file_to_gcs(bucket, folder_name, file_name, content):
    blob = bucket.blob(f"{folder_name}/{file_name}")
    blob.upload_from_string(content)


def divide_and_save_to_gcs(bucket, content, number_of_mappers, base_file_name):
    # Split the input into number_of_mappers roughly equal chunks of lines;
    # the last chunk absorbs any remainder.
    lines = content.splitlines()
    total_lines = len(lines)
    lines_per_part = total_lines // number_of_mappers

    for i in range(number_of_mappers):
        start_index = i * lines_per_part
        end_index = (i + 1) * lines_per_part if i < (number_of_mappers - 1) else total_lines
        part_content = '\n'.join(lines[start_index:end_index])

        write_file_to_gcs(bucket, f"temp{i}", base_file_name, part_content)


def distribute_files_to_buckets(file_tuples, k, bucket_name, folder_name):
    # Sort files in descending order of size
    sorted_files = sorted(file_tuples, key=lambda x: x[1], reverse=True)

    # Initialize buckets
    buckets = {i: [] for i in range(k)}
    bucket_sizes = [0] * k

    # Greedy allocation of files to buckets: always assign the next largest
    # file to the currently least-loaded bucket.
    for file_name, size in sorted_files:
        min_bucket_index = min(range(k), key=lambda i: bucket_sizes[i])
        buckets[min_bucket_index].append((bucket_name, folder_name, file_name))
        bucket_sizes[min_bucket_index] += size

    return buckets


def executeGroupBy(number_of_mappers, number_of_reducers):
    url = "https://us-central1-piyush-chaudhari-fall2023.cloudfunctions.net/groupby"
    parameters = {"number_of_mappers": number_of_mappers, "number_of_reducers": number_of_reducers}
    r = requests.post(url, json=parameters)
    return r.content.decode() == "groupby OK"


# Function to be executed by each mapper thread
def thread_function_mapper(thread_id, file_list):
    url = f"https://us-central1-piyush-chaudhari-fall2023.cloudfunctions.net/mapper{thread_id}"
    parameters = {"mapper_name": f"mapper{thread_id}", "file_list": file_list}
    r = requests.post(url, json=parameters)
    result = r.content.decode()

    # Store the result as an attribute on the thread object
    if result == f"mapper{thread_id} OK":
        threading.current_thread().return_value = 1
    else:
        threading.current_thread().return_value = 0


# Function to be executed by each reducer thread
def thread_function_reducer(thread_id):
    url = f"https://us-central1-piyush-chaudhari-fall2023.cloudfunctions.net/reducer{thread_id}"
    parameters = {"reducer_name": f"reducer{thread_id}"}
    r = requests.post(url, json=parameters)
    result = r.content.decode()

    # Store the result as an attribute on the thread object
    if result == f"reducer{thread_id} OK":
        threading.current_thread().return_value = 1
    else:
        threading.current_thread().return_value = 0


def merge_reducer_outputs(eccmr_final_result_bucket, number_of_reducers, reducer_bucket):
    final_dict = {}
    for reducerid in range(number_of_reducers):
        file_path = f"reducer{reducerid}/reducer{reducerid}.json"
        reducerblob = reducer_bucket.blob(file_path)
        content_text = reducerblob.download_as_text()
        json_object = json.loads(content_text)

        if reducerid == 0:
            final_dict = json_object
            continue

        # Otherwise, merging starts from reducerid 1
        for ipkey in json_object.keys():
            if ipkey in final_dict.keys():
                # Word found; now check for filenames
                for ipfilename in json_object[ipkey].keys():
                    if ipfilename in final_dict[ipkey].keys():
                        final_dict[ipkey][ipfilename] += json_object[ipkey][ipfilename]
                    else:
                        final_dict[ipkey][ipfilename] = json_object[ipkey][ipfilename]
            else:
                final_dict[ipkey] = json_object[ipkey]
    # print("this is final_dict(0):", final_dict)

    # What if the result bucket already has a final file? We have to merge it too.
    final_file_blob = eccmr_final_result_bucket.blob("final_results.json")
    if final_file_blob.exists():
        # Merge the existing results into final_dict
        content_text = final_file_blob.download_as_text()
        json_object = json.loads(content_text)
        for ipkey in json_object.keys():
            if ipkey in final_dict.keys():
                # Word found; now check for filenames
                for ipfilename in json_object[ipkey].keys():
                    if ipfilename in final_dict[ipkey].keys():
                        final_dict[ipkey][ipfilename] += json_object[ipkey][ipfilename]
                    else:
                        final_dict[ipkey][ipfilename] = json_object[ipkey][ipfilename]
            else:
                final_dict[ipkey] = json_object[ipkey]
    # print("this is final_dict(1):", final_dict)

    # Convert the JSON data to a string
    json_string = json.dumps(final_dict, indent=4)
    # Upload the JSON data to the specified file in Google Cloud Storage
    final_file_blob.upload_from_string(json_string, content_type="application/json")


def delete_intermediate_files(eccrm_dataset_temp_bucket, mapper_bucket, reducer_bucket, groupby_bucket):
    bucket_list = [eccrm_dataset_temp_bucket, mapper_bucket, reducer_bucket, groupby_bucket]
    for bucket in bucket_list:
        blobs = bucket.list_blobs()
        for blob in blobs:
            blob.delete()


@functions_framework.http
def master(request):
    request_json = request.get_json(silent=True)

    # Input parameters
    filenames = request_json["filenames"]
    number_of_mappers = request_json["number_of_mappers"]
    number_of_reducers = request_json["number_of_reducers"]

    client = storage.Client.from_service_account_json('piyush-chaudhari-fall2023-9ae1ed20a7f3.json')
    eccrm_dataset_bucket_name = 'eccrm_dataset_bucket'
    eccrm_dataset_bucket = client.get_bucket(eccrm_dataset_bucket_name)
    eccrm_dataset_temp_bucket_name = 'eccrm_dataset_temp_bucket'
    eccrm_dataset_temp_bucket = client.get_bucket(eccrm_dataset_temp_bucket_name)
    eccmr_final_result_bucket_name = 'eccmr_final_result_bucket'
    eccmr_final_result_bucket = client.get_bucket(eccmr_final_result_bucket_name)
    mapper_bucket_name = 'mapper_bucket'
    mapper_bucket = client.get_bucket(mapper_bucket_name)
    reducer_bucket_name = 'reducer_bucket'
    reducer_bucket = client.get_bucket(reducer_bucket_name)
    groupby_bucket_name = 'groupby_bucket'
    groupby_bucket = client.get_bucket(groupby_bucket_name)

    allocation = {}

    if len(filenames) == 1:
        # Cloud Storage trigger event:
        # divide the file into number_of_mappers chunks and dump them on Google Cloud Storage
        folder_name = "dataset"
        file_content = read_file_from_gcs(eccrm_dataset_bucket, folder_name, filenames[0])
        divide_and_save_to_gcs(eccrm_dataset_temp_bucket, file_content, number_of_mappers, filenames[0])
        basefilename = filenames[0]
        filenames = []
        for indx in range(number_of_mappers):
            allocation[indx] = [(eccrm_dataset_temp_bucket_name, f"temp{indx}", basefilename)]  # [(bucketname, foldername=temp<id>, filename)]
            # filenames.append((f"temp{indx}", basefilename))
        print(allocation)
    else:
        # Distribute dataset files to mappers so that every mapper gets an equal/similar data load.
        # Implemented with a greedy partitioning algorithm.
        file_tuples = []
        foldername = "dataset"
        for filename in filenames:
            blob = eccrm_dataset_bucket.blob(f"{foldername}/{filename}")
            blob.reload()
            file_tuples.append((filename, round(blob.size / 1000)))

        allocation = distribute_files_to_buckets(file_tuples, number_of_mappers, eccrm_dataset_bucket_name, foldername)
        print(allocation)

    # First spawn mappers
    # Create threads
    threads = []
    threadid = 0
    for key in allocation.keys():
        if len(allocation[key]) == 0:  # no files present for that id/bucket, so skip it
            continue
        thread = threading.Thread(target=thread_function_mapper, args=(threadid, allocation[key]))
        threads.append(thread)
        threadid += 1

    # Start all threads
    for thread in threads:
        thread.start()

    # Join all threads
    for thread in threads:
        thread.join()

    # Main thread checks results
    results_mappers = [thread.return_value for thread in threads]

    if not (sum(results_mappers) == len(threads)):
        print("ERROR OCCURRED IN MAPPERS EXECUTION.")
        return "master NOT OK - ERROR OCCURRED IN MAPPERS EXECUTION."

    print("***********MAPPER EXECUTED SUCCESSFULLY.***********")

    print("***********GROUPBY EXECUTION STARTED.***********")
    # groupby
    if not executeGroupBy(number_of_mappers, number_of_reducers):
        print("ERROR OCCURRED IN GROUPBY FaaS EXECUTION")
        return "master NOT OK - ERROR OCCURRED IN GROUPBY FaaS EXECUTION."

    print("***********GROUPBY EXECUTED SUCCESSFULLY.***********")

    print("***********REDUCERS EXECUTION STARTED.***********")
    # Spawn reducers
    # Create threads
    threads = []
    for threadid in range(number_of_reducers):
        thread = threading.Thread(target=thread_function_reducer, args=(threadid,))
        threads.append(thread)

    # Start all threads
    for thread in threads:
        thread.start()

    # Join all threads
    for thread in threads:
        thread.join()

    # Main thread checks results
    results_reducers = [thread.return_value for thread in threads]

    if not (sum(results_reducers) == number_of_reducers):
        print("ERROR OCCURRED IN REDUCERS EXECUTION.")
        return "master NOT OK - ERROR OCCURRED IN REDUCERS EXECUTION."

    print("***********REDUCERS EXECUTED SUCCESSFULLY.***********")

    # Merge reducer output into a single file
    merge_reducer_outputs(eccmr_final_result_bucket, number_of_reducers, reducer_bucket)
    print("***********REDUCERS MERGED SUCCESSFULLY.***********")

    print("***********DELETION OF INTERMEDIATE FILES IN PROGRESS***********")
    delete_intermediate_files(eccrm_dataset_temp_bucket, mapper_bucket, reducer_bucket, groupby_bucket)
    print("***********DELETION OF INTERMEDIATE FILES DONE SUCCESSFULLY***********")

    print("***********MASTER OK.***********")
    return "master OK"
master/piyush-chaudhari-fall2023-9ae1ed20a7f3.json

+13
@@ -0,0 +1,13 @@
{
  "type": "service_account",
  "project_id": "piyush-chaudhari-fall2023",
  "private_key_id": "",
  "private_key": "",
  "client_email": "googlecloudstorage@piyush-chaudhari-fall2023.iam.gserviceaccount.com",
  "client_id": "",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/googlecloudstorage%40piyush-chaudhari-fall2023.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}

master/requirements.txt

+4
@@ -0,0 +1,4 @@
functions-framework==3.*
google-cloud-storage==2.13.0
six==1.16.0
pandas==2.0.3
