73
73
import java .util .function .LongSupplier ;
74
74
import java .util .stream .Collectors ;
75
75
import java .util .stream .IntStream ;
76
+ import javax .annotation .Nonnull ;
76
77
import javax .annotation .Nullable ;
77
78
import org .slf4j .Logger ;
78
79
import org .slf4j .LoggerFactory ;
@@ -136,6 +137,8 @@ private interface UpdateProcedure {
136
137
new ConcurrentHashMap <>();
137
138
public StickyExecutionAttributes stickyExecutionAttributes ;
138
139
private Map <String , Payload > currentMemo ;
140
+ private final Set <String > attachedRequestIds = new HashSet <>();
141
+ private final List <Callback > completionCallbacks = new ArrayList <>();
139
142
140
143
/**
141
144
* @param retryState present if workflow is a retry
@@ -184,6 +187,7 @@ private interface UpdateProcedure {
184
187
this .workflow = StateMachines .newWorkflowStateMachine (data );
185
188
this .workflowTaskStateMachine = StateMachines .newWorkflowTaskStateMachine (store , startRequest );
186
189
this .currentMemo = new HashMap (startRequest .getMemo ().getFieldsMap ());
190
+ this .completionCallbacks .addAll (startRequest .getCompletionCallbacksList ());
187
191
}
188
192
189
193
/** Based on overrideStartWorkflowExecutionRequest from historyEngine.go */
@@ -613,6 +617,29 @@ public void completeWorkflowTask(
613
617
request .hasStickyAttributes () ? request .getStickyAttributes () : null );
614
618
}
615
619
620
+ @ Override
621
+ public void applyOnConflictOptions (@ Nonnull StartWorkflowExecutionRequest request ) {
622
+ update (
623
+ ctx -> {
624
+ OnConflictOptions options = request .getOnConflictOptions ();
625
+ String requestId = null ;
626
+ List <Callback > completionCallbacks = null ;
627
+ List <Link > links = null ;
628
+
629
+ if (options .getAttachRequestId ()) {
630
+ requestId = request .getRequestId ();
631
+ }
632
+ if (options .getAttachCompletionCallbacks ()) {
633
+ completionCallbacks = request .getCompletionCallbacksList ();
634
+ }
635
+ if (options .getAttachLinks ()) {
636
+ links = request .getLinksList ();
637
+ }
638
+
639
+ addWorkflowExecutionOptionsUpdatedEvent (ctx , requestId , completionCallbacks , links );
640
+ });
641
+ }
642
+
616
643
private void failWorkflowTaskWithAReason (
617
644
WorkflowTaskFailedCause failedCause ,
618
645
ServerFailure eventAttributesFailure ,
@@ -1476,6 +1503,7 @@ private void processFailWorkflowExecution(
1476
1503
identity ,
1477
1504
getExecutionId (),
1478
1505
workflow .getData ().firstExecutionRunId ,
1506
+ this ,
1479
1507
parent ,
1480
1508
parentChildInitiatedEventId );
1481
1509
return ;
@@ -1608,6 +1636,7 @@ private void startNewCronRun(
1608
1636
identity ,
1609
1637
getExecutionId (),
1610
1638
workflow .getData ().firstExecutionRunId ,
1639
+ this ,
1611
1640
parent ,
1612
1641
parentChildInitiatedEventId );
1613
1642
}
@@ -1665,6 +1694,7 @@ private void processContinueAsNewWorkflowExecution(
1665
1694
identity ,
1666
1695
getExecutionId (),
1667
1696
workflow .getData ().firstExecutionRunId ,
1697
+ this ,
1668
1698
parent ,
1669
1699
parentChildInitiatedEventId );
1670
1700
}
@@ -1696,7 +1726,7 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
1696
1726
}
1697
1727
});
1698
1728
1699
- for (Callback cb : startRequest . getCompletionCallbacksList () ) {
1729
+ for (Callback cb : completionCallbacks ) {
1700
1730
if (!cb .hasNexus ()) {
1701
1731
// test server only supports nexus callbacks currently
1702
1732
log .warn ("skipping non-nexus completion callback" );
@@ -1718,8 +1748,16 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) {
1718
1748
.build ())
1719
1749
.build ());
1720
1750
1721
- service .completeNexusOperation (
1722
- ref , ctx .getExecution ().getWorkflowId (), startLink , completionEvent .get ());
1751
+ try {
1752
+ service .completeNexusOperation (
1753
+ ref , ctx .getExecution ().getWorkflowId (), startLink , completionEvent .get ());
1754
+ } catch (StatusRuntimeException e ) {
1755
+ // Callback destination not found should not block processing the callbacks nor
1756
+ // completing the workflow.
1757
+ if (e .getStatus ().getCode () != Status .Code .NOT_FOUND ) {
1758
+ throw e ;
1759
+ }
1760
+ }
1723
1761
}
1724
1762
}
1725
1763
@@ -1985,6 +2023,11 @@ public boolean isTerminalState() {
1985
2023
return isTerminalState (workflowState );
1986
2024
}
1987
2025
2026
+ @ Override
2027
+ public boolean isRequestIdAttached (@ Nonnull String requestId ) {
2028
+ return attachedRequestIds .contains (requestId );
2029
+ }
2030
+
1988
2031
private void updateHeartbeatTimer (
1989
2032
RequestContext ctx ,
1990
2033
long activityId ,
@@ -3121,7 +3164,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
3121
3164
.setParentExecution (p .getExecutionId ().getExecution ()));
3122
3165
3123
3166
List <CallbackInfo > callbacks =
3124
- this .startRequest . getCompletionCallbacksList () .stream ()
3167
+ this .completionCallbacks .stream ()
3125
3168
.map (TestWorkflowMutableStateImpl ::constructCallbackInfo )
3126
3169
.collect (Collectors .toList ());
3127
3170
@@ -3429,6 +3472,31 @@ private void addExecutionSignaledByExternalEvent(
3429
3472
ctx .addEvent (executionSignaled );
3430
3473
}
3431
3474
3475
+ private void addWorkflowExecutionOptionsUpdatedEvent (
3476
+ RequestContext ctx , String requestId , List <Callback > completionCallbacks , List <Link > links ) {
3477
+ WorkflowExecutionOptionsUpdatedEventAttributes .Builder attrs =
3478
+ WorkflowExecutionOptionsUpdatedEventAttributes .newBuilder ();
3479
+ if (requestId != null ) {
3480
+ attrs .setAttachedRequestId (requestId );
3481
+ this .attachedRequestIds .add (requestId );
3482
+ }
3483
+ if (completionCallbacks != null ) {
3484
+ attrs .addAllAttachedCompletionCallbacks (completionCallbacks );
3485
+ this .completionCallbacks .addAll (completionCallbacks );
3486
+ }
3487
+
3488
+ HistoryEvent .Builder event =
3489
+ HistoryEvent .newBuilder ()
3490
+ .setWorkerMayIgnore (true )
3491
+ .setEventType (EVENT_TYPE_WORKFLOW_EXECUTION_OPTIONS_UPDATED )
3492
+ .setWorkflowExecutionOptionsUpdatedEventAttributes (attrs );
3493
+ if (links != null ) {
3494
+ event .addAllLinks (links );
3495
+ }
3496
+
3497
+ ctx .addEvent (event .build ());
3498
+ }
3499
+
3432
3500
private StateMachine <ActivityTaskData > getPendingActivityById (String activityId ) {
3433
3501
Long scheduledEventId = activityById .get (activityId );
3434
3502
if (scheduledEventId == null ) {
@@ -3555,4 +3623,9 @@ private boolean isTerminalState(State workflowState) {
3555
3623
|| workflowState == State .TERMINATED
3556
3624
|| workflowState == State .CONTINUED_AS_NEW ;
3557
3625
}
3626
+
3627
+ @ Override
3628
+ public List <Callback > getCompletionCallbacks () {
3629
+ return completionCallbacks ;
3630
+ }
3558
3631
}
0 commit comments