-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtutorial_8.py
255 lines (187 loc) · 7.3 KB
/
tutorial_8.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# Python Threading Tutorial: Run Code Concurrently Using the Threading Module
import time

# Capture a high-resolution timestamp so each section of the script can
# report how long it took to run.
start = time.perf_counter()
def do_something(seconds):
    """Simulate an IO-bound task by sleeping for *seconds* seconds.

    Prints a message when the sleep starts and returns a completion
    message once it finishes.
    """
    print(f"Sleeping for {seconds} second...")
    time.sleep(seconds)
    return f"Done Sleeping for {seconds} second..."
# Calling the function back-to-back like this is *synchronous* execution:
# each call has to finish before the next one can begin.
print("Synchronous Code")
print()
for _ in range(2):  # "_" is Python's conventional name for a throwaway variable
    print(do_something(1))
print()
finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()
# Difference between CPU bound and IO bound tasks
# CPU bound is when the CPU takes time to process something and we have to wait for it
# IO bound is similar, but here we wait for input and output operations to complete
# Examples of IO bound tasks: reading and writing to the file system, network operations, downloading files online
# Benefits of using threading and concurrency
# Use threading for IO bound tasks, as they involve a lot of waiting for operations to complete
# CPU bound tasks do not benefit from using threading
# Some programs even run slower using threads because of the added overhead cost of creating and destroying different threads
# CPU bound tasks should use multiprocessing and run the jobs in parallel instead
# How does threading work?
# It does not run code at the same time; it just gives the illusion of running at the same time
# IO bound tasks reach points where they must wait for an output before execution can continue
# With threading or concurrency, the program simply moves on and executes other code while the IO operations finish
# Manual Threading Method - the old way
import threading
import queue

start = time.perf_counter()
print("Manual Threading Method")
print()

# A Queue collects the return values of do_something, because
# threading.Thread provides no way to hand back a target's return value.
que = queue.Queue()

# This plain form cannot capture the function's return value:
# t1 = threading.Thread(target=do_something, args=[1.5])
#
# Instead, the target is a lambda taking two parameters (a queue and the
# seconds to sleep); args supplies their values, so the call becomes:
# que.put(do_something(1.5)). que.put() appends the return value to the
# queue, and everything put there can be retrieved once the threads finish.
# NOTE: the lambda's first parameter is named q (not "queue") so it does
# not shadow the imported queue module.
t1 = threading.Thread(
    target=lambda q, seconds: q.put(do_something(seconds)),
    args=[que, 1.5],
)
t2 = threading.Thread(
    target=lambda q, seconds: q.put(do_something(seconds)),
    args=[que, 1.5],
)

# start() begins execution of each thread.
t1.start()
t2.start()

# join() blocks until a thread finishes; the code below depends on the
# threads having completed. Output when join() is omitted (note the threads
# sleep 1.5 seconds, and "Finished" is printed before they are done):
"""
Sleeping for 1.5 second...
Sleeping for 1.5 second...
Finished in 0.0 second(s)
Done Sleeping for 1.5 second...
Done Sleeping for 1.5 second...
"""
t1.join()
t2.join()

# Drain the queue to retrieve each thread's return value.
while not que.empty():
    result = que.get()
    print(result)

finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()
# Demonstrate the impact of threading on many concurrent jobs
start = time.perf_counter()
print("Multiple concurrent jobs using Manual Threading Method")
print()

que = queue.Queue()
# Keep a reference to every worker thread so they can all be joined later.
threads = []
for _ in range(10):
    worker = threading.Thread(
        target=lambda q, seconds: q.put(do_something(seconds)),
        args=[que, 1.5],
    )
    worker.start()
    threads.append(worker)

# Wait for every worker to finish before reading the results.
for worker in threads:
    worker.join()

# Drain the queue of each thread's return value.
while not que.empty():
    print(que.get())

finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()
# ThreadPoolExecutor - a newer, easier, and more efficient approach
import concurrent.futures

print("ThreadPool Executor Method")
print()
start = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
    # submit() schedules the callable and returns a Future object, which
    # encapsulates the pending call: we can check whether it is running,
    # whether it is done, and fetch its result.
    f1 = executor.submit(do_something, 1)
    f2 = executor.submit(do_something, 1)
    # result() blocks until the corresponding call has completed.
    print(f1.result())
    print(f2.result())

finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()
print("Multiple concurrent jobs using ThreadPool Executor Method")
print("Example 1")
print()
start = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Schedule ten identical jobs via a list comprehension of Futures.
    futures = [executor.submit(do_something, 1.5) for _ in range(10)]
    # as_completed yields each Future the moment it finishes, so results
    # are printed in completion order, in real time.
    for future in concurrent.futures.as_completed(futures):
        print(future.result())

finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()
print("Multiple concurrent jobs using ThreadPool Executor Method")
print("Example 2")
print()
start = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
    secs = [3, 2.5, 1.5, 1]
    futures = [executor.submit(do_something, sec) for sec in secs]
    # as_completed returns the Future objects in the order they complete,
    # not the order they were submitted: the 1-second job finishes first,
    # then 1.5, 2.5, and 3 — which demonstrates how as_completed works.
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
"""
Sleeping for 3 second...
Sleeping for 2.5 second...
Sleeping for 1.5 second...
Sleeping for 1 second...
Done Sleeping for 1 second...
Done Sleeping for 1.5 second...
Done Sleeping for 2.5 second...
Done Sleeping for 3 second...
Finished in 3.02 second(s)
"""
finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()
print("Multiple concurrent jobs using ThreadPool Executor Method")
print("Example 3")
print()
start = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor() as executor:
    secs = [3, 2.5, 1.5, 1]
    # map() returns results in the same order the inputs were submitted,
    # even though all the jobs still run concurrently with no slowdown.
    results = executor.map(do_something, secs)
    for result in results:
        print(result)
"""
Sleeping for 3 second...
Sleeping for 2.5 second...
Sleeping for 1.5 second...
Sleeping for 1 second...
Done Sleeping for 3 second...
Done Sleeping for 2.5 second...
Done Sleeping for 1.5 second...
Done Sleeping for 1 second...
Finished in 3.02 second(s)
"""
finish = time.perf_counter()
print(f"Finished in {round(finish-start, 2)} second(s)")
print()
print()