implement TransportMethod ABC and internal method with simple test
parent
e59f11f379
commit
5e1eca6ff5
@ -0,0 +1,56 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
|
from .method import (
|
||||||
|
MessageHandler,
|
||||||
|
TransportMethod,
|
||||||
|
TransmissionSuccess,
|
||||||
|
)
|
||||||
|
from ..our_types import JSON
|
||||||
|
|
||||||
|
|
||||||
|
class InternalTransport(TransportMethod):
|
||||||
|
"""transport method for multiple enjo services running together in the same Python thread.
|
||||||
|
built for asyncio, not for communicating across multiple threads.
|
||||||
|
|
||||||
|
create one for each service"""
|
||||||
|
|
||||||
|
__recv_handler: MessageHandler | None = None
|
||||||
|
__send_handler: MessageHandler | None = None
|
||||||
|
|
||||||
|
async def send(self, data: JSON) -> TransmissionSuccess:
|
||||||
|
if self.__send_handler is None:
|
||||||
|
return TransmissionSuccess.EXCHANGE_UNAVAILABLE
|
||||||
|
await self.__send_handler(data)
|
||||||
|
return TransmissionSuccess.SUBMITTED_SUCCESSFULLY
|
||||||
|
|
||||||
|
def on_receive(self, handler: MessageHandler) -> None:
|
||||||
|
self.__recv_handler = handler
|
||||||
|
|
||||||
|
async def _submit(self, data: JSON) -> None:
|
||||||
|
if self.__recv_handler is None:
|
||||||
|
return
|
||||||
|
await self.__recv_handler(data)
|
||||||
|
|
||||||
|
def _on_send(self, handler: MessageHandler) -> None:
|
||||||
|
self.__send_handler = handler
|
||||||
|
|
||||||
|
|
||||||
|
class InternalExchanger:
|
||||||
|
|
||||||
|
__transports = set[InternalTransport]()
|
||||||
|
|
||||||
|
async def __send_handler(self, sender: InternalTransport, data: JSON) -> None:
|
||||||
|
async with asyncio.TaskGroup() as group:
|
||||||
|
for receiver in self.__transports:
|
||||||
|
if receiver is sender:
|
||||||
|
continue
|
||||||
|
group.create_task(receiver._submit(data))
|
||||||
|
|
||||||
|
def register_transport(self, transport: InternalTransport) -> None:
|
||||||
|
if transport in self.__transports:
|
||||||
|
return
|
||||||
|
self.__transports.add(transport)
|
||||||
|
transport._on_send(partial(self.__send_handler, transport))
|
||||||
@ -0,0 +1,39 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import (
|
||||||
|
ABC,
|
||||||
|
abstractmethod,
|
||||||
|
)
|
||||||
|
from enum import (
|
||||||
|
Enum,
|
||||||
|
auto,
|
||||||
|
)
|
||||||
|
from typing import (
|
||||||
|
Awaitable,
|
||||||
|
Callable,
|
||||||
|
TypeAlias,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ..our_types import (
|
||||||
|
JSON,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TransmissionSuccess(Enum):
|
||||||
|
UNKNOWN_ERROR = auto()
|
||||||
|
EXCHANGE_UNAVAILABLE = auto()
|
||||||
|
SUBMITTED_SUCCESSFULLY = auto()
|
||||||
|
|
||||||
|
|
||||||
|
MessageHandler: TypeAlias = Callable[[JSON], Awaitable[None]]
|
||||||
|
|
||||||
|
|
||||||
|
class TransportMethod(ABC):
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def send(self, data: JSON) -> TransmissionSuccess: ...
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def on_receive(self, handler: MessageHandler) -> None:
|
||||||
|
"""sets the handler, there can only be one set"""
|
||||||
|
...
|
||||||
@ -0,0 +1,23 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from collections.abc import (
|
||||||
|
Mapping,
|
||||||
|
Sequence,
|
||||||
|
)
|
||||||
|
from decimal import (
|
||||||
|
Decimal,
|
||||||
|
)
|
||||||
|
from typing import (
|
||||||
|
TypeAlias,
|
||||||
|
Union,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
JSON: TypeAlias = Union[
|
||||||
|
Mapping[str, "JSON"],
|
||||||
|
Sequence["JSON"],
|
||||||
|
str,
|
||||||
|
int,
|
||||||
|
Decimal,
|
||||||
|
bool,
|
||||||
|
]
|
||||||
@ -0,0 +1,49 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
)
|
||||||
|
from unittest import (
|
||||||
|
IsolatedAsyncioTestCase,
|
||||||
|
)
|
||||||
|
from unittest.mock import (
|
||||||
|
MagicMock,
|
||||||
|
)
|
||||||
|
|
||||||
|
from enjo_lib.comm.internal import (
|
||||||
|
InternalExchanger,
|
||||||
|
InternalTransport,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# adapted from https://stackoverflow.com/a/32498408
|
||||||
|
class AsyncCallableMock(MagicMock):
|
||||||
|
async def __call__(self, *args: Any, **kwargs: Any) -> Any:
|
||||||
|
return super().__call__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class InternalTransportTwoPeers(IsolatedAsyncioTestCase):
|
||||||
|
|
||||||
|
async def test_simple_two_peers(self) -> None:
|
||||||
|
# TODO move much stuff to setup methods
|
||||||
|
exchange = InternalExchanger()
|
||||||
|
# setup peers
|
||||||
|
peer_alice = InternalTransport()
|
||||||
|
exchange.register_transport(peer_alice)
|
||||||
|
peer_bob = InternalTransport()
|
||||||
|
exchange.register_transport(peer_bob)
|
||||||
|
# setup mock receiving services
|
||||||
|
mock_alice = AsyncCallableMock()
|
||||||
|
peer_alice.on_receive(mock_alice)
|
||||||
|
mock_bob = AsyncCallableMock()
|
||||||
|
peer_bob.on_receive(mock_bob)
|
||||||
|
# send from Alice to Bob
|
||||||
|
await peer_alice.send("test Alice -> Bob")
|
||||||
|
mock_alice.assert_not_called()
|
||||||
|
mock_bob.assert_called_once_with("test Alice -> Bob")
|
||||||
|
mock_alice.reset_mock()
|
||||||
|
mock_bob.reset_mock()
|
||||||
|
# send from Bob to Alice
|
||||||
|
await peer_bob.send("test Bob -> Alice")
|
||||||
|
mock_alice.assert_called_once_with("test Bob -> Alice")
|
||||||
|
mock_bob.assert_not_called()
|
||||||
Loading…
Reference in New Issue