Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

added workflow start-update-with-start and workflow execute-update-with-start commands #762

Merged
merged 4 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 112 additions & 2 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -2859,6 +2859,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowDeleteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowDescribeCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowExecuteCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowFixHistoryJsonCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowListCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowMetadataCommand(cctx, &s).Command)
Expand All @@ -2870,6 +2871,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStartUpdateWithStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command)
Expand Down Expand Up @@ -3026,6 +3028,58 @@ func NewTemporalWorkflowExecuteCommand(cctx *CommandContext, parent *TemporalWor
return &s
}

type TemporalWorkflowExecuteUpdateWithStartCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SharedWorkflowStartOptions
WorkflowStartOptions
PayloadInputOptions
UpdateName string
UpdateFirstExecutionRunId string
UpdateId string
RunId string
UpdateInput []string
UpdateInputFile []string
UpdateInputMeta []string
UpdateInputBase64 bool
}

func NewTemporalWorkflowExecuteUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowExecuteUpdateWithStartCommand {
var s TemporalWorkflowExecuteUpdateWithStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "execute-update-with-start [flags]"
s.Command.Short = "Send an Update and wait for it to complete (Experimental)"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to complete. If the Workflow Execution is not running, then a new workflow\nexecution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow execute-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name")
s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.")
s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.")
s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.")
s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.")
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"name": "type",
"update-type": "update-name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowFixHistoryJsonCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Expand Down Expand Up @@ -3407,6 +3461,62 @@ func NewTemporalWorkflowStartCommand(cctx *CommandContext, parent *TemporalWorkf
return &s
}

type TemporalWorkflowStartUpdateWithStartCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
SharedWorkflowStartOptions
WorkflowStartOptions
PayloadInputOptions
UpdateName string
UpdateFirstExecutionRunId string
UpdateWaitForStage StringEnum
UpdateId string
RunId string
UpdateInput []string
UpdateInputFile []string
UpdateInputMeta []string
UpdateInputBase64 bool
}

func NewTemporalWorkflowStartUpdateWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowStartUpdateWithStartCommand {
var s TemporalWorkflowStartUpdateWithStartCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "start-update-with-start [flags]"
s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n\x1b[1mtemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. If the Workflow Execution is not running, \nthen a new workflow execution is started and the update is sent.\n\nExperimental.\n\n```\ntemporal workflow start-update-with-start \\\n --update-name YourUpdate \\\n --update-input '{\"update-key\": \"update-value\"}' \\\n --update-wait-for-stage accepted \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --id-conflict-policy Fail \\\n --input '{\"wf-key\": \"wf-value\"}'\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.UpdateName, "update-name", "", "Update name. Required. Aliased as \"--update-type\".")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "update-name")
s.Command.Flags().StringVar(&s.UpdateFirstExecutionRunId, "update-first-execution-run-id", "", "Parent Run ID. The update is sent to the last Workflow Execution in the chain started with this Run ID.")
s.UpdateWaitForStage = NewStringEnum([]string{"accepted"}, "")
s.Command.Flags().Var(&s.UpdateWaitForStage, "update-wait-for-stage", "Update stage to wait for. The only option is `accepted`, but this option is required. This is to allow a future version of the CLI to choose a default value. Accepted values: accepted. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "update-wait-for-stage")
s.Command.Flags().StringVar(&s.UpdateId, "update-id", "", "Update ID. If unset, defaults to a UUID.")
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. If unset, looks for an Update against the currently-running Workflow Execution.")
s.Command.Flags().StringArrayVar(&s.UpdateInput, "update-input", nil, "Update input value. Use JSON content or set --update-input-meta to override. Can't be combined with --update-input-file. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.UpdateInputFile, "update-input-file", nil, "A path or paths for input file(s). Use JSON content or set --update-input-meta to override. Can't be combined with --update-input. Can be passed multiple times to pass multiple arguments.")
s.Command.Flags().StringArrayVar(&s.UpdateInputMeta, "update-input-meta", nil, "Input update payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.")
s.Command.Flags().BoolVar(&s.UpdateInputBase64, "update-input-base64", false, "Assume update inputs are base64-encoded and attempt to decode them.")
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
"name": "type",
"update-type": "update-name",
}))
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowTerminateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Expand Down Expand Up @@ -3599,9 +3709,9 @@ func NewTemporalWorkflowUpdateStartCommand(cctx *CommandContext, parent *Tempora
s.Command.Use = "start [flags]"
s.Command.Short = "Send an Update and wait for it to be accepted or rejected (Experimental)"
if hasHighlighting {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using \x1b[1mtemporal workflow update execute\x1b[0m.\n\nExperimental.\n\n\x1b[1mtemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\x1b[0m"
} else {
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n```"
s.Command.Long = "Send a message to a Workflow Execution to invoke an Update handler, and wait for\nthe update to be accepted or rejected. You can subsequently wait for the update\nto complete by using `temporal workflow update execute`.\n\nExperimental.\n\n```\ntemporal workflow update start \\\n --workflow-id YourWorkflowId \\\n --name YourUpdate \\\n --input '{\"some-key\": \"some-value\"}'\n --wait-for-stage accepted\n```"
}
s.Command.Args = cobra.NoArgs
s.WaitForStage = NewStringEnum([]string{"accepted"}, "")
Expand Down
182 changes: 179 additions & 3 deletions temporalcli/commands.workflow_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s
signalPayloadInputOpts := PayloadInputOptions{
Input: c.SignalInput,
InputFile: c.SignalInputFile,
InputMeta: c.InputMeta,
InputMeta: c.SignalInputMeta,
InputBase64: c.SignalInputBase64,
}
signalInput, err := signalPayloadInputOpts.buildRawInputPayloads()
Expand Down Expand Up @@ -194,16 +195,191 @@ func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []s
RunId string `json:"runId"`
Type string `json:"type"`
Namespace string `json:"namespace"`
TaskQueue string `json:"taskQueue"`
}{
WorkflowId: c.WorkflowId,
RunId: resp.RunId,
Type: c.Type,
Namespace: c.Parent.Namespace,
TaskQueue: c.TaskQueue,
}, printer.StructuredOptions{})
}

func (c *TemporalWorkflowStartUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error {
waitForStage := client.WorkflowUpdateStageUnspecified
switch c.UpdateWaitForStage.Value {
case "accepted":
waitForStage = client.WorkflowUpdateStageAccepted
}
if waitForStage != client.WorkflowUpdateStageAccepted {
return fmt.Errorf("invalid wait for stage: %v, valid values are: 'accepted'", c.UpdateWaitForStage)
}

updatePayloadInputOpts := PayloadInputOptions{
Input: c.UpdateInput,
InputFile: c.UpdateInputFile,
InputMeta: c.UpdateInputMeta,
InputBase64: c.UpdateInputBase64,
}
updateInput, err := updatePayloadInputOpts.buildRawInput()
if err != nil {
return err
}
updateOpts := client.UpdateWorkflowOptions{
UpdateID: c.UpdateId,
WorkflowID: c.WorkflowId,
RunID: c.RunId,
UpdateName: c.UpdateName,
Args: updateInput,
WaitForStage: waitForStage,
FirstExecutionRunID: c.UpdateFirstExecutionRunId,
}

handle, err := executeUpdateWithStartWorkflow(
cctx,
c.Parent.ClientOptions,
c.SharedWorkflowStartOptions,
c.WorkflowStartOptions,
c.PayloadInputOptions,
updateOpts,
)
if err != nil {
return err
}

// Currently we only accept 'accepted' as a valid wait for stage value, but we intend
// to support more in the future.
if waitForStage == client.WorkflowUpdateStageAccepted {
// Use a canceled context to check whether the initial server response
// shows that the update has _already_ failed, without issuing a second request.
ctx, cancel := context.WithCancel(cctx)
cancel()
err = handle.Get(ctx, nil)
var timeoutOrCanceledErr *client.WorkflowUpdateServiceTimeoutOrCanceledError
if err != nil && !errors.As(err, &timeoutOrCanceledErr) {
return fmt.Errorf("unable to update workflow: %w", err)
}
}
Comment on lines +248 to +260
Copy link
Member

@cretz cretz Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm, I don't think UpdateWithStartWorkflow will return until it is at least accepted or rejected, and if there is some error, it will be reflected in the error to that call. @dandavison - is this correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example is sending a request for an update name that does not exist. err is nil, but the update is not accepted, and handle contains within it an error, hence this code (which is also present in CLI update impl) to extract that error without making a network request.

Copy link
Member

@cretz cretz Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, there is "accepted but failed". We should consider adding a Done() bool to WorkflowUpdateHandle for this, but in the meantime, all good here.

(EDIT: Found we do have an issue at temporalio/features#428)


cctx.Printer.Println(color.MagentaString("Running execution:"))
return cctx.Printer.PrintStructured(struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Type string `json:"type"`
Namespace string `json:"namespace"`
UpdateName string `json:"updateName"`
UpdateID string `json:"updateId"`
}{
WorkflowId: c.WorkflowId,
RunId: handle.RunID(),
Type: c.Type,
Namespace: c.Parent.Namespace,
UpdateName: c.UpdateName,
UpdateID: handle.UpdateID(),
}, printer.StructuredOptions{})
}

func (c *TemporalWorkflowExecuteUpdateWithStartCommand) run(cctx *CommandContext, _ []string) error {
updatePayloadInputOpts := PayloadInputOptions{
Input: c.UpdateInput,
InputFile: c.UpdateInputFile,
InputMeta: c.UpdateInputMeta,
InputBase64: c.UpdateInputBase64,
}
updateInput, err := updatePayloadInputOpts.buildRawInput()
if err != nil {
return err
}

updateOpts := client.UpdateWorkflowOptions{
UpdateName: c.UpdateName,
UpdateID: c.UpdateId,
WorkflowID: c.WorkflowId,
RunID: c.RunId,
Args: updateInput,
WaitForStage: client.WorkflowUpdateStageCompleted,
FirstExecutionRunID: c.UpdateFirstExecutionRunId,
}

handle, err := executeUpdateWithStartWorkflow(
cctx,
c.Parent.ClientOptions,
c.SharedWorkflowStartOptions,
c.WorkflowStartOptions,
c.PayloadInputOptions,
updateOpts,
)
if err != nil {
return err
}

var valuePtr interface{}
err = handle.Get(cctx, &valuePtr)
if err != nil {
return fmt.Errorf("unable to update workflow: %w", err)
}
Comment on lines +314 to +318
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a bit of a problem with this approach and the one taken by temporal workflow update execute. This presumes that the result can be JSON decoded, but many people use protos or other formats. On user-defined results we need to support all payloads (gracefully using JSON where we can). I believe this is the first time we have needed to get a raw payload from a high-level client call to display to a user.

I think we need to add support for rawValuePayloadConverter.FromPayload in client.go, then change the code in this PR to be something like:

var raw RawValue
if err = handle.Get(cctx, raw); err != nil {
	return fmt.Errorf("unable to update workflow: %w", err)
}
result, err := cctx.MarshalFriendlyJSONPayloads(raw.Payload)

And have Result in the structure be a json.RawMessage. Same for temporal workflow update. And confirm it looks right even with -o json. Thoughts @THardy98 and @dandavison?


cctx.Printer.Println(color.MagentaString("Running execution:"))
return cctx.Printer.PrintStructured(struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Type string `json:"type"`
Namespace string `json:"namespace"`
UpdateName string `json:"updateName"`
UpdateID string `json:"updateId"`
UpdateResult interface{} `json:"updateResult"`
}{
WorkflowId: c.WorkflowId,
RunId: handle.RunID(),
Type: c.Type,
Namespace: c.Parent.Namespace,
UpdateName: c.UpdateName,
UpdateID: c.UpdateId,
UpdateResult: valuePtr,
}, printer.StructuredOptions{})
}

func executeUpdateWithStartWorkflow(
cctx *CommandContext,
clientOpts ClientOptions,
sharedWfOpts SharedWorkflowStartOptions,
wfStartOpts WorkflowStartOptions,
wfInputOpts PayloadInputOptions,
updateWfOpts client.UpdateWorkflowOptions,
) (client.WorkflowUpdateHandle, error) {
if sharedWfOpts.WorkflowId == "" {
return nil, fmt.Errorf("--workflow-id flag must be provided")
}
if wfStartOpts.IdConflictPolicy.Value == "" {
return nil, fmt.Errorf("--id-conflict-policy flag must be provided")
}
cl, err := clientOpts.dialClient(cctx)
if err != nil {
return nil, err
}
defer cl.Close()

clStartWfOpts, err := buildStartOptions(&sharedWfOpts, &wfStartOpts)
if err != nil {
return nil, err
}

wfArgs, err := wfInputOpts.buildRawInput()
if err != nil {
return nil, err
}

startOp := cl.NewWithStartWorkflowOperation(
clStartWfOpts,
sharedWfOpts.Type,
wfArgs...,
)

// Execute the update with start operation
return cl.UpdateWithStartWorkflow(cctx, client.UpdateWithStartWorkflowOptions{
StartWorkflowOperation: startOp,
UpdateOptions: updateWfOpts,
})
}

type workflowJSONResult struct {
WorkflowId string `json:"workflowId"`
RunId string `json:"runId"`
Expand Down
Loading