diff --git a/client/admin/metric_client.go b/client/admin/metric_client.go index 000e398b1b9..0533e04571a 100644 --- a/client/admin/metric_client.go +++ b/client/admin/metric_client.go @@ -25,21 +25,63 @@ package admin import ( + "go.temporal.io/api/serviceerror" + "go.temporal.io/server/api/adminservice/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" ) var _ adminservice.AdminServiceClient = (*metricClient)(nil) type metricClient struct { - client adminservice.AdminServiceClient - metricsClient metrics.Client + client adminservice.AdminServiceClient + metricsClient metrics.Client + throttledLogger log.Logger } // NewMetricClient creates a new instance of adminservice.AdminServiceClient that emits metrics -func NewMetricClient(client adminservice.AdminServiceClient, metricsClient metrics.Client) adminservice.AdminServiceClient { +func NewMetricClient( + client adminservice.AdminServiceClient, + metricsClient metrics.Client, + throttledLogger log.Logger, +) adminservice.AdminServiceClient { return &metricClient{ - client: client, - metricsClient: metricsClient, + client: client, + metricsClient: metricsClient, + throttledLogger: throttledLogger, + } +} + +func (c *metricClient) startMetricsRecording( + metricScope int, +) (metrics.Scope, metrics.Stopwatch) { + scope := c.metricsClient.Scope(metricScope) + scope.IncCounter(metrics.ClientRequests) + stopwatch := scope.StartTimer(metrics.ClientLatency) + return scope, stopwatch +} + +func (c *metricClient) finishMetricsRecording( + scope metrics.Scope, + stopwatch metrics.Stopwatch, + err error, +) { + if err != nil { + switch err.(type) { + case *serviceerror.Canceled, + *serviceerror.DeadlineExceeded, + *serviceerror.NotFound, + *serviceerror.QueryFailed, + *serviceerror.NamespaceNotFound, + *serviceerror.WorkflowNotReady, + *serviceerror.WorkflowExecutionAlreadyStarted: + // noop - not interest and too many logs + default: + c.throttledLogger.Info("admin client encountered error", tag.Error(err), tag.ErrorType(err)) + } + scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures) } + stopwatch.Stop() } diff --git a/client/admin/metric_client_gen.go b/client/admin/metric_client_gen.go index ab8ac5c673e..8a9892c392c 100644 --- a/client/admin/metric_client_gen.go +++ b/client/admin/metric_client_gen.go @@ -39,484 +39,376 @@ func (c *metricClient) AddOrUpdateRemoteCluster( ctx context.Context, request *adminservice.AddOrUpdateRemoteClusterRequest, opts ...grpc.CallOption, -) (*adminservice.AddOrUpdateRemoteClusterResponse, error) { +) (_ *adminservice.AddOrUpdateRemoteClusterResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientAddOrUpdateRemoteClusterScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientAddOrUpdateRemoteClusterScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientAddOrUpdateRemoteClusterScope, metrics.ClientLatency) - resp, err := c.client.AddOrUpdateRemoteCluster(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientAddOrUpdateRemoteClusterScope, metrics.ClientFailures) - } - return resp, err + return c.client.AddOrUpdateRemoteCluster(ctx, request, opts...) } func (c *metricClient) AddSearchAttributes( ctx context.Context, request *adminservice.AddSearchAttributesRequest, opts ...grpc.CallOption, -) (*adminservice.AddSearchAttributesResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributesScope, metrics.ClientRequests) +) (_ *adminservice.AddSearchAttributesResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientAddSearchAttributesScope, metrics.ClientLatency) - resp, err := c.client.AddSearchAttributes(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientAddSearchAttributesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributesScope, metrics.ClientFailures) - } - return resp, err + return c.client.AddSearchAttributes(ctx, request, opts...) } func (c *metricClient) CloseShard( ctx context.Context, request *adminservice.CloseShardRequest, opts ...grpc.CallOption, -) (*adminservice.CloseShardResponse, error) { +) (_ *adminservice.CloseShardResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientCloseShardScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientCloseShardScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientCloseShardScope, metrics.ClientLatency) - resp, err := c.client.CloseShard(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientCloseShardScope, metrics.ClientFailures) - } - return resp, err + return c.client.CloseShard(ctx, request, opts...) } func (c *metricClient) DeleteWorkflowExecution( ctx context.Context, request *adminservice.DeleteWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*adminservice.DeleteWorkflowExecutionResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientDeleteWorkflowExecutionScope, metrics.ClientRequests) +) (_ *adminservice.DeleteWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientDeleteWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.DeleteWorkflowExecution(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientDeleteWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientDeleteWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.DeleteWorkflowExecution(ctx, request, opts...) } func (c *metricClient) DescribeCluster( ctx context.Context, request *adminservice.DescribeClusterRequest, opts ...grpc.CallOption, -) (*adminservice.DescribeClusterResponse, error) { +) (_ *adminservice.DescribeClusterResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientDescribeClusterScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientDescribeClusterScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientDescribeClusterScope, metrics.ClientLatency) - resp, err := c.client.DescribeCluster(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientDescribeClusterScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeCluster(ctx, request, opts...) } func (c *metricClient) DescribeHistoryHost( ctx context.Context, request *adminservice.DescribeHistoryHostRequest, opts ...grpc.CallOption, -) (*adminservice.DescribeHistoryHostResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientDescribeHistoryHostScope, metrics.ClientRequests) +) (_ *adminservice.DescribeHistoryHostResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientDescribeHistoryHostScope, metrics.ClientLatency) - resp, err := c.client.DescribeHistoryHost(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientDescribeHistoryHostScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientDescribeHistoryHostScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeHistoryHost(ctx, request, opts...) } func (c *metricClient) DescribeMutableState( ctx context.Context, request *adminservice.DescribeMutableStateRequest, opts ...grpc.CallOption, -) (*adminservice.DescribeMutableStateResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientDescribeMutableStateScope, metrics.ClientRequests) +) (_ *adminservice.DescribeMutableStateResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientDescribeMutableStateScope, metrics.ClientLatency) - resp, err := c.client.DescribeMutableState(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientDescribeMutableStateScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientDescribeMutableStateScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeMutableState(ctx, request, opts...) } func (c *metricClient) GetDLQMessages( ctx context.Context, request *adminservice.GetDLQMessagesRequest, opts ...grpc.CallOption, -) (*adminservice.GetDLQMessagesResponse, error) { +) (_ *adminservice.GetDLQMessagesResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientGetDLQMessagesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetDLQMessagesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientGetDLQMessagesScope, metrics.ClientLatency) - resp, err := c.client.GetDLQMessages(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetDLQMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetDLQMessages(ctx, request, opts...) } func (c *metricClient) GetDLQReplicationMessages( ctx context.Context, request *adminservice.GetDLQReplicationMessagesRequest, opts ...grpc.CallOption, -) (*adminservice.GetDLQReplicationMessagesResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientGetDLQReplicationMessagesScope, metrics.ClientRequests) +) (_ *adminservice.GetDLQReplicationMessagesResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientGetDLQReplicationMessagesScope, metrics.ClientLatency) - resp, err := c.client.GetDLQReplicationMessages(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetDLQReplicationMessagesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetDLQReplicationMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetDLQReplicationMessages(ctx, request, opts...) } func (c *metricClient) GetNamespaceReplicationMessages( ctx context.Context, request *adminservice.GetNamespaceReplicationMessagesRequest, opts ...grpc.CallOption, -) (*adminservice.GetNamespaceReplicationMessagesResponse, error) { +) (_ *adminservice.GetNamespaceReplicationMessagesResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientGetNamespaceReplicationMessagesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetNamespaceReplicationMessagesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientGetNamespaceReplicationMessagesScope, metrics.ClientLatency) - resp, err := c.client.GetNamespaceReplicationMessages(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetNamespaceReplicationMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetNamespaceReplicationMessages(ctx, request, opts...) } func (c *metricClient) GetReplicationMessages( ctx context.Context, request *adminservice.GetReplicationMessagesRequest, opts ...grpc.CallOption, -) (*adminservice.GetReplicationMessagesResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientGetReplicationMessagesScope, metrics.ClientRequests) +) (_ *adminservice.GetReplicationMessagesResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientGetReplicationMessagesScope, metrics.ClientLatency) - resp, err := c.client.GetReplicationMessages(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetReplicationMessagesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetReplicationMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetReplicationMessages(ctx, request, opts...) } func (c *metricClient) GetSearchAttributes( ctx context.Context, request *adminservice.GetSearchAttributesRequest, opts ...grpc.CallOption, -) (*adminservice.GetSearchAttributesResponse, error) { +) (_ *adminservice.GetSearchAttributesResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientGetSearchAttributesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetSearchAttributesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientGetSearchAttributesScope, metrics.ClientLatency) - resp, err := c.client.GetSearchAttributes(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetSearchAttributesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetSearchAttributes(ctx, request, opts...) } func (c *metricClient) GetShard( ctx context.Context, request *adminservice.GetShardRequest, opts ...grpc.CallOption, -) (*adminservice.GetShardResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientGetShardScope, metrics.ClientRequests) +) (_ *adminservice.GetShardResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientGetShardScope, metrics.ClientLatency) - resp, err := c.client.GetShard(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetShardScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetShardScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetShard(ctx, request, opts...) } func (c *metricClient) GetTaskQueueTasks( ctx context.Context, request *adminservice.GetTaskQueueTasksRequest, opts ...grpc.CallOption, -) (*adminservice.GetTaskQueueTasksResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientGetTaskQueueTasksScope, metrics.ClientRequests) +) (_ *adminservice.GetTaskQueueTasksResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientGetTaskQueueTasksScope, metrics.ClientLatency) - resp, err := c.client.GetTaskQueueTasks(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetTaskQueueTasksScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetTaskQueueTasksScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetTaskQueueTasks(ctx, request, opts...) } func (c *metricClient) GetWorkflowExecutionRawHistoryV2( ctx context.Context, request *adminservice.GetWorkflowExecutionRawHistoryV2Request, opts ...grpc.CallOption, -) (*adminservice.GetWorkflowExecutionRawHistoryV2Response, error) { +) (_ *adminservice.GetWorkflowExecutionRawHistoryV2Response, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope, metrics.ClientLatency) - resp, err := c.client.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientGetWorkflowExecutionRawHistoryV2Scope, metrics.ClientFailures) - } - return resp, err + return c.client.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...) } func (c *metricClient) ListClusterMembers( ctx context.Context, request *adminservice.ListClusterMembersRequest, opts ...grpc.CallOption, -) (*adminservice.ListClusterMembersResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientListClusterMembersScope, metrics.ClientRequests) +) (_ *adminservice.ListClusterMembersResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientListClusterMembersScope, metrics.ClientLatency) - resp, err := c.client.ListClusterMembers(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientListClusterMembersScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientListClusterMembersScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListClusterMembers(ctx, request, opts...) } func (c *metricClient) ListClusters( ctx context.Context, request *adminservice.ListClustersRequest, opts ...grpc.CallOption, -) (*adminservice.ListClustersResponse, error) { +) (_ *adminservice.ListClustersResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientListClustersScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientListClustersScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientListClustersScope, metrics.ClientLatency) - resp, err := c.client.ListClusters(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientListClustersScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListClusters(ctx, request, opts...) } func (c *metricClient) ListHistoryTasks( ctx context.Context, request *adminservice.ListHistoryTasksRequest, opts ...grpc.CallOption, -) (*adminservice.ListHistoryTasksResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientListHistoryTasksScope, metrics.ClientRequests) +) (_ *adminservice.ListHistoryTasksResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientListHistoryTasksScope, metrics.ClientLatency) - resp, err := c.client.ListHistoryTasks(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientListHistoryTasksScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientListHistoryTasksScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListHistoryTasks(ctx, request, opts...) } func (c *metricClient) MergeDLQMessages( ctx context.Context, request *adminservice.MergeDLQMessagesRequest, opts ...grpc.CallOption, -) (*adminservice.MergeDLQMessagesResponse, error) { +) (_ *adminservice.MergeDLQMessagesResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientMergeDLQMessagesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientMergeDLQMessagesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientMergeDLQMessagesScope, metrics.ClientLatency) - resp, err := c.client.MergeDLQMessages(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientMergeDLQMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.MergeDLQMessages(ctx, request, opts...) } func (c *metricClient) PurgeDLQMessages( ctx context.Context, request *adminservice.PurgeDLQMessagesRequest, opts ...grpc.CallOption, -) (*adminservice.PurgeDLQMessagesResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientPurgeDLQMessagesScope, metrics.ClientRequests) +) (_ *adminservice.PurgeDLQMessagesResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientPurgeDLQMessagesScope, metrics.ClientLatency) - resp, err := c.client.PurgeDLQMessages(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientPurgeDLQMessagesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientPurgeDLQMessagesScope, metrics.ClientFailures) - } - return resp, err + return c.client.PurgeDLQMessages(ctx, request, opts...) } func (c *metricClient) ReapplyEvents( ctx context.Context, request *adminservice.ReapplyEventsRequest, opts ...grpc.CallOption, -) (*adminservice.ReapplyEventsResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientReapplyEventsScope, metrics.ClientRequests) +) (_ *adminservice.ReapplyEventsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientReapplyEventsScope, metrics.ClientLatency) - resp, err := c.client.ReapplyEvents(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientReapplyEventsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientReapplyEventsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ReapplyEvents(ctx, request, opts...) } func (c *metricClient) RebuildMutableState( ctx context.Context, request *adminservice.RebuildMutableStateRequest, opts ...grpc.CallOption, -) (*adminservice.RebuildMutableStateResponse, error) { +) (_ *adminservice.RebuildMutableStateResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientRebuildMutableStateScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientRebuildMutableStateScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientRebuildMutableStateScope, metrics.ClientLatency) - resp, err := c.client.RebuildMutableState(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientRebuildMutableStateScope, metrics.ClientFailures) - } - return resp, err + return c.client.RebuildMutableState(ctx, request, opts...) } func (c *metricClient) RefreshWorkflowTasks( ctx context.Context, request *adminservice.RefreshWorkflowTasksRequest, opts ...grpc.CallOption, -) (*adminservice.RefreshWorkflowTasksResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientRefreshWorkflowTasksScope, metrics.ClientRequests) +) (_ *adminservice.RefreshWorkflowTasksResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientRefreshWorkflowTasksScope, metrics.ClientLatency) - resp, err := c.client.RefreshWorkflowTasks(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientRefreshWorkflowTasksScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientRefreshWorkflowTasksScope, metrics.ClientFailures) - } - return resp, err + return c.client.RefreshWorkflowTasks(ctx, request, opts...) } func (c *metricClient) RemoveRemoteCluster( ctx context.Context, request *adminservice.RemoveRemoteClusterRequest, opts ...grpc.CallOption, -) (*adminservice.RemoveRemoteClusterResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientRemoveRemoteClusterScope, metrics.ClientRequests) +) (_ *adminservice.RemoveRemoteClusterResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientRemoveRemoteClusterScope, metrics.ClientLatency) - resp, err := c.client.RemoveRemoteCluster(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientRemoveRemoteClusterScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientRemoveRemoteClusterScope, metrics.ClientFailures) - } - return resp, err + return c.client.RemoveRemoteCluster(ctx, request, opts...) } func (c *metricClient) RemoveSearchAttributes( ctx context.Context, request *adminservice.RemoveSearchAttributesRequest, opts ...grpc.CallOption, -) (*adminservice.RemoveSearchAttributesResponse, error) { +) (_ *adminservice.RemoveSearchAttributesResponse, retError error) { - c.metricsClient.IncCounter(metrics.AdminClientRemoveSearchAttributesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientRemoveSearchAttributesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.AdminClientRemoveSearchAttributesScope, metrics.ClientLatency) - resp, err := c.client.RemoveSearchAttributes(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientRemoveSearchAttributesScope, metrics.ClientFailures) - } - return resp, err + return c.client.RemoveSearchAttributes(ctx, request, opts...) } func (c *metricClient) RemoveTask( ctx context.Context, request *adminservice.RemoveTaskRequest, opts ...grpc.CallOption, -) (*adminservice.RemoveTaskResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientRemoveTaskScope, metrics.ClientRequests) +) (_ *adminservice.RemoveTaskResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientRemoveTaskScope, metrics.ClientLatency) - resp, err := c.client.RemoveTask(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientRemoveTaskScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientRemoveTaskScope, metrics.ClientFailures) - } - return resp, err + return c.client.RemoveTask(ctx, request, opts...) } func (c *metricClient) ResendReplicationTasks( ctx context.Context, request *adminservice.ResendReplicationTasksRequest, opts ...grpc.CallOption, -) (*adminservice.ResendReplicationTasksResponse, error) { - - c.metricsClient.IncCounter(metrics.AdminClientResendReplicationTasksScope, metrics.ClientRequests) +) (_ *adminservice.ResendReplicationTasksResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.AdminClientResendReplicationTasksScope, metrics.ClientLatency) - resp, err := c.client.ResendReplicationTasks(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.AdminClientResendReplicationTasksScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.AdminClientResendReplicationTasksScope, metrics.ClientFailures) - } - return resp, err + return c.client.ResendReplicationTasks(ctx, request, opts...) } diff --git a/client/clientfactory.go b/client/clientfactory.go index 3766b1f904b..d1b2f05d538 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -198,7 +198,7 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout( } client := frontend.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider)) if cf.metricsClient != nil { - client = frontend.NewMetricClient(client, cf.metricsClient) + client = frontend.NewMetricClient(client, cf.metricsClient, cf.throttledLogger) } return client } @@ -216,7 +216,7 @@ func (cf *rpcClientFactory) NewAdminClientWithTimeout( client := admin.NewClient(timeout, largeTimeout, common.NewClientCache(keyResolver, clientProvider)) if cf.metricsClient != nil { - client = admin.NewMetricClient(client, cf.metricsClient) + client = admin.NewMetricClient(client, cf.metricsClient, cf.throttledLogger) } return client } diff --git a/client/frontend/metric_client.go b/client/frontend/metric_client.go index 94c25b98cf3..6c1790d2147 100644 --- a/client/frontend/metric_client.go +++ b/client/frontend/metric_client.go @@ -25,22 +25,63 @@ package frontend import ( + "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" ) var _ workflowservice.WorkflowServiceClient = (*metricClient)(nil) type metricClient struct { - client workflowservice.WorkflowServiceClient - metricsClient metrics.Client + client workflowservice.WorkflowServiceClient + metricsClient metrics.Client + throttledLogger log.Logger } // NewMetricClient creates a new instance of workflowservice.WorkflowServiceClient that emits metrics -func NewMetricClient(client workflowservice.WorkflowServiceClient, metricsClient metrics.Client) workflowservice.WorkflowServiceClient { +func NewMetricClient( + client workflowservice.WorkflowServiceClient, + metricsClient metrics.Client, + throttledLogger log.Logger, +) workflowservice.WorkflowServiceClient { return &metricClient{ - client: client, - metricsClient: metricsClient, + client: client, + metricsClient: metricsClient, + throttledLogger: throttledLogger, } } + +func (c *metricClient) startMetricsRecording( + metricScope int, +) (metrics.Scope, metrics.Stopwatch) { + scope := c.metricsClient.Scope(metricScope) + scope.IncCounter(metrics.ClientRequests) + stopwatch := scope.StartTimer(metrics.ClientLatency) + return scope, stopwatch +} + +func (c *metricClient) finishMetricsRecording( + scope metrics.Scope, + stopwatch metrics.Stopwatch, + err error, +) { + if err != nil { + switch err.(type) { + case *serviceerror.Canceled, + *serviceerror.DeadlineExceeded, + *serviceerror.NotFound, + *serviceerror.QueryFailed, + *serviceerror.NamespaceNotFound, + *serviceerror.WorkflowNotReady, + *serviceerror.WorkflowExecutionAlreadyStarted: + // noop - not interest and too many logs + default: + c.throttledLogger.Info("frontend client encountered error", tag.Error(err), tag.ErrorType(err)) + } + scope.Tagged(metrics.ServiceErrorTypeTag(err)).IncCounter(metrics.ClientFailures) + } + stopwatch.Stop() +} diff --git a/client/frontend/metric_client_gen.go b/client/frontend/metric_client_gen.go index bbf66052d49..2cdee8574aa 100644 --- a/client/frontend/metric_client_gen.go +++ b/client/frontend/metric_client_gen.go @@ -39,898 +39,698 @@ func (c *metricClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, opts ...grpc.CallOption, -) (*workflowservice.CountWorkflowExecutionsResponse, error) { +) (_ *workflowservice.CountWorkflowExecutionsResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientCountWorkflowExecutionsScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientCountWorkflowExecutionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientCountWorkflowExecutionsScope, metrics.ClientLatency) - resp, err := c.client.CountWorkflowExecutions(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientCountWorkflowExecutionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.CountWorkflowExecutions(ctx, request, opts...) } func (c *metricClient) CreateSchedule( ctx context.Context, request *workflowservice.CreateScheduleRequest, opts ...grpc.CallOption, -) (*workflowservice.CreateScheduleResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientCreateScheduleScope, metrics.ClientRequests) +) (_ *workflowservice.CreateScheduleResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientCreateScheduleScope, metrics.ClientLatency) - resp, err := c.client.CreateSchedule(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientCreateScheduleScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientCreateScheduleScope, metrics.ClientFailures) - } - return resp, err + return c.client.CreateSchedule(ctx, request, opts...) } func (c *metricClient) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, opts ...grpc.CallOption, -) (*workflowservice.DeleteScheduleResponse, error) { +) (_ *workflowservice.DeleteScheduleResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientDeleteScheduleScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientDeleteScheduleScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientDeleteScheduleScope, metrics.ClientLatency) - resp, err := c.client.DeleteSchedule(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientDeleteScheduleScope, metrics.ClientFailures) - } - return resp, err + return c.client.DeleteSchedule(ctx, request, opts...) } func (c *metricClient) DeprecateNamespace( ctx context.Context, request *workflowservice.DeprecateNamespaceRequest, opts ...grpc.CallOption, -) (*workflowservice.DeprecateNamespaceResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientDeprecateNamespaceScope, metrics.ClientRequests) +) (_ *workflowservice.DeprecateNamespaceResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientDeprecateNamespaceScope, metrics.ClientLatency) - resp, err := c.client.DeprecateNamespace(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientDeprecateNamespaceScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientDeprecateNamespaceScope, metrics.ClientFailures) - } - return resp, err + return c.client.DeprecateNamespace(ctx, request, opts...) } func (c *metricClient) DescribeNamespace( ctx context.Context, request *workflowservice.DescribeNamespaceRequest, opts ...grpc.CallOption, -) (*workflowservice.DescribeNamespaceResponse, error) { +) (_ *workflowservice.DescribeNamespaceResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientDescribeNamespaceScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientDescribeNamespaceScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeNamespaceScope, metrics.ClientLatency) - resp, err := c.client.DescribeNamespace(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientDescribeNamespaceScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeNamespace(ctx, request, opts...) } func (c *metricClient) DescribeSchedule( ctx context.Context, request *workflowservice.DescribeScheduleRequest, opts ...grpc.CallOption, -) (*workflowservice.DescribeScheduleResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientDescribeScheduleScope, metrics.ClientRequests) +) (_ *workflowservice.DescribeScheduleResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeScheduleScope, metrics.ClientLatency) - resp, err := c.client.DescribeSchedule(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientDescribeScheduleScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientDescribeScheduleScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeSchedule(ctx, request, opts...) } func (c *metricClient) DescribeTaskQueue( ctx context.Context, request *workflowservice.DescribeTaskQueueRequest, opts ...grpc.CallOption, -) (*workflowservice.DescribeTaskQueueResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientDescribeTaskQueueScope, metrics.ClientRequests) +) (_ *workflowservice.DescribeTaskQueueResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeTaskQueueScope, metrics.ClientLatency) - resp, err := c.client.DescribeTaskQueue(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientDescribeTaskQueueScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientDescribeTaskQueueScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeTaskQueue(ctx, request, opts...) } func (c *metricClient) DescribeWorkflowExecution( ctx context.Context, request *workflowservice.DescribeWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.DescribeWorkflowExecutionResponse, error) { +) (_ *workflowservice.DescribeWorkflowExecutionResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientDescribeWorkflowExecutionScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientDescribeWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.DescribeWorkflowExecution(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientDescribeWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.DescribeWorkflowExecution(ctx, request, opts...) } func (c *metricClient) GetClusterInfo( ctx context.Context, request *workflowservice.GetClusterInfoRequest, opts ...grpc.CallOption, -) (*workflowservice.GetClusterInfoResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientGetClusterInfoScope, metrics.ClientRequests) +) (_ *workflowservice.GetClusterInfoResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientGetClusterInfoScope, metrics.ClientLatency) - resp, err := c.client.GetClusterInfo(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientGetClusterInfoScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientGetClusterInfoScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetClusterInfo(ctx, request, opts...) } func (c *metricClient) GetSearchAttributes( ctx context.Context, request *workflowservice.GetSearchAttributesRequest, opts ...grpc.CallOption, -) (*workflowservice.GetSearchAttributesResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientGetSearchAttributesScope, metrics.ClientRequests) +) (_ *workflowservice.GetSearchAttributesResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientGetSearchAttributesScope, metrics.ClientLatency) - resp, err := c.client.GetSearchAttributes(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientGetSearchAttributesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientGetSearchAttributesScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetSearchAttributes(ctx, request, opts...) } func (c *metricClient) GetSystemInfo( ctx context.Context, request *workflowservice.GetSystemInfoRequest, opts ...grpc.CallOption, -) (*workflowservice.GetSystemInfoResponse, error) { +) (_ *workflowservice.GetSystemInfoResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientGetSystemInfoScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientGetSystemInfoScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientGetSystemInfoScope, metrics.ClientLatency) - resp, err := c.client.GetSystemInfo(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientGetSystemInfoScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetSystemInfo(ctx, request, opts...) } func (c *metricClient) GetWorkerBuildIdOrdering( ctx context.Context, request *workflowservice.GetWorkerBuildIdOrderingRequest, opts ...grpc.CallOption, -) (*workflowservice.GetWorkerBuildIdOrderingResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientGetWorkerBuildIdOrderingScope, metrics.ClientRequests) +) (_ *workflowservice.GetWorkerBuildIdOrderingResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientGetWorkerBuildIdOrderingScope, metrics.ClientLatency) - resp, err := c.client.GetWorkerBuildIdOrdering(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientGetWorkerBuildIdOrderingScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientGetWorkerBuildIdOrderingScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetWorkerBuildIdOrdering(ctx, request, opts...) } func (c *metricClient) GetWorkflowExecutionHistory( ctx context.Context, request *workflowservice.GetWorkflowExecutionHistoryRequest, opts ...grpc.CallOption, -) (*workflowservice.GetWorkflowExecutionHistoryResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientGetWorkflowExecutionHistoryScope, metrics.ClientRequests) +) (_ *workflowservice.GetWorkflowExecutionHistoryResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientGetWorkflowExecutionHistoryScope, metrics.ClientLatency) - resp, err := c.client.GetWorkflowExecutionHistory(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientGetWorkflowExecutionHistoryScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientGetWorkflowExecutionHistoryScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetWorkflowExecutionHistory(ctx, request, opts...) } func (c *metricClient) GetWorkflowExecutionHistoryReverse( ctx context.Context, request *workflowservice.GetWorkflowExecutionHistoryReverseRequest, opts ...grpc.CallOption, -) (*workflowservice.GetWorkflowExecutionHistoryReverseResponse, error) { +) (_ *workflowservice.GetWorkflowExecutionHistoryReverseResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientGetWorkflowExecutionHistoryReverseScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientGetWorkflowExecutionHistoryReverseScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientGetWorkflowExecutionHistoryReverseScope, metrics.ClientLatency) - resp, err := c.client.GetWorkflowExecutionHistoryReverse(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientGetWorkflowExecutionHistoryReverseScope, metrics.ClientFailures) - } - return resp, err + return c.client.GetWorkflowExecutionHistoryReverse(ctx, request, opts...) } func (c *metricClient) ListArchivedWorkflowExecutions( ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest, opts ...grpc.CallOption, -) (*workflowservice.ListArchivedWorkflowExecutionsResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientListArchivedWorkflowExecutionsScope, metrics.ClientRequests) +) (_ *workflowservice.ListArchivedWorkflowExecutionsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientListArchivedWorkflowExecutionsScope, metrics.ClientLatency) - resp, err := c.client.ListArchivedWorkflowExecutions(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListArchivedWorkflowExecutionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListArchivedWorkflowExecutionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListArchivedWorkflowExecutions(ctx, request, opts...) } func (c *metricClient) ListClosedWorkflowExecutions( ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest, opts ...grpc.CallOption, -) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientListClosedWorkflowExecutionsScope, metrics.ClientRequests) +) (_ *workflowservice.ListClosedWorkflowExecutionsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientListClosedWorkflowExecutionsScope, metrics.ClientLatency) - resp, err := c.client.ListClosedWorkflowExecutions(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListClosedWorkflowExecutionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListClosedWorkflowExecutionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListClosedWorkflowExecutions(ctx, request, opts...) } func (c *metricClient) ListNamespaces( ctx context.Context, request *workflowservice.ListNamespacesRequest, opts ...grpc.CallOption, -) (*workflowservice.ListNamespacesResponse, error) { +) (_ *workflowservice.ListNamespacesResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientListNamespacesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListNamespacesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientListNamespacesScope, metrics.ClientLatency) - resp, err := c.client.ListNamespaces(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListNamespacesScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListNamespaces(ctx, request, opts...) } func (c *metricClient) ListOpenWorkflowExecutions( ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest, opts ...grpc.CallOption, -) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientListOpenWorkflowExecutionsScope, metrics.ClientRequests) +) (_ *workflowservice.ListOpenWorkflowExecutionsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientListOpenWorkflowExecutionsScope, metrics.ClientLatency) - resp, err := c.client.ListOpenWorkflowExecutions(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListOpenWorkflowExecutionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListOpenWorkflowExecutionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListOpenWorkflowExecutions(ctx, request, opts...) } func (c *metricClient) ListScheduleMatchingTimes( ctx context.Context, request *workflowservice.ListScheduleMatchingTimesRequest, opts ...grpc.CallOption, -) (*workflowservice.ListScheduleMatchingTimesResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientListScheduleMatchingTimesScope, metrics.ClientRequests) +) (_ *workflowservice.ListScheduleMatchingTimesResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientListScheduleMatchingTimesScope, metrics.ClientLatency) - resp, err := c.client.ListScheduleMatchingTimes(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListScheduleMatchingTimesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListScheduleMatchingTimesScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListScheduleMatchingTimes(ctx, request, opts...) } func (c *metricClient) ListSchedules( ctx context.Context, request *workflowservice.ListSchedulesRequest, opts ...grpc.CallOption, -) (*workflowservice.ListSchedulesResponse, error) { +) (_ *workflowservice.ListSchedulesResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientListSchedulesScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListSchedulesScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientListSchedulesScope, metrics.ClientLatency) - resp, err := c.client.ListSchedules(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListSchedulesScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListSchedules(ctx, request, opts...) } func (c *metricClient) ListTaskQueuePartitions( ctx context.Context, request *workflowservice.ListTaskQueuePartitionsRequest, opts ...grpc.CallOption, -) (*workflowservice.ListTaskQueuePartitionsResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientListTaskQueuePartitionsScope, metrics.ClientRequests) +) (_ *workflowservice.ListTaskQueuePartitionsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientListTaskQueuePartitionsScope, metrics.ClientLatency) - resp, err := c.client.ListTaskQueuePartitions(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListTaskQueuePartitionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListTaskQueuePartitionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListTaskQueuePartitions(ctx, request, opts...) } func (c *metricClient) ListWorkflowExecutions( ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest, opts ...grpc.CallOption, -) (*workflowservice.ListWorkflowExecutionsResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientListWorkflowExecutionsScope, metrics.ClientRequests) +) (_ *workflowservice.ListWorkflowExecutionsResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientListWorkflowExecutionsScope, metrics.ClientLatency) - resp, err := c.client.ListWorkflowExecutions(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientListWorkflowExecutionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientListWorkflowExecutionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ListWorkflowExecutions(ctx, request, opts...) } func (c *metricClient) PatchSchedule( ctx context.Context, request *workflowservice.PatchScheduleRequest, opts ...grpc.CallOption, -) (*workflowservice.PatchScheduleResponse, error) { +) (_ *workflowservice.PatchScheduleResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientPatchScheduleScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientPatchScheduleScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientPatchScheduleScope, metrics.ClientLatency) - resp, err := c.client.PatchSchedule(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientPatchScheduleScope, metrics.ClientFailures) - } - return resp, err + return c.client.PatchSchedule(ctx, request, opts...) } func (c *metricClient) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, opts ...grpc.CallOption, -) (*workflowservice.PollActivityTaskQueueResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientPollActivityTaskQueueScope, metrics.ClientRequests) +) (_ *workflowservice.PollActivityTaskQueueResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientPollActivityTaskQueueScope, metrics.ClientLatency) - resp, err := c.client.PollActivityTaskQueue(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientPollActivityTaskQueueScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientPollActivityTaskQueueScope, metrics.ClientFailures) - } - return resp, err + return c.client.PollActivityTaskQueue(ctx, request, opts...) } func (c *metricClient) PollWorkflowTaskQueue( ctx context.Context, request *workflowservice.PollWorkflowTaskQueueRequest, opts ...grpc.CallOption, -) (*workflowservice.PollWorkflowTaskQueueResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientPollWorkflowTaskQueueScope, metrics.ClientRequests) +) (_ *workflowservice.PollWorkflowTaskQueueResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientPollWorkflowTaskQueueScope, metrics.ClientLatency) - resp, err := c.client.PollWorkflowTaskQueue(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientPollWorkflowTaskQueueScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientPollWorkflowTaskQueueScope, metrics.ClientFailures) - } - return resp, err + return c.client.PollWorkflowTaskQueue(ctx, request, opts...) } func (c *metricClient) QueryWorkflow( ctx context.Context, request *workflowservice.QueryWorkflowRequest, opts ...grpc.CallOption, -) (*workflowservice.QueryWorkflowResponse, error) { +) (_ *workflowservice.QueryWorkflowResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientQueryWorkflowScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientQueryWorkflowScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientQueryWorkflowScope, metrics.ClientLatency) - resp, err := c.client.QueryWorkflow(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientQueryWorkflowScope, metrics.ClientFailures) - } - return resp, err + return c.client.QueryWorkflow(ctx, request, opts...) } func (c *metricClient) RecordActivityTaskHeartbeat( ctx context.Context, request *workflowservice.RecordActivityTaskHeartbeatRequest, opts ...grpc.CallOption, -) (*workflowservice.RecordActivityTaskHeartbeatResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRecordActivityTaskHeartbeatScope, metrics.ClientRequests) +) (_ *workflowservice.RecordActivityTaskHeartbeatResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRecordActivityTaskHeartbeatScope, metrics.ClientLatency) - resp, err := c.client.RecordActivityTaskHeartbeat(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRecordActivityTaskHeartbeatScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRecordActivityTaskHeartbeatScope, metrics.ClientFailures) - } - return resp, err + return c.client.RecordActivityTaskHeartbeat(ctx, request, opts...) } func (c *metricClient) RecordActivityTaskHeartbeatById( ctx context.Context, request *workflowservice.RecordActivityTaskHeartbeatByIdRequest, opts ...grpc.CallOption, -) (*workflowservice.RecordActivityTaskHeartbeatByIdResponse, error) { +) (_ *workflowservice.RecordActivityTaskHeartbeatByIdResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientRecordActivityTaskHeartbeatByIdScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRecordActivityTaskHeartbeatByIdScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientRecordActivityTaskHeartbeatByIdScope, metrics.ClientLatency) - resp, err := c.client.RecordActivityTaskHeartbeatById(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRecordActivityTaskHeartbeatByIdScope, metrics.ClientFailures) - } - return resp, err + return c.client.RecordActivityTaskHeartbeatById(ctx, request, opts...) } func (c *metricClient) RegisterNamespace( ctx context.Context, request *workflowservice.RegisterNamespaceRequest, opts ...grpc.CallOption, -) (*workflowservice.RegisterNamespaceResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRegisterNamespaceScope, metrics.ClientRequests) +) (_ *workflowservice.RegisterNamespaceResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRegisterNamespaceScope, metrics.ClientLatency) - resp, err := c.client.RegisterNamespace(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRegisterNamespaceScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRegisterNamespaceScope, metrics.ClientFailures) - } - return resp, err + return c.client.RegisterNamespace(ctx, request, opts...) } func (c *metricClient) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.RequestCancelWorkflowExecutionResponse, error) { +) (_ *workflowservice.RequestCancelWorkflowExecutionResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientRequestCancelWorkflowExecutionScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRequestCancelWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientRequestCancelWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.RequestCancelWorkflowExecution(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRequestCancelWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.RequestCancelWorkflowExecution(ctx, request, opts...) } func (c *metricClient) ResetStickyTaskQueue( ctx context.Context, request *workflowservice.ResetStickyTaskQueueRequest, opts ...grpc.CallOption, -) (*workflowservice.ResetStickyTaskQueueResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientResetStickyTaskQueueScope, metrics.ClientRequests) +) (_ *workflowservice.ResetStickyTaskQueueResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientResetStickyTaskQueueScope, metrics.ClientLatency) - resp, err := c.client.ResetStickyTaskQueue(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientResetStickyTaskQueueScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientResetStickyTaskQueueScope, metrics.ClientFailures) - } - return resp, err + return c.client.ResetStickyTaskQueue(ctx, request, opts...) } func (c *metricClient) ResetWorkflowExecution( ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.ResetWorkflowExecutionResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.ClientRequests) +) (_ *workflowservice.ResetWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientResetWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.ResetWorkflowExecution(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientResetWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientResetWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.ResetWorkflowExecution(ctx, request, opts...) } func (c *metricClient) RespondActivityTaskCanceled( ctx context.Context, request *workflowservice.RespondActivityTaskCanceledRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondActivityTaskCanceledResponse, error) { +) (_ *workflowservice.RespondActivityTaskCanceledResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCanceledScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondActivityTaskCanceledScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondActivityTaskCanceledScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskCanceled(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCanceledScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondActivityTaskCanceled(ctx, request, opts...) } func (c *metricClient) RespondActivityTaskCanceledById( ctx context.Context, request *workflowservice.RespondActivityTaskCanceledByIdRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondActivityTaskCanceledByIdResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCanceledByIdScope, metrics.ClientRequests) +) (_ *workflowservice.RespondActivityTaskCanceledByIdResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondActivityTaskCanceledByIdScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskCanceledById(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondActivityTaskCanceledByIdScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCanceledByIdScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondActivityTaskCanceledById(ctx, request, opts...) } func (c *metricClient) RespondActivityTaskCompleted( ctx context.Context, request *workflowservice.RespondActivityTaskCompletedRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondActivityTaskCompletedResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCompletedScope, metrics.ClientRequests) +) (_ *workflowservice.RespondActivityTaskCompletedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondActivityTaskCompletedScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskCompleted(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondActivityTaskCompletedScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCompletedScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondActivityTaskCompleted(ctx, request, opts...) } func (c *metricClient) RespondActivityTaskCompletedById( ctx context.Context, request *workflowservice.RespondActivityTaskCompletedByIdRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondActivityTaskCompletedByIdResponse, error) { +) (_ *workflowservice.RespondActivityTaskCompletedByIdResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCompletedByIdScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondActivityTaskCompletedByIdScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondActivityTaskCompletedByIdScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskCompletedById(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskCompletedByIdScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondActivityTaskCompletedById(ctx, request, opts...) } func (c *metricClient) RespondActivityTaskFailed( ctx context.Context, request *workflowservice.RespondActivityTaskFailedRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondActivityTaskFailedResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskFailedScope, metrics.ClientRequests) +) (_ *workflowservice.RespondActivityTaskFailedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondActivityTaskFailedScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskFailed(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondActivityTaskFailedScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskFailedScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondActivityTaskFailed(ctx, request, opts...) } func (c *metricClient) RespondActivityTaskFailedById( ctx context.Context, request *workflowservice.RespondActivityTaskFailedByIdRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondActivityTaskFailedByIdResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskFailedByIdScope, metrics.ClientRequests) +) (_ *workflowservice.RespondActivityTaskFailedByIdResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondActivityTaskFailedByIdScope, metrics.ClientLatency) - resp, err := c.client.RespondActivityTaskFailedById(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondActivityTaskFailedByIdScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondActivityTaskFailedByIdScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondActivityTaskFailedById(ctx, request, opts...) } func (c *metricClient) RespondQueryTaskCompleted( ctx context.Context, request *workflowservice.RespondQueryTaskCompletedRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondQueryTaskCompletedResponse, error) { +) (_ *workflowservice.RespondQueryTaskCompletedResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientRespondQueryTaskCompletedScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondQueryTaskCompletedScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondQueryTaskCompletedScope, metrics.ClientLatency) - resp, err := c.client.RespondQueryTaskCompleted(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondQueryTaskCompletedScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondQueryTaskCompleted(ctx, request, opts...) } func (c *metricClient) RespondWorkflowTaskCompleted( ctx context.Context, request *workflowservice.RespondWorkflowTaskCompletedRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondWorkflowTaskCompletedResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRespondWorkflowTaskCompletedScope, metrics.ClientRequests) +) (_ *workflowservice.RespondWorkflowTaskCompletedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondWorkflowTaskCompletedScope, metrics.ClientLatency) - resp, err := c.client.RespondWorkflowTaskCompleted(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondWorkflowTaskCompletedScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondWorkflowTaskCompletedScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondWorkflowTaskCompleted(ctx, request, opts...) } func (c *metricClient) RespondWorkflowTaskFailed( ctx context.Context, request *workflowservice.RespondWorkflowTaskFailedRequest, opts ...grpc.CallOption, -) (*workflowservice.RespondWorkflowTaskFailedResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientRespondWorkflowTaskFailedScope, metrics.ClientRequests) +) (_ *workflowservice.RespondWorkflowTaskFailedResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientRespondWorkflowTaskFailedScope, metrics.ClientLatency) - resp, err := c.client.RespondWorkflowTaskFailed(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientRespondWorkflowTaskFailedScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientRespondWorkflowTaskFailedScope, metrics.ClientFailures) - } - return resp, err + return c.client.RespondWorkflowTaskFailed(ctx, request, opts...) } func (c *metricClient) ScanWorkflowExecutions( ctx context.Context, request *workflowservice.ScanWorkflowExecutionsRequest, opts ...grpc.CallOption, -) (*workflowservice.ScanWorkflowExecutionsResponse, error) { +) (_ *workflowservice.ScanWorkflowExecutionsResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientScanWorkflowExecutionsScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientScanWorkflowExecutionsScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientScanWorkflowExecutionsScope, metrics.ClientLatency) - resp, err := c.client.ScanWorkflowExecutions(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientScanWorkflowExecutionsScope, metrics.ClientFailures) - } - return resp, err + return c.client.ScanWorkflowExecutions(ctx, request, opts...) } func (c *metricClient) SignalWithStartWorkflowExecution( ctx context.Context, request *workflowservice.SignalWithStartWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.SignalWithStartWorkflowExecutionResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientSignalWithStartWorkflowExecutionScope, metrics.ClientRequests) +) (_ *workflowservice.SignalWithStartWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientSignalWithStartWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.SignalWithStartWorkflowExecution(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientSignalWithStartWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientSignalWithStartWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.SignalWithStartWorkflowExecution(ctx, request, opts...) } func (c *metricClient) SignalWorkflowExecution( ctx context.Context, request *workflowservice.SignalWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.SignalWorkflowExecutionResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientSignalWorkflowExecutionScope, metrics.ClientRequests) +) (_ *workflowservice.SignalWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientSignalWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.SignalWorkflowExecution(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientSignalWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientSignalWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.SignalWorkflowExecution(ctx, request, opts...) } func (c *metricClient) StartWorkflowExecution( ctx context.Context, request *workflowservice.StartWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.StartWorkflowExecutionResponse, error) { +) (_ *workflowservice.StartWorkflowExecutionResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientStartWorkflowExecutionScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientStartWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientStartWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.StartWorkflowExecution(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientStartWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.StartWorkflowExecution(ctx, request, opts...) } func (c *metricClient) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, opts ...grpc.CallOption, -) (*workflowservice.TerminateWorkflowExecutionResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientTerminateWorkflowExecutionScope, metrics.ClientRequests) +) (_ *workflowservice.TerminateWorkflowExecutionResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientTerminateWorkflowExecutionScope, metrics.ClientLatency) - resp, err := c.client.TerminateWorkflowExecution(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientTerminateWorkflowExecutionScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientTerminateWorkflowExecutionScope, metrics.ClientFailures) - } - return resp, err + return c.client.TerminateWorkflowExecution(ctx, request, opts...) } func (c *metricClient) UpdateNamespace( ctx context.Context, request *workflowservice.UpdateNamespaceRequest, opts ...grpc.CallOption, -) (*workflowservice.UpdateNamespaceResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientUpdateNamespaceScope, metrics.ClientRequests) +) (_ *workflowservice.UpdateNamespaceResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientUpdateNamespaceScope, metrics.ClientLatency) - resp, err := c.client.UpdateNamespace(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientUpdateNamespaceScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientUpdateNamespaceScope, metrics.ClientFailures) - } - return resp, err + return c.client.UpdateNamespace(ctx, request, opts...) } func (c *metricClient) UpdateSchedule( ctx context.Context, request *workflowservice.UpdateScheduleRequest, opts ...grpc.CallOption, -) (*workflowservice.UpdateScheduleResponse, error) { +) (_ *workflowservice.UpdateScheduleResponse, retError error) { - c.metricsClient.IncCounter(metrics.FrontendClientUpdateScheduleScope, metrics.ClientRequests) + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientUpdateScheduleScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - sw := c.metricsClient.StartTimer(metrics.FrontendClientUpdateScheduleScope, metrics.ClientLatency) - resp, err := c.client.UpdateSchedule(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientUpdateScheduleScope, metrics.ClientFailures) - } - return resp, err + return c.client.UpdateSchedule(ctx, request, opts...) } func (c *metricClient) UpdateWorkerBuildIdOrdering( ctx context.Context, request *workflowservice.UpdateWorkerBuildIdOrderingRequest, opts ...grpc.CallOption, -) (*workflowservice.UpdateWorkerBuildIdOrderingResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientUpdateWorkerBuildIdOrderingScope, metrics.ClientRequests) +) (_ *workflowservice.UpdateWorkerBuildIdOrderingResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientUpdateWorkerBuildIdOrderingScope, metrics.ClientLatency) - resp, err := c.client.UpdateWorkerBuildIdOrdering(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientUpdateWorkerBuildIdOrderingScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientUpdateWorkerBuildIdOrderingScope, metrics.ClientFailures) - } - return resp, err + return c.client.UpdateWorkerBuildIdOrdering(ctx, request, opts...) } func (c *metricClient) UpdateWorkflow( ctx context.Context, request *workflowservice.UpdateWorkflowRequest, opts ...grpc.CallOption, -) (*workflowservice.UpdateWorkflowResponse, error) { - - c.metricsClient.IncCounter(metrics.FrontendClientUpdateWorkflowScope, metrics.ClientRequests) +) (_ *workflowservice.UpdateWorkflowResponse, retError error) { - sw := c.metricsClient.StartTimer(metrics.FrontendClientUpdateWorkflowScope, metrics.ClientLatency) - resp, err := c.client.UpdateWorkflow(ctx, request, opts...) - sw.Stop() + scope, stopwatch := c.startMetricsRecording(metrics.FrontendClientUpdateWorkflowScope) + defer func() { + c.finishMetricsRecording(scope, stopwatch, retError) + }() - if err != nil { - c.metricsClient.IncCounter(metrics.FrontendClientUpdateWorkflowScope, metrics.ClientFailures) - } - return resp, err + return c.client.UpdateWorkflow(ctx, request, opts...) } diff --git a/cmd/tools/rpcwrappers/main.go b/cmd/tools/rpcwrappers/main.go index 89626b901f1..c2cbfb7184f 100644 --- a/cmd/tools/rpcwrappers/main.go +++ b/cmd/tools/rpcwrappers/main.go @@ -44,43 +44,33 @@ import ( type ( service struct { - name string - clientType reflect.Type - clientGenerator func(io.Writer, service) - metricGenerator func(io.Writer, service) - retryableGenerator func(io.Writer, service) + name string + clientType reflect.Type + clientGenerator func(io.Writer, service) } ) var ( services = []service{ service{ - name: "frontend", - clientType: reflect.TypeOf((*workflowservice.WorkflowServiceClient)(nil)), - clientGenerator: generateFrontendOrAdminClient, - metricGenerator: generateFrontendOrAdminMetricClient, - retryableGenerator: generateRetryableClient, + name: "frontend", + clientType: reflect.TypeOf((*workflowservice.WorkflowServiceClient)(nil)), + clientGenerator: generateFrontendOrAdminClient, }, service{ - name: "admin", - clientType: reflect.TypeOf((*adminservice.AdminServiceClient)(nil)), - clientGenerator: generateFrontendOrAdminClient, - metricGenerator: generateFrontendOrAdminMetricClient, - retryableGenerator: generateRetryableClient, + name: "admin", + clientType: reflect.TypeOf((*adminservice.AdminServiceClient)(nil)), + clientGenerator: generateFrontendOrAdminClient, }, service{ - name: "history", - clientType: reflect.TypeOf((*historyservice.HistoryServiceClient)(nil)), - clientGenerator: generateHistoryClient, - metricGenerator: generateHistoryOrMatchingMetricClient, - retryableGenerator: generateRetryableClient, + name: "history", + clientType: reflect.TypeOf((*historyservice.HistoryServiceClient)(nil)), + clientGenerator: generateHistoryClient, }, service{ - name: "matching", - clientType: reflect.TypeOf((*matchingservice.MatchingServiceClient)(nil)), - clientGenerator: generateMatchingClient, - metricGenerator: generateHistoryOrMatchingMetricClient, - retryableGenerator: generateRetryableClient, + name: "matching", + clientType: reflect.TypeOf((*matchingservice.MatchingServiceClient)(nil)), + clientGenerator: generateMatchingClient, }, } @@ -339,42 +329,7 @@ func (c *clientImpl) {{.Method}}( `) } -func generateFrontendOrAdminMetricClient(w io.Writer, service service) { - writeTemplatedCode(w, service, ` -package {{.ServiceName}} - -import ( - "context" - - "{{.ServicePackagePath}}" - "google.golang.org/grpc" - - "go.temporal.io/server/common/metrics" -) -`) - - writeTemplatedMethods(w, service, "metricsClient", ` -func (c *metricClient) {{.Method}}( - ctx context.Context, - request {{.RequestType}}, - opts ...grpc.CallOption, -) ({{.ResponseType}}, error) { - - c.metricsClient.IncCounter(metrics.{{.MetricPrefix}}{{.Method}}Scope, metrics.ClientRequests) - - sw := c.metricsClient.StartTimer(metrics.{{.MetricPrefix}}{{.Method}}Scope, metrics.ClientLatency) - resp, err := c.client.{{.Method}}(ctx, request, opts...) - sw.Stop() - - if err != nil { - c.metricsClient.IncCounter(metrics.{{.MetricPrefix}}{{.Method}}Scope, metrics.ClientFailures) - } - return resp, err -} -`) -} - -func generateHistoryOrMatchingMetricClient(w io.Writer, service service) { +func generateMetricClient(w io.Writer, service service) { writeTemplatedCode(w, service, ` package {{.ServiceName}} @@ -476,6 +431,6 @@ func main() { licenseText := readLicenseFile(*licenseFlag) callWithFile(svc.clientGenerator, svc, "client", licenseText) - callWithFile(svc.metricGenerator, svc, "metric_client", licenseText) - callWithFile(svc.retryableGenerator, svc, "retryable_client", licenseText) + callWithFile(generateMetricClient, svc, "metric_client", licenseText) + callWithFile(generateRetryableClient, svc, "retryable_client", licenseText) }