Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Batch rate limit #675

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type SingleWorkflowOrBatchOptions struct {
RunId string
Reason string
Yes bool
Rps float32
}

func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
Expand All @@ -145,6 +146,7 @@ func (v *SingleWorkflowOrBatchOptions) buildFlags(cctx *CommandContext, f *pflag
f.StringVarP(&v.RunId, "run-id", "r", "", "Run ID. Only use with --workflow-id. Cannot use with --query.")
f.StringVar(&v.Reason, "reason", "", "Reason for batch operation. Only use with --query. Defaults to user name.")
f.BoolVarP(&v.Yes, "yes", "y", false, "Don't prompt to confirm signaling. Only allowed when --query is present.")
f.Float32Var(&v.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.")
}

type SharedWorkflowStartOptions struct {
Expand Down Expand Up @@ -2807,6 +2809,7 @@ type TemporalWorkflowTerminateCommand struct {
RunId string
Reason string
Yes bool
Rps float32
}

func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTerminateCommand {
Expand All @@ -2826,6 +2829,7 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW
s.Command.Flags().StringVarP(&s.RunId, "run-id", "r", "", "Run ID. Can only be set with --workflow-id. Do not use with --query.")
s.Command.Flags().StringVar(&s.Reason, "reason", "", "Reason for termination. Defaults to message with the current user's name.")
s.Command.Flags().BoolVarP(&s.Yes, "yes", "y", false, "Don't prompt to confirm termination. Can only be used with --query.")
s.Command.Flags().Float32Var(&s.Rps, "rps", 0, "Limit batch's requests per second. Only allowed if query is present.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down
18 changes: 13 additions & 5 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"encoding/json"
"errors"
"fmt"
"go.temporal.io/sdk/converter"
"os/user"

"go.temporal.io/sdk/converter"

"github.com/fatih/color"
"github.com/google/uuid"
"github.com/temporalio/cli/temporalcli/internal/printer"
Expand Down Expand Up @@ -157,6 +158,7 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string)
Query: c.Query,
Reason: c.Reason,
Yes: c.Yes,
Rps: c.Rps,
}

exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{
Expand Down Expand Up @@ -442,11 +444,17 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
reason = defaultReason()
}

// Check rps is used together with query
if s.Rps != 0 && s.Query == "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition will never be true down here (Query is always non-empty down here, it's checked above)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout 👍 fixed!

return nil, nil, fmt.Errorf("rps requires query to be set")
}

return nil, &workflowservice.StartBatchOperationRequest{
Namespace: namespace,
JobId: uuid.NewString(),
VisibilityQuery: s.Query,
Reason: reason,
MaxOperationsPerSecond: s.Rps,
Namespace: namespace,
JobId: uuid.NewString(),
VisibilityQuery: s.Query,
Reason: reason,
}, nil
}

Expand Down
55 changes: 48 additions & 7 deletions temporalcli/commands.workflow_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package temporalcli_test

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"regexp"
"strconv"
"sync"
"time"

"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"google.golang.org/grpc"
)

func (s *SharedServerSuite) TestWorkflow_Signal_SingleWorkflowSuccess() {
Expand Down Expand Up @@ -263,26 +266,63 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_SingleWorkflowSuccess_WithRea
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess() {
res := s.testTerminateBatchWorkflow(false)
_, res := s.testTerminateBatchWorkflow(5, 0, false)
s.Contains(res.Stdout.String(), "approximately 5 workflow(s)")
s.Contains(res.Stdout.String(), "Started batch")
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccessJSON() {
res := s.testTerminateBatchWorkflow(true)
func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_Ratelimit_MissingQuery() {
res := s.Execute(
"workflow", "terminate",
"--address", s.Address(),
"--rps", "5",
)
s.Error(res.Err, "must set either workflow ID or query")
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_Ratelimit() {
var rps float32 = 1
req, _ := s.testTerminateBatchWorkflow(2, rps, false)
s.Equal(rps, req.MaxOperationsPerSecond)
}

func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() {
_, res := s.testTerminateBatchWorkflow(5, 0, true)
var jsonRes map[string]any
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonRes))
s.NotEmpty(jsonRes["batchJobId"])
}

func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult {
func (s *SharedServerSuite) testTerminateBatchWorkflow(
total int,
rps float32,
json bool,
) (*workflowservice.StartBatchOperationRequest, *CommandResult) {
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
ctx.Done().Receive(ctx, nil)
return nil, ctx.Err()
})

// Start 5 workflows
runs := make([]client.WorkflowRun, 5)
var lastRequestLock sync.Mutex
var startBatchRequest *workflowservice.StartBatchOperationRequest
s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append(
s.CommandHarness.Options.AdditionalClientGRPCDialOptions,
grpc.WithChainUnaryInterceptor(func(
ctx context.Context,
method string, req, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) error {
lastRequestLock.Lock()
if r, ok := req.(*workflowservice.StartBatchOperationRequest); ok {
startBatchRequest = r
}
lastRequestLock.Unlock()
return invoker(ctx, method, req, reply, cc, opts...)
}),
)

// Start workflows
runs := make([]client.WorkflowRun, total)
searchAttr := "keyword-" + uuid.NewString()
for i := range runs {
run, err := s.Client.ExecuteWorkflow(
Expand Down Expand Up @@ -310,6 +350,7 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult
// Send batch terminate with a "y" for non-json or "--yes" for json
args := []string{
"workflow", "terminate",
"--rps", fmt.Sprint(rps),
"--address", s.Address(),
"--query", "CustomKeywordField = '" + searchAttr + "'",
"--reason", "terminate-test",
Expand Down Expand Up @@ -338,7 +379,7 @@ func (s *SharedServerSuite) testTerminateBatchWorkflow(json bool) *CommandResult
}
s.True(foundReason)
}
return res
return startBatchRequest, res
}

func (s *SharedServerSuite) TestWorkflow_Cancel_SingleWorkflowSuccess() {
Expand Down
7 changes: 7 additions & 0 deletions temporalcli/commandsgen/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ func (o *Option) writeStructField(w *codeWriter) error {
switch o.Type {
case "bool", "int", "string":
goDataType = o.Type
case "float":
goDataType = "float32"
case "duration":
goDataType = "Duration"
case "timestamp":
Expand Down Expand Up @@ -342,6 +344,11 @@ func (o *Option) writeFlagBuilding(selfVar, flagVar string, w *codeWriter) error
if o.Default == "" {
defaultLit = ", 0"
}
case "float":
flagMeth, defaultLit = "Float32Var", ", "+o.Default
if o.Default == "" {
defaultLit = ", 0"
}
case "string":
flagMeth, defaultLit = "StringVar", fmt.Sprintf(", %q", o.Default)
case "string[]":
Expand Down
12 changes: 11 additions & 1 deletion temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
# option-sets: A list of option sets. (string[])

# * name, summary, and descrption are required fields. All other fields are optional.
# * Available option types are `bool`, `duration`, `int`, `string`, `string[]`,
# * Available option types are `bool`, `duration`, `int`, `float`, `string`, `string[]`,
# `string-enum`, or `timestamp`.
# * Include a new-line after each command entry.

Expand Down Expand Up @@ -2630,6 +2630,11 @@ commands:
description: |
Don't prompt to confirm termination.
Can only be used with --query.
- name: rps
type: float
description: |
Limit batch's requests per second.
Only allowed if query is present.

- name: temporal workflow trace
summary: Workflow Execution live progress
Expand Down Expand Up @@ -2975,6 +2980,11 @@ option-sets:
description: |
Don't prompt to confirm signaling.
Only allowed when --query is present.
- name: rps
type: float
description: |
Limit batch's requests per second.
Only allowed if query is present.

- name: shared-workflow-start
options:
Expand Down
Loading