Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Use raw history/matching client in task executor #4710

Merged
merged 3 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type (
InstanceID string
ServiceNames map[primitives.ServiceName]struct{}

HistoryRawClient historyservice.HistoryServiceClient
HistoryClient historyservice.HistoryServiceClient

MatchingRawClient matchingservice.MatchingServiceClient
MatchingClient matchingservice.MatchingServiceClient

Expand Down Expand Up @@ -114,6 +117,7 @@ var Module = fx.Options(
fx.Provide(GrpcListenerProvider),
fx.Provide(RuntimeMetricsReporterProvider),
metrics.RuntimeMetricsReporterLifetimeHooksModule,
fx.Provide(HistoryRawClientProvider),
fx.Provide(HistoryClientProvider),
fx.Provide(MatchingRawClientProvider),
fx.Provide(MatchingClientProvider),
Expand Down Expand Up @@ -305,20 +309,22 @@ func RegisterBootstrapContainer(
)
}

func HistoryClientProvider(clientBean client.Bean) historyservice.HistoryServiceClient {
historyRawClient := clientBean.GetHistoryClient()
historyClient := history.NewRetryableClient(
func HistoryRawClientProvider(clientBean client.Bean) HistoryRawClient {
return clientBean.GetHistoryClient()
}

func HistoryClientProvider(historyRawClient HistoryRawClient) HistoryClient {
return history.NewRetryableClient(
historyRawClient,
common.CreateHistoryClientRetryPolicy(),
common.IsServiceClientTransientError,
)
return historyClient
}

func MatchingRawClientProvider(clientBean client.Bean, namespaceRegistry namespace.Registry) (
MatchingRawClient,
error,
) {
func MatchingRawClientProvider(
clientBean client.Bean,
namespaceRegistry namespace.Registry,
) (MatchingRawClient, error) {
return clientBean.GetMatchingClient(namespaceRegistry.GetNamespaceName)
}

Expand Down
7 changes: 3 additions & 4 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/health"
"google.golang.org/grpc/keepalive"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -470,7 +469,7 @@ func AdminHandlerProvider(
persistenceMetadataManager persistence.MetadataManager,
clientFactory client.Factory,
clientBean client.Bean,
historyClient historyservice.HistoryServiceClient,
historyClient resource.HistoryClient,
sdkClientFactory sdk.ClientFactory,
membershipMonitor membership.Monitor,
hostInfoProvider membership.HostInfoProvider,
Expand Down Expand Up @@ -527,7 +526,7 @@ func OperatorHandlerProvider(
saProvider searchattribute.Provider,
saManager searchattribute.Manager,
healthServer *health.Server,
historyClient historyservice.HistoryServiceClient,
historyClient resource.HistoryClient,
clusterMetadataManager persistence.ClusterMetadataManager,
clusterMetadata cluster.Metadata,
clientFactory client.Factory,
Expand Down Expand Up @@ -562,7 +561,7 @@ func HandlerProvider(
clusterMetadataManager persistence.ClusterMetadataManager,
persistenceMetadataManager persistence.MetadataManager,
clientBean client.Bean,
historyClient historyservice.HistoryServiceClient,
historyClient resource.HistoryClient,
matchingClient resource.MatchingClient,
archiverProvider provider.ArchiverProvider,
metricsHandler metrics.Handler,
Expand Down
6 changes: 3 additions & 3 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
sdkclient "go.temporal.io/sdk/client"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
svc "go.temporal.io/server/client"
"go.temporal.io/server/client/admin"
Expand All @@ -57,6 +56,7 @@ import (
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
Expand All @@ -82,7 +82,7 @@ type (
saProvider searchattribute.Provider
saManager searchattribute.Manager
healthServer *health.Server
historyClient historyservice.HistoryServiceClient
historyClient resource.HistoryClient
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata clustermetadata.Metadata
clientFactory svc.Factory
Expand All @@ -98,7 +98,7 @@ type (
SaProvider searchattribute.Provider
SaManager searchattribute.Manager
healthServer *health.Server
historyClient historyservice.HistoryServiceClient
historyClient resource.HistoryClient
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata clustermetadata.Metadata
clientFactory svc.Factory
Expand Down
5 changes: 2 additions & 3 deletions service/history/queue_factory_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
carchiver "go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
Expand Down Expand Up @@ -175,8 +174,8 @@ type compileTimeDependencies struct {
client.Bean
archiver.Client
sdk.ClientFactory
resource.MatchingClient
historyservice.HistoryServiceClient
resource.MatchingRawClient
resource.HistoryRawClient
manager.VisibilityManager
archival.Archiver
workflow.RelocatableAttributesFetcher
Expand Down
4 changes: 2 additions & 2 deletions service/history/shard/context_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package shard
import (
"go.uber.org/fx"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/clock"
Expand All @@ -38,6 +37,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/configs"
)
Expand All @@ -59,7 +59,7 @@ type (
ClusterMetadata cluster.Metadata
Config *configs.Config
EngineFactory EngineFactory
HistoryClient historyservice.HistoryServiceClient
HistoryClient resource.HistoryClient
HistoryServiceResolver membership.ServiceResolver
HostInfoProvider membership.HostInfoProvider
Logger log.Logger
Expand Down
7 changes: 4 additions & 3 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
Expand All @@ -71,14 +72,14 @@ func newTimerQueueActiveTaskExecutor(
logger log.Logger,
metricProvider metrics.Handler,
config *configs.Config,
matchingClient matchingservice.MatchingServiceClient,
matchingRawClient resource.MatchingRawClient,
) queues.Executor {
return &timerQueueActiveTaskExecutor{
timerQueueTaskExecutorBase: newTimerQueueTaskExecutorBase(
shard,
workflowCache,
workflowDeleteManager,
matchingClient,
matchingRawClient,
logger,
metricProvider,
config,
Expand Down Expand Up @@ -466,7 +467,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityRetryTimerTask(
// NOTE: do not access anything related mutable state after this lock release
release(nil) // release earlier as we don't need the lock anymore

_, retError = t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{
_, retError = t.matchingRawClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{
NamespaceId: task.GetNamespaceID(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: task.GetWorkflowID(),
Expand Down
6 changes: 3 additions & 3 deletions service/history/timer_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type (

ClientBean client.Bean
ArchivalClient archiver.Client
MatchingClient resource.MatchingClient
MatchingRawClient resource.MatchingRawClient
VisibilityManager manager.VisibilityManager
}

Expand Down Expand Up @@ -133,7 +133,7 @@ func (f *timerQueueFactory) CreateQueue(
logger,
f.MetricsHandler,
f.Config,
f.MatchingClient,
f.MatchingRawClient,
)

standbyExecutor := newTimerQueueStandbyTaskExecutor(
Expand All @@ -154,7 +154,7 @@ func (f *timerQueueFactory) CreateQueue(
f.Config.StandbyTaskReReplicationContextTimeout,
logger,
),
f.MatchingClient,
f.MatchingRawClient,
logger,
f.MetricsHandler,
// note: the cluster name is for calculating time for standby tasks,
Expand Down
7 changes: 4 additions & 3 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
Expand All @@ -66,7 +67,7 @@ func newTimerQueueStandbyTaskExecutor(
workflowCache wcache.Cache,
workflowDeleteManager deletemanager.DeleteManager,
nDCHistoryResender xdc.NDCHistoryResender,
matchingClient matchingservice.MatchingServiceClient,
matchingRawClient resource.MatchingRawClient,
logger log.Logger,
metricProvider metrics.Handler,
clusterName string,
Expand All @@ -77,7 +78,7 @@ func newTimerQueueStandbyTaskExecutor(
shard,
workflowCache,
workflowDeleteManager,
matchingClient,
matchingRawClient,
logger,
metricProvider,
config,
Expand Down Expand Up @@ -566,7 +567,7 @@ func (t *timerQueueStandbyTaskExecutor) pushActivity(
activityScheduleToStartTimeout := &pushActivityInfo.activityTaskScheduleToStartTimeout
activityTask := task.(*tasks.ActivityRetryTimerTask)

_, err := t.matchingClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{
_, err := t.matchingRawClient.AddActivityTask(ctx, &matchingservice.AddActivityTaskRequest{
NamespaceId: activityTask.NamespaceID,
Execution: &commonpb.WorkflowExecution{
WorkflowId: activityTask.WorkflowID,
Expand Down
8 changes: 4 additions & 4 deletions service/history/timer_queue_task_executor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/deletemanager"
Expand All @@ -56,7 +56,7 @@ type (
deleteManager deletemanager.DeleteManager
cache wcache.Cache
logger log.Logger
matchingClient matchingservice.MatchingServiceClient
matchingRawClient resource.MatchingRawClient
metricHandler metrics.Handler
config *configs.Config
}
Expand All @@ -66,7 +66,7 @@ func newTimerQueueTaskExecutorBase(
shard shard.Context,
workflowCache wcache.Cache,
deleteManager deletemanager.DeleteManager,
matchingClient matchingservice.MatchingServiceClient,
matchingRawClient resource.MatchingRawClient,
logger log.Logger,
metricHandler metrics.Handler,
config *configs.Config,
Expand All @@ -78,7 +78,7 @@ func newTimerQueueTaskExecutorBase(
cache: workflowCache,
deleteManager: deleteManager,
logger: logger,
matchingClient: matchingClient,
matchingRawClient: matchingRawClient,
metricHandler: metricHandler,
config: config,
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/timer_queue_task_executor_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *timerQueueTaskExecutorBaseSuite) SetupTest() {
s.testShardContext,
s.mockCache,
s.mockDeleteManager,
nil,
s.testShardContext.Resource.MatchingClient,
s.testShardContext.GetLogger(),
metrics.NoopMetricsHandler,
config,
Expand Down
Loading