From a067c55b97c37479e02d3198e301847d603d047d Mon Sep 17 00:00:00 2001 From: wenquan xing Date: Thu, 8 Sep 2022 10:44:14 -0700 Subject: [PATCH] Move remove signal from mutable state to api package --- .../api/removesignalmutablestate/api.go | 78 +++++++++++++++++++ .../api/respondactivitytaskcanceled/api.go | 2 +- service/history/handler.go | 4 +- service/history/historyEngine.go | 41 ++-------- service/history/historyEngine_test.go | 4 +- service/history/shard/engine.go | 2 +- service/history/shard/engine_mock.go | 7 +- 7 files changed, 94 insertions(+), 44 deletions(-) create mode 100644 service/history/api/removesignalmutablestate/api.go diff --git a/service/history/api/removesignalmutablestate/api.go b/service/history/api/removesignalmutablestate/api.go new file mode 100644 index 00000000000..87b98ddd82b --- /dev/null +++ b/service/history/api/removesignalmutablestate/api.go @@ -0,0 +1,78 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package removesignalmutablestate + +import ( + "context" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/consts" + "go.temporal.io/server/service/history/shard" +) + +func Invoke( + ctx context.Context, + req *historyservice.RemoveSignalMutableStateRequest, + shard shard.Context, + workflowConsistencyChecker api.WorkflowConsistencyChecker, +) (resp *historyservice.RemoveSignalMutableStateResponse, retError error) { + _, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + if err != nil { + return nil, err + } + + err = api.GetAndUpdateWorkflowWithNew( + ctx, + nil, + api.BypassMutableStateConsistencyPredicate, + definition.NewWorkflowKey( + req.NamespaceId, + req.WorkflowExecution.WorkflowId, + req.WorkflowExecution.RunId, + ), + func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) { + mutableState := workflowContext.GetMutableState() + if !mutableState.IsWorkflowExecutionRunning() { + return nil, consts.ErrWorkflowCompleted + } + + mutableState.DeleteSignalRequested(req.GetRequestId()) + return &api.UpdateWorkflowAction{ + Noop: false, + CreateWorkflowTask: false, + }, nil + }, + nil, + shard, + workflowConsistencyChecker, + ) + if err != nil { + return nil, err + } + return &historyservice.RemoveSignalMutableStateResponse{}, nil +} diff --git a/service/history/api/respondactivitytaskcanceled/api.go b/service/history/api/respondactivitytaskcanceled/api.go index a7bb7c8dd6a..3a43fb68b4b 100644 --- a/service/history/api/respondactivitytaskcanceled/api.go +++ b/service/history/api/respondactivitytaskcanceled/api.go @@ -22,7 +22,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package respondactivitytaskcandeled +package respondactivitytaskcanceled import ( "context" diff --git a/service/history/handler.go b/service/history/handler.go index 36195298c27..2ce26646ad8 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -956,12 +956,12 @@ func (h *Handler) RemoveSignalMutableState(ctx context.Context, request *history return nil, h.convertError(err) } - err2 := engine.RemoveSignalMutableState(ctx, request) + resp, err2 := engine.RemoveSignalMutableState(ctx, request) if err2 != nil { return nil, h.convertError(err2) } - return &historyservice.RemoveSignalMutableStateResponse{}, nil + return resp, nil } // TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 0a4fe6d72bc..c4bc2db7136 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -62,12 +62,13 @@ import ( "go.temporal.io/server/service/history/api/recordactivitytaskheartbeat" "go.temporal.io/server/service/history/api/recordactivitytaskstarted" "go.temporal.io/server/service/history/api/recordchildworkflowcompleted" + "go.temporal.io/server/service/history/api/removesignalmutablestate" replicationapi "go.temporal.io/server/service/history/api/replication" "go.temporal.io/server/service/history/api/replicationadmin" "go.temporal.io/server/service/history/api/requestcancelworkflow" "go.temporal.io/server/service/history/api/resetstickytaskqueue" "go.temporal.io/server/service/history/api/resetworkflow" - respondactivitytaskcandeled "go.temporal.io/server/service/history/api/respondactivitytaskcanceled" + "go.temporal.io/server/service/history/api/respondactivitytaskcanceled" "go.temporal.io/server/service/history/api/respondactivitytaskcompleted" "go.temporal.io/server/service/history/api/respondactivitytaskfailed" "go.temporal.io/server/service/history/api/signalwithstartworkflow" @@ -823,7 +824,7 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled( ctx context.Context, req *historyservice.RespondActivityTaskCanceledRequest, ) (*historyservice.RespondActivityTaskCanceledResponse, error) { - return respondactivitytaskcandeled.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker) + return respondactivitytaskcanceled.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker) } // RecordActivityTaskHeartbeat records an hearbeat for a task. @@ -871,39 +872,9 @@ func (h *historyEngineImpl) UpdateWorkflow( // RemoveSignalMutableState remove the signal request id in signal_requested for deduplicate func (e *historyEngineImpl) RemoveSignalMutableState( ctx context.Context, - request *historyservice.RemoveSignalMutableStateRequest, -) error { - - _, err := e.getActiveNamespaceEntry(namespace.ID(request.GetNamespaceId())) - if err != nil { - return err - } - - return api.GetAndUpdateWorkflowWithNew( - ctx, - nil, - api.BypassMutableStateConsistencyPredicate, - definition.NewWorkflowKey( - request.NamespaceId, - request.WorkflowExecution.WorkflowId, - request.WorkflowExecution.RunId, - ), - func(workflowContext api.WorkflowContext) (*api.UpdateWorkflowAction, error) { - mutableState := workflowContext.GetMutableState() - if !mutableState.IsWorkflowExecutionRunning() { - return nil, consts.ErrWorkflowCompleted - } - - mutableState.DeleteSignalRequested(request.GetRequestId()) - return &api.UpdateWorkflowAction{ - Noop: false, - CreateWorkflowTask: false, - }, nil - }, - nil, - e.shard, - e.workflowConsistencyChecker, - ) + req *historyservice.RemoveSignalMutableStateRequest, +) (*historyservice.RemoveSignalMutableStateResponse, error) { + return removesignalmutablestate.Invoke(ctx, req, e.shard, e.workflowConsistencyChecker) } func (e *historyEngineImpl) TerminateWorkflowExecution( diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 5457cda35cc..a4ef03cc320 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -4893,7 +4893,7 @@ func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() { func (s *engineSuite) TestRemoveSignalMutableState() { removeRequest := &historyservice.RemoveSignalMutableStateRequest{} - err := s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest) + _, err := s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest) s.EqualError(err, "Missing namespace UUID.") execution := commonpb.WorkflowExecution{ @@ -4920,7 +4920,7 @@ func (s *engineSuite) TestRemoveSignalMutableState() { s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(gwmsResponse, nil) s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil) - err = s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest) + _, err = s.mockHistoryEngine.RemoveSignalMutableState(context.Background(), removeRequest) s.Nil(err) } diff --git a/service/history/shard/engine.go b/service/history/shard/engine.go index 54fe8bd20ed..a58039a2473 100644 --- a/service/history/shard/engine.go +++ b/service/history/shard/engine.go @@ -63,7 +63,7 @@ type ( RequestCancelWorkflowExecution(ctx context.Context, request *historyservice.RequestCancelWorkflowExecutionRequest) (*historyservice.RequestCancelWorkflowExecutionResponse, error) SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) (*historyservice.SignalWorkflowExecutionResponse, error) SignalWithStartWorkflowExecution(ctx context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest) (*historyservice.SignalWithStartWorkflowExecutionResponse, error) - RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) error + RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) (*historyservice.RemoveSignalMutableStateResponse, error) TerminateWorkflowExecution(ctx context.Context, request *historyservice.TerminateWorkflowExecutionRequest) (*historyservice.TerminateWorkflowExecutionResponse, error) DeleteWorkflowExecution(ctx context.Context, deleteRequest *historyservice.DeleteWorkflowExecutionRequest) error ResetWorkflowExecution(ctx context.Context, request *historyservice.ResetWorkflowExecutionRequest) (*historyservice.ResetWorkflowExecutionResponse, error) diff --git a/service/history/shard/engine_mock.go b/service/history/shard/engine_mock.go index 6d9be178faa..809ef7fff7a 100644 --- a/service/history/shard/engine_mock.go +++ b/service/history/shard/engine_mock.go @@ -387,11 +387,12 @@ func (mr *MockEngineMockRecorder) RefreshWorkflowTasks(ctx, namespaceUUID, execu } // RemoveSignalMutableState mocks base method. -func (m *MockEngine) RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) error { +func (m *MockEngine) RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) (*historyservice.RemoveSignalMutableStateResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RemoveSignalMutableState", ctx, request) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(*historyservice.RemoveSignalMutableStateResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 } // RemoveSignalMutableState indicates an expected call of RemoveSignalMutableState.