Skip to content

Commit

Permalink
Change execution scavenger to call admin delete (#3526)
Browse files Browse the repository at this point in the history
* Change execution scavenger to call admin delete
  • Loading branch information
yux0 authored Oct 26, 2022
1 parent b42bc85 commit 119478a
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 11 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ const (
ExecutionScannerPerShardQPS = "worker.executionScannerPerShardQPS"
// ExecutionDataDurationBuffer is the data TTL duration buffer of execution data
ExecutionDataDurationBuffer = "worker.executionDataDurationBuffer"
// ExecutionScannerWorkerCount is the execution scavenger worker count
ExecutionScannerWorkerCount = "worker.executionScannerWorkerCount"
// TaskQueueScannerEnabled indicates if task queue scanner should be started as part of worker.Scanner
TaskQueueScannerEnabled = "worker.taskQueueScannerEnabled"
// HistoryScannerEnabled indicates if history scanner should be started as part of worker.Scanner
Expand Down
11 changes: 8 additions & 3 deletions service/worker/scanner/executions/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync/atomic"
"time"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
Expand All @@ -44,9 +45,8 @@ import (
)

const (
executorPoolSize = 4
executorPollInterval = time.Minute
executorMaxDeferredTasks = 10000
executorMaxDeferredTasks = 50000
)

type (
Expand All @@ -59,6 +59,7 @@ type (
executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
executor executor.Executor
rateLimiter quotas.RateLimiter
perShardQPS dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -87,9 +88,11 @@ func NewScavenger(
perHostQPS dynamicconfig.IntPropertyFn,
perShardQPS dynamicconfig.IntPropertyFn,
executionDataDurationBuffer dynamicconfig.DurationPropertyFn,
executionTaskWorker dynamicconfig.IntPropertyFn,
executionManager persistence.ExecutionManager,
registry namespace.Registry,
historyClient historyservice.HistoryServiceClient,
adminClient adminservice.AdminServiceClient,
metricsClient metrics.Client,
logger log.Logger,
) *Scavenger {
Expand All @@ -99,8 +102,9 @@ func NewScavenger(
executionManager: executionManager,
registry: registry,
historyClient: historyClient,
adminClient: adminClient,
executor: executor.NewFixedSizePoolExecutor(
executorPoolSize,
executionTaskWorker(),
executorMaxDeferredTasks,
metricsClient,
metrics.ExecutionsScavengerScope,
Expand Down Expand Up @@ -170,6 +174,7 @@ func (s *Scavenger) run() {
s.executionManager,
s.registry,
s.historyClient,
s.adminClient,
s.metrics,
s.logger,
s,
Expand Down
26 changes: 20 additions & 6 deletions service/worker/scanner/executions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -61,6 +61,7 @@ type (
executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
metrics metrics.Client
logger log.Logger
scavenger *Scavenger
Expand All @@ -79,6 +80,7 @@ func newTask(
executionManager persistence.ExecutionManager,
registry namespace.Registry,
historyClient historyservice.HistoryServiceClient,
adminClient adminservice.AdminServiceClient,
metrics metrics.Client,
logger log.Logger,
scavenger *Scavenger,
Expand All @@ -90,6 +92,7 @@ func newTask(
executionManager: executionManager,
registry: registry,
historyClient: historyClient,
adminClient: adminClient,

metrics: metrics,
logger: logger,
Expand Down Expand Up @@ -208,14 +211,25 @@ func (t *task) handleFailures(
case mutableStateRetentionFailureType:
executionInfo := mutableState.GetExecutionInfo()
runID := mutableState.GetExecutionState().GetRunId()
_, err := t.historyClient.DeleteWorkflowExecution(t.ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: executionInfo.GetNamespaceId(),
WorkflowExecution: &commonpb.WorkflowExecution{
ns, err := t.registry.GetNamespaceByID(namespace.ID(executionInfo.GetNamespaceId()))
switch err.(type) {
case *serviceerror.NotFound,
*serviceerror.NamespaceNotFound:
t.logger.Error("Garbage data in DB after namespace is deleted", tag.WorkflowNamespaceID(executionInfo.GetNamespaceId()))
// There is nothing more we can do in this case, so ignore the error.
return nil
case nil:
// continue to delete
default:
return err
}

_, err = t.adminClient.DeleteWorkflowExecution(t.ctx, &adminservice.DeleteWorkflowExecutionRequest{
Namespace: ns.Name().String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: executionInfo.GetWorkflowId(),
RunId: runID,
},
WorkflowVersion: common.EmptyVersion,
ClosedWorkflowOnly: true,
})
switch err.(type) {
case *serviceerror.NotFound,
Expand Down
6 changes: 6 additions & 0 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/headers"
Expand Down Expand Up @@ -76,6 +77,8 @@ type (
ExecutionScannerPerShardQPS dynamicconfig.IntPropertyFn
// ExecutionDataDurationBuffer is the data TTL duration buffer of execution data
ExecutionDataDurationBuffer dynamicconfig.DurationPropertyFn
// ExecutionScannerWorkerCount is the number of execution scavenger task workers
ExecutionScannerWorkerCount dynamicconfig.IntPropertyFn
}

// scannerContext is the context object that gets
Expand All @@ -88,6 +91,7 @@ type (
executionManager persistence.ExecutionManager
taskManager persistence.TaskManager
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
workerFactory sdk.WorkerFactory
namespaceRegistry namespace.Registry
}
Expand All @@ -114,6 +118,7 @@ func New(
executionManager persistence.ExecutionManager,
taskManager persistence.TaskManager,
historyClient historyservice.HistoryServiceClient,
adminClient adminservice.AdminServiceClient,
registry namespace.Registry,
workerFactory sdk.WorkerFactory,
) *Scanner {
Expand All @@ -126,6 +131,7 @@ func New(
executionManager: executionManager,
taskManager: taskManager,
historyClient: historyClient,
adminClient: adminClient,
workerFactory: workerFactory,
namespaceRegistry: registry,
},
Expand Down
3 changes: 3 additions & 0 deletions service/worker/scanner/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"

"go.temporal.io/server/api/adminservicemock/v1"
"go.temporal.io/server/api/historyservicemock/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -156,6 +157,7 @@ func (s *scannerTestSuite) TestScannerEnabled() {
mockSdkClientFactory := sdk.NewMockClientFactory(ctrl)
mockSdkClient := mocksdk.NewMockClient(ctrl)
mockNamespaceRegistry := namespace.NewMockRegistry(ctrl)
mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl)
scanner := New(
log.NewNoopLogger(),
&Config{
Expand Down Expand Up @@ -195,6 +197,7 @@ func (s *scannerTestSuite) TestScannerEnabled() {
p.NewMockExecutionManager(ctrl),
p.NewMockTaskManager(ctrl),
historyservicemock.NewMockHistoryServiceClient(ctrl),
mockAdminClient,
mockNamespaceRegistry,
mockWorkerFactory,
)
Expand Down
2 changes: 2 additions & 0 deletions service/worker/scanner/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ func ExecutionsScavengerActivity(
ctx.cfg.ExecutionScannerPerHostQPS,
ctx.cfg.ExecutionScannerPerShardQPS,
ctx.cfg.ExecutionDataDurationBuffer,
ctx.cfg.ExecutionScannerWorkerCount,
ctx.executionManager,
ctx.namespaceRegistry,
ctx.historyClient,
ctx.adminClient,
metricsClient,
ctx.logger,
)
Expand Down
17 changes: 15 additions & 2 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func NewService(
perNamespaceWorkerManager: perNamespaceWorkerManager,
workerFactory: workerFactory,
}
s.initScanner()
if err := s.initScanner(); err != nil {
return nil, err
}
return s, nil
}

Expand Down Expand Up @@ -298,6 +300,10 @@ func NewConfig(dc *dynamicconfig.Collection, persistenceConfig *config.Persisten
dynamicconfig.ExecutionDataDurationBuffer,
time.Hour*24*30,
),
ExecutionScannerWorkerCount: dc.GetIntProperty(
dynamicconfig.ExecutionScannerWorkerCount,
8,
),
},
EnableBatcher: dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
BatcherRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BatcherRPS, batcher.DefaultRPS),
Expand Down Expand Up @@ -468,7 +474,12 @@ func (s *Service) startBatcher() {
}
}

func (s *Service) initScanner() {
func (s *Service) initScanner() error {
currentCluster := s.clusterMetadata.GetCurrentClusterName()
adminClient, err := s.clientBean.GetRemoteAdminClient(currentCluster)
if err != nil {
return err
}
s.scanner = scanner.New(
s.logger,
s.config.ScannerCfg,
Expand All @@ -477,9 +488,11 @@ func (s *Service) initScanner() {
s.executionManager,
s.taskManager,
s.historyClient,
adminClient,
s.namespaceRegistry,
s.workerFactory,
)
return nil
}

func (s *Service) startScanner() {
Expand Down

0 comments on commit 119478a

Please sign in to comment.