-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple_api_core_repro.py
91 lines (70 loc) · 2.96 KB
/
simple_api_core_repro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""
This file demonstrates the error log being generated by the bidi.BackgroundConsumer on shutdown.
It uses no dependencies
"""
import logging
import sys
import time
from concurrent import futures
from concurrent.futures import Future
import grpc
from google.api_core import bidi
from google.protobuf.json_format import MessageToDict
import my_service_pb2_grpc
from my_service_pb2 import PingRequest, PingResponse
logger = logging.getLogger()
class MyServiceServicer(my_service_pb2_grpc.MyServiceServicer):
def PingStream(self, request_iterator, context):
for ping_request in request_iterator:
yield PingResponse(number=ping_request.number)
def perform_ping():
"""The client code which demonstrates the error log being raised by bidi.BackgroundConsumer"""
with grpc.insecure_channel("localhost:50051") as channel:
# Create the RPC instance
stub = my_service_pb2_grpc.MyServiceStub(channel)
rpc = bidi.BidiRpc(stub.PingStream)
# Create a response handler we can wait on
first_future = Future()
def _on_response(response: PingResponse):
logger.debug(f"_on_response({MessageToDict(response)})")
first_future.set_result(response)
# Create the bidi consumer and start it:
consumer = bidi.BackgroundConsumer(rpc, _on_response)
logger.info("Starting consumer")
consumer.start()
# Wait for the RPC to become active, same pattern as:
# https://github.com/googleapis/python-bigquery-storage/blob/v2.24.0/google/cloud/bigquery_storage_v1/writer.py#L179-L188
while not rpc.is_active and consumer.is_active:
logger.debug("RPC not yet active")
time.sleep(0.005)
logger.debug("RPC active")
logger.debug("Sending request")
rpc.send(PingRequest(number=1))
first_future.result()
logger.debug("Response received")
# Shutdown first the consumer then the rpc which is the same pattern as:
# https://github.com/googleapis/python-bigquery-storage/blob/v2.24.0/google/cloud/bigquery_storage_v1/writer.py#L303-L311
logger.debug("Starting shutdown")
consumer.stop()
rpc.close()
logger.debug("Shutdown complete")
# Give any background threads a chance to finish
time.sleep(0.5)
if __name__ == "__main__":
# Setup logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(name)s - %(threadName)s - %(message)s",
datefmt="%H:%M:%S",
handlers=[logging.StreamHandler()],
)
# Start the server, taken from https://grpc.io/docs/languages/python/basics/
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
my_service_pb2_grpc.add_MyServiceServicer_to_server(MyServiceServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
# Run the client test code
perform_ping()
# Shut everything down
server.stop(5)
sys.exit(0)