diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index ccab7830..45a4069c 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -2859,6 +2859,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowDeleteCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowDescribeCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowExecuteCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowFixHistoryJsonCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowMetadataCommand(cctx, &s).Command) @@ -2870,6 +2871,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) * s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command) + s.Command.AddCommand(&NewTemporalWorkflowStartUpdateWithStartCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command) s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command) @@ -3026,6 +3028,58 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor return &s } +type TemporalWorkflowExecuteUpdateWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + UpdateName string + UpdateFirstExecutionRunId string + UpdateId string + RunId string + UpdateInput []string + UpdateInputFile []string + UpdateInputMeta []string + UpdateInputBase64 bool +} + +func NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowExecuteUpdateWithStartCommand { + var s TemporalWorkflowExecuteUpdateWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "execute-update-with-start [flags]" + s.Command.Short = "Send an Update and wait for it to complete (Experimental)" + if hasHighlighting { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name") + s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.") + s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.") + s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "update-type": "update-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowFixHistoryJsonCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command @@ -3407,6 +3461,62 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf return &s } +type TemporalWorkflowStartUpdateWithStartCommand struct { + Parent *TemporalWorkflowCommand + Command cobra.Command + SharedWorkflowStartOptions + WorkflowStartOptions + PayloadInputOptions + UpdateName string + UpdateFirstExecutionRunId string + UpdateWaitForStage StringEnum + UpdateId string + RunId string + UpdateInput []string + UpdateInputFile []string + UpdateInputMeta []string + UpdateInputBase64 bool +} + +func NewTemporalWorkflowStartUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowStartUpdateWithStartCommand { + var s TemporalWorkflowStartUpdateWithStartCommand + s.Parent = parent + s.Command.DisableFlagsInUseLine = true + s.Command.Use = "start-update-with-start [flags]" + s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)" + if hasHighlighting { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m" + } else { + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```" + } + s.Command.Args = cobra.NoArgs + s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name") + s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.") + s.UpdateWaitForStage = NewStringEnum([]string{"accepted"}, "") + s.Command.Flags().Var(&s.UpdateWaitForStage, "update-wait-for-stage", "Update stage to wait for. The only option is `accepted`, but this option is required. This is to allow a future version of the CLI to choose a default value. Accepted values: accepted. Required.") + _ = cobra.MarkFlagRequired(s.Command.Flags(), "update-wait-for-stage") + s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.") + s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.") + s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.") + s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.") + s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.") + s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags()) + s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags()) + s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{ + "name": "type", + "update-type": "update-name", + })) + s.Command.Run = func(c *cobra.Command, args []string) { + if err := s.run(cctx, args); err != nil { + cctx.Options.Fail(err) + } + } + return &s +} + type TemporalWorkflowTerminateCommand struct { Parent *TemporalWorkflowCommand Command cobra.Command @@ -3599,9 +3709,9 @@ func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *Tempora s.Command.Use = "start [flags]" s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)" if hasHighlighting { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\x1b[0m" } else { - s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```" + s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\n```" } s.Command.Args = cobra.NoArgs s.WaitForStage = NewStringEnum([]string{"accepted"}, "") diff --git a/temporalcli/commands.workflow_exec.go b/temporalcli/commands.workflow_exec.go index cb5cf77f..a88d89ba 100644 --- a/temporalcli/commands.workflow_exec.go +++ b/temporalcli/commands.workflow_exec.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "os" "reflect" @@ -121,7 +122,7 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s signalPayloadInputOpts := PayloadInputOptions{ Input: c.SignalInput, InputFile: c.SignalInputFile, - InputMeta: c.InputMeta, + InputMeta: c.SignalInputMeta, InputBase64: c.SignalInputBase64, } signalInput, err := signalPayloadInputOpts.buildRawInputPayloads() @@ -194,16 +195,191 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s RunId string `json:"runId"` Type string `json:"type"` Namespace string `json:"namespace"` - TaskQueue string `json:"taskQueue"` }{ WorkflowId: c.WorkflowId, RunId: resp.RunId, Type: c.Type, Namespace: c.Parent.Namespace, - TaskQueue: c.TaskQueue, }, printer.StructuredOptions{}) } +func (c *TemporalWorkflowStartUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error { + waitForStage := client.WorkflowUpdateStageUnspecified + switch c.UpdateWaitForStage.Value { + case "accepted": + waitForStage = client.WorkflowUpdateStageAccepted + } + if waitForStage != client.WorkflowUpdateStageAccepted { + return fmt.Errorf("invalid wait for stage: %v, valid values are: 'accepted'", c.UpdateWaitForStage) + } + + updatePayloadInputOpts := PayloadInputOptions{ + Input: c.UpdateInput, + InputFile: c.UpdateInputFile, + InputMeta: c.UpdateInputMeta, + InputBase64: c.UpdateInputBase64, + } + updateInput, err := updatePayloadInputOpts.buildRawInput() + if err != nil { + return err + } + updateOpts := client.UpdateWorkflowOptions{ + UpdateID: c.UpdateId, + WorkflowID: c.WorkflowId, + RunID: c.RunId, + UpdateName: c.UpdateName, + Args: updateInput, + WaitForStage: waitForStage, + FirstExecutionRunID: c.UpdateFirstExecutionRunId, + } + + handle, err := executeUpdateWithStartWorkflow( + cctx, + c.Parent.ClientOptions, + c.SharedWorkflowStartOptions, + c.WorkflowStartOptions, + c.PayloadInputOptions, + updateOpts, + ) + if err != nil { + return err + } + + // Currently we only accept 'accepted' as a valid wait for stage value, but we intend + // to support more in the future. + if waitForStage == client.WorkflowUpdateStageAccepted { + // Use a canceled context to check whether the initial server response + // shows that the update has _already_ failed, without issuing a second request. + ctx, cancel := context.WithCancel(cctx) + cancel() + err = handle.Get(ctx, nil) + var timeoutOrCanceledErr *client.WorkflowUpdateServiceTimeoutOrCanceledError + if err != nil && !errors.As(err, &timeoutOrCanceledErr) { + return fmt.Errorf("unable to update workflow: %w", err) + } + } + + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + UpdateName string `json:"updateName"` + UpdateID string `json:"updateId"` + }{ + WorkflowId: c.WorkflowId, + RunId: handle.RunID(), + Type: c.Type, + Namespace: c.Parent.Namespace, + UpdateName: c.UpdateName, + UpdateID: handle.UpdateID(), + }, printer.StructuredOptions{}) +} + +func (c *TemporalWorkflowExecuteUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error { + updatePayloadInputOpts := PayloadInputOptions{ + Input: c.UpdateInput, + InputFile: c.UpdateInputFile, + InputMeta: c.UpdateInputMeta, + InputBase64: c.UpdateInputBase64, + } + updateInput, err := updatePayloadInputOpts.buildRawInput() + if err != nil { + return err + } + + updateOpts := client.UpdateWorkflowOptions{ + UpdateName: c.UpdateName, + UpdateID: c.UpdateId, + WorkflowID: c.WorkflowId, + RunID: c.RunId, + Args: updateInput, + WaitForStage: client.WorkflowUpdateStageCompleted, + FirstExecutionRunID: c.UpdateFirstExecutionRunId, + } + + handle, err := executeUpdateWithStartWorkflow( + cctx, + c.Parent.ClientOptions, + c.SharedWorkflowStartOptions, + c.WorkflowStartOptions, + c.PayloadInputOptions, + updateOpts, + ) + if err != nil { + return err + } + + var valuePtr interface{} + err = handle.Get(cctx, &valuePtr) + if err != nil { + return fmt.Errorf("unable to update workflow: %w", err) + } + + cctx.Printer.Println(color.MagentaString("Running execution:")) + return cctx.Printer.PrintStructured(struct { + WorkflowId string `json:"workflowId"` + RunId string `json:"runId"` + Type string `json:"type"` + Namespace string `json:"namespace"` + UpdateName string `json:"updateName"` + UpdateID string `json:"updateId"` + UpdateResult interface{} `json:"updateResult"` + }{ + WorkflowId: c.WorkflowId, + RunId: handle.RunID(), + Type: c.Type, + Namespace: c.Parent.Namespace, + UpdateName: c.UpdateName, + UpdateID: c.UpdateId, + UpdateResult: valuePtr, + }, printer.StructuredOptions{}) +} + +func executeUpdateWithStartWorkflow( + cctx *CommandContext, + clientOpts ClientOptions, + sharedWfOpts SharedWorkflowStartOptions, + wfStartOpts WorkflowStartOptions, + wfInputOpts PayloadInputOptions, + updateWfOpts client.UpdateWorkflowOptions, +) (client.WorkflowUpdateHandle, error) { + if sharedWfOpts.WorkflowId == "" { + return nil, fmt.Errorf("--workflow-id flag must be provided") + } + if wfStartOpts.IdConflictPolicy.Value == "" { + return nil, fmt.Errorf("--id-conflict-policy flag must be provided") + } + cl, err := clientOpts.dialClient(cctx) + if err != nil { + return nil, err + } + defer cl.Close() + + clStartWfOpts, err := buildStartOptions(&sharedWfOpts, &wfStartOpts) + if err != nil { + return nil, err + } + + wfArgs, err := wfInputOpts.buildRawInput() + if err != nil { + return nil, err + } + + startOp := cl.NewWithStartWorkflowOperation( + clStartWfOpts, + sharedWfOpts.Type, + wfArgs..., + ) + + // Execute the update with start operation + return cl.UpdateWithStartWorkflow(cctx, client.UpdateWithStartWorkflowOptions{ + StartWorkflowOperation: startOp, + UpdateOptions: updateWfOpts, + }) +} + type workflowJSONResult struct { WorkflowId string `json:"workflowId"` RunId string `json:"runId"` diff --git a/temporalcli/commands.workflow_exec_test.go b/temporalcli/commands.workflow_exec_test.go index e706d946..9d88a110 100644 --- a/temporalcli/commands.workflow_exec_test.go +++ b/temporalcli/commands.workflow_exec_test.go @@ -675,8 +675,40 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_WorkflowIDMandatory() { s.ErrorContains(res.Err, "--workflow-id flag must be provided") } -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartNewWorkflow() { + s.testSignalWithStartHelper(false) +} + +func (s *SharedServerSuite) TestWorkflow_SignalWithStart_SendSignalToExistingWorkflow() { + s.testSignalWithStartHelper(true) +} + +func (s *SharedServerSuite) testSignalWithStartHelper(useExistingWorkflow bool) { wfId := uuid.NewString() + signalWfInput := `"workflow-input"` + signalInput := `"signal-input"` + expectedWfOutput := map[string]string{ + "workflow": "workflow-input", + "signal": "signal-input", + } + + if useExistingWorkflow { + run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") + s.NoError(err) + // Re-assign wfId for the signal to be sent to an existing workflow. + wfId = run.GetID() + expectedWfOutput["workflow"] = "not-signal-with-start-input" + } + + // Run workflow, block on signal. + s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { + wfState := make(map[string]string) + wfState["workflow"] = wfInput.(string) + var sigReceived string + workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) + wfState["signal"] = sigReceived + return wfState, nil + }) // Send signal-with-start command. res := s.Execute( @@ -684,84 +716,250 @@ func (s *SharedServerSuite) TestWorkflow_SignalWithStart_StartsWorkflow() { "--address", s.Address(), "--workflow-id", wfId, "--type", "DevWorkflow", - "--input", `{"wf-signal-with-start": "workflow-input"}`, - "--task-queue", "tq", + "--input", signalWfInput, + "--task-queue", s.Worker().Options.TaskQueue, "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, + "--signal-input", signalInput, ) - s.NoError(res.Err) - // Confirm text output has key/vals as expected out := res.Stdout.String() s.ContainsOnSameLine(out, "WorkflowId", wfId) s.Contains(out, "RunId") - s.ContainsOnSameLine(out, "TaskQueue", "tq") s.ContainsOnSameLine(out, "Type", "DevWorkflow") s.ContainsOnSameLine(out, "Namespace", "default") - // Check that new workflow was started with expected workflow ID. - run := s.Client.GetWorkflow(s.Context, wfId, "") - s.Equal(wfId, run.GetID()) - - // Run workflow, block on signal. - var sigReceived any - s.StartDevWorker(s.t, DevWorkerOptions{TaskQueue: "tq"}).OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil - }) + // Check that a new workflow was started with expected workflow ID. + if !useExistingWorkflow { + run := s.Client.GetWorkflow(s.Context, wfId, "") + s.Equal(wfId, run.GetID()) + } // Wait for workflow to complete. - var wfReturn any + wfReturn := make(map[string]string) err := s.Client.GetWorkflow(s.Context, wfId, "").Get(s.Context, &wfReturn) s.NoError(err) - // Expect workflow to have received signal and given inputs from signal-with-start. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) - s.Equal(map[string]any{"wf-signal-with-start": "workflow-input"}, wfReturn) + // Compare the extracted values with what the workflow returned + s.Equal(expectedWfOutput["signal"], wfReturn["signal"]) + s.Equal(expectedWfOutput["workflow"], wfReturn["workflow"]) } -func (s *SharedServerSuite) TestWorkflow_SignalWithStart_ExistingWorkflow() { - // Run workflow, block on signal. - var sigReceived any +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_RuntimeOptionChecks() { + res := s.Execute( + "workflow", "start-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--update-wait-for-stage", "accepted", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") + res = s.Execute( + "workflow", "start-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--update-wait-for-stage", "accepted", + "--workflow-id", "wfId", + ) + s.ErrorContains(res.Err, "--id-conflict-policy flag must be provided") +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_RuntimeOptionChecks() { + res := s.Execute( + "workflow", "execute-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + ) + s.ErrorContains(res.Err, "--workflow-id flag must be provided") + res = s.Execute( + "workflow", "execute-update-with-start", + "--type", "wfType", + "--task-queue", "tq", + "--update-name", "updateName", + "--workflow-id", "wfId", + ) + s.ErrorContains(res.Err, "--id-conflict-policy flag must be provided") +} + +type updateWithStartTest struct { + updateWithStartSetup + useStart bool + idConflictPolicy string + expectedError string + expectedUpdateResult string + expectedWfOutput map[string]string +} + +type updateWithStartSetup struct { + wfId string + updateName string + updateId string + useExistingWorkflow bool +} + +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_StartsNewWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(false) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "Fail", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_StartUpdateWithStart_SendUpdateToExistingWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(true) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "Fail", + expectedError: "Workflow execution is already running", + }) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: true, + idConflictPolicy: "UseExisting", + expectedWfOutput: map[string]string{"workflow": "not-update-with-start-workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_StartsWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(false) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "Fail", + expectedUpdateResult: "update-input", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) TestWorkflow_ExecuteUpdateWithStart_SendUpdateToExistingWorkflow() { + updateWithStartSetup := s.updateWithStartTestSetup(true) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "Fail", + expectedError: "Workflow execution is already running", + expectedWfOutput: map[string]string{"workflow": "workflow-input", "update": "update-input"}, + }) + s.testStartUpdateWithStartHelper(updateWithStartTest{ + updateWithStartSetup: updateWithStartSetup, + useStart: false, + idConflictPolicy: "UseExisting", + expectedWfOutput: map[string]string{"workflow": "not-update-with-start-workflow-input", "update": "update-input"}, + }) +} + +func (s *SharedServerSuite) updateWithStartTestSetup(useExistingWorkflow bool) updateWithStartSetup { + wfId := uuid.NewString() + updateName := "test-update-name" + updateId := uuid.NewString() + if useExistingWorkflow { + // Start a workflow with a specific workflow ID. + run, err := s.Client.ExecuteWorkflow( + s.Context, + client.StartWorkflowOptions{ + TaskQueue: s.Worker().Options.TaskQueue, + }, + DevWorkflow, + "not-update-with-start-workflow-input", + ) + s.NoError(err) + // Re-assign wfId for the update to be sent to an existing workflow. + wfId = run.GetID() + } + + // Run workflow. s.Worker().OnDevWorkflow(func(ctx workflow.Context, wfInput any) (any, error) { - workflow.GetSignalChannel(ctx, "sigName").Receive(ctx, &sigReceived) - return wfInput, nil + wfState := make(map[string]string) + wfState["workflow"] = wfInput.(string) + + err := workflow.SetUpdateHandlerWithOptions( + ctx, + updateName, + func(ctx workflow.Context, updateInput string) (string, error) { + wfState["update"] = updateInput + return updateInput, nil + }, + workflow.UpdateHandlerOptions{}, + ) + if err != nil { + return nil, err + } + // Block workflow completion on signal. + workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil) + return wfState, nil }) + return updateWithStartSetup{wfId, updateName, updateId, useExistingWorkflow} +} - // Start workflow - run, err := s.Client.ExecuteWorkflow(s.Context, client.StartWorkflowOptions{TaskQueue: s.Worker().Options.TaskQueue}, DevWorkflow, "not-signal-with-start-input") - s.NoError(err) +func (s *SharedServerSuite) testStartUpdateWithStartHelper(opts updateWithStartTest) { + cmdName := "execute-update-with-start" + additionalArgs := []string{} - wfId := run.GetID() + if opts.useStart { + cmdName = "start-update-with-start" + additionalArgs = []string{"--update-wait-for-stage", "accepted"} + } - // Send signal-with-start command. - res := s.Execute( - "workflow", "signal-with-start", + baseArgs := []string{ + "workflow", cmdName, "--address", s.Address(), - "--workflow-id", wfId, + "--workflow-id", opts.wfId, "--type", "DevWorkflow", - "--input", `{"workflow": "workflow-input"}`, + "--input", `"workflow-input"`, "--task-queue", s.Worker().Options.TaskQueue, - "--signal-name", "sigName", - "--signal-input", `{"signal-with-start": "signal-input"}`, - ) + "--id-conflict-policy", opts.idConflictPolicy, + "--update-name", opts.updateName, + "--update-id", opts.updateId, + "--update-input", `"update-input"`, + } + + // Send start-update-with-start command. + args := append(baseArgs, additionalArgs...) + res := s.Execute(args...) + + // Check expected error. + if opts.expectedError != "" { + s.ErrorContains(res.Err, opts.expectedError) + return + } + s.NoError(res.Err) // Confirm text output has key/vals as expected out := res.Stdout.String() - s.ContainsOnSameLine(out, "WorkflowId", wfId) + s.ContainsOnSameLine(out, "WorkflowId", opts.wfId) s.Contains(out, "RunId") - s.ContainsOnSameLine(out, "TaskQueue", s.Worker().Options.TaskQueue) s.ContainsOnSameLine(out, "Type", "DevWorkflow") s.ContainsOnSameLine(out, "Namespace", "default") + s.ContainsOnSameLine(out, "UpdateName", opts.updateName) + s.ContainsOnSameLine(out, "UpdateID", opts.updateId) + + // Check expected update result. + if opts.expectedUpdateResult != "" { + s.ContainsOnSameLine(out, "UpdateResult", opts.expectedUpdateResult) + } + + // Check that new workflow was started with expected workflow ID. + if !opts.useExistingWorkflow { + run := s.Client.GetWorkflow(s.Context, opts.wfId, "") + s.Equal(opts.wfId, run.GetID()) + } + + // Send signal to complete workflow. + err := s.Client.SignalWorkflow(s.Context, opts.wfId, "", "complete", nil) + s.NoError(err) // Wait for workflow to complete. - var ret any - s.NoError(run.Get(s.Context, &ret)) + wfReturn := make(map[string]string) + err = s.Client.GetWorkflow(s.Context, opts.wfId, "").Get(s.Context, &wfReturn) + s.NoError(err) - // Expect workflow to have not been started by the signal-with-start command. - s.Equal("not-signal-with-start-input", ret) - // Expect signal to have been received with given input. - s.Equal(map[string]any{"signal-with-start": "signal-input"}, sigReceived) + // Expect workflow to have received update and given inputs from start-update-with-start. + s.Equal(opts.expectedWfOutput["workflow"], wfReturn["workflow"]) + s.Equal(opts.expectedWfOutput["update"], wfReturn["update"]) } diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index ea676a11..b0e6597a 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -3673,6 +3673,7 @@ commands: --workflow-id YourWorkflowId \ --name YourUpdate \ --input '{"some-key": "some-value"}' + --wait-for-stage accepted ``` option-sets: - update-starting @@ -3688,6 +3689,163 @@ commands: - accepted required: true + - name: temporal workflow start-update-with-start + summary: Send an Update and wait for it to be accepted or rejected (Experimental) + description: | + Send a message to a Workflow Execution to invoke an Update handler, and wait for + the update to be accepted or rejected. If the Workflow Execution is not running, + then a new workflow execution is started and the update is sent. + + Experimental. + + ``` + temporal workflow start-update-with-start \ + --update-name YourUpdate \ + --update-input '{"update-key": "update-value"}' \ + --update-wait-for-stage accepted \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --id-conflict-policy Fail \ + --input '{"wf-key": "wf-value"}' + ``` + option-sets: + # workflow-id and id-conflict-policy are "required" (runtime checks) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: update-name + type: string + description: Update name. + required: true + aliases: + - update-type + - name: update-first-execution-run-id + type: string + description: | + Parent Run ID. + The update is sent to the last Workflow Execution in the chain started + with this Run ID. + - name: update-wait-for-stage + type: string-enum + description: | + Update stage to wait for. + The only option is `accepted`, but this option is required. This is to allow + a future version of the CLI to choose a default value. + enum-values: + - accepted + required: true + - name: update-id + type: string + description: | + Update ID. + If unset, defaults to a UUID. + - name: run-id + type: string + short: r + description: | + Run ID. + If unset, looks for an Update against the currently-running Workflow Execution. + - name: update-input + type: string[] + description: | + Update input value. + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input-file. + Can be passed multiple times to pass multiple arguments. + - name: update-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input. + Can be passed multiple times to pass multiple arguments. + - name: update-input-meta + type: string[] + description: | + Input update payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: update-input-base64 + type: bool + description: | + Assume update inputs are base64-encoded and attempt to decode them. + + - name: temporal workflow execute-update-with-start + summary: Send an Update and wait for it to complete (Experimental) + description: | + Send a message to a Workflow Execution to invoke an Update handler, and wait for + the update to complete. If the Workflow Execution is not running, then a new workflow + execution is started and the update is sent. + + Experimental. + + ``` + temporal workflow execute-update-with-start \ + --update-name YourUpdate \ + --update-input '{"update-key": "update-value"}' \ + --workflow-id YourWorkflowId \ + --type YourWorkflowType \ + --task-queue YourTaskQueue \ + --id-conflict-policy Fail \ + --input '{"wf-key": "wf-value"}' + ``` + + option-sets: + # workflow-id and id-conflict-policy are "required" (runtime checks) + - shared-workflow-start + - workflow-start + - payload-input + options: + - name: update-name + type: string + description: Update name. + required: true + aliases: + - update-type + - name: update-first-execution-run-id + type: string + description: | + Parent Run ID. + The update is sent to the last Workflow Execution in the chain started + with this Run ID. + - name: update-id + type: string + description: | + Update ID. + If unset, defaults to a UUID. + - name: run-id + type: string + short: r + description: | + Run ID. + If unset, looks for an Update against the currently-running Workflow Execution. + - name: update-input + type: string[] + description: | + Update input value. + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input-file. + Can be passed multiple times to pass multiple arguments. + - name: update-input-file + type: string[] + description: | + A path or paths for input file(s). + Use JSON content or set --update-input-meta to override. + Can't be combined with --update-input. + Can be passed multiple times to pass multiple arguments. + - name: update-input-meta + type: string[] + description: | + Input update payload metadata as a `KEY=VALUE` pair. + When the KEY is "encoding", this overrides the default ("json/plain"). + Can be passed multiple times. + - name: update-input-base64 + type: bool + description: | + Assume update inputs are base64-encoded and attempt to decode them. + option-sets: - name: client options: