-
Notifications
You must be signed in to change notification settings - Fork 124
/
Copy pathasync_calculator_client.py
executable file
·308 lines (231 loc) · 10.5 KB
/
async_calculator_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/env python3
import argparse
import asyncio
import capnp
import calculator_capnp
class PowerFunction(calculator_capnp.Calculator.Function.Server):
"""An implementation of the Function interface wrapping pow(). Note that
we're implementing this on the client side and will pass a reference to
the server. The server will then be able to make calls back to the client."""
async def call(self, params, **kwargs):
"""Note the **kwargs. This is very necessary to include, since
protocols can add parameters over time. Also, by default, a _context
variable is passed to all server methods, but you can also return
results directly as python objects, and they'll be added to the
results struct in the correct order"""
return pow(params[0], params[1])
def parse_args():
parser = argparse.ArgumentParser(
usage="Connects to the Calculator server at the given address and does some RPCs"
)
parser.add_argument("host", help="HOST:PORT")
return parser.parse_args()
async def main(connection):
client = capnp.TwoPartyClient(connection)
# Bootstrap the Calculator interface
calculator = client.bootstrap().cast_as(calculator_capnp.Calculator)
"""Make a request that just evaluates the literal value 123.
What's interesting here is that evaluate() returns a "Value", which is
another interface and therefore points back to an object living on the
server. We then have to call read() on that object to read it.
However, even though we are making two RPC's, this block executes in
*one* network round trip because of promise pipelining: we do not wait
for the first call to complete before we send the second call to the
server."""
print("Evaluating a literal... ", end="")
# Make the request. Note we are using the shorter function form (instead
# of evaluate_request), and we are passing a dictionary that represents a
# struct and its member to evaluate
eval_promise = calculator.evaluate({"literal": 123})
# This is equivalent to:
"""
request = calculator.evaluate_request()
request.expression.literal = 123
# Send it, which returns a promise for the result (without blocking).
eval_promise = request.send()
"""
# Using the promise, create a pipelined request to call read() on the
# returned object. Note that here we are using the shortened method call
# syntax read(), which is mostly just sugar for read_request().send()
read_promise = eval_promise.value.read()
# Now that we've sent all the requests, wait for the response. Until this
# point, we haven't waited at all!
response = await read_promise
assert response.value == 123
print("PASS")
"""Make a request to evaluate 123 + 45 - 67.
The Calculator interface requires that we first call getOperator() to
get the addition and subtraction functions, then call evaluate() to use
them. But, once again, we can get both functions, call evaluate(), and
then read() the result -- four RPCs -- in the time of *one* network
round trip, because of promise pipelining."""
print("Using add and subtract... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op="add").func
# Get the "subtract" function from the server.
subtract = calculator.getOperator(op="subtract").func
# Build the request to evaluate 123 + 45 - 67. Note the form is 'evaluate'
# + '_request', where 'evaluate' is the name of the method we want to call
request = calculator.evaluate_request()
subtract_call = request.expression.init("call")
subtract_call.function = subtract
subtract_params = subtract_call.init("params", 2)
subtract_params[1].literal = 67.0
add_call = subtract_params[0].init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[0].literal = 123
add_params[1].literal = 45
# Send the evaluate() request, read() the result, and wait for read() to finish.
eval_promise = request.send()
read_promise = eval_promise.value.read()
response = await read_promise
assert response.value == 101
print("PASS")
"""
Note: a one liner version of building the previous request (I highly
recommend not doing it this way for such a complicated structure, but I
just wanted to demonstrate it is possible to set all of the fields with a
dictionary):
eval_promise = calculator.evaluate(
{'call': {'function': subtract,
'params': [{'call': {'function': add,
'params': [{'literal': 123},
{'literal': 45}]}},
{'literal': 67.0}]}})
"""
"""Make a request to evaluate 4 * 6, then use the result in two more
requests that add 3 and 5.
Since evaluate() returns its result wrapped in a `Value`, we can pass
that `Value` back to the server in subsequent requests before the first
`evaluate()` has actually returned. Thus, this example again does only
one network round trip."""
print("Pipelining eval() calls... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op="add").func
# Get the "multiply" function from the server.
multiply = calculator.getOperator(op="multiply").func
# Build the request to evaluate 4 * 6
request = calculator.evaluate_request()
multiply_call = request.expression.init("call")
multiply_call.function = multiply
multiply_params = multiply_call.init("params", 2)
multiply_params[0].literal = 4
multiply_params[1].literal = 6
multiply_result = request.send().value
# Use the result in two calls that add 3 and add 5.
add_3_request = calculator.evaluate_request()
add_3_call = add_3_request.expression.init("call")
add_3_call.function = add
add_3_params = add_3_call.init("params", 2)
add_3_params[0].previousResult = multiply_result
add_3_params[1].literal = 3
add_3_promise = add_3_request.send().value.read()
add_5_request = calculator.evaluate_request()
add_5_call = add_5_request.expression.init("call")
add_5_call.function = add
add_5_params = add_5_call.init("params", 2)
add_5_params[0].previousResult = multiply_result
add_5_params[1].literal = 5
add_5_promise = add_5_request.send().value.read()
# Now wait for the results.
assert (await add_3_promise).value == 27
assert (await add_5_promise).value == 29
print("PASS")
"""Our calculator interface supports defining functions. Here we use it
to define two functions and then make calls to them as follows:
f(x, y) = x * 100 + y
g(x) = f(x, x + 1) * 2;
f(12, 34)
g(21)
Once again, the whole thing takes only one network round trip."""
print("Defining functions... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op="add").func
# Get the "multiply" function from the server.
multiply = calculator.getOperator(op="multiply").func
# Define f.
request = calculator.defFunction_request()
request.paramCount = 2
# Build the function body.
add_call = request.body.init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[1].parameter = 1 # y
multiply_call = add_params[0].init("call")
multiply_call.function = multiply
multiply_params = multiply_call.init("params", 2)
multiply_params[0].parameter = 0 # x
multiply_params[1].literal = 100
f = request.send().func
# Define g.
request = calculator.defFunction_request()
request.paramCount = 1
# Build the function body.
multiply_call = request.body.init("call")
multiply_call.function = multiply
multiply_params = multiply_call.init("params", 2)
multiply_params[1].literal = 2
f_call = multiply_params[0].init("call")
f_call.function = f
f_params = f_call.init("params", 2)
f_params[0].parameter = 0
add_call = f_params[1].init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[0].parameter = 0
add_params[1].literal = 1
g = request.send().func
# OK, we've defined all our functions. Now create our eval requests.
# f(12, 34)
f_eval_request = calculator.evaluate_request()
f_call = f_eval_request.expression.init("call")
f_call.function = f
f_params = f_call.init("params", 2)
f_params[0].literal = 12
f_params[1].literal = 34
f_eval_promise = f_eval_request.send().value.read()
# g(21)
g_eval_request = calculator.evaluate_request()
g_call = g_eval_request.expression.init("call")
g_call.function = g
g_call.init("params", 1)[0].literal = 21
g_eval_promise = g_eval_request.send().value.read()
# Wait for the results.
assert (await f_eval_promise).value == 1234
assert (await g_eval_promise).value == 4244
print("PASS")
"""Make a request that will call back to a function defined locally.
Specifically, we will compute 2^(4 + 5). However, exponent is not
defined by the Calculator server. So, we'll implement the Function
interface locally and pass it to the server for it to use when
evaluating the expression.
This example requires two network round trips to complete, because the
server calls back to the client once before finishing. In this
particular case, this could potentially be optimized by using a tail
call on the server side -- see CallContext::tailCall(). However, to
keep the example simpler, we haven't implemented this optimization in
the sample server."""
print("Using a callback... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op="add").func
# Build the eval request for 2^(4+5).
request = calculator.evaluate_request()
pow_call = request.expression.init("call")
pow_call.function = PowerFunction()
pow_params = pow_call.init("params", 2)
pow_params[0].literal = 2
add_call = pow_params[1].init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[0].literal = 4
add_params[1].literal = 5
# Send the request and wait.
response = await request.send().value.read()
assert response.value == 512
print("PASS")
async def cmd_main(host):
host, port = host.split(":")
await main(await capnp.AsyncIoStream.create_connection(host=host, port=port))
if __name__ == "__main__":
asyncio.run(capnp.run(cmd_main(parse_args().host)))