2019-09-27 14:40:54 -07:00
#!/usr/bin/env python3
TwoWayPipe and basic asyncio support
Note: I've tried not to break any behaviour of the previously working APIs
Python API Changes / Additions
- capnp/lib/capnp.pyx
* class _RemotePromise
+ [Added] cpdef _wait(self)
= Exception raising code that used to be inside of wait(self)
+ [Modified] def wait(self)
= Same functionality as before
+ [Added] async def a_wait(self)
= Cannot use await as that's a reserved keyword
= Uses pollRemote and asyncio.sleep(0) to make call asynchronous
* class _TwoPartyVatNetwork
+ [Added] cdef _init_pipe(self, _TwoWayPipe pipe, Side side,
schema_cpp.ReaderOptions opts)
= Instantiates a TwoPartyVatNetwork using a TwoWayPipe (instead of
using a file handle or connection as before)
* class TwoPartyClient
+ [Modified] def __init__(self, socket=None, restorer=None,
traversal_limit_in_words=None, nesting_limit=None)
= Changes the socket parameter to be optional
= If socket is not specified, default to using a TwoWayPipe
+ [Added] async def read(self, bufsize)
= awaitable function that blocks until data has been read
= bufsize defines the maximum amount of data to be read back
(e.g. 4096 bytes)
= Reads data from TwoWayPipe
+ [Added] def write(self, data)
= Write data to TwoWayPipe
= Not awaitable as the write interface of the TwoWayPipe doesn't
have poll functionality
* class TwoPartyServer
+ [Modified] def __init__(self, socket=None, restorer=None,
server_socket=None, bootstrap=None, traversal_limit_in_words=None,
nesting_limit=None)
= Changes the socket parameter to be optional
= If socket is not specified, default to using a TwoWayPipe
= Simplified code by removing an else (self._connect)
+ [Added] async def read(self, bufsize)
= awaitable function that blocks until data has been read
= bufsize defines the maximum amount of data to be read back
(e.g. 4096 bytes)
= Reads data from TwoWayPipe
+ [Added] def write(self, data)
= Write data to TwoWayPipe
= Not awaitable as the write interface of the TwoWayPipe doesn't
have poll functionality
+ [Added] async def poll_forever(self)
= asyncio equivalent of run_forever()
* class _TwoWayPipe
+ Wrapper class for TwoWayPipe
Other Additions
- capnp/helpers/asyncHelper.h
* pollWaitScope
+ Pumps the kj event handler
+ Used for the TwoWayServer
* pollRemote
+ Polls a remote promise
+ i.e. a capnp RPC call
- capnp/helpers/asyncIoHelper.h
* AsyncIoStreamReadHelper
+ I wasn't able to figure out Promise[size_t] using Cython so this was
the next best thing I could think of doing
+ Was needed to handle read polling from a read promise
= Polling is used for asyncio as kj waits need a wrapper to be
compatible
- capnp/lib/capnp.pyx
* makeTwoWayPipe
+ Wrapper for kj newTwoWayPipe function
* poll_once
+ Single pump of the kj event handler (used with pollWaitScope)
TwoWayClient Usage - TwoWayPipe
- See examples/async_client.py
TwoWayServer Usage - TwoWayPipe
- See examples/async_server.py
capnp/helpers/asyncIoHelper.h
Misc Changes
- Fixed thread_server.py and thread_client.py to use bootstrap instead
of ez_restore
- async_client.py and async_server.py examples
* Uses the same thread.capnp as thread_client.py and thread_server.py
* They are compatible, so you can mix and match client and server for
compatibility testing
* async_client.py and async_server.py require <address>:<port>
formatting (unlike autodetection from thread_client.py and
thread_server.py)
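The a_wait approach described above (poll, then yield with asyncio.sleep(0)) can be illustrated with a minimal sketch. This is not the pycapnp implementation: PolledResult and its poll method are hypothetical stand-ins for a promise that can only be polled, and the ticker task exists only to show that other coroutines keep running while the result is pending.

```python
import asyncio


class PolledResult:
    """Hypothetical stand-in for a promise that can only be polled."""

    def __init__(self, polls_needed, value):
        self._remaining = polls_needed
        self._value = value

    def poll(self):
        """Return True once the result is ready; never blocks."""
        if self._remaining > 0:
            self._remaining -= 1
        return self._remaining == 0

    async def a_wait(self):
        # Poll, and yield to the asyncio loop between polls so that
        # other tasks can run while the result is still pending.
        while not self.poll():
            await asyncio.sleep(0)
        return self._value


async def demo():
    ticks = 0

    async def ticker():
        # Background work that only progresses when a_wait yields.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)

    background = asyncio.ensure_future(ticker())
    value = await PolledResult(polls_needed=5, value="done").a_wait()
    background.cancel()
    return value, ticks


result, ticks = asyncio.run(demo())
```

The trade-off of this pattern is that the loop spins while waiting, which is presumably the "slow and expensive polling behavior" that the later event-loop integration commit set out to remove.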
2019-09-16 21:34:29 -07:00
import asyncio
import argparse
import time
import capnp

import thread_capnp
def parse_args():
2021-10-01 11:00:22 -07:00
    parser = argparse.ArgumentParser(
2023-06-08 02:29:24 +02:00
        usage="Connects to the Example thread server at the given address and does some RPCs"
2021-10-01 11:00:22 -07:00
    )
2019-09-26 22:18:28 -07:00
    parser.add_argument("host", help="HOST:PORT")
2019-09-16 21:34:29 -07:00
2019-09-26 22:18:28 -07:00
    return parser.parse_args()
2019-09-16 21:34:29 -07:00
class StatusSubscriber(thread_capnp.Example.StatusSubscriber.Server):
2021-10-01 11:00:22 -07:00
    """An implementation of the StatusSubscriber interface"""
2019-09-16 21:34:29 -07:00
2023-06-09 22:04:39 +02:00
    async def status(self, value, **kwargs):
2021-10-01 11:00:22 -07:00
        print("status: {}".format(time.time()))
2019-09-16 21:34:29 -07:00
async def main(host):
Integrate the KJ event loop into Python's asyncio event loop (#310)
* Integrate the KJ event loop into Python's asyncio event loop
Fix #256
This PR attempts to remove the slow and expensive polling behavior for asyncio
in favor of proper linking of the KJ event loop to the asyncio event loop.
* Don't memcopy buffer
* Improve promise cancellation and prepare for timer implementation
* Add attribution for asyncProvider.cpp
* Implement timeout
* Cleanup
* First round of simplifications
* Add more a_wait functions and a shutdown function
* Fix edge-cases with loop shutdown
* Clean up calculator examples
* Cleanup
* Cleanup
* Reformat
* Fix warnings
* Reformat again
* Compatibility with macos
* Inline the asyncio loop in some places where this is feasible
* Add todo
* Fix
* Remove synchronous wait
* Wrap fd listening callbacks in a class
* Remove poll_forever
* Remove the thread-local/thread-global optimization
This will not matter much soon anyway, and simplifies things
* Share promise code by using fused types
* Improve refcounting of python objects in promises
We replace many instances of PyObject* by Own<PyRefCounter> for more automatic
reference management.
* Code wrapPyFunc in a similar way to wrapPyFuncNoArg
* Refactor capabilityHelper, fix several memory bugs for promises and add __await__
* Improve promise ownership, reduce memory leaks
Promise wrappers now hold an Own<Promise<Own<PyRefCounter>>> object. This might
seem like excessive nesting of objects (which to some degree it is, but with
good reason):
- The outer Own is needed because Cython cannot allocate objects without a
nullary constructor on the stack (Promise doesn't have a nullary constructor).
Additionally, I believe it would be difficult or impossible to detect when a
promise is cancelled/moved if we use a bare Promise.
- Every promise returns an owned PyRefCounter. PyRefCounter makes sure that a
reference to the returned object keeps existing until the promise is fulfilled
or cancelled. Previously, this was attempted using attach, which is redundant
and makes reasoning about PyINCREF and PyDECREF very difficult.
- Because a promise holds an Own<Promise<...>>, when we perform any kind of
action on that promise (a_wait, then, ...), we have to explicitly move() the
ownership around. This will leave the original promise with a NULL-pointer,
which we can easily detect as a cancelled promise.
Promises now only hold references to their 'parents' when strictly needed. This
should reduce memory pressure.
* Simplify and test the promise joining functionality
* Attach forgotten parent
* Catch exceptions in add_reader and friends
* Further cleanup of memory leaks
* Get rid of a_wait() in examples
* Cancel all fd read operations when the python asyncio loop is closed
* Formatting
* Remove support for capnp < 7000
* Bring asyncProvider.cpp more in line with upstream async-io-unix.c++
It was originally copied from the nodejs implementation, which in turn copied
from async-io-unix.c++. But that copy is pretty old.
* Fix a bug that caused file descriptors to never be closed
* Implement AsyncIoStream based on Python transports and protocols
* Get rid of asyncProvider
All asyncio now goes through _AsyncIoStream
* Formatting
* Add __dict__ to PyAsyncIoStreamProtocol for python 3.7
* Reintroduce strange ipv4/ipv6 selection code to make ci happy
* Extra pause_reading()
* Work around more python bugs
* Be careful to only close transport when this is still possible
* Move pause_reading() workaround
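The ownership rule described above (acting on a promise moves its contents out, and the emptied wrapper is then detectable as cancelled) can be sketched in plain Python. This is only an illustration of the idea, not pycapnp code: OwnedPromise, ConsumedError and _move are hypothetical names, and a zero-argument callable stands in for the underlying KJ promise.

```python
class ConsumedError(RuntimeError):
    """Raised when a wrapper whose contents were moved out is reused."""


class OwnedPromise:
    """Hypothetical wrapper that owns an inner computation until moved."""

    def __init__(self, inner):
        self._inner = inner  # zero-argument callable standing in for a promise

    def _move(self):
        # Hand the inner object to the caller and leave None behind,
        # mirroring a move() that leaves a null pointer in the original.
        if self._inner is None:
            raise ConsumedError("promise was already consumed or cancelled")
        inner, self._inner = self._inner, None
        return inner

    def cancel(self):
        self._inner = None

    def then(self, func):
        # Chaining consumes this wrapper and returns a new owner.
        inner = self._move()
        return OwnedPromise(lambda: func(inner()))

    def result(self):
        return self._move()()


first = OwnedPromise(lambda: 2)
chained = first.then(lambda x: x + 1)
value = chained.result()

try:
    first.result()
    reused = True
except ConsumedError:
    reused = False
```

Because every operation goes through _move, there is a single place where a consumed or cancelled wrapper is detected, which is the property the commit message attributes to holding the promise behind an owning pointer.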
2023-06-06 20:08:15 +02:00
    host, port = host.split(":")
    connection = await capnp.AsyncIoStream.create_connection(host=host, port=port)
    client = capnp.TwoPartyClient(connection)
2019-09-26 22:18:28 -07:00
    cap = client.bootstrap().cast_as(thread_capnp.Example)

    # Start background task for subscriber
2023-10-03 18:04:51 +02:00
    task = asyncio.ensure_future(cap.subscribeStatus(StatusSubscriber()))
2019-09-26 22:18:28 -07:00

    # Run blocking tasks
2021-10-01 11:00:22 -07:00
    print("main: {}".format(time.time()))
2023-06-06 20:08:15 +02:00
    await cap.longRunning()
2021-10-01 11:00:22 -07:00
    print("main: {}".format(time.time()))
2023-06-06 20:08:15 +02:00
    await cap.longRunning()
2021-10-01 11:00:22 -07:00
    print("main: {}".format(time.time()))
|
Integrate the KJ event loop into Python's asyncio event loop (#310)
* Integrate the KJ event loop into Python's asyncio event loop
Fix #256
This PR attempts to remove the slow and expensive polling behavior for asyncio
in favor of proper linking of the KJ event loop to the asyncio event loop.
* Don't memcopy buffer
* Improve promise cancellation and prepare for timer implementation
* Add attribution for asyncProvider.cpp
* Implement timeout
* Cleanup
* First round of simplifications
* Add more a_wait functions and a shutdown function
* Fix edge-cases with loop shutdown
* Clean up calculator examples
* Cleanup
* Cleanup
* Reformat
* Fix warnings
* Reformat again
* Compatibility with macos
* Inline the asyncio loop in some places where this is feasible
* Add todo
* Fix
* Remove synchronous wait
* Wrap fd listening callbacks in a class
* Remove poll_forever
* Remove the thread-local/thread-global optimization
This will not matter much soon anyway, and simplifies things
* Share promise code by using fused types
* Improve refcounting of python objects in promises
We replace many instances of PyObject* by Own<PyRefCounter> for more automatic
reference management.
* Code wrapPyFunc in a similar way to wrapPyFuncNoArg
* Refactor capabilityHelper, fix several memory bugs for promises and add __await__
* Improve promise ownership, reduce memory leaks
Promise wrappers now hold a Own<Promise<Own<PyRefCounter>>> object. This might
seem like excessive nesting of objects (which to some degree it is, but with
good reason):
- The outer Own is needed because Cython cannot allocate objects without a
nullary constructor on the stack (Promise doesn't have a nullary constructor).
Additionally, I believe it would be difficult or impossible to detect when a
promise is cancelled/moved if we use a bare Promise.
- Every promise returns a Owned PyRefCounter. PyRefCounter makes sure that a
reference to the returned object keeps existing until the promise is fulfilled
or cancelled. Previously, this was attempted using attach, which is redundant
and makes reasoning about PyINCREF and PyDECREF very difficult.
- Because a promise holds a Own<Promise<...>>, when we perform any kind of
action on that promise (a_wait, then, ...), we have to explicitly move() the
ownership around. This will leave the original promise with a NULL-pointer,
which we can easily detect as a cancelled promise.
Promises now only hold references to their 'parents' when strictly needed. This
should reduce memory pressure.
* Simplify and test the promise joining functionality
* Attach forgotten parent
* Catch exceptions in add_reader and friends
* Further cleanup of memory leaks
* Get rid of a_wait() in examples
* Cancel all fd read operations when the Python asyncio loop is closed
* Formatting
* Remove support for capnp < 7000
* Bring asyncProvider.cpp more in line with upstream async-io-unix.c++
It was originally copied from the nodejs implementation, which in turn copied
from async-io-unix.c++. But that copy is pretty old.
* Fix a bug that caused file descriptors to never be closed
* Implement AsyncIoStream based on Python transports and protocols
* Get rid of asyncProvider
All asyncio now goes through _AsyncIoStream
* Formatting
* Add __dict__ to PyAsyncIoStreamProtocol for Python 3.7
* Reintroduce strange IPv4/IPv6 selection code to make CI happy
* Extra pause_reading()
* Work around more Python bugs
* Be careful to only close transport when this is still possible
* Move pause_reading() workaround
2023-06-06 20:08:15 +02:00
|
|
|
await cap.longRunning()
|
2021-10-01 11:00:22 -07:00
|
|
|
print("main: {}".format(time.time()))
|
2019-09-16 21:34:29 -07:00
|
|
|
|
2023-10-03 18:04:51 +02:00
|
|
|
task.cancel()
|
|
|
|
|
2021-10-01 11:00:22 -07:00
|
|
|
|
|
|
|
if __name__ == "__main__":
|
Integrate the KJ event loop into Python's asyncio event loop (#310)
* Integrate the KJ event loop into Python's asyncio event loop
Fix #256
This PR attempts to remove the slow and expensive polling behavior for asyncio
in favor of proper linking of the KJ event loop to the asyncio event loop.
* Don't memcopy buffer
* Improve promise cancellation and prepare for timer implementation
* Add attribution for asyncProvider.cpp
* Implement timeout
* Cleanup
* First round of simplifications
* Add more a_wait functions and a shutdown function
* Fix edge-cases with loop shutdown
* Clean up calculator examples
* Cleanup
* Cleanup
* Reformat
* Fix warnings
* Reformat again
* Compatibility with macOS
* Inline the asyncio loop in some places where this is feasible
* Add todo
* Fix
* Remove synchronous wait
* Wrap fd listening callbacks in a class
* Remove poll_forever
* Remove the thread-local/thread-global optimization
This will not matter much soon anyway, and simplifies things
* Share promise code by using fused types
* Improve refcounting of python objects in promises
We replace many instances of PyObject* by Own<PyRefCounter> for more automatic
reference management.
* Code wrapPyFunc in a similar way to wrapPyFuncNoArg
* Refactor capabilityHelper, fix several memory bugs for promises and add __await__
* Improve promise ownership, reduce memory leaks
Promise wrappers now hold an Own<Promise<Own<PyRefCounter>>> object. This might
seem like excessive nesting of objects (and to some degree it is), but there is
good reason for it:
- The outer Own is needed because Cython cannot allocate objects without a
nullary constructor on the stack (Promise doesn't have a nullary constructor).
Additionally, I believe it would be difficult or impossible to detect when a
promise is cancelled/moved if we use a bare Promise.
- Every promise returns an owned PyRefCounter. PyRefCounter makes sure that a
reference to the returned object stays alive until the promise is fulfilled
or cancelled. Previously, this was attempted using attach, which is redundant
and makes reasoning about Py_INCREF and Py_DECREF very difficult.
- Because a promise holds an Own<Promise<...>>, when we perform any kind of
action on that promise (a_wait, then, ...), we have to explicitly move() the
ownership around. This will leave the original promise with a NULL pointer,
which we can easily detect as a cancelled promise.
Promises now only hold references to their 'parents' when strictly needed. This
should reduce memory pressure.
* Simplify and test the promise joining functionality
* Attach forgotten parent
* Catch exceptions in add_reader and friends
* Further cleanup of memory leaks
* Get rid of a_wait() in examples
* Cancel all fd read operations when the Python asyncio loop is closed
* Formatting
* Remove support for capnp < 7000
* Bring asyncProvider.cpp more in line with upstream async-io-unix.c++
It was originally copied from the nodejs implementation, which in turn copied
from async-io-unix.c++. But that copy is pretty old.
* Fix a bug that caused file descriptors to never be closed
* Implement AsyncIoStream based on Python transports and protocols
* Get rid of asyncProvider
All asyncio now goes through _AsyncIoStream
* Formatting
* Add __dict__ to PyAsyncIoStreamProtocol for Python 3.7
* Reintroduce strange IPv4/IPv6 selection code to make CI happy
* Extra pause_reading()
* Work around more Python bugs
* Be careful to only close transport when this is still possible
* Move pause_reading() workaround
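The promise ownership note in the commit message above (move() leaves the original wrapper with a NULL pointer, which is then treated as cancelled) can be modelled in plain Python. This is only an illustrative sketch: `PromiseWrapper`, `MovedPromiseError`, and `_move` are hypothetical names, not part of pycapnp, and `None` stands in for the NULL pointer of the C++ `Own`.

```python
class MovedPromiseError(RuntimeError):
    """Raised when a wrapper is used after its promise was moved or cancelled."""


class PromiseWrapper:
    """Toy stand-in for a wrapper holding Own<Promise<...>> (hypothetical)."""

    def __init__(self, inner):
        # `inner` is a zero-argument callable producing the result.
        # None plays the role of the NULL pointer left behind by move().
        self._inner = inner

    def _move(self):
        # Transfer ownership out of this wrapper, leaving it empty.
        if self._inner is None:
            raise MovedPromiseError("promise was already consumed or cancelled")
        inner, self._inner = self._inner, None
        return inner

    def then(self, func):
        # Chaining consumes this wrapper and returns a new owner.
        inner = self._move()
        return PromiseWrapper(lambda: func(inner()))

    def cancel(self):
        # Dropping the inner promise is indistinguishable from a move.
        self._inner = None

    def result(self):
        # Resolving also consumes the wrapper.
        return self._move()()


first = PromiseWrapper(lambda: 20)
second = first.then(lambda value: value + 1)
value = second.result()

try:
    first.then(lambda v: v)
    reused = True
except MovedPromiseError:
    reused = False
```

In this model, any action on a wrapper empties it, so reuse after `then`, `result`, or `cancel` is detected with a single check in `_move`.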
2023-06-06 20:08:15 +02:00
|
|
|
args = parse_args()
|
2023-10-03 18:04:51 +02:00
|
|
|
asyncio.run(capnp.run(main(args.host)))
|
2023-06-06 20:08:15 +02:00
|
|
|
|
|
|
|
# Test that we can run multiple asyncio loops in sequence. This is particularly tricky, because
|
|
|
|
# main contains a background task that we never cancel. The entire loop gets cleaned up anyway,
|
|
|
|
# and we can start a new loop.
|
2023-10-03 18:04:51 +02:00
|
|
|
asyncio.run(capnp.run(main(args.host)))
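The behaviour described in the comment above (a background task that is never cancelled, yet a second loop can still be started) can be reproduced with the standard library alone. The sketch below is an assumption-laden illustration: it uses plain `asyncio.run` without `capnp.run`, and `background`, `main`, and `run_twice` are hypothetical stand-ins for the example's RPC code.

```python
import asyncio


async def background(log):
    # Stands in for a long-running RPC that nobody cancels explicitly.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise


async def main(log):
    # Start the background task and deliberately leave it running.
    task = asyncio.create_task(background(log))
    await asyncio.sleep(0)  # yield once so the task actually starts
    log.append("main done")
    return task


def run_twice():
    log = []
    # asyncio.run() cancels leftover tasks and closes its loop on exit,
    # so a second, fresh loop can be started right afterwards.
    first = asyncio.run(main(log))
    second = asyncio.run(main(log))
    return log, first, second
```

Each call to `asyncio.run` creates a new loop, cancels tasks that are still pending when `main` returns, and then closes the loop, which is why the two runs do not interfere with each other.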
|