diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index d7717d8f2f7..1b3467ae966 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -558,7 +558,7 @@ func (s *ContextImpl) AddTasks( ctx context.Context, request *persistence.AddHistoryTasksRequest, ) error { - ctx, cancel, err := s.ensureMinContextTimeout(ctx) + ctx, cancel, err := s.newDetachedContext(ctx) if err != nil { return err } @@ -596,7 +596,7 @@ func (s *ContextImpl) CreateWorkflowExecution( ctx context.Context, request *persistence.CreateWorkflowExecutionRequest, ) (*persistence.CreateWorkflowExecutionResponse, error) { - ctx, cancel, err := s.ensureMinContextTimeout(ctx) + ctx, cancel, err := s.newDetachedContext(ctx) if err != nil { return nil, err } @@ -640,7 +640,7 @@ func (s *ContextImpl) UpdateWorkflowExecution( ctx context.Context, request *persistence.UpdateWorkflowExecutionRequest, ) (*persistence.UpdateWorkflowExecutionResponse, error) { - ctx, cancel, err := s.ensureMinContextTimeout(ctx) + ctx, cancel, err := s.newDetachedContext(ctx) if err != nil { return nil, err } @@ -711,7 +711,7 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution( ctx context.Context, request *persistence.ConflictResolveWorkflowExecutionRequest, ) (*persistence.ConflictResolveWorkflowExecutionResponse, error) { - ctx, cancel, err := s.ensureMinContextTimeout(ctx) + ctx, cancel, err := s.newDetachedContext(ctx) if err != nil { return nil, err } @@ -775,7 +775,7 @@ func (s *ContextImpl) SetWorkflowExecution( ctx context.Context, request *persistence.SetWorkflowExecutionRequest, ) (*persistence.SetWorkflowExecutionResponse, error) { - ctx, cancel, err := s.ensureMinContextTimeout(ctx) + ctx, cancel, err := s.newDetachedContext(ctx) if err != nil { return nil, err } @@ -925,7 +925,7 @@ func (s *ContextImpl) DeleteWorkflowExecution( // The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually. // Step 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state). - ctx, cancel, err := s.ensureMinContextTimeout(ctx) + ctx, cancel, err := s.newDetachedContext(ctx) if err != nil { return err } @@ -1954,21 +1954,28 @@ func (s *ContextImpl) GetArchivalMetadata() archiver.ArchivalMetadata { return s.archivalMetadata } -func (s *ContextImpl) ensureMinContextTimeout( +func (s *ContextImpl) newDetachedContext( ctx context.Context, ) (context.Context, context.CancelFunc, error) { if err := ctx.Err(); err != nil { return nil, nil, err } + detachedContext := rpc.CopyContextValues(s.lifecycleCtx, ctx) + + var cancel context.CancelFunc deadline, ok := ctx.Deadline() - if !ok || deadline.Sub(s.GetTimeSource().Now()) >= minContextTimeout { - return ctx, func() {}, nil + if ok { + timeout := deadline.Sub(s.GetTimeSource().Now()) + if timeout < minContextTimeout { + timeout = minContextTimeout + } + detachedContext, cancel = context.WithTimeout(detachedContext, timeout) + } else { + cancel = func() {} } - newContext, cancel := context.WithTimeout(context.Background(), minContextTimeout) - newContext = rpc.CopyContextValues(newContext, ctx) - return newContext, cancel, nil + return detachedContext, cancel, nil } func (s *ContextImpl) newIOContext() (context.Context, context.CancelFunc) {