Fix allowing RemotePromises to be returned directly

This commit is contained in:
Jason Paryani 2013-12-11 11:24:27 -08:00
parent 03bac73120
commit 4d8a27844f
3 changed files with 57 additions and 29 deletions

View file

@ -7,15 +7,29 @@
extern "C" {
PyObject * wrap_remote_call(PyObject * func, capnp::Response<capnp::DynamicStruct> &);
PyObject * wrap_dynamic_struct_reader(capnp::DynamicStruct::Reader &);
PyObject * wrap_dynamic_struct_reader(capnp::Response<capnp::DynamicStruct> &);
::kj::Promise<void> * call_server_method(PyObject * py_server, char * name, capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> & context);
PyObject * wrap_kj_exception(kj::Exception &);
PyObject * wrap_kj_exception_for_reraise(kj::Exception &);
PyObject * get_exception_info(PyObject *, PyObject *, PyObject *);
PyObject * convert_array_pyobject(kj::Array<PyObject *>&);
::kj::Promise<PyObject *> * extract_promise(PyObject *);
::capnp::RemotePromise< ::capnp::DynamicStruct> * extract_remote_promise(PyObject *);
}
::kj::Promise<PyObject *> convert_to_pypromise(capnp::RemotePromise<capnp::DynamicStruct> & promise) {
return promise.then([](capnp::Response<capnp::DynamicStruct>&& response) { return wrap_dynamic_struct_reader(response); } );
}
::kj::Promise<PyObject *> convert_to_pypromise(kj::Promise<void> & promise) {
return promise.then([]() { Py_RETURN_NONE;} );
}
template<class T>
::kj::Promise<void> convert_to_voidpromise(kj::Promise<T> & promise) {
return promise.then([](T) { } );
}
void reraise_kj_exception() {
try {
if (PyErr_Occurred())
@ -67,15 +81,25 @@ void check_py_error() {
// TODO: need to decref error_func as well on successful run
kj::Promise<PyObject *> wrapPyFunc(PyObject * func, PyObject * arg) {
PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL);
Py_DECREF(func);
auto arg_promise = extract_promise(arg);
check_py_error();
if(arg_promise == NULL) {
PyObject * result = PyObject_CallFunctionObjArgs(func, arg, NULL);
Py_DECREF(func);
auto promise = extract_promise(result);
if(promise != NULL)
return kj::mv(*promise);
return result;
check_py_error();
auto promise = extract_promise(result);
if(promise != NULL)
return kj::mv(*promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
auto remote_promise = extract_remote_promise(result);
if(remote_promise != NULL)
return convert_to_pypromise(*remote_promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
return result;
}
else {
return arg_promise->then([&](PyObject * new_arg){ return wrapPyFunc(func, new_arg); });// TODO: delete arg_promise?
}
}
kj::Promise<PyObject *> wrapPyFuncNoArg(PyObject * func) {
@ -87,6 +111,9 @@ kj::Promise<PyObject *> wrapPyFuncNoArg(PyObject * func) {
auto promise = extract_promise(result);
if(promise != NULL)
return kj::mv(*promise);
auto remote_promise = extract_remote_promise(result);
if(remote_promise != NULL)
return convert_to_pypromise(*remote_promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
return result;
}
@ -98,6 +125,9 @@ kj::Promise<PyObject *> wrapRemoteCall(PyObject * func, capnp::Response<capnp::D
auto promise = extract_promise(ret);
if(promise != NULL)
return kj::mv(*promise);
auto remote_promise = extract_remote_promise(ret);
if(remote_promise != NULL)
return convert_to_pypromise(*remote_promise); // TODO: delete promise, see incref of containing promise in capnp.pyx
return ret;
}
@ -168,17 +198,4 @@ capnp::DynamicValue::Reader new_server(capnp::InterfaceSchema & schema, PyObject
capnp::Capability::Client server_to_client(capnp::InterfaceSchema & schema, PyObject * server) {
return kj::heap<PythonInterfaceDynamicImpl>(schema, server);
}
::kj::Promise<PyObject *> convert_to_pypromise(capnp::RemotePromise<capnp::DynamicStruct> & promise) {
return promise.then([](capnp::Response<capnp::DynamicStruct>&& response) { return wrap_dynamic_struct_reader(response); } );
}
::kj::Promise<PyObject *> convert_to_pypromise(kj::Promise<void> & promise) {
return promise.then([]() { Py_RETURN_NONE;} );
}
template<class T>
::kj::Promise<void> convert_to_voidpromise(kj::Promise<T> & promise) {
return promise.then([](T) { } );
}
}

View file

@ -24,8 +24,8 @@ import inspect as _inspect
from operator import attrgetter as _attrgetter
# By making it public, we'll be able to call it from capabilityHelper.h
cdef public object wrap_dynamic_struct_reader(C_DynamicStruct.Reader & reader):
return _DynamicStructReader()._init(reader, None)
cdef public object wrap_dynamic_struct_reader(Response & r):
return _Response()._init_childptr(new Response(moveResponse(r)), None)
cdef public PyObject * wrap_remote_call(PyObject * func, Response & r) except *:
response = _Response()._init_childptr(new Response(moveResponse(r)), None)
@ -43,7 +43,7 @@ cdef public VoidPromise * call_server_method(PyObject * _server, char * _method_
server = <object>_server
method_name = <object>_method_name
context = _CallContext()._init(_context)
context = _CallContext()._init(_context) # TODO: invalidate this with promise chain
func = getattr(server, method_name+'_context', None)
if func is not None:
ret = func(context)
@ -119,6 +119,15 @@ cdef public PyPromise * extract_promise(object obj):
return NULL
cdef public RemotePromise * extract_remote_promise(object obj):
if type(obj) is _RemotePromise:
promise = <_RemotePromise>obj
promise.is_consumed = True
Py_INCREF(promise) # TODO: fix leak
return promise.thisptr
return NULL
cdef extern from "<kj/string.h>" namespace " ::kj":
String strStructReader" ::kj::str"(C_DynamicStruct.Reader)
String strStructBuilder" ::kj::str"(C_DynamicStruct.Builder)
@ -1307,7 +1316,7 @@ cdef class _VoidPromise:
cpdef as_pypromise(self) except +reraise_kj_exception:
if self.is_consumed:
raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object')
Promise()._init(helpers.convert_to_pypromise(deref(self.thisptr)), self)
return Promise()._init(helpers.convert_to_pypromise(deref(self.thisptr)), self)
cdef class _RemotePromise:
cdef RemotePromise * thisptr
@ -1338,7 +1347,7 @@ cdef class _RemotePromise:
cpdef as_pypromise(self) except +reraise_kj_exception:
if self.is_consumed:
raise RuntimeError('Promise was already used in a consuming operation. You can no longer use this Promise object')
Promise()._init(helpers.convert_to_pypromise(deref(self.thisptr)), self)
return Promise()._init(helpers.convert_to_pypromise(deref(self.thisptr)), self)
cpdef then(self, func, error_func=None) except +reraise_kj_exception:
if self.is_consumed:
@ -1650,7 +1659,7 @@ cdef class TwoPartyClient:
# ez-rpc from the C++ API uses SturdyRef.objectId under the hood
ref = rpc_capnp.SturdyRef.new_message()
# objectId is an AnyPointer, so we have a special method for setting it to text
ref.objectId.set_as_text('calculator')
ref.objectId.set_as_text(textId)
return self.restore(ref.objectId)

View file

@ -41,7 +41,9 @@ def evaluateImpl(expression, params=None):
joinedParams = capnp.join_promises(paramPromises)
# When the parameters are complete, call the function.
ret = joinedParams.then(lambda vals: func.call(vals).then(lambda result: result.value))
ret = (joinedParams
.then(lambda vals: func.call(vals))
.then(lambda result: result.value))
return ret
else: