Skip to content

feat: add client debug logging support for streaming gRPC calls #2336

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,14 @@ except ImportError: # pragma: NO COVER
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
{# Streaming responses will be logged in `google-api-core`, while all requests and unary responses are logged here. #}
{# The reason is that gRPC returns an iterator, which we provide directly as the response #}
def intercept_unary_stream(self, continuation, client_call_details, request):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a note here to clarify that:

  • request logging for streaming calls lives in the generator but response logging for streaming calls lives in python-api-core, briefly explaining why.
  • request / response logging for unary calls lives in the generator.

This'll make it easier for us to track down the implementation in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 84c7986

{{ shared_macros.unary_request_interceptor_common(service) }}
response_it = continuation(client_call_details, request)
return response_it

def intercept_unary_unary(self, continuation, client_call_details, request):
{{ shared_macros.unary_request_interceptor_common(service) }}
response = continuation(client_call_details, request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ except ImportError: # pragma: NO COVER
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
{# Streaming responses will be logged in `google-api-core`, while all requests and unary responses are logged here. #}
{# The reason is that gRPC returns an iterator, which we provide directly as the response #}
async def intercept_unary_stream(self, continuation, client_call_details, request):
{{ shared_macros.unary_request_interceptor_common(service) }}
response_it = await continuation(client_call_details, request)
return response_it

async def intercept_unary_unary(self, continuation, client_call_details, request):
{{ shared_macros.unary_request_interceptor_common(service) }}
response = await continuation(client_call_details, request)
Expand Down Expand Up @@ -296,6 +303,7 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport):

self._interceptor = _LoggingClientAIOInterceptor()
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
self._logged_channel = self._grpc_channel
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
# Wrap messages. This must be done after self._logged_channel exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.cloud.asset.v1.AssetService",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = continuation(client_call_details, request)
return response_it

def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
async def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.cloud.asset.v1.AssetService",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = await continuation(client_call_details, request)
return response_it

async def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down Expand Up @@ -302,6 +334,7 @@ def __init__(self, *,

self._interceptor = _LoggingClientAIOInterceptor()
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
self._logged_channel = self._grpc_channel
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
# Wrap messages. This must be done after self._logged_channel exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.iam.credentials.v1.IAMCredentials",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = continuation(client_call_details, request)
return response_it

def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
async def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.iam.credentials.v1.IAMCredentials",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = await continuation(client_call_details, request)
return response_it

async def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down Expand Up @@ -307,6 +339,7 @@ def __init__(self, *,

self._interceptor = _LoggingClientAIOInterceptor()
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
self._logged_channel = self._grpc_channel
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
# Wrap messages. This must be done after self._logged_channel exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.cloud.eventarc.v1.Eventarc",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = continuation(client_call_details, request)
return response_it

def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientAIOInterceptor(grpc.aio.UnaryUnaryClientInterceptor, grpc.aio.UnaryStreamClientInterceptor): # pragma: NO COVER
async def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.cloud.eventarc.v1.Eventarc",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = await continuation(client_call_details, request)
return response_it

async def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down Expand Up @@ -312,6 +344,7 @@ def __init__(self, *,

self._interceptor = _LoggingClientAIOInterceptor()
self._grpc_channel._unary_unary_interceptors.append(self._interceptor)
self._grpc_channel._unary_stream_interceptors.append(self._interceptor)
self._logged_channel = self._grpc_channel
self._wrap_with_kind = "kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
# Wrap messages. This must be done after self._logged_channel exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,39 @@
_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor): # pragma: NO COVER
def intercept_unary_stream(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra = {
"serviceName": "google.logging.v2.ConfigServiceV2",
"rpcName": str(client_call_details.method),
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)
response_it = continuation(client_call_details, request)
return response_it

def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(std_logging.DEBUG)
if logging_enabled: # pragma: NO COVER
Expand Down
Loading
Loading