37
37
import java .util .Optional ;
38
38
import java .util .Set ;
39
39
import java .util .TreeSet ;
40
- import java .util .concurrent .CompletableFuture ;
41
- import java .util .concurrent .ExecutionException ;
42
- import java .util .concurrent .Future ;
40
+ import java .util .concurrent .*;
43
41
import java .util .concurrent .locks .Lock ;
44
42
import java .util .concurrent .locks .ReentrantLock ;
45
43
import java .util .function .Supplier ;
@@ -192,18 +190,19 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
192
190
threads .add (rootWorkflowThread );
193
191
rootWorkflowThread .start ();
194
192
}
195
- if (!rootWorkflowThread .isStarted ()) {
196
- throw new IllegalStateException ("start not called" );
197
- }
198
193
lock .lock ();
199
194
try {
200
195
checkNotClosed ();
201
-
196
+ checkNotCloseRequestedLocked ();
202
197
inRunUntilAllBlocked = true ;
203
198
// Keep repeating until at least one of the threads makes progress.
204
199
boolean progress ;
205
200
outerLoop :
206
201
do {
202
+ if (exitRequested ) {
203
+ closeRequested = true ;
204
+ break ;
205
+ }
207
206
if (!toExecuteInWorkflowThread .isEmpty ()) {
208
207
for (NamedRunnable nr : toExecuteInWorkflowThread ) {
209
208
Object callbackThread =
@@ -222,15 +221,7 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
222
221
+ callbackThread );
223
222
}
224
223
225
- // It is important to prepend threads as there are callbacks
226
- // like signals that have to run before any other threads.
227
- // Otherwise signal might be never processed if it was received
228
- // after workflow decided to close.
229
- // Adding the callbacks in the same order as they appear in history.
230
- for (int i = callbackThreadsToAdd .size () - 1 ; i >= 0 ; i --) {
231
- threads .add (callbackThreadsToAdd .get (i ));
232
- }
233
- callbackThreadsToAdd .clear ();
224
+ appendCallbackThreadsLocked ();
234
225
}
235
226
toExecuteInWorkflowThread .clear ();
236
227
progress = false ;
@@ -251,8 +242,7 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
251
242
}
252
243
}
253
244
}
254
- threads .addAll (workflowThreadsToAdd );
255
- workflowThreadsToAdd .clear ();
245
+ appendWorkflowThreadsLocked ();
256
246
} while (progress && !threads .isEmpty ());
257
247
} catch (PotentialDeadlockException e ) {
258
248
String triggerThreadStackTrace = "" ;
@@ -284,7 +274,9 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
284
274
public boolean isDone () {
285
275
lock .lock ();
286
276
try {
287
- return closeFuture .isDone () || threads .isEmpty ();
277
+ return closeFuture .isDone ()
278
+ // if close is requested, we should wait for the closeFuture to be filled
279
+ || !closeRequested && !areThreadsToBeExecuted ();
288
280
} finally {
289
281
lock .unlock ();
290
282
}
@@ -302,7 +294,6 @@ public void cancel(String reason) {
302
294
*/
303
295
@ Override
304
296
public void close () {
305
- List <Future <?>> threadFutures = new ArrayList <>();
306
297
lock .lock ();
307
298
if (closeFuture .isDone ()) {
308
299
lock .unlock ();
@@ -324,52 +315,80 @@ public void close() {
324
315
closeFuture .join ();
325
316
return ;
326
317
}
318
+
327
319
closeStarted = true ;
320
+ // lock is taken here
328
321
try {
329
- try {
330
- threads .addAll (workflowThreadsToAdd );
331
- workflowThreadsToAdd .clear ();
332
-
333
- for (WorkflowThread c : threads ) {
334
- threadFutures .add (c .stopNow ());
335
- }
336
- threads .clear ();
337
-
338
- // We cannot use an iterator to unregister failed Promises since f.get()
339
- // will remove the promise directly from failedPromises. This causes an
340
- // ConcurrentModificationException
341
- // For this reason we will loop over a copy of failedPromises.
342
- Set <Promise <?>> failedPromisesLoop = new HashSet <>(failedPromises );
343
- for (Promise <?> f : failedPromisesLoop ) {
344
- try {
345
- f .get ();
346
- throw new Error ("unreachable" );
347
- } catch (RuntimeException e ) {
348
- log .warn (
349
- "Promise completed with exception and was never accessed. The ignored exception:" ,
350
- CheckedExceptionWrapper .unwrap (e ));
322
+ // in some circumstances when a workflow broke Deadline Detector,
323
+ // runUntilAllBlocked may return while workflow threads are still running.
324
+ // If this happens, these threads may potentially start new additional threads that will be
325
+ // in workflowThreadsToAdd and callbackThreadsToAdd.
326
+ // That's why we need to make sure that all the spawned threads were shut down in a cycle.
327
+ while (areThreadsToBeExecuted ()) {
328
+ List <WorkflowThreadStopFuture > threadFutures = new ArrayList <>();
329
+ try {
330
+ toExecuteInWorkflowThread .clear ();
331
+ appendWorkflowThreadsLocked ();
332
+ appendCallbackThreadsLocked ();
333
+ for (WorkflowThread workflowThread : threads ) {
334
+ threadFutures .add (
335
+ new WorkflowThreadStopFuture (workflowThread , workflowThread .stopNow ()));
351
336
}
337
+ threads .clear ();
338
+
339
+ // We cannot use an iterator to unregister failed Promises since f.get()
340
+ // will remove the promise directly from failedPromises. This causes an
341
+ // ConcurrentModificationException
342
+ // For this reason we will loop over a copy of failedPromises.
343
+ Set <Promise <?>> failedPromisesLoop = new HashSet <>(failedPromises );
344
+ for (Promise <?> f : failedPromisesLoop ) {
345
+ try {
346
+ f .get ();
347
+ throw new Error ("unreachable" );
348
+ } catch (RuntimeException e ) {
349
+ log .warn (
350
+ "Promise completed with exception and was never accessed. The ignored exception:" ,
351
+ CheckedExceptionWrapper .unwrap (e ));
352
+ }
353
+ }
354
+ } finally {
355
+ // we need to unlock for the further code because threads will not be able to proceed with
356
+ // destruction otherwise.
357
+ lock .unlock ();
352
358
}
353
- } finally {
354
- lock .unlock ();
355
- }
356
359
357
- // Context is destroyed in c.StopNow(). Wait on all tasks outside the lock since
358
- // these tasks use the same lock to execute.
359
- for (Future <?> future : threadFutures ) {
360
+ // Wait on all tasks outside the lock since these tasks use the same lock to execute.
360
361
try {
361
- future .get ();
362
+ for (WorkflowThreadStopFuture threadFuture : threadFutures ) {
363
+ try {
364
+ threadFuture .stopFuture .get (10 , TimeUnit .SECONDS );
365
+ } catch (TimeoutException e ) {
366
+ WorkflowThread workflowThread = threadFuture .workflowThread ;
367
+ log .error (
368
+ "[BUG] Workflow thread '{}' of workflow '{}' can't be destroyed in time. "
369
+ + "This will lead to a workflow cache leak. "
370
+ + "This problem is usually caused by a workflow implementation swallowing java.lang.Error instead of rethrowing it. "
371
+ + " Thread dump of the stuck thread:\n {}" ,
372
+ workflowThread .getName (),
373
+ workflowContext .getContext ().getWorkflowId (),
374
+ workflowThread .getStackTrace ());
375
+ }
376
+ }
362
377
} catch (InterruptedException e ) {
363
378
Thread .currentThread ().interrupt ();
364
379
// Worker is likely stopped with shutdownNow()
365
380
// TODO consider propagating as an original interrupted exception to the top level
366
381
throw new Error ("Worker executor thread interrupted during stopping of a coroutine" , e );
367
382
} catch (ExecutionException e ) {
368
383
throw new Error ("[BUG] Unexpected failure while stopping a coroutine" , e );
384
+ } finally {
385
+ // acquire the lock back as it should be taken for the loop condition check.
386
+ lock .lock ();
369
387
}
370
388
}
371
389
} finally {
372
390
closeFuture .complete (null );
391
+ lock .unlock ();
373
392
}
374
393
}
375
394
@@ -393,6 +412,26 @@ public String stackTrace() {
393
412
return result .toString ();
394
413
}
395
414
415
+ private void appendWorkflowThreadsLocked () {
416
+ threads .addAll (workflowThreadsToAdd );
417
+ workflowThreadsToAdd .clear ();
418
+ }
419
+
420
+ private void appendCallbackThreadsLocked () {
421
+ // TODO I'm not sure this comment makes sense, because threads list has comparator and we use
422
+ // thread priorities anyway.
423
+
424
+ // It is important to prepend threads as there are callbacks
425
+ // like signals that have to run before any other threads.
426
+ // Otherwise signal might be never processed if it was received
427
+ // after workflow decided to close.
428
+ // Adding the callbacks in the same order as they appear in history.
429
+ for (int i = callbackThreadsToAdd .size () - 1 ; i >= 0 ; i --) {
430
+ threads .add (callbackThreadsToAdd .get (i ));
431
+ }
432
+ callbackThreadsToAdd .clear ();
433
+ }
434
+
396
435
/** Creates a new instance of a root workflow thread. */
397
436
private WorkflowThread newRootThread (Runnable runnable ) {
398
437
String name = WORKFLOW_ROOT_THREAD_NAME ;
@@ -519,12 +558,28 @@ private void checkWorkflowThreadOnly() {
519
558
}
520
559
}
521
560
561
+ private void checkNotCloseRequestedLocked () {
562
+ if (closeRequested ) {
563
+ throw new Error ("close requested" );
564
+ }
565
+ }
566
+
522
567
private void checkNotClosed () {
523
568
if (closeFuture .isDone ()) {
524
569
throw new Error ("closed" );
525
570
}
526
571
}
527
572
573
+ /**
574
+ * @return true if there are no threads left to be processed for this workflow.
575
+ */
576
+ private boolean areThreadsToBeExecuted () {
577
+ return !threads .isEmpty ()
578
+ || !workflowThreadsToAdd .isEmpty ()
579
+ || !callbackThreadsToAdd .isEmpty ()
580
+ || !toExecuteInWorkflowThread .isEmpty ();
581
+ }
582
+
528
583
@ SuppressWarnings ("unchecked" )
529
584
<T > Optional <T > getRunnerLocal (RunnerLocalInternal <T > key ) {
530
585
if (!runnerLocalMap .containsKey (key )) {
@@ -576,4 +631,14 @@ private NamedRunnable(String name, Runnable runnable) {
576
631
this .runnable = runnable ;
577
632
}
578
633
}
634
+
635
+ private static class WorkflowThreadStopFuture {
636
+ private final WorkflowThread workflowThread ;
637
+ private final Future <?> stopFuture ;
638
+
639
+ public WorkflowThreadStopFuture (WorkflowThread workflowThread , Future <?> stopFuture ) {
640
+ this .workflowThread = workflowThread ;
641
+ this .stopFuture = stopFuture ;
642
+ }
643
+ }
579
644
}
0 commit comments