From 1a127bec6f18072b1e1e9af84fd35fea3a0fcf2f Mon Sep 17 00:00:00 2001 From: Jacob Alexander Date: Tue, 17 Sep 2019 00:52:48 -0700 Subject: [PATCH] Adding pure python SSL test using asyncio - Uses thread.capnp - Follows same format as thread_client.py/thread_server.py and async_client.py/async_server.py - Including a basic self signed certificate for testing convenience - Python 3.7 has a bug cleaning up SSL when using asyncio.run https://bugs.python.org/issue36709 Have a slightly more verbose workaround to do proper cleanup --- examples/async_ssl_client.py | 103 +++++++++++++++++++++++++++++++++++ examples/async_ssl_server.py | 94 ++++++++++++++++++++++++++++++++ examples/selfsigned.cert | 17 ++++++ examples/selfsigned.key | 28 ++++++++++ 4 files changed, 242 insertions(+) create mode 100755 examples/async_ssl_client.py create mode 100755 examples/async_ssl_server.py create mode 100644 examples/selfsigned.cert create mode 100644 examples/selfsigned.key diff --git a/examples/async_ssl_client.py b/examples/async_ssl_client.py new file mode 100755 index 0000000..6534219 --- /dev/null +++ b/examples/async_ssl_client.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python + +from __future__ import print_function + +import asyncio +import argparse +import threading +import time +import capnp +import socket +import ssl + +import thread_capnp + +capnp.remove_event_loop() +capnp.create_event_loop(threaded=True) + + +def parse_args(): + parser = argparse.ArgumentParser(usage='Connects to the Example thread server \ +at the given address and does some RPCs') + parser.add_argument("host", help="HOST:PORT") + + return parser.parse_args() + + +class StatusSubscriber(thread_capnp.Example.StatusSubscriber.Server): + + '''An implementation of the StatusSubscriber interface''' + + def status(self, value, **kwargs): + print('status: {}'.format(time.time())) + + +async def myreader(client, reader): + while True: + data = await reader.read(4096) + client.write(data) + + +async def mywriter(client, writer): + while True: + data = await client.read(4096) + writer.write(data.tobytes()) + await writer.drain() + + +async def background(cap): + subscriber = StatusSubscriber() + promise = cap.subscribeStatus(subscriber) + await promise.a_wait() + + +async def main(host): + host = host.split(':') + addr = host[0] + port = host[1] + + # Setup SSL context + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile='selfsigned.cert') + + # Handle both IPv4 and IPv6 cases + try: + print("Try IPv4") + reader, writer = await asyncio.open_connection( + addr, port, + ssl=ctx, + ) + except: + print("Try IPv6") + reader, writer = await asyncio.open_connection( + addr, port, + ssl=ctx, + family=socket.AF_INET6 + ) + + # Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode) + client = capnp.TwoPartyClient() + cap = client.bootstrap().cast_as(thread_capnp.Example) + + # Assemble reader and writer tasks, run in the background + coroutines = [myreader(client, reader), mywriter(client, writer)] + asyncio.gather(*coroutines, return_exceptions=True) + + # Start background task for subscriber + tasks = [background(cap)] + asyncio.gather(*tasks, return_exceptions=True) + + # Run blocking tasks + print('main: {}'.format(time.time())) + await cap.longRunning().a_wait() + print('main: {}'.format(time.time())) + await cap.longRunning().a_wait() + print('main: {}'.format(time.time())) + await cap.longRunning().a_wait() + print('main: {}'.format(time.time())) + +if __name__ == '__main__': + # Using asyncio.run hits an asyncio ssl bug + # https://bugs.python.org/issue36709 + #asyncio.run(main(parse_args().host), loop=loop, debug=True) + loop = asyncio.get_event_loop() + loop.run_until_complete(main(parse_args().host)) diff --git a/examples/async_ssl_server.py b/examples/async_ssl_server.py new file mode 100755 index 0000000..2fcad3b --- /dev/null +++ b/examples/async_ssl_server.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python + +from __future__ import print_function + +import argparse +import capnp + +import thread_capnp +import asyncio +import socket +import ssl + + +class ExampleImpl(thread_capnp.Example.Server): + + "Implementation of the Example threading Cap'n Proto interface." + + def subscribeStatus(self, subscriber, **kwargs): + return capnp.getTimer().after_delay(10**9) \ + .then(lambda: subscriber.status(True)) \ + .then(lambda _: self.subscribeStatus(subscriber)) + + def longRunning(self, **kwargs): + return capnp.getTimer().after_delay(3 * 10**9) + + +async def myreader(server, reader): + while True: + data = await reader.read(4096) + # Close connection if 0 bytes read + if len(data) == 0: + server.close() + await server.write(data) + + +async def mywriter(server, writer): + while True: + data = await server.read(4096) + writer.write(data.tobytes()) + await writer.drain() + + +async def myserver(reader, writer): + # Start TwoPartyServer using TwoWayPipe (only requires bootstrap) + server = capnp.TwoPartyServer(bootstrap=ExampleImpl()) + + # Assemble reader and writer tasks, run in the background + coroutines = [myreader(server, reader), mywriter(server, writer)] + asyncio.gather(*coroutines, return_exceptions=True) + + await server.poll_forever() + + +def parse_args(): + parser = argparse.ArgumentParser(usage='''Runs the server bound to the\ +given address/port ADDRESS. ''') + + parser.add_argument("address", help="ADDRESS:PORT") + + return parser.parse_args() + + +async def main(): + address = parse_args().address + host = address.split(':') + addr = host[0] + port = host[1] + + # Setup SSL context + ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ctx.load_cert_chain('selfsigned.cert', 'selfsigned.key') + + # Handle both IPv4 and IPv6 cases + try: + print("Try IPv4") + server = await asyncio.start_server( + myserver, + addr, port, + ssl=ctx, + ) + except: + print("Try IPv6") + server = await asyncio.start_server( + myserver, + addr, port, + ssl=ctx, + family=socket.AF_INET6, + ) + + async with server: + await server.serve_forever() + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/examples/selfsigned.cert b/examples/selfsigned.cert new file mode 100644 index 0000000..399026c --- /dev/null +++ b/examples/selfsigned.cert @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE----- +MIICpDCCAYwCCQDR+CRWUUUdKDANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls +b2NhbGhvc3QwHhcNMTkwOTE3MDczODI2WhcNNDcwMjAyMDczODI2WjAUMRIwEAYD +VQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCg ++X+utjujNcq/zLgcuj1o0BRfu8cF1ZNS/lLhSi+B064fs7905Ii8XS7rP7LBZhXs +czvUTWDoPhvvbxkHzblPGqytAYuWWTE7YdXQNTIKm4TPZlK4vbEGMSJ1OGQxXbc9 +UNKzf4VQVoa0n0bEnnqXO4kqcNANM4U9+6jN8IFZ4B82eCJmdw5Hd3HHhrPbyapL +GO2kiPzp36388n6CwFngOCv4NvHt9G5fDP9Tp+fhdHGSA9ViuDRoM39C8yHtQTjS +Fcml6J06CITpYeMd9/Of43Y9TpCVfViVlbTDE/8B9uwLzEgXJs5UBCmUjGnWtEON +vWZiM7Ul+9mOPejUMPwlAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAICkFOw0Dd1U +r60rgVpUiHMoFNuBP3ikZfBQ+KXOtfTIYbxbi+iKvvPlB1NFA7Qy3FqqN4sUncvp +QQLxdm+KClM+hvAogng/SyEJJW169vuQbqn5s/1iKCOtGFkI18thCr3rwsI6vTaR +0TmTPtjSQKl5PqcS8kQJTED+CnQhqOAv7C68Bpg+x2dSD9VCq81cPeDbfnK6gico +29qJYUm4RCXMicrzvEwNObx06TQKJb/pWjpl1NAmpFvcz+2MYPL/QTfH/cS5lhgx +KCe4/kDO0HCueOi2MqBFaO2B0kZxanMoZ2KZe2b/Bp1CTJzXCXNYWQj6QbLzZxaD +fOp0J8wAo1U= +-----END CERTIFICATE----- diff --git a/examples/selfsigned.key b/examples/selfsigned.key new file mode 100644 index 0000000..296d799 --- /dev/null +++ b/examples/selfsigned.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCg+X+utjujNcq/ +zLgcuj1o0BRfu8cF1ZNS/lLhSi+B064fs7905Ii8XS7rP7LBZhXsczvUTWDoPhvv +bxkHzblPGqytAYuWWTE7YdXQNTIKm4TPZlK4vbEGMSJ1OGQxXbc9UNKzf4VQVoa0 +n0bEnnqXO4kqcNANM4U9+6jN8IFZ4B82eCJmdw5Hd3HHhrPbyapLGO2kiPzp3638 +8n6CwFngOCv4NvHt9G5fDP9Tp+fhdHGSA9ViuDRoM39C8yHtQTjSFcml6J06CITp +YeMd9/Of43Y9TpCVfViVlbTDE/8B9uwLzEgXJs5UBCmUjGnWtEONvWZiM7Ul+9mO +PejUMPwlAgMBAAECggEAD6Vwlai8zzZRSKc7Vf98LI3dDRkRVS3XLf/uSluNlo7e +o9Iyz8fOypA8GT2NwGKNyvfAXvhObQRsbq9bvXhvhJLRKde2m5x7vovZ3mztOj63 +f/kwHSjC5hksgjxC8NFtGBadBDlm2dIvMasxk7bbr4tn36orbr0NPGMTm0C/Md8B +bSUzuc/mT+6KWfW9g4svqebSbKvC7tGuAu3/RfL1cmbuuvtJJA+EPfRjgCtOiFSk +8NLE0KLUYySf6M3MAHMeSwQhVr1xyYUkiOqQoxMC7CpaplNqaB2rrOe2nEjLpXgx +80WLFbB22HNEkBSgX5zz4FmrLchwaI79f3PiGQ2DWQKBgQDSIsl1t13eR8ubsGU5 +Z097U8/eylyXJC+ZU1/TbWgaPNHlMf26EncSJwlmq7pNJ0Rk7p/edPMWKu9tOgQE +iG0QeKxvxcbc1LzVmfLKXY8hW4DOhiBTiRNNb+YWuYmKYnHXM8hrKthNXBFfRrwb +Pb+mid9FcK4GcIDkWbgXEahmwwKBgQDEG9sB7Ee4fDiQi1oVsPkMwxnxsiNpSRRG +9CmG/xIvL81vSaONPmg1f5q5/Wqkd6s3QmMs4rE5U8+kgRvuw/MnN/UxQBH5/LAn +T3hbo7qlIONOOpgQswg7wIQnKP31spNU2FthA1ACazRBsIyH+hPmgeqvtvFhL4Qy +6q5tfhFy9wKBgGYZtu82aCqPkdOU0qogk1Ll9zNV+cUKNQJ3qzDMkO9mq8mED7cw +L6CnTP8Q45WHRckQ1Ka/Bjm4JNtafAdDzlJZf9dTLnuv9gyHH5vJ97iKgDxYmS5d +hP50J0TVY4nUqWGZ7IB9sdlsqZg0g0NtLkiZ5t0TkcrZMRdCrJqw3rUHAoGBAJNk +wEmEti8Rpk31fsK43aba6KABHJ5gX84oayHcimVOz1/qf/ODyT0UaE2MC2AL1XLW +AcZVp5AHzxO8OitNuW5rn2zh0+EJK7iQAU0XFQxRWKaOYYaDmReXzXvFUoMdMaDe +cGfM3pDC1Gbe8/CrY9OnJ6XjoS5DUWAXhPwkeabnAoGBAMmI9xcyfcPjO6XKKymZ +z+6bwvFaey9Acy5vkxLrEcRqH8pRR09CltWzR4vhpJwHABWPHVGqdtvlLw1/lUrj +xr4RXvYXK28cK9Tdak0c3+HPSsozxrsX6AQzcG27ymo5s0KYaVuxWh98iZU7Eb52 +YfCx3eOSIHPH0ay7KBVAQocG +-----END PRIVATE KEY-----