From 2ebed833aac09d1a702f421c7f8505d6cbed8bd6 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 23 Sep 2024 11:39:56 -0700 Subject: [PATCH 1/2] Batch rate limit --- temporalcli/commands.gen.go | 4 ++ temporalcli/commands.workflow.go | 18 ++++++--- temporalcli/commands.workflow_test.go | 55 +++++++++++++++++++++++---- temporalcli/commandsgen/code.go | 7 ++++ temporalcli/commandsgen/commands.yml | 12 +++++- 5 files changed, 83 insertions(+), 13 deletions(-) diff --git a/temporalcli/commands.gen.go b/temporalcli/commands.gen.go index 9f0a55d9..fbcb7ae1 100644 --- a/temporalcli/commands.gen.go +++ b/temporalcli/commands.gen.go @@ -137,6 +137,7 @@ type SingleWorkflowOrBatchOptions struct { RunId string Reason string Yes bool + Rps float32 } func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) { @@ -145,6 +146,7 @@ func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID. Only use with --workflow-id. Cannot use with --query.") f.StringVar(&v.Reason, "reason", "", "Reason for batch operation. Only use with --query. Defaults to user name.") f.BoolVarP(&v.Yes, "yes", "y", false, "Don't prompt to confirm signaling. Only allowed when --query is present.") + f.Float32Var(&v.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.") } type SharedWorkflowStartOptions struct { @@ -2807,6 +2809,7 @@ type TemporalWorkflowTerminateCommand struct { RunId string Reason string Yes bool + Rps float32 } func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTerminateCommand { @@ -2826,6 +2829,7 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. Can only be set with --workflow-id. Do not use with --query.") s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for termination. Defaults to message with the current user's name.") s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm termination. Can only be used with --query.") + s.Command.Flags().Float32Var(&s.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.") s.Command.Run = func(c *cobra.Command, args []string) { if err := s.run(cctx, args); err != nil { cctx.Options.Fail(err) diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index d9dec350..05f56cf8 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -5,9 +5,10 @@ import ( "encoding/json" "errors" "fmt" - "go.temporal.io/sdk/converter" "os/user" + "go.temporal.io/sdk/converter" + "github.com/fatih/color" "github.com/google/uuid" "github.com/temporalio/cli/temporalcli/internal/printer" @@ -157,6 +158,7 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string) Query: c.Query, Reason: c.Reason, Yes: c.Yes, + Rps: c.Rps, } exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{ @@ -442,11 +444,17 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch( reason = defaultReason() } + // Check rps is used together with query + if s.Rps != 0 && s.Query == "" { + return nil, nil, fmt.Errorf("rps requires query to be set") + } + return nil, &workflowservice.StartBatchOperationRequest{ - Namespace: namespace, - JobId: uuid.NewString(), - VisibilityQuery: s.Query, - Reason: reason, + MaxOperationsPerSecond: s.Rps, + Namespace: namespace, + JobId: uuid.NewString(), + VisibilityQuery: s.Query, + Reason: reason, }, nil } diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index b3164b1f..259c7927 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -1,11 +1,13 @@ package temporalcli_test import ( + "context" "encoding/json" "fmt" "math/rand" "regexp" "strconv" + "sync" "time" "github.com/google/uuid" @@ -13,6 +15,7 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/workflow" + "google.golang.org/grpc" ) func (s *SharedServerSuite) TestWorkflow_Signal_SingleWorkflowSuccess() { @@ -263,26 +266,63 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_SingleWorkflowSuccess_WithRea } func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess() { - res := s.testTerminateBatchWorkflow(false) + _, res := s.testTerminateBatchWorkflow(5, 0, false) s.Contains(res.Stdout.String(), "approximately 5 workflow(s)") s.Contains(res.Stdout.String(), "Started batch") } -func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccessJSON() { - res := s.testTerminateBatchWorkflow(true) +func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_Ratelimit_MissingQuery() { + res := s.Execute( + "workflow", "terminate", + "--address", s.Address(), + "--rps", "5", + ) + s.Error(res.Err, "must set either workflow ID or query") +} + +func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_Ratelimit() { + var rps float32 = 1 + req, _ := s.testTerminateBatchWorkflow(2, rps, false) + s.Equal(rps, req.MaxOperationsPerSecond) +} + +func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() { + _, res := s.testTerminateBatchWorkflow(5, 0, true) var jsonRes map[string]any s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes)) s.NotEmpty(jsonRes["batchJobId"]) } -func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult { +func (s *SharedServerSuite) testTerminateBatchWorkflow( + total int, + rps float32, + json bool, +) (*workflowservice.StartBatchOperationRequest, *CommandResult) { s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) { ctx.Done().Receive(ctx, nil) return nil, ctx.Err() }) - // Start 5 workflows - runs := make([]client.WorkflowRun, 5) + var lastRequestLock sync.Mutex + var startBatchRequest *workflowservice.StartBatchOperationRequest + s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append( + s.CommandHarness.Options.AdditionalClientGRPCDialOptions, + grpc.WithChainUnaryInterceptor(func( + ctx context.Context, + method string, req, reply any, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, + ) error { + lastRequestLock.Lock() + if r, ok := req.(*workflowservice.StartBatchOperationRequest); ok { + startBatchRequest = r + } + lastRequestLock.Unlock() + return invoker(ctx, method, req, reply, cc, opts...) + }), + ) + + // Start workflows + runs := make([]client.WorkflowRun, total) searchAttr := "keyword-" + uuid.NewString() for i := range runs { run, err := s.Client.ExecuteWorkflow( @@ -310,6 +350,7 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult // Send batch terminate with a "y" for non-json or "--yes" for json args := []string{ "workflow", "terminate", + "--rps", fmt.Sprint(rps), "--address", s.Address(), "--query", "CustomKeywordField = '" + searchAttr + "'", "--reason", "terminate-test", @@ -338,7 +379,7 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult } s.True(foundReason) } - return res + return startBatchRequest, res } func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() { diff --git a/temporalcli/commandsgen/code.go b/temporalcli/commandsgen/code.go index 87cc0f0a..c7dd002f 100644 --- a/temporalcli/commandsgen/code.go +++ b/temporalcli/commandsgen/code.go @@ -299,6 +299,8 @@ func (o *Option) writeStructField(w *codeWriter) error { switch o.Type { case "bool", "int", "string": goDataType = o.Type + case "float": + goDataType = "float32" case "duration": goDataType = "Duration" case "timestamp": @@ -342,6 +344,11 @@ func (o *Option) writeFlagBuilding(selfVar, flagVar string, w *codeWriter) error if o.Default == "" { defaultLit = ", 0" } + case "float": + flagMeth, defaultLit = "Float32Var", ", "+o.Default + if o.Default == "" { + defaultLit = ", 0" + } case "string": flagMeth, defaultLit = "StringVar", fmt.Sprintf(", %q", o.Default) case "string[]": diff --git a/temporalcli/commandsgen/commands.yml b/temporalcli/commandsgen/commands.yml index 31131dd6..c36d06ea 100644 --- a/temporalcli/commandsgen/commands.yml +++ b/temporalcli/commandsgen/commands.yml @@ -107,7 +107,7 @@ # option-sets: A list of option sets. (string[]) # * name, summary, and descrption are required fields. All other fields are optional. -# * Available option types are `bool`, `duration`, `int`, `string`, `string[]`, +# * Available option types are `bool`, `duration`, `int`, `float`, `string`, `string[]`, # `string-enum`, or `timestamp`. # * Include a new-line after each command entry. @@ -2630,6 +2630,11 @@ commands: description: | Don't prompt to confirm termination. Can only be used with --query. + - name: rps + type: float + description: | + Limit batch's requests per second. + Only allowed if query is present. - name: temporal workflow trace summary: Workflow Execution live progress @@ -2975,6 +2980,11 @@ option-sets: description: | Don't prompt to confirm signaling. Only allowed when --query is present. + - name: rps + type: float + description: | + Limit batch's requests per second. + Only allowed if query is present. - name: shared-workflow-start options: From 8ba4b82df320197fbe5b8b2054abf08f6153a0e3 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 23 Sep 2024 12:59:27 -0700 Subject: [PATCH 2/2] check rps is used with query --- temporalcli/commands.workflow.go | 7 ++----- temporalcli/commands.workflow_test.go | 10 +++++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/temporalcli/commands.workflow.go b/temporalcli/commands.workflow.go index 05f56cf8..02316aad 100644 --- a/temporalcli/commands.workflow.go +++ b/temporalcli/commands.workflow.go @@ -411,6 +411,8 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch( return nil, nil, fmt.Errorf("cannot set reason when workflow ID is set") } else if s.Yes { return nil, nil, fmt.Errorf("cannot set 'yes' when workflow ID is set") + } else if s.Rps != 0 { + return nil, nil, fmt.Errorf("cannot set rps when workflow ID is set") } return &common.WorkflowExecution{WorkflowId: s.WorkflowId, RunId: s.RunId}, nil, nil } @@ -444,11 +446,6 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch( reason = defaultReason() } - // Check rps is used together with query - if s.Rps != 0 && s.Query == "" { - return nil, nil, fmt.Errorf("rps requires query to be set") - } - return nil, &workflowservice.StartBatchOperationRequest{ MaxOperationsPerSecond: s.Rps, Namespace: namespace, diff --git a/temporalcli/commands.workflow_test.go b/temporalcli/commands.workflow_test.go index 259c7927..8ee07293 100644 --- a/temporalcli/commands.workflow_test.go +++ b/temporalcli/commands.workflow_test.go @@ -277,7 +277,15 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_Ratelimit_Missi "--address", s.Address(), "--rps", "5", ) - s.Error(res.Err, "must set either workflow ID or query") + s.EqualError(res.Err, "must set either workflow ID or query") + + res = s.Execute( + "workflow", "terminate", + "-w", "some-workflow-id", + "--address", s.Address(), + "--rps", "5", + ) + s.EqualError(res.Err, "cannot set rps when workflow ID is set") } func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_Ratelimit() {