Skip to content

Commit

Permalink
Add force-continue-as-new signal to schedule workflow (#5034)
Browse files Browse the repository at this point in the history
**What changed?**
Add `force-continue-as-new` signal to schedule workflow, to force it to
continue-as-new immediately.

**Why?**
This could be used to force all running schedules onto the newest
version of the workflow code, to allow retiring older code paths.

**How did you test it?**
unit test
  • Loading branch information
dnr authored and rodrigozhou committed Oct 31, 2023
1 parent b8163ce commit 2320fc1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
19 changes: 15 additions & 4 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ const (
// id, used for validation in the frontend.
AppendedTimestampForValidation = "-2009-11-10T23:00:00Z"

SignalNameUpdate = "update"
SignalNamePatch = "patch"
SignalNameRefresh = "refresh"
SignalNameUpdate = "update"
SignalNamePatch = "patch"
SignalNameRefresh = "refresh"
SignalNameForceCAN = "force-continue-as-new"

QueryNameDescribe = "describe"
QueryNameListMatchingTimes = "listMatchingTimes"
Expand Down Expand Up @@ -123,6 +124,7 @@ type (
// Signal requests
pendingPatch *schedpb.SchedulePatch
pendingUpdate *schedspb.FullUpdateRequest
forceCAN bool

uuidBatch []string

Expand Down Expand Up @@ -259,7 +261,7 @@ func (s *scheduler) run() error {
suggestContinueAsNew = suggestContinueAsNew || iters <= 0
iters--
} else {
suggestContinueAsNew = suggestContinueAsNew || info.GetContinueAsNewSuggested()
suggestContinueAsNew = suggestContinueAsNew || info.GetContinueAsNewSuggested() || s.forceCAN
}
if suggestContinueAsNew && s.pendingUpdate == nil && s.pendingPatch == nil {
break
Expand Down Expand Up @@ -590,6 +592,9 @@ func (s *scheduler) sleep(nextWakeup time.Time) {
refreshCh := workflow.GetSignalChannel(s.ctx, SignalNameRefresh)
sel.AddReceive(refreshCh, s.handleRefreshSignal)

forceCAN := workflow.GetSignalChannel(s.ctx, SignalNameForceCAN)
sel.AddReceive(forceCAN, s.handleForceCANSignal)

// if we're paused or out of actions, we don't need to wake up until we get an update
if s.tweakables.SleepWhilePaused && !s.canTakeScheduledAction(false, false) {
nextWakeup = time.Time{}
Expand Down Expand Up @@ -724,6 +729,12 @@ func (s *scheduler) handleRefreshSignal(ch workflow.ReceiveChannel, _ bool) {
s.State.NeedRefresh = true
}

func (s *scheduler) handleForceCANSignal(ch workflow.ReceiveChannel, _ bool) {
ch.Receive(s.ctx, nil)
s.logger.Debug("got force-continue-as-new signal")
s.forceCAN = true
}

func (s *scheduler) processSignals() bool {
scheduleChanged := false
if s.pendingPatch != nil {
Expand Down
36 changes: 36 additions & 0 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1773,3 +1773,39 @@ func (s *workflowSuite) TestCANBySuggested() {
s.True(s.env.IsWorkflowCompleted())
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

func (s *workflowSuite) TestCANBySignal() {
// written using low-level mocks so we can control iteration count

const iters = 30
// note: one fewer run than iters since the first doesn't start anything
for i := 1; i < iters; i++ {
t := baseStartTime.Add(5 * time.Minute * time.Duration(i))
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Equal("myid-"+t.Format(time.RFC3339), req.Request.WorkflowId)
return nil, nil
})
}
// this one catches and fails if we go over
s.expectStart(func(req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
s.Fail("too many starts", req.Request.WorkflowId)
return nil, nil
}).Times(0).Maybe()

s.env.RegisterDelayedCallback(func() {
s.env.SignalWorkflow(SignalNameForceCAN, nil)
}, 5*time.Minute*iters-time.Second)

s.run(&schedpb.Schedule{
Spec: &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: timestamp.DurationPtr(5 * time.Minute),
}},
},
Policies: &schedpb.SchedulePolicies{
OverlapPolicy: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL,
},
}, 0) // 0 means use suggested
s.True(s.env.IsWorkflowCompleted())
s.True(workflow.IsContinueAsNewError(s.env.GetWorkflowError()))
}

0 comments on commit 2320fc1

Please # to comment.