Add simpler methods for instantiating TwoPartyServer/Client

This commit is contained in:
Jason Paryani 2013-12-12 11:33:52 -08:00
parent 110a7c7806
commit 174ecd5609

View file

@ -23,6 +23,8 @@ import warnings as _warnings
import inspect as _inspect
from operator import attrgetter as _attrgetter
import threading as _threading
import socket as _socket
import random as _random
# By making it public, we'll be able to call it from capabilityHelper.h
cdef public object wrap_dynamic_struct_reader(Response & r):
@ -1677,9 +1679,12 @@ cdef class TwoPartyClient:
cdef public _Restorer _restorer
cdef public _FdAsyncIoStream _stream
def __init__(self, stream, restorer=None):
self._orig_stream = stream
self._stream = _FdAsyncIoStream(stream.fileno())
def __init__(self, socket, restorer=None):
if isinstance(socket, basestring):
socket = self._connect(socket)
self._orig_stream = socket
self._stream = _FdAsyncIoStream(socket.fileno())
self._network = _TwoPartyVatNetwork()._init(deref(self._stream.thisptr), capnp.CLIENT)
if restorer is None:
self.thisptr = new RpcSystem(makeRpcClient(deref(self._network.thisptr)))
@ -1696,6 +1701,16 @@ cdef class TwoPartyClient:
def __dealloc__(self):
del self.thisptr
cpdef _connect(self, host_string):
host, port = host_string.split(':')
sock = _socket.create_connection((host, port))
# Set TCP_NODELAY on socket to disable Nagle's algorithm. This is not
# neccessary, but it speeds things up.
sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1)
return sock
cpdef restore(self, objectId) except +reraise_kj_exception:
cdef _MessageBuilder builder
cdef _MessageReader reader
@ -1737,13 +1752,19 @@ cdef class TwoPartyClient:
cdef class TwoPartyServer:
cdef RpcSystem * thisptr
cdef public _TwoPartyVatNetwork _network
cdef public object _orig_stream
cdef public object _orig_stream, _server_socket
cdef public _Restorer _restorer
cdef public _FdAsyncIoStream _stream
cdef public int port
def __init__(self, stream, restorer):
self._orig_stream = stream
self._stream = _FdAsyncIoStream(stream.fileno())
def __init__(self, socket, restorer, server_socket=None):
if isinstance(socket, basestring):
self._connect(socket)
else:
self._orig_stream = socket
self._stream = _FdAsyncIoStream(socket.fileno())
self._server_socket = server_socket
self.port = 0
self._restorer = _convert_restorer(restorer)
self._network = _TwoPartyVatNetwork()._init(deref(self._stream.thisptr), capnp.SERVER)
self.thisptr = new RpcSystem(makeRpcServer(deref(self._network.thisptr), deref(self._restorer.thisptr)))
@ -1753,11 +1774,52 @@ cdef class TwoPartyServer:
Py_INCREF(self._restorer)
Py_INCREF(self._network) # TODO:MEMORY: attach this to onDrained, also figure out what's leaking
cpdef _connect(self, host_string):
if ':' in host_string:
address, port = host_string.split(':')
port = int(port)
else:
port = _random.randint(60000, 61000)
if address == '*':
address = ''
s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
# Set TCP_NODELAY on socket to disable Nagle's algorithm. This is not
# neccessary, but it speeds things up.
s.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1)
s.bind((address, port))
s.listen(1) # service only 1 client at a time
(clientsocket, address) = s.accept()
self._server_socket = s
self._orig_stream = clientsocket
self._stream = _FdAsyncIoStream(self._orig_stream.fileno())
self.port = port
def __dealloc__(self):
del self.thisptr
def run_forever(self):
_VoidPromise()._init(deref(self._network.thisptr).onDisconnect()).wait()
cpdef on_disconnect(self) except +reraise_kj_exception:
return _VoidPromise()._init(deref(self._network.thisptr).onDisconnect())
cpdef run_forever(self):
if self._server_socket is None:
raise ValueError("You must pass a `server_socket` parameter to __init__ or a string as the socket parameter to use this function")
while True:
try:
self.on_disconnect().wait()
(clientsocket, address) = self._server_socket.accept()
self._stream = _FdAsyncIoStream(clientsocket.fileno())
self._network = _TwoPartyVatNetwork()._init(deref(self._stream.thisptr), capnp.SERVER)
self.thisptr = new RpcSystem(makeRpcServer(deref(self._network.thisptr), deref(self._restorer.thisptr)))
except KeyboardInterrupt:
break
# TODO: add restore functionality here?