-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConditionBoundLock.java
108 lines (97 loc) · 4.04 KB
/
ConditionBoundLock.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package kp.synchronizers.locks;
import kp.utils.Printer;
import kp.utils.Utils;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
/**
* Utilizes {@link ReentrantLock} and {@link Condition} to manage a {@link Deque} with concurrent access.
* <p>
* {@link Lock} and {@link Condition}:
* </p>
* <ul>
* <li>{@link Lock} replaces the use of 'synchronized' methods and statements</li>
* <li>{@link Condition} replaces the use of the Object monitor methods</li>
* </ul>
*/
public class ConditionBoundLock {
    /** Maximum number of elements the deque may hold before pushers must wait. */
    private static final int CAPACITY_LIMIT = 4;
    /** Number of elements each pusher thread contributes per round. */
    private static final int LIST_SIZE = 4;
    /** Values 101..(100 + LIST_SIZE), pushed by the first pusher thread. */
    private static final List<Integer> DATA_1 = IntStream.rangeClosed(101, 100 + LIST_SIZE).boxed().toList();
    /** Values 20001..(20000 + LIST_SIZE), pushed by the second pusher thread. */
    private static final List<Integer> DATA_2 = IntStream.rangeClosed(20001, 20000 + LIST_SIZE).boxed().toList();
    /**
     * When set, every pop is followed by a short pause.
     * The flag is switched on at the end of the first round and is never reset.
     */
    private static final AtomicBoolean PAUSE_ATOMIC = new AtomicBoolean();

    /** Guards every access to {@link #deque}; both conditions below are bound to it. */
    private final Lock lock = new ReentrantLock();
    /** Signalled after a push, i.e. when the deque is known to be non-empty. */
    private final Condition canPopCondition = lock.newCondition();
    /** Signalled after a pop, i.e. when the deque is known to be below capacity. */
    private final Condition canPushCondition = lock.newCondition();
    /** The shared container; not thread-safe by itself, so it is only touched while holding {@link #lock}. */
    private final Deque<Integer> deque = new ArrayDeque<>();

    /**
     * Starts the threads to process pushing and popping from the deque.
     * <p>
     * Two rounds are executed on one shared {@link ConditionBoundLock} instance. Each round starts
     * one popping thread (performing {@code 2 * LIST_SIZE} pops) and, slightly later, two pushing
     * threads (each pushing {@code LIST_SIZE} elements), so pushes and pops balance out.
     * After the first round the pause flag is set, so pops in the second round are slowed down.
     * </p>
     */
    public static void process() {
        final ConditionBoundLock conditionBoundLock = new ConditionBoundLock();
        IntStream.rangeClosed(0, 1).forEach(_ -> {
            Thread.ofPlatform().name("Pop-Thread").start(() -> IntStream.range(0, 2 * LIST_SIZE)
                    .forEach(_ -> conditionBoundLock.popFromDeque()));
            // Give the popper a head start so that it reaches the empty deque before the pushers run.
            Utils.sleepMillis(10);
            Thread.ofPlatform().name("Push-Thr-1").start(() -> DATA_1.forEach(conditionBoundLock::pushToDeque));
            Thread.ofPlatform().name("Push-Thr-2").start(() -> DATA_2.forEach(conditionBoundLock::pushToDeque));
            // NOTE(review): the threads are not joined; this wait is presumably long enough
            // for the round to finish before the next one starts - confirm.
            Utils.sleepMillis(500);
            Printer.printHor();
            PAUSE_ATOMIC.set(true);
        });
    }

    /**
     * Pushes an element to the deque.
     * <p>
     * Waits on the 'can push' condition while the deque is at {@code CAPACITY_LIMIT},
     * then pushes the element and signals the 'can pop' condition.
     * If the waiting thread is interrupted, the interrupt status is restored and the JVM is terminated.
     * </p>
     *
     * @param element the element to push
     */
    private void pushToDeque(int element) {
        final String name = Thread.currentThread().getName();
        // Acquire the lock BEFORE entering 'try': if the acquisition itself fails,
        // the 'finally' block must not try to unlock a lock this thread does not hold.
        lock.lock();
        try {
            // Loop (not 'if') so the predicate is re-checked after every wake-up.
            while (deque.size() == CAPACITY_LIMIT) {
                Printer.printf("name[%s] |█|█|█|█|█| await 'CAN PUSH' condition before element[%d] push", name, element);
                canPushCondition.await();
            }
            deque.push(element);
            canPopCondition.signalAll();
            // Reported while still holding the lock, so the message cannot be
            // overtaken by the message about popping this very element.
            Printer.printf("name[%s] ►►►►► push element[%s]", name, element);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve interrupt status
            Printer.printInterruptedException(e);
            System.exit(1);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Pops an element from the deque.
     * <p>
     * Waits on the 'can pop' condition while the deque is empty,
     * then pops the element and signals the 'can push' condition.
     * If the waiting thread is interrupted, the interrupt status is restored and the JVM is terminated.
     * When the pause flag is set, the calling thread pauses briefly after releasing the lock.
     * </p>
     */
    private void popFromDeque() {
        final String name = Thread.currentThread().getName();
        // Acquire the lock BEFORE entering 'try' (see pushToDeque for the reasoning).
        lock.lock();
        try {
            // Loop (not 'if') so the predicate is re-checked after every wake-up.
            while (deque.isEmpty()) {
                Printer.printf("name[%s] |_|_|_|_|_| await 'CAN POP' condition before element pop", name);
                canPopCondition.await();
            }
            final int element = deque.pop();
            canPushCondition.signalAll();
            Printer.printf("name[%s] ◄◄◄◄◄ pop element[%d]", name, element);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve interrupt status
            Printer.printInterruptedException(e);
            System.exit(1);
        } finally {
            lock.unlock();
        }
        // Pause outside the critical section so the pushers are not blocked meanwhile.
        if (PAUSE_ATOMIC.get()) {
            Utils.sleepMillis(10);
        }
    }
}