Skip to content

Commit

Permalink
Add tests for reset reapply to batch reset command
Browse files Browse the repository at this point in the history
  • Loading branch information
jamipouchi committed Nov 16, 2024
1 parent 3583bff commit 1e358d2
Showing 1 changed file with 280 additions and 7 deletions.
287 changes: 280 additions & 7 deletions temporalcli/commands.workflow_reset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,48 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToFirstWorkflowTask() {
s.Greater(activityExecutions, 1, "Should have re-executed the workflow from the beginning")
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ToFirstWorkflowTask() {
var wfExecutions, activityExecutions int
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
activityExecutions++
return nil, nil
})
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, nil)
wfExecutions++
return nil, nil
})

// Start the workflow
searchAttr := "keyword-" + uuid.NewString()
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
var junk any
s.NoError(run.Get(s.Context, &junk))
s.Equal(1, wfExecutions)

s.CommandHarness.Stdin.WriteString("y\n")
res := s.Execute(
"workflow", "reset",
"--address", s.Address(),
"--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr),
"-t", "FirstWorkflowTask",
"--reason", "test-reset-FirstWorkflowTask",
)
require.NoError(s.T(), res.Err)
s.awaitNextWorkflow(searchAttr)
s.Equal(2, wfExecutions, "Should have re-executed the workflow from the beginning")
s.Greater(activityExecutions, 1, "Should have re-executed the workflow from the beginning")
}

func (s *SharedServerSuite) TestWorkflow_Reset_ToLastWorkflowTask() {
var wfExecutions, activityExecutions int
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
Expand Down Expand Up @@ -119,6 +161,48 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ToLastWorkflowTask() {
s.Equal(1, activityExecutions, "Should not have re-executed the activity")
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ToLastWorkflowTask() {
var wfExecutions, activityExecutions int
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
activityExecutions++
return nil, nil
})
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.ExecuteActivity(ctx, DevActivity, 1).Get(ctx, nil)
wfExecutions++
return nil, nil
})

// Start the workflow
searchAttr := "keyword-" + uuid.NewString()
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
},
DevWorkflow,
"ignored",
)
s.NoError(err)
var junk any
s.NoError(run.Get(s.Context, &junk))
s.Equal(1, wfExecutions)

s.CommandHarness.Stdin.WriteString("y\n")
res := s.Execute(
"workflow", "reset",
"--address", s.Address(),
"--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr),
"-t", "LastWorkflowTask",
"--reason", "test-reset-LastWorkflowTask",
)
require.NoError(s.T(), res.Err)
s.awaitNextWorkflow(searchAttr)
s.Equal(2, wfExecutions, "Should re-executed the workflow")
s.Equal(1, activityExecutions, "Should not have re-executed the activity")
}

func (s *SharedServerSuite) TestWorkflow_Reset_ToEventID() {
// We execute two activities and will resume just before the second one. We use the same activity for both
// but a unique input so we can check which fake activity is executed
Expand Down Expand Up @@ -248,6 +332,94 @@ func (s *SharedServerSuite) TestBatchResetByBuildId() {
sut.stopWorkerFor("v3")
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_OnlyMatchingQuery() {
var resetWfExecutions, resetActivityExecutions int
var nonResetWfExecutions, nonResetActivityExecutions int
s.Worker().OnDevActivity(func(ctx context.Context, a any) (any, error) {
isReset, ok := a.(bool)
if !ok {
return nil, fmt.Errorf("expected bool, not %T (%v)", a, a)
}
if isReset {
resetActivityExecutions++
} else {
nonResetActivityExecutions++
}
return nil, nil
})
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
workflow.ExecuteActivity(ctx, DevActivity, a).Get(ctx, nil)
isReset, ok := a.(bool)
if !ok {
return nil, fmt.Errorf("expected bool, not %T (%v)", a, a)
}
if isReset {
resetWfExecutions++
} else {
nonResetWfExecutions++
}
return nil, nil
})

resetSearchAttr := "keyword-" + uuid.NewString()
run, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": resetSearchAttr},
},
DevWorkflow,
true,
)
s.NoError(err)
var junk any
s.NoError(run.Get(s.Context, &junk))
s.Equal(1, resetWfExecutions)

nonResetSearchAttr := "keyword-" + uuid.NewString()
nonResetRun, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{
TaskQueue: s.Worker().Options.TaskQueue,
SearchAttributes: map[string]any{"CustomKeywordField": nonResetSearchAttr},
},
DevWorkflow,
false,
)
s.NoError(err)
s.NoError(nonResetRun.Get(s.Context, &junk))
s.Equal(1, nonResetWfExecutions)

s.CommandHarness.Stdin.WriteString("y\n")
res := s.Execute(
"workflow", "reset",
"--address", s.Address(),
"--query", fmt.Sprintf("CustomKeywordField = '%s'", resetSearchAttr),
"-t", "FirstWorkflowTask",
"--reason", "test-reset-FirstWorkflowTask",
)
require.NoError(s.T(), res.Err)
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + resetSearchAttr + "'" + " OR " + "CustomKeywordField = '" + nonResetSearchAttr + "'",
})
s.NoError(err)
if len(resp.Executions) != 3 {
return false
}
for _, exec := range resp.Executions {
if exec.Status != enums.WORKFLOW_EXECUTION_STATUS_COMPLETED {
return false
}
}
return true
}, 3*time.Second, 100*time.Millisecond)
s.Equal(2, resetWfExecutions, "Should have re-executed the workflow from the beginning")
s.Equal(2, resetActivityExecutions, "Should have re-executed the workflow from the beginning")
s.Equal(1, nonResetWfExecutions, "Should not have re-executed the non-matching workflow")
s.Equal(1, nonResetActivityExecutions, "Should not have re-executed the non-matching workflow")
}

type WorkflowResetTest struct {
s *SharedServerSuite
reapplyType string
Expand All @@ -262,7 +434,16 @@ func (s *SharedServerSuite) TestWorkflow_Reset_DefaultReappliesAll() {
expectUpdatesReapplied: true,
expectSignalsReapplied: true,
}
t.run()
t.runSingleReset()
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ReappliesAll() {
t := WorkflowResetTest{
s: s,
expectUpdatesReapplied: true,
expectSignalsReapplied: true,
}
t.runBatchReset()
}

func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeUpdate() {
Expand All @@ -272,7 +453,17 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeUpdate() {
expectUpdatesReapplied: false,
expectSignalsReapplied: true,
}
t.run()
t.runSingleReset()
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ExcludeUpdate() {
t := WorkflowResetTest{
s: s,
reapplyExclude: []string{"Update"},
expectUpdatesReapplied: false,
expectSignalsReapplied: true,
}
t.runBatchReset()
}

func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignal() {
Expand All @@ -282,7 +473,17 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignal() {
expectUpdatesReapplied: true,
expectSignalsReapplied: false,
}
t.run()
t.runSingleReset()
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ExcludeSignal() {
t := WorkflowResetTest{
s: s,
reapplyExclude: []string{"Signal"},
expectUpdatesReapplied: true,
expectSignalsReapplied: false,
}
t.runBatchReset()
}

func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignalAndUpdate() {
Expand All @@ -292,7 +493,17 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ExcludeSignalAndUpdate() {
expectUpdatesReapplied: false,
expectSignalsReapplied: false,
}
t.run()
t.runSingleReset()
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ExcludeSignalAndUpdate() {
t := WorkflowResetTest{
s: s,
reapplyExclude: []string{"Signal", "Update"},
expectUpdatesReapplied: false,
expectSignalsReapplied: false,
}
t.runBatchReset()
}

func (s *SharedServerSuite) TestWorkflow_Reset_ReapplySignalOnly() {
Expand All @@ -302,13 +513,32 @@ func (s *SharedServerSuite) TestWorkflow_Reset_ReapplySignalOnly() {
expectUpdatesReapplied: false,
expectSignalsReapplied: true,
}
t.run()
t.runSingleReset()
}

func (s *SharedServerSuite) TestWorkflow_ResetBatch_ReapplySignalOnly() {
t := WorkflowResetTest{
s: s,
reapplyType: "Signal",
expectUpdatesReapplied: false,
expectSignalsReapplied: true,
}
t.runBatchReset()
}

func (t *WorkflowResetTest) runSingleReset() {
t.run(false)
}

func (t *WorkflowResetTest) runBatchReset() {
t.run(true)
}

func (t *WorkflowResetTest) run() {
func (t *WorkflowResetTest) run(resetBatch bool) {
s := t.s
var wfExecutions, updateHandlerExecutions, signalHandlerExecutions int
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {

// Handle signals
sigChan := workflow.GetSignalChannel(ctx, "mySignal")
workflow.Go(ctx, func(ctx workflow.Context) {
Expand All @@ -333,9 +563,14 @@ func (t *WorkflowResetTest) run() {
searchAttr := "keyword-" + uuid.NewString()
run := t.startWorkflowAndSendTwoSignalsAndTwoUpdates(searchAttr)
s.Equal(2, updateHandlerExecutions)
s.Equal(2, signalHandlerExecutions)
s.Equal(1, wfExecutions)

t.resetWorkflow(run.GetID())
if resetBatch {
t.resetBatchWorkflow(searchAttr)
} else {
t.resetWorkflow(run.GetID())
}
s.awaitNextWorkflow(searchAttr)

if t.expectUpdatesReapplied {
Expand Down Expand Up @@ -365,6 +600,19 @@ func (t *WorkflowResetTest) startWorkflowAndSendTwoSignalsAndTwoUpdates(searchAt
)
s.NoError(err)

// Wait for the workflow to start before sending signals/updates.
// This has to be done, as batch reset with type `FirstWorkflowTask`, will reset to first workflow task completed, so the first signal
// sent before the workflow starts, will be reapplied, as the reset point is later in the history.
// The same would happen with single reset to eventId 4.
s.Eventually(func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "CustomKeywordField = '" + searchAttr + "'",
})
s.NoError(err)
return len(resp.Executions) > 0
}, 3*time.Second, 100*time.Millisecond, "Workflow failed to start")

// before sending signals, we wait for the workflow to execute the activity
for i := 1; i <= 2; i++ {
s.NoError(s.Client.SignalWorkflow(s.Context, run.GetID(), run.GetRunID(), "mySignal", fmt.Sprintf("%d", i)))
updateHandle, err := s.Client.UpdateWorkflow(s.Context, client.UpdateWorkflowOptions{
Expand Down Expand Up @@ -405,6 +653,31 @@ func (t *WorkflowResetTest) resetWorkflow(workflowID string) {
require.NoError(s.T(), res.Err)
}

func (t *WorkflowResetTest) resetBatchWorkflow(searchAttr string) {
s := t.s
args := []string{
"workflow", "reset",
"--address", s.Address(),
"--query", fmt.Sprintf("CustomKeywordField = '%s'", searchAttr),
"--type", "FirstWorkflowTask",
"--reason", "test-workflow-reset",
}
if len(t.reapplyExclude) > 0 && t.reapplyType != "" {
panic("--reapply-type cannot be used with --reapply-exclude")
}
if t.reapplyType != "" {
args = append(args, "--reapply-type", t.reapplyType)
}
if len(t.reapplyExclude) > 0 {
for _, exclude := range t.reapplyExclude {
args = append(args, "--reapply-exclude", exclude)
}
}
s.CommandHarness.Stdin.WriteString("y\n")
res := s.Execute(args...)
require.NoError(s.T(), res.Err)
}

func (s *SharedServerSuite) TestWorkflow_Reset_DoesNotAllowBothReapplyOptions() {
res := s.Execute(
"workflow", "reset",
Expand Down

0 comments on commit 1e358d2

Please # to comment.