Skip to content

Commit

Permalink
Allow gRPC cluster validation configuration (#650)
Browse files Browse the repository at this point in the history
* Allow gRPC cluster validation configuration

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Updating CHANGELOG.md

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fix failing tests

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic authored Feb 27, 2025
1 parent 14a4665 commit f61fc86
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 2 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@
* [FEATURE] Add methods `Increment`, `FlushAll`, `CompareAndSwap`, `Touch` to `cache.MemcachedClient` #477
* [FEATURE] Add `concurrency.ForEachJobMergeResults()` utility function. #486
* [FEATURE] Add `ring.DoMultiUntilQuorumWithoutSuccessfulContextCancellation()`. #495
* [FEATURE] Add `middleware.ClusterUnaryClientInterceptor`, a `grpc.UnaryClientInterceptor` that propagates a cluster info to the outgoing gRPC metadata. #640
* [FEATURE] Add `middleware.ClusterUnaryServerInterceptor`, a `grpc.UnaryServerInterceptor` that checks if the incoming gRPC metadata contains a correct cluster info, and returns an error if it is not the case. #640
* [FEATURE] Add `middleware.ClusterUnaryClientInterceptor`, a `grpc.UnaryClientInterceptor` that propagates a cluster info to the outgoing gRPC metadata. #640 #648 #649
* [FEATURE] Add `middleware.ClusterUnaryServerInterceptor`, a `grpc.UnaryServerInterceptor` that checks if the incoming gRPC metadata contains a correct cluster info, and returns an error if it is not the case. #640 #648 #649
* [FEATURE] Add support for adding `middleware.ClusterUnaryServerInterceptor` as `server.Server` unary interceptor via the following configuration options: #650
* `-server.cluster-validation.label`
* `-server.cluster-validation.grpc.soft-validation`
* `-server.cluster-validation.grpc.enabled`
* [FEATURE] Add `ring.GetWithOptions()` method to support additional features at a per-call level. #632
* [ENHANCEMENT] Add option to hide token information in ring status page #633
* [ENHANCEMENT] Display token information in partition ring status page #631
Expand Down
47 changes: 47 additions & 0 deletions clusterutil/cluster_validation_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package clusterutil

import (
"flag"
"fmt"
)

type ClusterValidationConfig struct {
Label string
GRPC ClusterValidationProtocolConfig
}

type ClusterValidationProtocolConfig struct {
Enabled bool
SoftValidation bool
}

func (cfg *ClusterValidationConfig) Validate() error {
return cfg.GRPC.Validate("grpc", cfg.Label)
}

func (cfg *ClusterValidationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
clusterValidationPrefix := prefix + ".cluster-validation"
f.StringVar(&cfg.Label, clusterValidationPrefix+".label", "", "Optionally define server's cluster validation label.")
cfg.GRPC.RegisterFlagsWithPrefix(clusterValidationPrefix+".grpc", f)
}

func (cfg *ClusterValidationProtocolConfig) Validate(prefix string, label string) error {
if label == "" {
if cfg.Enabled || cfg.SoftValidation {
return fmt.Errorf("%s: validation cannot be enabled if cluster validation label is not configured", prefix)
}
return nil
}

if !cfg.Enabled && cfg.SoftValidation {
return fmt.Errorf("%s: soft validation can be enabled only if cluster validation is enabled", prefix)
}
return nil
}

func (cfg *ClusterValidationProtocolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
softValidationFlag := prefix + ".soft-validation"
enabledFlag := prefix + ".enabled"
f.BoolVar(&cfg.SoftValidation, softValidationFlag, false, fmt.Sprintf("When enabled, soft cluster label validation will be executed. Can be enabled only together with %s", enabledFlag))
f.BoolVar(&cfg.Enabled, enabledFlag, false, "When enabled, cluster label validation will be executed.")
}
62 changes: 62 additions & 0 deletions clusterutil/cluster_validation_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package clusterutil

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestClusterValidationProtocolConfigValidate(t *testing.T) {
testCases := map[string]struct {
label string
enabled bool
softValidation bool
expectedErr error
}{
"soft validation cannot be done if cluster validation label is not set": {
softValidation: true,
expectedErr: fmt.Errorf("testProtocol: validation cannot be enabled if cluster validation label is not configured"),
},
"cluster validation cannot be done if cluster validation label is not set": {
enabled: true,
expectedErr: fmt.Errorf("testProtocol: validation cannot be enabled if cluster validation label is not configured"),
},
"cluster validation and soft validation can be disabled if cluster validation label is not set": {
label: "",
enabled: false,
softValidation: false,
},
"cluster validation and soft validation can be disabled if cluster validation label is set": {
label: "my-cluster",
enabled: false,
softValidation: false,
},
"soft validation cannot be enabled if cluster validation is disabled": {
label: "my-cluster",
enabled: false,
softValidation: true,
expectedErr: fmt.Errorf("testProtocol: soft validation can be enabled only if cluster validation is enabled"),
},
"soft validation can be disabled if cluster validation is enabled": {
label: "my-cluster",
enabled: true,
softValidation: false,
},
"cluster validation and soft validation can be enabled at the same time": {
label: "my-cluster",
enabled: true,
softValidation: true,
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
testProtocolCfg := ClusterValidationProtocolConfig{
Enabled: testCase.enabled,
SoftValidation: testCase.softValidation,
}
err := testProtocolCfg.Validate("testProtocol", testCase.label)
require.Equal(t, testCase.expectedErr, err)
})
}
}
11 changes: 11 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/grafana/dskit/clusterutil"
"github.com/grafana/dskit/httpgrpc"
httpgrpc_server "github.com/grafana/dskit/httpgrpc/server"
"github.com/grafana/dskit/log"
Expand Down Expand Up @@ -155,6 +156,8 @@ type Config struct {
GrpcMethodLimiter GrpcInflightMethodLimiter `yaml:"-"`

Throughput Throughput `yaml:"-"`

ClusterValidation clusterutil.ClusterValidationConfig `yaml:"cluster_validation"`
}

type Throughput struct {
Expand Down Expand Up @@ -218,6 +221,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ProxyProtocolEnabled, "server.proxy-protocol-enabled", false, "Enables PROXY protocol.")
f.DurationVar(&cfg.Throughput.LatencyCutoff, "server.throughput.latency-cutoff", 0, "Requests taking over the cutoff are be observed to measure throughput. Server-Timing header is used with specified unit as the indicator, for example 'Server-Timing: unit;val=8.2'. If set to 0, the throughput is not calculated.")
f.StringVar(&cfg.Throughput.Unit, "server.throughput.unit", "samples_processed", "Unit of the server throughput metric, for example 'processed_bytes' or 'samples_processed'. Observed values are gathered from the 'Server-Timing' header with the 'val' key. If set, it is appended to the request_server_throughput metric name.")
cfg.ClusterValidation.RegisterFlagsWithPrefix("server", f)
}

func (cfg *Config) Validate() error {
return cfg.ClusterValidation.Validate()
}

func (cfg *Config) registererOrDefault() prometheus.Registerer {
Expand Down Expand Up @@ -400,6 +408,9 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) {
middleware.UnaryServerInstrumentInterceptor(metrics.RequestDuration, grpcInstrumentationOptions...),
}
grpcMiddleware = append(grpcMiddleware, cfg.GRPCMiddleware...)
if cfg.ClusterValidation.GRPC.Enabled {
grpcMiddleware = append(grpcMiddleware, middleware.ClusterUnaryServerInterceptor(cfg.ClusterValidation.Label, cfg.ClusterValidation.GRPC.SoftValidation, logger))
}

grpcStreamMiddleware := []grpc.StreamServerInterceptor{
serverLog.StreamServerInterceptor,
Expand Down

0 comments on commit f61fc86

Please # to comment.