Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Batch rate limit #675

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type SingleWorkflowOrBatchOptions struct {
RunId string
Reason string
Yes bool
Rps float32
}

func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
Expand All @@ -145,6 +146,7 @@ func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag
f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID. Only use with --workflow-id. Cannot use with --query.")
f.StringVar(&v.Reason, "reason", "", "Reason for batch operation. Only use with --query. Defaults to user name.")
f.BoolVarP(&v.Yes, "yes", "y", false, "Don't prompt to confirm signaling. Only allowed when --query is present.")
f.Float32Var(&v.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.")
}

type SharedWorkflowStartOptions struct {
Expand Down Expand Up @@ -2807,6 +2809,7 @@ type TemporalWorkflowTerminateCommand struct {
RunId string
Reason string
Yes bool
Rps float32
}

func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTerminateCommand {
Expand All @@ -2826,6 +2829,7 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. Can only be set with --workflow-id. Do not use with --query.")
s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for termination. Defaults to message with the current user's name.")
s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm termination. Can only be used with --query.")
s.Command.Flags().Float32Var(&s.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
15 changes: 10 additions & 5 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"encoding/json"
"errors"
"fmt"
"go.temporal.io/sdk/converter"
"os/user"

"go.temporal.io/sdk/converter"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
Expand Down Expand Up @@ -157,6 +158,7 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string)
Query: c.Query,
Reason: c.Reason,
Yes: c.Yes,
Rps: c.Rps,
}

exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{
Expand Down Expand Up @@ -409,6 +411,8 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
return nil, nil, fmt.Errorf("cannot set reason when workflow ID is set")
} else if s.Yes {
return nil, nil, fmt.Errorf("cannot set 'yes' when workflow ID is set")
} else if s.Rps != 0 {
return nil, nil, fmt.Errorf("cannot set rps when workflow ID is set")
}
return &common.WorkflowExecution{WorkflowId: s.WorkflowId, RunId: s.RunId}, nil, nil
}
Expand Down Expand Up @@ -443,10 +447,11 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
}

return nil, &workflowservice.StartBatchOperationRequest{
Namespace: namespace,
JobId: uuid.NewString(),
VisibilityQuery: s.Query,
Reason: reason,
MaxOperationsPerSecond: s.Rps,
Namespace: namespace,
JobId: uuid.NewString(),
VisibilityQuery: s.Query,
Reason: reason,
}, nil
}

Expand Down
63 changes: 56 additions & 7 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package temporalcli_test

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"regexp"
"strconv"
"sync"
"time"

"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"google.golang.org/grpc"
)

func (s *SharedServerSuite) TestWorkflow_Signal_SingleWorkflowSuccess() {
Expand Down Expand Up @@ -263,26 +266,71 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_SingleWorkflowSuccess_WithRea
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess() {
res := s.testTerminateBatchWorkflow(false)
_, res := s.testTerminateBatchWorkflow(5, 0, false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
s.Contains(res.Stdout.String(), "Started batch")
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccessJSON() {
res := s.testTerminateBatchWorkflow(true)
func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_Ratelimit_MissingQuery() {
res := s.Execute(
"workflow", "terminate",
"--address", s.Address(),
"--rps", "5",
)
s.EqualError(res.Err, "must set either workflow ID or query")

res = s.Execute(
"workflow", "terminate",
"-w", "some-workflow-id",
"--address", s.Address(),
"--rps", "5",
)
s.EqualError(res.Err, "cannot set rps when workflow ID is set")
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_Ratelimit() {
var rps float32 = 1
req, _ := s.testTerminateBatchWorkflow(2, rps, false)
s.Equal(rps, req.MaxOperationsPerSecond)
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() {
_, res := s.testTerminateBatchWorkflow(5, 0, true)
var jsonRes map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes))
s.NotEmpty(jsonRes["batchJobId"])
}

func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult {
func (s *SharedServerSuite) testTerminateBatchWorkflow(
total int,
rps float32,
json bool,
) (*workflowservice.StartBatchOperationRequest, *CommandResult) {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start 5 workflows
runs := make([]client.WorkflowRun, 5)
var lastRequestLock sync.Mutex
var startBatchRequest *workflowservice.StartBatchOperationRequest
s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append(
s.CommandHarness.Options.AdditionalClientGRPCDialOptions,
grpc.WithChainUnaryInterceptor(func(
ctx context.Context,
method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
lastRequestLock.Lock()
if r, ok := req.(*workflowservice.StartBatchOperationRequest); ok {
startBatchRequest = r
}
lastRequestLock.Unlock()
return invoker(ctx, method, req, reply, cc, opts...)
}),
)

// Start workflows
runs := make([]client.WorkflowRun, total)
searchAttr := "keyword-" + uuid.NewString()
for i := range runs {
run, err := s.Client.ExecuteWorkflow(
Expand Down Expand Up @@ -310,6 +358,7 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult
// Send batch terminate with a "y" for non-json or "--yes" for json
args := []string{
"workflow", "terminate",
"--rps", fmt.Sprint(rps),
"--address", s.Address(),
"--query", "CustomKeywordField = '" + searchAttr + "'",
"--reason", "terminate-test",
Expand Down Expand Up @@ -338,7 +387,7 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult
}
s.True(foundReason)
}
return res
return startBatchRequest, res
}

func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() {
Expand Down
7 changes: 7 additions & 0 deletions temporalcli/commandsgen/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ func (o *Option) writeStructField(w *codeWriter) error {
switch o.Type {
case "bool", "int", "string":
goDataType = o.Type
case "float":
goDataType = "float32"
case "duration":
goDataType = "Duration"
case "timestamp":
Expand Down Expand Up @@ -342,6 +344,11 @@ func (o *Option) writeFlagBuilding(selfVar, flagVar string, w *codeWriter) error
if o.Default == "" {
defaultLit = ", 0"
}
case "float":
flagMeth, defaultLit = "Float32Var", ", "+o.Default
if o.Default == "" {
defaultLit = ", 0"
}
case "string":
flagMeth, defaultLit = "StringVar", fmt.Sprintf(", %q", o.Default)
case "string[]":
Expand Down
12 changes: 11 additions & 1 deletion temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
# option-sets: A list of option sets. (string[])

# * name, summary, and descrption are required fields. All other fields are optional.
# * Available option types are `bool`, `duration`, `int`, `string`, `string[]`,
# * Available option types are `bool`, `duration`, `int`, `float`, `string`, `string[]`,
# `string-enum`, or `timestamp`.
# * Include a new-line after each command entry.

Expand Down Expand Up @@ -2630,6 +2630,11 @@ commands:
description: |
Don't prompt to confirm termination.
Can only be used with --query.
- name: rps
type: float
description: |
Limit batch's requests per second.
Only allowed if query is present.

- name: temporal workflow trace
summary: Workflow Execution live progress
Expand Down Expand Up @@ -2975,6 +2980,11 @@ option-sets:
description: |
Don't prompt to confirm signaling.
Only allowed when --query is present.
- name: rps
type: float
description: |
Limit batch's requests per second.
Only allowed if query is present.

- name: shared-workflow-start
options:
Expand Down
Loading