nft-update-addresses: select SLAAC prefix based on lifetimes

main
Felix Stupp 1 year ago
parent d3b8d4cc14
commit 2f19d11aff
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -363,6 +363,7 @@ def monitor_ip(
class InterfaceUpdateHandler(UpdateStackHandler[IpAddressUpdate]):
# TODO regularly check (i.e. 1 hour) if stored lists are still correct
slaac_prefix: IPv6Interface | None
def __init__(
self,
@ -373,6 +374,7 @@ class InterfaceUpdateHandler(UpdateStackHandler[IpAddressUpdate]):
self.lock = RLock()
self.config = config
self.addrs = dict[IPInterface, IpAddressUpdate]()
self.slaac_prefix = None
def _update_stack(self, data: Sequence[IpAddressUpdate]) -> None:
nft_updates = tuple(
@ -416,14 +418,15 @@ class InterfaceUpdateHandler(UpdateStackHandler[IpAddressUpdate]):
if changed:
yield from self.__update_network_sets(data.ip, data.deleted)
# even if "not changed", still check SLAAC rules because of lifetimes
# ignore unique link locals for SLAAC sets
if data.ip.version != 6 or data.ip in IPv6_ULA_NET:
return
# TODO track on IPv6 which are valid & so auto select for which prefix to apply rules for
slaac_prefix = self.__select_slaac_prefix()
if self.slaac_prefix == slaac_prefix:
return # no SLAAC updates required
self.slaac_prefix = slaac_prefix
logger.info(f"{self.config.ifname}: change main SLAAC prefix to {slaac_prefix}")
yield from (
self.__empty_slaac_sets()
if not data.deleted
else self.__update_slaac_sets(data.ip)
if slaac_prefix is None
else self.__update_slaac_sets(slaac_prefix)
)
def __update_network_sets(
@ -499,6 +502,26 @@ class InterfaceUpdateHandler(UpdateStackHandler[IpAddressUpdate]):
values=tuple(),
)
def __select_slaac_prefix(self) -> IPv6Interface | None:
now = datetime.now()
valid = tuple(data for data in self.addrs.values() if data.ip.version == 6)
if len(valid) <= 0:
return None
selected = max(
valid,
key=lambda data: (
# prefer valid
1 if now < data.valid_until else 0,
# prefer global unicast addresses
1 if data.ip not in IPv6_ULA_NET else 0,
# if preferred, take longest preferred
max(now, data.preferred_until),
# otherwise longest valid
data.valid_until,
),
)
return cast(IPv6Interface, selected.ip)
def gen_set_definitions(self) -> str:
output = []
for ip_v in [4, 6]:

Loading…
Cancel
Save