-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathshared_mem_ex_customfile.py
150 lines (131 loc) · 4.5 KB
/
shared_mem_ex_customfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import current_process, cpu_count, Process
from datetime import datetime
import numpy as np
import pandas as pd
import tracemalloc
import time
import sys
import os
def sortFile(fileName, opNum):
    """Sort the whitespace-separated words of a text file into a result file.

    The words of ``fileName`` are read, sorted with ``list.sort`` and written
    to ``result<opNum>.txt`` in the current working directory, each word
    followed by a single space.

    Args:
        fileName: Path of the text file to read.
        opNum: Value appended (via ``str``) to ``"result"`` to build the
            output file name.

    Returns:
        None.
    """
    # Context managers guarantee both handles are closed even if reading or
    # writing raises part-way through.
    with open(fileName, "r") as inputFile:
        wordsInFile = [word for line in inputFile for word in line.split()]
    wordsInFile.sort()
    outName = "result" + str(opNum) + ".txt"
    with open(outName, "w") as outputFile:
        # One buffered write; every word keeps its trailing space.
        outputFile.write("".join(word + " " for word in wordsInFile))
def getFile(file):
    """Read every whitespace-separated word from an open file, then close it.

    Args:
        file: An open, iterable file-like object yielding text lines and
            providing ``close()``.

    Returns:
        A list of the words in the order they appear in the file.
    """
    try:
        return [word for line in file for word in line.split()]
    finally:
        # The function takes ownership of the handle: close it on success
        # and also when iteration raises, so it is never leaked.
        file.close()
# Python program for implementation of MergeSort
def mergeSort(arr):
    """Sort a mutable sequence in place using a stable top-down merge sort.

    Args:
        arr: Mutable, sliceable sequence of mutually comparable items, for
            example a ``list`` or a one-dimensional NumPy array.

    Returns:
        None. ``arr`` itself is reordered.
    """
    if len(arr) <= 1:
        return

    # Finding the mid of the array
    mid = len(arr) // 2

    # Copy the two halves into independent lists. Slicing a list already
    # copies, but slicing a NumPy array returns a view onto ``arr``; merging
    # back into ``arr`` would then overwrite items of the halves before they
    # have been read.
    left = list(arr[:mid])
    right = list(arr[mid:])

    # Sort each half recursively
    mergeSort(left)
    mergeSort(right)

    # Merge the sorted halves back into arr
    i = j = k = 0
    while i < len(left) and j < len(right):
        # Take from the right half only when it is strictly smaller, so that
        # equal items keep their original relative order (stable sort).
        if right[j] < left[i]:
            arr[k] = right[j]
            j += 1
        else:
            arr[k] = left[i]
            i += 1
        k += 1

    # At most one half still has items; append whatever is left
    for k, item in enumerate(left[i:] + right[j:], start=k):
        arr[k] = item
def getSize(filename):
    """Return the size in bytes that ``os.stat`` reports for ``filename``."""
    return os.stat(filename).st_size
def work_with_shared_memory(shm_name, shape, dtype, i):
    """Sort one worker's slice of an array stored in shared memory.

    The array is split into ``cpu_count()`` contiguous slices; this call
    sorts slice number ``i`` in place inside the shared buffer.

    Args:
        shm_name: Name of an existing shared memory block holding the data.
        shape: Shape of the array stored in the block.
        dtype: NumPy dtype of the array stored in the block.
        i: Zero-based slice index, as produced by ``range(cpu_count())``.

    Returns:
        ``np.nansum`` of the ``val`` field when ``dtype`` is a structured
        dtype with a field of that name, otherwise ``None``.
    """
    print(f'With SharedMemory: {current_process()}')
    # Locate the shared memory by its name
    shm = SharedMemory(shm_name)
    np_array = None
    try:
        # View the shared buffer as an array; np.ndarray accepts plain
        # dtypes as well as structured ones.
        np_array = np.ndarray(shape=shape, dtype=dtype, buffer=shm.buf)

        # Integer bounds that tile the whole array without gaps, including
        # the remainder when the size is not a multiple of the worker count.
        # NOTE(review): bounds index the first axis and are computed from
        # .size, so this assumes a one-dimensional array - confirm.
        workers = cpu_count()
        start = i * np_array.size // workers
        stop = (i + 1) * np_array.size // workers

        if start < stop:
            chunk = np_array[start:stop]
            # Sort an independent list copy and write it back, so the result
            # does not depend on how mergeSort treats array views.
            items = chunk.tolist()
            mergeSort(items)
            chunk[:] = items
            del chunk

        names = np_array.dtype.names
        if names and "val" in names:
            return np.nansum(np_array["val"])
        return None
    finally:
        # Views onto shm.buf must be dropped before the handle can be closed.
        del np_array
        shm.close()
def work_no_shared_memory(np_array: np.recarray):
    """Sort ``np_array`` with ``mergeSort`` and return the NaN-ignoring sum of its ``val`` field.

    Args:
        np_array: Record array exposing a ``val`` attribute.

    Returns:
        The result of ``np.nansum(np_array.val)`` after sorting.
    """
    print(f'No SharedMemory: {current_process()}')
    # Without shared memory, the np_array is copied into the child process
    mergeSort(np_array)
    val_total = np.nansum(np_array.val)
    return val_total
if __name__ == "__main__":
    # Benchmark script: sort the words of a user-supplied file once with
    # worker processes operating on shared memory and once in this process,
    # printing memory usage and elapsed time for each run.

    # User file input
    fileName = input("Enter File Name: ")

    # Display file size in KB or MB (whichever looks better)
    numBytes = getSize(fileName)
    if numBytes < 1e6:
        print(f"file size = {numBytes/1e3}KB")
    else:
        print(f"file size = {numBytes/1e6}MB")

    # Read every word of the file; the with-block guarantees the handle is
    # closed even if reading fails.
    with open(fileName, "r") as file:
        a = getFile(file)

    # Convert the word list into a NumPy array
    np_array = np.array(a)
    shape, dtype = np_array.shape, np_array.dtype

    # With shared memory
    # Start tracking memory usage
    tracemalloc.start()
    start_time = time.perf_counter()
    with SharedMemoryManager() as smm:
        # Size the block from the array data itself; a zero-byte block is
        # not allowed, so reserve at least one byte for an empty file.
        shm = smm.SharedMemory(max(np_array.nbytes, 1))
        # View the shared buffer as an array of the same shape and dtype
        shm_np_array = np.ndarray(shape=shape, dtype=dtype, buffer=shm.buf)
        # Copy the data into the shared memory
        np.copyto(shm_np_array, np_array)
        # Spawn some processes to do some work
        with ProcessPoolExecutor(cpu_count()) as exe:
            fs = [exe.submit(work_with_shared_memory, shm.name, shape, dtype, i)
                  for i in range(cpu_count())]
            for future in as_completed(fs):
                # Report worker failures instead of silently discarding them.
                error = future.exception()
                if error is not None:
                    print(f"Worker failed: {error!r}")
        # Drop the view before closing this process's handle to the block.
        del shm_np_array
        shm.close()
    # Check memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")
    print(f'Time elapsed: {time.perf_counter()-start_time:.2f}s')
    tracemalloc.stop()

    # Without shared memory
    tracemalloc.start()
    start_time = time.perf_counter()
    # Sort the plain Python list of words; it holds the same data as np_array.
    mergeSort(a)
    # Check memory usage
    current, peak = tracemalloc.get_traced_memory()
    print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB")
    print(f'Time elapsed: {time.perf_counter()-start_time:.2f}s')
    tracemalloc.stop()