diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 43943a2559ab..f1ed6f25587f 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -79,6 +79,20 @@ var ( dataCachePurgeHook = func() {} resetBackoffHook = func() {} + cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{ + Name: "grpc.lb.rls.cache_entries", + Description: "EXPERIMENTAL. Number of entries in the RLS cache.", + Unit: "entry", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"}, + Default: false, + }) + cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{ + Name: "grpc.lb.rls.cache_size", + Description: "EXPERIMENTAL. The current size of the RLS cache.", + Unit: "By", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"}, + Default: false, + }) defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.rls.default_target_picks", Description: "EXPERIMENTAL. Number of LB picks sent to the default target.", @@ -126,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer. updateCh: buffer.NewUnbounded(), } lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb)) - lb.dataCache = newDataCache(maxCacheSize, lb.logger) + lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String()) lb.bg = balancergroup.New(balancergroup.Options{ CC: cc, BuildOpts: opts, @@ -317,18 +331,17 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error b.stateMu.Unlock() <-done + // We cannot do cache operations above because `cacheMu` needs to be grabbed + // before `stateMu` if we are to hold both locks at the same time. + b.cacheMu.Lock() + b.dataCache.updateRLSServerTarget(newCfg.lookupService) if resizeCache { // If the new config changes reduces the size of the data cache, we // might have to evict entries to get the cache size down to the newly // specified size. - // - // And we cannot do this operation above (where we compute the - // `resizeCache` boolean) because `cacheMu` needs to be grabbed before - // `stateMu` if we are to hold both locks at the same time. - b.cacheMu.Lock() b.dataCache.resize(newCfg.cacheSizeBytes) - b.cacheMu.Unlock() } + b.cacheMu.Unlock() return nil } diff --git a/balancer/rls/cache.go b/balancer/rls/cache.go index ee8df6c3f3b5..f80750fb34e9 100644 --- a/balancer/rls/cache.go +++ b/balancer/rls/cache.go @@ -22,6 +22,8 @@ import ( "container/list" "time" + "github.com/google/uuid" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal/backoff" internalgrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" @@ -163,22 +165,40 @@ func (l *lru) getLeastRecentlyUsed() cacheKey { // // It is not safe for concurrent access. type dataCache struct { - maxSize int64 // Maximum allowed size. - currentSize int64 // Current size. - keys *lru // Cache keys maintained in lru order. - entries map[cacheKey]*cacheEntry - logger *internalgrpclog.PrefixLogger - shutdown *grpcsync.Event + maxSize int64 // Maximum allowed size. + currentSize int64 // Current size. + keys *lru // Cache keys maintained in lru order. + entries map[cacheKey]*cacheEntry + logger *internalgrpclog.PrefixLogger + shutdown *grpcsync.Event + rlsServerTarget string + + // Read only after initialization. + grpcTarget string + uuid string + metricsRecorder estats.MetricsRecorder } -func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache { - return &dataCache{ - maxSize: size, - keys: newLRU(), - entries: make(map[cacheKey]*cacheEntry), - logger: logger, - shutdown: grpcsync.NewEvent(), +func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache { + dc := &dataCache{ + maxSize: size, + keys: newLRU(), + entries: make(map[cacheKey]*cacheEntry), + logger: logger, + shutdown: grpcsync.NewEvent(), + grpcTarget: grpcTarget, + uuid: uuid.New().String(), + metricsRecorder: metricsRecorder, } + cacheSizeMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid) + cacheEntriesMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid) + return dc +} + +// updateRLSServerTarget updates the RLS Server Target the RLS Balancer is +// configured with. +func (dc *dataCache) updateRLSServerTarget(rlsServerTarget string) { + dc.rlsServerTarget = rlsServerTarget } // resize changes the maximum allowed size of the data cache. @@ -319,6 +339,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) { dc.currentSize -= entry.size entry.size = newSize dc.currentSize += entry.size + cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid) } func (dc *dataCache) getEntry(key cacheKey) *cacheEntry { @@ -351,6 +372,8 @@ func (dc *dataCache) deleteAndCleanup(key cacheKey, entry *cacheEntry) { delete(dc.entries, key) dc.currentSize -= entry.size dc.keys.removeEntry(key) + cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid) + cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid) } func (dc *dataCache) stop() { diff --git a/balancer/rls/cache_test.go b/balancer/rls/cache_test.go index 80185f39c929..4c064e971ac6 100644 --- a/balancer/rls/cache_test.go +++ b/balancer/rls/cache_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/internal/backoff" + "google.golang.org/grpc/internal/testutils/stats" ) var ( @@ -119,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) { func (s) TestDataCache_BasicOperations(t *testing.T) { initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } @@ -133,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) { func (s) TestDataCache_AddForcesResize(t *testing.T) { initCacheEntries() - dc := newDataCache(1, nil) + dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "") // The first entry in cacheEntries has a minimum expiry time in the future. // This entry would stop the resize operation since we do not evict entries @@ -162,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) { func (s) TestDataCache_Resize(t *testing.T) { initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } @@ -193,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) { func (s) TestDataCache_EvictExpiredEntries(t *testing.T) { initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } @@ -220,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) { } initCacheEntries() - dc := newDataCache(5, nil) + dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "") for i, k := range cacheKeys { dc.addEntry(k, cacheEntries[i]) } diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index f36089d47ff5..93dda7ed2a3f 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -242,3 +242,17 @@ func (r *TestMetricsRecorder) TagConn(ctx context.Context, _ *stats.ConnTagInfo) } func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {} + +// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent +// nil panics. +type NoopMetricsRecorder struct{} + +func (r *NoopMetricsRecorder) RecordInt64Count(*estats.Int64CountHandle, int64, ...string) {} + +func (r *NoopMetricsRecorder) RecordFloat64Count(*estats.Float64CountHandle, float64, ...string) {} + +func (r *NoopMetricsRecorder) RecordInt64Histo(*estats.Int64HistoHandle, int64, ...string) {} + +func (r *NoopMetricsRecorder) RecordFloat64Histo(*estats.Float64HistoHandle, float64, ...string) {} + +func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, ...string) {}