nft-update-addresses: implement general set/map implementation

deprecating specific port exposed/forwarded support
main
Felix Stupp 1 month ago
parent b62b67257f
commit 6e74da5a70
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -32,6 +32,7 @@ import os
from pathlib import Path from pathlib import Path
import re import re
import shlex import shlex
from string import Template
import subprocess import subprocess
import threading import threading
from threading import ( from threading import (
@ -445,6 +446,17 @@ class InterfaceUpdateHandler(UpdateStackHandler[IpAddressUpdate]):
for wan, lan in portMap.items() for wan, lan in portMap.items()
), ),
) )
slaacs_sub = {
f"ipv6_{self.config.ifname}_{mac}": addr.ip.compressed
for mac, addr in slaacs.items()
}
for one_set in self.config.sets:
yield NftUpdate(
obj_type=one_set.set_type,
obj_name=one_set.name,
operation=op,
values=tuple(one_set.sub_elements(slaacs_sub)),
)
def gen_set_definitions(self) -> str: def gen_set_definitions(self) -> str:
output = [] output = []
@ -479,6 +491,7 @@ class InterfaceUpdateHandler(UpdateStackHandler[IpAddressUpdate]):
f"inet_service : {addr_type} . inet_service", f"inet_service : {addr_type} . inet_service",
) )
) )
output.extend(s.definition for s in self.config.sets)
return "\n".join(output) return "\n".join(output)
@ -595,8 +608,7 @@ class NftUpdateHandler(UpdateStackHandler[NftUpdate]):
class SystemdHandler(UpdateHandler[object]): class SystemdHandler(UpdateHandler[object]):
def update(self, data: object) -> None: def update(self, data: object) -> None:
# TODO improve status updates # TODO improve status updates
# daemon.notify("READY=1\nSTATUS=Updated successfully.\n") daemon.notify("READY=1\nSTATUS=operating …\n")
daemon.notify("READY=1\n")
def update_stack(self, data: Sequence[object]) -> None: def update_stack(self, data: Sequence[object]) -> None:
self.update(None) self.update(None)
@ -606,39 +618,78 @@ class SystemdHandler(UpdateHandler[object]):
frozen=True, frozen=True,
kw_only=True, kw_only=True,
) )
class ProtocolConfig: class SetConfig:
protocol: NftProtocol ifname: str
exposed: Mapping[MACAddress, Sequence[Port]] set_type: str
"only when direct public IPs are available" name: str
forwarded: Mapping[MACAddress, Mapping[Port, Port]] # wan -> lan data_type: str
"i.e. DNAT" flags: str | None
elements: Sequence[Template] = field()
@elements.validator
def __elem_validate(self, attribute: str, value: Sequence[Template]) -> None:
regex = self.__supported_vars
for temp in self.elements:
for var in temp.get_identifiers():
m = regex.search(var)
if m == None:
raise ValueError(
f"set {self.name!r} for if {self.ifname!r} uses invalid template variable {var!r}"
)
@staticmethod @property
def from_json(protocol: str, obj: JsonObj) -> ProtocolConfig: def __supported_vars(self) -> re.Pattern[str]:
assert set(obj.keys()) <= set(("exposed", "forwarded")) return re.compile(rf"^ipv6_{re.escape(self.ifname)}_(?P<mac>[0-9a-f]{{12}})$")
exposed_raw = obj.get("exposed")
exposed = defaultdict[MACAddress, list[Port]](list) @property
if exposed_raw is not None: def embedded_macs(self) -> Iterable[MACAddress]:
assert isinstance(exposed_raw, Sequence) regex = self.__supported_vars
for fwd in exposed_raw: for temp in self.elements:
assert isinstance(fwd, Mapping) for var in temp.get_identifiers():
dest = to_mac(fwd["dest"]) # type: ignore[arg-type] m = regex.search(var)
port = to_port(fwd["port"]) # type: ignore[arg-type] assert m != None
exposed[dest].append(port) yield to_mac(m.group("mac")) # type: ignore[union-attr]
forwarded_raw = obj.get("forwarded")
forwarded = defaultdict[MACAddress, dict[Port, Port]](dict) @property
if forwarded_raw is not None: def definition(self) -> str:
assert isinstance(forwarded_raw, Sequence) return gen_set_def(
for smap in forwarded_raw: set_type=self.set_type,
assert isinstance(smap, Mapping) name=self.name,
dest = to_mac(smap["dest"]) # type: ignore[arg-type] data_type=self.data_type,
wanPort = to_port(smap["wanPort"]) # type: ignore[arg-type] flags=self.flags,
lanPort = to_port(smap["lanPort"]) # type: ignore[arg-type] # non matching rules at the beginning (in static part)
forwarded[dest][wanPort] = lanPort # to verify that all supplied patterns are correct
return ProtocolConfig( # undefined address should be safest to use here, because:
protocol=NftProtocol(protocol), # - as src, it is valid, but if one can spoof this one, it can spoof other addresses (and routers should have simple anti-spoof mechanisms in place)
exposed=exposed, # - as dest, it is invalid
forwarded=forwarded, # - as NAT target, it is invalid
elements=self.sub_elements(defaultdict(lambda: "::")),
)
def sub_elements(self, substitutions: Mapping[str, str]) -> Sequence[str]:
return tuple(elem.substitute(substitutions) for elem in self.elements)
@classmethod
def from_json(cls, *, ifname: str, name: str, obj: JsonObj) -> SetConfig:
assert set(obj.keys()) <= set(("set_type", "name", "type", "flags", "elements"))
set_type = obj["set_type"]
assert isinstance(set_type, str)
data_type = obj["type"]
assert isinstance(data_type, str)
flags = obj.get("flags")
assert flags == None or isinstance(flags, str)
elements = obj["elements"]
assert isinstance(elements, Sequence) and all(
isinstance(elem, str) for elem in elements
)
templates = tuple(map(lambda s: Template(cast(str, s)), elements))
return SetConfig(
set_type=set_type,
ifname=ifname,
name=name,
data_type=data_type,
flags=cast(None | str, flags),
elements=templates,
) )
@ -650,6 +701,7 @@ class InterfaceConfig:
ifname: IfName ifname: IfName
macs_direct: Sequence[MACAddress] macs_direct: Sequence[MACAddress]
protocols: Sequence[ProtocolConfig] protocols: Sequence[ProtocolConfig]
sets: Sequence[SetConfig]
@cached_property @cached_property
def macs(self) -> Sequence[MACAddress]: def macs(self) -> Sequence[MACAddress]:
@ -659,17 +711,20 @@ class InterfaceConfig:
self.macs_direct, self.macs_direct,
(mac for proto in self.protocols for mac in proto.exposed.keys()), (mac for proto in self.protocols for mac in proto.exposed.keys()),
(mac for proto in self.protocols for mac in proto.forwarded.keys()), (mac for proto in self.protocols for mac in proto.forwarded.keys()),
(mac for one_set in self.sets for mac in one_set.embedded_macs),
) )
) )
) )
@staticmethod @staticmethod
def from_json(ifname: str, obj: JsonObj) -> InterfaceConfig: def from_json(ifname: str, obj: JsonObj) -> InterfaceConfig:
assert set(obj.keys()) <= set(("macs", "ports")) assert set(obj.keys()) <= set(("macs", "ports", "sets"))
macs = obj.get("macs") macs = obj.get("macs")
assert macs == None or isinstance(macs, Sequence) assert macs == None or isinstance(macs, Sequence)
ports = obj.get("ports") ports = obj.get("ports")
assert ports == None or isinstance(ports, Mapping) assert ports == None or isinstance(ports, Mapping)
sets = obj.get("sets")
assert sets == None or isinstance(sets, Mapping)
return InterfaceConfig( return InterfaceConfig(
ifname=IfName(ifname), ifname=IfName(ifname),
macs_direct=tuple() macs_direct=tuple()
@ -681,6 +736,9 @@ class InterfaceConfig:
ProtocolConfig.from_json(proto, cast(JsonObj, proto_cfg)) ProtocolConfig.from_json(proto, cast(JsonObj, proto_cfg))
for proto, proto_cfg in ports.items() # type: ignore[union-attr] for proto, proto_cfg in ports.items() # type: ignore[union-attr]
), ),
sets=tuple()
if sets == None
else tuple(SetConfig.from_json(ifname=ifname, name=name, obj=cast(JsonObj, one_set)) for name, one_set in sets.items()), # type: ignore[union-attr]
) )

Loading…
Cancel
Save