From 5e1eca6ff560477afb4436e8ece0646c86f147c4 Mon Sep 17 00:00:00 2001 From: Felix Stupp Date: Thu, 22 Feb 2024 00:58:48 +0100 Subject: [PATCH] implement TransportMethod ABC and internal method with simple test --- enjo_lib/__init__.py | 0 enjo_lib/comm/__init__.py | 0 enjo_lib/comm/internal.py | 56 ++++++++++++++++++++++++++++++++++++++ enjo_lib/comm/method.py | 39 ++++++++++++++++++++++++++ enjo_lib/our_types.py | 23 ++++++++++++++++ test/__init__.py | 0 test/comm/__init__.py | 0 test/comm/internal_test.py | 49 +++++++++++++++++++++++++++++++++ 8 files changed, 167 insertions(+) create mode 100644 enjo_lib/__init__.py create mode 100644 enjo_lib/comm/__init__.py create mode 100644 enjo_lib/comm/internal.py create mode 100644 enjo_lib/comm/method.py create mode 100644 enjo_lib/our_types.py create mode 100644 test/__init__.py create mode 100644 test/comm/__init__.py create mode 100644 test/comm/internal_test.py diff --git a/enjo_lib/__init__.py b/enjo_lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/enjo_lib/comm/__init__.py b/enjo_lib/comm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/enjo_lib/comm/internal.py b/enjo_lib/comm/internal.py new file mode 100644 index 0000000..c782d3e --- /dev/null +++ b/enjo_lib/comm/internal.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import asyncio +from functools import partial + +from .method import ( + MessageHandler, + TransportMethod, + TransmissionSuccess, +) +from ..our_types import JSON + + +class InternalTransport(TransportMethod): + """transport method for multiple enjo services running together in the same Python thread. + built for asyncio, not for communicating across multiple threads. + + create one for each service""" + + __recv_handler: MessageHandler | None = None + __send_handler: MessageHandler | None = None + + async def send(self, data: JSON) -> TransmissionSuccess: + if self.__send_handler is None: + return TransmissionSuccess.EXCHANGE_UNAVAILABLE + await self.__send_handler(data) + return TransmissionSuccess.SUBMITTED_SUCCESSFULLY + + def on_receive(self, handler: MessageHandler) -> None: + self.__recv_handler = handler + + async def _submit(self, data: JSON) -> None: + if self.__recv_handler is None: + return + await self.__recv_handler(data) + + def _on_send(self, handler: MessageHandler) -> None: + self.__send_handler = handler + + +class InternalExchanger: + + __transports = set[InternalTransport]() + + async def __send_handler(self, sender: InternalTransport, data: JSON) -> None: + async with asyncio.TaskGroup() as group: + for receiver in self.__transports: + if receiver is sender: + continue + group.create_task(receiver._submit(data)) + + def register_transport(self, transport: InternalTransport) -> None: + if transport in self.__transports: + return + self.__transports.add(transport) + transport._on_send(partial(self.__send_handler, transport)) diff --git a/enjo_lib/comm/method.py b/enjo_lib/comm/method.py new file mode 100644 index 0000000..6f99cee --- /dev/null +++ b/enjo_lib/comm/method.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from abc import ( + ABC, + abstractmethod, +) +from enum import ( + Enum, + auto, +) +from typing import ( + Awaitable, + Callable, + TypeAlias, +) + +from ..our_types import ( + JSON, +) + + +class TransmissionSuccess(Enum): + UNKNOWN_ERROR = auto() + EXCHANGE_UNAVAILABLE = auto() + SUBMITTED_SUCCESSFULLY = auto() + + +MessageHandler: TypeAlias = Callable[[JSON], Awaitable[None]] + + +class TransportMethod(ABC): + + @abstractmethod + async def send(self, data: JSON) -> TransmissionSuccess: ... + + @abstractmethod + def on_receive(self, handler: MessageHandler) -> None: + """sets the handler, there can only be one set""" + ... diff --git a/enjo_lib/our_types.py b/enjo_lib/our_types.py new file mode 100644 index 0000000..1c7863c --- /dev/null +++ b/enjo_lib/our_types.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from collections.abc import ( + Mapping, + Sequence, +) +from decimal import ( + Decimal, +) +from typing import ( + TypeAlias, + Union, +) + + +JSON: TypeAlias = Union[ + Mapping[str, "JSON"], + Sequence["JSON"], + str, + int, + Decimal, + bool, +] diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/comm/__init__.py b/test/comm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/comm/internal_test.py b/test/comm/internal_test.py new file mode 100644 index 0000000..1650cc5 --- /dev/null +++ b/test/comm/internal_test.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import ( + Any, +) +from unittest import ( + IsolatedAsyncioTestCase, +) +from unittest.mock import ( + MagicMock, +) + +from enjo_lib.comm.internal import ( + InternalExchanger, + InternalTransport, +) + + +# adapted from https://stackoverflow.com/a/32498408 +class AsyncCallableMock(MagicMock): + async def __call__(self, *args: Any, **kwargs: Any) -> Any: + return super().__call__(*args, **kwargs) + + +class InternalTransportTwoPeers(IsolatedAsyncioTestCase): + + async def test_simple_two_peers(self) -> None: + # TODO move much stuff to setup methods + exchange = InternalExchanger() + # setup peers + peer_alice = InternalTransport() + exchange.register_transport(peer_alice) + peer_bob = InternalTransport() + exchange.register_transport(peer_bob) + # setup mock receiving services + mock_alice = AsyncCallableMock() + peer_alice.on_receive(mock_alice) + mock_bob = AsyncCallableMock() + peer_bob.on_receive(mock_bob) + # send from Alice to Bob + await peer_alice.send("test Alice -> Bob") + mock_alice.assert_not_called() + mock_bob.assert_called_once_with("test Alice -> Bob") + mock_alice.reset_mock() + mock_bob.reset_mock() + # send from Bob to Alice + await peer_bob.send("test Bob -> Alice") + mock_alice.assert_called_once_with("test Bob -> Alice") + mock_bob.assert_not_called()