diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index c29f1443394..cbf4d8186de 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -645,6 +645,8 @@ const ( ExecutionScannerPerShardQPS = "worker.executionScannerPerShardQPS" // ExecutionDataDurationBuffer is the data TTL duration buffer of execution data ExecutionDataDurationBuffer = "worker.executionDataDurationBuffer" + // ExecutionScannerWorkerCount is the execution scavenger worker count + ExecutionScannerWorkerCount = "worker.executionScannerWorkerCount" // TaskQueueScannerEnabled indicates if task queue scanner should be started as part of worker.Scanner TaskQueueScannerEnabled = "worker.taskQueueScannerEnabled" // HistoryScannerEnabled indicates if history scanner should be started as part of worker.Scanner diff --git a/service/worker/scanner/executions/scavenger.go b/service/worker/scanner/executions/scavenger.go index ec40ea7342e..a5ea2d3b278 100644 --- a/service/worker/scanner/executions/scavenger.go +++ b/service/worker/scanner/executions/scavenger.go @@ -30,6 +30,7 @@ import ( "sync/atomic" "time" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log/tag" @@ -44,9 +45,8 @@ import ( ) const ( - executorPoolSize = 4 executorPollInterval = time.Minute - executorMaxDeferredTasks = 10000 + executorMaxDeferredTasks = 50000 ) type ( @@ -59,6 +59,7 @@ type ( executionManager persistence.ExecutionManager registry namespace.Registry historyClient historyservice.HistoryServiceClient + adminClient adminservice.AdminServiceClient executor executor.Executor rateLimiter quotas.RateLimiter perShardQPS dynamicconfig.IntPropertyFn @@ -87,9 +88,11 @@ func NewScavenger( perHostQPS dynamicconfig.IntPropertyFn, perShardQPS dynamicconfig.IntPropertyFn, executionDataDurationBuffer dynamicconfig.DurationPropertyFn, + executionTaskWorker dynamicconfig.IntPropertyFn, executionManager persistence.ExecutionManager, registry namespace.Registry, historyClient historyservice.HistoryServiceClient, + adminClient adminservice.AdminServiceClient, metricsClient metrics.Client, logger log.Logger, ) *Scavenger { @@ -99,8 +102,9 @@ func NewScavenger( executionManager: executionManager, registry: registry, historyClient: historyClient, + adminClient: adminClient, executor: executor.NewFixedSizePoolExecutor( - executorPoolSize, + executionTaskWorker(), executorMaxDeferredTasks, metricsClient, metrics.ExecutionsScavengerScope, @@ -170,6 +174,7 @@ func (s *Scavenger) run() { s.executionManager, s.registry, s.historyClient, + s.adminClient, s.metrics, s.logger, s, diff --git a/service/worker/scanner/executions/task.go b/service/worker/scanner/executions/task.go index 0611b8eb524..9e41a3dc7c0 100644 --- a/service/worker/scanner/executions/task.go +++ b/service/worker/scanner/executions/task.go @@ -31,9 +31,9 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/dynamicconfig" @@ -61,6 +61,7 @@ type ( executionManager persistence.ExecutionManager registry namespace.Registry historyClient historyservice.HistoryServiceClient + adminClient adminservice.AdminServiceClient metrics metrics.Client logger log.Logger scavenger *Scavenger @@ -79,6 +80,7 @@ func newTask( executionManager persistence.ExecutionManager, registry namespace.Registry, historyClient historyservice.HistoryServiceClient, + adminClient adminservice.AdminServiceClient, metrics metrics.Client, logger log.Logger, scavenger *Scavenger, @@ -90,6 +92,7 @@ func newTask( executionManager: executionManager, registry: registry, historyClient: historyClient, + adminClient: adminClient, metrics: metrics, logger: logger, @@ -208,14 +211,25 @@ func (t *task) handleFailures( case mutableStateRetentionFailureType: executionInfo := mutableState.GetExecutionInfo() runID := mutableState.GetExecutionState().GetRunId() - _, err := t.historyClient.DeleteWorkflowExecution(t.ctx, &historyservice.DeleteWorkflowExecutionRequest{ - NamespaceId: executionInfo.GetNamespaceId(), - WorkflowExecution: &commonpb.WorkflowExecution{ + ns, err := t.registry.GetNamespaceByID(namespace.ID(executionInfo.GetNamespaceId())) + switch err.(type) { + case *serviceerror.NotFound, + *serviceerror.NamespaceNotFound: + t.logger.Error("Garbage data in DB after namespace is deleted", tag.WorkflowNamespaceID(executionInfo.GetNamespaceId())) + // We cannot do much in this case. It just ignores this error. + return nil + case nil: + // continue to delete + default: + return err + } + + _, err = t.adminClient.DeleteWorkflowExecution(t.ctx, &adminservice.DeleteWorkflowExecutionRequest{ + Namespace: ns.Name().String(), + Execution: &commonpb.WorkflowExecution{ WorkflowId: executionInfo.GetWorkflowId(), RunId: runID, }, - WorkflowVersion: common.EmptyVersion, - ClosedWorkflowOnly: true, }) switch err.(type) { case *serviceerror.NotFound, diff --git a/service/worker/scanner/scanner.go b/service/worker/scanner/scanner.go index b0c5c7cd031..157105b8eab 100644 --- a/service/worker/scanner/scanner.go +++ b/service/worker/scanner/scanner.go @@ -35,6 +35,7 @@ import ( "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/config" "go.temporal.io/server/common/headers" @@ -76,6 +77,8 @@ type ( ExecutionScannerPerShardQPS dynamicconfig.IntPropertyFn // ExecutionDataDurationBuffer is the data TTL duration buffer of execution data ExecutionDataDurationBuffer dynamicconfig.DurationPropertyFn + // ExecutionScannerWorkerCount is the execution scavenger task worker number + ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn } // scannerContext is the context object that get's @@ -88,6 +91,7 @@ type ( executionManager persistence.ExecutionManager taskManager persistence.TaskManager historyClient historyservice.HistoryServiceClient + adminClient adminservice.AdminServiceClient workerFactory sdk.WorkerFactory namespaceRegistry namespace.Registry } @@ -114,6 +118,7 @@ func New( executionManager persistence.ExecutionManager, taskManager persistence.TaskManager, historyClient historyservice.HistoryServiceClient, + adminClient adminservice.AdminServiceClient, registry namespace.Registry, workerFactory sdk.WorkerFactory, ) *Scanner { @@ -126,6 +131,7 @@ func New( executionManager: executionManager, taskManager: taskManager, historyClient: historyClient, + adminClient: adminClient, workerFactory: workerFactory, namespaceRegistry: registry, }, diff --git a/service/worker/scanner/scanner_test.go b/service/worker/scanner/scanner_test.go index 06caffb682e..46f8f15329e 100644 --- a/service/worker/scanner/scanner_test.go +++ b/service/worker/scanner/scanner_test.go @@ -30,6 +30,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" + "go.temporal.io/server/api/adminservicemock/v1" "go.temporal.io/server/api/historyservicemock/v1" "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" @@ -156,6 +157,7 @@ func (s *scannerTestSuite) TestScannerEnabled() { mockSdkClientFactory := sdk.NewMockClientFactory(ctrl) mockSdkClient := mocksdk.NewMockClient(ctrl) mockNamespaceRegistry := namespace.NewMockRegistry(ctrl) + mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl) scanner := New( log.NewNoopLogger(), &Config{ @@ -195,6 +197,7 @@ func (s *scannerTestSuite) TestScannerEnabled() { p.NewMockExecutionManager(ctrl), p.NewMockTaskManager(ctrl), historyservicemock.NewMockHistoryServiceClient(ctrl), + mockAdminClient, mockNamespaceRegistry, mockWorkerFactory, ) diff --git a/service/worker/scanner/workflow.go b/service/worker/scanner/workflow.go index a79784e303f..f9bec38fe50 100644 --- a/service/worker/scanner/workflow.go +++ b/service/worker/scanner/workflow.go @@ -191,9 +191,11 @@ func ExecutionsScavengerActivity( ctx.cfg.ExecutionScannerPerHostQPS, ctx.cfg.ExecutionScannerPerShardQPS, ctx.cfg.ExecutionDataDurationBuffer, + ctx.cfg.ExecutionScannerWorkerCount, ctx.executionManager, ctx.namespaceRegistry, ctx.historyClient, + ctx.adminClient, metricsClient, ctx.logger, ) diff --git a/service/worker/service.go b/service/worker/service.go index 93f3d5f58a3..a4d3945fe17 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -186,7 +186,9 @@ func NewService( perNamespaceWorkerManager: perNamespaceWorkerManager, workerFactory: workerFactory, } - s.initScanner() + if err := s.initScanner(); err != nil { + return nil, err + } return s, nil } @@ -298,6 +300,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten dynamicconfig.ExecutionDataDurationBuffer, time.Hour*24*30, ), + ExecutionScannerWorkerCount: dc.GetIntProperty( + dynamicconfig.ExecutionScannerWorkerCount, + 8, + ), }, EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true), BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS), @@ -468,7 +474,12 @@ func (s *Service) startBatcher() { } } -func (s *Service) initScanner() { +func (s *Service) initScanner() error { + currentCluster := s.clusterMetadata.GetCurrentClusterName() + adminClient, err := s.clientBean.GetRemoteAdminClient(currentCluster) + if err != nil { + return err + } s.scanner = scanner.New( s.logger, s.config.ScannerCfg, @@ -477,9 +488,11 @@ func (s *Service) initScanner() { s.executionManager, s.taskManager, s.historyClient, + adminClient, s.namespaceRegistry, s.workerFactory, ) + return nil } func (s *Service) startScanner() {