wgengine/{magicsock,userspace}: move portupdates to the eventbus

Updates #15160

Signed-off-by: Claus Lensbøl <claus@tailscale.com>
cmol/portupdate_eventbus_direct
Claus Lensbøl 2 months ago
parent 206d98e84b
commit f54d2f3f0e
No known key found for this signature in database
GPG Key ID: 060429CBEC62B1B4

@ -180,6 +180,7 @@ type Conn struct {
// config changes between magicsock and wireguard. // config changes between magicsock and wireguard.
syncPub *eventbus.Publisher[syncPoint] syncPub *eventbus.Publisher[syncPoint]
allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq] allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq]
portUpdatePub *eventbus.Publisher[PortUpdate]
// pconn4 and pconn6 are the underlying UDP sockets used to // pconn4 and pconn6 are the underlying UDP sockets used to
// send/receive packets for wireguard and other magicsock // send/receive packets for wireguard and other magicsock
@ -394,10 +395,6 @@ type Conn struct {
// wgPinger is the WireGuard only pinger used for latency measurements. // wgPinger is the WireGuard only pinger used for latency measurements.
wgPinger lazy.SyncValue[*ping.Pinger] wgPinger lazy.SyncValue[*ping.Pinger]
// onPortUpdate is called with the new port when magicsock rebinds to
// a new port.
onPortUpdate func(port uint16, network string)
// getPeerByKey optionally specifies a function to look up a peer's // getPeerByKey optionally specifies a function to look up a peer's
// wireguard state by its public key. If nil, it's not used. // wireguard state by its public key. If nil, it's not used.
getPeerByKey func(key.NodePublic) (_ wgint.Peer, ok bool) getPeerByKey func(key.NodePublic) (_ wgint.Peer, ok bool)
@ -492,10 +489,6 @@ type Options struct {
// If nil, they're ignored and not updated. // If nil, they're ignored and not updated.
ControlKnobs *controlknobs.Knobs ControlKnobs *controlknobs.Knobs
// OnPortUpdate is called with the new port when magicsock rebinds to
// a new port.
OnPortUpdate func(port uint16, network string)
// PeerByKeyFunc optionally specifies a function to look up a peer's // PeerByKeyFunc optionally specifies a function to look up a peer's
// WireGuard state by its public key. If nil, it's not used. // WireGuard state by its public key. If nil, it's not used.
// In regular use, this will be wgengine.(*userspaceEngine).PeerByKey. // In regular use, this will be wgengine.(*userspaceEngine).PeerByKey.
@ -735,6 +728,7 @@ func NewConn(opts Options) (*Conn, error) {
cli := c.eventBus.Client("magicsock.Conn") cli := c.eventBus.Client("magicsock.Conn")
c.syncPub = eventbus.Publish[syncPoint](cli) c.syncPub = eventbus.Publish[syncPoint](cli)
c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli)
c.portUpdatePub = eventbus.Publish[PortUpdate](cli)
c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli))
c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background())
@ -759,7 +753,6 @@ func NewConn(opts Options) (*Conn, error) {
c.netMon = opts.NetMon c.netMon = opts.NetMon
c.health = opts.HealthTracker c.health = opts.HealthTracker
c.onPortUpdate = opts.OnPortUpdate
c.getPeerByKey = opts.PeerByKeyFunc c.getPeerByKey = opts.PeerByKeyFunc
if err := c.rebind(keepCurrentPort); err != nil { if err := c.rebind(keepCurrentPort); err != nil {
@ -3471,6 +3464,12 @@ func (c *Conn) listenPacket(network string, port uint16) (nettype.PacketConn, er
return nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, c.netMon)).ListenPacket(ctx, network, addr) return nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, c.netMon)).ListenPacket(ctx, network, addr)
} }
// PortUpdate is an eventbus value, reporting the port and address family of a magicsock connection.
type PortUpdate struct {
UDPPort uint16
EndpointNetwork string // either "udp4" or "udp6".
}
// bindSocket binds a UDP socket to ruc. // bindSocket binds a UDP socket to ruc.
// Network indicates the UDP socket type; it must be "udp4" or "udp6". // Network indicates the UDP socket type; it must be "udp4" or "udp6".
// If ruc had an existing UDP socket bound, it closes that socket. // If ruc had an existing UDP socket bound, it closes that socket.
@ -3531,7 +3530,7 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur
c.logf("magicsock: unable to bind %v port %d: %v", network, port, err) c.logf("magicsock: unable to bind %v port %d: %v", network, port, err)
continue continue
} }
if c.onPortUpdate != nil { if c.portUpdatePub.ShouldPublish() {
_, gotPortStr, err := net.SplitHostPort(pconn.LocalAddr().String()) _, gotPortStr, err := net.SplitHostPort(pconn.LocalAddr().String())
if err != nil { if err != nil {
c.logf("could not parse port from %s: %w", pconn.LocalAddr().String(), err) c.logf("could not parse port from %s: %w", pconn.LocalAddr().String(), err)
@ -3540,7 +3539,10 @@ func (c *Conn) bindSocket(ruc *RebindingUDPConn, network string, curPortFate cur
if err != nil { if err != nil {
c.logf("could not parse port from %s: %w", gotPort, err) c.logf("could not parse port from %s: %w", gotPort, err)
} else { } else {
c.onPortUpdate(uint16(gotPort), network) c.portUpdatePub.Publish(PortUpdate{
UDPPort: uint16(gotPort),
EndpointNetwork: network,
})
} }
} }
} }

@ -14,6 +14,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time" "time"
@ -65,10 +66,7 @@ type linuxRouter struct {
addrs map[netip.Prefix]bool addrs map[netip.Prefix]bool
routes map[netip.Prefix]bool routes map[netip.Prefix]bool
localRoutes map[netip.Prefix]bool localRoutes map[netip.Prefix]bool
snatSubnetRoutes bool
statefulFiltering bool statefulFiltering bool
netfilterMode preftype.NetfilterMode
netfilterKind string
// ruleRestorePending is whether a timer has been started to // ruleRestorePending is whether a timer has been started to
// restore deleted ip rules. // restore deleted ip rules.
@ -86,6 +84,10 @@ type linuxRouter struct {
cmd commandRunner cmd commandRunner
nfr linuxfw.NetfilterRunner nfr linuxfw.NetfilterRunner
mu sync.Mutex
snatSubnetRoutes bool
netfilterMode preftype.NetfilterMode
netfilterKind string
magicsockPortV4 uint16 magicsockPortV4 uint16
magicsockPortV6 uint16 magicsockPortV6 uint16
} }
@ -540,6 +542,8 @@ func (r *linuxRouter) updateStatefulFilteringWithDockerWarning(cfg *router.Confi
// UpdateMagicsockPort implements the Router interface. // UpdateMagicsockPort implements the Router interface.
func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error { func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error {
r.mu.Lock()
defer r.mu.Unlock()
if r.nfr == nil { if r.nfr == nil {
if err := r.setupNetfilter(r.netfilterKind); err != nil { if err := r.setupNetfilter(r.netfilterKind); err != nil {
return fmt.Errorf("could not setup netfilter: %w", err) return fmt.Errorf("could not setup netfilter: %w", err)
@ -595,6 +599,8 @@ func (r *linuxRouter) UpdateMagicsockPort(port uint16, network string) error {
// reflect the new mode, and r.snatSubnetRoutes is updated to reflect // reflect the new mode, and r.snatSubnetRoutes is updated to reflect
// the current state of subnet SNATing. // the current state of subnet SNATing.
func (r *linuxRouter) setNetfilterMode(mode preftype.NetfilterMode) error { func (r *linuxRouter) setNetfilterMode(mode preftype.NetfilterMode) error {
r.mu.Lock()
defer r.mu.Unlock()
if !platformCanNetfilter() { if !platformCanNetfilter() {
mode = netfilterOff mode = netfilterOff
} }

@ -397,13 +397,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
e.RequestStatus() e.RequestStatus()
} }
onPortUpdate := func(port uint16, network string) {
e.logf("onPortUpdate(port=%v, network=%s)", port, network)
if err := e.router.UpdateMagicsockPort(port, network); err != nil {
e.logf("UpdateMagicsockPort(port=%v, network=%s) failed: %v", port, network, err)
}
}
magicsockOpts := magicsock.Options{ magicsockOpts := magicsock.Options{
EventBus: e.eventBus, EventBus: e.eventBus,
Logf: logf, Logf: logf,
@ -416,7 +409,6 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
HealthTracker: e.health, HealthTracker: e.health,
Metrics: conf.Metrics, Metrics: conf.Metrics,
ControlKnobs: conf.ControlKnobs, ControlKnobs: conf.ControlKnobs,
OnPortUpdate: onPortUpdate,
PeerByKeyFunc: e.PeerByKey, PeerByKeyFunc: e.PeerByKey,
} }
@ -557,6 +549,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error)
// [eventbus.Client] is closed. // [eventbus.Client] is closed.
func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) {
changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli) changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli)
portUpdateSub := eventbus.Subscribe[magicsock.PortUpdate](cli)
return func(cli *eventbus.Client) { return func(cli *eventbus.Client) {
for { for {
select { select {
@ -567,6 +560,11 @@ func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*even
f() f()
} }
e.linkChange(&changeDelta) e.linkChange(&changeDelta)
case pu := <-portUpdateSub.Events():
e.logf("portUpdate(port=%v, network=%s)", pu.UDPPort, pu.EndpointNetwork)
if err := e.router.UpdateMagicsockPort(pu.UDPPort, pu.EndpointNetwork); err != nil {
e.logf("UpdateMagicsockPort(port=%v, network=%s) failed: %v", pu.UDPPort, pu.EndpointNetwork, err)
}
} }
} }
} }

Loading…
Cancel
Save