Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Change execution scavenger to call admin delete #3526

Merged
merged 5 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions service/worker/scanner/executions/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync/atomic"
"time"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -59,6 +60,7 @@ type (
executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
executor executor.Executor
rateLimiter quotas.RateLimiter
perShardQPS dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -90,6 +92,7 @@ func NewScavenger(
executionManager persistence.ExecutionManager,
registry namespace.Registry,
historyClient historyservice.HistoryServiceClient,
adminClient adminservice.AdminServiceClient,
metricsClient metrics.Client,
logger log.Logger,
) *Scavenger {
Expand All @@ -99,6 +102,7 @@ func NewScavenger(
executionManager: executionManager,
registry: registry,
historyClient: historyClient,
adminClient: adminClient,
executor: executor.NewFixedSizePoolExecutor(
executorPoolSize,
executorMaxDeferredTasks,
Expand Down Expand Up @@ -170,6 +174,7 @@ func (s *Scavenger) run() {
s.executionManager,
s.registry,
s.historyClient,
s.adminClient,
s.metrics,
s.logger,
s,
Expand Down
26 changes: 20 additions & 6 deletions service/worker/scanner/executions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -61,6 +61,7 @@ type (
executionManager persistence.ExecutionManager
registry namespace.Registry
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
metrics metrics.Client
logger log.Logger
scavenger *Scavenger
Expand All @@ -79,6 +80,7 @@ func newTask(
executionManager persistence.ExecutionManager,
registry namespace.Registry,
historyClient historyservice.HistoryServiceClient,
adminClient adminservice.AdminServiceClient,
metrics metrics.Client,
logger log.Logger,
scavenger *Scavenger,
Expand All @@ -90,6 +92,7 @@ func newTask(
executionManager: executionManager,
registry: registry,
historyClient: historyClient,
adminClient: adminClient,

metrics: metrics,
logger: logger,
Expand Down Expand Up @@ -208,14 +211,25 @@ func (t *task) handleFailures(
case mutableStateRetentionFailureType:
executionInfo := mutableState.GetExecutionInfo()
runID := mutableState.GetExecutionState().GetRunId()
_, err := t.historyClient.DeleteWorkflowExecution(t.ctx, &historyservice.DeleteWorkflowExecutionRequest{
NamespaceId: executionInfo.GetNamespaceId(),
WorkflowExecution: &commonpb.WorkflowExecution{
ns, err := t.registry.GetNamespaceByID(namespace.ID(executionInfo.GetNamespaceId()))
switch err.(type) {
case *serviceerror.NotFound,
*serviceerror.NamespaceNotFound:
t.logger.Error("Garbage data in DB after namespace is deleted", tag.WorkflowNamespaceID(executionInfo.GetNamespaceId()))
// We cannot do much in this case. It just ignores this error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, looks like we still need a way to clean up in this case. Admin DeleteWorkflowExecution should probably take in namespaceID instead of namespaceName?

Maybe create a task for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return nil
case nil:
// continue to delete
default:
return err
}

_, err = t.adminClient.DeleteWorkflowExecution(t.ctx, &adminservice.DeleteWorkflowExecutionRequest{
Namespace: ns.Name().String(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: executionInfo.GetWorkflowId(),
RunId: runID,
},
WorkflowVersion: common.EmptyVersion,
ClosedWorkflowOnly: true,
})
switch err.(type) {
case *serviceerror.NotFound,
Expand Down
4 changes: 4 additions & 0 deletions service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/headers"
Expand Down Expand Up @@ -88,6 +89,7 @@ type (
executionManager persistence.ExecutionManager
taskManager persistence.TaskManager
historyClient historyservice.HistoryServiceClient
adminClient adminservice.AdminServiceClient
workerFactory sdk.WorkerFactory
namespaceRegistry namespace.Registry
}
Expand All @@ -114,6 +116,7 @@ func New(
executionManager persistence.ExecutionManager,
taskManager persistence.TaskManager,
historyClient historyservice.HistoryServiceClient,
adminClient adminservice.AdminServiceClient,
registry namespace.Registry,
workerFactory sdk.WorkerFactory,
) *Scanner {
Expand All @@ -126,6 +129,7 @@ func New(
executionManager: executionManager,
taskManager: taskManager,
historyClient: historyClient,
adminClient: adminClient,
workerFactory: workerFactory,
namespaceRegistry: registry,
},
Expand Down
1 change: 1 addition & 0 deletions service/worker/scanner/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func ExecutionsScavengerActivity(
ctx.executionManager,
ctx.namespaceRegistry,
ctx.historyClient,
ctx.adminClient,
metricsClient,
ctx.logger,
)
Expand Down
13 changes: 11 additions & 2 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func NewService(
perNamespaceWorkerManager: perNamespaceWorkerManager,
workerFactory: workerFactory,
}
s.initScanner()
if err := s.initScanner(); err != nil {
return nil, err
}
return s, nil
}

Expand Down Expand Up @@ -468,7 +470,12 @@ func (s *Service) startBatcher() {
}
}

func (s *Service) initScanner() {
func (s *Service) initScanner() error {
currentCluster := s.clusterMetadata.GetCurrentClusterName()
adminClient, err := s.clientBean.GetRemoteAdminClient(currentCluster)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a GetAdminClient to ClientBean to keep it similar to Get[Remote]FrontendClient, since frontend and admin clients work the same way.

also, I noticed Get/SetFrontendClient in clientBean.go don't do proper locking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to include this in the next available patch. So I created an issue to track here: #3532.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to fix locking bug in this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I don't see caller in those setters. I will remove them in another PR.

if err != nil {
return err
}
s.scanner = scanner.New(
s.logger,
s.config.ScannerCfg,
Expand All @@ -477,9 +484,11 @@ func (s *Service) initScanner() {
s.executionManager,
s.taskManager,
s.historyClient,
adminClient,
s.namespaceRegistry,
s.workerFactory,
)
return nil
}

func (s *Service) startScanner() {
Expand Down