Skip to content

Commit

Permalink
Release shard lock earlier during delete workflow execution (#3028)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jul 8, 2022
1 parent 05a90f8 commit 082d7c9
Showing 1 changed file with 41 additions and 40 deletions.
81 changes: 41 additions & 40 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,54 +930,55 @@ func (s *ContextImpl) DeleteWorkflowExecution(
}
}

s.wLock()
defer s.wUnlock()
// Wrap step 1 and 2 with function to release the lock with defer after step 2.
err = func() error {
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return err
}
if err := s.errorByStateLocked(); err != nil {
return err
}

// Step 1. Delete visibility.
if deleteVisibilityRecord {
addTasksRequest := &persistence.AddHistoryTasksRequest{
// Step 1. Delete visibility.
if deleteVisibilityRecord {
addTasksRequest := &persistence.AddHistoryTasksRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,

Tasks: map[tasks.Category][]tasks.Task{
tasks.CategoryVisibility: {
&tasks.DeleteExecutionVisibilityTask{
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
Version: newTaskVersion,
StartTime: startTime,
CloseTime: closeTime,
},
},
},
}
err = s.addTasksLocked(ctx, addTasksRequest, namespaceEntry)
if err != nil {
return err
}
}

// Step 2. Delete current workflow execution pointer.
delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,

Tasks: map[tasks.Category][]tasks.Task{
tasks.CategoryVisibility: {
&tasks.DeleteExecutionVisibilityTask{
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
Version: newTaskVersion,
StartTime: startTime,
CloseTime: closeTime,
},
},
},
}
err = s.addTasksLocked(ctx, addTasksRequest, namespaceEntry)
if err != nil {
return err
op := func() error {
return s.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, delCurRequest)
}
}

// Step 2. Delete current workflow execution pointer.
delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,
}
op := func() error {
return s.GetExecutionManager().DeleteCurrentWorkflowExecution(ctx, delCurRequest)
}
err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
if err != nil {
err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
return err
}
}()

// Step 3. Delete workflow mutable state.
delRequest := &persistence.DeleteWorkflowExecutionRequest{
Expand All @@ -986,7 +987,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
WorkflowID: key.WorkflowID,
RunID: key.RunID,
}
op = func() error {
op := func() error {
return s.GetExecutionManager().DeleteWorkflowExecution(ctx, delRequest)
}
err = backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
Expand Down

0 comments on commit 082d7c9

Please # to comment.