diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 index 7158a4c430..147b998efd 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc.py.j2 @@ -59,7 +59,14 @@ except ImportError: # pragma: NO COVER _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + {# Streaming responses will be logged in `google-api-core`, while all requests and unary responses are logged here. #} + {# The reason is that gRPC returns an iterator, which we provide directly as the response #} + def intercept_unary_stream(self, continuation, client_call_details, request): +{{ shared_macros.unary_request_interceptor_common(service) }} + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): {{ shared_macros.unary_request_interceptor_common(service) }} response = continuation(client_call_details, request) diff --git a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 index 64aeb0abf6..e0c0e17c4b 100644 --- a/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 +++ b/gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/grpc_asyncio.py.j2 @@ -64,7 +64,14 @@ except ImportError: # pragma: NO COVER _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + {# Streaming responses will be logged in `google-api-core`, while all requests and unary responses are logged here. #} + {# The reason is that gRPC returns an iterator, which we provide directly as the response #} + async def intercept_unary_stream(self, continuation, client_call_details, request): +{{ shared_macros.unary_request_interceptor_common(service) }} + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): {{ shared_macros.unary_request_interceptor_common(service) }} response = await continuation(client_call_details, request) @@ -296,6 +303,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py index 2f1d30c418..26255bf6fa 100755 --- a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py +++ b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.asset.v1.AssetService", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py index 96833f741b..45c2c411c0 100755 --- a/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py +++ b/tests/integration/goldens/asset/google/cloud/asset_v1/services/asset_service/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.asset.v1.AssetService", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -302,6 +334,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py index 4472c18352..3e380dd524 100755 --- a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py +++ b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc.py @@ -42,7 +42,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.iam.credentials.v1.IAMCredentials", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py index c97b00af64..26738407c9 100755 --- a/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py +++ b/tests/integration/goldens/credentials/google/iam/credentials_v1/services/iam_credentials/transports/grpc_asyncio.py @@ -46,7 +46,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.iam.credentials.v1.IAMCredentials", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -307,6 +339,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py index 5ec53389bb..951c4c2094 100755 --- a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py +++ b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc.py @@ -53,7 +53,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.eventarc.v1.Eventarc", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py index fdbfeab462..aaf558a1a0 100755 --- a/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py +++ b/tests/integration/goldens/eventarc/google/cloud/eventarc_v1/services/eventarc/transports/grpc_asyncio.py @@ -57,7 +57,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.eventarc.v1.Eventarc", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -312,6 +344,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py index 8e56dd2231..51b9054674 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py index 46dd72d79d..35ce853891 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -302,6 +334,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py index ec9737507e..9b32fb0cf6 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py index 1604ba62bc..d8ad687b1e 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py index 70a0c89da1..3dffb4fdca 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py index dcadfbe957..29984993d1 100755 --- a/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py index 8e56dd2231..51b9054674 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py index 46dd72d79d..35ce853891 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/config_service_v2/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.ConfigServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -302,6 +334,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py index ec9737507e..9b32fb0cf6 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py index 1604ba62bc..d8ad687b1e 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/logging_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.LoggingServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py index 70a0c89da1..3dffb4fdca 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc.py @@ -44,7 +44,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py index dcadfbe957..29984993d1 100755 --- a/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py +++ b/tests/integration/goldens/logging_internal/google/cloud/logging_v2/services/metrics_service_v2/transports/grpc_asyncio.py @@ -48,7 +48,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.logging.v2.MetricsServiceV2", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -300,6 +332,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py index 6267d2c78a..85e09d609e 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py index 3731518549..94206ff97e 100755 --- a/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py +++ b/tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -322,6 +354,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists diff --git a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py index fcaefca376..fb247706c1 100755 --- a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py +++ b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc.py @@ -45,7 +45,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER + def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = continuation(client_call_details, request) + return response_it + def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER diff --git a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py index 23337db892..56a63a2b72 100755 --- a/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py +++ b/tests/integration/goldens/redis_selective/google/cloud/redis_v1/services/cloud_redis/transports/grpc_asyncio.py @@ -49,7 +49,39 @@ _LOGGER = std_logging.getLogger(__name__) -class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER +class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER + async def intercept_unary_stream(self, continuation, client_call_details, request): + logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) + if logging_enabled: # pragma: NO COVER + request_metadata = client_call_details.metadata + if isinstance(request, proto.Message): + request_payload = type(request).to_json(request) + elif isinstance(request, google.protobuf.message.Message): + request_payload = MessageToJson(request) + else: + request_payload = f"{type(request).__name__}: {pickle.dumps(request)}" + + request_metadata = { + key: value.decode("utf-8") if isinstance(value, bytes) else value + for key, value in request_metadata + } + grpc_request = { + "payload": request_payload, + "requestMethod": "grpc", + "metadata": dict(request_metadata), + } + _LOGGER.debug( + f"Sending request for {client_call_details.method}", + extra = { + "serviceName": "google.cloud.redis.v1.CloudRedis", + "rpcName": str(client_call_details.method), + "request": grpc_request, + "metadata": grpc_request["metadata"], + }, + ) + response_it = await continuation(client_call_details, request) + return response_it + async def intercept_unary_unary(self, continuation, client_call_details, request): logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG) if logging_enabled: # pragma: NO COVER @@ -322,6 +354,7 @@ def __init__(self, *, self._interceptor = _LoggingClientAIOInterceptor() self._grpc_channel._unary_unary_interceptors.append(self._interceptor) + self._grpc_channel._unary_stream_interceptors.append(self._interceptor) self._logged_channel = self._grpc_channel self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters # Wrap messages. This must be done after self._logged_channel exists