mirror of
https://github.com/capnproto/pycapnp.git
synced 2025-03-04 00:14:45 +01:00
32 lines
771 B
Python
32 lines
771 B
Python
from types import coroutine
|
|
import pytest
|
|
import socket
|
|
import gc
|
|
|
|
import capnp
|
|
import test_capability
|
|
import test_capability_capnp as capability
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
async def kj_loop():
|
|
async with capnp.kj_loop():
|
|
yield
|
|
|
|
|
|
@coroutine
|
|
def wrap(p):
|
|
return (yield from p)
|
|
|
|
|
|
async def test_kj_loop_await_attach():
|
|
read, write = socket.socketpair()
|
|
read = await capnp.AsyncIoStream.create_connection(sock=read)
|
|
write = await capnp.AsyncIoStream.create_connection(sock=write)
|
|
_ = capnp.TwoPartyServer(write, bootstrap=test_capability.Server())
|
|
client = capnp.TwoPartyClient(read).bootstrap().cast_as(capability.TestInterface)
|
|
t = wrap(client.foo(5, True).__await__())
|
|
del client
|
|
del read
|
|
gc.collect()
|
|
await t
|