Skip to content

Commit

Permalink
Activate schedule workflow changes in #5179, #5277, #5344, and #5381 (#…
Browse files Browse the repository at this point in the history
…5698)

## What changed?
Activate schedule workflow logic changes. Some of this code is not in
1.23.0, but we can patch those PRs into 1.23.1 to support downgrades to
the 1.23 series.

## Why?
Fix bugs, support new features, make more efficient.

## How did you test it?
existing tests (on those PRs)

## Potential risks
schedule workflow determinism errors
  • Loading branch information
dnr committed Apr 10, 2024
1 parent 914dc8d commit 119eb93
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 32 deletions.
20 changes: 2 additions & 18 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ var (
BackfillsPerIteration: 10,
AllowZeroSleep: true,
ReuseTimer: true,
NextTimeCacheV2Size: 14, // see note below
Version: DontTrackOverlapping, // TODO: upgrade to UpdateFromPrevious
NextTimeCacheV2Size: 14, // see note below
Version: UpdateFromPrevious,
}

// Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each
Expand Down Expand Up @@ -539,22 +539,6 @@ func (s *scheduler) fillNextTimeCacheV2(start time.Time) {
// Run this logic in a SideEffect so that we can fix bugs there without breaking
// existing schedule workflows.
val := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
// Continue returning json temporarily for forwards-compatibility.
// TODO: remove this after releasing a version that understands proto.
if true {
cache := jsonNextTimeCacheV2{Version: s.tweakables.Version, Start: start}
for t := start; len(cache.Results) < s.tweakables.NextTimeCacheV2Size; {
next := s.cspec.getNextTime(s.jitterSeed(), t)
if next.Next.IsZero() {
cache.Completed = true
break
}
cache.Results = append(cache.Results, next)
t = next.Next
}
return cache
}

cache := &schedspb.NextTimeCache{
Version: int64(s.tweakables.Version),
StartTime: timestamppb.New(start),
Expand Down
14 changes: 0 additions & 14 deletions service/worker/scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,11 +1256,6 @@ func (s *workflowSuite) TestBackfill() {
}

func (s *workflowSuite) TestBackfillInclusiveStartEnd() {
// TODO: remove once default version is InclusiveBackfillStartTime
currentVersion := currentTweakablePolicies.Version
currentTweakablePolicies.Version = InclusiveBackfillStartTime
defer func() { currentTweakablePolicies.Version = currentVersion }()

s.runAcrossContinue(
[]workflowRun{
// if start and end time were not inclusive, this backfill run would not exist
Expand Down Expand Up @@ -1322,9 +1317,7 @@ func (s *workflowSuite) TestBackfillInclusiveStartEnd() {
}

func (s *workflowSuite) TestHugeBackfillAllowAll() {
// TODO: remove once default version is IncrementalBackfill
prevTweakables := currentTweakablePolicies
currentTweakablePolicies.Version = IncrementalBackfill
currentTweakablePolicies.MaxBufferSize = 30 // make smaller for testing
defer func() { currentTweakablePolicies = prevTweakables }()

Expand Down Expand Up @@ -1386,9 +1379,7 @@ func (s *workflowSuite) TestHugeBackfillAllowAll() {
}

func (s *workflowSuite) TestHugeBackfillBuffer() {
// TODO: remove once default version is IncrementalBackfill
prevTweakables := currentTweakablePolicies
currentTweakablePolicies.Version = IncrementalBackfill
currentTweakablePolicies.MaxBufferSize = 30 // make smaller for testing
defer func() { currentTweakablePolicies = prevTweakables }()

Expand Down Expand Up @@ -1673,11 +1664,6 @@ func (s *workflowSuite) TestUpdateNotRetroactive() {
// Tests that an update between a nominal time and jittered time for a start, that doesn't
// modify that start, will still start it.
func (s *workflowSuite) TestUpdateBetweenNominalAndJitter() {
// TODO: remove once default version is UpdateFromPrevious
prevTweakables := currentTweakablePolicies
currentTweakablePolicies.Version = UpdateFromPrevious
defer func() { currentTweakablePolicies = prevTweakables }()

spec := &schedpb.ScheduleSpec{
Interval: []*schedpb.IntervalSpec{{
Interval: durationpb.New(1 * time.Hour),
Expand Down

0 comments on commit 119eb93

Please # to comment.