1
0
Fork 0

Compare commits

...

8 Commits

@ -1,11 +1,7 @@
{
"python.testing.unittestArgs": [
"-v",
"-s",
"./test",
"-p",
"*_test.py"
"python.testing.pytestArgs": [
"test"
],
"python.testing.pytestEnabled": false,
"python.testing.unittestEnabled": true
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}

@ -0,0 +1,9 @@
from .interface import (
TransmissionSuccess,
TransportMethod,
)
from .internal import (
InternalExchanger,
InternalTransport,
)

@ -14,10 +14,6 @@ from typing import (
TypeAlias,
)
from ...our_types import (
JSON,
)
class TransmissionSuccess(Enum):
UNKNOWN_ERROR = auto()
@ -25,13 +21,13 @@ class TransmissionSuccess(Enum):
SUBMITTED_SUCCESSFULLY = auto()
MessageHandler: TypeAlias = Callable[[JSON], Awaitable[None]]
MessageHandler: TypeAlias = Callable[[bytes], Awaitable[None]]
class TransportMethod(ABC):
@abstractmethod
async def send(self, data: JSON) -> TransmissionSuccess: ...
async def send(self, data: bytes) -> TransmissionSuccess: ...
@abstractmethod
def on_receive(self, handler: MessageHandler) -> None:

@ -8,7 +8,6 @@ from .interface import (
TransportMethod,
TransmissionSuccess,
)
from ...our_types import JSON
class InternalTransport(TransportMethod):
@ -20,7 +19,7 @@ class InternalTransport(TransportMethod):
__recv_handler: MessageHandler | None = None
__send_handler: MessageHandler | None = None
async def send(self, data: JSON) -> TransmissionSuccess:
async def send(self, data: bytes) -> TransmissionSuccess:
if self.__send_handler is None:
return TransmissionSuccess.EXCHANGE_UNAVAILABLE
await self.__send_handler(data)
@ -29,7 +28,7 @@ class InternalTransport(TransportMethod):
def on_receive(self, handler: MessageHandler) -> None:
self.__recv_handler = handler
async def _submit(self, data: JSON) -> None:
async def _submit(self, data: bytes) -> None:
if self.__recv_handler is None:
return
await self.__recv_handler(data)
@ -42,7 +41,7 @@ class InternalExchanger:
__transports = set[InternalTransport]()
async def __send_handler(self, sender: InternalTransport, data: JSON) -> None:
async def __send_handler(self, sender: InternalTransport, data: bytes) -> None:
async with asyncio.TaskGroup() as group:
for receiver in self.__transports:
if receiver is sender:

@ -1,10 +1,16 @@
from __future__ import annotations
from collections.abc import (
Mapping,
Sequence,
)
from typing import (
Literal,
Type,
)
from attrs import (
define,
)
from ...our_types import (
@ -14,11 +20,13 @@ from ...our_types import (
JSON,
ServiceType,
ServiceClass,
serializable,
)
@serializable
@define(
frozen=True,
kw_only=True,
)
class GeneralTransportDatagram:
_version: int = 1
timestamp_ns: int
@ -27,7 +35,10 @@ class GeneralTransportDatagram:
data: JSON | GeneralTransportMetadata | None
@serializable
@define(
frozen=True,
kw_only=True,
)
class GeneralTransportMetadata:
pass
@ -35,13 +46,19 @@ class GeneralTransportMetadata:
# connect
@serializable
@define(
frozen=True,
kw_only=True,
)
class ConnectDatagram(GeneralTransportDatagram):
message_type: Literal["connect"] = "connect"
data: ConnectMetadata
@serializable
@define(
frozen=True,
kw_only=True,
)
class ConnectMetadata(GeneralTransportMetadata):
epoch: EpochId
type: ServiceType
@ -51,13 +68,19 @@ class ConnectMetadata(GeneralTransportMetadata):
# disconnect
@serializable
@define(
frozen=True,
kw_only=True,
)
class DisconnectDatagram(GeneralTransportDatagram):
message_type: Literal["disconnect"] = "disconnect"
data: DisconnectMetadata
@serializable
@define(
frozen=True,
kw_only=True,
)
class DisconnectMetadata(GeneralTransportMetadata):
epoch: EpochId
exceptional: bool
@ -67,14 +90,20 @@ class DisconnectMetadata(GeneralTransportMetadata):
# duplicate & replace sender types
@serializable
@define(
frozen=True,
kw_only=True,
)
class DuplicateSenderDatagram(GeneralTransportDatagram):
message_type: Literal["duplicate_sender"] = "duplicate_sender"
data: ConnectMetadata
@serializable
class ReplaceSenderDatagram(GeneralTransportMetadata):
@define(
frozen=True,
kw_only=True,
)
class ReplaceSenderDatagram(GeneralTransportDatagram):
message_type: Literal["replace_sender"] = "replace_sender"
data: ConnectMetadata
@ -82,13 +111,19 @@ class ReplaceSenderDatagram(GeneralTransportMetadata):
# failure
@serializable
@define(
frozen=True,
kw_only=True,
)
class FailureDatagram(GeneralTransportDatagram):
message_type: Literal["failure"] = "failure"
data: FailureMetadata
@serializable
@define(
frozen=True,
kw_only=True,
)
class FailureMetadata(GeneralTransportMetadata):
"""
Issues that a specific service has failed.
@ -105,7 +140,10 @@ class FailureMetadata(GeneralTransportMetadata):
# restart
@serializable
@define(
frozen=True,
kw_only=True,
)
class RestartDatagram(GeneralTransportDatagram):
"""issues a restart of a whole Enjo network"""
@ -116,7 +154,21 @@ class RestartDatagram(GeneralTransportDatagram):
# transmit
@serializable
@define(
frozen=True,
kw_only=True,
)
class TransmitDatagram(GeneralTransportDatagram):
message_type: Literal["transmit"] = "transmit"
data: JSON
MESSAGE_TYPE_MAP: Mapping[str, Type[GeneralTransportDatagram]] = {
"connect": ConnectDatagram,
"disconnect": DisconnectDatagram,
"duplicate_sender": DuplicateSenderDatagram,
"replace_sender": ReplaceSenderDatagram,
"failure": FailureDatagram,
"restart": RestartDatagram,
"transmit": TransmitDatagram,
}

@ -4,12 +4,6 @@ from collections.abc import (
Mapping,
Sequence,
)
from decimal import (
Decimal,
)
from functools import (
partial,
)
from typing import (
TypeAlias,
NewType,
@ -19,28 +13,17 @@ from uuid import (
UUID,
)
from attrs import (
define,
)
JSON: TypeAlias = Union[
Mapping[str, "JSON"],
Sequence["JSON"],
str,
int,
Decimal,
float,
bool,
]
serializable = partial(
define,
frozen=True,
kw_only=True,
)
ReverseDomainArg = NewType("ReverseDomainArg", str)
"""e.g. `"de.6nw.enjo.controller"`"""

@ -38,12 +38,12 @@ class InternalTransportTwoPeers(IsolatedAsyncioTestCase):
mock_bob = AsyncCallableMock()
peer_bob.on_receive(mock_bob)
# send from Alice to Bob
await peer_alice.send("test Alice -> Bob")
await peer_alice.send(b"test Alice -> Bob")
mock_alice.assert_not_called()
mock_bob.assert_called_once_with("test Alice -> Bob")
mock_bob.assert_called_once_with(b"test Alice -> Bob")
mock_alice.reset_mock()
mock_bob.reset_mock()
# send from Bob to Alice
await peer_bob.send("test Bob -> Alice")
mock_alice.assert_called_once_with("test Bob -> Alice")
await peer_bob.send(b"test Bob -> Alice")
mock_alice.assert_called_once_with(b"test Bob -> Alice")
mock_bob.assert_not_called()
Loading…
Cancel
Save