Integrate the KJ event loop into Python's asyncio event loop (#310)

* Integrate the KJ event loop into Python's asyncio event loop

Fix #256

This PR attempts to remove the slow and expensive polling behavior for asyncio
in favor of proper linking of the KJ event loop to the asyncio event loop.

* Don't memcopy buffer

* Improve promise cancellation and prepare for timer implementation

* Add attribution for asyncProvider.cpp

* Implement timeout

* Cleanup

* First round of simplifications

* Add more a_wait functions and a shutdown function

* Fix edge-cases with loop shutdown

* Clean up calculator examples

* Cleanup

* Cleanup

* Reformat

* Fix warnings

* Reformat again

* Compatibility with macos

* Inline the asyncio loop in some places where this is feasible

* Add todo

* Fix

* Remove synchronous wait

* Wrap fd listening callbacks in a class

* Remove poll_forever

* Remove the thread-local/thread-global optimization

This will not matter much soon anyway, and simplifies things

* Share promise code by using fused types

* Improve refcounting of python objects in promises

We replace many instances of PyObject* by Own<PyRefCounter> for more automatic
reference management.

* Code wrapPyFunc in a similar way to wrapPyFuncNoArg

* Refactor capabilityHelper, fix several memory bugs for promises and add __await__

* Improve promise ownership, reduce memory leaks

Promise wrappers now hold a Own<Promise<Own<PyRefCounter>>> object. This might
seem like excessive nesting of objects (which to some degree it is, but with
good reason):
- The outer Own is needed because Cython cannot allocate objects without a
  nullary constructor on the stack (Promise doesn't have a nullary constructor).
  Additionally, I believe it would be difficult or impossible to detect when a
  promise is cancelled/moved if we use a bare Promise.
- Every promise returns a Owned PyRefCounter. PyRefCounter makes sure that a
  reference to the returned object keeps existing until the promise is fulfilled
  or cancelled. Previously, this was attempted using attach, which is redundant
  and makes reasoning about PyINCREF and PyDECREF very difficult.
- Because a promise holds a Own<Promise<...>>, when we perform any kind of
  action on that promise (a_wait, then, ...), we have to explicitly move() the
  ownership around. This will leave the original promise with a NULL-pointer,
  which we can easily detect as a cancelled promise.

Promises now only hold references to their 'parents' when strictly needed. This
should reduce memory pressure.

* Simplify and test the promise joining functionality

* Attach forgotten parent

* Catch exceptions in add_reader and friends

* Further cleanup of memory leaks

* Get rid of a_wait() in examples

* Cancel all fd read operations when the python asyncio loop is closed

* Formatting

* Remove support for capnp < 7000

* Bring asyncProvider.cpp more in line with upstream async-io-unix.c++

It was originally copied from the nodejs implementation, which in turn copied
from async-io-unix.c++. But that copy is pretty old.

* Fix a bug that caused file descriptors to never be closed

* Implement AsyncIoStream based on Python transports and protocols

* Get rid of asyncProvider

All asyncio now goes through _AsyncIoStream

* Formatting

* Add __dict__ to  PyAsyncIoStreamProtocol for python 3.7

* Reintroduce strange ipv4/ipv6 selection code to make ci happy

* Extra pause_reading()

* Work around more python bugs

* Be careful to only close transport when this is still possible

* Move pause_reading() workaround
This commit is contained in:
Lasse Blaauwbroek 2023-06-06 20:08:15 +02:00 committed by GitHub
parent ed894304a3
commit d32854eb00
Failed to generate hash of commit
25 changed files with 1060 additions and 1263 deletions

View file

@ -53,6 +53,7 @@ from .lib.capnp import (
_write_message_to_fd,
_write_packed_message_to_fd,
_Promise as Promise,
_AsyncIoStream as AsyncIoStream,
_init_capnp_api,
)

View file

@ -1,69 +1,13 @@
#pragma once
#include "kj/async.h"
#include "Python.h"
#include "capabilityHelper.h"
class PyEventPort: public kj::EventPort {
public:
PyEventPort(PyObject * _py_event_port): py_event_port(_py_event_port) {
// We don't need to incref/decref, since this C++ class will be owned by the Python wrapper class, and we'll make sure the python class doesn't refcount to 0 elsewhere.
// Py_INCREF(py_event_port);
}
virtual bool wait() {
GILAcquire gil;
PyObject_CallMethod(py_event_port, const_cast<char *>("wait"), NULL);
return true; // TODO: get the bool result from python
}
virtual bool poll() {
GILAcquire gil;
PyObject_CallMethod(py_event_port, const_cast<char *>("poll"), NULL);
return true; // TODO: get the bool result from python
}
virtual void setRunnable(bool runnable) {
GILAcquire gil;
PyObject * arg = Py_False;
if (runnable)
arg = Py_True;
PyObject_CallMethod(py_event_port, const_cast<char *>("set_runnable"), const_cast<char *>("o"), arg);
}
private:
PyObject * py_event_port;
};
void waitNeverDone(kj::WaitScope & scope) {
GILRelease gil;
kj::NEVER_DONE.wait(scope);
}
void pollWaitScope(kj::WaitScope & scope) {
GILRelease gil;
scope.poll();
}
kj::Timer * getTimer(kj::AsyncIoContext * context) {
return &context->lowLevelProvider->getTimer();
}
void waitVoidPromise(kj::Promise<void> * promise, kj::WaitScope & scope) {
GILRelease gil;
promise->wait(scope);
}
PyObject * waitPyPromise(kj::Promise<PyObject *> * promise, kj::WaitScope & scope) {
GILRelease gil;
return promise->wait(scope);
}
capnp::Response< ::capnp::DynamicStruct> * waitRemote(capnp::RemotePromise< ::capnp::DynamicStruct> * promise, kj::WaitScope & scope) {
GILRelease gil;
capnp::Response< ::capnp::DynamicStruct> * waitRemote(kj::Own<capnp::RemotePromise<::capnp::DynamicStruct>> promise,
kj::WaitScope & scope) {
return new capnp::Response< ::capnp::DynamicStruct>(promise->wait(scope));
}
bool pollRemote(capnp::RemotePromise< ::capnp::DynamicStruct> * promise, kj::WaitScope & scope) {
GILRelease gil;
return promise->poll(scope);
}

View file

@ -1,53 +0,0 @@
#pragma once
#include "kj/async.h"
#include "kj/async-io.h"
class AsyncIoStreamReadHelper {
public:
AsyncIoStreamReadHelper(kj::AsyncIoStream * _stream, kj::WaitScope * _scope, size_t bufsize) {
io_stream = _stream;
wait_scope = _scope;
ready = false;
buffer_read_size = 0;
buffer = new unsigned char[bufsize];
promise = io_stream->read(buffer, 1, bufsize);
}
~AsyncIoStreamReadHelper() {
delete[] buffer;
}
bool poll() {
bool result = promise.poll(*wait_scope);
if (result) {
ready = true;
buffer_read_size = promise.wait(*wait_scope);
}
return result;
}
size_t read_size() {
if (!ready) {
return 0;
}
return buffer_read_size;
}
void * read_buffer() {
if (!ready) {
return 0;
}
return buffer;
}
private:
kj::AsyncIoStream * io_stream;
kj::WaitScope * wait_scope;
kj::Promise<size_t> promise = nullptr;
unsigned char *buffer;
size_t buffer_read_size;
bool ready;
};

View file

@ -1,8 +1,9 @@
#include "capnp/helpers/capabilityHelper.h"
#include "capnp/lib/capnp_api.h"
::kj::Promise<PyObject *> convert_to_pypromise(capnp::RemotePromise<capnp::DynamicStruct> & promise) {
return promise.then([](capnp::Response<capnp::DynamicStruct>&& response) { return wrap_dynamic_struct_reader(response); } );
::kj::Promise<kj::Own<PyRefCounter>> convert_to_pypromise(kj::Own<capnp::RemotePromise<capnp::DynamicStruct>> promise) {
return promise->then([](capnp::Response<capnp::DynamicStruct>&& response) {
return stealPyRef(wrap_dynamic_struct_reader(response)); } );
}
void reraise_kj_exception() {
@ -57,85 +58,73 @@ void check_py_error() {
}
}
kj::Promise<PyObject *> wrapPyFunc(PyObject * func, PyObject * arg) {
inline kj::Promise<kj::Own<PyRefCounter>> maybeUnwrapPromise(PyObject * result) {
check_py_error();
auto promise = extract_promise(result);
Py_DECREF(result);
return kj::mv(*promise);
}
kj::Promise<kj::Own<PyRefCounter>> wrapPyFunc(kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> arg) {
GILAcquire gil;
auto arg_promise = extract_promise(arg);
if(arg_promise == NULL) {
PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL);
Py_DECREF(arg);
check_py_error();
auto promise = extract_promise(result);
if(promise != NULL)
return kj::mv(*promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
auto remote_promise = extract_remote_promise(result);
if(remote_promise != NULL)
return convert_to_pypromise(*remote_promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
return result;
}
else {
return arg_promise->then([&](PyObject * new_arg){ return wrapPyFunc(func, new_arg); });// TODO: delete arg_promise?
}
// Creates an owned reference, which will be destroyed in maybeUnwrapPromise
PyObject * result = PyObject_CallFunctionObjArgs(func->obj, arg->obj, NULL);
return maybeUnwrapPromise(result);
}
kj::Promise<PyObject *> wrapPyFuncNoArg(PyObject * func) {
kj::Promise<kj::Own<PyRefCounter>> wrapPyFuncNoArg(kj::Own<PyRefCounter> func) {
GILAcquire gil;
PyObject * result = PyObject_CallFunctionObjArgs(func, NULL);
check_py_error();
auto promise = extract_promise(result);
if(promise != NULL)
return kj::mv(*promise);
auto remote_promise = extract_remote_promise(result);
if(remote_promise != NULL)
return convert_to_pypromise(*remote_promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
return result;
// Creates an owned reference, which will be destroyed in maybeUnwrapPromise
PyObject * result = PyObject_CallFunctionObjArgs(func->obj, NULL);
return maybeUnwrapPromise(result);
}
kj::Promise<PyObject *> wrapRemoteCall(PyObject * func, capnp::Response<capnp::DynamicStruct> & arg) {
kj::Promise<kj::Own<PyRefCounter>> wrapRemoteCall(kj::Own<PyRefCounter> func, capnp::Response<capnp::DynamicStruct> & arg) {
GILAcquire gil;
PyObject * ret = wrap_remote_call(func, arg);
check_py_error();
auto promise = extract_promise(ret);
if(promise != NULL)
return kj::mv(*promise);
auto remote_promise = extract_remote_promise(ret);
if(remote_promise != NULL)
return convert_to_pypromise(*remote_promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
return ret;
// Creates an owned reference, which will be destroyed in maybeUnwrapPromise
PyObject * ret = wrap_remote_call(func->obj, arg);
return maybeUnwrapPromise(ret);
}
::kj::Promise<PyObject *> then(kj::Promise<PyObject *> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } );
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<kj::Promise<kj::Own<PyRefCounter>>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func) {
if(error_func->obj == Py_None)
return promise->then(kj::mvCapture(func, [](auto func, kj::Own<PyRefCounter> arg) {
return wrapPyFunc(kj::mv(func), kj::mv(arg)); } ));
else
return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
return promise->then
(kj::mvCapture(func, [](auto func, kj::Own<PyRefCounter> arg) {
return wrapPyFunc(kj::mv(func), kj::mv(arg)); }),
kj::mvCapture(error_func, [](auto error_func, kj::Exception arg) {
return wrapPyFunc(kj::mv(error_func), stealPyRef(wrap_kj_exception(arg))); } ));
}
::kj::Promise<PyObject *> then(::capnp::RemotePromise< ::capnp::DynamicStruct> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return promise.then([func](capnp::Response<capnp::DynamicStruct>&& arg) { return wrapRemoteCall(func, arg); } );
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<::capnp::RemotePromise<::capnp::DynamicStruct>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func) {
if(error_func->obj == Py_None)
return promise->then(kj::mvCapture(func, [](auto func, capnp::Response<capnp::DynamicStruct>&& arg) {
return wrapRemoteCall(kj::mv(func), arg); } ));
else
return promise.then([func](capnp::Response<capnp::DynamicStruct>&& arg) { return wrapRemoteCall(func, arg); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
return promise->then
(kj::mvCapture(func, [](auto func, capnp::Response<capnp::DynamicStruct>&& arg) {
return wrapRemoteCall(kj::mv(func), arg); }),
kj::mvCapture(error_func, [](auto error_func, kj::Exception arg) {
return wrapPyFunc(kj::mv(error_func), stealPyRef(wrap_kj_exception(arg))); } ));
}
::kj::Promise<PyObject *> then(kj::Promise<void> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return promise.then([func]() { return wrapPyFuncNoArg(func); } );
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<kj::Promise<void>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func) {
if(error_func->obj == Py_None)
return promise->then(kj::mvCapture(func, [](auto func) { return wrapPyFuncNoArg(kj::mv(func)); } ));
else
return promise.then([func]() { return wrapPyFuncNoArg(func); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
return promise->then(kj::mvCapture(func, [](auto func) { return wrapPyFuncNoArg(kj::mv(func)); }),
kj::mvCapture(error_func, [](auto error_func, kj::Exception arg) {
return wrapPyFunc(kj::mv(error_func), stealPyRef(wrap_kj_exception(arg))); } ));
}
::kj::Promise<PyObject *> then(kj::Promise<kj::Array<PyObject *> > && promise) {
return promise.then([](kj::Array<PyObject *>&& arg) { return convert_array_pyobject(arg); } );
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Promise<kj::Array<kj::Own<PyRefCounter>> > && promise) {
return promise.then([](kj::Array<kj::Own<PyRefCounter>>&& arg) {
return stealPyRef(convert_array_pyobject(arg)); } );
}
kj::Promise<void> PythonInterfaceDynamicImpl::call(capnp::InterfaceSchema::Method method,
@ -154,6 +143,66 @@ kj::Promise<void> PythonInterfaceDynamicImpl::call(capnp::InterfaceSchema::Metho
return ret;
};
class ReadPromiseAdapter {
public:
ReadPromiseAdapter(kj::PromiseFulfiller<size_t>& fulfiller, PyObject* protocol,
void* buffer, size_t minBytes, size_t maxBytes)
: protocol(protocol) {
_asyncio_stream_read_start(protocol, buffer, minBytes, maxBytes, fulfiller);
}
~ReadPromiseAdapter() {
_asyncio_stream_read_stop(protocol);
}
private:
PyObject* protocol;
};
class WritePromiseAdapter {
public:
WritePromiseAdapter(kj::PromiseFulfiller<void>& fulfiller, PyObject* protocol,
kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces)
: protocol(protocol) {
_asyncio_stream_write_start(protocol, pieces, fulfiller);
}
~WritePromiseAdapter() {
_asyncio_stream_write_stop(protocol);
}
private:
PyObject* protocol;
};
PyAsyncIoStream::~PyAsyncIoStream() {
_asyncio_stream_close(protocol->obj);
}
kj::Promise<size_t> PyAsyncIoStream::tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
return kj::newAdaptedPromise<size_t, ReadPromiseAdapter>(protocol->obj, buffer, minBytes, maxBytes);
}
kj::Promise<void> PyAsyncIoStream::write(const void* buffer, size_t size) {
KJ_UNIMPLEMENTED("No use-case AsyncIoStream::write was found yet.");
}
kj::Promise<void> PyAsyncIoStream::write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) {
return kj::newAdaptedPromise<void, WritePromiseAdapter>(protocol->obj, pieces);
}
kj::Promise<void> PyAsyncIoStream::whenWriteDisconnected() {
// TODO: Possibly connect this to protocol.connection_lost?
return kj::NEVER_DONE;
}
void PyAsyncIoStream::shutdownWrite() {
_asyncio_stream_shutdown_write(protocol->obj);
}
void init_capnp_api() {
import_capnp__lib__capnp();
}

View file

@ -1,6 +1,7 @@
#pragma once
#include "capnp/dynamic.h"
#include <kj/async-io.h>
#include <stdexcept>
#include "Python.h"
@ -26,57 +27,6 @@ public:
PyThreadState *_save; // The macros above read/write from this variable
};
::kj::Promise<PyObject *> convert_to_pypromise(capnp::RemotePromise<capnp::DynamicStruct> & promise);
inline ::kj::Promise<PyObject *> convert_to_pypromise(kj::Promise<void> & promise) {
return promise.then([]() {
GILAcquire gil;
Py_INCREF( Py_None );
return Py_None;
});
}
template<class T>
::kj::Promise<void> convert_to_voidpromise(kj::Promise<T> & promise) {
return promise.then([](T) { } );
}
void reraise_kj_exception();
void check_py_error();
kj::Promise<PyObject *> wrapPyFunc(PyObject * func, PyObject * arg);
kj::Promise<PyObject *> wrapPyFuncNoArg(PyObject * func);
kj::Promise<PyObject *> wrapRemoteCall(PyObject * func, capnp::Response<capnp::DynamicStruct> & arg);
::kj::Promise<PyObject *> then(kj::Promise<PyObject *> & promise, PyObject * func, PyObject * error_func);
::kj::Promise<PyObject *> then(::capnp::RemotePromise< ::capnp::DynamicStruct> & promise, PyObject * func, PyObject * error_func);
::kj::Promise<PyObject *> then(kj::Promise<void> & promise, PyObject * func, PyObject * error_func);
::kj::Promise<PyObject *> then(kj::Promise<kj::Array<PyObject *> > && promise);
class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server {
public:
PyObject * py_server;
PythonInterfaceDynamicImpl(capnp::InterfaceSchema & schema, PyObject * _py_server)
: capnp::DynamicCapability::Server(schema), py_server(_py_server) {
GILAcquire gil;
Py_INCREF(_py_server);
}
~PythonInterfaceDynamicImpl() {
GILAcquire gil;
Py_DECREF(py_server);
}
kj::Promise<void> call(capnp::InterfaceSchema::Method method,
capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context);
};
class PyRefCounter {
public:
PyObject * obj;
@ -97,6 +47,63 @@ public:
}
};
inline kj::Own<PyRefCounter> stealPyRef(PyObject* o) {
auto ret = kj::heap<PyRefCounter>(o);
Py_DECREF(o);
return ret;
}
::kj::Promise<kj::Own<PyRefCounter>> convert_to_pypromise(kj::Own<capnp::RemotePromise<capnp::DynamicStruct>> promise);
inline ::kj::Promise<kj::Own<PyRefCounter>> convert_to_pypromise(kj::Own<kj::Promise<void>> promise) {
return promise->then([]() {
GILAcquire gil;
return kj::heap<PyRefCounter>(Py_None);
});
}
template<class T>
::kj::Promise<void> convert_to_voidpromise(kj::Own<kj::Promise<T>> promise) {
return promise->then([](T) { } );
}
void reraise_kj_exception();
void check_py_error();
inline kj::Promise<kj::Own<PyRefCounter>> wrapSizePromise(kj::Promise<size_t> promise) {
return promise.then([](size_t response) { return stealPyRef(PyLong_FromSize_t(response)); } );
}
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<kj::Promise<kj::Own<PyRefCounter>>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func);
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<::capnp::RemotePromise< ::capnp::DynamicStruct>> promise,
kj::Own<PyRefCounter> func, kj::Own<PyRefCounter> error_func);
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Own<kj::Promise<void>> promise,
kj::Own<PyRefCounter>func, kj::Own<PyRefCounter> error_func);
::kj::Promise<kj::Own<PyRefCounter>> then(kj::Promise<kj::Array<kj::Own<PyRefCounter>> > && promise);
class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server {
public:
PyObject * py_server;
PythonInterfaceDynamicImpl(capnp::InterfaceSchema & schema, PyObject * _py_server)
: capnp::DynamicCapability::Server(schema), py_server(_py_server) {
GILAcquire gil;
Py_INCREF(_py_server);
}
~PythonInterfaceDynamicImpl() {
GILAcquire gil;
Py_DECREF(py_server);
}
kj::Promise<void> call(capnp::InterfaceSchema::Method method,
capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context);
};
inline capnp::DynamicCapability::Client new_client(capnp::InterfaceSchema & schema, PyObject * server) {
return capnp::DynamicCapability::Client(kj::heap<PythonInterfaceDynamicImpl>(schema, server));
}
@ -108,4 +115,30 @@ inline capnp::Capability::Client server_to_client(capnp::InterfaceSchema & schem
return kj::heap<PythonInterfaceDynamicImpl>(schema, server);
}
class PyAsyncIoStream: public kj::AsyncIoStream {
public:
kj::Own<PyRefCounter> protocol;
PyAsyncIoStream(kj::Own<PyRefCounter> protocol) : protocol(kj::mv(protocol)) {}
~PyAsyncIoStream();
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes);
kj::Promise<void> write(const void* buffer, size_t size);
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces);
kj::Promise<void> whenWriteDisconnected();
void shutdownWrite();
};
template <typename T>
inline void rejectDisconnected(kj::PromiseFulfiller<T>& fulfiller, kj::StringPtr message) {
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, message));
}
inline void rejectVoidDisconnected(kj::PromiseFulfiller<void>& fulfiller, kj::StringPtr message) {
fulfiller.reject(KJ_EXCEPTION(DISCONNECTED, message));
}
void init_capnp_api();

View file

@ -1,8 +1,9 @@
from capnp.includes.capnp_cpp cimport (
Maybe, ReaderOptions, DynamicStruct, Request, Response, PyPromise, VoidPromise, PyPromiseArray,
Maybe, ReaderOptions, DynamicStruct, Request, Response, Promise, PyPromise, VoidPromise, PyPromiseArray,
RemotePromise, DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue,
Capability, RpcSystem, MessageBuilder, MessageReader, TwoPartyVatNetwork, AnyPointer,
DynamicStruct_Builder, WaitScope, AsyncIoContext, StringPtr, TaskSet, Timer, AsyncIoStreamReadHelper,
DynamicStruct_Builder, WaitScope, AsyncIoContext, StringPtr, TaskSet, Timer,
LowLevelAsyncIoProvider, AsyncIoProvider, Own, PyRefCounter
)
from capnp.includes.schema_cpp cimport ByteArray
@ -20,31 +21,27 @@ cdef extern from "capnp/helpers/fixMaybe.h":
cdef extern from "capnp/helpers/capabilityHelper.h":
# PyPromise evalLater(EventLoop &, PyObject * func)
# PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(RemotePromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(VoidPromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(Own[PyPromise] promise, Own[PyRefCounter] func, Own[PyRefCounter] error_func)
PyPromise then(Own[RemotePromise] promise, Own[PyRefCounter] func, Own[PyRefCounter] error_func)
PyPromise then(Own[VoidPromise] promise, Own[PyRefCounter] func, Own[PyRefCounter] error_func)
PyPromise then(PyPromiseArray & promise)
DynamicCapability.Client new_client(InterfaceSchema&, PyObject *)
DynamicValue.Reader new_server(InterfaceSchema&, PyObject *)
Capability.Client server_to_client(InterfaceSchema&, PyObject *)
PyPromise convert_to_pypromise(RemotePromise&)
PyPromise convert_to_pypromise(VoidPromise&)
VoidPromise convert_to_voidpromise(PyPromise&)
PyPromise convert_to_pypromise(Own[RemotePromise])
PyPromise convert_to_pypromise(Own[VoidPromise])
VoidPromise convert_to_voidpromise(Own[PyPromise])
PyPromise wrapSizePromise(Promise[size_t])
void init_capnp_api()
cdef extern from "capnp/helpers/rpcHelper.h":
Capability.Client bootstrapHelper(RpcSystem&)
Capability.Client bootstrapHelperServer(RpcSystem&)
PyPromise connectServer(TaskSet &, Capability.Client, AsyncIoContext *, StringPtr, ReaderOptions &)
PyPromise connectServer(TaskSet &, Capability.Client, AsyncIoProvider *, StringPtr, ReaderOptions &)
cdef extern from "capnp/helpers/serialize.h":
ByteArray messageToPackedBytes(MessageBuilder &, size_t wordCount)
cdef extern from "capnp/helpers/asyncHelper.h":
void waitNeverDone(WaitScope&)
void pollWaitScope(WaitScope&)
Response * waitRemote(RemotePromise *, WaitScope&)
bool pollRemote(RemotePromise *, WaitScope&)
PyObject * waitPyPromise(PyPromise *, WaitScope&)
void waitVoidPromise(VoidPromise *, WaitScope&)
Timer * getTimer(AsyncIoContext *) except +reraise_kj_exception
void waitNeverDone(WaitScope&) except +reraise_kj_exception nogil
Response * waitRemote(Own[RemotePromise], WaitScope&) except +reraise_kj_exception nogil

View file

@ -9,11 +9,8 @@ cdef extern from "capnp/helpers/capabilityHelper.h":
void reraise_kj_exception()
cdef cppclass PyRefCounter:
PyRefCounter(PyObject *)
PyObject * obj
cdef extern from "capnp/helpers/rpcHelper.h":
cdef cppclass ErrorHandler:
pass
cdef extern from "capnp/helpers/asyncHelper.h":
cdef cppclass PyEventPort:
PyEventPort(PyObject *)

View file

@ -52,11 +52,11 @@ void acceptLoop(kj::TaskSet & tasks, capnp::Capability::Client client, kj::Own<k
})));
}
kj::Promise<PyObject *> connectServer(kj::TaskSet & tasks, capnp::Capability::Client client, kj::AsyncIoContext * context, kj::StringPtr bindAddress, capnp::ReaderOptions & opts) {
kj::Promise<kj::Own<PyRefCounter>> connectServer(kj::TaskSet & tasks, capnp::Capability::Client client, kj::AsyncIoProvider * provider, kj::StringPtr bindAddress, capnp::ReaderOptions & opts) {
auto paf = kj::newPromiseAndFulfiller<unsigned int>();
auto portPromise = paf.promise.fork();
tasks.add(context->provider->getNetwork().parseAddress(bindAddress)
tasks.add(provider->getNetwork().parseAddress(bindAddress)
.then(kj::mvCapture(paf.fulfiller,
[&, client, opts](kj::Own<kj::PromiseFulfiller<unsigned int>>&& portFulfiller,
kj::Own<kj::NetworkAddress>&& addr) mutable {
@ -65,5 +65,6 @@ kj::Promise<PyObject *> connectServer(kj::TaskSet & tasks, capnp::Capability::Cl
acceptLoop(tasks, client, kj::mv(listener), opts);
})));
return portPromise.addBranch().then([&](unsigned int port) { return PyLong_FromUnsignedLong(port); });
return portPromise.addBranch().then([&](unsigned int port) {
return stealPyRef(PyLong_FromUnsignedLong(port)); });
}

View file

@ -5,7 +5,7 @@ cdef extern from "capnp/helpers/checkCompiler.h":
from libcpp cimport bool
from capnp.helpers.non_circular cimport (
PythonInterfaceDynamicImpl, reraise_kj_exception, PyRefCounter, PyEventPort, ErrorHandler,
PythonInterfaceDynamicImpl, reraise_kj_exception, PyRefCounter, ErrorHandler,
)
from capnp.includes.schema_cpp cimport (
Node, Data, StructNode, EnumNode, InterfaceNode, MessageBuilder, MessageReader, ReaderOptions,
@ -48,21 +48,21 @@ cdef extern from "kj/exception.h" namespace " ::kj":
cdef extern from "kj/memory.h" namespace " ::kj":
cdef cppclass Own[T] nogil:
Own()
T& operator*()
T* get()
Own[T] heap[T](...)
Own[TwoPartyVatNetwork] makeTwoPartyVatNetwork" ::kj::heap< ::capnp::TwoPartyVatNetwork>"(
AsyncIoStream& stream, Side, ReaderOptions)
Own[PromiseFulfillerPair] copyPromiseFulfillerPair" ::kj::heap< ::kj::PromiseFulfillerPair<void> >"(
PromiseFulfillerPair&)
Own[PyRefCounter] makePyRefCounter" ::kj::heap< PyRefCounter >"(PyObject *)
cdef extern from "kj/async.h" namespace " ::kj":
cdef cppclass Promise[T] nogil:
Promise()
Promise(Promise)
Promise(T)
T wait(WaitScope)
bool poll(WaitScope)
T wait(WaitScope) except +reraise_kj_exception
bool poll(WaitScope) except +reraise_kj_exception
# ForkedPromise<T> fork()
# Promise<T> exclusiveJoin(Promise<T>&& other)
# Promise[T] eagerlyEvaluate()
@ -73,7 +73,15 @@ cdef extern from "kj/async.h" namespace " ::kj":
Promise[T] attach(Own[PyRefCounter] &, Own[PyRefCounter] &, Own[PyRefCounter] &)
Promise[T] attach(Own[PyRefCounter] &, Own[PyRefCounter] &, Own[PyRefCounter] &, Own[PyRefCounter] &)
ctypedef Promise[PyObject *] PyPromise
cdef cppclass Canceler nogil:
Canceler()
Promise[T] wrap[T](Promise[T] promise)
void cancel(StringPtr cancelReason)
void cancel(Exception& exception)
void release()
bool isEmpty()
ctypedef Promise[Own[PyRefCounter]] PyPromise
ctypedef Promise[void] VoidPromise
cdef extern from "kj/string-tree.h" namespace " ::kj":
@ -82,10 +90,11 @@ cdef extern from "kj/string-tree.h" namespace " ::kj":
cdef extern from "kj/common.h" namespace " ::kj":
cdef cppclass Maybe[T] nogil:
pass
T& orDefault(T&)
cdef cppclass ArrayPtr[T] nogil:
ArrayPtr()
ArrayPtr(T *, size_t size)
T* begin()
size_t size()
T& operator[](size_t index)
@ -101,9 +110,9 @@ cdef extern from "kj/array.h" namespace " ::kj":
T& add(T&)
Array[T] finish()
ArrayBuilder[PyPromise] heapArrayBuilderPyPromise"::kj::heapArrayBuilder< ::kj::Promise<PyObject *> >"(size_t) nogil
ArrayBuilder[PyPromise] heapArrayBuilderPyPromise"::kj::heapArrayBuilder< ::kj::Promise<kj::Own<PyRefCounter>> >"(size_t) nogil
ctypedef Array[PyObject *] PyArray' ::kj::Array<PyObject *>'
ctypedef Array[Own[PyRefCounter]] PyArray' ::kj::Array<kj::Own<PyRefCounter>>'
ctypedef Promise[PyArray] PyPromiseArray
@ -117,12 +126,23 @@ cdef extern from "kj/time.h" namespace " ::kj":
Duration MINUTES
Duration HOURS
Duration DAYS
# cdef cppclass TimePoint:
# TimePoint(Duration)
cdef cppclass TimePoint:
TimePoint(Duration)
cdef cppclass MonotonicClock nogil:
MonotonicClock(MonotonicClock&)
TimePoint now()
MonotonicClock systemPreciseMonotonicClock()
cdef extern from "kj/timer.h" namespace " ::kj":
cdef cppclass Timer nogil:
# int64_t now()
# VoidPromise atTime(TimePoint time)
VoidPromise afterDelay(Duration delay)
cdef cppclass TimerImpl(Timer) nogil:
TimerImpl(TimePoint startTime)
Maybe[TimePoint] nextEvent()
Maybe[uint64_t] timeoutToNextEvent(TimePoint start, Duration unit, uint64_t max)
void advanceTo(TimePoint newTime)
cdef inline Duration Nanoseconds(int64_t nanos):
return NANOSECONDS * nanos
@ -132,7 +152,7 @@ cdef extern from "kj/async-io.h" namespace " ::kj":
Promise[size_t] read(void*, size_t, size_t)
Promise[void] write(const void*, size_t)
cdef cppclass LowLevelAsyncIoProvider nogil:
cdef cppclass LowLevelAsyncIoProvider:
# Own[AsyncInputStream] wrapInputFd(int)
# Own[AsyncOutputStream] wrapOutputFd(int)
Own[AsyncIoStream] wrapSocketFd(int)
@ -141,9 +161,6 @@ cdef extern from "kj/async-io.h" namespace " ::kj":
cdef cppclass AsyncIoProvider nogil:
TwoWayPipe newTwoWayPipe()
cdef cppclass WaitScope nogil:
pass
cdef cppclass AsyncIoContext nogil:
AsyncIoContext(AsyncIoContext&)
Own[LowLevelAsyncIoProvider] lowLevelProvider
@ -157,6 +174,7 @@ cdef extern from "kj/async-io.h" namespace " ::kj":
Own[AsyncIoStream] ends[2]
AsyncIoContext setupAsyncIo() nogil
Own[AsyncIoProvider] newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
cdef extern from "capnp/schema.capnp.h" namespace " ::capnp":
enum TypeWhich" ::capnp::schema::Type::Which":
@ -518,20 +536,31 @@ cdef extern from "capnp/capability.h" namespace " ::capnp":
void allowCancellation() except +reraise_kj_exception
cdef extern from "kj/async.h" namespace " ::kj":
cdef cppclass EventPort:
bool wait() except* with gil
bool poll() except* with gil
void setRunnable(bool runnable) except* with gil
cdef cppclass EventLoop nogil:
EventLoop()
EventLoop(PyEventPort &)
cdef cppclass PromiseFulfiller nogil:
EventLoop(EventPort &)
void run()
cdef cppclass WaitScope nogil:
WaitScope(EventLoop &)
void poll()
cdef cppclass PromiseFulfiller[T] nogil:
void fulfill(T&& value)
void reject(Exception&& exception)
cdef cppclass VoidPromiseFulfiller"::kj::PromiseFulfiller<void>" nogil:
void fulfill()
void reject(Exception&& exception)
cdef cppclass PromiseFulfillerPair" ::kj::PromiseFulfillerPair<void>" nogil:
VoidPromise promise
Own[PromiseFulfiller] fulfiller
Own[VoidPromiseFulfiller] fulfiller
PromiseFulfillerPair newPromiseAndFulfiller" ::kj::newPromiseAndFulfiller<void>"() nogil
PyPromiseArray joinPromises(Array[PyPromise]) nogil
cdef extern from "capnp/helpers/asyncIoHelper.h":
cdef cppclass AsyncIoStreamReadHelper nogil:
AsyncIoStreamReadHelper(AsyncIoStream *, WaitScope *, size_t)
bool poll()
size_t read_size()
void* read_buffer()
cdef extern from "capnp/helpers/capabilityHelper.h":
cdef cppclass PyAsyncIoStream(AsyncIoStream):
PyAsyncIoStream(PyObject* thisptr)
void rejectDisconnected[T](PromiseFulfiller[T]& fulfiller, StringPtr message)
void rejectVoidDisconnected(VoidPromiseFulfiller& fulfiller, StringPtr message)

View file

@ -12,7 +12,7 @@ from capnp.includes.capnp_cpp cimport (
CallContext, RpcSystem, makeRpcServerBootstrap, makeRpcClient, Capability as C_Capability,
TwoPartyVatNetwork as C_TwoPartyVatNetwork, Side, AsyncIoStream, Own, makeTwoPartyVatNetwork,
PromiseFulfillerPair as C_PromiseFulfillerPair, copyPromiseFulfillerPair, newPromiseAndFulfiller,
PyArray, DynamicStruct_Builder, TwoWayPipe,
PyArray, DynamicStruct_Builder, TwoWayPipe, PyRefCounter, PyAsyncIoStream
)
from capnp.includes.schema_cpp cimport Node as C_Node, EnumNode as C_EnumNode
from capnp.includes.types cimport *
@ -159,12 +159,9 @@ cdef _setDynamicFieldWithField(DynamicStruct_Builder thisptr, _StructSchemaField
cdef _setDynamicFieldStatic(DynamicStruct_Builder thisptr, field, value, parent)
cdef api object wrap_dynamic_struct_reader(Response & r) with gil
cdef api PyObject * wrap_remote_call(PyObject * func, Response & r) except * with gil
cdef api Promise[void] * call_server_method(
PyObject * _server, char * _method_name, CallContext & _context) except * with gil
cdef api convert_array_pyobject(PyArray & arr) with gil
cdef api Promise[PyObject*] * extract_promise(object obj) with gil
cdef api RemotePromise * extract_remote_promise(object obj) with gil
cdef api object wrap_kj_exception(capnp.Exception & exception) with gil
cdef api object wrap_kj_exception_for_reraise(capnp.Exception & exception) with gil
cdef api object get_exception_info(object exc_type, object exc_obj, object exc_tb) with gil

File diff suppressed because it is too large Load diff

View file

@ -2,7 +2,6 @@
import argparse
import asyncio
import socket
import capnp
import calculator_capnp
@ -24,19 +23,6 @@ class PowerFunction(calculator_capnp.Calculator.Function.Server):
return pow(params[0], params[1])
async def myreader(client, reader):
while True:
data = await reader.read(4096)
client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
def parse_args():
parser = argparse.ArgumentParser(
usage="Connects to the Calculator server \
@ -48,27 +34,9 @@ at the given address and does some RPCs"
async def main(host):
host = host.split(":")
addr = host[0]
port = host[1]
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
addr, port, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
reader, writer = await asyncio.open_connection(
addr, port, family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
host, port = parse_args().host.split(":")
connection = await capnp.AsyncIoStream.create_connection(host=host, port=port)
client = capnp.TwoPartyClient(connection)
# Bootstrap the Calculator interface
calculator = client.bootstrap().cast_as(calculator_capnp.Calculator)
@ -106,7 +74,7 @@ async def main(host):
# Now that we've sent all the requests, wait for the response. Until this
# point, we haven't waited at all!
response = await read_promise.a_wait()
response = await read_promise
assert response.value == 123
print("PASS")
@ -144,7 +112,7 @@ async def main(host):
eval_promise = request.send()
read_promise = eval_promise.value.read()
response = await read_promise.a_wait()
response = await read_promise
assert response.value == 101
print("PASS")
@ -208,8 +176,8 @@ async def main(host):
add_5_promise = add_5_request.send().value.read()
# Now wait for the results.
assert (await add_3_promise.a_wait()).value == 27
assert (await add_5_promise.a_wait()).value == 29
assert (await add_3_promise).value == 27
assert (await add_5_promise).value == 29
print("PASS")
@ -290,8 +258,8 @@ async def main(host):
g_eval_promise = g_eval_request.send().value.read()
# Wait for the results.
assert (await f_eval_promise.a_wait()).value == 1234
assert (await g_eval_promise.a_wait()).value == 4244
assert (await f_eval_promise).value == 1234
assert (await g_eval_promise).value == 4244
print("PASS")
@ -329,7 +297,7 @@ async def main(host):
add_params[1].literal = 5
# Send the request and wait.
response = await request.send().value.read().a_wait()
response = await request.send().value.read()
assert response.value == 512
print("PASS")

View file

@ -3,7 +3,6 @@
import argparse
import asyncio
import logging
import socket
import capnp
import calculator_capnp
@ -13,60 +12,6 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class Server:
async def myreader(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.reader.read(4096), timeout=0.1)
except asyncio.TimeoutError:
logger.debug("myreader timeout.")
continue
except Exception as err:
logger.error("Unknown myreader err: %s", err)
return False
await self.server.write(data)
logger.debug("myreader done.")
return True
async def mywriter(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.server.read(4096), timeout=0.1)
self.writer.write(data.tobytes())
except asyncio.TimeoutError:
logger.debug("mywriter timeout.")
continue
except Exception as err:
logger.error("Unknown mywriter err: %s", err)
return False
logger.debug("mywriter done.")
return True
async def myserver(self, reader, writer):
# Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
self.server = capnp.TwoPartyServer(bootstrap=CalculatorImpl())
self.reader = reader
self.writer = writer
self.retry = True
# Assemble reader and writer tasks, run in the background
coroutines = [self.myreader(), self.mywriter()]
tasks = asyncio.gather(*coroutines, return_exceptions=True)
while True:
self.server.poll_once()
# Check to see if reader has been sent an eof (disconnect)
if self.reader.at_eof():
self.retry = False
break
await asyncio.sleep(0.01)
# Make wait for reader/writer to finish (prevent possible resource leaks)
await tasks
def read_value(value):
"""Helper function to asynchronously call read() on a Calculator::Value and
return a promise for the result. (In the future, the generated code might
@ -180,6 +125,11 @@ class CalculatorImpl(calculator_capnp.Calculator.Server):
return OperatorImpl(op)
async def new_connection(stream):
server = capnp.TwoPartyServer(stream, bootstrap=CalculatorImpl())
await server.on_disconnect()
def parse_args():
parser = argparse.ArgumentParser(
usage="""Runs the server bound to the\
@ -191,29 +141,9 @@ given address/port ADDRESS. """
return parser.parse_args()
async def new_connection(reader, writer):
server = Server()
await server.myserver(reader, writer)
async def main():
address = parse_args().address
host = address.split(":")
addr = host[0]
port = host[1]
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
server = await asyncio.start_server(
new_connection, addr, port, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
server = await asyncio.start_server(
new_connection, addr, port, family=socket.AF_INET6
)
host, port = parse_args().address.split(":")
server = await capnp.AsyncIoStream.create_server(new_connection, host, port)
async with server:
await server.serve_forever()

View file

@ -4,13 +4,9 @@ import asyncio
import argparse
import time
import capnp
import socket
import thread_capnp
capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)
def parse_args():
parser = argparse.ArgumentParser(
@ -29,61 +25,35 @@ class StatusSubscriber(thread_capnp.Example.StatusSubscriber.Server):
print("status: {}".format(time.time()))
async def myreader(client, reader):
while True:
data = await reader.read(4096)
client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
async def background(cap):
subscriber = StatusSubscriber()
promise = cap.subscribeStatus(subscriber)
await promise.a_wait()
await cap.subscribeStatus(subscriber)
async def main(host):
host = host.split(":")
addr = host[0]
port = host[1]
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
addr, port, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
reader, writer = await asyncio.open_connection(
addr, port, family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
host, port = host.split(":")
connection = await capnp.AsyncIoStream.create_connection(host=host, port=port)
client = capnp.TwoPartyClient(connection)
cap = client.bootstrap().cast_as(thread_capnp.Example)
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
# Start background task for subscriber
tasks = [background(cap)]
asyncio.gather(*tasks, return_exceptions=True)
asyncio.create_task(background(cap))
# Run blocking tasks
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
if __name__ == "__main__":
asyncio.run(main(parse_args().host))
args = parse_args()
asyncio.run(main(args.host))
# Test that we can run multiple asyncio loops in sequence. This is particularly tricky, because
# main contains a background task that we never cancel. The entire loop gets cleaned up anyways,
# and we can start a new loop.
asyncio.run(main(args.host))

View file

@ -4,16 +4,14 @@ import asyncio
import argparse
import os
import time
import socket
import ssl
import socket
import capnp
import thread_capnp
this_dir = os.path.dirname(os.path.abspath(__file__))
capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)
def parse_args():
@ -33,32 +31,10 @@ class StatusSubscriber(thread_capnp.Example.StatusSubscriber.Server):
print("status: {}".format(time.time()))
async def myreader(client, reader):
while True:
try:
# Must be a wait_for in order to give watch_connection a slot
# to try again
data = await asyncio.wait_for(reader.read(4096), timeout=1.0)
except asyncio.TimeoutError:
continue
client.write(data)
async def mywriter(client, writer):
while True:
try:
# Must be a wait_for in order to give watch_connection a slot
# to try again
data = await asyncio.wait_for(client.read(4096), timeout=1.0)
writer.write(data.tobytes())
except asyncio.TimeoutError:
continue
async def watch_connection(cap):
while True:
try:
await asyncio.wait_for(cap.alive().a_wait(), timeout=5)
await asyncio.wait_for(cap.alive(), timeout=5)
await asyncio.sleep(1)
except asyncio.TimeoutError:
print("Watch timeout!")
@ -68,14 +44,11 @@ async def watch_connection(cap):
async def background(cap):
subscriber = StatusSubscriber()
promise = cap.subscribeStatus(subscriber)
await promise.a_wait()
await cap.subscribeStatus(subscriber)
async def main(host):
host = host.split(":")
addr = host[0]
port = host[1]
addr, port = host.split(":")
# Setup SSL context
ctx = ssl.create_default_context(
@ -85,46 +58,33 @@ async def main(host):
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
stream = await capnp.AsyncIoStream.create_connection(
addr, port, ssl=ctx, family=socket.AF_INET
)
except OSError:
except Exception:
print("Try IPv6")
try:
reader, writer = await asyncio.open_connection(
addr, port, ssl=ctx, family=socket.AF_INET6
)
except OSError:
return False
stream = await capnp.AsyncIoStream.create_connection(
addr, port, ssl=ctx, family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
client = capnp.TwoPartyClient(stream)
cap = client.bootstrap().cast_as(thread_capnp.Example)
# Start watcher to restart socket connection if it is lost
overalltasks = []
watcher = [watch_connection(cap)]
overalltasks.append(asyncio.gather(*watcher, return_exceptions=True))
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
overalltasks.append(asyncio.gather(*coroutines, return_exceptions=True))
# Start background task for subscriber
tasks = [background(cap)]
overalltasks.append(asyncio.gather(*tasks, return_exceptions=True))
# Start watcher to restart socket connection if it is lost and subscriber background task
background_tasks = asyncio.gather(
background(cap), watch_connection(cap), return_exceptions=True
)
# Run blocking tasks
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
for task in overalltasks:
task.cancel()
background_tasks.cancel()
return True

View file

@ -3,7 +3,6 @@
import argparse
import asyncio
import logging
import socket
import capnp
import thread_capnp
@ -25,61 +24,12 @@ class ExampleImpl(thread_capnp.Example.Server):
)
def longRunning(self, **kwargs):
return capnp.getTimer().after_delay(1 * 10**9)
return capnp.getTimer().after_delay(11 * 10**8)
class Server:
async def myreader(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.reader.read(4096), timeout=0.1)
except asyncio.TimeoutError:
logger.debug("myreader timeout.")
continue
except Exception as err:
logger.error("Unknown myreader err: %s", err)
return False
await self.server.write(data)
logger.debug("myreader done.")
return True
async def mywriter(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.server.read(4096), timeout=0.1)
self.writer.write(data.tobytes())
except asyncio.TimeoutError:
logger.debug("mywriter timeout.")
continue
except Exception as err:
logger.error("Unknown mywriter err: %s", err)
return False
logger.debug("mywriter done.")
return True
async def myserver(self, reader, writer):
# Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
self.server = capnp.TwoPartyServer(bootstrap=ExampleImpl())
self.reader = reader
self.writer = writer
self.retry = True
# Assemble reader and writer tasks, run in the background
coroutines = [self.myreader(), self.mywriter()]
tasks = asyncio.gather(*coroutines, return_exceptions=True)
while True:
self.server.poll_once()
# Check to see if reader has been sent an eof (disconnect)
if self.reader.at_eof():
self.retry = False
break
await asyncio.sleep(0.01)
# Make wait for reader/writer to finish (prevent possible resource leaks)
await tasks
async def new_connection(stream):
server = capnp.TwoPartyServer(stream, bootstrap=ExampleImpl())
await server.on_disconnect()
def parse_args():
@ -93,29 +43,9 @@ given address/port ADDRESS. """
return parser.parse_args()
async def new_connection(reader, writer):
server = Server()
await server.myserver(reader, writer)
async def main():
address = parse_args().address
host = address.split(":")
addr = host[0]
port = host[1]
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
server = await asyncio.start_server(
new_connection, addr, port, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
server = await asyncio.start_server(
new_connection, addr, port, family=socket.AF_INET6
)
host, port = parse_args().address.split(":")
server = await capnp.AsyncIoStream.create_server(new_connection, host, port)
async with server:
await server.serve_forever()

View file

@ -3,8 +3,8 @@
import argparse
import asyncio
import os
import socket
import ssl
import socket
import capnp
import calculator_capnp
@ -29,19 +29,6 @@ class PowerFunction(calculator_capnp.Calculator.Function.Server):
return pow(params[0], params[1])
async def myreader(client, reader):
while True:
data = await reader.read(4096)
client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
def parse_args():
parser = argparse.ArgumentParser(
usage="Connects to the Calculator server \
@ -53,9 +40,7 @@ at the given address and does some RPCs"
async def main(host):
host = host.split(":")
addr = host[0]
port = host[1]
addr, port = host.split(":")
# Setup SSL context
ctx = ssl.create_default_context(
@ -65,21 +50,16 @@ async def main(host):
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
stream = await capnp.AsyncIoStream.create_connection(
addr, port, ssl=ctx, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
reader, writer = await asyncio.open_connection(
stream = await capnp.AsyncIoStream.create_connection(
addr, port, ssl=ctx, family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
client = capnp.TwoPartyClient(stream)
# Bootstrap the Calculator interface
calculator = client.bootstrap().cast_as(calculator_capnp.Calculator)
@ -117,7 +97,7 @@ async def main(host):
# Now that we've sent all the requests, wait for the response. Until this
# point, we haven't waited at all!
response = await read_promise.a_wait()
response = await read_promise
assert response.value == 123
print("PASS")
@ -155,7 +135,7 @@ async def main(host):
eval_promise = request.send()
read_promise = eval_promise.value.read()
response = await read_promise.a_wait()
response = await read_promise
assert response.value == 101
print("PASS")
@ -219,8 +199,8 @@ async def main(host):
add_5_promise = add_5_request.send().value.read()
# Now wait for the results.
assert (await add_3_promise.a_wait()).value == 27
assert (await add_5_promise.a_wait()).value == 29
assert (await add_3_promise).value == 27
assert (await add_5_promise).value == 29
print("PASS")
@ -301,8 +281,8 @@ async def main(host):
g_eval_promise = g_eval_request.send().value.read()
# Wait for the results.
assert (await f_eval_promise.a_wait()).value == 1234
assert (await g_eval_promise.a_wait()).value == 4244
assert (await f_eval_promise).value == 1234
assert (await g_eval_promise).value == 4244
print("PASS")
@ -340,7 +320,7 @@ async def main(host):
add_params[1].literal = 5
# Send the request and wait.
response = await request.send().value.read().a_wait()
response = await request.send().value.read()
assert response.value == 512
print("PASS")

View file

@ -4,8 +4,8 @@ import argparse
import asyncio
import logging
import os
import socket
import ssl
import socket
import capnp
import calculator_capnp
@ -17,60 +17,6 @@ logger.setLevel(logging.DEBUG)
this_dir = os.path.dirname(os.path.abspath(__file__))
class Server:
async def myreader(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.reader.read(4096), timeout=0.1)
except asyncio.TimeoutError:
logger.debug("myreader timeout.")
continue
except Exception as err:
logger.error("Unknown myreader err: %s", err)
return False
await self.server.write(data)
logger.debug("myreader done.")
return True
async def mywriter(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.server.read(4096), timeout=0.1)
self.writer.write(data.tobytes())
except asyncio.TimeoutError:
logger.debug("mywriter timeout.")
continue
except Exception as err:
logger.error("Unknown mywriter err: %s", err)
return False
logger.debug("mywriter done.")
return True
async def myserver(self, reader, writer):
# Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
self.server = capnp.TwoPartyServer(bootstrap=CalculatorImpl())
self.reader = reader
self.writer = writer
self.retry = True
# Assemble reader and writer tasks, run in the background
coroutines = [self.myreader(), self.mywriter()]
tasks = asyncio.gather(*coroutines, return_exceptions=True)
while True:
self.server.poll_once()
# Check to see if reader has been sent an eof (disconnect)
if self.reader.at_eof():
self.retry = False
break
await asyncio.sleep(0.01)
# Make wait for reader/writer to finish (prevent possible resource leaks)
await tasks
def read_value(value):
"""Helper function to asynchronously call read() on a Calculator::Value and
return a promise for the result. (In the future, the generated code might
@ -195,16 +141,13 @@ given address/port ADDRESS. """
return parser.parse_args()
async def new_connection(reader, writer):
server = Server()
await server.myserver(reader, writer)
async def new_connection(stream):
server = capnp.TwoPartyServer(stream, bootstrap=CalculatorImpl())
await server.on_disconnect()
async def main():
address = parse_args().address
host = address.split(":")
addr = host[0]
port = host[1]
host, port = parse_args().address.split(":")
# Setup SSL context
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
@ -216,13 +159,13 @@ async def main():
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
server = await asyncio.start_server(
new_connection, addr, port, ssl=ctx, family=socket.AF_INET
server = await capnp.AsyncIoStream.create_server(
new_connection, host, port, ssl=ctx, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
server = await asyncio.start_server(
new_connection, addr, port, ssl=ctx, family=socket.AF_INET6
server = await capnp.AsyncIoStream.create_server(
new_connection, host, port, ssl=ctx, family=socket.AF_INET6
)
async with server:

View file

@ -3,9 +3,9 @@
import argparse
import asyncio
import os
import socket
import ssl
import time
import socket
import capnp
import thread_capnp
@ -30,29 +30,13 @@ class StatusSubscriber(thread_capnp.Example.StatusSubscriber.Server):
print("status: {}".format(time.time()))
async def myreader(client, reader):
while True:
data = await reader.read(4096)
client.write(data)
async def mywriter(client, writer):
while True:
data = await client.read(4096)
writer.write(data.tobytes())
await writer.drain()
async def background(cap):
subscriber = StatusSubscriber()
promise = cap.subscribeStatus(subscriber)
await promise.a_wait()
await cap.subscribeStatus(subscriber)
async def main(host):
host = host.split(":")
addr = host[0]
port = host[1]
addr, port = host.split(":")
# Setup SSL context
ctx = ssl.create_default_context(
@ -62,34 +46,28 @@ async def main(host):
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
reader, writer = await asyncio.open_connection(
stream = await capnp.AsyncIoStream.create_connection(
addr, port, ssl=ctx, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
reader, writer = await asyncio.open_connection(
stream = await capnp.AsyncIoStream.create_connection(
addr, port, ssl=ctx, family=socket.AF_INET6
)
# Start TwoPartyClient using TwoWayPipe (takes no arguments in this mode)
client = capnp.TwoPartyClient()
client = capnp.TwoPartyClient(stream)
cap = client.bootstrap().cast_as(thread_capnp.Example)
# Assemble reader and writer tasks, run in the background
coroutines = [myreader(client, reader), mywriter(client, writer)]
asyncio.gather(*coroutines, return_exceptions=True)
# Start background task for subscriber
tasks = [background(cap)]
asyncio.gather(*tasks, return_exceptions=True)
asyncio.create_task(background(cap))
# Run blocking tasks
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))
await cap.longRunning().a_wait()
await cap.longRunning()
print("main: {}".format(time.time()))

View file

@ -4,8 +4,8 @@ import argparse
import asyncio
import logging
import os
import socket
import ssl
import socket
import capnp
import thread_capnp
@ -35,81 +35,21 @@ class ExampleImpl(thread_capnp.Example.Server):
return True
class Server:
async def myreader(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.reader.read(4096), timeout=0.1)
except asyncio.TimeoutError:
logger.debug("myreader timeout.")
continue
except Exception as err:
logger.error("Unknown myreader err: %s", err)
return False
await self.server.write(data)
logger.debug("myreader done.")
return True
async def mywriter(self):
while self.retry:
try:
# Must be a wait_for so we don't block on read()
data = await asyncio.wait_for(self.server.read(4096), timeout=0.1)
self.writer.write(data.tobytes())
except asyncio.TimeoutError:
logger.debug("mywriter timeout.")
continue
except Exception as err:
logger.error("Unknown mywriter err: %s", err)
return False
logger.debug("mywriter done.")
return True
async def myserver(self, reader, writer):
# Start TwoPartyServer using TwoWayPipe (only requires bootstrap)
self.server = capnp.TwoPartyServer(bootstrap=ExampleImpl())
self.reader = reader
self.writer = writer
self.retry = True
# Assemble reader and writer tasks, run in the background
coroutines = [self.myreader(), self.mywriter()]
tasks = asyncio.gather(*coroutines, return_exceptions=True)
while True:
self.server.poll_once()
# Check to see if reader has been sent an eof (disconnect)
if self.reader.at_eof():
self.retry = False
break
await asyncio.sleep(0.01)
# Make wait for reader/writer to finish (prevent possible resource leaks)
await tasks
async def new_connection(reader, writer):
server = Server()
await server.myserver(reader, writer)
async def new_connection(stream):
server = capnp.TwoPartyServer(stream, bootstrap=ExampleImpl())
await server.on_disconnect()
def parse_args():
parser = argparse.ArgumentParser(
usage="""Runs the server bound to the\
given address/port ADDRESS. """
usage="""Runs the server bound to the given address/port ADDRESS. """
)
parser.add_argument("address", help="ADDRESS:PORT")
return parser.parse_args()
async def main():
address = parse_args().address
host = address.split(":")
addr = host[0]
port = host[1]
host, port = parse_args().address.split(":")
# Setup SSL context
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
@ -121,21 +61,13 @@ async def main():
# Handle both IPv4 and IPv6 cases
try:
print("Try IPv4")
server = await asyncio.start_server(
new_connection,
addr,
port,
ssl=ctx,
family=socket.AF_INET,
server = await capnp.AsyncIoStream.create_server(
new_connection, host, port, ssl=ctx, family=socket.AF_INET
)
except Exception:
print("Try IPv6")
server = await asyncio.start_server(
new_connection,
addr,
port,
ssl=ctx,
family=socket.AF_INET6,
server = await capnp.AsyncIoStream.create_server(
new_connection, host, port, ssl=ctx, family=socket.AF_INET6
)
async with server:

View file

@ -7,9 +7,6 @@ import capnp
import thread_capnp
capnp.remove_event_loop()
capnp.create_event_loop(threaded=True)
def parse_args():
parser = argparse.ArgumentParser(

View file

@ -2,7 +2,6 @@
import argparse
import capnp
import time
import thread_capnp
@ -38,9 +37,7 @@ def main():
address = parse_args().address
server = capnp.TwoPartyServer(address, bootstrap=ExampleImpl())
while True:
server.poll_once()
time.sleep(0.001)
server.run_forever()
if __name__ == "__main__":

View file

@ -200,7 +200,10 @@ import Cython # noqa: E402
extensions = [
Extension(
"*",
["capnp/helpers/capabilityHelper.cpp", "capnp/lib/*.pyx"],
[
"capnp/helpers/capabilityHelper.cpp",
"capnp/lib/*.pyx",
],
extra_compile_args=extra_compile_args,
extra_link_args=extra_link_args,
language="c++",

View file

@ -284,6 +284,21 @@ def test_cancel():
with pytest.raises(Exception):
remote.wait()
req = client.foo(5)
trans = req.then(lambda x: 5)
req.cancel() # Cancel a promise that was already consumed
assert trans.wait() == 5
req = client.foo(5)
req.cancel()
with pytest.raises(Exception):
trans = req.then(lambda x: 5)
req = client.foo(5)
assert req.wait().x == "26"
with pytest.raises(Exception):
req.wait()
def test_timer():
global test_timer_var
@ -350,6 +365,33 @@ def test_then_args():
client.foo(i=5).then(lambda x, y: 1)
class PromiseJoinServer(capability.TestPipeline.Server):
def getCap(self, n, inCap, _context, **kwargs):
def _then(response):
_results = _context.results
_results.s = response.x + "_bar"
_results.outBox.cap = inCap
return (
inCap.foo(i=n)
.then(
lambda res: capnp.Promise(int(res.x))
) # Make sure that Promise is flattened
.then(
lambda x: inCap.foo(i=x + 1)
) # Make sure that RemotePromise is flattened
.then(_then)
)
def test_promise_joining():
client = capability.TestPipeline._new_client(PromiseJoinServer())
foo_client = capability.TestInterface._new_client(Server())
remote = client.getCap(n=5, inCap=foo_client)
assert remote.wait().s == "136_bar"
class ExtendsServer(Server):
def qux(self, **kwargs):
pass

View file

@ -10,49 +10,9 @@ import pytest
import capnp
from capnp.lib.capnp import KjException
import test_capability_capnp
@pytest.mark.skipif(
platform.python_implementation() == "PyPy",
reason="pycapnp's GIL handling isn't working properly at the moment for PyPy",
)
def test_making_event_loop():
"""
Event loop test
"""
capnp.remove_event_loop(True)
capnp.create_event_loop()
capnp.remove_event_loop()
capnp.create_event_loop()
@pytest.mark.skipif(
platform.python_implementation() == "PyPy",
reason="pycapnp's GIL handling isn't working properly at the moment for PyPy",
)
def test_making_threaded_event_loop():
"""
Threaded event loop test
"""
# The following raises a KjException, and if not caught causes an SIGABRT:
# kj/async.c++:973: failed: expected head == nullptr; EventLoop destroyed with events still in the queue.
# Memory leak?; head->trace() = kj::_::ForkHub<kj::_::Void>
# kj::_::AdapterPromiseNode<kj::_::Void, kj::_::PromiseAndFulfillerAdapter<void> >
# stack: ...
# python(..) malloc: *** error for object 0x...: pointer being freed was not allocated
# python(..) malloc: *** set a breakpoint in malloc_error_break to debug
# Fatal Python error: Aborted
capnp.remove_event_loop(KjException)
capnp.create_event_loop(KjException)
capnp.remove_event_loop()
capnp.create_event_loop(KjException)
class Server(test_capability_capnp.TestInterface.Server):
"""
Server
@ -76,9 +36,6 @@ def test_using_threads():
"""
Thread test
"""
capnp.remove_event_loop(True)
capnp.create_event_loop(True)
read, write = socket.socketpair()
def run_server():