Skip to content

Commit aa26c12

Browse files
authored
Fix MonoPublishOnTest flakiness (#3898)
The implementation relied on a specific sequencing of events, which in case of slower hardware or random events would occasionally fail the assertions. This change introduces firm ordering and fixes an incorrect assertion, which assumed the `Mono` terminates without error, while in fact the `RejectedExecutionException` is propagated to the `AssertSubscriber`. In effect, the tests are no longer flaky.
1 parent bd5c4e5 commit aa26c12

File tree

1 file changed

+32
-25
lines changed

1 file changed

+32
-25
lines changed

reactor-core/src/test/java/reactor/core/publisher/MonoPublishOnTest.java

+32-25
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2015-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,13 +55,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutor()
5555
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();
5656

5757
try {
58-
59-
CountDownLatch hookLatch = new CountDownLatch(1);
58+
CountDownLatch finallyLatch = new CountDownLatch(1);
59+
CountDownLatch inOnNextLatch = new CountDownLatch(1);
6060

6161
Hooks.onOperatorError((t, d) -> {
6262
throwableInOnOperatorError.set(t);
6363
dataInOnOperatorError.set(d);
64-
hookLatch.countDown();
6564
return t;
6665
});
6766

@@ -73,22 +72,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutor()
7372
.publishOn(fromExecutorService(executor))
7473
.doOnNext(s -> {
7574
try {
75+
inOnNextLatch.countDown();
7676
latch.await();
7777
}
7878
catch (InterruptedException e) {
7979
}
8080
})
8181
.publishOn(fromExecutor(executor))
82+
.doFinally(s -> finallyLatch.countDown())
8283
.subscribe(assertSubscriber);
8384

85+
inOnNextLatch.await();
8486
executor.shutdownNow();
8587

88+
finallyLatch.await();
89+
8690
assertSubscriber.assertNoValues()
87-
.assertNoError()
91+
.assertError(RejectedExecutionException.class)
8892
.assertNotComplete();
8993

90-
hookLatch.await();
91-
9294
assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
9395
assertThat(data).isSameAs(dataInOnOperatorError.get());
9496
}
@@ -109,13 +111,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor()
109111
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();
110112

111113
try {
112-
113-
CountDownLatch hookLatch = new CountDownLatch(2);
114+
CountDownLatch finallyLatch = new CountDownLatch(1);
115+
CountDownLatch inOnNextLatch = new CountDownLatch(1);
114116

115117
Hooks.onOperatorError((t, d) -> {
116118
throwableInOnOperatorError.set(t);
117119
dataInOnOperatorError.set(d);
118-
hookLatch.countDown();
119120
return t;
120121
});
121122

@@ -127,23 +128,25 @@ public void rejectedExecutionExceptionOnErrorSignalExecutor()
127128
.publishOn(fromExecutorService(executor))
128129
.doOnNext(s -> {
129130
try {
131+
inOnNextLatch.countDown();
130132
latch.await();
131133
}
132134
catch (InterruptedException e) {
133135
throw Exceptions.propagate(exception);
134136
}
135137
})
136138
.publishOn(fromExecutor(executor))
139+
.doFinally(s -> finallyLatch.countDown())
137140
.subscribe(assertSubscriber);
138141

142+
inOnNextLatch.await();
139143
executor.shutdownNow();
140144

145+
finallyLatch.await();
141146
assertSubscriber.assertNoValues()
142-
.assertNoError()
147+
.assertError(RejectedExecutionException.class)
143148
.assertNotComplete();
144149

145-
hookLatch.await();
146-
147150
assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
148151
assertThat(exception).isSameAs(throwableInOnOperatorError.get()
149152
.getSuppressed()[0]);
@@ -164,13 +167,12 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService()
164167
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();
165168

166169
try {
167-
168-
CountDownLatch hookLatch = new CountDownLatch(1);
170+
CountDownLatch finallyLatch = new CountDownLatch(1);
171+
CountDownLatch inOnNextLatch = new CountDownLatch(1);
169172

170173
Hooks.onOperatorError((t, d) -> {
171174
throwableInOnOperatorError.set(t);
172175
dataInOnOperatorError.set(d);
173-
hookLatch.countDown();
174176
return t;
175177
});
176178

@@ -182,22 +184,25 @@ public void rejectedExecutionExceptionOnDataSignalExecutorService()
182184
.publishOn(fromExecutorService(executor))
183185
.doOnNext(s -> {
184186
try {
187+
inOnNextLatch.countDown();
185188
latch.await();
186189
}
187190
catch (InterruptedException e) {
188191
}
189192
})
190193
.publishOn(fromExecutorService(executor))
194+
.doFinally(s -> finallyLatch.countDown())
191195
.subscribe(assertSubscriber);
192196

197+
inOnNextLatch.await();
198+
193199
executor.shutdownNow();
194200

201+
finallyLatch.await();
195202
assertSubscriber.assertNoValues()
196-
.assertNoError()
203+
.assertError(RejectedExecutionException.class)
197204
.assertNotComplete();
198205

199-
hookLatch.await();
200-
201206
assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
202207
assertThat(data).isSameAs(dataInOnOperatorError.get());
203208
}
@@ -218,13 +223,12 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService()
218223
final AtomicReference<Object> dataInOnOperatorError = new AtomicReference<>();
219224

220225
try {
221-
222-
CountDownLatch hookLatch = new CountDownLatch(2);
226+
CountDownLatch finallyLatch = new CountDownLatch(1);
227+
CountDownLatch inOnNextLatch = new CountDownLatch(1);
223228

224229
Hooks.onOperatorError((t, d) -> {
225230
throwableInOnOperatorError.set(t);
226231
dataInOnOperatorError.set(d);
227-
hookLatch.countDown();
228232
return t;
229233
});
230234

@@ -236,23 +240,26 @@ public void rejectedExecutionExceptionOnErrorSignalExecutorService()
236240
.publishOn(fromExecutorService(executor))
237241
.doOnNext(s -> {
238242
try {
243+
inOnNextLatch.countDown();
239244
latch.await();
240245
}
241246
catch (InterruptedException e) {
242247
throw Exceptions.propagate(exception);
243248
}
244249
})
245250
.publishOn(fromExecutorService(executor))
251+
.doFinally(s -> finallyLatch.countDown())
246252
.subscribe(assertSubscriber);
247253

254+
inOnNextLatch.await();
255+
248256
executor.shutdownNow();
249257

258+
finallyLatch.await();
250259
assertSubscriber.assertNoValues()
251-
.assertNoError()
260+
.assertError(RejectedExecutionException.class)
252261
.assertNotComplete();
253262

254-
hookLatch.await();
255-
256263
assertThat(throwableInOnOperatorError.get()).isInstanceOf(RejectedExecutionException.class);
257264
assertThat(exception).isSameAs(throwableInOnOperatorError.get()
258265
.getSuppressed()[0]);

0 commit comments

Comments
 (0)