Adding pure python SSL test using asyncio

- Uses thread.capnp
- Follows same format as thread_client.py/thread_server.py and
async_client.py/async_server.py
- Including a basic self signed certificate for testing convenience
- Python 3.7 has a bug cleaning up SSL when using asyncio.run
  https://bugs.python.org/issue36709
  Have a slightly more verbose workaround to do proper cleanup
This commit is contained in:
Jacob Alexander 2019-09-17 00:52:48 -07:00
parent 8915ef79f1
commit 1a127bec6f
Failed to generate hash of commit
4 changed files with 242 additions and 0 deletions

103
examples/async_ssl_client.py Executable file
View file

@ -0,0 +1,103 @@
#!/usr/bin/env python
from __future__ import print_function
import asyncio
import argparse
import threading
import time
import capnp
import socket
import ssl
import thread_capnp
capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)
def parse_args():
parser = argparse.ArgumentParser(usage='Connects to the Example thread server \
at the given address and does some RPCs')
parser.add_argument("host", help="HOST:PORT")
return parser.parse_args()
class StatusSubscriber(thread_capnp.Example.StatusSubscriber.Server):
'''An implementation of the StatusSubscriber interface'''
def status(self, value, **kwargs):
print('status: {}'.format(time.time()))
async def myreader(client, reader):
while True:
data = await reader.read(4096)
client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
async def background(cap):
subscriber = StatusSubscriber()
promise = cap.subscribeStatus(subscriber)
await promise.a_wait()
async def main(host):
host = host.split(':')
addr = host[0]
port = host[1]
# Setup SSL context
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile='selfsigned.cert')
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
addr, port,
ssl=ctx,
)
except:
print("Try IPv6")
reader, writer = await asyncio.open_connection(
addr, port,
ssl=ctx,
family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
cap = client.bootstrap().cast_as(thread_capnp.Example)
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
# Start background task for subscriber
tasks = [background(cap)]
asyncio.gather(*tasks, return_exceptions=True)
# Run blocking tasks
print('main: {}'.format(time.time()))
await cap.longRunning().a_wait()
print('main: {}'.format(time.time()))
await cap.longRunning().a_wait()
print('main: {}'.format(time.time()))
await cap.longRunning().a_wait()
print('main: {}'.format(time.time()))
if __name__ == '__main__':
# Using asyncio.run hits an asyncio ssl bug
# https://bugs.python.org/issue36709
#asyncio.run(main(parse_args().host), loop=loop, debug=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(main(parse_args().host))

94
examples/async_ssl_server.py Executable file
View file

@ -0,0 +1,94 @@
#!/usr/bin/env python
from __future__ import print_function
import argparse
import capnp
import thread_capnp
import asyncio
import socket
import ssl
class ExampleImpl(thread_capnp.Example.Server):
"Implementation of the Example threading Cap'n Proto interface."
def subscribeStatus(self, subscriber, **kwargs):
return capnp.getTimer().after_delay(10**9) \
.then(lambda: subscriber.status(True)) \
.then(lambda _: self.subscribeStatus(subscriber))
def longRunning(self, **kwargs):
return capnp.getTimer().after_delay(3 * 10**9)
async def myreader(server, reader):
while True:
data = await reader.read(4096)
# Close connection if 0 bytes read
if len(data) == 0:
server.close()
await server.write(data)
async def mywriter(server, writer):
while True:
data = await server.read(4096)
writer.write(data.tobytes())
await writer.drain()
async def myserver(reader, writer):
# Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
server = capnp.TwoPartyServer(bootstrap=ExampleImpl())
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(server, reader), mywriter(server, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
await server.poll_forever()
def parse_args():
parser = argparse.ArgumentParser(usage='''Runs the server bound to the\
given address/port ADDRESS. ''')
parser.add_argument("address", help="ADDRESS:PORT")
return parser.parse_args()
async def main():
address = parse_args().address
host = address.split(':')
addr = host[0]
port = host[1]
# Setup SSL context
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.load_cert_chain('selfsigned.cert', 'selfsigned.key')
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
server = await asyncio.start_server(
myserver,
addr, port,
ssl=ctx,
)
except:
print("Try IPv6")
server = await asyncio.start_server(
myserver,
addr, port,
ssl=ctx,
family=socket.AF_INET6,
)
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())

17
examples/selfsigned.cert Normal file
View file

@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICpDCCAYwCCQDR+CRWUUUdKDANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
b2NhbGhvc3QwHhcNMTkwOTE3MDczODI2WhcNNDcwMjAyMDczODI2WjAUMRIwEAYD
VQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCg
+X+utjujNcq/zLgcuj1o0BRfu8cF1ZNS/lLhSi+B064fs7905Ii8XS7rP7LBZhXs
czvUTWDoPhvvbxkHzblPGqytAYuWWTE7YdXQNTIKm4TPZlK4vbEGMSJ1OGQxXbc9
UNKzf4VQVoa0n0bEnnqXO4kqcNANM4U9+6jN8IFZ4B82eCJmdw5Hd3HHhrPbyapL
GO2kiPzp36388n6CwFngOCv4NvHt9G5fDP9Tp+fhdHGSA9ViuDRoM39C8yHtQTjS
Fcml6J06CITpYeMd9/Of43Y9TpCVfViVlbTDE/8B9uwLzEgXJs5UBCmUjGnWtEON
vWZiM7Ul+9mOPejUMPwlAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAICkFOw0Dd1U
r60rgVpUiHMoFNuBP3ikZfBQ+KXOtfTIYbxbi+iKvvPlB1NFA7Qy3FqqN4sUncvp
QQLxdm+KClM+hvAogng/SyEJJW169vuQbqn5s/1iKCOtGFkI18thCr3rwsI6vTaR
0TmTPtjSQKl5PqcS8kQJTED+CnQhqOAv7C68Bpg+x2dSD9VCq81cPeDbfnK6gico
29qJYUm4RCXMicrzvEwNObx06TQKJb/pWjpl1NAmpFvcz+2MYPL/QTfH/cS5lhgx
KCe4/kDO0HCueOi2MqBFaO2B0kZxanMoZ2KZe2b/Bp1CTJzXCXNYWQj6QbLzZxaD
fOp0J8wAo1U=
-----END CERTIFICATE-----

28
examples/selfsigned.key Normal file
View file

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCg+X+utjujNcq/
zLgcuj1o0BRfu8cF1ZNS/lLhSi+B064fs7905Ii8XS7rP7LBZhXsczvUTWDoPhvv
bxkHzblPGqytAYuWWTE7YdXQNTIKm4TPZlK4vbEGMSJ1OGQxXbc9UNKzf4VQVoa0
n0bEnnqXO4kqcNANM4U9+6jN8IFZ4B82eCJmdw5Hd3HHhrPbyapLGO2kiPzp3638
8n6CwFngOCv4NvHt9G5fDP9Tp+fhdHGSA9ViuDRoM39C8yHtQTjSFcml6J06CITp
YeMd9/Of43Y9TpCVfViVlbTDE/8B9uwLzEgXJs5UBCmUjGnWtEONvWZiM7Ul+9mO
PejUMPwlAgMBAAECggEAD6Vwlai8zzZRSKc7Vf98LI3dDRkRVS3XLf/uSluNlo7e
o9Iyz8fOypA8GT2NwGKNyvfAXvhObQRsbq9bvXhvhJLRKde2m5x7vovZ3mztOj63
f/kwHSjC5hksgjxC8NFtGBadBDlm2dIvMasxk7bbr4tn36orbr0NPGMTm0C/Md8B
bSUzuc/mT+6KWfW9g4svqebSbKvC7tGuAu3/RfL1cmbuuvtJJA+EPfRjgCtOiFSk
8NLE0KLUYySf6M3MAHMeSwQhVr1xyYUkiOqQoxMC7CpaplNqaB2rrOe2nEjLpXgx
80WLFbB22HNEkBSgX5zz4FmrLchwaI79f3PiGQ2DWQKBgQDSIsl1t13eR8ubsGU5
Z097U8/eylyXJC+ZU1/TbWgaPNHlMf26EncSJwlmq7pNJ0Rk7p/edPMWKu9tOgQE
iG0QeKxvxcbc1LzVmfLKXY8hW4DOhiBTiRNNb+YWuYmKYnHXM8hrKthNXBFfRrwb
Pb+mid9FcK4GcIDkWbgXEahmwwKBgQDEG9sB7Ee4fDiQi1oVsPkMwxnxsiNpSRRG
9CmG/xIvL81vSaONPmg1f5q5/Wqkd6s3QmMs4rE5U8+kgRvuw/MnN/UxQBH5/LAn
T3hbo7qlIONOOpgQswg7wIQnKP31spNU2FthA1ACazRBsIyH+hPmgeqvtvFhL4Qy
6q5tfhFy9wKBgGYZtu82aCqPkdOU0qogk1Ll9zNV+cUKNQJ3qzDMkO9mq8mED7cw
L6CnTP8Q45WHRckQ1Ka/Bjm4JNtafAdDzlJZf9dTLnuv9gyHH5vJ97iKgDxYmS5d
hP50J0TVY4nUqWGZ7IB9sdlsqZg0g0NtLkiZ5t0TkcrZMRdCrJqw3rUHAoGBAJNk
wEmEti8Rpk31fsK43aba6KABHJ5gX84oayHcimVOz1/qf/ODyT0UaE2MC2AL1XLW
AcZVp5AHzxO8OitNuW5rn2zh0+EJK7iQAU0XFQxRWKaOYYaDmReXzXvFUoMdMaDe
cGfM3pDC1Gbe8/CrY9OnJ6XjoS5DUWAXhPwkeabnAoGBAMmI9xcyfcPjO6XKKymZ
z+6bwvFaey9Acy5vkxLrEcRqH8pRR09CltWzR4vhpJwHABWPHVGqdtvlLw1/lUrj
xr4RXvYXK28cK9Tdak0c3+HPSsozxrsX6AQzcG27ymo5s0KYaVuxWh98iZU7Eb52
YfCx3eOSIHPH0ay7KBVAQocG
-----END PRIVATE KEY-----