nft-update-addresses: add support for parsing lifetimes on IP updates

main
Felix Stupp 1 month ago
parent ca3093ddb1
commit cba1292bd9
Signed by: zocker
GPG Key ID: 93E1BD26F6B02FB7

@ -43,6 +43,7 @@ import traceback
from typing import ( from typing import (
Any, Any,
Iterable, Iterable,
Literal,
NewType, NewType,
NoReturn, NoReturn,
Protocol, Protocol,
@ -237,7 +238,18 @@ IP_MON_PATTERN = re.compile(
(?P<flags>(?:(\S+)\s)*) # (single spaces required for parser below to work correctly) (?P<flags>(?:(\S+)\s)*) # (single spaces required for parser below to work correctly)
(?:\S+)? # random interface name repetition on inet (?:\S+)? # random interface name repetition on inet
[\\]\s+ [\\]\s+
.* # lifetimes which are not interesting yet valid_lft\s+(
(?P<valid_lft_sec>\d+)sec
|
(?P<valid_lft_forever>forever)
)
\s+
preferred_lft\s+(
(?P<preferred_lft_sec>\d+)sec
|
(?P<preferred_lft_forever>forever)
)
\s*
$""" $"""
) )
@ -253,9 +265,11 @@ class IpAddressUpdate:
ip: IPv4Interface | IPv6Interface ip: IPv4Interface | IPv6Interface
scope: str scope: str
flags: IpFlag flags: IpFlag
valid_lft: int | Literal["forever"]
preferred_lft: int | Literal["forever"]
@staticmethod @classmethod
def parse_line(line: str) -> IpAddressUpdate: def parse_line(cls, line: str) -> IpAddressUpdate:
m = IP_MON_PATTERN.search(line) m = IP_MON_PATTERN.search(line)
if not m: if not m:
raise Exception(f"Could not parse ip monitor output: {line!r}") raise Exception(f"Could not parse ip monitor output: {line!r}")
@ -277,7 +291,22 @@ class IpAddressUpdate:
ip=ip, ip=ip,
scope=grp["scope"], scope=grp["scope"],
flags=flags, flags=flags,
valid_lft=cls.parse_lifetime(grp, "valid"),
preferred_lft=cls.parse_lifetime(grp, "preferred"),
)
@staticmethod
def parse_lifetime(
grp: Mapping[str, str | None], name: str
) -> int | Literal["forever"]:
if grp[f"{name}_lft_forever"] != None:
return "forever"
sec = grp[f"{name}_lft_sec"]
if sec == None:
raise ValueError(
"IP address update parse error: expected regex group for seconds != None (bug in code)"
) )
return int(sec) # type: ignore[arg-type]
def kickoff_ip( def kickoff_ip(

Loading…
Cancel
Save