diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index 63dd377aaa6..5318d6f2f87 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -78,9 +78,10 @@ const ( // id, used for validation in the frontend. AppendedTimestampForValidation = "-2009-11-10T23:00:00Z" - SignalNameUpdate = "update" - SignalNamePatch = "patch" - SignalNameRefresh = "refresh" + SignalNameUpdate = "update" + SignalNamePatch = "patch" + SignalNameRefresh = "refresh" + SignalNameForceCAN = "force-continue-as-new" QueryNameDescribe = "describe" QueryNameListMatchingTimes = "listMatchingTimes" @@ -123,6 +124,7 @@ type ( // Signal requests pendingPatch *schedpb.SchedulePatch pendingUpdate *schedspb.FullUpdateRequest + forceCAN bool uuidBatch []string @@ -259,7 +261,7 @@ func (s *scheduler) run() error { suggestContinueAsNew = suggestContinueAsNew || iters <= 0 iters-- } else { - suggestContinueAsNew = suggestContinueAsNew || info.GetContinueAsNewSuggested() + suggestContinueAsNew = suggestContinueAsNew || info.GetContinueAsNewSuggested() || s.forceCAN } if suggestContinueAsNew && s.pendingUpdate == nil && s.pendingPatch == nil { break @@ -590,6 +592,9 @@ func (s *scheduler) sleep(nextWakeup time.Time) { refreshCh := workflow.GetSignalChannel(s.ctx, SignalNameRefresh) sel.AddReceive(refreshCh, s.handleRefreshSignal) + forceCAN := workflow.GetSignalChannel(s.ctx, SignalNameForceCAN) + sel.AddReceive(forceCAN, s.handleForceCANSignal) + // if we're paused or out of actions, we don't need to wake up until we get an update if s.tweakables.SleepWhilePaused && !s.canTakeScheduledAction(false, false) { nextWakeup = time.Time{} @@ -724,6 +729,12 @@ func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) { s.State.NeedRefresh = true } +func (s *scheduler) handleForceCANSignal(ch workflow.ReceiveChannel, _ bool) { + ch.Receive(s.ctx, nil) + s.logger.Debug("got force-continue-as-new signal") + s.forceCAN = true +} + func (s *scheduler) processSignals() bool { scheduleChanged := false if s.pendingPatch != nil { diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index 123a62f9e9a..145c105930c 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -1773,3 +1773,39 @@ func (s *workflowSuite) TestCANBySuggested() { s.True(s.env.IsWorkflowCompleted()) s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError())) } + +func (s *workflowSuite) TestCANBySignal() { + // written using low-level mocks so we can control iteration count + + const iters = 30 + // note: one fewer run than iters since the first doesn't start anything + for i := 1; i < iters; i++ { + t := baseStartTime.Add(5 * time.Minute * time.Duration(i)) + s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) { + s.Equal("myid-"+t.Format(time.RFC3339), req.Request.WorkflowId) + return nil, nil + }) + } + // this one catches and fails if we go over + s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) { + s.Fail("too many starts", req.Request.WorkflowId) + return nil, nil + }).Times(0).Maybe() + + s.env.RegisterDelayedCallback(func() { + s.env.SignalWorkflow(SignalNameForceCAN, nil) + }, 5*time.Minute*iters-time.Second) + + s.run(&schedpb.Schedule{ + Spec: &schedpb.ScheduleSpec{ + Interval: []*schedpb.IntervalSpec{{ + Interval: timestamp.DurationPtr(5 * time.Minute), + }}, + }, + Policies: &schedpb.SchedulePolicies{ + OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, + }, + }, 0) // 0 means use suggested + s.True(s.env.IsWorkflowCompleted()) + s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError())) +}