From 082d7c9cccd233a6b35c8daf250c7e827826a011 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Mon, 27 Jun 2022 23:38:09 -0700 Subject: [PATCH] Release shard lock earlier during delete workflow execution (#3028) --- service/history/shard/context_impl.go | 81 ++++++++++++++------------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 34a5d2baa29..29cde7edaf3 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -930,54 +930,55 @@ func (s *ContextImpl) DeleteWorkflowExecution( } } - s.wLock() - defer s.wUnlock() + // Wrap step 1 and 2 with function to release the lock with defer after step 2. + err = func() error { + s.wLock() + defer s.wUnlock() - if err := s.errorByStateLocked(); err != nil { - return err - } + if err := s.errorByStateLocked(); err != nil { + return err + } - // Step 1. Delete visibility. - if deleteVisibilityRecord { - addTasksRequest := &persistence.AddHistoryTasksRequest{ + // Step 1. Delete visibility. + if deleteVisibilityRecord { + addTasksRequest := &persistence.AddHistoryTasksRequest{ + ShardID: s.shardID, + NamespaceID: key.NamespaceID, + WorkflowID: key.WorkflowID, + RunID: key.RunID, + + Tasks: map[tasks.Category][]tasks.Task{ + tasks.CategoryVisibility: { + &tasks.DeleteExecutionVisibilityTask{ + // TaskID is set by addTasksLocked + WorkflowKey: key, + VisibilityTimestamp: s.timeSource.Now(), + Version: newTaskVersion, + StartTime: startTime, + CloseTime: closeTime, + }, + }, + }, + } + err = s.addTasksLocked(ctx, addTasksRequest, namespaceEntry) + if err != nil { + return err + } + } + + // Step 2. Delete current workflow execution pointer. + delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{ ShardID: s.shardID, NamespaceID: key.NamespaceID, WorkflowID: key.WorkflowID, RunID: key.RunID, - - Tasks: map[tasks.Category][]tasks.Task{ - tasks.CategoryVisibility: { - &tasks.DeleteExecutionVisibilityTask{ - // TaskID is set by addTasksLocked - WorkflowKey: key, - VisibilityTimestamp: s.timeSource.Now(), - Version: newTaskVersion, - StartTime: startTime, - CloseTime: closeTime, - }, - }, - }, } - err = s.addTasksLocked(ctx, addTasksRequest, namespaceEntry) - if err != nil { - return err + op := func() error { + return s.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, delCurRequest) } - } - - // Step 2. Delete current workflow execution pointer. - delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{ - ShardID: s.shardID, - NamespaceID: key.NamespaceID, - WorkflowID: key.WorkflowID, - RunID: key.RunID, - } - op := func() error { - return s.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, delCurRequest) - } - err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) - if err != nil { + err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError) return err - } + }() // Step 3. Delete workflow mutable state. delRequest := &persistence.DeleteWorkflowExecutionRequest{ @@ -986,7 +987,7 @@ func (s *ContextImpl) DeleteWorkflowExecution( WorkflowID: key.WorkflowID, RunID: key.RunID, } - op = func() error { + op := func() error { return s.GetExecutionManager().DeleteWorkflowExecution(ctx, delRequest) } err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)