|
20 | 20 |
|
21 | 21 | import asyncio
|
22 | 22 | import functools
|
| 23 | +import logging |
| 24 | +import pickle |
23 | 25 |
|
24 | 26 | from typing import AsyncGenerator, Generic, Iterator, Optional, TypeVar
|
25 | 27 |
|
| 28 | +import google.protobuf.json_format |
26 | 29 | import grpc
|
27 | 30 | from grpc import aio
|
| 31 | +import proto |
28 | 32 |
|
29 | 33 | from google.api_core import exceptions, grpc_helpers
|
30 | 34 |
|
31 | 35 | # denotes the proto response type for grpc calls
|
32 | 36 | P = TypeVar("P")
|
33 | 37 |
|
| 38 | +_LOGGER = logging.getLogger(__name__) |
| 39 | + |
34 | 40 | # NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform
|
35 | 41 | # automatic patching for us. But that means the overhead of creating an
|
36 | 42 | # extra Python function spreads to every single send and receive.
|
@@ -94,7 +100,28 @@ def __init__(self):
|
94 | 100 |
|
95 | 101 | async def read(self) -> P:
|
96 | 102 | try:
|
97 |
| - return await self._call.read() |
| 103 | + result = await self._call.read() |
| 104 | + logging_enabled = _LOGGER.isEnabledFor(logging.DEBUG) |
| 105 | + if logging_enabled: # pragma: NO COVER |
| 106 | + if isinstance(result, proto.Message): |
| 107 | + response_payload = type(result).to_json(result) |
| 108 | + elif isinstance(result, google.protobuf.message.Message): |
| 109 | + response_payload = google.protobuf.json_format.MessageToJson(result) |
| 110 | + else: |
| 111 | + response_payload = ( |
| 112 | + f"{type(result).__name__}: {pickle.dumps(result)}" |
| 113 | + ) |
| 114 | + grpc_response = { |
| 115 | + "payload": response_payload, |
| 116 | + "status": "OK", |
| 117 | + } |
| 118 | + _LOGGER.debug( |
| 119 | + f"Received response of type {type(result)} via gRPC stream", |
| 120 | + extra={ |
| 121 | + "response": grpc_response, |
| 122 | + }, |
| 123 | + ) |
| 124 | + return result |
98 | 125 | except grpc.RpcError as rpc_error:
|
99 | 126 | raise exceptions.from_grpc_error(rpc_error) from rpc_error
|
100 | 127 |
|
@@ -219,7 +246,7 @@ def create_channel(
|
219 | 246 | default_host=None,
|
220 | 247 | compression=None,
|
221 | 248 | attempt_direct_path: Optional[bool] = False,
|
222 |
| - **kwargs |
| 249 | + **kwargs, |
223 | 250 | ):
|
224 | 251 | """Create an AsyncIO secure channel with credentials.
|
225 | 252 |
|
|
0 commit comments