From 119eb93fead05a5f07e4252123f8e7bc686ec999 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 10 Apr 2024 20:22:21 +0000 Subject: [PATCH] Activate schedule workflow changes in #5179, #5277, #5344, and #5381 (#5698) ## What changed? Activate schedule workflow logic changes. Some of this code is not in 1.23.0, but we can patch those PRs into 1.23.1 to support downgrades to the 1.23 series. ## Why? Fix bugs, support new features, make more efficient. ## How did you test it? existing tests (on those PRs) ## Potential risks schedule workflow determinism errors --- service/worker/scheduler/workflow.go | 20 ++------------------ service/worker/scheduler/workflow_test.go | 14 -------------- 2 files changed, 2 insertions(+), 32 deletions(-) diff --git a/service/worker/scheduler/workflow.go b/service/worker/scheduler/workflow.go index a624c8430ee..7989dc031cf 100644 --- a/service/worker/scheduler/workflow.go +++ b/service/worker/scheduler/workflow.go @@ -213,8 +213,8 @@ var ( BackfillsPerIteration: 10, AllowZeroSleep: true, ReuseTimer: true, - NextTimeCacheV2Size: 14, // see note below - Version: DontTrackOverlapping, // TODO: upgrade to UpdateFromPrevious + NextTimeCacheV2Size: 14, // see note below + Version: UpdateFromPrevious, } // Note on NextTimeCacheV2Size: This value must be > FutureActionCountForList. Each @@ -539,22 +539,6 @@ func (s *scheduler) fillNextTimeCacheV2(start time.Time) { // Run this logic in a SideEffect so that we can fix bugs there without breaking // existing schedule workflows. val := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} { - // Continue returning json temporarily for forwards-compatibility. - // TODO: remove this after releasing a version that understands proto. - if true { - cache := jsonNextTimeCacheV2{Version: s.tweakables.Version, Start: start} - for t := start; len(cache.Results) < s.tweakables.NextTimeCacheV2Size; { - next := s.cspec.getNextTime(s.jitterSeed(), t) - if next.Next.IsZero() { - cache.Completed = true - break - } - cache.Results = append(cache.Results, next) - t = next.Next - } - return cache - } - cache := &schedspb.NextTimeCache{ Version: int64(s.tweakables.Version), StartTime: timestamppb.New(start), diff --git a/service/worker/scheduler/workflow_test.go b/service/worker/scheduler/workflow_test.go index a386182235e..60f9ffb360a 100644 --- a/service/worker/scheduler/workflow_test.go +++ b/service/worker/scheduler/workflow_test.go @@ -1256,11 +1256,6 @@ func (s *workflowSuite) TestBackfill() { } func (s *workflowSuite) TestBackfillInclusiveStartEnd() { - // TODO: remove once default version is InclusiveBackfillStartTime - currentVersion := currentTweakablePolicies.Version - currentTweakablePolicies.Version = InclusiveBackfillStartTime - defer func() { currentTweakablePolicies.Version = currentVersion }() - s.runAcrossContinue( []workflowRun{ // if start and end time were not inclusive, this backfill run would not exist @@ -1322,9 +1317,7 @@ func (s *workflowSuite) TestBackfillInclusiveStartEnd() { } func (s *workflowSuite) TestHugeBackfillAllowAll() { - // TODO: remove once default version is IncrementalBackfill prevTweakables := currentTweakablePolicies - currentTweakablePolicies.Version = IncrementalBackfill currentTweakablePolicies.MaxBufferSize = 30 // make smaller for testing defer func() { currentTweakablePolicies = prevTweakables }() @@ -1386,9 +1379,7 @@ func (s *workflowSuite) TestHugeBackfillAllowAll() { } func (s *workflowSuite) TestHugeBackfillBuffer() { - // TODO: remove once default version is IncrementalBackfill prevTweakables := currentTweakablePolicies - currentTweakablePolicies.Version = IncrementalBackfill currentTweakablePolicies.MaxBufferSize = 30 // make smaller for testing defer func() { currentTweakablePolicies = prevTweakables }() @@ -1673,11 +1664,6 @@ func (s *workflowSuite) TestUpdateNotRetroactive() { // Tests that an update between a nominal time and jittered time for a start, that doesn't // modify that start, will still start it. func (s *workflowSuite) TestUpdateBetweenNominalAndJitter() { - // TODO: remove once default version is UpdateFromPrevious - prevTweakables := currentTweakablePolicies - currentTweakablePolicies.Version = UpdateFromPrevious - defer func() { currentTweakablePolicies = prevTweakables }() - spec := &schedpb.ScheduleSpec{ Interval: []*schedpb.IntervalSpec{{ Interval: durationpb.New(1 * time.Hour),