Schema loading from the wire

Cap'n Proto provides a schema loader, which can be used to dynamically
load schemas during runtime. To port this functionality to pycapnp,
a new class is provided `C_SchemaLoader`, which exposes the Cap'n
Proto C++ interface, and `SchemaLoader`, which is part of the pycapnp
library.

The specific use case for this is when a capnp message contains
a Node.Reader: The schema for a yet unseen message can be loaded
dynamically, allowing the future message to be properly processed.

If the message is a struct containing other structs, all the schemas for
every struct must be loaded to correctly parse the message. See
https://github.com/DaneSlattery/capnp_generic_poc for a
proof-of-concept.

Add docs and cleanup

Add more docs

Reduce changes

Fix flake8 formatting

Fix get datatype
This commit is contained in:
Rowan Reeve 2023-02-21 08:31:12 +02:00 committed by DaneSlattery
parent b439993b1f
commit a5c29a74d2
9 changed files with 106 additions and 4 deletions

View file

@ -0,0 +1,12 @@
#pragma once
#include "capnp/dynamic.h"
#include "capnp/schema.capnp.h"
/// @brief Convert the dynamic struct to a Node::Reader
::capnp::schema::Node::Reader toReader(capnp::DynamicStruct::Reader reader)
{
// requires an intermediate step to AnyStruct before going directly to Node::Reader,
// since there exists no direct conversion from DynamicStruct::Reader to Node::Reader.
return reader.as<capnp::AnyStruct>().as<capnp::schema::Node>();
}

View file

@ -1,7 +1,7 @@
from capnp.includes.capnp_cpp cimport (
Maybe, PyPromise, VoidPromise, RemotePromise,
DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue, Capability,
RpcSystem, MessageBuilder, Own, PyRefCounter
DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue, Capability,
RpcSystem, MessageBuilder, Own, PyRefCounter, Node, DynamicStruct
)
from capnp.includes.schema_cpp cimport ByteArray
@ -30,3 +30,7 @@ cdef extern from "capnp/helpers/rpcHelper.h":
cdef extern from "capnp/helpers/serialize.h":
ByteArray messageToPackedBytes(MessageBuilder &, size_t wordCount)
cdef extern from "capnp/helpers/deserialize.h":
Node.Reader toReader(DynamicStruct.Reader reader) except +reraise_kj_exception

View file

@ -11,4 +11,4 @@ kj::Array< ::capnp::byte> messageToPackedBytes(capnp::MessageBuilder & message,
kj::ArrayOutputStream out(result.asPtr());
capnp::writePackedMessage(out, message);
return heapArray(out.getArray()); // TODO: make this non-copying somehow
}
}

View file

@ -446,6 +446,12 @@ cdef extern from "capnp/dynamic.h" namespace " ::capnp":
DynamicStruct.Pipeline asStruct"releaseAs< ::capnp::DynamicStruct>"()
Type getType()
cdef extern from "capnp/schema-loader.h" namespace " ::capnp":
cdef cppclass SchemaLoader:
SchemaLoader()
Schema load(Node.Reader reader) except +reraise_kj_exception
Schema get(uint64_t id_) except +reraise_kj_exception
cdef extern from "capnp/schema-parser.h" namespace " ::capnp":
cdef cppclass ParsedSchema(Schema) nogil:
ParsedSchema getNested(char * name) except +reraise_kj_exception

View file

@ -5,7 +5,7 @@ from capnp.includes cimport schema_cpp
from capnp.includes.capnp_cpp cimport (
Schema as C_Schema, StructSchema as C_StructSchema, InterfaceSchema as C_InterfaceSchema,
EnumSchema as C_EnumSchema, ListSchema as C_ListSchema, DynamicStruct as C_DynamicStruct,
DynamicValue as C_DynamicValue, Type as C_Type, DynamicList as C_DynamicList,
DynamicValue as C_DynamicValue, Type as C_Type, DynamicList as C_DynamicList, SchemaLoader as C_SchemaLoader,
SchemaParser as C_SchemaParser, ParsedSchema as C_ParsedSchema, VOID, ArrayPtr, StringPtr,
String, StringTree, DynamicOrphan as C_DynamicOrphan, AnyPointer as C_DynamicObject,
DynamicCapability as C_DynamicCapability, Request, Response, RemotePromise, Promise,
@ -30,6 +30,9 @@ cdef class _StringArrayPtr:
cdef size_t size
cdef ArrayPtr[StringPtr] asArrayPtr(self) except +reraise_kj_exception
cdef class SchemaLoader:
cdef C_SchemaLoader * thisptr
cdef class SchemaParser:
cdef C_SchemaParser * thisptr
cdef public dict modules_by_id

View file

@ -362,6 +362,11 @@ cdef class _NodeReader:
property isEnum:
def __get__(self):
return self.thisptr.isEnum()
property node:
"""A property that returns the NodeReader as a DynamicStructReader."""
def __get__(self):
return _DynamicStructReader()._init(self.thisptr, self)
cdef class _NestedNodeReader:
@ -3231,6 +3236,34 @@ cdef class _StringArrayPtr:
return ArrayPtr[StringPtr](self.thisptr, self.size)
cdef class SchemaLoader:
""" Class which can be used to construct Schema objects from schema::Nodes as defined in
schema.capnp.
This class wraps capnproto/c++/src/capnp/schema-loader.h directly."""
def __cinit__(self):
self.thisptr = new C_SchemaLoader()
def __dealloc__(self):
del self.thisptr
def load(self, _NodeReader reader):
"""Loads the given schema node. Validates the node and throws an exception if invalid. This
makes a copy of the schema, so the object passed in can be destroyed after this returns.
"""
return _Schema()._init(self.thisptr.load(reader.thisptr))
def load_dynamic(self, _DynamicStructReader reader):
"""Loads the given schema node with self.load, but converts from a _DynamicStructReader
first."""
return _Schema()._init(self.thisptr.load(helpers.toReader(reader.thisptr)))
def get(self, id_):
"""Gets the schema for the given ID, throwing an exception if it isn't present."""
return _Schema()._init(self.thisptr.get(<uint64_t>id_))
cdef class SchemaParser:
"""A class for loading Cap'n Proto schema files.

View file

@ -88,6 +88,10 @@ Miscellaneous
:undoc-members:
:inherited-members:
.. autoclass:: SchemaLoader
:members:
:undoc-members:
:inherited-members:
Functions
---------

View file

@ -5,3 +5,16 @@ struct Foo {
name @1 :Text;
}
struct Baz{
text @0 :Text;
qux @1 :Qux;
}
struct Qux{
id @0 :UInt64;
}
interface Wrapper {
wrapped @0 (object :AnyPointer);
}

View file

@ -119,3 +119,30 @@ def test_bundled_import_hook():
# stream.capnp should be bundled, or provided by the system capnproto
capnp.add_import_hook()
import stream_capnp # noqa: F401
def test_load_capnp(foo):
# test dynamically loading
loader = capnp.SchemaLoader()
loader.load(foo.Baz.schema.get_proto())
loader.load_dynamic(foo.Qux.schema.get_proto().node)
schema = loader.get(foo.Baz.schema.get_proto().node.id).as_struct()
assert "text" in schema.fieldnames
assert "qux" in schema.fieldnames
assert schema.fields["qux"].proto.slot.type.which == "struct"
class Wrapper(foo.Wrapper.Server):
def wrapped(self, object, **kwargs):
assert isinstance(object, capnp.lib.capnp._DynamicObjectReader)
baz_ = object.as_struct(schema)
assert baz_.text == "test"
assert baz_.qux.id == 2
# test calling into the wrapper with a Baz message.
baz_ = foo.Baz.new_message()
baz_.text = "test"
baz_.qux.id = 2
wrapper = foo.Wrapper._new_client(Wrapper())
wrapper.wrapped(baz_).wait()