-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbq_storage_write_repro.py
120 lines (98 loc) · 3.51 KB
/
bq_storage_write_repro.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
This file demonstrates the error log being generated by the bidi.BackgroundConsumer on shutdown.
It uses no dependencies
"""
import argparse
import logging
import random
from google.cloud import bigquery
from google.cloud.bigquery_storage_v1.types import ProtoRows, ProtoSchema
from google.cloud.bigquery_storage_v1.types.storage import AppendRowsRequest
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.protobuf import descriptor_pb2
from google.cloud.bigquery_storage_v1 import BigQueryWriteClient
from my_service_pb2 import PingResponse
logger = logging.getLogger()
def get_create_table_sql(args):
return f"""
CREATE TABLE IF NOT EXISTS `{args.project_id}.{args.dataset_id}.{args.table_id}` (
number INT64,
current_ts_ms INT64
)
OPTIONS(
-- Any test dataset should already have a default expiration time, but explicitly set one as a
-- matter of good practice.
expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 HOUR)
)
"""
def do_insert_via_append_rows_stream(args):
bq_storage_client = BigQueryWriteClient()
descriptor = descriptor_pb2.DescriptorProto()
PingResponse.DESCRIPTOR.CopyToProto(descriptor)
#
grpc_stream = AppendRowsStream(
bq_storage_client,
initial_request_template=AppendRowsRequest(
write_stream=f"{bq_storage_client.table_path(args.project_id, args.dataset_id, args.table_id)}/streams/_default",
proto_rows=AppendRowsRequest.ProtoData(
writer_schema=ProtoSchema(proto_descriptor=descriptor),
),
),
)
f = grpc_stream.send(
AppendRowsRequest(
proto_rows=AppendRowsRequest.ProtoData(
rows=ProtoRows(
serialized_rows=[
PingResponse(
number=random.randint(1, 2**63)
).SerializeToString()
]
)
)
)
)
f.result()
grpc_stream.close()
logger.debug("Stream closed, insertion success")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Repro for grpc error when using the BQ storage write API"
)
parser.add_argument(
"--project-id", type=str, required=True, help="Google cloud project id to use"
)
parser.add_argument(
"--dataset-id", type=str, required=True, help="BQ dataset to use"
)
parser.add_argument(
"--table-id", type=str, default="test_table", help="BQ table to use"
)
parser.add_argument(
"--create-table",
action="store_true",
help="Run the SQL to create the testing table",
)
parser.add_argument(
"--show-create-table-sql",
action="store_true",
help="Print the SQL to create the testing table. Use this if you want to run manually",
)
args = parser.parse_args()
# Setup logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(name)s - %(threadName)s - %(message)s",
datefmt="%H:%M:%S",
handlers=[logging.StreamHandler()],
)
create_table_sql = get_create_table_sql(args)
if args.show_create_table_sql:
print()
print("Create table SQL:")
print(create_table_sql)
print()
if args.create_table:
bq_client = bigquery.Client(project=args.project_id)
bq_client.query(create_table_sql).result()
do_insert_via_append_rows_stream(args)