mirror of
synced 2025-03-04 08:24:43 +01:00
Remove a bunch of unused code
This commit is contained in:
5 changed files with 10 additions and 315 deletions
@ -19,52 +19,3 @@ capnp::Capability::Client bootstrapHelperServer(capnp::RpcSystem<capnp::rpc::two
return client.bootstrap(hostId);
class ErrorHandler : public kj::TaskSet::ErrorHandler {
void taskFailed(kj::Exception&& exception) override {
struct ServerContext {
kj::Own<kj::AsyncIoStream> stream;
capnp::TwoPartyVatNetwork network;
capnp::RpcSystem<capnp::rpc::twoparty::SturdyRefHostId> rpcSystem;
ServerContext(kj::Own<kj::AsyncIoStream>&& stream, capnp::Capability::Client client, capnp::ReaderOptions & opts)
: stream(kj::mv(stream)),
network(*this->stream, capnp::rpc::twoparty::Side::SERVER, opts),
rpcSystem(makeRpcServer(network, client)) {}
void acceptLoop(kj::TaskSet & tasks, capnp::Capability::Client client, kj::Own<kj::ConnectionReceiver>&& listener, capnp::ReaderOptions & opts) {
auto ptr = listener.get();
[&, client, opts](kj::Own<kj::ConnectionReceiver>&& listener,
kj::Own<kj::AsyncIoStream>&& connection) mutable {
acceptLoop(tasks, client, kj::mv(listener), opts);
auto server = kj::heap<ServerContext>(kj::mv(connection), client, opts);
// Arrange to destroy the server context when all references are gone, or when the
// EzRpcServer is destroyed (which will destroy the TaskSet).
kj::Promise<kj::Own<PyRefCounter>> connectServer(kj::TaskSet & tasks, capnp::Capability::Client client, kj::AsyncIoProvider * provider, kj::StringPtr bindAddress, capnp::ReaderOptions & opts) {
auto paf = kj::newPromiseAndFulfiller<unsigned int>();
auto portPromise = paf.promise.fork();
[&, client, opts](kj::Own<kj::PromiseFulfiller<unsigned int>>&& portFulfiller,
kj::Own<kj::NetworkAddress>&& addr) mutable {
auto listener = addr->listen();
acceptLoop(tasks, client, kj::mv(listener), opts);
return portPromise.addBranch().then([&](unsigned int port) {
return stealPyRef(PyLong_FromUnsignedLong(port)); });
@ -54,8 +54,6 @@ cdef extern from "kj/memory.h" namespace " ::kj":
Own[T] heap[T](...)
Own[TwoPartyVatNetwork] makeTwoPartyVatNetwork" ::kj::heap< ::capnp::TwoPartyVatNetwork>"(
AsyncIoStream& stream, Side, ReaderOptions)
Own[PromiseFulfillerPair] copyPromiseFulfillerPair" ::kj::heap< ::kj::PromiseFulfillerPair<void> >"(
cdef extern from "kj/async.h" namespace " ::kj":
cdef cppclass Promise[T] nogil:
@ -553,10 +551,6 @@ cdef extern from "kj/async.h" namespace " ::kj":
cdef cppclass VoidPromiseFulfiller"::kj::PromiseFulfiller<void>" nogil:
void fulfill()
void reject(Exception&& exception)
cdef cppclass PromiseFulfillerPair" ::kj::PromiseFulfillerPair<void>" nogil:
VoidPromise promise
Own[VoidPromiseFulfiller] fulfiller
PromiseFulfillerPair newPromiseAndFulfiller" ::kj::newPromiseAndFulfiller<void>"() nogil
PyPromiseArray joinPromises(Array[PyPromise]) nogil
cdef extern from "capnp/helpers/capabilityHelper.h":
@ -11,7 +11,6 @@ from capnp.includes.capnp_cpp cimport (
DynamicCapability as C_DynamicCapability, Request, Response, RemotePromise, Promise,
CallContext, RpcSystem, makeRpcServerBootstrap, makeRpcClient, Capability as C_Capability,
TwoPartyVatNetwork as C_TwoPartyVatNetwork, Side, AsyncIoStream, Own, makeTwoPartyVatNetwork,
PromiseFulfillerPair as C_PromiseFulfillerPair, copyPromiseFulfillerPair, newPromiseAndFulfiller,
PyArray, DynamicStruct_Builder, TwoWayPipe, PyRefCounter, PyAsyncIoStream
from capnp.includes.schema_cpp cimport Node as C_Node, EnumNode as C_EnumNode
@ -342,7 +342,6 @@ ctypedef fused PromiseTypes:
# PromiseFulfillerPair
cdef extern from "Python.h":
@ -1867,7 +1866,6 @@ cdef class _EventLoop:
cdef Own[LowLevelAsyncIoProvider] lowLevelProvider
cdef Own[AsyncIoProvider] provider
cdef WaitScope * waitScope
cdef readonly in_asyncio_mode
cdef AsyncIoEventPort *customPort
@ -1880,7 +1878,6 @@ cdef class _EventLoop:
kjLoop = self.customPort.getKjLoop()
self.waitScope = new WaitScope(deref(kjLoop))
loop.close = _partial(_asyncio_close_patch, loop, loop.close, self)
self.in_asyncio_mode = True
def __dealloc__(self):
if not self.customPort == NULL:
@ -1888,16 +1885,6 @@ cdef class _EventLoop:
del self.waitScope
del self.customPort
cdef TwoWayPipe makeTwoWayPipe(self):
if self.in_asyncio_mode:
raise RuntimeError("Cannot call makeTwoWayPipe in asyncio mode")
return deref(self.provider).newTwoWayPipe()
cdef Own[AsyncIoStream] wrapSocketFd(self, int fd):
if self.in_asyncio_mode:
raise RuntimeError("Cannot call wrapSocketFd in asyncio mode")
return deref(self.lowLevelProvider).wrapSocketFd(fd)
_C_DEFAULT_EVENT_LOOP_LOCAL = _threading.local()
@ -1923,24 +1910,6 @@ cdef _EventLoop C_DEFAULT_EVENT_LOOP_GETTER():
def wait_forever():
Use libcapnp event loop to poll/wait forever
cdef _EventLoop loop = C_DEFAULT_EVENT_LOOP_GETTER()
with nogil:
def poll_once():
Poll libcapnp event loop once
cdef _EventLoop loop = C_DEFAULT_EVENT_LOOP_GETTER()
with nogil:
cdef class _CallContext:
cdef CallContext * thisptr
@ -2396,10 +2365,6 @@ cdef class _TwoPartyVatNetwork:
self.thisptr = makeTwoPartyVatNetwork(deref(stream.thisptr), side, opts)
return self
cdef _init_pipe(self, _TwoWayPipe pipe, Side side, schema_cpp.ReaderOptions opts):
self.thisptr = makeTwoPartyVatNetwork(deref(pipe._pipe.ends[0]), side, opts)
return self
cpdef on_disconnect(self) except +reraise_kj_exception:
return _VoidPromise()._init(deref(self.thisptr).onDisconnect())
@ -2408,84 +2373,28 @@ cdef class TwoPartyClient:
TwoPartyClient for RPC Communication
Can be initialized using a socket wrapper (libcapnp) or using asyncio controlled sockets.
:param socket: Can be a string defining a socket (e.g. localhost:12345) or a file descriptor for a socket.
Passes socket directly to libcapnp. Do not use with asyncio.
:param socket: AsyncIoStream
:param traversal_limit_in_words: Pointer derefence limit (see https://capnproto.org/cxx.html).
:param nesting_limit: Recursive limit when reading types (see https://capnproto.org/cxx.html).
cdef RpcSystem * thisptr
cdef public _TwoPartyVatNetwork _network
cdef public _TwoWayPipe _pipe
cdef _TwoPartyVatNetwork _network
def __init__(self, socket=None, traversal_limit_in_words=None, nesting_limit=None):
if isinstance(socket, basestring):
if C_DEFAULT_EVENT_LOOP_GETTER().in_asyncio_mode:
raise RuntimeError("Pycapnp is in asyncio mode. Pass a AsyncIoStream")
socket = self._connect(socket)
cdef schema_cpp.ReaderOptions opts = make_reader_opts(traversal_limit_in_words, nesting_limit)
if socket is None:
# Initialize TwoWayPipe, to use pipe() acquire other end of the pipe using read() and write() methods
self._pipe = _TwoWayPipe()
self._network = _TwoPartyVatNetwork()._init_pipe(self._pipe, capnp.CLIENT, opts)
elif isinstance(socket, _AsyncIoStream):
if isinstance(socket, _AsyncIoStream):
self._network = _TwoPartyVatNetwork()._init(socket, capnp.CLIENT, opts)
elif isinstance(socket, _socket.socket):
stream = _FdAsyncIoStream(socket)
self._network = _TwoPartyVatNetwork()._init(stream, capnp.CLIENT, opts)
raise ValueError(f"Argument socket should be a string, socket, AsyncIoStream or None, was {type(socket)}")
raise ValueError(f"Argument socket should be a AsyncIoStream, was {type(socket)}")
self.thisptr = new RpcSystem(makeRpcClient(deref(self._network.thisptr)))
async def read(self, bufsize):
libcapnp reader (asyncio sockets only)
:param bufsize: Buffer size to read from the libcapnp library
cdef array.array read_buffer = array.array('b', [])
array.resize(read_buffer, bufsize)
read_size_actual = await _Promise()._init(
self._pipe._pipe.ends[1].get().read(read_buffer.data.as_voidptr, 1, bufsize)))
array.resize(read_buffer, read_size_actual)
return read_buffer
async def write(self, data):
libcapnp writer (asyncio sockets only)
:param data: Buffer to write to the libcapnp library
cdef array.array write_buffer = array.array('b', data)
await _VoidPromise()._init(
def __dealloc__(self):
if not self.thisptr == NULL:
del self.thisptr
def _connect(self, host_string):
if host_string.startswith('unix:'):
path = host_string[5:]
sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
host, port = host_string.split(':')
sock = _socket.create_connection((host, port))
# Set TCP_NODELAY on socket to disable Nagle's algorithm. This is not
# neccessary, but it speeds things up.
sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1)
return sock
cpdef bootstrap(self) except +reraise_kj_exception:
return _CapabilityClient()._init(helpers.bootstrapHelper(deref(self.thisptr)), self)
@ -2497,142 +2406,37 @@ cdef class TwoPartyServer:
TwoPartyServer for RPC Communication
Can be initialized using a socket wrapper (libcapnp) or using asyncio controlled sockets.
:param socket: Can be a string defining a socket (e.g. localhost:12345, also supports *:12345)
or a file descriptor for a socket.
Passes socket directly to libcapnp. Do not use with asyncio.
:param socket: AsyncIoStream
:param bootstrap: Class object defining the implementation of the Cap'n'proto interface.
:param traversal_limit_in_words: Pointer derefence limit (see https://capnproto.org/cxx.html).
:param nesting_limit: Recursive limit when reading types (see https://capnproto.org/cxx.html).
cdef RpcSystem * thisptr
cdef public _TwoPartyVatNetwork _network
cdef public _TwoWayPipe _pipe
cdef object _port
cdef public object port_promise, _bootstrap
cdef capnp.TaskSet * _task_set
cdef capnp.ErrorHandler _error_handler
cdef _TwoPartyVatNetwork _network
def __init__(self, socket=None, bootstrap=None, traversal_limit_in_words=None, nesting_limit=None):
if not bootstrap:
raise KjException("You must provide a bootstrap interface to a server constructor.")
cdef _InterfaceSchema schema
self._bootstrap = None
if isinstance(socket, basestring):
if C_DEFAULT_EVENT_LOOP_GETTER().in_asyncio_mode:
raise RuntimeError("Pycapnp is in asyncio mode. Please start an asyncio server using"
"TwoPartyClient.create_server and pass any resulting connection to this class.")
self._connect(socket, bootstrap, traversal_limit_in_words, nesting_limit)
opts = make_reader_opts(traversal_limit_in_words, nesting_limit)
if isinstance(socket, _AsyncIoStream):
self._network = _TwoPartyVatNetwork()._init(socket, capnp.SERVER, opts)
elif isinstance(socket, _socket.socket):
if C_DEFAULT_EVENT_LOOP_GETTER().in_asyncio_mode:
raise RuntimeError("Pycapnp is in asyncio mode. Please pass an AsyncIoStream instance.")
stream = _FdAsyncIoStream(socket)
self._network = _TwoPartyVatNetwork()._init(stream, capnp.SERVER, opts)
elif socket is None:
# Initialize TwoWayPipe, to use pipe() acquire other end of the pipe using read() and write() methods
self._pipe = _TwoWayPipe()
self._network = _TwoPartyVatNetwork()._init_pipe(
self._pipe, capnp.SERVER, opts)
raise KjException("Unexpected typ for socket in TwoPartyServer")
raise ValueError(f"Argument socket should be a AsyncIoStream, was {type(socket)}")
self._port = 0
if bootstrap:
self._bootstrap = bootstrap
schema = bootstrap.schema
cdef _InterfaceSchema schema = bootstrap.schema
self.thisptr = new RpcSystem(makeRpcServerBootstrap(
deref(self._network.thisptr), helpers.server_to_client(schema.thisptr, <PyObject *>bootstrap)))
async def read(self, bufsize):
libcapnp reader (asyncio sockets only)
:param bufsize: Buffer size to read from the libcapnp library
cdef array.array read_buffer = array.array('b', [])
array.resize(read_buffer, bufsize)
read_size_actual = await _Promise()._init(
self._pipe._pipe.ends[1].get().read(read_buffer.data.as_voidptr, 1, bufsize)))
array.resize(read_buffer, read_size_actual)
return read_buffer
async def write(self, data):
libcapnp writer (asyncio sockets only)
:param data: Buffer to write to the libcapnp library
cdef array.array write_buffer = array.array('b', data)
await _VoidPromise()._init(
cpdef _connect(self, host_string, bootstrap, traversal_limit_in_words, nesting_limit):
cdef schema_cpp.ReaderOptions opts = make_reader_opts(traversal_limit_in_words, nesting_limit)
cdef _InterfaceSchema schema
cdef _EventLoop loop = C_DEFAULT_EVENT_LOOP_GETTER()
cdef capnp.StringPtr temp_string = capnp.StringPtr(<char*>host_string, len(host_string))
self._task_set = new capnp.TaskSet(self._error_handler)
self._bootstrap = bootstrap
schema = bootstrap.schema
self.port_promise = _Promise()._init(
helpers.server_to_client(schema.thisptr, <PyObject *>bootstrap),
loop.provider.get(), temp_string, opts))
def __dealloc__(self):
del self.thisptr
del self._task_set
cpdef on_disconnect(self) except +reraise_kj_exception:
if self._task_set != NULL:
raise KjException("Currently, you can only call on_disconnect on a server without an internal socket")
return _VoidPromise()._init(deref(self._network.thisptr).onDisconnect())
def poll_once(self):
Poll libcapnp library one cycle.
return poll_once()
async def poll_forever(self):
"""Deprecated. Do not use.
Poll libcapnp library forever (asyncio)
raise KjException("This functionality has been removed. If you wish to wait forever, use \n" +
"'await asyncio._get_running_loop().create_future()'")
cpdef run_forever(self):
if self.port_promise is None:
raise KjException("You must pass a string as the socket parameter in __init__ to use this function")
cpdef bootstrap(self) except +reraise_kj_exception:
return _CapabilityClient()._init(helpers.bootstrapHelperServer(deref(self.thisptr)), self)
property port:
def __get__(self):
if self._port is None:
self._port = self.port_promise.wait()
return self._port
return self._port
cdef class _AsyncIoStream:
cdef Own[AsyncIoStream] thisptr
@ -2906,53 +2710,6 @@ cdef api void _asyncio_stream_close(object thisptr) except*:
if self.transport is not None and hasattr(self.transport, "close"):
cdef class _TwoWayPipe:
cdef _EventLoop _event_loop
cdef TwoWayPipe _pipe
def __init__(self):
cpdef _init(self) except +reraise_kj_exception:
self._event_loop = C_DEFAULT_EVENT_LOOP_GETTER()
# Create two way pipe using AsyncIoContext
self._pipe = self._event_loop.makeTwoWayPipe()
cdef class _FdAsyncIoStream(_AsyncIoStream):
"""Wraps a socket for usage with pycapnp.
Note that this class does not own the socket. Instead, it receives the fileno from a python object,
which will continue to own it. This object is kept alive as long as this class is alive. Ultimately,
the python object is responsible for closing."""
cdef object _socket
def __init__(self, object socket):
self._socket = socket
cdef _init(self, int fd) except +reraise_kj_exception:
self._event_loop = C_DEFAULT_EVENT_LOOP_GETTER()
self.thisptr = self._event_loop.wrapSocketFd(fd)
def __dealloc__(self):
# The AsyncIoStream must be destroyed before self._socket is removed, to ensure the socket is still
# open when the destructor is called. Therefore, we do this manually to have control over the ordering
self.thisptr = Own[AsyncIoStream]()
cdef class PromiseFulfillerPair:
cdef Own[C_PromiseFulfillerPair] thisptr
cdef public bint is_consumed
cdef public _VoidPromise promise
def __init__(self):
self.thisptr = copyPromiseFulfillerPair(newPromiseAndFulfiller())
self.is_consumed = False
self.promise = _VoidPromise()._init(moveVoidPromise(deref(self.thisptr).promise))
cpdef fulfill(self):
cdef class _Schema:
cdef _init(self, C_Schema other):
@ -27,7 +27,6 @@ Promise may be one of:
* :meth:`capnp.lib.capnp._Promise`
* :meth:`capnp.lib.capnp._RemotePromise`
* :meth:`capnp.lib.capnp._VoidPromise`
* :meth:`PromiseFulfillerPair`
.. autoclass:: capnp.lib.capnp._Promise
@ -44,11 +43,6 @@ Promise may be one of:
.. autoclass:: PromiseFulfillerPair
Add table
Reference in a new issue