diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index d4a46bbc209..831e1cce88b 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -1346,7 +1346,21 @@ const (
 	// Value type: Int
 	// Default value: 100
 	SampleLoggingRate
-
+	// LargeShardHistorySizeMetricThreshold defines the threshold for what constitutes a large history storage size to alert on
+	// KeyName: system.largeShardHistorySizeMetricThreshold
+	// Value type: Int
+	// Default value: 10485760 (10MB)
+	LargeShardHistorySizeMetricThreshold
+	// LargeShardHistoryEventMetricThreshold defines the threshold for what constitutes a large history event count to alert on
+	// KeyName: system.largeShardHistoryEventMetricThreshold
+	// Value type: Int
+	// Default value: 50 * 1024
+	LargeShardHistoryEventMetricThreshold
+	// LargeShardHistoryBlobMetricThreshold defines the threshold for what constitutes a large history blob size to alert on
+	// KeyName: system.largeShardHistoryBlobMetricThreshold
+	// Value type: Int
+	// Default value: 262144 (1/4 MB)
+	LargeShardHistoryBlobMetricThreshold
 	// LastIntKey must be the last one in this const group
 	LastIntKey
 )
@@ -3454,6 +3468,21 @@ var IntKeys = map[IntKey]DynamicInt{
 		Description:  "The rate for which sampled logs are logged at. 100 means 1/100 is logged",
 		DefaultValue: 100,
 	},
+	LargeShardHistorySizeMetricThreshold: DynamicInt{
+		KeyName:      "system.largeShardHistorySizeMetricThreshold",
+		Description:  "defines the threshold for what constitutes a large history size to alert on, default is 10MB",
+		DefaultValue: 10485760,
+	},
+	LargeShardHistoryEventMetricThreshold: DynamicInt{
+		KeyName:      "system.largeShardHistoryEventMetricThreshold",
+		Description:  "defines the threshold for what constitutes a large history event count to alert on, default is 50k",
+		DefaultValue: 50 * 1024,
+	},
+	LargeShardHistoryBlobMetricThreshold: DynamicInt{
+		KeyName:      "system.largeShardHistoryBlobMetricThreshold",
+		Description:  "defines the threshold for what constitutes a large history blob write to alert on, default is 1/4MB",
+		DefaultValue: 262144,
+	},
 }
 
 var BoolKeys = map[BoolKey]DynamicBool{
diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 99de5345ad5..b92c9b6b0e2 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -1157,6 +1157,12 @@ const (
 	HistoryReplicationV2TaskScope
 	// SyncActivityTaskScope is the scope used by sync activity information processing
 	SyncActivityTaskScope
+	// LargeExecutionSizeShardScope is the scope to track large history size for hot shard detection
+	LargeExecutionSizeShardScope
+	// LargeExecutionCountShardScope is the scope to track large history event counts for hot shard detection
+	LargeExecutionCountShardScope
+	// LargeExecutionBlobShardScope is the scope to track large history blobs for hot shard detection
+	LargeExecutionBlobShardScope
 
 	NumHistoryScopes
 )
@@ -1750,6 +1756,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		FailoverMarkerScope:           {operation: "FailoverMarker"},
 		HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
 		SyncActivityTaskScope:         {operation: "SyncActivityTask"},
+		LargeExecutionSizeShardScope:  {operation: "LargeExecutionSizeShard"},
+		LargeExecutionCountShardScope: {operation: "LargeExecutionCountShard"},
+		LargeExecutionBlobShardScope:  {operation: "LargeExecutionBlobShard"},
 	},
 	// Matching Scope Names
 	Matching: {
@@ -2247,6 +2256,9 @@ const (
 	HistoryFailoverCallbackCount
 	WorkflowVersionCount
 	WorkflowTypeCount
+	LargeHistoryBlobCount
+	LargeHistoryEventCount
+	LargeHistorySizeCount
 
 	NumHistoryMetrics
 )
@@ -2844,6 +2856,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		ReplicationTasksCount:  {metricName: "replication_tasks_count", metricType: Timer},
 		WorkflowVersionCount:   {metricName: "workflow_version_count", metricType: Gauge},
 		WorkflowTypeCount:      {metricName: "workflow_type_count", metricType: Gauge},
+		LargeHistoryBlobCount:  {metricName: "large_history_blob_count", metricType: Counter},
+		LargeHistoryEventCount: {metricName: "large_history_event_count", metricType: Counter},
+		LargeHistorySizeCount:  {metricName: "large_history_size_count", metricType: Counter},
 	},
 	Matching: {
 		PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go
index 8997411759e..b553931f801 100644
--- a/common/persistence/persistenceMetricClients.go
+++ b/common/persistence/persistenceMetricClients.go
@@ -316,6 +316,7 @@ func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op
 
 	domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
 	shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
+	shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
 
 	if p.enableLatencyHistogramMetrics {
 		domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
diff --git a/service/history/config/config.go b/service/history/config/config.go
index 9fc2e77f909..41b38c16c91 100644
--- a/service/history/config/config.go
+++ b/service/history/config/config.go
@@ -317,8 +317,12 @@ type Config struct {
 	EnableDebugMode             bool // note that this value is initialized once on service start
 	EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
 
-	SampleLoggingRate    dynamicconfig.IntPropertyFn
-	EnableShardIDMetrics dynamicconfig.BoolPropertyFn
+	// Hot shard detection
+	SampleLoggingRate                     dynamicconfig.IntPropertyFn
+	EnableShardIDMetrics                  dynamicconfig.BoolPropertyFn
+	LargeShardHistorySizeMetricThreshold  dynamicconfig.IntPropertyFn
+	LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
+	LargeShardHistoryBlobMetricThreshold  dynamicconfig.IntPropertyFn
 }
 
 // New returns new service config with default values
@@ -556,8 +560,11 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
 		EnableDebugMode:             dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(),
 		EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID),
 
-		SampleLoggingRate:    dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
-		EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
+		SampleLoggingRate:                     dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
+		EnableShardIDMetrics:                  dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
+		LargeShardHistorySizeMetricThreshold:  dc.GetIntProperty(dynamicconfig.LargeShardHistorySizeMetricThreshold),
+		LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
+		LargeShardHistoryBlobMetricThreshold:  dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),
 	}
 
 	return cfg
diff --git a/service/history/execution/context.go b/service/history/execution/context.go
index e8ecac3bde1..c4c5d021673 100644
--- a/service/history/execution/context.go
+++ b/service/history/execution/context.go
@@ -706,7 +706,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
 	currentWorkflowTransactionPolicy TransactionPolicy,
 	newWorkflowTransactionPolicy *TransactionPolicy,
 ) (retError error) {
-
 	defer func() {
 		if retError != nil {
 			c.Clear()
@@ -720,11 +719,14 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
 	if err != nil {
 		return err
 	}
-
 	var persistedBlobs events.PersistedBlobs
 	currentWorkflowSize := c.GetHistorySize()
+	oldWorkflowSize := currentWorkflowSize
+	currentWorkflowHistoryCount := c.mutableState.GetNextEventID() - 1
+	oldWorkflowHistoryCount := currentWorkflowHistoryCount
 	for _, workflowEvents := range currentWorkflowEventsSeq {
 		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
+		currentWorkflowHistoryCount += int64(len(workflowEvents.Events))
 		if err != nil {
 			return err
 		}
@@ -852,6 +854,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
 		domainName,
 		resp.MutableStateUpdateSessionStats,
 	)
+	c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize, currentWorkflowHistoryCount)
 	// emit workflow completion stats if any
 	if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
 		if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil {
diff --git a/service/history/execution/context_util.go b/service/history/execution/context_util.go
index 8ec94696eef..cc0200e54dc 100644
--- a/service/history/execution/context_util.go
+++ b/service/history/execution/context_util.go
@@ -21,8 +21,11 @@ package execution
 
 import (
+	"strconv"
 	"time"
 
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/metrics"
@@ -30,6 +33,37 @@ import (
 	"github.com/uber/cadence/common/types"
 )
 
+func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64, newHistoryCount int64) {
+	if c.shard.GetConfig().EnableShardIDMetrics() {
+		shardIDStr := strconv.Itoa(c.shard.GetShardID())
+
+		blobSizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()), int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())))
+		// check if the blob size is larger than the threshold in dynamic config; if so, alert on it every time
+		if blobSize > blobSizeWarn {
+			c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()),
+				tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
+			c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount)
+		}
+
+		historyCountWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())))
+		// check if the new history count is greater than our threshold and only count/log it once, when it first passes it
+		// note: this occasionally double counts for reasons not yet understood, but it is still a good enough rough signal to identify bad actors
+		if oldHistoryCount < historyCountWarn && newHistoryCount >= historyCountWarn {
+			c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
+				tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
+			c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount)
+		}
+
+		historySizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()), int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())))
+		// check if the new history size is greater than our threshold and only count/log it once, when it first passes it
+		if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn {
+			c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
+				tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
+			c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount)
+		}
+	}
+}
+
 func emitWorkflowHistoryStats(
 	metricsClient metrics.Client,
 	domainName string,