From 83faadf3cedcdef689b88d911cfe9d22d36fc9d5 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Tue, 11 Jul 2023 12:38:29 -0700 Subject: [PATCH] Clean up cluster metadata initialization logic (#4531) Clean up cluster metadata initialization logic --- temporal/fx.go | 352 ++++++++++++++++++++++++-------------------- temporal/fx_test.go | 124 ++++++++++++++++ 2 files changed, 314 insertions(+), 162 deletions(-) create mode 100644 temporal/fx_test.go diff --git a/temporal/fx.go b/temporal/fx.go index c83c3f32cc6..2e8897a8e01 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -26,6 +26,7 @@ package temporal import ( "context" + "errors" "fmt" "strings" @@ -37,6 +38,7 @@ import ( otelsdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.10.0" "go.opentelemetry.io/otel/trace" + "go.temporal.io/api/serviceerror" "go.uber.org/fx" "go.uber.org/fx/fxevent" "google.golang.org/grpc" @@ -72,6 +74,11 @@ import ( "go.temporal.io/server/service/worker" ) +var ( + clusterMetadataInitErr = errors.New("failed to initialize current cluster metadata") + missingCurrentClusterMetadataErr = errors.New("missing current cluster metadata under clusterMetadata.ClusterInformation") +) + type ( ServicesGroupOut struct { fx.Out @@ -575,7 +582,7 @@ func WorkerServiceProvider( // TODO: move this to cluster.fx func ApplyClusterMetadataConfigProvider( logger log.Logger, - config *config.Config, + svc *config.Config, persistenceServiceResolver resolver.ServiceResolver, persistenceFactoryProvider persistenceClient.FactoryProviderFn, customDataStoreFactory persistenceClient.AbstractDataStoreFactory, @@ -583,22 +590,22 @@ func ApplyClusterMetadataConfigProvider( ctx := context.TODO() logger = log.With(logger, tag.ComponentMetadataInitializer) - clusterName := persistenceClient.ClusterName(config.ClusterMetadata.CurrentClusterName) + clusterName := persistenceClient.ClusterName(svc.ClusterMetadata.CurrentClusterName) dataStoreFactory, _ := persistenceClient.DataStoreFactoryProvider( clusterName, persistenceServiceResolver, - &config.Persistence, + &svc.Persistence, customDataStoreFactory, logger, nil, ) factory := persistenceFactoryProvider(persistenceClient.NewFactoryParams{ DataStoreFactory: dataStoreFactory, - Cfg: &config.Persistence, + Cfg: &svc.Persistence, PersistenceMaxQPS: nil, PersistenceNamespaceMaxQPS: nil, EnablePriorityRateLimiting: nil, - ClusterName: persistenceClient.ClusterName(config.ClusterMetadata.CurrentClusterName), + ClusterName: persistenceClient.ClusterName(svc.ClusterMetadata.CurrentClusterName), MetricsHandler: nil, Logger: logger, }) @@ -606,181 +613,80 @@ func ApplyClusterMetadataConfigProvider( clusterMetadataManager, err := factory.NewClusterMetadataManager() if err != nil { - return config.ClusterMetadata, config.Persistence, fmt.Errorf("error initializing cluster metadata manager: %w", err) + return svc.ClusterMetadata, svc.Persistence, fmt.Errorf("error initializing cluster metadata manager: %w", err) } defer clusterMetadataManager.Close() var sqlIndexNames []string initialIndexSearchAttributes := make(map[string]*persistencespb.IndexSearchAttributes) - if ds := config.Persistence.GetVisibilityStoreConfig(); ds.SQL != nil { + if ds := svc.Persistence.GetVisibilityStoreConfig(); ds.SQL != nil { indexName := ds.GetIndexName() sqlIndexNames = append(sqlIndexNames, indexName) initialIndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() } - if ds := config.Persistence.GetSecondaryVisibilityStoreConfig(); ds.SQL != nil { + if ds := 
svc.Persistence.GetSecondaryVisibilityStoreConfig(); ds.SQL != nil { indexName := ds.GetIndexName() sqlIndexNames = append(sqlIndexNames, indexName) initialIndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() } - clusterData := config.ClusterMetadata - for clusterName, clusterInfo := range clusterData.ClusterInformation { - if clusterName != clusterData.CurrentClusterName { - logger.Warn( - "ClusterInformation in ClusterMetadata config is deprecated. "+ - "Please use TCTL admin tool to configure remote cluster connections", - tag.Key("clusterInformation"), - tag.ClusterName(clusterName), - tag.IgnoredValue(clusterInfo)) - - // Only configure current cluster metadata from static config file - continue - } - - var clusterId string - if uuid.Parse(clusterInfo.ClusterID) == nil { - if clusterInfo.ClusterID != "" { - logger.Warn("Cluster Id in Cluster Metadata config is not a valid uuid. Generating a new Cluster Id") - } - clusterId = uuid.New() - } else { - clusterId = clusterInfo.ClusterID - } - - applied, err := clusterMetadataManager.SaveClusterMetadata( + clusterMetadata := svc.ClusterMetadata + if len(clusterMetadata.ClusterInformation) > 1 { + logger.Warn( + "All remote cluster settings under ClusterMetadata.ClusterInformation config will be ignored. "+ + "Please use TCTL admin tool to configure remote cluster settings", + tag.Key("clusterInformation")) + } + if _, ok := clusterMetadata.ClusterInformation[clusterMetadata.CurrentClusterName]; !ok { + logger.Error("Current cluster setting is missing under clusterMetadata.ClusterInformation", + tag.ClusterName(clusterMetadata.CurrentClusterName)) + return svc.ClusterMetadata, svc.Persistence, missingCurrentClusterMetadataErr + } + resp, err := clusterMetadataManager.GetClusterMetadata( + ctx, + &persistence.GetClusterMetadataRequest{ClusterName: clusterMetadata.CurrentClusterName}, + ) + switch err.(type) { + case nil: + // Update current record + if updateErr := updateCurrentClusterMetadataRecord( ctx, - &persistence.SaveClusterMetadataRequest{ - ClusterMetadata: persistencespb.ClusterMetadata{ - HistoryShardCount: config.Persistence.NumHistoryShards, - ClusterName: clusterName, - ClusterId: clusterId, - ClusterAddress: clusterInfo.RPCAddress, - FailoverVersionIncrement: clusterData.FailoverVersionIncrement, - InitialFailoverVersion: clusterInfo.InitialFailoverVersion, - IsGlobalNamespaceEnabled: clusterData.EnableGlobalNamespace, - IsConnectionEnabled: clusterInfo.Enabled, - UseClusterIdMembership: true, // Enable this for new cluster after 1.19. This is to prevent two clusters join into one ring. - IndexSearchAttributes: initialIndexSearchAttributes, - }, - }) - if err != nil { - logger.Warn("Failed to save cluster metadata.", tag.Error(err), tag.ClusterName(clusterName)) + clusterMetadataManager, + svc, + resp, + ); updateErr != nil { + return svc.ClusterMetadata, svc.Persistence, updateErr } - if applied { - logger.Info("Successfully saved cluster metadata.", tag.ClusterName(clusterName)) - continue - } - - resp, err := clusterMetadataManager.GetClusterMetadata( - ctx, - &persistence.GetClusterMetadataRequest{ClusterName: clusterName}, + // Ignore invalid cluster metadata + overwriteCurrentClusterMetadataWithDBRecord( + svc, + resp, + logger, ) - if err != nil { - return config.ClusterMetadata, config.Persistence, fmt.Errorf("error while fetching cluster metadata: %w", err) - } - currentMetadata := resp.ClusterMetadata - - // TODO (rodrigozhou): Remove this block for v1.21. 
- // Handle registering custom search attributes when upgrading to v1.20. - if len(sqlIndexNames) > 0 { - needSave := false - if currentMetadata.IndexSearchAttributes == nil { - currentMetadata.IndexSearchAttributes = initialIndexSearchAttributes - needSave = true - } else { - for _, indexName := range sqlIndexNames { - if _, ok := currentMetadata.IndexSearchAttributes[indexName]; !ok { - currentMetadata.IndexSearchAttributes[indexName] = searchattribute.GetSqlDbIndexSearchAttributes() - needSave = true - } - } - } - - if needSave { - _, err := clusterMetadataManager.SaveClusterMetadata( - ctx, - &persistence.SaveClusterMetadataRequest{ - ClusterMetadata: currentMetadata, - Version: resp.Version, - }, - ) - if err != nil { - logger.Warn( - "Failed to register search attributes.", - tag.Error(err), - tag.ClusterName(clusterName), - ) - } - logger.Info("Successfully registered search attributes.", tag.ClusterName(clusterName)) - - // need to re-fetch cluster metadata since it might need to be updated again below - resp, err = clusterMetadataManager.GetClusterMetadata( - ctx, - &persistence.GetClusterMetadataRequest{ClusterName: clusterName}, - ) - if err != nil { - return config.ClusterMetadata, config.Persistence, fmt.Errorf("error while fetching cluster metadata: %w", err) - } - currentMetadata = resp.ClusterMetadata - } - } - - // Allow updating cluster metadata if global namespace is disabled - if !resp.IsGlobalNamespaceEnabled && clusterData.EnableGlobalNamespace { - currentMetadata.IsGlobalNamespaceEnabled = clusterData.EnableGlobalNamespace - currentMetadata.InitialFailoverVersion = clusterInfo.InitialFailoverVersion - currentMetadata.FailoverVersionIncrement = clusterData.FailoverVersionIncrement - - applied, err := clusterMetadataManager.SaveClusterMetadata( - ctx, - &persistence.SaveClusterMetadataRequest{ - ClusterMetadata: currentMetadata, - Version: resp.Version, - }) - if !applied || err != nil { - return config.ClusterMetadata, config.Persistence, fmt.Errorf("error while updating cluster metadata: %w", err) - } - } else if resp.IsGlobalNamespaceEnabled != clusterData.EnableGlobalNamespace { - logger.Warn( - mismatchLogMessage, - tag.Key("clusterMetadata.EnableGlobalNamespace"), - tag.IgnoredValue(clusterData.EnableGlobalNamespace), - tag.Value(resp.IsGlobalNamespaceEnabled)) - config.ClusterMetadata.EnableGlobalNamespace = resp.IsGlobalNamespaceEnabled - } - - // Verify current cluster metadata - persistedShardCount := resp.HistoryShardCount - if config.Persistence.NumHistoryShards != persistedShardCount { - logger.Warn( - mismatchLogMessage, - tag.Key("persistence.numHistoryShards"), - tag.IgnoredValue(config.Persistence.NumHistoryShards), - tag.Value(persistedShardCount)) - config.Persistence.NumHistoryShards = persistedShardCount - } - if resp.FailoverVersionIncrement != clusterData.FailoverVersionIncrement { - logger.Warn( - mismatchLogMessage, - tag.Key("clusterMetadata.FailoverVersionIncrement"), - tag.IgnoredValue(clusterData.FailoverVersionIncrement), - tag.Value(resp.FailoverVersionIncrement)) - config.ClusterMetadata.FailoverVersionIncrement = resp.FailoverVersionIncrement + case *serviceerror.NotFound: + // Initialize current cluster record + if initErr := initCurrentClusterMetadataRecord( + ctx, + clusterMetadataManager, + svc, + initialIndexSearchAttributes, + logger, + ); initErr != nil { + return svc.ClusterMetadata, svc.Persistence, initErr } + default: + return svc.ClusterMetadata, svc.Persistence, fmt.Errorf("error while fetching cluster metadata: %w", err) } 
- err = loadClusterInformationFromStore(ctx, config, clusterMetadataManager, logger) + + err = loadClusterInformationFromStore(ctx, svc, clusterMetadataManager, logger) if err != nil { - return config.ClusterMetadata, config.Persistence, fmt.Errorf("error while loading metadata from cluster: %w", err) + return svc.ClusterMetadata, svc.Persistence, fmt.Errorf("error while loading metadata from cluster: %w", err) } - return config.ClusterMetadata, config.Persistence, nil -} - -func PersistenceFactoryProvider() persistenceClient.FactoryProviderFn { - return persistenceClient.FactoryProvider + return svc.ClusterMetadata, svc.Persistence, nil } // TODO: move this to cluster.fx -func loadClusterInformationFromStore(ctx context.Context, config *config.Config, clusterMsg persistence.ClusterMetadataManager, logger log.Logger) error { +func loadClusterInformationFromStore(ctx context.Context, svc *config.Config, clusterMsg persistence.ClusterMetadataManager, logger log.Logger) error { iter := collection.NewPagingIterator(func(paginationToken []byte) ([]interface{}, []byte, error) { request := &persistence.ListClusterMetadataRequest{ PageSize: 100, @@ -805,8 +711,8 @@ func loadClusterInformationFromStore(ctx context.Context, config *config.Config, metadata := item.(*persistence.GetClusterMetadataResponse) shardCount := metadata.HistoryShardCount if shardCount == 0 { - // This is to add backward compatibility to the config based cluster connection. - shardCount = config.Persistence.NumHistoryShards + // This is to add backward compatibility to the config based cluster connection. + shardCount = svc.Persistence.NumHistoryShards } newMetadata := cluster.ClusterInformation{ Enabled: metadata.IsConnectionEnabled, @@ -814,10 +720,10 @@ func loadClusterInformationFromStore(ctx context.Context, config *config.Config, RPCAddress: metadata.ClusterAddress, ShardCount: shardCount, } - if staticClusterMetadata, ok := config.ClusterMetadata.ClusterInformation[metadata.ClusterName]; ok { - if metadata.ClusterName != config.ClusterMetadata.CurrentClusterName { + if staticClusterMetadata, ok := svc.ClusterMetadata.ClusterInformation[metadata.ClusterName]; ok { + if metadata.ClusterName != svc.ClusterMetadata.CurrentClusterName { logger.Warn( - "ClusterInformation in ClusterMetadata config is deprecated. Please use TCTL tool to configure remote cluster connections", + "ClusterInformation in ClusterMetadata config is deprecated. 
Please use TCTL tool to configure remote cluster connections", tag.Key("clusterInformation"), tag.IgnoredValue(staticClusterMetadata), tag.Value(newMetadata)) @@ -826,11 +732,133 @@ func loadClusterInformationFromStore(ctx context.Context, config *config.Config, logger.Info(fmt.Sprintf("Use rpc address %v for cluster %v.", newMetadata.RPCAddress, metadata.ClusterName)) } } - config.ClusterMetadata.ClusterInformation[metadata.ClusterName] = newMetadata + svc.ClusterMetadata.ClusterInformation[metadata.ClusterName] = newMetadata + } + return nil +} + +func initCurrentClusterMetadataRecord( + ctx context.Context, + clusterMetadataManager persistence.ClusterMetadataManager, + svc *config.Config, + initialIndexSearchAttributes map[string]*persistencespb.IndexSearchAttributes, + logger log.Logger, +) error { + var clusterId string + currentClusterName := svc.ClusterMetadata.CurrentClusterName + currentClusterInfo := svc.ClusterMetadata.ClusterInformation[currentClusterName] + if uuid.Parse(currentClusterInfo.ClusterID) == nil { + if currentClusterInfo.ClusterID != "" { + logger.Warn("Cluster Id in Cluster Metadata config is not a valid uuid. Generating a new Cluster Id") + } + clusterId = uuid.New() + } else { + clusterId = currentClusterInfo.ClusterID + } + + applied, err := clusterMetadataManager.SaveClusterMetadata( + ctx, + &persistence.SaveClusterMetadataRequest{ + ClusterMetadata: persistencespb.ClusterMetadata{ + HistoryShardCount: svc.Persistence.NumHistoryShards, + ClusterName: currentClusterName, + ClusterId: clusterId, + ClusterAddress: currentClusterInfo.RPCAddress, + FailoverVersionIncrement: svc.ClusterMetadata.FailoverVersionIncrement, + InitialFailoverVersion: currentClusterInfo.InitialFailoverVersion, + IsGlobalNamespaceEnabled: svc.ClusterMetadata.EnableGlobalNamespace, + IsConnectionEnabled: currentClusterInfo.Enabled, + UseClusterIdMembership: true, // Enable this for new cluster after 1.19. This is to prevent two clusters join into one ring. 
+ IndexSearchAttributes: initialIndexSearchAttributes, + }, + }) + if err != nil { + logger.Warn("Failed to save cluster metadata.", tag.Error(err), tag.ClusterName(currentClusterName)) + return err + } + if !applied { + logger.Error("Failed to apply cluster metadata.", tag.ClusterName(currentClusterName)) + return clusterMetadataInitErr } return nil } +func updateCurrentClusterMetadataRecord( + ctx context.Context, + clusterMetadataManager persistence.ClusterMetadataManager, + svc *config.Config, + currentClusterDBRecord *persistence.GetClusterMetadataResponse, +) error { + updateDBRecord := false + currentClusterMetadata := svc.ClusterMetadata + currentClusterName := currentClusterMetadata.CurrentClusterName + currentClusterInfo := currentClusterMetadata.ClusterInformation[currentClusterName] + // Allow updating cluster metadata if global namespace is disabled + if !currentClusterDBRecord.IsGlobalNamespaceEnabled && currentClusterMetadata.EnableGlobalNamespace { + currentClusterDBRecord.IsGlobalNamespaceEnabled = currentClusterMetadata.EnableGlobalNamespace + currentClusterDBRecord.InitialFailoverVersion = currentClusterInfo.InitialFailoverVersion + currentClusterDBRecord.FailoverVersionIncrement = currentClusterMetadata.FailoverVersionIncrement + updateDBRecord = true + } + if currentClusterDBRecord.ClusterAddress != currentClusterInfo.RPCAddress { + currentClusterDBRecord.ClusterAddress = currentClusterInfo.RPCAddress + updateDBRecord = true + } + // TODO: Add cluster tags + + if !updateDBRecord { + return nil + } + + applied, err := clusterMetadataManager.SaveClusterMetadata( + ctx, + &persistence.SaveClusterMetadataRequest{ + ClusterMetadata: currentClusterDBRecord.ClusterMetadata, + Version: currentClusterDBRecord.Version, + }) + if !applied || err != nil { + return fmt.Errorf("error while updating cluster metadata: %w", err) + } + return nil +} + +func overwriteCurrentClusterMetadataWithDBRecord( + svc *config.Config, + currentClusterDBRecord *persistence.GetClusterMetadataResponse, + logger log.Logger, +) { + clusterMetadata := svc.ClusterMetadata + if currentClusterDBRecord.IsGlobalNamespaceEnabled && !clusterMetadata.EnableGlobalNamespace { + logger.Warn( + mismatchLogMessage, + tag.Key("clusterMetadata.EnableGlobalNamespace"), + tag.IgnoredValue(clusterMetadata.EnableGlobalNamespace), + tag.Value(currentClusterDBRecord.IsGlobalNamespaceEnabled)) + svc.ClusterMetadata.EnableGlobalNamespace = currentClusterDBRecord.IsGlobalNamespaceEnabled + } + persistedShardCount := currentClusterDBRecord.HistoryShardCount + if svc.Persistence.NumHistoryShards != persistedShardCount { + logger.Warn( + mismatchLogMessage, + tag.Key("persistence.numHistoryShards"), + tag.IgnoredValue(svc.Persistence.NumHistoryShards), + tag.Value(persistedShardCount)) + svc.Persistence.NumHistoryShards = persistedShardCount + } + if currentClusterDBRecord.FailoverVersionIncrement != clusterMetadata.FailoverVersionIncrement { + logger.Warn( + mismatchLogMessage, + tag.Key("clusterMetadata.FailoverVersionIncrement"), + tag.IgnoredValue(clusterMetadata.FailoverVersionIncrement), + tag.Value(currentClusterDBRecord.FailoverVersionIncrement)) + svc.ClusterMetadata.FailoverVersionIncrement = currentClusterDBRecord.FailoverVersionIncrement + } +} + +func PersistenceFactoryProvider() persistenceClient.FactoryProviderFn { + return persistenceClient.FactoryProvider +} + func ServerLifetimeHooks( lc fx.Lifecycle, svr *ServerImpl, diff --git a/temporal/fx_test.go b/temporal/fx_test.go new file mode 100644 index 
00000000000..5fbb4d5abe3 --- /dev/null +++ b/temporal/fx_test.go @@ -0,0 +1,124 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package temporal + +import ( + "context" + "path" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common/config" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/persistence" + "go.temporal.io/server/tests/testutils" +) + +func TestInitCurrentClusterMetadataRecord(t *testing.T) { + configDir := path.Join(testutils.GetRepoRootDirectory(), "config") + cfg, err := config.LoadConfig("development-cass", configDir, "") + require.NoError(t, err) + controller := gomock.NewController(t) + + mockClusterMetadataManager := persistence.NewMockClusterMetadataManager(controller) + mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.SaveClusterMetadataRequest) (bool, error) { + require.Equal(t, cfg.ClusterMetadata.EnableGlobalNamespace, request.IsGlobalNamespaceEnabled) + require.Equal(t, cfg.ClusterMetadata.CurrentClusterName, request.ClusterName) + require.Equal(t, cfg.ClusterMetadata.ClusterInformation[cfg.ClusterMetadata.CurrentClusterName].RPCAddress, request.ClusterAddress) + require.Equal(t, cfg.ClusterMetadata.ClusterInformation[cfg.ClusterMetadata.CurrentClusterName].InitialFailoverVersion, request.InitialFailoverVersion) + require.Equal(t, cfg.Persistence.NumHistoryShards, request.HistoryShardCount) + require.Equal(t, cfg.ClusterMetadata.FailoverVersionIncrement, request.FailoverVersionIncrement) + require.Equal(t, int64(0), request.Version) + return true, nil + }, + ) + err = initCurrentClusterMetadataRecord( + context.TODO(), + mockClusterMetadataManager, + cfg, + nil, + log.NewNoopLogger(), + ) + require.NoError(t, err) +} + +func TestUpdateCurrentClusterMetadataRecord(t *testing.T) { + configDir := path.Join(testutils.GetRepoRootDirectory(), "config") + cfg, err := config.LoadConfig("development-cluster-a", configDir, "") + require.NoError(t, err) + controller := gomock.NewController(t) + + mockClusterMetadataManager := persistence.NewMockClusterMetadataManager(controller) + mockClusterMetadataManager.EXPECT().SaveClusterMetadata(gomock.Any(), 
gomock.Any()).DoAndReturn( + func(_ context.Context, request *persistence.SaveClusterMetadataRequest) (bool, error) { + require.Equal(t, cfg.ClusterMetadata.EnableGlobalNamespace, request.IsGlobalNamespaceEnabled) + require.Equal(t, "", request.ClusterName) + require.Equal(t, cfg.ClusterMetadata.ClusterInformation[cfg.ClusterMetadata.CurrentClusterName].RPCAddress, request.ClusterAddress) + require.Equal(t, cfg.ClusterMetadata.ClusterInformation[cfg.ClusterMetadata.CurrentClusterName].InitialFailoverVersion, request.InitialFailoverVersion) + require.Equal(t, int32(0), request.HistoryShardCount) + require.Equal(t, cfg.ClusterMetadata.FailoverVersionIncrement, request.FailoverVersionIncrement) + require.Equal(t, int64(1), request.Version) + return true, nil + }, + ) + updateRecord := &persistence.GetClusterMetadataResponse{ + ClusterMetadata: persistencespb.ClusterMetadata{}, + Version: 1, + } + err = updateCurrentClusterMetadataRecord( + context.TODO(), + mockClusterMetadataManager, + cfg, + updateRecord, + ) + require.NoError(t, err) +} + +func TestOverwriteCurrentClusterMetadataWithDBRecord(t *testing.T) { + configDir := path.Join(testutils.GetRepoRootDirectory(), "config") + cfg, err := config.LoadConfig("development-cass", configDir, "") + require.NoError(t, err) + + dbRecord := &persistence.GetClusterMetadataResponse{ + ClusterMetadata: persistencespb.ClusterMetadata{ + HistoryShardCount: 1024, + FailoverVersionIncrement: 10000, + IsGlobalNamespaceEnabled: true, + }, + Version: 1, + } + overwriteCurrentClusterMetadataWithDBRecord( + cfg, + dbRecord, + log.NewNoopLogger(), + ) + require.Equal(t, int64(10000), cfg.ClusterMetadata.FailoverVersionIncrement) + require.True(t, cfg.ClusterMetadata.EnableGlobalNamespace) + require.Equal(t, int32(1024), cfg.Persistence.NumHistoryShards) +}