pycapnp/examples/thread_server.py
Jacob Alexander 78776de647
Adding examples as pytest tests
- This way they will be included in CI checks
- Decreased the delay time in the thread-like examples to speed up tests
(probably could decrease the time some more)
- Added an async version of the calculator test
- Forcing python3 support for example scripts
2019-09-27 14:40:54 -07:00

42 lines
1 KiB
Python
Executable file

#!/usr/bin/env python3
from __future__ import print_function
import argparse
import capnp
import thread_capnp
class ExampleImpl(thread_capnp.Example.Server):
"Implementation of the Example threading Cap'n Proto interface."
def subscribeStatus(self, subscriber, **kwargs):
return capnp.getTimer().after_delay(10**9) \
.then(lambda: subscriber.status(True)) \
.then(lambda _: self.subscribeStatus(subscriber))
def longRunning(self, **kwargs):
return capnp.getTimer().after_delay(1 * 10**9)
def parse_args():
parser = argparse.ArgumentParser(usage='''Runs the server bound to the\
given address/port ADDRESS may be '*' to bind to all local addresses.\
:PORT may be omitted to choose a port automatically. ''')
parser.add_argument("address", help="ADDRESS[:PORT]")
return parser.parse_args()
def main():
address = parse_args().address
server = capnp.TwoPartyServer(address, bootstrap=ExampleImpl())
server.run_forever()
if __name__ == '__main__':
main()