20
20
21
21
package io .temporal .workflow .activityTests ;
22
22
23
+ import io .temporal .activity .LocalActivityOptions ;
23
24
import io .temporal .client .WorkflowOptions ;
24
25
import io .temporal .testing .internal .SDKTestOptions ;
25
26
import io .temporal .testing .internal .SDKTestWorkflowRule ;
38
39
import org .junit .Assert ;
39
40
import org .junit .Rule ;
40
41
import org .junit .Test ;
42
+ import org .junit .runner .RunWith ;
43
+ import org .junit .runners .Parameterized ;
41
44
45
+ @ RunWith (Parameterized .class )
42
46
public class ParallelLocalActivitiesTest {
43
47
44
48
private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl ();
49
+ static final int TOTAL_LOCAL_ACT_COUNT = 100 ;
50
+ @ Parameterized .Parameter public int maxLocalActivityExecutionSize ;
51
+
52
+ @ Parameterized .Parameters
53
+ public static Object [] data () {
54
+ return new Object [] {50 , TOTAL_LOCAL_ACT_COUNT };
55
+ }
45
56
46
57
@ Rule
47
58
public SDKTestWorkflowRule testWorkflowRule =
@@ -52,7 +63,9 @@ public class ParallelLocalActivitiesTest {
52
63
// Use a number lower than the number of concurrent activities to ensure that the
53
64
// queueing of LAs when task executor is full works
54
65
.setWorkerOptions (
55
- WorkerOptions .newBuilder ().setMaxConcurrentLocalActivityExecutionSize (50 ).build ())
66
+ WorkerOptions .newBuilder ()
67
+ .setMaxConcurrentLocalActivityExecutionSize (maxLocalActivityExecutionSize )
68
+ .build ())
56
69
.build ();
57
70
58
71
@ Test
@@ -66,16 +79,18 @@ public void testParallelLocalActivities() {
66
79
67
80
TestWorkflow1 workflowStub =
68
81
testWorkflowRule .getWorkflowClient ().newWorkflowStub (TestWorkflow1 .class , options );
69
- String result = workflowStub .execute (testWorkflowRule .getTaskQueue ());
82
+ String willQueue = maxLocalActivityExecutionSize < TOTAL_LOCAL_ACT_COUNT ? "yes" : "" ;
83
+ String result = workflowStub .execute (willQueue );
70
84
Assert .assertEquals ("done" , result );
71
- Assert .assertEquals (activitiesImpl .toString (), 100 , activitiesImpl .invocations .size ());
72
- List <String > expected = new ArrayList <String >();
85
+ Assert .assertEquals (
86
+ activitiesImpl .toString (), TOTAL_LOCAL_ACT_COUNT , activitiesImpl .invocations .size ());
87
+ List <String > expected = new ArrayList <>();
73
88
expected .add ("interceptExecuteWorkflow " + SDKTestWorkflowRule .UUID_REGEXP );
74
89
expected .add ("newThread workflow-method" );
75
- for (int i = 0 ; i < TestParallelLocalActivitiesWorkflowImpl . COUNT ; i ++) {
90
+ for (int i = 0 ; i < TOTAL_LOCAL_ACT_COUNT ; i ++) {
76
91
expected .add ("executeLocalActivity SleepActivity" );
77
92
}
78
- for (int i = 0 ; i < TestParallelLocalActivitiesWorkflowImpl . COUNT ; i ++) {
93
+ for (int i = 0 ; i < TOTAL_LOCAL_ACT_COUNT ; i ++) {
79
94
expected .add ("local activity SleepActivity" );
80
95
}
81
96
testWorkflowRule
@@ -84,16 +99,21 @@ public void testParallelLocalActivities() {
84
99
}
85
100
86
101
public static class TestParallelLocalActivitiesWorkflowImpl implements TestWorkflow1 {
87
- static final int COUNT = 100 ;
88
102
89
103
@ Override
90
- public String execute (String taskQueue ) {
104
+ public String execute (String willQueue ) {
105
+ LocalActivityOptions laOptions = SDKTestOptions .newLocalActivityOptions ();
106
+ // For the case where LAs will be forced to queue, we want to use start-to-close rather
107
+ // than schedule-to-start timeouts.
108
+ if (!willQueue .isEmpty ()) {
109
+ laOptions =
110
+ LocalActivityOptions .newBuilder ().setStartToCloseTimeout (Duration .ofSeconds (5 )).build ();
111
+ }
91
112
VariousTestActivities localActivities =
92
- Workflow .newLocalActivityStub (
93
- VariousTestActivities .class , SDKTestOptions .newLocalActivityOptions ());
113
+ Workflow .newLocalActivityStub (VariousTestActivities .class , laOptions );
94
114
List <Promise <String >> laResults = new ArrayList <>();
95
115
Random r = Workflow .newRandom ();
96
- for (int i = 0 ; i < COUNT ; i ++) {
116
+ for (int i = 0 ; i < TOTAL_LOCAL_ACT_COUNT ; i ++) {
97
117
laResults .add (Async .function (localActivities ::sleepActivity , (long ) r .nextInt (3000 ), i ));
98
118
}
99
119
Promise .allOf (laResults ).get ();
0 commit comments