From 96bfc495d88cb43b447e66780d64c3eab263e268 Mon Sep 17 00:00:00 2001 From: Jason Paryani Date: Thu, 17 Oct 2013 22:42:14 -0700 Subject: [PATCH] Pipelining almost completely wrapped Waiting on some upstream changes in C++ libcapnp before I can finish --- capnp/asyncHelper.h | 51 ----------------- capnp/asyncHelper_cpp.pxd | 14 ----- capnp/async_cpp.pxd | 1 + capnp/capabilityHelper.h | 47 +++++++++++++++- capnp/capnp.pyx | 112 ++++++++++++++++++++++++++++++++----- capnp/capnp_cpp.pxd | 25 ++++++--- test/test_capability.capnp | 14 ++--- test/test_capability.py | 30 +++++++++- 8 files changed, 194 insertions(+), 100 deletions(-) delete mode 100644 capnp/asyncHelper.h delete mode 100644 capnp/asyncHelper_cpp.pxd diff --git a/capnp/asyncHelper.h b/capnp/asyncHelper.h deleted file mode 100644 index 7dca4f3..0000000 --- a/capnp/asyncHelper.h +++ /dev/null @@ -1,51 +0,0 @@ -#include "kj/async.h" -#include "Python.h" - -extern "C" { - PyObject * wrap_kj_exception(kj::Exception &); - // void _gevent_eventloop_prepare_to_sleep(); - // void _gevent_eventloop_sleep(); - // void _gevent_eventloop_wake(); -} - -PyObject * wrapPyFunc(PyObject * func, PyObject * arg) { - PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL); - Py_DECREF(func); - return result; -} - -::kj::Promise evalLater(kj::EventLoop & loop, PyObject * func) { - return loop.evalLater([func]() { return wrapPyFunc(func, NULL); } ); -} - -::kj::Promise there(kj::EventLoop & loop, kj::Promise & promise, PyObject * func, PyObject * error_func) { - if(error_func == Py_None) - return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); } ); - else - return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); } - , [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } ); -} -::kj::Promise then(kj::Promise & promise, PyObject * func, PyObject * error_func) { - if(error_func == Py_None) - return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } ); - else - return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } - , [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } ); -} - -// class PyEventLoop final: public ::kj::EventLoop { -// public: -// PyEventLoop() {} -// ~PyEventLoop() noexcept(false) {} - -// protected: -// void prepareToSleep() noexcept override { -// _gevent_eventloop_prepare_to_sleep(); -// } -// void sleep() override { -// _gevent_eventloop_sleep(); -// } -// void wake() const override { -// _gevent_eventloop_wake(); -// } -// }; \ No newline at end of file diff --git a/capnp/asyncHelper_cpp.pxd b/capnp/asyncHelper_cpp.pxd deleted file mode 100644 index 6ce1199..0000000 --- a/capnp/asyncHelper_cpp.pxd +++ /dev/null @@ -1,14 +0,0 @@ -# schema.capnp.cpp.pyx -# distutils: language = c++ -# distutils: extra_compile_args = --std=c++11 - -from cpython.ref cimport PyObject -from capnp_cpp cimport PyPromise, EventLoop - -cdef extern from "asyncHelper.h": - PyPromise evalLater(EventLoop &, PyObject * func) - PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func) - PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func) - - # cdef cppclass PyEventLoop(EventLoop): - # pass \ No newline at end of file diff --git a/capnp/async_cpp.pxd b/capnp/async_cpp.pxd index 3da574b..3817010 100644 --- a/capnp/async_cpp.pxd +++ b/capnp/async_cpp.pxd @@ -15,6 +15,7 @@ cdef extern from "kj/async.h" namespace " ::kj": T wait() ctypedef Promise[PyObject *] PyPromise +ctypedef Promise[void] VoidPromise cdef extern from "kj/async.h" namespace " ::kj": cdef cppclass EventLoop: diff --git a/capnp/capabilityHelper.h b/capnp/capabilityHelper.h index 414b975..eee76de 100644 --- a/capnp/capabilityHelper.h +++ b/capnp/capabilityHelper.h @@ -4,10 +4,46 @@ #include extern "C" { + void wrap_remote_call(PyObject * func, capnp::Response &); PyObject * wrap_dynamic_struct_reader(capnp::DynamicStruct::Reader &); - void call_server_method(PyObject * py_server, char * name, capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> & context); + ::kj::Promise * call_server_method(PyObject * py_server, char * name, capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> & context); + PyObject * wrap_kj_exception(kj::Exception &); } +PyObject * wrapPyFunc(PyObject * func, PyObject * arg) { + PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL); + Py_DECREF(func); + return result; +} + +::kj::Promise evalLater(kj::EventLoop & loop, PyObject * func) { + return loop.evalLater([func]() { return wrapPyFunc(func, NULL); } ); +} + +::kj::Promise there(kj::EventLoop & loop, kj::Promise & promise, PyObject * func, PyObject * error_func) { + if(error_func == Py_None) + return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); } ); + else + return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); } + , [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } ); +} + +::kj::Promise then(kj::Promise & promise, PyObject * func, PyObject * error_func) { + if(error_func == Py_None) + return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } ); + else + return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } + , [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } ); +} + +::kj::Promise then(::capnp::RemotePromise< ::capnp::DynamicStruct> & promise, PyObject * func, PyObject * error_func) { + if(error_func == Py_None) + return promise.then([func](capnp::Response&& arg) { wrap_remote_call(func, arg); } ); + else + return promise.then([func](capnp::Response&& arg) { wrap_remote_call(func, arg); } + , [error_func](kj::Exception arg) { wrapPyFunc(error_func, wrap_kj_exception(arg)); } ); +} + class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server { public: PyObject * py_server; @@ -24,8 +60,13 @@ public: kj::Promise call(capnp::InterfaceSchema::Method method, capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context) { auto methodName = method.getProto().getName(); - call_server_method(py_server, const_cast(methodName.cStr()), context); - return kj::READY_NOW; + kj::Promise * promise = call_server_method(py_server, const_cast(methodName.cStr()), context); + if(promise == nullptr) + return kj::READY_NOW; + + kj::Promise ret(kj::mv(*promise)); + delete promise; + return ret; } }; diff --git a/capnp/capnp.pyx b/capnp/capnp.pyx index aee469a..fd4b647 100644 --- a/capnp/capnp.pyx +++ b/capnp/capnp.pyx @@ -9,7 +9,7 @@ cimport cython cimport capnp_cpp as capnp cimport schema_cpp -from capnp_cpp cimport Schema as C_Schema, StructSchema as C_StructSchema, InterfaceSchema as C_InterfaceSchema, DynamicStruct as C_DynamicStruct, DynamicValue as C_DynamicValue, Type as C_Type, DynamicList as C_DynamicList, fixMaybe, getEnumString, SchemaParser as C_SchemaParser, ParsedSchema as C_ParsedSchema, VOID, ArrayPtr, StringPtr, String, StringTree, DynamicOrphan as C_DynamicOrphan, ObjectPointer as C_DynamicObject, WordArrayPtr, DynamicCapability as C_DynamicCapability, new_client, Request, Response, RemotePromise, convert_to_pypromise, SimpleEventLoop, PyPromise, CallContext +from capnp_cpp cimport Schema as C_Schema, StructSchema as C_StructSchema, InterfaceSchema as C_InterfaceSchema, DynamicStruct as C_DynamicStruct, DynamicValue as C_DynamicValue, Type as C_Type, DynamicList as C_DynamicList, fixMaybe, getEnumString, SchemaParser as C_SchemaParser, ParsedSchema as C_ParsedSchema, VOID, ArrayPtr, StringPtr, String, StringTree, DynamicOrphan as C_DynamicOrphan, ObjectPointer as C_DynamicObject, WordArrayPtr, DynamicCapability as C_DynamicCapability, new_client, Request, Response, RemotePromise, convert_to_pypromise, SimpleEventLoop, PyPromise, VoidPromise, CallContext from schema_cpp cimport Node as C_Node, EnumNode as C_EnumNode from cython.operator cimport dereference as deref @@ -44,14 +44,28 @@ from functools import partial as _partial cdef public object wrap_dynamic_struct_reader(C_DynamicStruct.Reader & reader): return _DynamicStructReader()._init(reader, None) -cdef public void call_server_method(PyObject * _server, char * _method_name, CallContext & _context): +cdef public void wrap_remote_call(PyObject * func, Response & r): + response = _Response()._init_childptr(new Response(moveResponse(r)), None) + + func_obj = func + # TODO: decref func? + func_obj(response) + +cdef public VoidPromise * call_server_method(PyObject * _server, char * _method_name, CallContext & _context): server = _server method_name = _method_name context = _CallContext()._init(_context) - getattr(server, method_name)(context) + ret = getattr(server, method_name)(context) -# By making it public, we'll be able to call it from asyncHelper.h + if ret is not None: + if type(ret) is _VoidPromise: + return new VoidPromise(moveVoidPromise(deref((<_VoidPromise>ret).thisptr))) + else: + raise ValueError('Server function returned a value that was not a VoidPromise: ' + str(ret)) + + return NULL + cdef public object wrap_kj_exception(capnp.Exception & exception): return None # TODO @@ -103,6 +117,7 @@ cdef extern from "" namespace "std": Request moveRequest"std::move"(Request) Response moveResponse"std::move"(Response) PyPromise movePromise"std::move"(PyPromise) + VoidPromise moveVoidPromise"std::move"(VoidPromise) RemotePromise moveRemotePromise"std::move"(RemotePromise) CallContext moveCallContext"std::move"(CallContext) @@ -301,8 +316,6 @@ cdef class _DynamicListBuilder: return self._get(index) def __setitem__(self, index, value): - # TODO: share code with _DynamicStructBuilder.__setattr__ - size = self.thisptr.size() if index >= size: raise IndexError('Out of bounds') @@ -387,6 +400,8 @@ cdef to_python_reader(C_DynamicValue.Reader self, object parent): return None elif type == capnp.TYPE_OBJECT: return _DynamicObjectReader()._init(self.asObject(), parent) + elif type == capnp.TYPE_CAPABILITY: + return _DynamicCapabilityClient()._init(self.asCapability(), parent) elif type == capnp.TYPE_UNKNOWN: raise ValueError("Cannot convert type to Python. Type is unknown by capnproto library") else: @@ -417,6 +432,8 @@ cdef to_python_builder(C_DynamicValue.Builder self, object parent): return None elif type == capnp.TYPE_OBJECT: return _DynamicObjectBuilder()._init(self.asObject(), parent) + elif type == capnp.TYPE_CAPABILITY: + return _DynamicCapabilityClient()._init(self.asCapability(), parent) elif type == capnp.TYPE_UNKNOWN: raise ValueError("Cannot convert type to Python. Type is unknown by capnproto library") else: @@ -428,6 +445,9 @@ cdef C_DynamicValue.Reader _extract_dynamic_struct_builder(_DynamicStructBuilder cdef C_DynamicValue.Reader _extract_dynamic_struct_reader(_DynamicStructReader value): return C_DynamicValue.Reader(value.thisptr) +cdef C_DynamicValue.Reader _extract_dynamic_client(_DynamicCapabilityClient value): + return C_DynamicValue.Reader(value.thisptr) + cdef _setDynamicField(_DynamicSetterClasses thisptr, field, value, parent): cdef C_DynamicValue.Reader temp value_type = type(value) @@ -458,6 +478,8 @@ cdef _setDynamicField(_DynamicSetterClasses thisptr, field, value, parent): thisptr.set(field, _extract_dynamic_struct_builder(value)) elif value_type is _DynamicStructReader: thisptr.set(field, _extract_dynamic_struct_reader(value)) + elif value_type is _DynamicCapabilityClient: + thisptr.set(field, _extract_dynamic_client(value)) else: raise ValueError("Non primitive type") @@ -491,6 +513,8 @@ cdef _setDynamicFieldPtr(_DynamicSetterClasses * thisptr, field, value, parent): thisptr.set(field, _extract_dynamic_struct_builder(value)) elif value_type is _DynamicStructReader: thisptr.set(field, _extract_dynamic_struct_reader(value)) + elif value_type is _DynamicCapabilityClient: + thisptr.set(field, _extract_dynamic_client(value)) else: raise ValueError("Non primitive type") @@ -898,6 +922,7 @@ cdef class _CallContext: cdef class Promise: cdef PyPromise * thisptr + cdef public bint is_consumed def __init__(self): self.is_consumed = True @@ -928,6 +953,38 @@ cdef class Promise: return Promise()._init(capnp.then(deref(self.thisptr), func, error_func)) +cdef class _VoidPromise: + cdef VoidPromise * thisptr + cdef public bint is_consumed + + def __init__(self): + self.is_consumed = True + + cdef _init(self, VoidPromise other): + self.is_consumed = False + self.thisptr = new VoidPromise(moveVoidPromise(other)) + return self + + def __dealloc__(self): + del self.thisptr + + cpdef wait(self) except+: + if self.is_consumed: + raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object') + + self.thisptr.wait() + self.is_consumed = True + + + # cpdef then(self, func, error_func=None) except+: + # if self.is_consumed: + # raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object') + + # Py_INCREF(func) + # Py_INCREF(error_func) + + # return Promise()._init(capnp.then(deref(self.thisptr), func, error_func)) + cdef class _RemotePromise: cdef RemotePromise * thisptr cdef public bint is_consumed @@ -957,14 +1014,28 @@ cdef class _RemotePromise: cpdef as_pypromise(self) except +: Promise()._init(convert_to_pypromise(deref(self.thisptr))) - # cpdef then(self, func, error_func=None) except+: - # if self.is_consumed: - # raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object') + cpdef then(self, func, error_func=None) except+: + if self.is_consumed: + raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object') - # Py_INCREF(func) - # Py_INCREF(error_func) + Py_INCREF(func) + Py_INCREF(error_func) - # return _RemotePromise()._init(capnp.then(deref(self.thisptr), func, error_func)) + return _VoidPromise()._init(capnp.then(deref(self.thisptr), func, error_func)) + + cpdef _get(self, field) except +ValueError: + return _DynamicCapabilityClient()._init((self.thisptr.get(field)).asCapability(), self._parent) + + def __getattr__(self, field): + return self._get(field) + + property schema: + """A property that returns the _StructSchema object matching this reader""" + def __get__(self): + return _StructSchema()._init(self.thisptr.getSchema()) + + def __dir__(self): + return list(self.schema.fieldnames) cdef class EventLoop: cdef SimpleEventLoop thisptr @@ -1008,11 +1079,21 @@ cdef class _Response(_DynamicStructReader): self._init(deref(self.thisptr_child), parent) return self + cdef _init_childptr(self, Response * other, parent): + self.thisptr_child = other + self._init(deref(self.thisptr_child), parent) + return self + cdef class _DynamicCapabilityClient: cdef C_DynamicCapability.Client thisptr - cdef public object _event_loop, _server + cdef public object _event_loop, _server, _parent - def __init__(self, schema, server, event_loop): + cdef _init(self, C_DynamicCapability.Client other, object parent): + self.thisptr = other + self._parent = parent + return self + + cdef _init_vals(self, schema, server, event_loop): cdef _InterfaceSchema s if hasattr(schema, 'schema'): s = schema.schema @@ -1023,6 +1104,7 @@ cdef class _DynamicCapabilityClient: self._event_loop = event_loop self.thisptr = new_client(s.thisptr, server, loop.thisptr) self._server = server + return self cpdef _send_helper(self, name, firstSegmentWordSize, kwargs) except +ValueError: cdef Request * request = new Request(self.thisptr.newRequest(name, firstSegmentWordSize)) @@ -1287,7 +1369,7 @@ cdef class SchemaParser: elif proto.isInterface: def new_client(bound_local_module): def helper(server, loop): - return _DynamicCapabilityClient(bound_local_module, server, loop) + return _DynamicCapabilityClient()._init_vals(bound_local_module, server, loop) return helper local_module.schema = schema.as_interface() local_module.new_client = new_client(local_module) diff --git a/capnp/capnp_cpp.pxd b/capnp/capnp_cpp.pxd index e24f7c9..077d2d5 100644 --- a/capnp/capnp_cpp.pxd +++ b/capnp/capnp_cpp.pxd @@ -2,7 +2,7 @@ # distutils: language = c++ # distutils: extra_compile_args = --std=c++11 from schema_cpp cimport Node, Data, StructNode, EnumNode -from async_cpp cimport PyPromise, Promise +from async_cpp cimport PyPromise, VoidPromise, Promise from cpython.ref cimport PyObject from libc.stdint cimport * @@ -112,6 +112,8 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp": pass cppclass Builder: pass + cppclass Pipeline: + pass enum Type: TYPE_UNKNOWN " ::capnp::DynamicValue::UNKNOWN" @@ -147,11 +149,14 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp": void adopt(char *, DynamicOrphan) except +ValueError DynamicOrphan disown(char *) DynamicStruct.Reader asReader() + cppclass Pipeline: + DynamicValueForward.Pipeline get(char *) + StructSchema getSchema() cdef extern from "capnp/capability.h" namespace " ::capnp": cdef cppclass Response" ::capnp::Response< ::capnp::DynamicStruct>"(DynamicStruct.Reader): Response(Response) - cdef cppclass RemotePromise" ::capnp::RemotePromise< ::capnp::DynamicStruct>"(Promise[Response]): + cdef cppclass RemotePromise" ::capnp::RemotePromise< ::capnp::DynamicStruct>"(Promise[Response], DynamicStruct.Pipeline): RemotePromise(RemotePromise) cdef extern from "capnp/dynamic.h" namespace " ::capnp": @@ -189,6 +194,10 @@ cdef extern from "fixMaybe.h": cdef extern from "capabilityHelper.h": + PyPromise evalLater(EventLoop &, PyObject * func) + PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func) + PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func) + VoidPromise then(RemotePromise & promise, PyObject * func, PyObject * error_func) cppclass PythonInterfaceDynamicImpl: pass DynamicCapability.Client new_client(InterfaceSchema&, PyObject *, EventLoop&) @@ -235,6 +244,7 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp": Reader(DynamicList.Reader& value) Reader(DynamicEnum value) Reader(DynamicStruct.Reader& value) + Reader(DynamicCapability.Client& value) Type getType() int64_t asInt"as"() uint64_t asUint"as"() @@ -244,6 +254,7 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp": DynamicList.Reader asList"as< ::capnp::DynamicList>"() DynamicStruct.Reader asStruct"as< ::capnp::DynamicStruct>"() ObjectPointer.Reader asObject"as< ::capnp::ObjectPointer>"() + DynamicCapability.Client asCapability"as< ::capnp::DynamicCapability>"() DynamicEnum asEnum"as< ::capnp::DynamicEnum>"() Data.Reader asData"as< ::capnp::Data>"() @@ -257,9 +268,14 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp": DynamicList.Builder asList"as< ::capnp::DynamicList>"() DynamicStruct.Builder asStruct"as< ::capnp::DynamicStruct>"() ObjectPointer.Builder asObject"as< ::capnp::ObjectPointer>"() + DynamicCapability.Client asCapability"as< ::capnp::DynamicCapability>"() DynamicEnum asEnum"as< ::capnp::DynamicEnum>"() Data.Builder asData"as< ::capnp::Data>"() + cppclass Pipeline: + Pipeline(Pipeline) + DynamicCapability.Client asCapability"releaseAs< ::capnp::DynamicCapability>"() + cdef extern from "capnp/schema-parser.h" namespace " ::capnp": cdef cppclass ParsedSchema(Schema): ParsedSchema getNested(char * name) except + @@ -297,8 +313,3 @@ cdef extern from "kj/async.h" namespace " ::kj": PyPromise there(PyPromise, PyObject * func) cdef cppclass SimpleEventLoop(EventLoop): pass - -cdef extern from "asyncHelper.h": - PyPromise evalLater(EventLoop &, PyObject * func) - PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func) - PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func) diff --git a/test/test_capability.capnp b/test/test_capability.capnp index 0bd862f..7d59d8f 100644 --- a/test/test_capability.capnp +++ b/test/test_capability.capnp @@ -35,11 +35,11 @@ interface TestInterface { # grault @2 () -> TestAllTypes; # } -# interface TestPipeline { -# getCap @0 (n: UInt32, inCap :TestInterface) -> (s: Text, outBox :Box); -# testPointers @1 (cap :TestInterface, obj :Object, list :List(TestInterface)) -> (); +interface TestPipeline { + getCap @0 (n: UInt32, inCap :TestInterface) -> (s: Text, outBox :Box); + testPointers @1 (cap :TestInterface, obj :Object, list :List(TestInterface)) -> (); -# struct Box { -# cap @0 :TestInterface; -# } -# } + struct Box { + cap @0 :TestInterface; + } +} diff --git a/test/test_capability.py b/test/test_capability.py index e8dddac..696b713 100644 --- a/test/test_capability.py +++ b/test/test_capability.py @@ -9,10 +9,21 @@ def capability(): return capnp.load(os.path.join(this_dir, 'test_capability.capnp')) class Server: - def foo(self, context): - context.results.x = str(context.params.i * 5 + 1) + def __init__(self, val=1): + self.val = val -def test_basic_client(capability): + def foo(self, context): + context.results.x = str(context.params.i * 5 + self.val) + +class PipelineServer: + def getCap(self, context): + def _then(response): + context.results.s = response.x + '_foo' + context.results.outBox.outCap = Server(100) + + return context.params.inCap.foo(i=context.params.n).then(_then) + +def test_client(capability): loop = capnp.EventLoop() client = capability.TestInterface.new_client(Server(), loop) @@ -70,3 +81,16 @@ def test_simple_client(capability): with pytest.raises(ValueError): remote = client.foo(baz=5) + +def test_pipeline(capability): + loop = capnp.EventLoop() + + client = capability.TestPipeline.new_client(PipelineServer(), loop) + foo_client = capability.TestInterface.new_client(Server(), loop) + + remote = client.getCap(n=5, inCap=foo_client) + response = loop.wait_remote(remote) + + assert response.s == '26_foo' + +