diff --git a/cmd/client/del/cmd.go b/cmd/client/del/cmd.go index c9fedd7b..eb034338 100644 --- a/cmd/client/del/cmd.go +++ b/cmd/client/del/cmd.go @@ -29,6 +29,7 @@ var ( type flags struct { expectedVersion int64 + partitionKey string } func (flags *flags) Reset() { @@ -37,6 +38,7 @@ func (flags *flags) Reset() { func init() { Cmd.Flags().Int64VarP(&Config.expectedVersion, "expected-version", "e", -1, "Version of entry expected to be on the server") + Cmd.Flags().StringVarP(&Config.partitionKey, "partition-key", "p", "", "Partition Key to be used in override the shard routing") } var Cmd = &cobra.Command{ @@ -60,6 +62,9 @@ func exec(_ *cobra.Command, args []string) error { if Config.expectedVersion >= 0 { options = append(options, oxia.ExpectedVersionId(Config.expectedVersion)) } + if Config.partitionKey != "" { + options = append(options, oxia.PartitionKey(Config.partitionKey)) + } return client.Delete(context.Background(), key, options...) } diff --git a/cmd/client/del/cmd_test.go b/cmd/client/del/cmd_test.go index 4c09672e..e7e477e2 100644 --- a/cmd/client/del/cmd_test.go +++ b/cmd/client/del/cmd_test.go @@ -57,5 +57,16 @@ func TestDelete_exec(t *testing.T) { assert.Error(t, err) assert.Equal(t, "Error: failed to delete", out) + common.MockedClient.On("Delete", "my-key", []oxia.DeleteOption{oxia.PartitionKey("xyz")}).Return(nil) + out, err = runCmd(Cmd, "my-key -p xyz") + assert.NoError(t, err) + assert.Empty(t, out) + + common.MockedClient.On("Delete", "my-key", + []oxia.DeleteOption{oxia.ExpectedVersionId(4), oxia.PartitionKey("xyz")}).Return(nil) + out, err = runCmd(Cmd, "my-key -e 4 -p xyz") + assert.NoError(t, err) + assert.Empty(t, out) + common.MockedClient.AssertExpectations(t) } diff --git a/cmd/client/get/cmd.go b/cmd/client/get/cmd.go index 9235df9b..b70d805a 100644 --- a/cmd/client/get/cmd.go +++ b/cmd/client/get/cmd.go @@ -18,6 +18,10 @@ import ( "context" "time" + "github.com/pkg/errors" + + "github.com/streamnative/oxia/oxia" + "github.com/spf13/cobra" "github.com/streamnative/oxia/cmd/client/common" @@ -31,17 +35,25 @@ type flags struct { key string hexDump bool includeVersion bool + partitionKey string + comparisonType string } func (flags *flags) Reset() { flags.key = "" flags.hexDump = false flags.includeVersion = false + flags.partitionKey = "" + flags.comparisonType = "equal" } func init() { Cmd.Flags().BoolVarP(&Config.includeVersion, "include-version", "v", false, "Include the record version object") Cmd.Flags().BoolVar(&Config.hexDump, "hex", false, "Print the value in HexDump format") + Cmd.Flags().StringVarP(&Config.partitionKey, "partition-key", "p", "", "Partition Key to be used in override the shard routing") + + Cmd.Flags().StringVarP(&Config.comparisonType, "comparison-type", "t", "equal", + "The type of get comparison. Allowed value: equal, floor, ceiling, lower, higher") } var Cmd = &cobra.Command{ @@ -58,8 +70,28 @@ func exec(cmd *cobra.Command, args []string) error { return err } + var options []oxia.GetOption + if Config.partitionKey != "" { + options = append(options, oxia.PartitionKey(Config.partitionKey)) + } + + switch Config.comparisonType { + case "equal": + // Nothing to do, this is default + case "floor": + options = append(options, oxia.ComparisonFloor()) + case "ceiling": + options = append(options, oxia.ComparisonCeiling()) + case "lower": + options = append(options, oxia.ComparisonLower()) + case "higher": + options = append(options, oxia.ComparisonHigher()) + default: + return errors.Errorf("invalid comparison type: %s", Config.comparisonType) + } + queryKey := args[0] - key, value, version, err := client.Get(context.Background(), queryKey) + key, value, version, err := client.Get(context.Background(), queryKey, options...) if err != nil { return err } diff --git a/cmd/client/get/cmd_test.go b/cmd/client/get/cmd_test.go index 1f09a622..7dcfe343 100644 --- a/cmd/client/get/cmd_test.go +++ b/cmd/client/get/cmd_test.go @@ -48,6 +48,12 @@ func Test_exec(t *testing.T) { }{ {"key", "x", []any{"x", emptyOptions}, "value-x"}, {"other-key", "y", []any{"y", emptyOptions}, "value-y"}, + {"partition-key", "y -p xyz", []any{"y", []oxia.GetOption{oxia.PartitionKey("xyz")}}, "value-y"}, + {"equal", "y -t equal", []any{"y", emptyOptions}, "value-y"}, + {"floor", "y -t floor", []any{"y", []oxia.GetOption{oxia.ComparisonFloor()}}, "value-y"}, + {"ceiling", "y -t ceiling", []any{"y", []oxia.GetOption{oxia.ComparisonCeiling()}}, "value-y"}, + {"lower", "y -t lower", []any{"y", []oxia.GetOption{oxia.ComparisonLower()}}, "value-y"}, + {"higher", "y -t higher", []any{"y", []oxia.GetOption{oxia.ComparisonHigher()}}, "value-y"}, } { t.Run(test.name, func(t *testing.T) { common.MockedClient = common.NewMockClient() diff --git a/cmd/client/list/cmd.go b/cmd/client/list/cmd.go index 08913b0e..38fb041a 100644 --- a/cmd/client/list/cmd.go +++ b/cmd/client/list/cmd.go @@ -29,8 +29,9 @@ var ( ) type flags struct { - keyMin string - keyMax string + keyMin string + keyMax string + partitionKey string } func (flags *flags) Reset() { @@ -41,6 +42,7 @@ func (flags *flags) Reset() { func init() { Cmd.Flags().StringVarP(&Config.keyMin, "key-min", "s", "", "Key range minimum (inclusive)") Cmd.Flags().StringVarP(&Config.keyMax, "key-max", "e", "", "Key range maximum (exclusive)") + Cmd.Flags().StringVarP(&Config.partitionKey, "partition-key", "p", "", "Partition Key to be used in override the shard routing") } var Cmd = &cobra.Command{ @@ -62,6 +64,11 @@ func exec(cmd *cobra.Command, _ []string) error { // By default, do not list internal keys Config.keyMax = "__oxia/" } + + if Config.partitionKey != "" { + options = append(options, oxia.PartitionKey(Config.partitionKey)) + } + list, err := client.List(context.Background(), Config.keyMin, Config.keyMax, options...) if err != nil { return err diff --git a/cmd/client/list/cmd_test.go b/cmd/client/list/cmd_test.go index 9a4f16db..64525172 100644 --- a/cmd/client/list/cmd_test.go +++ b/cmd/client/list/cmd_test.go @@ -50,6 +50,7 @@ func TestList_exec(t *testing.T) { {"short", "-s a -e c", []any{"a", "c", emptyOptions}}, {"range-no-min", "--key-max c", []any{"", "c", emptyOptions}}, {"range-no-max", "--key-min a", []any{"a", "__oxia/", emptyOptions}}, + {"partition-key", "-s a -e c -p xyz", []any{"a", "c", []oxia.ListOption{oxia.PartitionKey("xyz")}}}, } { t.Run(test.name, func(t *testing.T) { common.MockedClient = common.NewMockClient() diff --git a/cmd/client/put/cmd.go b/cmd/client/put/cmd.go index 01821b69..1995c452 100644 --- a/cmd/client/put/cmd.go +++ b/cmd/client/put/cmd.go @@ -34,16 +34,22 @@ var ( type flags struct { expectedVersion int64 readValueFromStdIn bool + partitionKey string + sequenceKeysDeltas []int64 } func (flags *flags) Reset() { flags.expectedVersion = -1 flags.readValueFromStdIn = false + flags.partitionKey = "" + flags.sequenceKeysDeltas = nil } func init() { Cmd.Flags().Int64VarP(&Config.expectedVersion, "expected-version", "e", -1, "Version of entry expected to be on the server") Cmd.Flags().BoolVarP(&Config.readValueFromStdIn, "std-in", "c", false, "Read value from stdin") + Cmd.Flags().StringVarP(&Config.partitionKey, "partition-key", "p", "", "Partition Key to be used in override the shard routing") + Cmd.Flags().Int64SliceVarP(&Config.sequenceKeysDeltas, "sequence-keys-deltas", "d", nil, "Specify one or more sequence keys deltas to be added to the inserted key") } var Cmd = &cobra.Command{ @@ -77,12 +83,7 @@ func exec(cmd *cobra.Command, args []string) error { } } - var options []oxia.PutOption - if Config.expectedVersion != -1 { - options = append(options, oxia.ExpectedVersionId(Config.expectedVersion)) - } - - key, version, err := client.Put(context.Background(), key, value, options...) + key, version, err := client.Put(context.Background(), key, value, getOptions()...) if err != nil { return err } @@ -98,3 +99,22 @@ func exec(cmd *cobra.Command, args []string) error { }) return nil } + +func getOptions() []oxia.PutOption { + var options []oxia.PutOption + if Config.expectedVersion != -1 { + options = append(options, oxia.ExpectedVersionId(Config.expectedVersion)) + } + if Config.partitionKey != "" { + options = append(options, oxia.PartitionKey(Config.partitionKey)) + } + if len(Config.sequenceKeysDeltas) > 0 { + deltas := make([]uint64, len(Config.sequenceKeysDeltas)) + for i := range Config.sequenceKeysDeltas { + deltas[i] = uint64(Config.sequenceKeysDeltas[i]) + } + options = append(options, oxia.SequenceKeysDeltas(deltas...)) + } + + return options +} diff --git a/cmd/client/put/cmd_test.go b/cmd/client/put/cmd_test.go index 24b37ad5..39795480 100644 --- a/cmd/client/put/cmd_test.go +++ b/cmd/client/put/cmd_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/spf13/cobra" + "github.com/streamnative/oxia/cmd/client/common" "github.com/streamnative/oxia/oxia" ) @@ -46,6 +47,9 @@ func TestPut_exec(t *testing.T) { {"entry", "x y", "", []any{"x", []byte("y"), emptyOptions}}, {"entry-expected-version", "x y -e 5", "", []any{"x", []byte("y"), []oxia.PutOption{oxia.ExpectedVersionId(5)}}}, {"stdin", "x -c -e 5", "my-value", []any{"x", []byte("my-value"), []oxia.PutOption{oxia.ExpectedVersionId(5)}}}, + {"partition-key", "x y -p abc", "", []any{"x", []byte("y"), []oxia.PutOption{oxia.PartitionKey("abc")}}}, + {"sequence-keys", "x y -p abc -d 1,2,3", "", []any{"x", []byte("y"), + []oxia.PutOption{oxia.PartitionKey("abc"), oxia.SequenceKeysDeltas(1, 2, 3)}}}, } { t.Run(test.name, func(t *testing.T) { common.MockedClient = common.NewMockClient()