Skip to content

Commit

Permalink
add remaining persistence stuff that goes to a shard (#5142)
Browse files Browse the repository at this point in the history
add remaining persistence stuff that goes to a shard
  • Loading branch information
allenchen2244 authored Mar 13, 2023
1 parent 5e5895a commit 61c64c3
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 12 deletions.
12 changes: 11 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,7 @@ const (
WorkflowDeletionJitterRange

// SampleLoggingRate defines the rate we want sampled logs to be logged at
// KeyName: system.SampleLoggingRate
// KeyName: system.sampleLoggingRate
// Value type: Int
// Default value: 100
SampleLoggingRate
Expand Down Expand Up @@ -1804,6 +1804,11 @@ const (

EnableCassandraAllConsistencyLevelDelete

// EnableShardIDMetrics turns on or off shardId metrics
// KeyName: system.enableShardIDMetrics
// Value type: Bool
// Default value: true
EnableShardIDMetrics
// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -3832,6 +3837,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Uses all consistency level for Cassandra delete operations",
DefaultValue: false,
},
EnableShardIDMetrics: DynamicBool{
KeyName: "system.enableShardIDMetrics",
Description: "Enable shardId metrics in persistence client",
DefaultValue: true,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (f *factoryImpl) NewExecutionManager(shardID int) (p.ExecutionManager, erro
result = p.NewWorkflowExecutionPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
if f.metricsClient != nil {
result = p.NewWorkflowExecutionPersistenceMetricsClient(result, f.metricsClient, f.logger, f.config, f.dc.PersistenceSampleLoggingRate)
result = p.NewWorkflowExecutionPersistenceMetricsClient(result, f.metricsClient, f.logger, f.config, f.dc.PersistenceSampleLoggingRate, f.dc.EnableShardIDMetrics)
}
return result, nil
}
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
EnableSQLAsyncTransaction dynamicconfig.BoolPropertyFn
EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn
PersistenceSampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
}
)

Expand All @@ -39,5 +40,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
EnableSQLAsyncTransaction: dc.GetBoolProperty(dynamicconfig.EnableSQLAsyncTransaction),
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete),
PersistenceSampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
}
}
2 changes: 2 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func NewTestBaseWithNoSQL(options *TestBaseOptions) TestBase {
EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false),
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
}
params := TestBaseParams{
DefaultTestCluster: testCluster,
Expand All @@ -165,6 +166,7 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false),
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
}
params := TestBaseParams{
DefaultTestCluster: testCluster,
Expand Down
72 changes: 64 additions & 8 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
logger log.Logger
enableLatencyHistogramMetrics bool
sampleLoggingRate dynamicconfig.IntPropertyFn
enableShardIDMetrics dynamicconfig.BoolPropertyFn
}

shardPersistenceClient struct {
Expand Down Expand Up @@ -116,6 +117,7 @@ func NewWorkflowExecutionPersistenceMetricsClient(
logger log.Logger,
cfg *config.Persistence,
sampleLoggingRate dynamicconfig.IntPropertyFn,
enableShardIDMetrics dynamicconfig.BoolPropertyFn,
) ExecutionManager {
return &workflowExecutionPersistenceClient{
persistence: persistence,
Expand All @@ -124,6 +126,7 @@ func NewWorkflowExecutionPersistenceMetricsClient(
logger: logger.WithTags(tag.ShardID(persistence.GetShardID())),
enableLatencyHistogramMetrics: cfg.EnablePersistenceLatencyHistogramMetrics,
sampleLoggingRate: sampleLoggingRate,
enableShardIDMetrics: enableShardIDMetrics,
},
}
}
Expand Down Expand Up @@ -415,7 +418,15 @@ func (p *workflowExecutionPersistenceClient) CreateWorkflowExecution(
resp, err = p.persistence.CreateWorkflowExecution(ctx, request)
return err
}
err := p.call(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
p.logger.SampleInfo("Persistence CreateWorkflowExecution called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.NewWorkflowSnapshot.ExecutionInfo.WorkflowID), tag.ShardID(p.GetShardID()))
var err error
if p.enableShardIDMetrics() {
err = p.callWithDomainAndShardScope(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
} else {
err = p.call(metrics.PersistenceCreateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
}
if err != nil {
return nil, err
}
Expand All @@ -432,7 +443,13 @@ func (p *workflowExecutionPersistenceClient) GetWorkflowExecution(
resp, err = p.persistence.GetWorkflowExecution(ctx, request)
return err
}
err := p.call(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
var err error
if p.enableShardIDMetrics() {
err = p.callWithDomainAndShardScope(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
} else {
err = p.call(metrics.PersistenceGetWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
}
if err != nil {
return nil, err
}
Expand All @@ -451,7 +468,13 @@ func (p *workflowExecutionPersistenceClient) UpdateWorkflowExecution(
}
p.logger.SampleInfo("Persistence UpdateWorkflowExecution called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.UpdateWorkflowMutation.ExecutionInfo.WorkflowID), tag.ShardID(p.GetShardID()))
err := p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
var err error
if p.enableShardIDMetrics() {
err = p.callWithDomainAndShardScope(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
} else {
err = p.call(metrics.PersistenceUpdateWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
}
if err != nil {
return nil, err
}
Expand All @@ -468,7 +491,15 @@ func (p *workflowExecutionPersistenceClient) ConflictResolveWorkflowExecution(
resp, err = p.persistence.ConflictResolveWorkflowExecution(ctx, request)
return err
}
err := p.call(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
p.logger.SampleInfo("Persistence ConflictResolveWorkflowExecution called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.ShardID(p.GetShardID()))
var err error
if p.enableShardIDMetrics() {
err = p.callWithDomainAndShardScope(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
} else {
err = p.call(metrics.PersistenceConflictResolveWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
}
if err != nil {
return nil, err
}
Expand All @@ -484,7 +515,12 @@ func (p *workflowExecutionPersistenceClient) DeleteWorkflowExecution(
}
p.logger.SampleInfo("Persistence DeleteWorkflowExecution called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
return p.callWithDomainAndShardScope(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
if p.enableShardIDMetrics() {
return p.callWithDomainAndShardScope(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
}
return p.call(metrics.PersistenceDeleteWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))

}

func (p *workflowExecutionPersistenceClient) DeleteCurrentWorkflowExecution(
Expand All @@ -496,7 +532,11 @@ func (p *workflowExecutionPersistenceClient) DeleteCurrentWorkflowExecution(
}
p.logger.SampleInfo("Persistence DeleteCurrentWorkflowExecution called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
return p.callWithDomainAndShardScope(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName), metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
if p.enableShardIDMetrics() {
return p.callWithDomainAndShardScope(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
}
return p.call(metrics.PersistenceDeleteCurrentWorkflowExecutionScope, op, metrics.DomainTag(request.DomainName))
}

func (p *workflowExecutionPersistenceClient) GetCurrentExecution(
Expand All @@ -509,7 +549,15 @@ func (p *workflowExecutionPersistenceClient) GetCurrentExecution(
resp, err = p.persistence.GetCurrentExecution(ctx, request)
return err
}
err := p.call(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName))
p.logger.SampleInfo("Persistence GetCurrentExecution called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
var err error
if p.enableShardIDMetrics() {
err = p.callWithDomainAndShardScope(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
} else {
err = p.call(metrics.PersistenceGetCurrentExecutionScope, op, metrics.DomainTag(request.DomainName))
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -546,7 +594,15 @@ func (p *workflowExecutionPersistenceClient) IsWorkflowExecutionExists(
resp, err = p.persistence.IsWorkflowExecutionExists(ctx, request)
return err
}
err := p.call(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName))
p.logger.SampleInfo("Persistence IsWorkflowExecutionExists called", p.sampleLoggingRate(),
tag.WorkflowDomainName(request.DomainName), tag.WorkflowID(request.WorkflowID), tag.ShardID(p.GetShardID()))
var err error
if p.enableShardIDMetrics() {
err = p.callWithDomainAndShardScope(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName),
metrics.ShardIDTag(strconv.Itoa(p.GetShardID())))
} else {
err = p.call(metrics.PersistenceIsWorkflowExecutionExistsScope, op, metrics.DomainTag(request.DomainName))
}
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions host/integrationbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *IntegrationBase) setupSuite() {
EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false),
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
}
params := pt.TestBaseParams{
DefaultTestCluster: s.defaultTestCluster,
Expand Down
6 changes: 4 additions & 2 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ type Config struct {
EnableDebugMode bool // note that this value is initialized once on service start
EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

SampleLoggingRate dynamicconfig.IntPropertyFn
SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
}

// New returns new service config with default values
Expand Down Expand Up @@ -555,7 +556,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(),
EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID),

SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
}

return cfg
Expand Down

0 comments on commit 61c64c3

Please # to comment.