Pipelining almost completely wrapped

Waiting on some upstream changes in C++ libcapnp before I can finish
This commit is contained in:
Jason Paryani 2013-10-17 22:42:14 -07:00
parent 7f7b28f328
commit 96bfc495d8
8 changed files with 194 additions and 100 deletions

View file

@ -1,51 +0,0 @@
#include "kj/async.h"
#include "Python.h"
extern "C" {
PyObject * wrap_kj_exception(kj::Exception &);
// void _gevent_eventloop_prepare_to_sleep();
// void _gevent_eventloop_sleep();
// void _gevent_eventloop_wake();
}
PyObject * wrapPyFunc(PyObject * func, PyObject * arg) {
PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL);
Py_DECREF(func);
return result;
}
::kj::Promise<PyObject *> evalLater(kj::EventLoop & loop, PyObject * func) {
return loop.evalLater([func]() { return wrapPyFunc(func, NULL); } );
}
::kj::Promise<PyObject *> there(kj::EventLoop & loop, kj::Promise<PyObject *> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); } );
else
return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
}
::kj::Promise<PyObject *> then(kj::Promise<PyObject *> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } );
else
return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
}
// class PyEventLoop final: public ::kj::EventLoop {
// public:
// PyEventLoop() {}
// ~PyEventLoop() noexcept(false) {}
// protected:
// void prepareToSleep() noexcept override {
// _gevent_eventloop_prepare_to_sleep();
// }
// void sleep() override {
// _gevent_eventloop_sleep();
// }
// void wake() const override {
// _gevent_eventloop_wake();
// }
// };

View file

@ -1,14 +0,0 @@
# schema.capnp.cpp.pyx
# distutils: language = c++
# distutils: extra_compile_args = --std=c++11
from cpython.ref cimport PyObject
from capnp_cpp cimport PyPromise, EventLoop
cdef extern from "asyncHelper.h":
PyPromise evalLater(EventLoop &, PyObject * func)
PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func)
# cdef cppclass PyEventLoop(EventLoop):
# pass

View file

@ -15,6 +15,7 @@ cdef extern from "kj/async.h" namespace " ::kj":
T wait()
ctypedef Promise[PyObject *] PyPromise
ctypedef Promise[void] VoidPromise
cdef extern from "kj/async.h" namespace " ::kj":
cdef cppclass EventLoop:

View file

@ -4,10 +4,46 @@
#include <iostream>
extern "C" {
void wrap_remote_call(PyObject * func, capnp::Response<capnp::DynamicStruct> &);
PyObject * wrap_dynamic_struct_reader(capnp::DynamicStruct::Reader &);
void call_server_method(PyObject * py_server, char * name, capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> & context);
::kj::Promise<void> * call_server_method(PyObject * py_server, char * name, capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> & context);
PyObject * wrap_kj_exception(kj::Exception &);
}
PyObject * wrapPyFunc(PyObject * func, PyObject * arg) {
PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL);
Py_DECREF(func);
return result;
}
::kj::Promise<PyObject *> evalLater(kj::EventLoop & loop, PyObject * func) {
return loop.evalLater([func]() { return wrapPyFunc(func, NULL); } );
}
::kj::Promise<PyObject *> there(kj::EventLoop & loop, kj::Promise<PyObject *> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); } );
else
return loop.there(kj::mv(promise), [func](PyObject * arg) { return wrapPyFunc(func, arg); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
}
::kj::Promise<PyObject *> then(kj::Promise<PyObject *> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); } );
else
return promise.then([func](PyObject * arg) { return wrapPyFunc(func, arg); }
, [error_func](kj::Exception arg) { return wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
}
::kj::Promise<void> then(::capnp::RemotePromise< ::capnp::DynamicStruct> & promise, PyObject * func, PyObject * error_func) {
if(error_func == Py_None)
return promise.then([func](capnp::Response<capnp::DynamicStruct>&& arg) { wrap_remote_call(func, arg); } );
else
return promise.then([func](capnp::Response<capnp::DynamicStruct>&& arg) { wrap_remote_call(func, arg); }
, [error_func](kj::Exception arg) { wrapPyFunc(error_func, wrap_kj_exception(arg)); } );
}
class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server {
public:
PyObject * py_server;
@ -24,8 +60,13 @@ public:
kj::Promise<void> call(capnp::InterfaceSchema::Method method,
capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context) {
auto methodName = method.getProto().getName();
call_server_method(py_server, const_cast<char *>(methodName.cStr()), context);
return kj::READY_NOW;
kj::Promise<void> * promise = call_server_method(py_server, const_cast<char *>(methodName.cStr()), context);
if(promise == nullptr)
return kj::READY_NOW;
kj::Promise<void> ret(kj::mv(*promise));
delete promise;
return ret;
}
};

View file

@ -9,7 +9,7 @@
cimport cython
cimport capnp_cpp as capnp
cimport schema_cpp
from capnp_cpp cimport Schema as C_Schema, StructSchema as C_StructSchema, InterfaceSchema as C_InterfaceSchema, DynamicStruct as C_DynamicStruct, DynamicValue as C_DynamicValue, Type as C_Type, DynamicList as C_DynamicList, fixMaybe, getEnumString, SchemaParser as C_SchemaParser, ParsedSchema as C_ParsedSchema, VOID, ArrayPtr, StringPtr, String, StringTree, DynamicOrphan as C_DynamicOrphan, ObjectPointer as C_DynamicObject, WordArrayPtr, DynamicCapability as C_DynamicCapability, new_client, Request, Response, RemotePromise, convert_to_pypromise, SimpleEventLoop, PyPromise, CallContext
from capnp_cpp cimport Schema as C_Schema, StructSchema as C_StructSchema, InterfaceSchema as C_InterfaceSchema, DynamicStruct as C_DynamicStruct, DynamicValue as C_DynamicValue, Type as C_Type, DynamicList as C_DynamicList, fixMaybe, getEnumString, SchemaParser as C_SchemaParser, ParsedSchema as C_ParsedSchema, VOID, ArrayPtr, StringPtr, String, StringTree, DynamicOrphan as C_DynamicOrphan, ObjectPointer as C_DynamicObject, WordArrayPtr, DynamicCapability as C_DynamicCapability, new_client, Request, Response, RemotePromise, convert_to_pypromise, SimpleEventLoop, PyPromise, VoidPromise, CallContext
from schema_cpp cimport Node as C_Node, EnumNode as C_EnumNode
from cython.operator cimport dereference as deref
@ -44,14 +44,28 @@ from functools import partial as _partial
cdef public object wrap_dynamic_struct_reader(C_DynamicStruct.Reader & reader):
return _DynamicStructReader()._init(reader, None)
cdef public void call_server_method(PyObject * _server, char * _method_name, CallContext & _context):
cdef public void wrap_remote_call(PyObject * func, Response & r):
response = _Response()._init_childptr(new Response(moveResponse(r)), None)
func_obj = <object>func
# TODO: decref func?
func_obj(response)
cdef public VoidPromise * call_server_method(PyObject * _server, char * _method_name, CallContext & _context):
server = <object>_server
method_name = <object>_method_name
context = _CallContext()._init(_context)
getattr(server, method_name)(context)
ret = getattr(server, method_name)(context)
# By making it public, we'll be able to call it from asyncHelper.h
if ret is not None:
if type(ret) is _VoidPromise:
return new VoidPromise(moveVoidPromise(deref((<_VoidPromise>ret).thisptr)))
else:
raise ValueError('Server function returned a value that was not a VoidPromise: ' + str(ret))
return NULL
cdef public object wrap_kj_exception(capnp.Exception & exception):
return None # TODO
@ -103,6 +117,7 @@ cdef extern from "<utility>" namespace "std":
Request moveRequest"std::move"(Request)
Response moveResponse"std::move"(Response)
PyPromise movePromise"std::move"(PyPromise)
VoidPromise moveVoidPromise"std::move"(VoidPromise)
RemotePromise moveRemotePromise"std::move"(RemotePromise)
CallContext moveCallContext"std::move"(CallContext)
@ -301,8 +316,6 @@ cdef class _DynamicListBuilder:
return self._get(index)
def __setitem__(self, index, value):
# TODO: share code with _DynamicStructBuilder.__setattr__
size = self.thisptr.size()
if index >= size:
raise IndexError('Out of bounds')
@ -387,6 +400,8 @@ cdef to_python_reader(C_DynamicValue.Reader self, object parent):
return None
elif type == capnp.TYPE_OBJECT:
return _DynamicObjectReader()._init(self.asObject(), parent)
elif type == capnp.TYPE_CAPABILITY:
return _DynamicCapabilityClient()._init(self.asCapability(), parent)
elif type == capnp.TYPE_UNKNOWN:
raise ValueError("Cannot convert type to Python. Type is unknown by capnproto library")
else:
@ -417,6 +432,8 @@ cdef to_python_builder(C_DynamicValue.Builder self, object parent):
return None
elif type == capnp.TYPE_OBJECT:
return _DynamicObjectBuilder()._init(self.asObject(), parent)
elif type == capnp.TYPE_CAPABILITY:
return _DynamicCapabilityClient()._init(self.asCapability(), parent)
elif type == capnp.TYPE_UNKNOWN:
raise ValueError("Cannot convert type to Python. Type is unknown by capnproto library")
else:
@ -428,6 +445,9 @@ cdef C_DynamicValue.Reader _extract_dynamic_struct_builder(_DynamicStructBuilder
cdef C_DynamicValue.Reader _extract_dynamic_struct_reader(_DynamicStructReader value):
return C_DynamicValue.Reader(value.thisptr)
cdef C_DynamicValue.Reader _extract_dynamic_client(_DynamicCapabilityClient value):
return C_DynamicValue.Reader(value.thisptr)
cdef _setDynamicField(_DynamicSetterClasses thisptr, field, value, parent):
cdef C_DynamicValue.Reader temp
value_type = type(value)
@ -458,6 +478,8 @@ cdef _setDynamicField(_DynamicSetterClasses thisptr, field, value, parent):
thisptr.set(field, _extract_dynamic_struct_builder(value))
elif value_type is _DynamicStructReader:
thisptr.set(field, _extract_dynamic_struct_reader(value))
elif value_type is _DynamicCapabilityClient:
thisptr.set(field, _extract_dynamic_client(value))
else:
raise ValueError("Non primitive type")
@ -491,6 +513,8 @@ cdef _setDynamicFieldPtr(_DynamicSetterClasses * thisptr, field, value, parent):
thisptr.set(field, _extract_dynamic_struct_builder(value))
elif value_type is _DynamicStructReader:
thisptr.set(field, _extract_dynamic_struct_reader(value))
elif value_type is _DynamicCapabilityClient:
thisptr.set(field, _extract_dynamic_client(value))
else:
raise ValueError("Non primitive type")
@ -898,6 +922,7 @@ cdef class _CallContext:
cdef class Promise:
cdef PyPromise * thisptr
cdef public bint is_consumed
def __init__(self):
self.is_consumed = True
@ -928,6 +953,38 @@ cdef class Promise:
return Promise()._init(capnp.then(deref(self.thisptr), <PyObject *>func, <PyObject *>error_func))
cdef class _VoidPromise:
cdef VoidPromise * thisptr
cdef public bint is_consumed
def __init__(self):
self.is_consumed = True
cdef _init(self, VoidPromise other):
self.is_consumed = False
self.thisptr = new VoidPromise(moveVoidPromise(other))
return self
def __dealloc__(self):
del self.thisptr
cpdef wait(self) except+:
if self.is_consumed:
raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object')
self.thisptr.wait()
self.is_consumed = True
# cpdef then(self, func, error_func=None) except+:
# if self.is_consumed:
# raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object')
# Py_INCREF(func)
# Py_INCREF(error_func)
# return Promise()._init(capnp.then(deref(self.thisptr), <PyObject *>func, <PyObject *>error_func))
cdef class _RemotePromise:
cdef RemotePromise * thisptr
cdef public bint is_consumed
@ -957,14 +1014,28 @@ cdef class _RemotePromise:
cpdef as_pypromise(self) except +:
Promise()._init(convert_to_pypromise(deref(self.thisptr)))
# cpdef then(self, func, error_func=None) except+:
# if self.is_consumed:
# raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object')
cpdef then(self, func, error_func=None) except+:
if self.is_consumed:
raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object')
# Py_INCREF(func)
# Py_INCREF(error_func)
Py_INCREF(func)
Py_INCREF(error_func)
# return _RemotePromise()._init(capnp.then(deref(self.thisptr), <PyObject *>func, <PyObject *>error_func))
return _VoidPromise()._init(capnp.then(deref(self.thisptr), <PyObject *>func, <PyObject *>error_func))
cpdef _get(self, field) except +ValueError:
return _DynamicCapabilityClient()._init((<C_DynamicValue.Pipeline>self.thisptr.get(field)).asCapability(), self._parent)
def __getattr__(self, field):
return self._get(field)
property schema:
"""A property that returns the _StructSchema object matching this reader"""
def __get__(self):
return _StructSchema()._init(self.thisptr.getSchema())
def __dir__(self):
return list(self.schema.fieldnames)
cdef class EventLoop:
cdef SimpleEventLoop thisptr
@ -1008,11 +1079,21 @@ cdef class _Response(_DynamicStructReader):
self._init(<C_DynamicStruct.Reader>deref(self.thisptr_child), parent)
return self
cdef _init_childptr(self, Response * other, parent):
self.thisptr_child = other
self._init(<C_DynamicStruct.Reader>deref(self.thisptr_child), parent)
return self
cdef class _DynamicCapabilityClient:
cdef C_DynamicCapability.Client thisptr
cdef public object _event_loop, _server
cdef public object _event_loop, _server, _parent
def __init__(self, schema, server, event_loop):
cdef _init(self, C_DynamicCapability.Client other, object parent):
self.thisptr = other
self._parent = parent
return self
cdef _init_vals(self, schema, server, event_loop):
cdef _InterfaceSchema s
if hasattr(schema, 'schema'):
s = schema.schema
@ -1023,6 +1104,7 @@ cdef class _DynamicCapabilityClient:
self._event_loop = event_loop
self.thisptr = new_client(s.thisptr, <PyObject *>server, loop.thisptr)
self._server = server
return self
cpdef _send_helper(self, name, firstSegmentWordSize, kwargs) except +ValueError:
cdef Request * request = new Request(self.thisptr.newRequest(name, firstSegmentWordSize))
@ -1287,7 +1369,7 @@ cdef class SchemaParser:
elif proto.isInterface:
def new_client(bound_local_module):
def helper(server, loop):
return _DynamicCapabilityClient(bound_local_module, server, loop)
return _DynamicCapabilityClient()._init_vals(bound_local_module, server, loop)
return helper
local_module.schema = schema.as_interface()
local_module.new_client = new_client(local_module)

View file

@ -2,7 +2,7 @@
# distutils: language = c++
# distutils: extra_compile_args = --std=c++11
from schema_cpp cimport Node, Data, StructNode, EnumNode
from async_cpp cimport PyPromise, Promise
from async_cpp cimport PyPromise, VoidPromise, Promise
from cpython.ref cimport PyObject
from libc.stdint cimport *
@ -112,6 +112,8 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp":
pass
cppclass Builder:
pass
cppclass Pipeline:
pass
enum Type:
TYPE_UNKNOWN " ::capnp::DynamicValue::UNKNOWN"
@ -147,11 +149,14 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp":
void adopt(char *, DynamicOrphan) except +ValueError
DynamicOrphan disown(char *)
DynamicStruct.Reader asReader()
cppclass Pipeline:
DynamicValueForward.Pipeline get(char *)
StructSchema getSchema()
cdef extern from "capnp/capability.h" namespace " ::capnp":
cdef cppclass Response" ::capnp::Response< ::capnp::DynamicStruct>"(DynamicStruct.Reader):
Response(Response)
cdef cppclass RemotePromise" ::capnp::RemotePromise< ::capnp::DynamicStruct>"(Promise[Response]):
cdef cppclass RemotePromise" ::capnp::RemotePromise< ::capnp::DynamicStruct>"(Promise[Response], DynamicStruct.Pipeline):
RemotePromise(RemotePromise)
cdef extern from "capnp/dynamic.h" namespace " ::capnp":
@ -189,6 +194,10 @@ cdef extern from "fixMaybe.h":
cdef extern from "capabilityHelper.h":
PyPromise evalLater(EventLoop &, PyObject * func)
PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func)
VoidPromise then(RemotePromise & promise, PyObject * func, PyObject * error_func)
cppclass PythonInterfaceDynamicImpl:
pass
DynamicCapability.Client new_client(InterfaceSchema&, PyObject *, EventLoop&)
@ -235,6 +244,7 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp":
Reader(DynamicList.Reader& value)
Reader(DynamicEnum value)
Reader(DynamicStruct.Reader& value)
Reader(DynamicCapability.Client& value)
Type getType()
int64_t asInt"as<int64_t>"()
uint64_t asUint"as<uint64_t>"()
@ -244,6 +254,7 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp":
DynamicList.Reader asList"as< ::capnp::DynamicList>"()
DynamicStruct.Reader asStruct"as< ::capnp::DynamicStruct>"()
ObjectPointer.Reader asObject"as< ::capnp::ObjectPointer>"()
DynamicCapability.Client asCapability"as< ::capnp::DynamicCapability>"()
DynamicEnum asEnum"as< ::capnp::DynamicEnum>"()
Data.Reader asData"as< ::capnp::Data>"()
@ -257,9 +268,14 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp":
DynamicList.Builder asList"as< ::capnp::DynamicList>"()
DynamicStruct.Builder asStruct"as< ::capnp::DynamicStruct>"()
ObjectPointer.Builder asObject"as< ::capnp::ObjectPointer>"()
DynamicCapability.Client asCapability"as< ::capnp::DynamicCapability>"()
DynamicEnum asEnum"as< ::capnp::DynamicEnum>"()
Data.Builder asData"as< ::capnp::Data>"()
cppclass Pipeline:
Pipeline(Pipeline)
DynamicCapability.Client asCapability"releaseAs< ::capnp::DynamicCapability>"()
cdef extern from "capnp/schema-parser.h" namespace " ::capnp":
cdef cppclass ParsedSchema(Schema):
ParsedSchema getNested(char * name) except +
@ -297,8 +313,3 @@ cdef extern from "kj/async.h" namespace " ::kj":
PyPromise there(PyPromise, PyObject * func)
cdef cppclass SimpleEventLoop(EventLoop):
pass
cdef extern from "asyncHelper.h":
PyPromise evalLater(EventLoop &, PyObject * func)
PyPromise there(EventLoop & loop, PyPromise & promise, PyObject * func, PyObject * error_func)
PyPromise then(PyPromise & promise, PyObject * func, PyObject * error_func)

View file

@ -35,11 +35,11 @@ interface TestInterface {
# grault @2 () -> TestAllTypes;
# }
# interface TestPipeline {
# getCap @0 (n: UInt32, inCap :TestInterface) -> (s: Text, outBox :Box);
# testPointers @1 (cap :TestInterface, obj :Object, list :List(TestInterface)) -> ();
interface TestPipeline {
getCap @0 (n: UInt32, inCap :TestInterface) -> (s: Text, outBox :Box);
testPointers @1 (cap :TestInterface, obj :Object, list :List(TestInterface)) -> ();
# struct Box {
# cap @0 :TestInterface;
# }
# }
struct Box {
cap @0 :TestInterface;
}
}

View file

@ -9,10 +9,21 @@ def capability():
return capnp.load(os.path.join(this_dir, 'test_capability.capnp'))
class Server:
def foo(self, context):
context.results.x = str(context.params.i * 5 + 1)
def __init__(self, val=1):
self.val = val
def test_basic_client(capability):
def foo(self, context):
context.results.x = str(context.params.i * 5 + self.val)
class PipelineServer:
def getCap(self, context):
def _then(response):
context.results.s = response.x + '_foo'
context.results.outBox.outCap = Server(100)
return context.params.inCap.foo(i=context.params.n).then(_then)
def test_client(capability):
loop = capnp.EventLoop()
client = capability.TestInterface.new_client(Server(), loop)
@ -70,3 +81,16 @@ def test_simple_client(capability):
with pytest.raises(ValueError):
remote = client.foo(baz=5)
def test_pipeline(capability):
loop = capnp.EventLoop()
client = capability.TestPipeline.new_client(PipelineServer(), loop)
foo_client = capability.TestInterface.new_client(Server(), loop)
remote = client.getCap(n=5, inCap=foo_client)
response = loop.wait_remote(remote)
assert response.s == '26_foo'