Adding examples as pytest tests

- This way they will be included in CI checks
- Decreased the delay time in the thread-like examples to speed up tests
(probably could decrease the time some more)
- Added an async version of the calculator test
- Forcing python3 support for example scripts
This commit is contained in:
Jacob Alexander 2019-09-27 14:40:54 -07:00
parent 58a5c5fc1f
commit 78776de647
Failed to generate hash of commit
12 changed files with 596 additions and 13 deletions

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python3
from __future__ import print_function
import capnp # noqa: F401

View file

@ -0,0 +1,341 @@
#!/usr/bin/env python3
from __future__ import print_function
import argparse
import asyncio
import socket
import capnp
import calculator_capnp
class PowerFunction(calculator_capnp.Calculator.Function.Server):
'''An implementation of the Function interface wrapping pow(). Note that
we're implementing this on the client side and will pass a reference to
the server. The server will then be able to make calls back to the client.'''
def call(self, params, **kwargs):
'''Note the **kwargs. This is very necessary to include, since
protocols can add parameters over time. Also, by default, a _context
variable is passed to all server methods, but you can also return
results directly as python objects, and they'll be added to the
results struct in the correct order'''
return pow(params[0], params[1])
async def myreader(client, reader):
while True:
data = await reader.read(4096)
client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
def parse_args():
parser = argparse.ArgumentParser(usage='Connects to the Calculator server \
at the given address and does some RPCs')
parser.add_argument("host", help="HOST:PORT")
return parser.parse_args()
async def main(host):
host = host.split(':')
addr = host[0]
port = host[1]
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
addr, port,
)
except:
print("Try IPv6")
reader, writer = await asyncio.open_connection(
addr, port,
family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
# Pass "calculator" to ez_restore (there's also a `restore` function that
# takes a struct or AnyPointer as an argument), and then cast the returned
# capability to it's proper type. This casting is due to capabilities not
# having a reference to their schema
calculator = client.bootstrap().cast_as(calculator_capnp.Calculator)
'''Make a request that just evaluates the literal value 123.
What's interesting here is that evaluate() returns a "Value", which is
another interface and therefore points back to an object living on the
server. We then have to call read() on that object to read it.
However, even though we are making two RPC's, this block executes in
*one* network round trip because of promise pipelining: we do not wait
for the first call to complete before we send the second call to the
server.'''
print('Evaluating a literal... ', end="")
# Make the request. Note we are using the shorter function form (instead
# of evaluate_request), and we are passing a dictionary that represents a
# struct and its member to evaluate
eval_promise = calculator.evaluate({"literal": 123})
# This is equivalent to:
'''
request = calculator.evaluate_request()
request.expression.literal = 123
# Send it, which returns a promise for the result (without blocking).
eval_promise = request.send()
'''
# Using the promise, create a pipelined request to call read() on the
# returned object. Note that here we are using the shortened method call
# syntax read(), which is mostly just sugar for read_request().send()
read_promise = eval_promise.value.read()
# Now that we've sent all the requests, wait for the response. Until this
# point, we haven't waited at all!
response = await read_promise.a_wait()
assert response.value == 123
print("PASS")
'''Make a request to evaluate 123 + 45 - 67.
The Calculator interface requires that we first call getOperator() to
get the addition and subtraction functions, then call evaluate() to use
them. But, once again, we can get both functions, call evaluate(), and
then read() the result -- four RPCs -- in the time of *one* network
round trip, because of promise pipelining.'''
print("Using add and subtract... ", end='')
# Get the "add" function from the server.
add = calculator.getOperator(op='add').func
# Get the "subtract" function from the server.
subtract = calculator.getOperator(op='subtract').func
# Build the request to evaluate 123 + 45 - 67. Note the form is 'evaluate'
# + '_request', where 'evaluate' is the name of the method we want to call
request = calculator.evaluate_request()
subtract_call = request.expression.init('call')
subtract_call.function = subtract
subtract_params = subtract_call.init('params', 2)
subtract_params[1].literal = 67.0
add_call = subtract_params[0].init('call')
add_call.function = add
add_params = add_call.init('params', 2)
add_params[0].literal = 123
add_params[1].literal = 45
# Send the evaluate() request, read() the result, and wait for read() to finish.
eval_promise = request.send()
read_promise = eval_promise.value.read()
response = await read_promise.a_wait()
assert response.value == 101
print("PASS")
'''
Note: a one liner version of building the previous request (I highly
recommend not doing it this way for such a complicated structure, but I
just wanted to demonstrate it is possible to set all of the fields with a
dictionary):
eval_promise = calculator.evaluate(
{'call': {'function': subtract,
'params': [{'call': {'function': add,
'params': [{'literal': 123},
{'literal': 45}]}},
{'literal': 67.0}]}})
'''
'''Make a request to evaluate 4 * 6, then use the result in two more
requests that add 3 and 5.
Since evaluate() returns its result wrapped in a `Value`, we can pass
that `Value` back to the server in subsequent requests before the first
`evaluate()` has actually returned. Thus, this example again does only
one network round trip.'''
print("Pipelining eval() calls... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op='add').func
# Get the "multiply" function from the server.
multiply = calculator.getOperator(op='multiply').func
# Build the request to evaluate 4 * 6
request = calculator.evaluate_request()
multiply_call = request.expression.init("call")
multiply_call.function = multiply
multiply_params = multiply_call.init("params", 2)
multiply_params[0].literal = 4
multiply_params[1].literal = 6
multiply_result = request.send().value
# Use the result in two calls that add 3 and add 5.
add_3_request = calculator.evaluate_request()
add_3_call = add_3_request.expression.init("call")
add_3_call.function = add
add_3_params = add_3_call.init("params", 2)
add_3_params[0].previousResult = multiply_result
add_3_params[1].literal = 3
add_3_promise = add_3_request.send().value.read()
add_5_request = calculator.evaluate_request()
add_5_call = add_5_request.expression.init("call")
add_5_call.function = add
add_5_params = add_5_call.init("params", 2)
add_5_params[0].previousResult = multiply_result
add_5_params[1].literal = 5
add_5_promise = add_5_request.send().value.read()
# Now wait for the results.
assert (await add_3_promise.a_wait()).value == 27
assert (await add_5_promise.a_wait()).value == 29
print("PASS")
'''Our calculator interface supports defining functions. Here we use it
to define two functions and then make calls to them as follows:
f(x, y) = x * 100 + y
g(x) = f(x, x + 1) * 2;
f(12, 34)
g(21)
Once again, the whole thing takes only one network round trip.'''
print("Defining functions... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op='add').func
# Get the "multiply" function from the server.
multiply = calculator.getOperator(op='multiply').func
# Define f.
request = calculator.defFunction_request()
request.paramCount = 2
# Build the function body.
add_call = request.body.init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[1].parameter = 1 # y
multiply_call = add_params[0].init("call")
multiply_call.function = multiply
multiply_params = multiply_call.init("params", 2)
multiply_params[0].parameter = 0 # x
multiply_params[1].literal = 100
f = request.send().func
# Define g.
request = calculator.defFunction_request()
request.paramCount = 1
# Build the function body.
multiply_call = request.body.init("call")
multiply_call.function = multiply
multiply_params = multiply_call.init("params", 2)
multiply_params[1].literal = 2
f_call = multiply_params[0].init("call")
f_call.function = f
f_params = f_call.init("params", 2)
f_params[0].parameter = 0
add_call = f_params[1].init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[0].parameter = 0
add_params[1].literal = 1
g = request.send().func
# OK, we've defined all our functions. Now create our eval requests.
# f(12, 34)
f_eval_request = calculator.evaluate_request()
f_call = f_eval_request.expression.init("call")
f_call.function = f
f_params = f_call.init("params", 2)
f_params[0].literal = 12
f_params[1].literal = 34
f_eval_promise = f_eval_request.send().value.read()
# g(21)
g_eval_request = calculator.evaluate_request()
g_call = g_eval_request.expression.init("call")
g_call.function = g
g_call.init('params', 1)[0].literal = 21
g_eval_promise = g_eval_request.send().value.read()
# Wait for the results.
assert (await f_eval_promise.a_wait()).value == 1234
assert (await g_eval_promise.a_wait()).value == 4244
print("PASS")
'''Make a request that will call back to a function defined locally.
Specifically, we will compute 2^(4 + 5). However, exponent is not
defined by the Calculator server. So, we'll implement the Function
interface locally and pass it to the server for it to use when
evaluating the expression.
This example requires two network round trips to complete, because the
server calls back to the client once before finishing. In this
particular case, this could potentially be optimized by using a tail
call on the server side -- see CallContext::tailCall(). However, to
keep the example simpler, we haven't implemented this optimization in
the sample server.'''
print("Using a callback... ", end="")
# Get the "add" function from the server.
add = calculator.getOperator(op='add').func
# Build the eval request for 2^(4+5).
request = calculator.evaluate_request()
pow_call = request.expression.init("call")
pow_call.function = PowerFunction()
pow_params = pow_call.init("params", 2)
pow_params[0].literal = 2
add_call = pow_params[1].init("call")
add_call.function = add
add_params = add_call.init("params", 2)
add_params[0].literal = 4
add_params[1].literal = 5
# Send the request and wait.
response = await request.send().value.read().a_wait()
assert response.value == 512
print("PASS")
if __name__ == '__main__':
asyncio.run(main(parse_args().host))

View file

@ -0,0 +1,182 @@
#!/usr/bin/env python3
from __future__ import print_function
import argparse
import asyncio
import socket
import random
import capnp
import calculator_capnp
async def myreader(client, reader):
while True:
data = await reader.read(4096)
await client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
def read_value(value):
'''Helper function to asynchronously call read() on a Calculator::Value and
return a promise for the result. (In the future, the generated code might
include something like this automatically.)'''
return value.read().then(lambda result: result.value)
def evaluate_impl(expression, params=None):
'''Implementation of CalculatorImpl::evaluate(), also shared by
FunctionImpl::call(). In the latter case, `params` are the parameter
values passed to the function; in the former case, `params` is just an
empty list.'''
which = expression.which()
if which == 'literal':
return capnp.Promise(expression.literal)
elif which == 'previousResult':
return read_value(expression.previousResult)
elif which == 'parameter':
assert expression.parameter < len(params)
return capnp.Promise(params[expression.parameter])
elif which == 'call':
call = expression.call
func = call.function
# Evaluate each parameter.
paramPromises = [evaluate_impl(param, params) for param in call.params]
joinedParams = capnp.join_promises(paramPromises)
# When the parameters are complete, call the function.
ret = (joinedParams
.then(lambda vals: func.call(vals))
.then(lambda result: result.value))
return ret
else:
raise ValueError("Unknown expression type: " + which)
class ValueImpl(calculator_capnp.Calculator.Value.Server):
"Simple implementation of the Calculator.Value Cap'n Proto interface."
def __init__(self, value):
self.value = value
def read(self, **kwargs):
return self.value
class FunctionImpl(calculator_capnp.Calculator.Function.Server):
'''Implementation of the Calculator.Function Cap'n Proto interface, where the
function is defined by a Calculator.Expression.'''
def __init__(self, paramCount, body):
self.paramCount = paramCount
self.body = body.as_builder()
def call(self, params, _context, **kwargs):
'''Note that we're returning a Promise object here, and bypassing the
helper functionality that normally sets the results struct from the
returned object. Instead, we set _context.results directly inside of
another promise'''
assert len(params) == self.paramCount
# using setattr because '=' is not allowed inside of lambdas
return evaluate_impl(self.body, params).then(lambda value: setattr(_context.results, 'value', value))
class OperatorImpl(calculator_capnp.Calculator.Function.Server):
'''Implementation of the Calculator.Function Cap'n Proto interface, wrapping
basic binary arithmetic operators.'''
def __init__(self, op):
self.op = op
def call(self, params, **kwargs):
assert len(params) == 2
op = self.op
if op == 'add':
return params[0] + params[1]
elif op == 'subtract':
return params[0] - params[1]
elif op == 'multiply':
return params[0] * params[1]
elif op == 'divide':
return params[0] / params[1]
else:
raise ValueError('Unknown operator')
class CalculatorImpl(calculator_capnp.Calculator.Server):
"Implementation of the Calculator Cap'n Proto interface."
def evaluate(self, expression, _context, **kwargs):
return evaluate_impl(expression).then(lambda value: setattr(_context.results, 'value', ValueImpl(value)))
def defFunction(self, paramCount, body, _context, **kwargs):
return FunctionImpl(paramCount, body)
def getOperator(self, op, **kwargs):
return OperatorImpl(op)
def parse_args():
parser = argparse.ArgumentParser(usage='''Runs the server bound to the\
given address/port ADDRESS. ''')
parser.add_argument("address", help="ADDRESS:PORT")
return parser.parse_args()
async def myserver(reader, writer):
# Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
server = capnp.TwoPartyServer(bootstrap=CalculatorImpl())
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(server, reader), mywriter(server, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
await server.poll_forever()
async def main():
address = parse_args().address
host = address.split(':')
addr = host[0]
port = host[1]
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
server = await asyncio.start_server(
myserver,
addr, port,
)
except:
print("Try IPv6")
server = await asyncio.start_server(
myserver,
addr, port,
family=socket.AF_INET6
)
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function
@ -20,7 +20,7 @@ class ExampleImpl(thread_capnp.Example.Server):
.then(lambda _: self.subscribeStatus(subscriber))
def longRunning(self, **kwargs):
return capnp.getTimer().after_delay(3 * 10**9)
return capnp.getTimer().after_delay(1 * 10**9)
async def myreader(server, reader):

View file

@ -1,9 +1,10 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function
import asyncio
import argparse
import os
import time
import capnp
import socket
@ -11,6 +12,7 @@ import ssl
import thread_capnp
this_dir = os.path.dirname(os.path.abspath(__file__))
capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)
@ -55,7 +57,7 @@ async def main(host):
port = host[1]
# Setup SSL context
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile='selfsigned.cert')
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=os.path.join(this_dir, 'selfsigned.cert'))
# Handle both IPv4 and IPv6 cases
try:

View file

@ -1,8 +1,9 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function
import argparse
import os
import capnp
import thread_capnp
@ -11,6 +12,9 @@ import socket
import ssl
this_dir = os.path.dirname(os.path.abspath(__file__))
class ExampleImpl(thread_capnp.Example.Server):
"Implementation of the Example threading Cap'n Proto interface."
@ -21,7 +25,7 @@ class ExampleImpl(thread_capnp.Example.Server):
.then(lambda _: self.subscribeStatus(subscriber))
def longRunning(self, **kwargs):
return capnp.getTimer().after_delay(3 * 10**9)
return capnp.getTimer().after_delay(1 * 10**9)
async def myreader(server, reader):
@ -68,7 +72,7 @@ async def main():
# Setup SSL context
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.load_cert_chain('selfsigned.cert', 'selfsigned.key')
ctx.load_cert_chain(os.path.join(this_dir, 'selfsigned.cert'), os.path.join(this_dir, 'selfsigned.key'))
# Handle both IPv4 and IPv6 cases
try:

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function
import argparse

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function
import argparse

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
from __future__ import print_function
@ -18,7 +18,7 @@ class ExampleImpl(thread_capnp.Example.Server):
.then(lambda _: self.subscribeStatus(subscriber))
def longRunning(self, **kwargs):
return capnp.getTimer().after_delay(3 * 10**9)
return capnp.getTimer().after_delay(1 * 10**9)
def parse_args():

52
test/test_examples.py Normal file
View file

@ -0,0 +1,52 @@
import gc
import os
import socket
import subprocess
import sys # add examples dir to sys.path
import time
examples_dir = os.path.join(os.path.dirname(__file__), '..', 'examples')
def run_subprocesses(address, server, client):
server = subprocess.Popen([os.path.join(examples_dir, server), address])
time.sleep(1) # Give the server some small amount of time to start listening
client = subprocess.Popen([os.path.join(examples_dir, client), address])
ret = client.wait()
server.kill()
assert ret == 0
def test_async_calculator_example():
address = 'localhost:36432'
server = 'async_calculator_server.py'
client = 'async_calculator_client.py'
run_subprocesses(address, server, client)
def test_thread_example():
address = 'localhost:36433'
server = 'thread_server.py'
client = 'thread_client.py'
run_subprocesses(address, server, client)
def test_addressbook_example():
proc = subprocess.Popen([os.path.join(examples_dir, 'addressbook.py')])
ret = proc.wait()
assert ret == 0
def test_async_example():
address = 'localhost:36434'
server = 'async_server.py'
client = 'async_client.py'
run_subprocesses(address, server, client)
def test_ssl_async_example():
address = 'localhost:36435'
server = 'async_ssl_server.py'
client = 'async_ssl_client.py'
run_subprocesses(address, server, client)