
Commit 9f8a8d7

Squashed commit of the following:
commit 9d01035
Author: allenchen2244 <102192478+allenchen2244@users.noreply.github.com>
Date:   Wed Mar 29 20:50:38 2023 -0700

    large workflow hot shard detection (#5166)

    Metrics for large workflows
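
A condensed, self-contained sketch of the pattern the new code below (service/history/execution/context_util.go) follows: per-shard signals that fire when a workflow write crosses a configurable threshold. The names and the main() wiring here are hypothetical, and the prints stand in for the real per-shard counters and warn/sampled logs; the threshold values mirror the documented dynamic config defaults.

package main

import "fmt"

// thresholds mirrors the three new dynamic config knobs; the values used in
// main() are the documented defaults, not authoritative.
type thresholds struct {
	blobSizeWarn     int64 // system.largeShardHistoryBlobMetricThreshold
	historyCountWarn int64 // system.largeShardHistoryEventMetricThreshold
	historySizeWarn  int64 // system.largeShardHistorySizeMetricThreshold
}

// emitHotShardSignals is a hypothetical stand-in for emitLargeWorkflowShardIDStats:
// large blob writes alert every time they occur, while history count and size
// alert only once, when the workflow first crosses the threshold.
func emitHotShardSignals(t thresholds, shardID int, blobSize, oldCount, newCount, oldSize, newSize int64) {
	if blobSize > t.blobSizeWarn {
		fmt.Printf("shard %d: large blob write (%d bytes)\n", shardID, blobSize)
	}
	if oldCount < t.historyCountWarn && newCount >= t.historyCountWarn {
		fmt.Printf("shard %d: history event count crossed %d\n", shardID, t.historyCountWarn)
	}
	if oldSize < t.historySizeWarn && newSize >= t.historySizeWarn {
		fmt.Printf("shard %d: history size crossed %d bytes\n", shardID, t.historySizeWarn)
	}
}

func main() {
	t := thresholds{blobSizeWarn: 262144, historyCountWarn: 50 * 1024, historySizeWarn: 10485760}
	// A single update that trips all three signals on shard 42.
	emitHotShardSignals(t, 42, 300000, 51100, 51300, 9000000, 11000000)
}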

commit dd51c53
Author: David Porter <david.porter@uber.com>
Date:   Wed Mar 29 18:30:06 2023 -0700

    fix build (#5180)

commit 7b281c2
Author: David Porter <david.porter@uber.com>
Date:   Mon Mar 27 10:38:37 2023 -0700

    Adds a small test to catch issues with deadlocks (#5171)

    * Adds a small test to catch issues with deadlocks

commit f1e2476
Author: sonpham96 <sonpham1996@gmail.com>
Date:   Sat Mar 18 05:32:01 2023 +0700

    Upgrade Golang base image to 1.18 to remediate CVEs (#5035)

    Co-authored-by: David Porter <david.porter@uber.com>

commit 1519ace
Author: charlese-instaclustr <76502507+charlese-instaclustr@users.noreply.github.com>
Date:   Fri Mar 17 22:11:27 2023 +0000

    Fix type validation in configstore DC client value updating (#5110)

    * Remove misleading type check, add more detailed log message

    * removing debugging logging

    * Handle nil update edge case

    ---------

    Co-authored-by: allenchen2244 <102192478+allenchen2244@users.noreply.github.com>
    Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>

commit a3e2774
Author: charlese-instaclustr <76502507+charlese-instaclustr@users.noreply.github.com>
Date:   Fri Mar 17 19:02:40 2023 +0000

    Add Canary TLS support (#5086)

    * add support for TLS connections by Canary, add development config for Canary with TLS

    * update README to include new config option

    * remove testing config

    ---------

    Co-authored-by: David Porter <david.porter@uber.com>
    Co-authored-by: Shijie Sheng <shengs@uber.com>
    Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>

commit ff4eab2
Author: Shijie Sheng <shengs@uber.com>
Date:   Thu Mar 16 20:10:54 2023 -0700

    [history] be more cautious when using domain state to decide whether to drop queued tasks (#5164)

    What changed?

    When the domain cache returns an entity-not-found error, don't drop queued tasks; be more conservative.

    Why?

    When the cache is dubious, we shouldn't drop the queued tasks.
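
A minimal, self-contained sketch of that conservative behavior, with hypothetical types standing in for the domain cache and the task lifecycle; the real change lives in the history service's queued-task processing and its types differ.

package main

import (
	"errors"
	"fmt"
)

// errEntityNotFound stands in for the domain cache's entity-not-found error.
var errEntityNotFound = errors.New("entity not found")

// domainCache is a hypothetical slice of the real domain cache API.
type domainCache interface {
	IsDomainActive(domainID string) (bool, error)
}

// shouldDropTask drops a queued task only when the cache positively reports the
// domain as inactive. If the lookup fails, including entity-not-found, which may
// just mean the cache is stale, the task is kept and retried later.
func shouldDropTask(cache domainCache, domainID string) (bool, error) {
	active, err := cache.IsDomainActive(domainID)
	if err != nil {
		if errors.Is(err, errEntityNotFound) {
			// Be conservative: a dubious cache is not a reason to drop work.
			return false, nil
		}
		return false, err
	}
	return !active, nil
}

type staleCache struct{}

func (staleCache) IsDomainActive(string) (bool, error) { return false, errEntityNotFound }

func main() {
	drop, err := shouldDropTask(staleCache{}, "some-domain-id")
	fmt.Println(drop, err) // false <nil>: the task stays queued
}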

commit 55a8d93
Author: neil-xie <104041627+neil-xie@users.noreply.github.com>
Date:   Thu Mar 16 14:18:35 2023 -0700

    Add Pinot docker files, table config and schema (#5163)

    * Initial checkin for pinot config files

commit 1304570
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Thu Mar 16 15:20:29 2023 +0200

    Set poll interval for file-based dynamic config if not set (#5160)

    * Set poll interval for file-based dynamic config if not set

    * update unit test
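
A small sketch of the defaulting behavior described in #5160 above, assuming a hypothetical config struct and default interval; the actual field names and default in Cadence's file-based dynamic config client may differ.

package main

import (
	"fmt"
	"time"
)

// fileBasedClientConfig is a hypothetical stand-in for the file-based dynamic
// config client's options; the real struct in Cadence differs.
type fileBasedClientConfig struct {
	Filepath     string
	PollInterval time.Duration
}

// defaultPollInterval is an assumed value for illustration only.
const defaultPollInterval = 10 * time.Second

// withDefaults fills in a poll interval when the config leaves it unset, so the
// client neither skips polling entirely nor spins in a hot loop.
func withDefaults(c fileBasedClientConfig) fileBasedClientConfig {
	if c.PollInterval <= 0 {
		c.PollInterval = defaultPollInterval
	}
	return c
}

func main() {
	c := withDefaults(fileBasedClientConfig{Filepath: "config/dynamicconfig/development.yaml"})
	fmt.Println(c.PollInterval) // 10s
}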

commit 42a14b1
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Thu Mar 16 10:49:21 2023 +0200

    Elasticsearch: reduce code duplication (#5137)

    * Elasticsearch: reduce code duplication

    * address comments

    ---------

    Co-authored-by: Zijian <Shaddoll@users.noreply.github.com>

commit cbf0d14
Author: bowen xiao <xbowen@uber.com>
Date:   Wed Mar 15 10:19:34 2023 -0700

    fix samples documentation (#5088)

commit ba19a29
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Wed Mar 15 12:52:29 2023 +0200

    Add ShardID to valid attributes (#5161)

commit a25cba8
Author: Mantas Šidlauskas <mantass@netapp.com>
Date:   Wed Mar 15 10:56:50 2023 +0200

    ES: single interface for different ES/OpenSearch versions (#5158)

    * ES: single interface for different ES/OpenSearch versions

    * make fmt
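
The general shape of that pattern from #5158 above, a single narrow interface with the version-specific implementation chosen once at construction time, sketched with hypothetical names; Cadence's real client interface, methods, and version strings differ.

package main

import (
	"context"
	"fmt"
)

// searchClient is a hypothetical narrow interface that callers program against,
// regardless of which Elasticsearch/OpenSearch version backs it.
type searchClient interface {
	CountByQuery(ctx context.Context, index, query string) (int64, error)
}

type esV6Client struct{}
type esV7Client struct{}
type openSearchClient struct{}

func (esV6Client) CountByQuery(context.Context, string, string) (int64, error)       { return 6, nil }
func (esV7Client) CountByQuery(context.Context, string, string) (int64, error)       { return 7, nil }
func (openSearchClient) CountByQuery(context.Context, string, string) (int64, error) { return 1, nil }

// newSearchClient picks the implementation once, so the rest of the code base
// carries no per-version branches.
func newSearchClient(version string) (searchClient, error) {
	switch version {
	case "v6":
		return esV6Client{}, nil
	case "v7":
		return esV7Client{}, nil
	case "os2":
		return openSearchClient{}, nil
	default:
		return nil, fmt.Errorf("unsupported search cluster version %q", version)
	}
}

func main() {
	c, _ := newSearchClient("v7")
	n, _ := c.CountByQuery(context.Background(), "cadence-visibility", `{"query":{"match_all":{}}}`)
	fmt.Println(n)
}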

commit e3ac246
Author: Ketsia <115650494+ketsiambaku@users.noreply.github.com>
Date:   Tue Mar 14 12:47:40 2023 -0700

    added logging with workflow/domain tags (#5159)

commit 9581488
Author: Ketsia <115650494+ketsiambaku@users.noreply.github.com>
Date:   Mon Mar 13 16:56:45 2023 -0700

    Consistent query per-shard metric (#5143)

    * add and update consistent query per-shard metric

    * testing per-shard metric

    * move sample logger into persistence metric client for cleanliness

    * fix test

    * fix lint

    * fix test again

    * fix lint

    * sample logging with workflowid tag

    * added domain tag to logger

    * metric completed

    * addressing comments

    * fix lint

    * Revert "fix lint"

    This reverts commit 1e96944.

    * fix lint second attempt

    ---------

    Co-authored-by: Allen Chen <allenchen2244@uber.com>
davidporter-id-au committed Mar 30, 2023
1 parent 6c8c1a5 commit 9f8a8d7
Showing 8 changed files with 111 additions and 8 deletions.
31 changes: 30 additions & 1 deletion common/dynamicconfig/constants.go
@@ -1346,7 +1346,21 @@ const (
// Value type: Int
// Default value: 100
SampleLoggingRate

// LargeShardHistorySizeMetricThreshold defines the threshold for what constitutes a large history storage size to alert on
// KeyName: system.largeShardHistorySizeMetricThreshold
// Value type: Int
// Default value: 10485760 (10 MB)
LargeShardHistorySizeMetricThreshold
// LargeShardHistoryEventMetricThreshold defines the threshold for what constitutes a large history event count to alert on
// KeyName: system.largeShardHistoryEventMetricThreshold
// Value type: Int
// Default value: 50 * 1024
LargeShardHistoryEventMetricThreshold
// LargeShardHistoryBlobMetricThreshold defines the threshold for what constitutes a large history blob size to alert on
// KeyName: system.largeShardHistoryBlobMetricThreshold
// Value type: Int
// Default value: 262144 (256 KB, i.e. 1/4 MB)
LargeShardHistoryBlobMetricThreshold
// LastIntKey must be the last one in this const group
LastIntKey
)
@@ -3454,6 +3468,21 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "The rate at which sampled logs are logged. 100 means 1/100 is logged",
DefaultValue: 100,
},
LargeShardHistorySizeMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistorySizeMetricThreshold",
Description: "defines the threshold for what constitutes a large history size to alert on, default is 10 MB",
DefaultValue: 10485760,
},
LargeShardHistoryEventMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistoryEventMetricThreshold",
Description: "defines the threshold for what constitutes a large history event count to alert on, default is 50 * 1024",
DefaultValue: 50 * 1024,
},
LargeShardHistoryBlobMetricThreshold: DynamicInt{
KeyName: "system.largeShardHistoryBlobMetricThreshold",
Description: "defines the threshold for what constitutes a large history blob write to alert on, default is 256 KB (1/4 MB)",
DefaultValue: 262144,
},
}

var BoolKeys = map[BoolKey]DynamicBool{
15 changes: 15 additions & 0 deletions common/metrics/defs.go
@@ -1157,6 +1157,12 @@ const (
HistoryReplicationV2TaskScope
// SyncActivityTaskScope is the scope used by sync activity information processing
SyncActivityTaskScope
// LargeExecutionSizeShardScope is the scope to track large history size for hot shard detection
LargeExecutionSizeShardScope
// LargeExecutionCountShardScope is the scope to track large history count for hot shard detection
LargeExecutionCountShardScope
// LargeExecutionBlobShardScope is the scope to track large blobs for hot shard detection
LargeExecutionBlobShardScope

NumHistoryScopes
)
@@ -1750,6 +1756,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FailoverMarkerScope: {operation: "FailoverMarker"},
HistoryReplicationV2TaskScope: {operation: "HistoryReplicationV2Task"},
SyncActivityTaskScope: {operation: "SyncActivityTask"},
LargeExecutionSizeShardScope: {operation: "LargeExecutionSizeShard"},
LargeExecutionCountShardScope: {operation: "LargeExecutionCountShard"},
LargeExecutionBlobShardScope: {operation: "LargeExecutionBlobShard"},
},
// Matching Scope Names
Matching: {
@@ -2247,6 +2256,9 @@ const (
HistoryFailoverCallbackCount
WorkflowVersionCount
WorkflowTypeCount
LargeHistoryBlobCount
LargeHistoryEventCount
LargeHistorySizeCount

NumHistoryMetrics
)
@@ -2844,6 +2856,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationTasksCount: {metricName: "replication_tasks_count", metricType: Timer},
WorkflowVersionCount: {metricName: "workflow_version_count", metricType: Gauge},
WorkflowTypeCount: {metricName: "workflow_type_count", metricType: Gauge},
LargeHistoryBlobCount: {metricName: "large_history_blob_count", metricType: Counter},
LargeHistoryEventCount: {metricName: "large_history_event_count", metricType: Counter},
LargeHistorySizeCount: {metricName: "large_history_size_count", metricType: Counter},
},
Matching: {
PollSuccessPerTaskListCounter: {metricName: "poll_success_per_tl", metricRollupName: "poll_success"},
1 change: 1 addition & 0 deletions common/persistence/persistenceMetricClients.go
@@ -316,6 +316,7 @@ func (p *persistenceMetricsClientBase) callWithDomainAndShardScope(scope int, op

domainMetricsScope.RecordTimer(metrics.PersistenceLatencyPerDomain, duration)
shardOperationsMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)
shardOverallMetricsScope.RecordTimer(metrics.PersistenceLatencyPerShard, duration)

if p.enableLatencyHistogramMetrics {
domainMetricsScope.RecordHistogramDuration(metrics.PersistenceLatencyHistogram, duration)
2 changes: 1 addition & 1 deletion docker/buildkite/Dockerfile
@@ -22,7 +22,7 @@ RUN curl https://bootstrap.pypa.io/pip/2.7/get-pip.py -o get-pip.py
RUN python get-pip.py
RUN pip install -U 'pip<21'
RUN pip install PyYAML==3.13 cqlsh==5.0.4

RUN pip install futures
# verbose test output from `make`, can be disabled with V=0
ENV V=1

15 changes: 11 additions & 4 deletions service/history/config/config.go
@@ -317,8 +317,12 @@ type Config struct {
EnableDebugMode bool // note that this value is initialized once on service start
EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
// Hot shard detection settings
SampleLoggingRate dynamicconfig.IntPropertyFn
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
LargeShardHistorySizeMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn
}

// New returns new service config with default values
@@ -556,8 +560,11 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(),
EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.HistoryEnableTaskInfoLogByDomainID),

SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
SampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
LargeShardHistorySizeMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistorySizeMetricThreshold),
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),
}

return cfg
7 changes: 5 additions & 2 deletions service/history/execution/context.go
@@ -706,7 +706,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
currentWorkflowTransactionPolicy TransactionPolicy,
newWorkflowTransactionPolicy *TransactionPolicy,
) (retError error) {

defer func() {
if retError != nil {
c.Clear()
@@ -720,11 +719,14 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err != nil {
return err
}

var persistedBlobs events.PersistedBlobs
currentWorkflowSize := c.GetHistorySize()
oldWorkflowSize := currentWorkflowSize
currentWorkflowHistoryCount := c.mutableState.GetNextEventID() - 1
oldWorkflowHistoryCount := currentWorkflowHistoryCount
for _, workflowEvents := range currentWorkflowEventsSeq {
blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
currentWorkflowHistoryCount += int64(len(workflowEvents.Events))
if err != nil {
return err
}
@@ -852,6 +854,7 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
domainName,
resp.MutableStateUpdateSessionStats,
)
c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize, currentWorkflowHistoryCount)
// emit workflow completion stats if any
if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil {
34 changes: 34 additions & 0 deletions service/history/execution/context_util.go
@@ -21,15 +21,49 @@
package execution

import (
"strconv"
"time"

"github.com/uber/cadence/common"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

func (c *contextImpl) emitLargeWorkflowShardIDStats(blobSize int64, oldHistoryCount int64, oldHistorySize int64, newHistoryCount int64) {
if c.shard.GetConfig().EnableShardIDMetrics() {
shardIDStr := strconv.Itoa(c.shard.GetShardID())

blobSizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryBlobMetricThreshold()), int64(c.shard.GetConfig().BlobSizeLimitWarn(c.GetDomainName())))
// check whether the blob size exceeds the dynamic config threshold; if so, alert every time it does
if blobSize > blobSizeWarn {
c.logger.SampleInfo("Workflow writing a large blob", c.shard.GetConfig().SampleLoggingRate(), tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
c.metricsClient.Scope(metrics.LargeExecutionBlobShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryBlobCount)
}

historyCountWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistoryEventMetricThreshold()), int64(c.shard.GetConfig().HistoryCountLimitWarn(c.GetDomainName())))
// check whether the new history count exceeds our threshold, and count/log it only once, when it first crosses it
// this seems to double count and I can't figure out why, but it should be good enough to get a rough idea and identify bad actors
if oldHistoryCount < historyCountWarn && newHistoryCount >= historyCountWarn {
c.logger.Warn("Workflow history event count is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
c.metricsClient.Scope(metrics.LargeExecutionCountShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistoryEventCount)
}

historySizeWarn := common.MinInt64(int64(c.shard.GetConfig().LargeShardHistorySizeMetricThreshold()), int64(c.shard.GetConfig().HistorySizeLimitWarn(c.GetDomainName())))
// check whether the new history size exceeds our threshold, and count/log it only once, when it first crosses it
if oldHistorySize < historySizeWarn && c.stats.HistorySize >= historySizeWarn {
c.logger.Warn("Workflow history event size is reaching dangerous levels", tag.WorkflowDomainName(c.GetDomainName()),
tag.WorkflowID(c.workflowExecution.GetWorkflowID()), tag.ShardID(c.shard.GetShardID()), tag.WorkflowRunID(c.workflowExecution.GetRunID()))
c.metricsClient.Scope(metrics.LargeExecutionSizeShardScope, metrics.ShardIDTag(shardIDStr)).IncCounter(metrics.LargeHistorySizeCount)
}
}
}

func emitWorkflowHistoryStats(
metricsClient metrics.Client,
domainName string,
14 changes: 14 additions & 0 deletions service/history/task/task_test.go
@@ -25,6 +25,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@@ -266,6 +268,18 @@ func (s *taskSuite) TestTaskNack_ResubmitFailed() {
s.Equal(t.TaskStateNacked, task.State())
}

func (s *taskSuite) TestHandleErr_ErrMaxAttempts() {
taskBase := s.newTestTask(func(task Info) (bool, error) {
return true, nil
}, nil)

taskBase.criticalRetryCount = func(i ...dynamicconfig.FilterOption) int { return 0 }
s.mockTaskInfo.EXPECT().GetTaskType().Return(0)
assert.NotPanics(s.T(), func() {
taskBase.HandleErr(errors.New("err"))
})
}

func (s *taskSuite) newTestTask(
taskFilter Filter,
redispatchFn func(task Task),
Expand Down
