diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index d069e27a760..84b15fb125c 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -96,6 +96,8 @@ const ( rateLimitedErrorType = "RateLimited" nextTimeCacheV1Size = 10 + + impossibleHistorySize = 1e6 // just for testing, no real history can be this long ) type ( @@ -249,7 +251,14 @@ func (s *scheduler) run() error { s.pendingPatch = s.InitialPatch s.InitialPatch = nil - for iters := s.tweakables.IterationsBeforeContinueAsNew; iters > 0 || s.pendingUpdate != nil || s.pendingPatch != nil; iters-- { + iters := s.tweakables.IterationsBeforeContinueAsNew + for { + // TODO: use the real GetContinueAsNewSuggested + continueAsNewSuggested := iters <= 0 || workflow.GetInfo(s.ctx).GetCurrentHistoryLength() >= impossibleHistorySize + if continueAsNewSuggested && s.pendingUpdate == nil && s.pendingPatch == nil { + break + } + iters-- t1 := timestamp.TimeValue(s.State.LastProcessedTime) t2 := s.now() @@ -288,7 +297,6 @@ func (s *scheduler) run() error { // 3. a workflow that we were watching finished s.sleep(nextWakeup) s.updateTweakables() - } // Any watcher activities will get cancelled automatically if running. diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index 11b8aaddc8f..63ba93cc6e0 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -48,7 +48,6 @@ import ( "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/searchattribute" - "go.temporal.io/server/common/util" ) type ( @@ -192,7 +191,12 @@ type workflowRun struct { result enumspb.WorkflowExecutionStatus } -func (s *workflowSuite) setupMocksForWorkflows(runs []workflowRun, started map[string]time.Time) { +type runAcrossContinueState struct { + started map[string]time.Time + finished bool +} + +func (s *workflowSuite) setupMocksForWorkflows(runs []workflowRun, state *runAcrossContinueState) { for _, run := range runs { run := run // capture fresh value // set up start @@ -201,10 +205,10 @@ func (s *workflowSuite) setupMocksForWorkflows(runs []workflowRun, started map[s }) s.env.OnActivity(new(activities).StartWorkflow, mock.Anything, matchStart).Times(0).Maybe().Return( func(_ context.Context, req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) { - if _, ok := started[req.Request.WorkflowId]; ok { + if _, ok := state.started[req.Request.WorkflowId]; ok { s.Failf("multiple starts for %s", req.Request.WorkflowId) } - started[req.Request.WorkflowId] = s.now() + state.started[req.Request.WorkflowId] = s.now() return &schedspb.StartWorkflowResponse{ RunId: uuid.NewString(), RealStartTime: timestamp.TimePtr(time.Now()), @@ -234,13 +238,20 @@ func (s *workflowSuite) setupMocksForWorkflows(runs []workflowRun, started map[s } type delayedCallback struct { - at time.Time - f func() + at time.Time + f func() + finishTest bool } -func (s *workflowSuite) setupDelayedCallbacks(start time.Time, cbs []delayedCallback) { +func (s *workflowSuite) setupDelayedCallbacks(start time.Time, cbs []delayedCallback, state *runAcrossContinueState) { for _, cb := range cbs { if delay := cb.at.Sub(start); delay > 0 { + if cb.finishTest { + cb.f = func() { + s.env.SetCurrentHistoryLength(impossibleHistorySize) // signals workflow loop to exit + state.finished = true // signals test to exit + } + } s.env.RegisterDelayedCallback(cb.f, delay) } } @@ -250,7 +261,6 @@ func (s *workflowSuite) runAcrossContinue( runs []workflowRun, cbs []delayedCallback, sched *schedpb.Schedule, - maxIterations int, ) { // fill this in so callers don't need to sched.Action = s.defaultAction("myid") @@ -268,21 +278,21 @@ func (s *workflowSuite) runAcrossContinue( ConflictToken: InitialConflictToken, }, } - iterations := maxIterations - gotRuns := make(map[string]time.Time) + currentTweakablePolicies.IterationsBeforeContinueAsNew = every + state := runAcrossContinueState{ + started: make(map[string]time.Time), + } for { s.env = s.NewTestWorkflowEnvironment() s.env.SetStartTime(startTime) - s.setupMocksForWorkflows(runs, gotRuns) - s.setupDelayedCallbacks(startTime, cbs) + s.setupMocksForWorkflows(runs, &state) + s.setupDelayedCallbacks(startTime, cbs, &state) - currentTweakablePolicies.IterationsBeforeContinueAsNew = util.Min(iterations, every) - - s.T().Logf("starting workflow for %d iterations out of %d remaining, %d total, start time %s", - currentTweakablePolicies.IterationsBeforeContinueAsNew, iterations, maxIterations, startTime) + s.T().Logf("starting workflow with CAN every %d iterations, start time %s", + currentTweakablePolicies.IterationsBeforeContinueAsNew, startTime) s.env.ExecuteWorkflow(SchedulerWorkflow, startArgs) - s.T().Logf("finished workflow, time is now %s", s.now()) + s.T().Logf("finished workflow, time is now %s, finished is %v", s.now(), state.finished) s.True(s.env.IsWorkflowCompleted()) result := s.env.GetWorkflowError() @@ -291,8 +301,7 @@ func (s *workflowSuite) runAcrossContinue( s.env.AssertExpectations(s.T()) - iterations -= currentTweakablePolicies.IterationsBeforeContinueAsNew - if iterations == 0 { + if state.finished { break } @@ -301,9 +310,9 @@ func (s *workflowSuite) runAcrossContinue( s.NoError(payloads.Decode(canErr.Input, &startArgs)) } // check starts that we actually got - s.Require().Equal(len(runs), len(gotRuns)) + s.Require().Equal(len(runs), len(state.started)) for _, run := range runs { - s.Truef(run.start.Equal(gotRuns[run.id]), "%v != %v", run.start, gotRuns[run.id]) + s.Truef(run.start.Equal(state.started[run.id]), "%v != %v", run.start, state.started[run.id]) } } } @@ -513,6 +522,10 @@ func (s *workflowSuite) TestOverlapSkip() { s.Equal([]string{"myid-2022-06-01T00:15:00Z"}, s.runningWorkflows()) }, }, + { + at: time.Date(2022, 6, 1, 0, 18, 0, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -524,7 +537,6 @@ func (s *workflowSuite) TestOverlapSkip() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, }, }, - 4, ) } @@ -557,19 +569,22 @@ func (s *workflowSuite) TestOverlapBufferOne() { at: time.Date(2022, 6, 1, 0, 6, 0, 0, time.UTC), f: func() { s.Equal([]string{"myid-2022-06-01T00:05:00Z"}, s.runningWorkflows()) }, }, - {at: time.Date(2022, 6, 1, 0, 11, 0, 0, time.UTC), + { + at: time.Date(2022, 6, 1, 0, 11, 0, 0, time.UTC), f: func() { s.Equal(int64(1), s.describe().Info.BufferSize) s.Equal(int64(0), s.describe().Info.OverlapSkipped) }, }, - {at: time.Date(2022, 6, 1, 0, 16, 0, 0, time.UTC), + { + at: time.Date(2022, 6, 1, 0, 16, 0, 0, time.UTC), f: func() { s.Equal(int64(1), s.describe().Info.BufferSize) s.Equal(int64(1), s.describe().Info.OverlapSkipped) }, }, - {at: time.Date(2022, 6, 1, 0, 26, 0, 0, time.UTC), + { + at: time.Date(2022, 6, 1, 0, 26, 0, 0, time.UTC), f: func() { s.Equal(int64(1), s.describe().Info.BufferSize) s.Equal(int64(3), s.describe().Info.OverlapSkipped) @@ -579,12 +594,17 @@ func (s *workflowSuite) TestOverlapBufferOne() { at: time.Date(2022, 6, 1, 0, 31, 0, 0, time.UTC), f: func() { s.Equal([]string{"myid-2022-06-01T00:30:00Z"}, s.runningWorkflows()) }, }, - {at: time.Date(2022, 6, 1, 0, 32, 0, 0, time.UTC), + { + at: time.Date(2022, 6, 1, 0, 32, 0, 0, time.UTC), f: func() { s.Equal(int64(0), s.describe().Info.BufferSize) s.Equal(int64(3), s.describe().Info.OverlapSkipped) }, }, + { + at: time.Date(2022, 6, 1, 0, 34, 59, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -596,7 +616,6 @@ func (s *workflowSuite) TestOverlapBufferOne() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ONE, }, }, - 8, ) } @@ -647,6 +666,10 @@ func (s *workflowSuite) TestOverlapBufferAll() { at: time.Date(2022, 6, 1, 0, 22, 30, 0, time.UTC), f: func() { s.Equal([]string{"myid-2022-06-01T00:20:00Z"}, s.runningWorkflows()) }, }, + { + at: time.Date(2022, 6, 1, 0, 29, 30, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -658,7 +681,6 @@ func (s *workflowSuite) TestOverlapBufferAll() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL, }, }, - 9, ) } @@ -722,6 +744,10 @@ func (s *workflowSuite) TestBufferLimit() { s.Equal(int64(1), s.describe().Info.BufferDropped) }, }, + { + at: time.Date(2022, 6, 1, 0, 29, 30, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -733,7 +759,6 @@ func (s *workflowSuite) TestBufferLimit() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_BUFFER_ALL, }, }, - 8, ) } @@ -864,7 +889,12 @@ func (s *workflowSuite) TestOverlapAllowAll() { result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, }, }, - nil, + []delayedCallback{ + { + at: time.Date(2022, 6, 1, 0, 24, 30, 0, time.UTC), + finishTest: true, + }, + }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ Interval: []*schedpb.IntervalSpec{{ @@ -875,7 +905,6 @@ func (s *workflowSuite) TestOverlapAllowAll() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, }, }, - 5, ) } @@ -1131,6 +1160,10 @@ func (s *workflowSuite) TestTriggerImmediate() { }) }, }, + { + at: time.Date(2022, 6, 1, 0, 54, 0, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -1142,7 +1175,6 @@ func (s *workflowSuite) TestTriggerImmediate() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, }, }, - 4, ) } @@ -1194,6 +1226,10 @@ func (s *workflowSuite) TestBackfill() { }) }, }, + { + at: time.Date(2022, 7, 31, 19, 6, 0, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -1207,7 +1243,6 @@ func (s *workflowSuite) TestBackfill() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, }, }, - 6, ) } @@ -1267,6 +1302,10 @@ func (s *workflowSuite) TestPause() { s.Equal("go ahead", desc.Schedule.State.Notes) }, }, + { + at: time.Date(2022, 6, 1, 0, 28, 8, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -1278,7 +1317,6 @@ func (s *workflowSuite) TestPause() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, }, }, - 12, ) } @@ -1351,6 +1389,10 @@ func (s *workflowSuite) TestUpdate() { }) }, }, + { + at: time.Date(2022, 6, 1, 0, 19, 30, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -1362,7 +1404,6 @@ func (s *workflowSuite) TestUpdate() { OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_SKIP, }, }, - 10, ) } @@ -1404,6 +1445,10 @@ func (s *workflowSuite) TestUpdateNotRetroactive() { }) }, }, + { + at: time.Date(2022, 6, 1, 1, 7, 55, 0, time.UTC), + finishTest: true, + }, }, &schedpb.Schedule{ Spec: &schedpb.ScheduleSpec{ @@ -1412,7 +1457,6 @@ func (s *workflowSuite) TestUpdateNotRetroactive() { }}, }, }, - 5, ) } @@ -1494,16 +1538,15 @@ func (s *workflowSuite) TestLotsOfIterations() { result: enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, } } + testEnd := runs[len(runs)-1].end.Add(time.Second) delayedCallbacks := make([]delayedCallback, backfillIterations) - expected := runIterations // schedule some callbacks to spray backfills among scheduled runs // each call back adds random number of backfills in [10, 20) range for i := range delayedCallbacks { maxRuns := rand.Intn(10) + 10 - expected += maxRuns // a point in time to send the callback request offset := i * runIterations / backfillIterations callbackTime := time.Date(2022, 6, 1, offset, 2, 0, 0, time.UTC) @@ -1535,6 +1578,11 @@ func (s *workflowSuite) TestLotsOfIterations() { } } + delayedCallbacks = append(delayedCallbacks, delayedCallback{ + at: testEnd, + finishTest: true, + }) + s.runAcrossContinue( runs, delayedCallbacks, @@ -1546,7 +1594,6 @@ func (s *workflowSuite) TestLotsOfIterations() { }, }, }, - expected+1, ) }