From 61c64c3122366a1cf6b96fef437934506b022b1b Mon Sep 17 00:00:00 2001 From: allenchen2244 <102192478+allenchen2244@users.noreply.github.com> Date: Mon, 13 Mar 2023 16:24:13 -0700 Subject: [PATCH] add remaining persistence stuff that goes to a shard (#5142) add remaining persistence stuff that goes to a shard --- common/dynamicconfig/constants.go | 12 +++- common/persistence/client/factory.go | 2 +- common/persistence/config.go | 2 + .../persistence-tests/persistenceTestBase.go | 2 + .../persistence/persistenceMetricClients.go | 72 ++++++++++++++++--- host/integrationbase.go | 1 + service/history/config/config.go | 6 +- 7 files changed, 85 insertions(+), 12 deletions(-) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a892649b3b0..d4a46bbc209 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1342,7 +1342,7 @@ const ( WorkflowDeletionJitterRange // SampleLoggingRate defines the rate we want sampled logs to be logged at - // KeyName: system.SampleLoggingRate + // KeyName: system.sampleLoggingRate // Value type: Int // Default value: 100 SampleLoggingRate @@ -1804,6 +1804,11 @@ const ( EnableCassandraAllConsistencyLevelDelete + // EnableShardIDMetrics turns on or off shardId metrics + // KeyName: system.enableShardIDMetrics + // Value type: Bool + // Default value: true + EnableShardIDMetrics // LastBoolKey must be the last one in this const group LastBoolKey ) @@ -3832,6 +3837,11 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "Uses all consistency level for Cassandra delete operations", DefaultValue: false, }, + EnableShardIDMetrics: DynamicBool{ + KeyName: "system.enableShardIDMetrics", + Description: "Enable shardId metrics in persistence client", + DefaultValue: true, + }, } var FloatKeys = map[FloatKey]DynamicFloat{ diff --git a/common/persistence/client/factory.go b/common/persistence/client/factory.go index c8c2ee7f94c..f50d2b1b1db 100644 --- a/common/persistence/client/factory.go +++ b/common/persistence/client/factory.go @@ -251,7 +251,7 @@ func (f *factoryImpl) NewExecutionManager(shardID int) (p.ExecutionManager, erro result = p.NewWorkflowExecutionPersistenceRateLimitedClient(result, ds.ratelimit, f.logger) } if f.metricsClient != nil { - result = p.NewWorkflowExecutionPersistenceMetricsClient(result, f.metricsClient, f.logger, f.config, f.dc.PersistenceSampleLoggingRate) + result = p.NewWorkflowExecutionPersistenceMetricsClient(result, f.metricsClient, f.logger, f.config, f.dc.PersistenceSampleLoggingRate, f.dc.EnableShardIDMetrics) } return result, nil } diff --git a/common/persistence/config.go b/common/persistence/config.go index f1ed4176c6d..0af43d68522 100644 --- a/common/persistence/config.go +++ b/common/persistence/config.go @@ -30,6 +30,7 @@ type ( EnableSQLAsyncTransaction dynamicconfig.BoolPropertyFn EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn PersistenceSampleLoggingRate dynamicconfig.IntPropertyFn + EnableShardIDMetrics dynamicconfig.BoolPropertyFn } ) @@ -39,5 +40,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicconfig.EnableSQLAsyncTransaction), EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete), PersistenceSampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate), + EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics), } } diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index 41cecd79f3c..f3d72b30dfd 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -141,6 +141,7 @@ func NewTestBaseWithNoSQL(options *TestBaseOptions) TestBase { EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false), EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true), PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100), + EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true), } params := TestBaseParams{ DefaultTestCluster: testCluster, @@ -165,6 +166,7 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase { EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false), EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true), PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100), + EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true), } params := TestBaseParams{ DefaultTestCluster: testCluster, diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 3391c30df9e..8997411759e 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -40,6 +40,7 @@ type ( logger log.Logger enableLatencyHistogramMetrics bool sampleLoggingRate dynamicconfig.IntPropertyFn + enableShardIDMetrics dynamicconfig.BoolPropertyFn } shardPersistenceClient struct { @@ -116,6 +117,7 @@ func NewWorkflowExecutionPersistenceMetricsClient( logger log.Logger, cfg *config.Persistence, sampleLoggingRate dynamicconfig.IntPropertyFn, + enableShardIDMetrics dynamicconfig.BoolPropertyFn, ) ExecutionManager { return &workflowExecutionPersistenceClient{ persistence: persistence, @@ -124,6 +126,7 @@ func NewWorkflowExecutionPersistenceMetricsClient( logger: logger.WithTags(tag.ShardID(persistence.GetShardID())), enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics, sampleLoggingRate: sampleLoggingRate, + enableShardIDMetrics: enableShardIDMetrics, }, } } @@ -415,7 +418,15 @@ func (p *workflowExecutionPersistenceClient) CreateWorkflowExecution( resp, err = p.persistence.CreateWorkflowExecution(ctx, request) return err } - err := p.call(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + p.logger.SampleInfo("Persistence CreateWorkflowExecution called", p.sampleLoggingRate(), + tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.NewWorkflowSnapshot.ExecutionInfo.WorkflowID), tag.ShardID(p.GetShardID())) + var err error + if p.enableShardIDMetrics() { + err = p.callWithDomainAndShardScope(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } else { + err = p.call(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + } if err != nil { return nil, err } @@ -432,7 +443,13 @@ func (p *workflowExecutionPersistenceClient) GetWorkflowExecution( resp, err = p.persistence.GetWorkflowExecution(ctx, request) return err } - err := p.call(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + var err error + if p.enableShardIDMetrics() { + err = p.callWithDomainAndShardScope(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } else { + err = p.call(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + } if err != nil { return nil, err } @@ -451,7 +468,13 @@ func (p *workflowExecutionPersistenceClient) UpdateWorkflowExecution( } p.logger.SampleInfo("Persistence UpdateWorkflowExecution called", p.sampleLoggingRate(), tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.UpdateWorkflowMutation.ExecutionInfo.WorkflowID), tag.ShardID(p.GetShardID())) - err := p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + var err error + if p.enableShardIDMetrics() { + err = p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } else { + err = p.call(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + } if err != nil { return nil, err } @@ -468,7 +491,15 @@ func (p *workflowExecutionPersistenceClient) ConflictResolveWorkflowExecution( resp, err = p.persistence.ConflictResolveWorkflowExecution(ctx, request) return err } - err := p.call(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + p.logger.SampleInfo("Persistence ConflictResolveWorkflowExecution called", p.sampleLoggingRate(), + tag.WorkflowDomainName(request.DomainName), tag.ShardID(p.GetShardID())) + var err error + if p.enableShardIDMetrics() { + err = p.callWithDomainAndShardScope(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } else { + err = p.call(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + } if err != nil { return nil, err } @@ -484,7 +515,12 @@ func (p *workflowExecutionPersistenceClient) DeleteWorkflowExecution( } p.logger.SampleInfo("Persistence DeleteWorkflowExecution called", p.sampleLoggingRate(), tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID())) - return p.callWithDomainAndShardScope(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + if p.enableShardIDMetrics() { + return p.callWithDomainAndShardScope(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } + return p.call(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) + } func (p *workflowExecutionPersistenceClient) DeleteCurrentWorkflowExecution( @@ -496,7 +532,11 @@ func (p *workflowExecutionPersistenceClient) DeleteCurrentWorkflowExecution( } p.logger.SampleInfo("Persistence DeleteCurrentWorkflowExecution called", p.sampleLoggingRate(), tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID())) - return p.callWithDomainAndShardScope(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + if p.enableShardIDMetrics() { + return p.callWithDomainAndShardScope(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } + return p.call(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName)) } func (p *workflowExecutionPersistenceClient) GetCurrentExecution( @@ -509,7 +549,15 @@ func (p *workflowExecutionPersistenceClient) GetCurrentExecution( resp, err = p.persistence.GetCurrentExecution(ctx, request) return err } - err := p.call(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName)) + p.logger.SampleInfo("Persistence GetCurrentExecution called", p.sampleLoggingRate(), + tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID())) + var err error + if p.enableShardIDMetrics() { + err = p.callWithDomainAndShardScope(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } else { + err = p.call(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName)) + } if err != nil { return nil, err } @@ -546,7 +594,15 @@ func (p *workflowExecutionPersistenceClient) IsWorkflowExecutionExists( resp, err = p.persistence.IsWorkflowExecutionExists(ctx, request) return err } - err := p.call(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName)) + p.logger.SampleInfo("Persistence IsWorkflowExecutionExists called", p.sampleLoggingRate(), + tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID())) + var err error + if p.enableShardIDMetrics() { + err = p.callWithDomainAndShardScope(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName), + metrics.ShardIDTag(strconv.Itoa(p.GetShardID()))) + } else { + err = p.call(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName)) + } if err != nil { return nil, err } diff --git a/host/integrationbase.go b/host/integrationbase.go index b2141b73da5..0ac4c738b5f 100644 --- a/host/integrationbase.go +++ b/host/integrationbase.go @@ -109,6 +109,7 @@ func (s *IntegrationBase) setupSuite() { EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false), EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true), PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100), + EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true), } params := pt.TestBaseParams{ DefaultTestCluster: s.defaultTestCluster, diff --git a/service/history/config/config.go b/service/history/config/config.go index a55a3be9a76..9fc2e77f909 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -317,7 +317,8 @@ type Config struct { EnableDebugMode bool // note that this value is initialized once on service start EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter - SampleLoggingRate dynamicconfig.IntPropertyFn + SampleLoggingRate dynamicconfig.IntPropertyFn + EnableShardIDMetrics dynamicconfig.BoolPropertyFn } // New returns new service config with default values @@ -555,7 +556,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(), EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID), - SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate), + SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate), + EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics), } return cfg