From f54d2f3f0e32f2f727316a9f4f1f082f2a007ebd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20Lensb=C3=B8l?= Date: Fri, 3 Oct 2025 09:51:31 -0400 Subject: [PATCH] wgengine/{magicsock,userspace}: move portupdates to the eventbus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Updates #15160 Signed-off-by: Claus Lensbøl --- wgengine/magicsock/magicsock.go | 24 +++++++++++++----------- wgengine/router/osrouter/router_linux.go | 16 +++++++++++----- wgengine/userspace.go | 14 ++++++-------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index e3cf249c5..bf5c4240a 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -180,6 +180,7 @@ type Conn struct { // config changes between magicsock and wireguard. syncPub *eventbus.Publisher[syncPoint] allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq] + portUpdatePub *eventbus.Publisher[PortUpdate] // pconn4 and pconn6 are the underlying UDP sockets used to // send/receive packets for wireguard and other magicsock @@ -394,10 +395,6 @@ type Conn struct { // wgPinger is the WireGuard only pinger used for latency measurements. wgPinger lazy.SyncValue[*ping.Pinger] - // onPortUpdate is called with the new port when magicsock rebinds to - // a new port. - onPortUpdate func(port uint16, network string) - // getPeerByKey optionally specifies a function to look up a peer's // wireguard state by its public key. If nil, it's not used. getPeerByKey func(key.NodePublic) (_ wgint.Peer, ok bool) @@ -492,10 +489,6 @@ type Options struct { // If nil, they're ignored and not updated. ControlKnobs *controlknobs.Knobs - // OnPortUpdate is called with the new port when magicsock rebinds to - // a new port. - OnPortUpdate func(port uint16, network string) - // PeerByKeyFunc optionally specifies a function to look up a peer's // WireGuard state by its public key. If nil, it's not used. // In regular use, this will be wgengine.(*userspaceEngine).PeerByKey. @@ -735,6 +728,7 @@ func NewConn(opts Options) (*Conn, error) { cli := c.eventBus.Client("magicsock.Conn") c.syncPub = eventbus.Publish[syncPoint](cli) c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) + c.portUpdatePub = eventbus.Publish[PortUpdate](cli) c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) @@ -759,7 +753,6 @@ func NewConn(opts Options) (*Conn, error) { c.netMon = opts.NetMon c.health = opts.HealthTracker - c.onPortUpdate = opts.OnPortUpdate c.getPeerByKey = opts.PeerByKeyFunc if err := c.rebind(keepCurrentPort); err != nil { @@ -3471,6 +3464,12 @@ func (c *Conn) listenPacket(network string, port uint16) (nettype.PacketConn, er return nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, c.netMon)).ListenPacket(ctx, network, addr) } +// PortUpdate is an eventbus value, reporting the port and address family of a magicsock connection. +type PortUpdate struct { + UDPPort uint16 + EndpointNetwork string // either "udp4" or "udp6". +} + // bindSocket binds a UDP socket to ruc. // Network indicates the UDP socket type; it must be "udp4" or "udp6". // If ruc had an existing UDP socket bound, it closes that socket. @@ -3531,7 +3530,7 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur c.logf("magicsock: unable to bind %v port %d: %v", network, port, err) continue } - if c.onPortUpdate != nil { + if c.portUpdatePub.ShouldPublish() { _, gotPortStr, err := net.SplitHostPort(pconn.LocalAddr().String()) if err != nil { c.logf("could not parse port from %s: %w", pconn.LocalAddr().String(), err) @@ -3540,7 +3539,10 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur if err != nil { c.logf("could not parse port from %s: %w", gotPort, err) } else { - c.onPortUpdate(uint16(gotPort), network) + c.portUpdatePub.Publish(PortUpdate{ + UDPPort: uint16(gotPort), + EndpointNetwork: network, + }) } } } diff --git a/wgengine/router/osrouter/router_linux.go b/wgengine/router/osrouter/router_linux.go index 1f825b917..47a1d8246 100644 --- a/wgengine/router/osrouter/router_linux.go +++ b/wgengine/router/osrouter/router_linux.go @@ -14,6 +14,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -65,10 +66,7 @@ type linuxRouter struct { addrs map[netip.Prefix]bool routes map[netip.Prefix]bool localRoutes map[netip.Prefix]bool - snatSubnetRoutes bool statefulFiltering bool - netfilterMode preftype.NetfilterMode - netfilterKind string // ruleRestorePending is whether a timer has been started to // restore deleted ip rules. @@ -86,8 +84,12 @@ type linuxRouter struct { cmd commandRunner nfr linuxfw.NetfilterRunner - magicsockPortV4 uint16 - magicsockPortV6 uint16 + mu sync.Mutex + snatSubnetRoutes bool + netfilterMode preftype.NetfilterMode + netfilterKind string + magicsockPortV4 uint16 + magicsockPortV6 uint16 } func newUserspaceRouter(logf logger.Logf, tunDev tun.Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus) (router.Router, error) { @@ -540,6 +542,8 @@ func (r *linuxRouter) updateStatefulFilteringWithDockerWarning(cfg *router.Confi // UpdateMagicsockPort implements the Router interface. func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error { + r.mu.Lock() + defer r.mu.Unlock() if r.nfr == nil { if err := r.setupNetfilter(r.netfilterKind); err != nil { return fmt.Errorf("could not setup netfilter: %w", err) @@ -595,6 +599,8 @@ func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error { // reflect the new mode, and r.snatSubnetRoutes is updated to reflect // the current state of subnet SNATing. func (r *linuxRouter) setNetfilterMode(mode preftype.NetfilterMode) error { + r.mu.Lock() + defer r.mu.Unlock() if !platformCanNetfilter() { mode = netfilterOff } diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 735181ec7..8730abe1d 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -397,13 +397,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) e.RequestStatus() } - onPortUpdate := func(port uint16, network string) { - e.logf("onPortUpdate(port=%v, network=%s)", port, network) - - if err := e.router.UpdateMagicsockPort(port, network); err != nil { - e.logf("UpdateMagicsockPort(port=%v, network=%s) failed: %v", port, network, err) - } - } magicsockOpts := magicsock.Options{ EventBus: e.eventBus, Logf: logf, @@ -416,7 +409,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) HealthTracker: e.health, Metrics: conf.Metrics, ControlKnobs: conf.ControlKnobs, - OnPortUpdate: onPortUpdate, PeerByKeyFunc: e.PeerByKey, } @@ -557,6 +549,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) // [eventbus.Client] is closed. func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli) + portUpdateSub := eventbus.Subscribe[magicsock.PortUpdate](cli) return func(cli *eventbus.Client) { for { select { @@ -567,6 +560,11 @@ func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*even f() } e.linkChange(&changeDelta) + case pu := <-portUpdateSub.Events(): + e.logf("portUpdate(port=%v, network=%s)", pu.UDPPort, pu.EndpointNetwork) + if err := e.router.UpdateMagicsockPort(pu.UDPPort, pu.EndpointNetwork); err != nil { + e.logf("UpdateMagicsockPort(port=%v, network=%s) failed: %v", pu.UDPPort, pu.EndpointNetwork, err) + } } } }