From b1dbd26693f403343e6733ecb8e043aa15a627a9 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 31 Jul 2023 16:07:25 -0700 Subject: [PATCH 1/2] Use raw history/matching client in task executor --- common/resource/fx.go | 22 ++++++++++------- service/frontend/fx.go | 7 +++--- service/frontend/operator_handler.go | 6 ++--- service/history/shard/context_factory.go | 4 ++-- .../timer_queue_active_task_executor.go | 7 +++--- service/history/timer_queue_factory.go | 6 ++--- .../timer_queue_standby_task_executor.go | 7 +++--- .../history/timer_queue_task_executor_base.go | 8 +++---- .../timer_queue_task_executor_base_test.go | 2 +- .../transfer_queue_active_task_executor.go | 24 ++++++++++--------- ...ransfer_queue_active_task_executor_test.go | 1 + service/history/transfer_queue_factory.go | 10 ++++---- .../transfer_queue_standby_task_executor.go | 12 ++++++---- ...ansfer_queue_standby_task_executor_test.go | 3 ++- .../transfer_queue_task_executor_base.go | 17 ++++++------- service/matching/fx.go | 3 +-- service/matching/handler.go | 6 ++--- service/matching/matching_engine.go | 11 +++++---- service/matching/matching_engine_test.go | 2 +- service/matching/task_queue_manager.go | 6 ++--- service/worker/deletenamespace/fx.go | 6 ++--- service/worker/migration/fx.go | 4 ++-- service/worker/scheduler/fx.go | 4 ++-- service/worker/service.go | 5 ++-- 24 files changed, 99 insertions(+), 84 deletions(-) diff --git a/common/resource/fx.go b/common/resource/fx.go index 6ac992cdd7f..355df2f5cfc 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -76,6 +76,9 @@ type ( InstanceID string ServiceNames map[primitives.ServiceName]struct{} + HistoryRawClient historyservice.HistoryServiceClient + HistoryClient historyservice.HistoryServiceClient + MatchingRawClient matchingservice.MatchingServiceClient MatchingClient matchingservice.MatchingServiceClient @@ -114,6 +117,7 @@ var Module = fx.Options( fx.Provide(GrpcListenerProvider), fx.Provide(RuntimeMetricsReporterProvider), metrics.RuntimeMetricsReporterLifetimeHooksModule, + fx.Provide(HistoryRawClientProvider), fx.Provide(HistoryClientProvider), fx.Provide(MatchingRawClientProvider), fx.Provide(MatchingClientProvider), @@ -305,20 +309,22 @@ func RegisterBootstrapContainer( ) } -func HistoryClientProvider(clientBean client.Bean) historyservice.HistoryServiceClient { - historyRawClient := clientBean.GetHistoryClient() - historyClient := history.NewRetryableClient( +func HistoryRawClientProvider(clientBean client.Bean) HistoryRawClient { + return clientBean.GetHistoryClient() +} + +func HistoryClientProvider(historyRawClient HistoryRawClient) HistoryClient { + return history.NewRetryableClient( historyRawClient, common.CreateHistoryClientRetryPolicy(), common.IsServiceClientTransientError, ) - return historyClient } -func MatchingRawClientProvider(clientBean client.Bean, namespaceRegistry namespace.Registry) ( - MatchingRawClient, - error, -) { +func MatchingRawClientProvider( + clientBean client.Bean, + namespaceRegistry namespace.Registry, +) (MatchingRawClient, error) { return clientBean.GetMatchingClient(namespaceRegistry.GetNamespaceName) } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 59603b2a753..0307480e921 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/health" "google.golang.org/grpc/keepalive" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" "go.temporal.io/server/common" "go.temporal.io/server/common/archiver" @@ -470,7 +469,7 @@ func AdminHandlerProvider( persistenceMetadataManager persistence.MetadataManager, clientFactory client.Factory, clientBean client.Bean, - historyClient historyservice.HistoryServiceClient, + historyClient resource.HistoryClient, sdkClientFactory sdk.ClientFactory, membershipMonitor membership.Monitor, hostInfoProvider membership.HostInfoProvider, @@ -527,7 +526,7 @@ func OperatorHandlerProvider( saProvider searchattribute.Provider, saManager searchattribute.Manager, healthServer *health.Server, - historyClient historyservice.HistoryServiceClient, + historyClient resource.HistoryClient, clusterMetadataManager persistence.ClusterMetadataManager, clusterMetadata cluster.Metadata, clientFactory client.Factory, @@ -562,7 +561,7 @@ func HandlerProvider( clusterMetadataManager persistence.ClusterMetadataManager, persistenceMetadataManager persistence.MetadataManager, clientBean client.Bean, - historyClient historyservice.HistoryServiceClient, + historyClient resource.HistoryClient, matchingClient resource.MatchingClient, archiverProvider provider.ArchiverProvider, metricsHandler metrics.Handler, diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 26dc8a51355..7a28cff8d85 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -42,7 +42,6 @@ import ( sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/api/adminservice/v1" - "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" svc "go.temporal.io/server/client" "go.temporal.io/server/client/admin" @@ -57,6 +56,7 @@ import ( "go.temporal.io/server/common/persistence/visibility/store/elasticsearch" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/primitives" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/util" @@ -82,7 +82,7 @@ type ( saProvider searchattribute.Provider saManager searchattribute.Manager healthServer *health.Server - historyClient historyservice.HistoryServiceClient + historyClient resource.HistoryClient clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory @@ -98,7 +98,7 @@ type ( SaProvider searchattribute.Provider SaManager searchattribute.Manager healthServer *health.Server - historyClient historyservice.HistoryServiceClient + historyClient resource.HistoryClient clusterMetadataManager persistence.ClusterMetadataManager clusterMetadata clustermetadata.Metadata clientFactory svc.Factory diff --git a/service/history/shard/context_factory.go b/service/history/shard/context_factory.go index c1272d99100..1a964a4d472 100644 --- a/service/history/shard/context_factory.go +++ b/service/history/shard/context_factory.go @@ -27,7 +27,6 @@ package shard import ( "go.uber.org/fx" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/clock" @@ -38,6 +37,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" ) @@ -59,7 +59,7 @@ type ( ClusterMetadata cluster.Metadata Config *configs.Config EngineFactory EngineFactory - HistoryClient historyservice.HistoryServiceClient + HistoryClient resource.HistoryClient HistoryServiceResolver membership.ServiceResolver HostInfoProvider membership.HostInfoProvider Logger log.Logger diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index 8d113ddbe57..57d93469c68 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" @@ -71,14 +72,14 @@ func newTimerQueueActiveTaskExecutor( logger log.Logger, metricProvider metrics.Handler, config *configs.Config, - matchingClient matchingservice.MatchingServiceClient, + matchingRawClient resource.MatchingRawClient, ) queues.Executor { return &timerQueueActiveTaskExecutor{ timerQueueTaskExecutorBase: newTimerQueueTaskExecutorBase( shard, workflowCache, workflowDeleteManager, - matchingClient, + matchingRawClient, logger, metricProvider, config, @@ -466,7 +467,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask( // NOTE: do not access anything related mutable state after this lock release release(nil) // release earlier as we don't need the lock anymore - _, retError = t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ + _, retError = t.matchingRawClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ NamespaceId: task.GetNamespaceID(), Execution: &commonpb.WorkflowExecution{ WorkflowId: task.GetWorkflowID(), diff --git a/service/history/timer_queue_factory.go b/service/history/timer_queue_factory.go index 9616f374175..12718b3c5d9 100644 --- a/service/history/timer_queue_factory.go +++ b/service/history/timer_queue_factory.go @@ -57,7 +57,7 @@ type ( ClientBean client.Bean ArchivalClient archiver.Client - MatchingClient resource.MatchingClient + MatchingRawClient resource.MatchingRawClient VisibilityManager manager.VisibilityManager } @@ -133,7 +133,7 @@ func (f *timerQueueFactory) CreateQueue( logger, f.MetricsHandler, f.Config, - f.MatchingClient, + f.MatchingRawClient, ) standbyExecutor := newTimerQueueStandbyTaskExecutor( @@ -154,7 +154,7 @@ func (f *timerQueueFactory) CreateQueue( f.Config.StandbyTaskReReplicationContextTimeout, logger, ), - f.MatchingClient, + f.MatchingRawClient, logger, f.MetricsHandler, // note: the cluster name is for calculating time for standby tasks, diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index 82a8f949888..6fc11f3e473 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -40,6 +40,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" @@ -66,7 +67,7 @@ func newTimerQueueStandbyTaskExecutor( workflowCache wcache.Cache, workflowDeleteManager deletemanager.DeleteManager, nDCHistoryResender xdc.NDCHistoryResender, - matchingClient matchingservice.MatchingServiceClient, + matchingRawClient resource.MatchingRawClient, logger log.Logger, metricProvider metrics.Handler, clusterName string, @@ -77,7 +78,7 @@ func newTimerQueueStandbyTaskExecutor( shard, workflowCache, workflowDeleteManager, - matchingClient, + matchingRawClient, logger, metricProvider, config, @@ -566,7 +567,7 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity( activityScheduleToStartTimeout := &pushActivityInfo.activityTaskScheduleToStartTimeout activityTask := task.(*tasks.ActivityRetryTimerTask) - _, err := t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ + _, err := t.matchingRawClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ NamespaceId: activityTask.NamespaceID, Execution: &commonpb.WorkflowExecution{ WorkflowId: activityTask.WorkflowID, diff --git a/service/history/timer_queue_task_executor_base.go b/service/history/timer_queue_task_executor_base.go index 05ee66981d8..3c3afab343a 100644 --- a/service/history/timer_queue_task_executor_base.go +++ b/service/history/timer_queue_task_executor_base.go @@ -31,12 +31,12 @@ import ( "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" - "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/resource" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/deletemanager" @@ -56,7 +56,7 @@ type ( deleteManager deletemanager.DeleteManager cache wcache.Cache logger log.Logger - matchingClient matchingservice.MatchingServiceClient + matchingRawClient resource.MatchingRawClient metricHandler metrics.Handler config *configs.Config } @@ -66,7 +66,7 @@ func newTimerQueueTaskExecutorBase( shard shard.Context, workflowCache wcache.Cache, deleteManager deletemanager.DeleteManager, - matchingClient matchingservice.MatchingServiceClient, + matchingRawClient resource.MatchingRawClient, logger log.Logger, metricHandler metrics.Handler, config *configs.Config, @@ -78,7 +78,7 @@ func newTimerQueueTaskExecutorBase( cache: workflowCache, deleteManager: deleteManager, logger: logger, - matchingClient: matchingClient, + matchingRawClient: matchingRawClient, metricHandler: metricHandler, config: config, } diff --git a/service/history/timer_queue_task_executor_base_test.go b/service/history/timer_queue_task_executor_base_test.go index 6813805780f..b998e15f27f 100644 --- a/service/history/timer_queue_task_executor_base_test.go +++ b/service/history/timer_queue_task_executor_base_test.go @@ -97,7 +97,7 @@ func (s *timerQueueTaskExecutorBaseSuite) SetupTest() { s.testShardContext, s.mockCache, s.mockDeleteManager, - nil, + s.testShardContext.Resource.MatchingClient, s.testShardContext.GetLogger(), metrics.NoopMetricsHandler, config, diff --git a/service/history/transfer_queue_active_task_executor.go b/service/history/transfer_queue_active_task_executor.go index d6f7426a057..946c519f7b7 100644 --- a/service/history/transfer_queue_active_task_executor.go +++ b/service/history/transfer_queue_active_task_executor.go @@ -39,7 +39,6 @@ import ( clockspb "go.temporal.io/server/api/clock/v1" "go.temporal.io/server/api/historyservice/v1" - "go.temporal.io/server/api/matchingservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" workflowspb "go.temporal.io/server/api/workflow/v1" "go.temporal.io/server/common" @@ -51,6 +50,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/sdk" serviceerrors "go.temporal.io/server/common/serviceerror" @@ -85,7 +85,8 @@ func newTransferQueueActiveTaskExecutor( logger log.Logger, metricProvider metrics.Handler, config *configs.Config, - matchingClient matchingservice.MatchingServiceClient, + historyRawClient resource.HistoryRawClient, + matchingRawClient resource.MatchingRawClient, visibilityManager manager.VisibilityManager, ) queues.Executor { return &transferQueueActiveTaskExecutor{ @@ -95,7 +96,8 @@ func newTransferQueueActiveTaskExecutor( archivalClient, logger, metricProvider, - matchingClient, + historyRawClient, + matchingRawClient, visibilityManager, ), workflowResetter: ndc.NewWorkflowResetter( @@ -382,7 +384,7 @@ func (t *transferQueueActiveTaskExecutor) processCloseExecution( // Communicate the result to parent execution if this is Child Workflow execution if replyToParentWorkflow { - _, err := t.historyClient.RecordChildExecutionCompleted(ctx, &historyservice.RecordChildExecutionCompletedRequest{ + _, err := t.historyRawClient.RecordChildExecutionCompleted(ctx, &historyservice.RecordChildExecutionCompletedRequest{ NamespaceId: parentNamespaceID, WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: parentWorkflowID, @@ -686,7 +688,7 @@ func (t *transferQueueActiveTaskExecutor) processSignalExecution( // the rest of logic is making RPC call, which takes time. release(retError) // remove signalRequestedID from target workflow, after Signal detail is removed from source workflow - _, err = t.historyClient.RemoveSignalMutableState(ctx, &historyservice.RemoveSignalMutableStateRequest{ + _, err = t.historyRawClient.RemoveSignalMutableState(ctx, &historyservice.RemoveSignalMutableStateRequest{ NamespaceId: task.TargetNamespaceID, WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: task.TargetWorkflowID, @@ -1076,7 +1078,7 @@ func (t *transferQueueActiveTaskExecutor) createFirstWorkflowTask( parentClock *clockspb.VectorClock, childClock *clockspb.VectorClock, ) error { - _, err := t.historyClient.ScheduleWorkflowTask(ctx, &historyservice.ScheduleWorkflowTaskRequest{ + _, err := t.historyRawClient.ScheduleWorkflowTask(ctx, &historyservice.ScheduleWorkflowTaskRequest{ NamespaceId: namespaceID, WorkflowExecution: execution, IsFirstWorkflowTask: true, @@ -1272,7 +1274,7 @@ func (t *transferQueueActiveTaskExecutor) requestCancelExternalExecution( ChildWorkflowOnly: task.TargetChildWorkflowOnly, } - _, err := t.historyClient.RequestCancelWorkflowExecution(ctx, request) + _, err := t.historyRawClient.RequestCancelWorkflowExecution(ctx, request) return err } @@ -1306,7 +1308,7 @@ func (t *transferQueueActiveTaskExecutor) signalExternalExecution( ChildWorkflowOnly: task.TargetChildWorkflowOnly, } - _, err := t.historyClient.SignalWorkflowExecution(ctx, request) + _, err := t.historyRawClient.SignalWorkflowExecution(ctx, request) return err } @@ -1356,7 +1358,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow( request.SourceVersionStamp = sourceVersionStamp - response, err := t.historyClient.StartWorkflowExecution(ctx, request) + response, err := t.historyRawClient.StartWorkflowExecution(ctx, request) if err != nil { return "", nil, err } @@ -1537,7 +1539,7 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy( return err } } - _, err := t.historyClient.TerminateWorkflowExecution(ctx, &historyservice.TerminateWorkflowExecutionRequest{ + _, err := t.historyRawClient.TerminateWorkflowExecution(ctx, &historyservice.TerminateWorkflowExecutionRequest{ NamespaceId: childNamespaceID.String(), TerminateRequest: &workflowservice.TerminateWorkflowExecutionRequest{ Namespace: childInfo.GetNamespace(), @@ -1567,7 +1569,7 @@ func (t *transferQueueActiveTaskExecutor) applyParentClosePolicy( } } - _, err := t.historyClient.RequestCancelWorkflowExecution(ctx, &historyservice.RequestCancelWorkflowExecutionRequest{ + _, err := t.historyRawClient.RequestCancelWorkflowExecution(ctx, &historyservice.RequestCancelWorkflowExecutionRequest{ NamespaceId: childNamespaceID.String(), CancelRequest: &workflowservice.RequestCancelWorkflowExecutionRequest{ Namespace: childInfo.GetNamespace(), diff --git a/service/history/transfer_queue_active_task_executor_test.go b/service/history/transfer_queue_active_task_executor_test.go index 045ebaa4d99..5510f4f8aa2 100644 --- a/service/history/transfer_queue_active_task_executor_test.go +++ b/service/history/transfer_queue_active_task_executor_test.go @@ -241,6 +241,7 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { s.logger, metrics.NoopMetricsHandler, config, + s.mockShard.Resource.HistoryClient, s.mockShard.Resource.MatchingClient, s.mockVisibilityManager, ).(*transferQueueActiveTaskExecutor) diff --git a/service/history/transfer_queue_factory.go b/service/history/transfer_queue_factory.go index a1a41eb049d..db8d59ed281 100644 --- a/service/history/transfer_queue_factory.go +++ b/service/history/transfer_queue_factory.go @@ -58,8 +58,8 @@ type ( ClientBean client.Bean ArchivalClient archiver.Client SdkClientFactory sdk.ClientFactory - MatchingClient resource.MatchingClient - HistoryClient historyservice.HistoryServiceClient + HistoryRawClient resource.HistoryRawClient + MatchingRawClient resource.MatchingRawClient VisibilityManager manager.VisibilityManager } @@ -127,7 +127,8 @@ func (f *transferQueueFactory) CreateQueue( logger, f.MetricsHandler, f.Config, - f.MatchingClient, + f.HistoryRawClient, + f.MatchingRawClient, f.VisibilityManager, ) @@ -152,7 +153,8 @@ func (f *transferQueueFactory) CreateQueue( logger, f.MetricsHandler, currentClusterName, - f.MatchingClient, + f.HistoryRawClient, + f.MatchingRawClient, f.VisibilityManager, ) diff --git a/service/history/transfer_queue_standby_task_executor.go b/service/history/transfer_queue_standby_task_executor.go index 77ee9a7176f..1d38b4d5b85 100644 --- a/service/history/transfer_queue_standby_task_executor.go +++ b/service/history/transfer_queue_standby_task_executor.go @@ -35,7 +35,6 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/historyservice/v1" - "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -43,6 +42,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/ndc" @@ -75,7 +75,8 @@ func newTransferQueueStandbyTaskExecutor( logger log.Logger, metricProvider metrics.Handler, clusterName string, - matchingClient matchingservice.MatchingServiceClient, + historyRawClient resource.HistoryRawClient, + matchingRawClient resource.MatchingRawClient, visibilityManager manager.VisibilityManager, ) queues.Executor { return &transferQueueStandbyTaskExecutor{ @@ -85,7 +86,8 @@ func newTransferQueueStandbyTaskExecutor( archivalClient, logger, metricProvider, - matchingClient, + historyRawClient, + matchingRawClient, visibilityManager, ), clusterName: clusterName, @@ -290,7 +292,7 @@ func (t *transferQueueStandbyTaskExecutor) processCloseExecution( } if verifyCompletionRecorded { - _, err := t.historyClient.VerifyChildExecutionCompletionRecorded(ctx, &historyservice.VerifyChildExecutionCompletionRecordedRequest{ + _, err := t.historyRawClient.VerifyChildExecutionCompletionRecorded(ctx, &historyservice.VerifyChildExecutionCompletionRecordedRequest{ NamespaceId: executionInfo.ParentNamespaceId, ParentExecution: &commonpb.WorkflowExecution{ WorkflowId: executionInfo.ParentWorkflowId, @@ -450,7 +452,7 @@ func (t *transferQueueStandbyTaskExecutor) processStartChildExecution( }, nil } - _, err = t.historyClient.VerifyFirstWorkflowTaskScheduled(ctx, &historyservice.VerifyFirstWorkflowTaskScheduledRequest{ + _, err = t.historyRawClient.VerifyFirstWorkflowTaskScheduled(ctx, &historyservice.VerifyFirstWorkflowTaskScheduledRequest{ NamespaceId: transferTask.TargetNamespaceID, WorkflowExecution: &commonpb.WorkflowExecution{ WorkflowId: childWorkflowInfo.StartedWorkflowId, diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index 4bd1992d35c..455d57ae95b 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -217,7 +217,8 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() { s.logger, metrics.NoopMetricsHandler, s.clusterName, - s.mockShard.Resource.GetMatchingClient(), + s.mockShard.Resource.HistoryClient, + s.mockShard.Resource.MatchingClient, s.mockVisibilityManager, ).(*transferQueueStandbyTaskExecutor) } diff --git a/service/history/transfer_queue_task_executor_base.go b/service/history/transfer_queue_task_executor_base.go index c2b2cc2d176..25985ab2dd0 100644 --- a/service/history/transfer_queue_task_executor_base.go +++ b/service/history/transfer_queue_task_executor_base.go @@ -33,7 +33,6 @@ import ( "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" taskqueuespb "go.temporal.io/server/api/taskqueue/v1" "go.temporal.io/server/common" @@ -44,6 +43,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives" + "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" @@ -73,8 +73,8 @@ type ( archivalClient archiver.Client logger log.Logger metricHandler metrics.Handler - historyClient historyservice.HistoryServiceClient - matchingClient matchingservice.MatchingServiceClient + historyRawClient resource.HistoryRawClient + matchingRawClient resource.MatchingRawClient config *configs.Config searchAttributesProvider searchattribute.Provider visibilityManager manager.VisibilityManager @@ -88,7 +88,8 @@ func newTransferQueueTaskExecutorBase( archivalClient archiver.Client, logger log.Logger, metricHandler metrics.Handler, - matchingClient matchingservice.MatchingServiceClient, + historyRawClient resource.HistoryRawClient, + matchingRawClient resource.MatchingRawClient, visibilityManager manager.VisibilityManager, ) *transferQueueTaskExecutorBase { return &transferQueueTaskExecutorBase{ @@ -99,8 +100,8 @@ func newTransferQueueTaskExecutorBase( archivalClient: archivalClient, logger: logger, metricHandler: metricHandler, - historyClient: shard.GetHistoryClient(), - matchingClient: matchingClient, + historyRawClient: historyRawClient, + matchingRawClient: matchingRawClient, config: shard.GetConfig(), searchAttributesProvider: shard.GetSearchAttributesProvider(), visibilityManager: visibilityManager, @@ -121,7 +122,7 @@ func (t *transferQueueTaskExecutorBase) pushActivity( activityScheduleToStartTimeout *time.Duration, directive *taskqueuespb.TaskVersionDirective, ) error { - _, err := t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ + _, err := t.matchingRawClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{ NamespaceId: task.NamespaceID, Execution: &commonpb.WorkflowExecution{ WorkflowId: task.WorkflowID, @@ -152,7 +153,7 @@ func (t *transferQueueTaskExecutorBase) pushWorkflowTask( workflowTaskScheduleToStartTimeout *time.Duration, directive *taskqueuespb.TaskVersionDirective, ) error { - _, err := t.matchingClient.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{ + _, err := t.matchingRawClient.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{ NamespaceId: task.NamespaceID, Execution: &commonpb.WorkflowExecution{ WorkflowId: task.WorkflowID, diff --git a/service/matching/fx.go b/service/matching/fx.go index 413431bb0d1..883e7a20989 100644 --- a/service/matching/fx.go +++ b/service/matching/fx.go @@ -27,7 +27,6 @@ package matching import ( "go.uber.org/fx" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" @@ -179,7 +178,7 @@ func HandlerProvider( logger log.SnTaggedLogger, throttledLogger log.ThrottledLogger, taskManager persistence.TaskManager, - historyClient historyservice.HistoryServiceClient, + historyClient resource.HistoryClient, matchingRawClient resource.MatchingRawClient, matchingServiceResolver membership.ServiceResolver, metricsHandler metrics.Handler, diff --git a/service/matching/handler.go b/service/matching/handler.go index 9e3b0c5a859..04ba8a75b38 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -31,7 +31,6 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/cluster" @@ -41,6 +40,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/resource" ) type ( @@ -70,8 +70,8 @@ func NewHandler( logger log.Logger, throttledLogger log.Logger, taskManager persistence.TaskManager, - historyClient historyservice.HistoryServiceClient, - matchingRawClient matchingservice.MatchingServiceClient, + historyClient resource.HistoryClient, + matchingRawClient resource.MatchingRawClient, matchingServiceResolver membership.ServiceResolver, metricsHandler metrics.Handler, namespaceRegistry namespace.Registry, diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index 2fbb9b1cf33..5c6b939ce8a 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -60,6 +60,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/resource" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/tasktoken" "go.temporal.io/server/common/worker_versioning" @@ -113,8 +114,8 @@ type ( matchingEngineImpl struct { status int32 taskManager persistence.TaskManager - historyClient historyservice.HistoryServiceClient - matchingClient matchingservice.MatchingServiceClient + historyClient resource.HistoryClient + matchingRawClient resource.MatchingRawClient tokenSerializer common.TaskTokenSerializer logger log.Logger throttledLogger log.ThrottledLogger @@ -164,8 +165,8 @@ var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed im // NewEngine creates an instance of matching engine func NewEngine( taskManager persistence.TaskManager, - historyClient historyservice.HistoryServiceClient, - matchingClient matchingservice.MatchingServiceClient, + historyClient resource.HistoryClient, + matchingRawClient resource.MatchingRawClient, config *Config, logger log.Logger, throttledLogger log.ThrottledLogger, @@ -181,7 +182,7 @@ func NewEngine( status: common.DaemonStatusInitialized, taskManager: taskManager, historyClient: historyClient, - matchingClient: matchingClient, + matchingRawClient: matchingRawClient, tokenSerializer: common.NewProtoTaskTokenSerializer(), logger: log.With(logger, tag.ComponentMatchingEngine), throttledLogger: log.With(throttledLogger, tag.ComponentMatchingEngine), diff --git a/service/matching/matching_engine_test.go b/service/matching/matching_engine_test.go index 470354481d6..94ae764ca4d 100644 --- a/service/matching/matching_engine_test.go +++ b/service/matching/matching_engine_test.go @@ -157,7 +157,7 @@ func newMatchingEngine( logger: logger, throttledLogger: log.ThrottledLogger(logger), metricsHandler: metrics.NoopMetricsHandler, - matchingClient: mockMatchingClient, + matchingRawClient: mockMatchingClient, tokenSerializer: common.NewProtoTaskTokenSerializer(), config: config, namespaceRegistry: mockNamespaceCache, diff --git a/service/matching/task_queue_manager.go b/service/matching/task_queue_manager.go index 889d41bd799..8a0c33053df 100644 --- a/service/matching/task_queue_manager.go +++ b/service/matching/task_queue_manager.go @@ -225,7 +225,7 @@ func newTaskQueueManager( taskQueueConfig := newTaskQueueConfig(taskQueue, config, nsName) - db := newTaskQueueDB(e.taskManager, e.matchingClient, taskQueue.namespaceID, taskQueue, stickyInfo.kind, e.logger) + db := newTaskQueueDB(e.taskManager, e.matchingRawClient, taskQueue.namespaceID, taskQueue, stickyInfo.kind, e.logger) logger := log.With(e.logger, tag.WorkflowTaskQueueName(taskQueue.FullName()), tag.WorkflowTaskQueueType(taskQueue.taskType), @@ -244,7 +244,7 @@ func newTaskQueueManager( status: common.DaemonStatusInitialized, engine: e, namespaceRegistry: e.namespaceRegistry, - matchingClient: e.matchingClient, + matchingClient: e.matchingRawClient, metricsHandler: e.metricsHandler, taskQueueID: taskQueue, stickyInfo: stickyInfo, @@ -278,7 +278,7 @@ func newTaskQueueManager( // Forward without version set, the target will resolve the correct version set from // the build id itself. TODO: check if we still need this here after tqm refactoring forwardTaskQueue := newTaskQueueIDWithVersionSet(taskQueue, "") - fwdr = newForwarder(&taskQueueConfig.forwarderConfig, forwardTaskQueue, stickyInfo.kind, e.matchingClient) + fwdr = newForwarder(&taskQueueConfig.forwarderConfig, forwardTaskQueue, stickyInfo.kind, e.matchingRawClient) } tlMgr.matcher = newTaskMatcher(taskQueueConfig, fwdr, tlMgr.taggedMetricsHandler) for _, opt := range opts { diff --git a/service/worker/deletenamespace/fx.go b/service/worker/deletenamespace/fx.go index cd799cdebe4..d4099d67b76 100644 --- a/service/worker/deletenamespace/fx.go +++ b/service/worker/deletenamespace/fx.go @@ -29,11 +29,11 @@ import ( "go.temporal.io/sdk/workflow" "go.uber.org/fx" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/resource" workercommon "go.temporal.io/server/service/worker/common" "go.temporal.io/server/service/worker/deletenamespace/deleteexecutions" "go.temporal.io/server/service/worker/deletenamespace/reclaimresources" @@ -44,7 +44,7 @@ type ( deleteNamespaceComponent struct { visibilityManager manager.VisibilityManager metadataManager persistence.MetadataManager - historyClient historyservice.HistoryServiceClient + historyClient resource.HistoryClient metricsHandler metrics.Handler logger log.Logger } @@ -62,7 +62,7 @@ var Module = fx.Options( func newComponent( visibilityManager manager.VisibilityManager, metadataManager persistence.MetadataManager, - historyClient historyservice.HistoryServiceClient, + historyClient resource.HistoryClient, metricsHandler metrics.Handler, logger log.Logger, ) component { diff --git a/service/worker/migration/fx.go b/service/worker/migration/fx.go index 768b2be2027..f8aa8f1375b 100644 --- a/service/worker/migration/fx.go +++ b/service/worker/migration/fx.go @@ -30,13 +30,13 @@ import ( "go.temporal.io/sdk/workflow" "go.uber.org/fx" - "go.temporal.io/server/api/historyservice/v1" serverClient "go.temporal.io/server/client" "go.temporal.io/server/common/config" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/resource" workercommon "go.temporal.io/server/service/worker/common" ) @@ -46,7 +46,7 @@ type ( PersistenceConfig *config.Persistence ExecutionManager persistence.ExecutionManager NamespaceRegistry namespace.Registry - HistoryClient historyservice.HistoryServiceClient + HistoryClient resource.HistoryClient FrontendClient workflowservice.WorkflowServiceClient ClientFactory serverClient.Factory NamespaceReplicationQueue persistence.NamespaceReplicationQueue diff --git a/service/worker/scheduler/fx.go b/service/worker/scheduler/fx.go index 3b7414f7c06..37220bc8595 100644 --- a/service/worker/scheduler/fx.go +++ b/service/worker/scheduler/fx.go @@ -31,12 +31,12 @@ import ( sdkworker "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/quotas" + "go.temporal.io/server/common/resource" workercommon "go.temporal.io/server/service/worker/common" ) @@ -56,7 +56,7 @@ type ( fx.In MetricsHandler metrics.Handler Logger log.Logger - HistoryClient historyservice.HistoryServiceClient + HistoryClient resource.HistoryClient FrontendClient workflowservice.WorkflowServiceClient } diff --git a/service/worker/service.go b/service/worker/service.go index 07e71c8121c..8e2ad8bfb01 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -32,7 +32,6 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/server/common" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/client" carchiver "go.temporal.io/server/common/archiver" @@ -75,7 +74,7 @@ type ( hostInfo membership.HostInfo executionManager persistence.ExecutionManager taskManager persistence.TaskManager - historyClient historyservice.HistoryServiceClient + historyClient resource.HistoryClient namespaceRegistry namespace.Registry workerServiceResolver membership.ServiceResolver visibilityManager manager.VisibilityManager @@ -145,7 +144,7 @@ func NewService( metricsHandler metrics.Handler, metadataManager persistence.MetadataManager, taskManager persistence.TaskManager, - historyClient historyservice.HistoryServiceClient, + historyClient resource.HistoryClient, workerManager *workerManager, perNamespaceWorkerManager *perNamespaceWorkerManager, visibilityManager manager.VisibilityManager, From 78e49cc53820800ade5735eeb92646f0b7efccc5 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 31 Jul 2023 16:29:42 -0700 Subject: [PATCH 2/2] fix unit tests --- service/history/queue_factory_base_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/service/history/queue_factory_base_test.go b/service/history/queue_factory_base_test.go index c94723370e4..644f4b599e1 100644 --- a/service/history/queue_factory_base_test.go +++ b/service/history/queue_factory_base_test.go @@ -32,7 +32,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/fx" - "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" carchiver "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/clock" @@ -175,8 +174,8 @@ type compileTimeDependencies struct { client.Bean archiver.Client sdk.ClientFactory - resource.MatchingClient - historyservice.HistoryServiceClient + resource.MatchingRawClient + resource.HistoryRawClient manager.VisibilityManager archival.Archiver workflow.RelocatableAttributesFetcher