-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtest_concurrentprocessing.py
62 lines (50 loc) · 1.77 KB
/
test_concurrentprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import time
import pytest
from thread import ConcurrentProcessing, exceptions
# >>>>>>>>>> Dummy Functions <<<<<<<<<< #
def _dummy_dataProcessor(data_in: int, delay: float = 0) -> int:
time.sleep(delay)
return data_in
def _dummy_raiseException(x: Exception, delay: float = 0):
time.sleep(delay)
raise x
# >>>>>>>>>> General Use <<<<<<<<<< #
def test_threadsScaleDown():
    """Check that fewer threads than ``max_threads`` are used for a small dataset.

    Two items are submitted with ``max_threads=4``; the test expects exactly
    two entries in the processor's ``_threads`` list.
    """
    items = [0, 1]
    processor = ConcurrentProcessing(
        function=_dummy_dataProcessor,
        dataset=items,
        max_threads=4,
        kwargs={'delay': 2},
        daemon=True,
    )
    processor.start()

    # No join() here: the thread list is inspected immediately after start().
    thread_count = len(processor._threads)
    assert thread_count == 2
def test_threadsProcessing():
    """Check that return values come back in the same order as the input dataset."""
    expected = list(range(500))
    processor = ConcurrentProcessing(
        function=_dummy_dataProcessor,
        dataset=expected,
        # Extra positional argument for the worker; presumably lands in `delay`.
        args=[0.001],
        daemon=True,
    )
    processor.start()

    returned = processor.get_return_values()
    assert returned == expected
# >>>>>>>>>> Raising Exceptions <<<<<<<<<< #
def test_raises_StillRunningError():
    """Reading ``results`` right after ``start()`` should raise ThreadStillRunningError."""
    items = list(range(8))
    processor = ConcurrentProcessing(
        function=_dummy_dataProcessor,
        dataset=items,
        args=[1],
        daemon=True,
    )
    processor.start()

    # The property is read without joining first; the access itself must raise.
    with pytest.raises(exceptions.ThreadStillRunningError):
        _ = processor.results
def test_raises_RunTimeError():
    """Check that a RuntimeError raised inside a worker surfaces from ``join()``.

    Every dataset item is an exception instance which ``_dummy_raiseException``
    raises after a short sleep.
    """
    # One instance per item, so concurrently running workers never raise (and
    # attach tracebacks to) the very same exception object.
    dataset = [RuntimeError() for _ in range(8)]
    new = ConcurrentProcessing(
        function=_dummy_raiseException, dataset=dataset, args=[0.01], daemon=True
    )

    # Started outside the ``raises`` block: only join() is expected to raise.
    # A RuntimeError coming from start() itself must fail the test rather than
    # satisfy it without join() ever being exercised.
    new.start()
    with pytest.raises(RuntimeError):
        new.join()