From 84777354a0d036c65fc7419f0395c2c0fc4e24d0 Mon Sep 17 00:00:00 2001 From: David Anderson Date: Wed, 26 Jul 2023 13:59:42 -0700 Subject: [PATCH] wgengine/magicsock: factor out more separable parts Updates #8720 Signed-off-by: David Anderson --- wgengine/magicsock/batching_conn.go | 194 ++++ wgengine/magicsock/blockforever_conn.go | 53 + wgengine/magicsock/debugknobs.go | 5 + wgengine/magicsock/debugknobs_stubs.go | 2 + wgengine/magicsock/derp.go | 927 ++++++++++++++++ wgengine/magicsock/magicsock.go | 1286 +---------------------- wgengine/magicsock/rebinding_conn.go | 168 +++ 7 files changed, 1350 insertions(+), 1285 deletions(-) create mode 100644 wgengine/magicsock/batching_conn.go create mode 100644 wgengine/magicsock/blockforever_conn.go create mode 100644 wgengine/magicsock/derp.go create mode 100644 wgengine/magicsock/rebinding_conn.go diff --git a/wgengine/magicsock/batching_conn.go b/wgengine/magicsock/batching_conn.go new file mode 100644 index 000000000..69fcf7d09 --- /dev/null +++ b/wgengine/magicsock/batching_conn.go @@ -0,0 +1,194 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package magicsock + +import ( + "errors" + "net" + "net/netip" + "sync" + "sync/atomic" + "time" + + "golang.org/x/net/ipv6" + "tailscale.com/net/neterror" + "tailscale.com/types/nettype" +) + +// xnetBatchReaderWriter defines the batching i/o methods of +// golang.org/x/net/ipv4.PacketConn (and ipv6.PacketConn). +// TODO(jwhited): This should eventually be replaced with the standard library +// implementation of https://github.com/golang/go/issues/45886 +type xnetBatchReaderWriter interface { + xnetBatchReader + xnetBatchWriter +} + +type xnetBatchReader interface { + ReadBatch([]ipv6.Message, int) (int, error) +} + +type xnetBatchWriter interface { + WriteBatch([]ipv6.Message, int) (int, error) +} + +// batchingUDPConn is a UDP socket that provides batched i/o. +type batchingUDPConn struct { + pc nettype.PacketConn + xpc xnetBatchReaderWriter + rxOffload bool // supports UDP GRO or similar + txOffload atomic.Bool // supports UDP GSO or similar + setGSOSizeInControl func(control *[]byte, gsoSize uint16) // typically setGSOSizeInControl(); swappable for testing + getGSOSizeFromControl func(control []byte) (int, error) // typically getGSOSizeFromControl(); swappable for testing + sendBatchPool sync.Pool +} + +func (c *batchingUDPConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) { + if c.rxOffload { + // UDP_GRO is opt-in on Linux via setsockopt(). Once enabled you may + // receive a "monster datagram" from any read call. The ReadFrom() API + // does not support passing the GSO size and is unsafe to use in such a + // case. Other platforms may vary in behavior, but we go with the most + // conservative approach to prevent this from becoming a footgun in the + // future. + return 0, netip.AddrPort{}, errors.New("rx UDP offload is enabled on this socket, single packet reads are unavailable") + } + return c.pc.ReadFromUDPAddrPort(p) +} + +func (c *batchingUDPConn) SetDeadline(t time.Time) error { + return c.pc.SetDeadline(t) +} + +func (c *batchingUDPConn) SetReadDeadline(t time.Time) error { + return c.pc.SetReadDeadline(t) +} + +func (c *batchingUDPConn) SetWriteDeadline(t time.Time) error { + return c.pc.SetWriteDeadline(t) +} + +const ( + // This was initially established for Linux, but may split out to + // GOOS-specific values later. It originates as UDP_MAX_SEGMENTS in the + // kernel's TX path, and UDP_GRO_CNT_MAX for RX. 
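+	// coalesceMessages below stops packing datagrams into a single
+	// message once this count is reached, so one GSO send never asks
+	// the kernel for more segments than it accepts.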
+ udpSegmentMaxDatagrams = 64 +) + +const ( + // Exceeding these values results in EMSGSIZE. + maxIPv4PayloadLen = 1<<16 - 1 - 20 - 8 + maxIPv6PayloadLen = 1<<16 - 1 - 8 +) + +// coalesceMessages iterates msgs, coalescing them where possible while +// maintaining datagram order. All msgs have their Addr field set to addr. +func (c *batchingUDPConn) coalesceMessages(addr *net.UDPAddr, buffs [][]byte, msgs []ipv6.Message) int { + var ( + base = -1 // index of msg we are currently coalescing into + gsoSize int // segmentation size of msgs[base] + dgramCnt int // number of dgrams coalesced into msgs[base] + endBatch bool // tracking flag to start a new batch on next iteration of buffs + ) + maxPayloadLen := maxIPv4PayloadLen + if addr.IP.To4() == nil { + maxPayloadLen = maxIPv6PayloadLen + } + for i, buff := range buffs { + if i > 0 { + msgLen := len(buff) + baseLenBefore := len(msgs[base].Buffers[0]) + freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore + if msgLen+baseLenBefore <= maxPayloadLen && + msgLen <= gsoSize && + msgLen <= freeBaseCap && + dgramCnt < udpSegmentMaxDatagrams && + !endBatch { + msgs[base].Buffers[0] = append(msgs[base].Buffers[0], make([]byte, msgLen)...) + copy(msgs[base].Buffers[0][baseLenBefore:], buff) + if i == len(buffs)-1 { + c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize)) + } + dgramCnt++ + if msgLen < gsoSize { + // A smaller than gsoSize packet on the tail is legal, but + // it must end the batch. + endBatch = true + } + continue + } + } + if dgramCnt > 1 { + c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize)) + } + // Reset prior to incrementing base since we are preparing to start a + // new potential batch. + endBatch = false + base++ + gsoSize = len(buff) + msgs[base].OOB = msgs[base].OOB[:0] + msgs[base].Buffers[0] = buff + msgs[base].Addr = addr + dgramCnt = 1 + } + return base + 1 +} + +type sendBatch struct { + msgs []ipv6.Message + ua *net.UDPAddr +} + +func (c *batchingUDPConn) getSendBatch() *sendBatch { + batch := c.sendBatchPool.Get().(*sendBatch) + return batch +} + +func (c *batchingUDPConn) putSendBatch(batch *sendBatch) { + for i := range batch.msgs { + batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers, OOB: batch.msgs[i].OOB} + } + c.sendBatchPool.Put(batch) +} + +func (c *batchingUDPConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort) error { + batch := c.getSendBatch() + defer c.putSendBatch(batch) + if addr.Addr().Is6() { + as16 := addr.Addr().As16() + copy(batch.ua.IP, as16[:]) + batch.ua.IP = batch.ua.IP[:16] + } else { + as4 := addr.Addr().As4() + copy(batch.ua.IP, as4[:]) + batch.ua.IP = batch.ua.IP[:4] + } + batch.ua.Port = int(addr.Port()) + var ( + n int + retried bool + ) +retry: + if c.txOffload.Load() { + n = c.coalesceMessages(batch.ua, buffs, batch.msgs) + } else { + for i := range buffs { + batch.msgs[i].Buffers[0] = buffs[i] + batch.msgs[i].Addr = batch.ua + batch.msgs[i].OOB = batch.msgs[i].OOB[:0] + } + n = len(buffs) + } + + err := c.writeBatch(batch.msgs[:n]) + if err != nil && c.txOffload.Load() && neterror.ShouldDisableUDPGSO(err) { + c.txOffload.Store(false) + retried = true + goto retry + } + if retried { + return neterror.ErrUDPGSODisabled{OnLaddr: c.pc.LocalAddr().String(), RetryErr: err} + } + return err +} diff --git a/wgengine/magicsock/blockforever_conn.go b/wgengine/magicsock/blockforever_conn.go new file mode 100644 index 000000000..42b94bbd4 --- /dev/null +++ b/wgengine/magicsock/blockforever_conn.go @@ -0,0 +1,53 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// 
SPDX-License-Identifier: BSD-3-Clause + +package magicsock + +import ( + "errors" + "net" + "net/netip" + "sync" + "time" +) + +// blockForeverConn is a net.PacketConn whose reads block until it is closed. +type blockForeverConn struct { + mu sync.Mutex + cond *sync.Cond + closed bool +} + +func (c *blockForeverConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) { + c.mu.Lock() + for !c.closed { + c.cond.Wait() + } + c.mu.Unlock() + return 0, netip.AddrPort{}, net.ErrClosed +} + +func (c *blockForeverConn) WriteToUDPAddrPort(p []byte, addr netip.AddrPort) (int, error) { + // Silently drop writes. + return len(p), nil +} + +func (c *blockForeverConn) LocalAddr() net.Addr { + // Return a *net.UDPAddr because lots of code assumes that it will. + return new(net.UDPAddr) +} + +func (c *blockForeverConn) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return net.ErrClosed + } + c.closed = true + c.cond.Broadcast() + return nil +} + +func (c *blockForeverConn) SetDeadline(t time.Time) error { return errors.New("unimplemented") } +func (c *blockForeverConn) SetReadDeadline(t time.Time) error { return errors.New("unimplemented") } +func (c *blockForeverConn) SetWriteDeadline(t time.Time) error { return errors.New("unimplemented") } diff --git a/wgengine/magicsock/debugknobs.go b/wgengine/magicsock/debugknobs.go index 6f42dfb22..d23f2ccbe 100644 --- a/wgengine/magicsock/debugknobs.go +++ b/wgengine/magicsock/debugknobs.go @@ -42,6 +42,11 @@ var ( // debugSendCallMeUnknownPeer sends a CallMeMaybe to a non-existent destination every // time we send a real CallMeMaybe to test the PeerGoneNotHere logic. debugSendCallMeUnknownPeer = envknob.RegisterBool("TS_DEBUG_SEND_CALLME_UNKNOWN_PEER") + // debugBindSocket prints extra debugging about socket rebinding in magicsock. + debugBindSocket = envknob.RegisterBool("TS_DEBUG_MAGICSOCK_BIND_SOCKET") + // debugRingBufferMaxSizeBytes overrides the default size of the endpoint + // history ringbuffer. + debugRingBufferMaxSizeBytes = envknob.RegisterInt("TS_DEBUG_MAGICSOCK_RING_BUFFER_MAX_SIZE_BYTES") // Hey you! Adding a new debugknob? Make sure to stub it out in the debugknob_stubs.go // file too. ) diff --git a/wgengine/magicsock/debugknobs_stubs.go b/wgengine/magicsock/debugknobs_stubs.go index 60fa01100..ed966cf7b 100644 --- a/wgengine/magicsock/debugknobs_stubs.go +++ b/wgengine/magicsock/debugknobs_stubs.go @@ -11,6 +11,7 @@ import "tailscale.com/types/opt" // // They're inlinable and the linker can deadcode that's guarded by them to make // smaller binaries. 
+func debugBindSocket() bool { return false } func debugDisco() bool { return false } func debugOmitLocalAddresses() bool { return false } func logDerpVerbose() bool { return false } @@ -22,4 +23,5 @@ func debugSendCallMeUnknownPeer() bool { return false } func debugUseDERPAddr() string { return "" } func debugUseDerpRouteEnv() string { return "" } func debugUseDerpRoute() opt.Bool { return "" } +func debugRingBufferMaxSizeBytes() int { return 0 } func inTest() bool { return false } diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go new file mode 100644 index 000000000..28d19daad --- /dev/null +++ b/wgengine/magicsock/derp.go @@ -0,0 +1,927 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package magicsock + +import ( + "bufio" + "context" + "fmt" + "hash/fnv" + "math/rand" + "net" + "net/netip" + "reflect" + "runtime" + "sort" + "sync" + "time" + + "github.com/tailscale/wireguard-go/conn" + "tailscale.com/control/controlclient" + "tailscale.com/derp" + "tailscale.com/derp/derphttp" + "tailscale.com/health" + "tailscale.com/logtail/backoff" + "tailscale.com/net/dnscache" + "tailscale.com/net/tsaddr" + "tailscale.com/syncs" + "tailscale.com/tailcfg" + "tailscale.com/types/key" + "tailscale.com/types/logger" + "tailscale.com/util/mak" + "tailscale.com/util/sysresources" +) + +// useDerpRoute reports whether magicsock should enable the DERP +// return path optimization (Issue 150). +func useDerpRoute() bool { + if b, ok := debugUseDerpRoute().Get(); ok { + return b + } + ob := controlclient.DERPRouteFlag() + if v, ok := ob.Get(); ok { + return v + } + return true // as of 1.21.x +} + +// derpRoute is a route entry for a public key, saying that a certain +// peer should be available at DERP node derpID, as long as the +// current connection for that derpID is dc. (but dc should not be +// used to write directly; it's owned by the read/write loops) +type derpRoute struct { + derpID int + dc *derphttp.Client // don't use directly; see comment above +} + +// removeDerpPeerRoute removes a DERP route entry previously added by addDerpPeerRoute. +func (c *Conn) removeDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) { + c.mu.Lock() + defer c.mu.Unlock() + r2 := derpRoute{derpID, dc} + if r, ok := c.derpRoute[peer]; ok && r == r2 { + delete(c.derpRoute, peer) + } +} + +// addDerpPeerRoute adds a DERP route entry, noting that peer was seen +// on DERP node derpID, at least on the connection identified by dc. +// See issue 150 for details. +func (c *Conn) addDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) { + c.mu.Lock() + defer c.mu.Unlock() + mak.Set(&c.derpRoute, peer, derpRoute{derpID, dc}) +} + +// activeDerp contains fields for an active DERP connection. +type activeDerp struct { + c *derphttp.Client + cancel context.CancelFunc + writeCh chan<- derpWriteRequest + // lastWrite is the time of the last request for its write + // channel (currently even if there was no write). + // It is always non-nil and initialized to a non-zero Time. + lastWrite *time.Time + createTime time.Time +} + +var processStartUnixNano = time.Now().UnixNano() + +// pickDERPFallback returns a non-zero but deterministic DERP node to +// connect to. This is only used if netcheck couldn't find the +// nearest one (for instance, if UDP is blocked and thus STUN latency +// checks aren't working). +// +// c.mu must NOT be held. 
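+//
+// The "deterministic" part comes from seeding math/rand with a hash of
+// the Conn pointer and the process start time, so repeated calls within
+// one process (absent better information) keep picking the same region.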
+func (c *Conn) pickDERPFallback() int { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.wantDerpLocked() { + return 0 + } + ids := c.derpMap.RegionIDs() + if len(ids) == 0 { + // No DERP regions in non-nil map. + return 0 + } + + // TODO: figure out which DERP region most of our peers are using, + // and use that region as our fallback. + // + // If we already had selected something in the past and it has any + // peers, we want to stay on it. If there are no peers at all, + // stay on whatever DERP we previously picked. If we need to pick + // one and have no peer info, pick a region randomly. + // + // We used to do the above for legacy clients, but never updated + // it for disco. + + if c.myDerp != 0 { + return c.myDerp + } + + h := fnv.New64() + fmt.Fprintf(h, "%p/%d", c, processStartUnixNano) // arbitrary + return ids[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(ids))] +} + +func (c *Conn) derpRegionCodeLocked(regionID int) string { + if c.derpMap == nil { + return "" + } + if dr, ok := c.derpMap.Regions[regionID]; ok { + return dr.RegionCode + } + return "" +} + +// c.mu must NOT be held. +func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) { + c.mu.Lock() + defer c.mu.Unlock() + if !c.wantDerpLocked() { + c.myDerp = 0 + health.SetMagicSockDERPHome(0) + return false + } + if derpNum == c.myDerp { + // No change. + return true + } + if c.myDerp != 0 && derpNum != 0 { + metricDERPHomeChange.Add(1) + } + c.myDerp = derpNum + health.SetMagicSockDERPHome(derpNum) + + if c.privateKey.IsZero() { + // No private key yet, so DERP connections won't come up anyway. + // Return early rather than ultimately log a couple lines of noise. + return true + } + + // On change, notify all currently connected DERP servers and + // start connecting to our home DERP if we are not already. + dr := c.derpMap.Regions[derpNum] + if dr == nil { + c.logf("[unexpected] magicsock: derpMap.Regions[%v] is nil", derpNum) + } else { + c.logf("magicsock: home is now derp-%v (%v)", derpNum, c.derpMap.Regions[derpNum].RegionCode) + } + for i, ad := range c.activeDerp { + go ad.c.NotePreferred(i == c.myDerp) + } + c.goDerpConnect(derpNum) + return true +} + +// startDerpHomeConnectLocked starts connecting to our DERP home, if any. +// +// c.mu must be held. +func (c *Conn) startDerpHomeConnectLocked() { + c.goDerpConnect(c.myDerp) +} + +// goDerpConnect starts a goroutine to start connecting to the given +// DERP node. +// +// c.mu may be held, but does not need to be. +func (c *Conn) goDerpConnect(node int) { + if node == 0 { + return + } + go c.derpWriteChanOfAddr(netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(node)), key.NodePublic{}) +} + +var ( + bufferedDerpWrites int + bufferedDerpWritesOnce sync.Once +) + +// bufferedDerpWritesBeforeDrop returns how many packets writes can be queued +// up the DERP client to write on the wire before we start dropping. +func bufferedDerpWritesBeforeDrop() int { + // For mobile devices, always return the previous minimum value of 32; + // we can do this outside the sync.Once to avoid that overhead. + if runtime.GOOS == "ios" || runtime.GOOS == "android" { + return 32 + } + + bufferedDerpWritesOnce.Do(func() { + // Some rough sizing: for the previous fixed value of 32, the + // total consumed memory can be: + // = numDerpRegions * messages/region * sizeof(message) + // + // For sake of this calculation, assume 100 DERP regions; at + // time of writing (2023-04-03), we have 24. 
+ // + // A reasonable upper bound for the worst-case average size of + // a message is a *disco.CallMeMaybe message with 16 endpoints; + // since sizeof(netip.AddrPort) = 32, that's 512 bytes. Thus: + // = 100 * 32 * 512 + // = 1638400 (1.6MiB) + // + // On a reasonably-small node with 4GiB of memory that's + // connected to each region and handling a lot of load, 1.6MiB + // is about 0.04% of the total system memory. + // + // For sake of this calculation, then, let's double that memory + // usage to 0.08% and scale based on total system memory. + // + // For a 16GiB Linux box, this should buffer just over 256 + // messages. + systemMemory := sysresources.TotalMemory() + memoryUsable := float64(systemMemory) * 0.0008 + + const ( + theoreticalDERPRegions = 100 + messageMaximumSizeBytes = 512 + ) + bufferedDerpWrites = int(memoryUsable / (theoreticalDERPRegions * messageMaximumSizeBytes)) + + // Never drop below the previous minimum value. + if bufferedDerpWrites < 32 { + bufferedDerpWrites = 32 + } + }) + return bufferedDerpWrites +} + +// derpWriteChanOfAddr returns a DERP client for fake UDP addresses that +// represent DERP servers, creating them as necessary. For real UDP +// addresses, it returns nil. +// +// If peer is non-zero, it can be used to find an active reverse +// path, without using addr. +func (c *Conn) derpWriteChanOfAddr(addr netip.AddrPort, peer key.NodePublic) chan<- derpWriteRequest { + if addr.Addr() != tailcfg.DerpMagicIPAddr { + return nil + } + regionID := int(addr.Port()) + + if c.networkDown() { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + if !c.wantDerpLocked() || c.closed { + return nil + } + if c.derpMap == nil || c.derpMap.Regions[regionID] == nil { + return nil + } + if c.privateKey.IsZero() { + c.logf("magicsock: DERP lookup of %v with no private key; ignoring", addr) + return nil + } + + // See if we have a connection open to that DERP node ID + // first. If so, might as well use it. (It's a little + // arbitrary whether we use this one vs. the reverse route + // below when we have both.) + ad, ok := c.activeDerp[regionID] + if ok { + *ad.lastWrite = time.Now() + c.setPeerLastDerpLocked(peer, regionID, regionID) + return ad.writeCh + } + + // If we don't have an open connection to the peer's home DERP + // node, see if we have an open connection to a DERP node + // where we'd heard from that peer already. For instance, + // perhaps peer's home is Frankfurt, but they dialed our home DERP + // node in SF to reach us, so we can reply to them using our + // SF connection rather than dialing Frankfurt. (Issue 150) + if !peer.IsZero() && useDerpRoute() { + if r, ok := c.derpRoute[peer]; ok { + if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc { + c.setPeerLastDerpLocked(peer, r.derpID, regionID) + *ad.lastWrite = time.Now() + return ad.writeCh + } + } + } + + why := "home-keep-alive" + if !peer.IsZero() { + why = peer.ShortString() + } + c.logf("magicsock: adding connection to derp-%v for %v", regionID, why) + + firstDerp := false + if c.activeDerp == nil { + firstDerp = true + c.activeDerp = make(map[int]activeDerp) + c.prevDerp = make(map[int]*syncs.WaitGroupChan) + } + + // Note that derphttp.NewRegionClient does not dial the server + // (it doesn't block) so it is safe to do under the c.mu lock. + dc := derphttp.NewRegionClient(c.privateKey, c.logf, c.netMon, func() *tailcfg.DERPRegion { + // Warning: it is not legal to acquire + // magicsock.Conn.mu from this callback. 
+ // It's run from derphttp.Client.connect (via Send, etc) + // and the lock ordering rules are that magicsock.Conn.mu + // must be acquired before derphttp.Client.mu. + // See https://github.com/tailscale/tailscale/issues/3726 + if c.connCtx.Err() != nil { + // We're closing anyway; return nil to stop dialing. + return nil + } + derpMap := c.derpMapAtomic.Load() + if derpMap == nil { + return nil + } + return derpMap.Regions[regionID] + }) + + dc.SetCanAckPings(true) + dc.NotePreferred(c.myDerp == regionID) + dc.SetAddressFamilySelector(derpAddrFamSelector{c}) + dc.DNSCache = dnscache.Get() + + ctx, cancel := context.WithCancel(c.connCtx) + ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop()) + + ad.c = dc + ad.writeCh = ch + ad.cancel = cancel + ad.lastWrite = new(time.Time) + *ad.lastWrite = time.Now() + ad.createTime = time.Now() + c.activeDerp[regionID] = ad + metricNumDERPConns.Set(int64(len(c.activeDerp))) + c.logActiveDerpLocked() + c.setPeerLastDerpLocked(peer, regionID, regionID) + c.scheduleCleanStaleDerpLocked() + + // Build a startGate for the derp reader+writer + // goroutines, so they don't start running until any + // previous generation is closed. + startGate := syncs.ClosedChan() + if prev := c.prevDerp[regionID]; prev != nil { + startGate = prev.DoneChan() + } + // And register a WaitGroup(Chan) for this generation. + wg := syncs.NewWaitGroupChan() + wg.Add(2) + c.prevDerp[regionID] = wg + + if firstDerp { + startGate = c.derpStarted + go func() { + dc.Connect(ctx) + close(c.derpStarted) + c.muCond.Broadcast() + }() + } + + go c.runDerpReader(ctx, addr, dc, wg, startGate) + go c.runDerpWriter(ctx, dc, ch, wg, startGate) + go c.derpActiveFunc() + + return ad.writeCh +} + +// setPeerLastDerpLocked notes that peer is now being written to via +// the provided DERP regionID, and that the peer advertises a DERP +// home region ID of homeID. +// +// If there's any change, it logs. +// +// c.mu must be held. +func (c *Conn) setPeerLastDerpLocked(peer key.NodePublic, regionID, homeID int) { + if peer.IsZero() { + return + } + old := c.peerLastDerp[peer] + if old == regionID { + return + } + c.peerLastDerp[peer] = regionID + + var newDesc string + switch { + case regionID == homeID && regionID == c.myDerp: + newDesc = "shared home" + case regionID == homeID: + newDesc = "their home" + case regionID == c.myDerp: + newDesc = "our home" + case regionID != homeID: + newDesc = "alt" + } + if old == 0 { + c.logf("[v1] magicsock: derp route for %s set to derp-%d (%s)", peer.ShortString(), regionID, newDesc) + } else { + c.logf("[v1] magicsock: derp route for %s changed from derp-%d => derp-%d (%s)", peer.ShortString(), old, regionID, newDesc) + } +} + +// derpReadResult is the type sent by runDerpClient to ReceiveIPv4 +// when a DERP packet is available. +// +// Notably, it doesn't include the derp.ReceivedPacket because we +// don't want to give the receiver access to the aliased []byte. To +// get at the packet contents they need to call copyBuf to copy it +// out, which also releases the buffer. +type derpReadResult struct { + regionID int + n int // length of data received + src key.NodePublic + // copyBuf is called to copy the data to dst. It returns how + // much data was copied, which will be n if dst is large + // enough. copyBuf can only be called once. + // If copyBuf is nil, that's a signal from the sender to ignore + // this message. 
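+	// A receiver uses it roughly like:
+	//
+	//	n := dm.copyBuf(buf) // copy the payload out; this also releases the buffer
+	//	handlePacket(buf[:n]) // handlePacket is a placeholder, not a real function here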
+ copyBuf func(dst []byte) int +} + +// runDerpReader runs in a goroutine for the life of a DERP +// connection, handling received packets. +func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netip.AddrPort, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) { + defer wg.Decr() + defer dc.Close() + + select { + case <-startGate: + case <-ctx.Done(): + return + } + + didCopy := make(chan struct{}, 1) + regionID := int(derpFakeAddr.Port()) + res := derpReadResult{regionID: regionID} + var pkt derp.ReceivedPacket + res.copyBuf = func(dst []byte) int { + n := copy(dst, pkt.Data) + didCopy <- struct{}{} + return n + } + + defer health.SetDERPRegionConnectedState(regionID, false) + defer health.SetDERPRegionHealth(regionID, "") + + // peerPresent is the set of senders we know are present on this + // connection, based on messages we've received from the server. + peerPresent := map[key.NodePublic]bool{} + bo := backoff.NewBackoff(fmt.Sprintf("derp-%d", regionID), c.logf, 5*time.Second) + var lastPacketTime time.Time + var lastPacketSrc key.NodePublic + + for { + msg, connGen, err := dc.RecvDetail() + if err != nil { + health.SetDERPRegionConnectedState(regionID, false) + // Forget that all these peers have routes. + for peer := range peerPresent { + delete(peerPresent, peer) + c.removeDerpPeerRoute(peer, regionID, dc) + } + if err == derphttp.ErrClientClosed { + return + } + if c.networkDown() { + c.logf("[v1] magicsock: derp.Recv(derp-%d): network down, closing", regionID) + return + } + select { + case <-ctx.Done(): + return + default: + } + + c.logf("magicsock: [%p] derp.Recv(derp-%d): %v", dc, regionID, err) + + // If our DERP connection broke, it might be because our network + // conditions changed. Start that check. + c.ReSTUN("derp-recv-error") + + // Back off a bit before reconnecting. + bo.BackOff(ctx, err) + select { + case <-ctx.Done(): + return + default: + } + continue + } + bo.BackOff(ctx, nil) // reset + + now := time.Now() + if lastPacketTime.IsZero() || now.Sub(lastPacketTime) > 5*time.Second { + health.NoteDERPRegionReceivedFrame(regionID) + lastPacketTime = now + } + + switch m := msg.(type) { + case derp.ServerInfoMessage: + health.SetDERPRegionConnectedState(regionID, true) + health.SetDERPRegionHealth(regionID, "") // until declared otherwise + c.logf("magicsock: derp-%d connected; connGen=%v", regionID, connGen) + continue + case derp.ReceivedPacket: + pkt = m + res.n = len(m.Data) + res.src = m.Source + if logDerpVerbose() { + c.logf("magicsock: got derp-%v packet: %q", regionID, m.Data) + } + // If this is a new sender we hadn't seen before, remember it and + // register a route for this peer. + if res.src != lastPacketSrc { // avoid map lookup w/ high throughput single peer + lastPacketSrc = res.src + if _, ok := peerPresent[res.src]; !ok { + peerPresent[res.src] = true + c.addDerpPeerRoute(res.src, regionID, dc) + } + } + case derp.PingMessage: + // Best effort reply to the ping. + pingData := [8]byte(m) + go func() { + if err := dc.SendPong(pingData); err != nil { + c.logf("magicsock: derp-%d SendPong error: %v", regionID, err) + } + }() + continue + case derp.HealthMessage: + health.SetDERPRegionHealth(regionID, m.Problem) + case derp.PeerGoneMessage: + switch m.Reason { + case derp.PeerGoneReasonDisconnected: + // Do nothing. 
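+				// ("Do nothing" only means no log line or metric for a clean
+				// disconnect; the removeDerpPeerRoute call below the switch
+				// still runs for every PeerGone reason.)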
+ case derp.PeerGoneReasonNotHere: + metricRecvDiscoDERPPeerNotHere.Add(1) + c.logf("[unexpected] magicsock: derp-%d does not know about peer %s, removing route", + regionID, key.NodePublic(m.Peer).ShortString()) + default: + metricRecvDiscoDERPPeerGoneUnknown.Add(1) + c.logf("[unexpected] magicsock: derp-%d peer %s gone, reason %v, removing route", + regionID, key.NodePublic(m.Peer).ShortString(), m.Reason) + } + c.removeDerpPeerRoute(key.NodePublic(m.Peer), regionID, dc) + default: + // Ignore. + continue + } + + select { + case <-ctx.Done(): + return + case c.derpRecvCh <- res: + } + + select { + case <-ctx.Done(): + return + case <-didCopy: + continue + } + } +} + +type derpWriteRequest struct { + addr netip.AddrPort + pubKey key.NodePublic + b []byte // copied; ownership passed to receiver +} + +// runDerpWriter runs in a goroutine for the life of a DERP +// connection, handling received packets. +func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) { + defer wg.Decr() + select { + case <-startGate: + case <-ctx.Done(): + return + } + + for { + select { + case <-ctx.Done(): + return + case wr := <-ch: + err := dc.Send(wr.pubKey, wr.b) + if err != nil { + c.logf("magicsock: derp.Send(%v): %v", wr.addr, err) + metricSendDERPError.Add(1) + } else { + metricSendDERP.Add(1) + } + } + } +} + +func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) (int, error) { + health.ReceiveDERP.Enter() + defer health.ReceiveDERP.Exit() + + for dm := range c.derpRecvCh { + if c.isClosed() { + break + } + n, ep := c.processDERPReadResult(dm, buffs[0]) + if n == 0 { + // No data read occurred. Wait for another packet. + continue + } + metricRecvDataDERP.Add(1) + sizes[0] = n + eps[0] = ep + return 1, nil + } + return 0, net.ErrClosed +} + +func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *endpoint) { + if dm.copyBuf == nil { + return 0, nil + } + var regionID int + n, regionID = dm.n, dm.regionID + ncopy := dm.copyBuf(b) + if ncopy != n { + err := fmt.Errorf("received DERP packet of length %d that's too big for WireGuard buf size %d", n, ncopy) + c.logf("magicsock: %v", err) + return 0, nil + } + + ipp := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID)) + if c.handleDiscoMessage(b[:n], ipp, dm.src, discoRXPathDERP) { + return 0, nil + } + + var ok bool + c.mu.Lock() + ep, ok = c.peerMap.endpointForNodeKey(dm.src) + c.mu.Unlock() + if !ok { + // We don't know anything about this node key, nothing to + // record or process. + return 0, nil + } + + ep.noteRecvActivity() + if stats := c.stats.Load(); stats != nil { + stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n) + } + return n, ep +} + +// SetDERPMap controls which (if any) DERP servers are used. +// A nil value means to disable DERP; it's disabled by default. 
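+//
+// For illustration only (the region ID and hostname below are made up),
+// a minimal single-region map looks like:
+//
+//	c.SetDERPMap(&tailcfg.DERPMap{
+//		Regions: map[int]*tailcfg.DERPRegion{
+//			1: {
+//				RegionID: 1,
+//				Nodes: []*tailcfg.DERPNode{{
+//					Name:     "1a",
+//					RegionID: 1,
+//					HostName: "derp.example.com",
+//				}},
+//			},
+//		},
+//	})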
+func (c *Conn) SetDERPMap(dm *tailcfg.DERPMap) { + c.mu.Lock() + defer c.mu.Unlock() + + var derpAddr = debugUseDERPAddr() + if derpAddr != "" { + derpPort := 443 + if debugUseDERPHTTP() { + // Match the port for -dev in derper.go + derpPort = 3340 + } + dm = &tailcfg.DERPMap{ + OmitDefaultRegions: true, + Regions: map[int]*tailcfg.DERPRegion{ + 999: { + RegionID: 999, + Nodes: []*tailcfg.DERPNode{{ + Name: "999dev", + RegionID: 999, + HostName: derpAddr, + DERPPort: derpPort, + }}, + }, + }, + } + } + + if reflect.DeepEqual(dm, c.derpMap) { + return + } + + c.derpMapAtomic.Store(dm) + old := c.derpMap + c.derpMap = dm + if dm == nil { + c.closeAllDerpLocked("derp-disabled") + return + } + + // Reconnect any DERP region that changed definitions. + if old != nil { + changes := false + for rid, oldDef := range old.Regions { + if reflect.DeepEqual(oldDef, dm.Regions[rid]) { + continue + } + changes = true + if rid == c.myDerp { + c.myDerp = 0 + } + c.closeDerpLocked(rid, "derp-region-redefined") + } + if changes { + c.logActiveDerpLocked() + } + } + + go c.ReSTUN("derp-map-update") +} +func (c *Conn) wantDerpLocked() bool { return c.derpMap != nil } + +// c.mu must be held. +func (c *Conn) closeAllDerpLocked(why string) { + if len(c.activeDerp) == 0 { + return // without the useless log statement + } + for i := range c.activeDerp { + c.closeDerpLocked(i, why) + } + c.logActiveDerpLocked() +} + +// maybeCloseDERPsOnRebind, in response to a rebind, closes all +// DERP connections that don't have a local address in okayLocalIPs +// and pings all those that do. +func (c *Conn) maybeCloseDERPsOnRebind(okayLocalIPs []netip.Prefix) { + c.mu.Lock() + defer c.mu.Unlock() + for regionID, ad := range c.activeDerp { + la, err := ad.c.LocalAddr() + if err != nil { + c.closeOrReconnectDERPLocked(regionID, "rebind-no-localaddr") + continue + } + if !tsaddr.PrefixesContainsIP(okayLocalIPs, la.Addr()) { + c.closeOrReconnectDERPLocked(regionID, "rebind-default-route-change") + continue + } + regionID := regionID + dc := ad.c + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := dc.Ping(ctx); err != nil { + c.mu.Lock() + defer c.mu.Unlock() + c.closeOrReconnectDERPLocked(regionID, "rebind-ping-fail") + return + } + c.logf("post-rebind ping of DERP region %d okay", regionID) + }() + } + c.logActiveDerpLocked() +} + +// closeOrReconnectDERPLocked closes the DERP connection to the +// provided regionID and starts reconnecting it if it's our current +// home DERP. +// +// why is a reason for logging. +// +// c.mu must be held. +func (c *Conn) closeOrReconnectDERPLocked(regionID int, why string) { + c.closeDerpLocked(regionID, why) + if !c.privateKey.IsZero() && c.myDerp == regionID { + c.startDerpHomeConnectLocked() + } +} + +// c.mu must be held. +// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes. +func (c *Conn) closeDerpLocked(regionID int, why string) { + if ad, ok := c.activeDerp[regionID]; ok { + c.logf("magicsock: closing connection to derp-%v (%v), age %v", regionID, why, time.Since(ad.createTime).Round(time.Second)) + go ad.c.Close() + ad.cancel() + delete(c.activeDerp, regionID) + metricNumDERPConns.Set(int64(len(c.activeDerp))) + } +} + +// c.mu must be held. 
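+//
+// Each connection is logged as derp-<region>=cr<connection age>,wr<time
+// since last write>, with both durations formatted by simpleDur.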
+func (c *Conn) logActiveDerpLocked() { + now := time.Now() + c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), logger.ArgWriter(func(buf *bufio.Writer) { + if len(c.activeDerp) == 0 { + return + } + buf.WriteString(":") + c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) { + fmt.Fprintf(buf, " derp-%d=cr%v,wr%v", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite))) + }) + })) +} + +// c.mu must be held. +func (c *Conn) foreachActiveDerpSortedLocked(fn func(regionID int, ad activeDerp)) { + if len(c.activeDerp) < 2 { + for id, ad := range c.activeDerp { + fn(id, ad) + } + return + } + ids := make([]int, 0, len(c.activeDerp)) + for id := range c.activeDerp { + ids = append(ids, id) + } + sort.Ints(ids) + for _, id := range ids { + fn(id, c.activeDerp[id]) + } +} + +func (c *Conn) cleanStaleDerp() { + c.mu.Lock() + defer c.mu.Unlock() + if c.closed { + return + } + c.derpCleanupTimerArmed = false + + tooOld := time.Now().Add(-derpInactiveCleanupTime) + dirty := false + someNonHomeOpen := false + for i, ad := range c.activeDerp { + if i == c.myDerp { + continue + } + if ad.lastWrite.Before(tooOld) { + c.closeDerpLocked(i, "idle") + dirty = true + } else { + someNonHomeOpen = true + } + } + if dirty { + c.logActiveDerpLocked() + } + if someNonHomeOpen { + c.scheduleCleanStaleDerpLocked() + } +} + +func (c *Conn) scheduleCleanStaleDerpLocked() { + if c.derpCleanupTimerArmed { + // Already going to fire soon. Let the existing one + // fire lest it get infinitely delayed by repeated + // calls to scheduleCleanStaleDerpLocked. + return + } + c.derpCleanupTimerArmed = true + if c.derpCleanupTimer != nil { + c.derpCleanupTimer.Reset(derpCleanStaleInterval) + } else { + c.derpCleanupTimer = time.AfterFunc(derpCleanStaleInterval, c.cleanStaleDerp) + } +} + +// DERPs reports the number of active DERP connections. +func (c *Conn) DERPs() int { + c.mu.Lock() + defer c.mu.Unlock() + + return len(c.activeDerp) +} + +func (c *Conn) derpRegionCodeOfIDLocked(regionID int) string { + if c.derpMap == nil { + return "" + } + if r, ok := c.derpMap.Regions[regionID]; ok { + return r.RegionCode + } + return "" +} + +// derpAddrFamSelector is the derphttp.AddressFamilySelector we pass +// to derphttp.Client.SetAddressFamilySelector. +// +// It provides the hint as to whether in an IPv4-vs-IPv6 race that +// IPv4 should be held back a bit to give IPv6 a better-than-50/50 +// chance of winning. We only return true when we believe IPv6 will +// work anyway, so we don't artificially delay the connection speed. +type derpAddrFamSelector struct{ c *Conn } + +func (s derpAddrFamSelector) PreferIPv6() bool { + if r := s.c.lastNetCheckReport.Load(); r != nil { + return r.IPv6 + } + return false +} + +const ( + // derpInactiveCleanupTime is how long a non-home DERP connection + // needs to be idle (last written to) before we close it. + derpInactiveCleanupTime = 60 * time.Second + + // derpCleanStaleInterval is how often cleanStaleDerp runs when there + // are potentially-stale DERP connections to close. 
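+	// Combined with derpInactiveCleanupTime above, an idle non-home
+	// connection is therefore closed roughly 60-75 seconds after its
+	// last write, depending on where it lands in the sweep cycle.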
+ derpCleanStaleInterval = 15 * time.Second +) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 275345e11..ed0a71b0e 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -11,14 +11,11 @@ import ( crand "crypto/rand" "errors" "fmt" - "hash/fnv" "io" - "math/rand" "net" "net/netip" "reflect" "runtime" - "sort" "strconv" "strings" "sync" @@ -29,19 +26,13 @@ import ( "go4.org/mem" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" - "tailscale.com/control/controlclient" - "tailscale.com/derp" - "tailscale.com/derp/derphttp" "tailscale.com/disco" "tailscale.com/envknob" "tailscale.com/health" "tailscale.com/hostinfo" "tailscale.com/ipn/ipnstate" - "tailscale.com/logtail/backoff" "tailscale.com/net/connstats" - "tailscale.com/net/dnscache" "tailscale.com/net/interfaces" - "tailscale.com/net/netaddr" "tailscale.com/net/netcheck" "tailscale.com/net/neterror" "tailscale.com/net/netmon" @@ -51,7 +42,6 @@ import ( "tailscale.com/net/portmapper" "tailscale.com/net/sockstats" "tailscale.com/net/stun" - "tailscale.com/net/tsaddr" "tailscale.com/syncs" "tailscale.com/tailcfg" "tailscale.com/tstime" @@ -65,7 +55,6 @@ import ( "tailscale.com/util/mak" "tailscale.com/util/ringbuffer" "tailscale.com/util/set" - "tailscale.com/util/sysresources" "tailscale.com/util/uniq" "tailscale.com/version" "tailscale.com/wgengine/capture" @@ -86,19 +75,6 @@ const ( socketBufferSize = 7 << 20 ) -// useDerpRoute reports whether magicsock should enable the DERP -// return path optimization (Issue 150). -func useDerpRoute() bool { - if b, ok := debugUseDerpRoute().Get(); ok { - return b - } - ob := controlclient.DERPRouteFlag() - if v, ok := ob.Get(); ok { - return v - } - return true // as of 1.21.x -} - // A Conn routes UDP packets and actively manages a list of its endpoints. type Conn struct { // This block mirrors the contents and field order of the Options @@ -327,46 +303,6 @@ func (c *Conn) dlogf(format string, a ...any) { } } -// derpRoute is a route entry for a public key, saying that a certain -// peer should be available at DERP node derpID, as long as the -// current connection for that derpID is dc. (but dc should not be -// used to write directly; it's owned by the read/write loops) -type derpRoute struct { - derpID int - dc *derphttp.Client // don't use directly; see comment above -} - -// removeDerpPeerRoute removes a DERP route entry previously added by addDerpPeerRoute. -func (c *Conn) removeDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) { - c.mu.Lock() - defer c.mu.Unlock() - r2 := derpRoute{derpID, dc} - if r, ok := c.derpRoute[peer]; ok && r == r2 { - delete(c.derpRoute, peer) - } -} - -// addDerpPeerRoute adds a DERP route entry, noting that peer was seen -// on DERP node derpID, at least on the connection identified by dc. -// See issue 150 for details. -func (c *Conn) addDerpPeerRoute(peer key.NodePublic, derpID int, dc *derphttp.Client) { - c.mu.Lock() - defer c.mu.Unlock() - mak.Set(&c.derpRoute, peer, derpRoute{derpID, dc}) -} - -// activeDerp contains fields for an active DERP connection. -type activeDerp struct { - c *derphttp.Client - cancel context.CancelFunc - writeCh chan<- derpWriteRequest - // lastWrite is the time of the last request for its write - // channel (currently even if there was no write). - // It is always non-nil and initialized to a non-zero Time. - lastWrite *time.Time - createTime time.Time -} - // Options contains options for Listen. 
type Options struct { // Logf optionally provides a log function to use. @@ -720,47 +656,6 @@ func (c *Conn) updateNetInfo(ctx context.Context) (*netcheck.Report, error) { return report, nil } -var processStartUnixNano = time.Now().UnixNano() - -// pickDERPFallback returns a non-zero but deterministic DERP node to -// connect to. This is only used if netcheck couldn't find the -// nearest one (for instance, if UDP is blocked and thus STUN latency -// checks aren't working). -// -// c.mu must NOT be held. -func (c *Conn) pickDERPFallback() int { - c.mu.Lock() - defer c.mu.Unlock() - - if !c.wantDerpLocked() { - return 0 - } - ids := c.derpMap.RegionIDs() - if len(ids) == 0 { - // No DERP regions in non-nil map. - return 0 - } - - // TODO: figure out which DERP region most of our peers are using, - // and use that region as our fallback. - // - // If we already had selected something in the past and it has any - // peers, we want to stay on it. If there are no peers at all, - // stay on whatever DERP we previously picked. If we need to pick - // one and have no peer info, pick a region randomly. - // - // We used to do the above for legacy clients, but never updated - // it for disco. - - if c.myDerp != 0 { - return c.myDerp - } - - h := fnv.New64() - fmt.Fprintf(h, "%p/%d", c, processStartUnixNano) // arbitrary - return ids[rand.New(rand.NewSource(int64(h.Sum64()))).Intn(len(ids))] -} - // callNetInfoCallback calls the NetInfo callback (if previously // registered with SetNetInfoCallback) if ni has substantially changed // since the last state. @@ -884,79 +779,11 @@ func (c *Conn) GetEndpointChanges(peer *tailcfg.Node) ([]EndpointChange, error) return ep.debugUpdates.GetAll(), nil } -func (c *Conn) derpRegionCodeLocked(regionID int) string { - if c.derpMap == nil { - return "" - } - if dr, ok := c.derpMap.Regions[regionID]; ok { - return dr.RegionCode - } - return "" -} - // DiscoPublicKey returns the discovery public key. func (c *Conn) DiscoPublicKey() key.DiscoPublic { return c.discoPublic } -// c.mu must NOT be held. -func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) { - c.mu.Lock() - defer c.mu.Unlock() - if !c.wantDerpLocked() { - c.myDerp = 0 - health.SetMagicSockDERPHome(0) - return false - } - if derpNum == c.myDerp { - // No change. - return true - } - if c.myDerp != 0 && derpNum != 0 { - metricDERPHomeChange.Add(1) - } - c.myDerp = derpNum - health.SetMagicSockDERPHome(derpNum) - - if c.privateKey.IsZero() { - // No private key yet, so DERP connections won't come up anyway. - // Return early rather than ultimately log a couple lines of noise. - return true - } - - // On change, notify all currently connected DERP servers and - // start connecting to our home DERP if we are not already. - dr := c.derpMap.Regions[derpNum] - if dr == nil { - c.logf("[unexpected] magicsock: derpMap.Regions[%v] is nil", derpNum) - } else { - c.logf("magicsock: home is now derp-%v (%v)", derpNum, c.derpMap.Regions[derpNum].RegionCode) - } - for i, ad := range c.activeDerp { - go ad.c.NotePreferred(i == c.myDerp) - } - c.goDerpConnect(derpNum) - return true -} - -// startDerpHomeConnectLocked starts connecting to our DERP home, if any. -// -// c.mu must be held. -func (c *Conn) startDerpHomeConnectLocked() { - c.goDerpConnect(c.myDerp) -} - -// goDerpConnect starts a goroutine to start connecting to the given -// DERP node. -// -// c.mu may be held, but does not need to be. 
-func (c *Conn) goDerpConnect(node int) { - if node == 0 { - return - } - go c.derpWriteChanOfAddr(netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(node)), key.NodePublic{}) -} - // determineEndpoints returns the machine's endpoint addresses. It // does a STUN lookup (via netcheck) to determine its public address. // @@ -1260,427 +1087,6 @@ func (c *Conn) sendAddr(addr netip.AddrPort, pubKey key.NodePublic, b []byte) (s } } -var ( - bufferedDerpWrites int - bufferedDerpWritesOnce sync.Once -) - -// bufferedDerpWritesBeforeDrop returns how many packets writes can be queued -// up the DERP client to write on the wire before we start dropping. -func bufferedDerpWritesBeforeDrop() int { - // For mobile devices, always return the previous minimum value of 32; - // we can do this outside the sync.Once to avoid that overhead. - if runtime.GOOS == "ios" || runtime.GOOS == "android" { - return 32 - } - - bufferedDerpWritesOnce.Do(func() { - // Some rough sizing: for the previous fixed value of 32, the - // total consumed memory can be: - // = numDerpRegions * messages/region * sizeof(message) - // - // For sake of this calculation, assume 100 DERP regions; at - // time of writing (2023-04-03), we have 24. - // - // A reasonable upper bound for the worst-case average size of - // a message is a *disco.CallMeMaybe message with 16 endpoints; - // since sizeof(netip.AddrPort) = 32, that's 512 bytes. Thus: - // = 100 * 32 * 512 - // = 1638400 (1.6MiB) - // - // On a reasonably-small node with 4GiB of memory that's - // connected to each region and handling a lot of load, 1.6MiB - // is about 0.04% of the total system memory. - // - // For sake of this calculation, then, let's double that memory - // usage to 0.08% and scale based on total system memory. - // - // For a 16GiB Linux box, this should buffer just over 256 - // messages. - systemMemory := sysresources.TotalMemory() - memoryUsable := float64(systemMemory) * 0.0008 - - const ( - theoreticalDERPRegions = 100 - messageMaximumSizeBytes = 512 - ) - bufferedDerpWrites = int(memoryUsable / (theoreticalDERPRegions * messageMaximumSizeBytes)) - - // Never drop below the previous minimum value. - if bufferedDerpWrites < 32 { - bufferedDerpWrites = 32 - } - }) - return bufferedDerpWrites -} - -// derpWriteChanOfAddr returns a DERP client for fake UDP addresses that -// represent DERP servers, creating them as necessary. For real UDP -// addresses, it returns nil. -// -// If peer is non-zero, it can be used to find an active reverse -// path, without using addr. -func (c *Conn) derpWriteChanOfAddr(addr netip.AddrPort, peer key.NodePublic) chan<- derpWriteRequest { - if addr.Addr() != tailcfg.DerpMagicIPAddr { - return nil - } - regionID := int(addr.Port()) - - if c.networkDown() { - return nil - } - - c.mu.Lock() - defer c.mu.Unlock() - if !c.wantDerpLocked() || c.closed { - return nil - } - if c.derpMap == nil || c.derpMap.Regions[regionID] == nil { - return nil - } - if c.privateKey.IsZero() { - c.logf("magicsock: DERP lookup of %v with no private key; ignoring", addr) - return nil - } - - // See if we have a connection open to that DERP node ID - // first. If so, might as well use it. (It's a little - // arbitrary whether we use this one vs. the reverse route - // below when we have both.) 
- ad, ok := c.activeDerp[regionID] - if ok { - *ad.lastWrite = time.Now() - c.setPeerLastDerpLocked(peer, regionID, regionID) - return ad.writeCh - } - - // If we don't have an open connection to the peer's home DERP - // node, see if we have an open connection to a DERP node - // where we'd heard from that peer already. For instance, - // perhaps peer's home is Frankfurt, but they dialed our home DERP - // node in SF to reach us, so we can reply to them using our - // SF connection rather than dialing Frankfurt. (Issue 150) - if !peer.IsZero() && useDerpRoute() { - if r, ok := c.derpRoute[peer]; ok { - if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc { - c.setPeerLastDerpLocked(peer, r.derpID, regionID) - *ad.lastWrite = time.Now() - return ad.writeCh - } - } - } - - why := "home-keep-alive" - if !peer.IsZero() { - why = peer.ShortString() - } - c.logf("magicsock: adding connection to derp-%v for %v", regionID, why) - - firstDerp := false - if c.activeDerp == nil { - firstDerp = true - c.activeDerp = make(map[int]activeDerp) - c.prevDerp = make(map[int]*syncs.WaitGroupChan) - } - - // Note that derphttp.NewRegionClient does not dial the server - // (it doesn't block) so it is safe to do under the c.mu lock. - dc := derphttp.NewRegionClient(c.privateKey, c.logf, c.netMon, func() *tailcfg.DERPRegion { - // Warning: it is not legal to acquire - // magicsock.Conn.mu from this callback. - // It's run from derphttp.Client.connect (via Send, etc) - // and the lock ordering rules are that magicsock.Conn.mu - // must be acquired before derphttp.Client.mu. - // See https://github.com/tailscale/tailscale/issues/3726 - if c.connCtx.Err() != nil { - // We're closing anyway; return nil to stop dialing. - return nil - } - derpMap := c.derpMapAtomic.Load() - if derpMap == nil { - return nil - } - return derpMap.Regions[regionID] - }) - - dc.SetCanAckPings(true) - dc.NotePreferred(c.myDerp == regionID) - dc.SetAddressFamilySelector(derpAddrFamSelector{c}) - dc.DNSCache = dnscache.Get() - - ctx, cancel := context.WithCancel(c.connCtx) - ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop()) - - ad.c = dc - ad.writeCh = ch - ad.cancel = cancel - ad.lastWrite = new(time.Time) - *ad.lastWrite = time.Now() - ad.createTime = time.Now() - c.activeDerp[regionID] = ad - metricNumDERPConns.Set(int64(len(c.activeDerp))) - c.logActiveDerpLocked() - c.setPeerLastDerpLocked(peer, regionID, regionID) - c.scheduleCleanStaleDerpLocked() - - // Build a startGate for the derp reader+writer - // goroutines, so they don't start running until any - // previous generation is closed. - startGate := syncs.ClosedChan() - if prev := c.prevDerp[regionID]; prev != nil { - startGate = prev.DoneChan() - } - // And register a WaitGroup(Chan) for this generation. - wg := syncs.NewWaitGroupChan() - wg.Add(2) - c.prevDerp[regionID] = wg - - if firstDerp { - startGate = c.derpStarted - go func() { - dc.Connect(ctx) - close(c.derpStarted) - c.muCond.Broadcast() - }() - } - - go c.runDerpReader(ctx, addr, dc, wg, startGate) - go c.runDerpWriter(ctx, dc, ch, wg, startGate) - go c.derpActiveFunc() - - return ad.writeCh -} - -// setPeerLastDerpLocked notes that peer is now being written to via -// the provided DERP regionID, and that the peer advertises a DERP -// home region ID of homeID. -// -// If there's any change, it logs. -// -// c.mu must be held. 
-func (c *Conn) setPeerLastDerpLocked(peer key.NodePublic, regionID, homeID int) { - if peer.IsZero() { - return - } - old := c.peerLastDerp[peer] - if old == regionID { - return - } - c.peerLastDerp[peer] = regionID - - var newDesc string - switch { - case regionID == homeID && regionID == c.myDerp: - newDesc = "shared home" - case regionID == homeID: - newDesc = "their home" - case regionID == c.myDerp: - newDesc = "our home" - case regionID != homeID: - newDesc = "alt" - } - if old == 0 { - c.logf("[v1] magicsock: derp route for %s set to derp-%d (%s)", peer.ShortString(), regionID, newDesc) - } else { - c.logf("[v1] magicsock: derp route for %s changed from derp-%d => derp-%d (%s)", peer.ShortString(), old, regionID, newDesc) - } -} - -// derpReadResult is the type sent by runDerpClient to ReceiveIPv4 -// when a DERP packet is available. -// -// Notably, it doesn't include the derp.ReceivedPacket because we -// don't want to give the receiver access to the aliased []byte. To -// get at the packet contents they need to call copyBuf to copy it -// out, which also releases the buffer. -type derpReadResult struct { - regionID int - n int // length of data received - src key.NodePublic - // copyBuf is called to copy the data to dst. It returns how - // much data was copied, which will be n if dst is large - // enough. copyBuf can only be called once. - // If copyBuf is nil, that's a signal from the sender to ignore - // this message. - copyBuf func(dst []byte) int -} - -// runDerpReader runs in a goroutine for the life of a DERP -// connection, handling received packets. -func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netip.AddrPort, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) { - defer wg.Decr() - defer dc.Close() - - select { - case <-startGate: - case <-ctx.Done(): - return - } - - didCopy := make(chan struct{}, 1) - regionID := int(derpFakeAddr.Port()) - res := derpReadResult{regionID: regionID} - var pkt derp.ReceivedPacket - res.copyBuf = func(dst []byte) int { - n := copy(dst, pkt.Data) - didCopy <- struct{}{} - return n - } - - defer health.SetDERPRegionConnectedState(regionID, false) - defer health.SetDERPRegionHealth(regionID, "") - - // peerPresent is the set of senders we know are present on this - // connection, based on messages we've received from the server. - peerPresent := map[key.NodePublic]bool{} - bo := backoff.NewBackoff(fmt.Sprintf("derp-%d", regionID), c.logf, 5*time.Second) - var lastPacketTime time.Time - var lastPacketSrc key.NodePublic - - for { - msg, connGen, err := dc.RecvDetail() - if err != nil { - health.SetDERPRegionConnectedState(regionID, false) - // Forget that all these peers have routes. - for peer := range peerPresent { - delete(peerPresent, peer) - c.removeDerpPeerRoute(peer, regionID, dc) - } - if err == derphttp.ErrClientClosed { - return - } - if c.networkDown() { - c.logf("[v1] magicsock: derp.Recv(derp-%d): network down, closing", regionID) - return - } - select { - case <-ctx.Done(): - return - default: - } - - c.logf("magicsock: [%p] derp.Recv(derp-%d): %v", dc, regionID, err) - - // If our DERP connection broke, it might be because our network - // conditions changed. Start that check. - c.ReSTUN("derp-recv-error") - - // Back off a bit before reconnecting. 
- bo.BackOff(ctx, err) - select { - case <-ctx.Done(): - return - default: - } - continue - } - bo.BackOff(ctx, nil) // reset - - now := time.Now() - if lastPacketTime.IsZero() || now.Sub(lastPacketTime) > 5*time.Second { - health.NoteDERPRegionReceivedFrame(regionID) - lastPacketTime = now - } - - switch m := msg.(type) { - case derp.ServerInfoMessage: - health.SetDERPRegionConnectedState(regionID, true) - health.SetDERPRegionHealth(regionID, "") // until declared otherwise - c.logf("magicsock: derp-%d connected; connGen=%v", regionID, connGen) - continue - case derp.ReceivedPacket: - pkt = m - res.n = len(m.Data) - res.src = m.Source - if logDerpVerbose() { - c.logf("magicsock: got derp-%v packet: %q", regionID, m.Data) - } - // If this is a new sender we hadn't seen before, remember it and - // register a route for this peer. - if res.src != lastPacketSrc { // avoid map lookup w/ high throughput single peer - lastPacketSrc = res.src - if _, ok := peerPresent[res.src]; !ok { - peerPresent[res.src] = true - c.addDerpPeerRoute(res.src, regionID, dc) - } - } - case derp.PingMessage: - // Best effort reply to the ping. - pingData := [8]byte(m) - go func() { - if err := dc.SendPong(pingData); err != nil { - c.logf("magicsock: derp-%d SendPong error: %v", regionID, err) - } - }() - continue - case derp.HealthMessage: - health.SetDERPRegionHealth(regionID, m.Problem) - case derp.PeerGoneMessage: - switch m.Reason { - case derp.PeerGoneReasonDisconnected: - // Do nothing. - case derp.PeerGoneReasonNotHere: - metricRecvDiscoDERPPeerNotHere.Add(1) - c.logf("[unexpected] magicsock: derp-%d does not know about peer %s, removing route", - regionID, key.NodePublic(m.Peer).ShortString()) - default: - metricRecvDiscoDERPPeerGoneUnknown.Add(1) - c.logf("[unexpected] magicsock: derp-%d peer %s gone, reason %v, removing route", - regionID, key.NodePublic(m.Peer).ShortString(), m.Reason) - } - c.removeDerpPeerRoute(key.NodePublic(m.Peer), regionID, dc) - default: - // Ignore. - continue - } - - select { - case <-ctx.Done(): - return - case c.derpRecvCh <- res: - } - - select { - case <-ctx.Done(): - return - case <-didCopy: - continue - } - } -} - -type derpWriteRequest struct { - addr netip.AddrPort - pubKey key.NodePublic - b []byte // copied; ownership passed to receiver -} - -// runDerpWriter runs in a goroutine for the life of a DERP -// connection, handling received packets. -func (c *Conn) runDerpWriter(ctx context.Context, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) { - defer wg.Decr() - select { - case <-startGate: - case <-ctx.Done(): - return - } - - for { - select { - case <-ctx.Done(): - return - case wr := <-ch: - err := dc.Send(wr.pubKey, wr.b) - if err != nil { - c.logf("magicsock: derp.Send(%v): %v", wr.addr, err) - metricSendDERPError.Add(1) - } else { - metricSendDERP.Add(1) - } - } - } -} - type receiveBatch struct { msgs []ipv6.Message } @@ -1801,62 +1207,6 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache) return ep, true } -func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) (int, error) { - health.ReceiveDERP.Enter() - defer health.ReceiveDERP.Exit() - - for dm := range c.derpRecvCh { - if c.isClosed() { - break - } - n, ep := c.processDERPReadResult(dm, buffs[0]) - if n == 0 { - // No data read occurred. Wait for another packet. 
- continue - } - metricRecvDataDERP.Add(1) - sizes[0] = n - eps[0] = ep - return 1, nil - } - return 0, net.ErrClosed -} - -func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *endpoint) { - if dm.copyBuf == nil { - return 0, nil - } - var regionID int - n, regionID = dm.n, dm.regionID - ncopy := dm.copyBuf(b) - if ncopy != n { - err := fmt.Errorf("received DERP packet of length %d that's too big for WireGuard buf size %d", n, ncopy) - c.logf("magicsock: %v", err) - return 0, nil - } - - ipp := netip.AddrPortFrom(tailcfg.DerpMagicIPAddr, uint16(regionID)) - if c.handleDiscoMessage(b[:n], ipp, dm.src, discoRXPathDERP) { - return 0, nil - } - - var ok bool - c.mu.Lock() - ep, ok = c.peerMap.endpointForNodeKey(dm.src) - c.mu.Unlock() - if !ok { - // We don't know anything about this node key, nothing to - // record or process. - return 0, nil - } - - ep.noteRecvActivity() - if stats := c.stats.Load(); stats != nil { - stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n) - } - return n, ep -} - // discoLogLevel controls the verbosity of discovery log messages. type discoLogLevel int @@ -2395,68 +1745,6 @@ func (c *Conn) UpdatePeers(newPeers map[key.NodePublic]struct{}) { } } -// SetDERPMap controls which (if any) DERP servers are used. -// A nil value means to disable DERP; it's disabled by default. -func (c *Conn) SetDERPMap(dm *tailcfg.DERPMap) { - c.mu.Lock() - defer c.mu.Unlock() - - var derpAddr = debugUseDERPAddr() - if derpAddr != "" { - derpPort := 443 - if debugUseDERPHTTP() { - // Match the port for -dev in derper.go - derpPort = 3340 - } - dm = &tailcfg.DERPMap{ - OmitDefaultRegions: true, - Regions: map[int]*tailcfg.DERPRegion{ - 999: { - RegionID: 999, - Nodes: []*tailcfg.DERPNode{{ - Name: "999dev", - RegionID: 999, - HostName: derpAddr, - DERPPort: derpPort, - }}, - }, - }, - } - } - - if reflect.DeepEqual(dm, c.derpMap) { - return - } - - c.derpMapAtomic.Store(dm) - old := c.derpMap - c.derpMap = dm - if dm == nil { - c.closeAllDerpLocked("derp-disabled") - return - } - - // Reconnect any DERP region that changed definitions. - if old != nil { - changes := false - for rid, oldDef := range old.Regions { - if reflect.DeepEqual(oldDef, dm.Regions[rid]) { - continue - } - changes = true - if rid == c.myDerp { - c.myDerp = 0 - } - c.closeDerpLocked(rid, "derp-region-redefined") - } - if changes { - c.logActiveDerpLocked() - } - } - - go c.ReSTUN("derp-map-update") -} - func nodesEqual(x, y []*tailcfg.Node) bool { if len(x) != len(y) { return false @@ -2469,8 +1757,6 @@ func nodesEqual(x, y []*tailcfg.Node) bool { return true } -var debugRingBufferMaxSizeBytes = envknob.RegisterInt("TS_DEBUG_MAGICSOCK_RING_BUFFER_MAX_SIZE_BYTES") - // SetNetworkMap is called when the control client gets a new network // map from the control server. It must always be non-nil. // @@ -2637,92 +1923,6 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) { } } -func (c *Conn) wantDerpLocked() bool { return c.derpMap != nil } - -// c.mu must be held. -func (c *Conn) closeAllDerpLocked(why string) { - if len(c.activeDerp) == 0 { - return // without the useless log statement - } - for i := range c.activeDerp { - c.closeDerpLocked(i, why) - } - c.logActiveDerpLocked() -} - -// maybeCloseDERPsOnRebind, in response to a rebind, closes all -// DERP connections that don't have a local address in okayLocalIPs -// and pings all those that do. 
-func (c *Conn) maybeCloseDERPsOnRebind(okayLocalIPs []netip.Prefix) { - c.mu.Lock() - defer c.mu.Unlock() - for regionID, ad := range c.activeDerp { - la, err := ad.c.LocalAddr() - if err != nil { - c.closeOrReconnectDERPLocked(regionID, "rebind-no-localaddr") - continue - } - if !tsaddr.PrefixesContainsIP(okayLocalIPs, la.Addr()) { - c.closeOrReconnectDERPLocked(regionID, "rebind-default-route-change") - continue - } - regionID := regionID - dc := ad.c - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - if err := dc.Ping(ctx); err != nil { - c.mu.Lock() - defer c.mu.Unlock() - c.closeOrReconnectDERPLocked(regionID, "rebind-ping-fail") - return - } - c.logf("post-rebind ping of DERP region %d okay", regionID) - }() - } - c.logActiveDerpLocked() -} - -// closeOrReconnectDERPLocked closes the DERP connection to the -// provided regionID and starts reconnecting it if it's our current -// home DERP. -// -// why is a reason for logging. -// -// c.mu must be held. -func (c *Conn) closeOrReconnectDERPLocked(regionID int, why string) { - c.closeDerpLocked(regionID, why) - if !c.privateKey.IsZero() && c.myDerp == regionID { - c.startDerpHomeConnectLocked() - } -} - -// c.mu must be held. -// It is the responsibility of the caller to call logActiveDerpLocked after any set of closes. -func (c *Conn) closeDerpLocked(regionID int, why string) { - if ad, ok := c.activeDerp[regionID]; ok { - c.logf("magicsock: closing connection to derp-%v (%v), age %v", regionID, why, time.Since(ad.createTime).Round(time.Second)) - go ad.c.Close() - ad.cancel() - delete(c.activeDerp, regionID) - metricNumDERPConns.Set(int64(len(c.activeDerp))) - } -} - -// c.mu must be held. -func (c *Conn) logActiveDerpLocked() { - now := time.Now() - c.logf("magicsock: %v active derp conns%s", len(c.activeDerp), logger.ArgWriter(func(buf *bufio.Writer) { - if len(c.activeDerp) == 0 { - return - } - buf.WriteString(":") - c.foreachActiveDerpSortedLocked(func(node int, ad activeDerp) { - fmt.Fprintf(buf, " derp-%d=cr%v,wr%v", node, simpleDur(now.Sub(ad.createTime)), simpleDur(now.Sub(*ad.lastWrite))) - }) - })) -} - func (c *Conn) logEndpointChange(endpoints []tailcfg.Endpoint) { c.logf("magicsock: endpoints changed: %s", logger.ArgWriter(func(buf *bufio.Writer) { for i, ep := range endpoints { @@ -2734,77 +1934,6 @@ func (c *Conn) logEndpointChange(endpoints []tailcfg.Endpoint) { })) } -// c.mu must be held. -func (c *Conn) foreachActiveDerpSortedLocked(fn func(regionID int, ad activeDerp)) { - if len(c.activeDerp) < 2 { - for id, ad := range c.activeDerp { - fn(id, ad) - } - return - } - ids := make([]int, 0, len(c.activeDerp)) - for id := range c.activeDerp { - ids = append(ids, id) - } - sort.Ints(ids) - for _, id := range ids { - fn(id, c.activeDerp[id]) - } -} - -func (c *Conn) cleanStaleDerp() { - c.mu.Lock() - defer c.mu.Unlock() - if c.closed { - return - } - c.derpCleanupTimerArmed = false - - tooOld := time.Now().Add(-derpInactiveCleanupTime) - dirty := false - someNonHomeOpen := false - for i, ad := range c.activeDerp { - if i == c.myDerp { - continue - } - if ad.lastWrite.Before(tooOld) { - c.closeDerpLocked(i, "idle") - dirty = true - } else { - someNonHomeOpen = true - } - } - if dirty { - c.logActiveDerpLocked() - } - if someNonHomeOpen { - c.scheduleCleanStaleDerpLocked() - } -} - -func (c *Conn) scheduleCleanStaleDerpLocked() { - if c.derpCleanupTimerArmed { - // Already going to fire soon. 
Let the existing one - // fire lest it get infinitely delayed by repeated - // calls to scheduleCleanStaleDerpLocked. - return - } - c.derpCleanupTimerArmed = true - if c.derpCleanupTimer != nil { - c.derpCleanupTimer.Reset(derpCleanStaleInterval) - } else { - c.derpCleanupTimer = time.AfterFunc(derpCleanStaleInterval, c.cleanStaleDerp) - } -} - -// DERPs reports the number of active DERP connections. -func (c *Conn) DERPs() int { - c.mu.Lock() - defer c.mu.Unlock() - - return len(c.activeDerp) -} - // Bind returns the wireguard-go conn.Bind for c. // // See https://pkg.go.dev/golang.zx2c4.com/wireguard/conn#Bind @@ -2966,13 +2095,6 @@ func (c *Conn) goroutinesRunningLocked() bool { return false } -func maxIdleBeforeSTUNShutdown() time.Duration { - if debugReSTUNStopOnIdle() { - return 45 * time.Second - } - return sessionActiveTimeout -} - func (c *Conn) shouldDoPeriodicReSTUNLocked() bool { if c.networkDown() { return false @@ -2987,7 +2109,7 @@ func (c *Conn) shouldDoPeriodicReSTUNLocked() bool { if debugReSTUNStopOnIdle() { c.logf("magicsock: periodicReSTUN: idle for %v", idleFor.Round(time.Second)) } - if idleFor > maxIdleBeforeSTUNShutdown() { + if idleFor > sessionActiveTimeout { if c.netMap != nil && c.netMap.Debug != nil && c.netMap.Debug.ForceBackgroundSTUN { // Overridden by control. return true @@ -3053,8 +2175,6 @@ func (c *Conn) listenPacket(network string, port uint16) (nettype.PacketConn, er return nettype.MakePacketListenerWithNetIP(netns.Listener(c.logf, c.netMon)).ListenPacket(ctx, network, addr) } -var debugBindSocket = envknob.RegisterBool("TS_DEBUG_MAGICSOCK_BIND_SOCKET") - // bindSocket initializes rucPtr if necessary and binds a UDP socket to it. // Network indicates the UDP socket type; it must be "udp4" or "udp6". // If rucPtr had an existing UDP socket bound, it closes that socket. @@ -3230,183 +2350,6 @@ func (c *Conn) ParseEndpoint(nodeKeyStr string) (conn.Endpoint, error) { return ep, nil } -// xnetBatchReaderWriter defines the batching i/o methods of -// golang.org/x/net/ipv4.PacketConn (and ipv6.PacketConn). -// TODO(jwhited): This should eventually be replaced with the standard library -// implementation of https://github.com/golang/go/issues/45886 -type xnetBatchReaderWriter interface { - xnetBatchReader - xnetBatchWriter -} - -type xnetBatchReader interface { - ReadBatch([]ipv6.Message, int) (int, error) -} - -type xnetBatchWriter interface { - WriteBatch([]ipv6.Message, int) (int, error) -} - -// batchingUDPConn is a UDP socket that provides batched i/o. -type batchingUDPConn struct { - pc nettype.PacketConn - xpc xnetBatchReaderWriter - rxOffload bool // supports UDP GRO or similar - txOffload atomic.Bool // supports UDP GSO or similar - setGSOSizeInControl func(control *[]byte, gsoSize uint16) // typically setGSOSizeInControl(); swappable for testing - getGSOSizeFromControl func(control []byte) (int, error) // typically getGSOSizeFromControl(); swappable for testing - sendBatchPool sync.Pool -} - -func (c *batchingUDPConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) { - if c.rxOffload { - // UDP_GRO is opt-in on Linux via setsockopt(). Once enabled you may - // receive a "monster datagram" from any read call. The ReadFrom() API - // does not support passing the GSO size and is unsafe to use in such a - // case. Other platforms may vary in behavior, but we go with the most - // conservative approach to prevent this from becoming a footgun in the - // future. 
- return 0, netip.AddrPort{}, errors.New("rx UDP offload is enabled on this socket, single packet reads are unavailable") - } - return c.pc.ReadFromUDPAddrPort(p) -} - -func (c *batchingUDPConn) SetDeadline(t time.Time) error { - return c.pc.SetDeadline(t) -} - -func (c *batchingUDPConn) SetReadDeadline(t time.Time) error { - return c.pc.SetReadDeadline(t) -} - -func (c *batchingUDPConn) SetWriteDeadline(t time.Time) error { - return c.pc.SetWriteDeadline(t) -} - -const ( - // This was initially established for Linux, but may split out to - // GOOS-specific values later. It originates as UDP_MAX_SEGMENTS in the - // kernel's TX path, and UDP_GRO_CNT_MAX for RX. - udpSegmentMaxDatagrams = 64 -) - -const ( - // Exceeding these values results in EMSGSIZE. - maxIPv4PayloadLen = 1<<16 - 1 - 20 - 8 - maxIPv6PayloadLen = 1<<16 - 1 - 8 -) - -// coalesceMessages iterates msgs, coalescing them where possible while -// maintaining datagram order. All msgs have their Addr field set to addr. -func (c *batchingUDPConn) coalesceMessages(addr *net.UDPAddr, buffs [][]byte, msgs []ipv6.Message) int { - var ( - base = -1 // index of msg we are currently coalescing into - gsoSize int // segmentation size of msgs[base] - dgramCnt int // number of dgrams coalesced into msgs[base] - endBatch bool // tracking flag to start a new batch on next iteration of buffs - ) - maxPayloadLen := maxIPv4PayloadLen - if addr.IP.To4() == nil { - maxPayloadLen = maxIPv6PayloadLen - } - for i, buff := range buffs { - if i > 0 { - msgLen := len(buff) - baseLenBefore := len(msgs[base].Buffers[0]) - freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore - if msgLen+baseLenBefore <= maxPayloadLen && - msgLen <= gsoSize && - msgLen <= freeBaseCap && - dgramCnt < udpSegmentMaxDatagrams && - !endBatch { - msgs[base].Buffers[0] = append(msgs[base].Buffers[0], make([]byte, msgLen)...) - copy(msgs[base].Buffers[0][baseLenBefore:], buff) - if i == len(buffs)-1 { - c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize)) - } - dgramCnt++ - if msgLen < gsoSize { - // A smaller than gsoSize packet on the tail is legal, but - // it must end the batch. - endBatch = true - } - continue - } - } - if dgramCnt > 1 { - c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize)) - } - // Reset prior to incrementing base since we are preparing to start a - // new potential batch. 
- endBatch = false - base++ - gsoSize = len(buff) - msgs[base].OOB = msgs[base].OOB[:0] - msgs[base].Buffers[0] = buff - msgs[base].Addr = addr - dgramCnt = 1 - } - return base + 1 -} - -type sendBatch struct { - msgs []ipv6.Message - ua *net.UDPAddr -} - -func (c *batchingUDPConn) getSendBatch() *sendBatch { - batch := c.sendBatchPool.Get().(*sendBatch) - return batch -} - -func (c *batchingUDPConn) putSendBatch(batch *sendBatch) { - for i := range batch.msgs { - batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers, OOB: batch.msgs[i].OOB} - } - c.sendBatchPool.Put(batch) -} - -func (c *batchingUDPConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort) error { - batch := c.getSendBatch() - defer c.putSendBatch(batch) - if addr.Addr().Is6() { - as16 := addr.Addr().As16() - copy(batch.ua.IP, as16[:]) - batch.ua.IP = batch.ua.IP[:16] - } else { - as4 := addr.Addr().As4() - copy(batch.ua.IP, as4[:]) - batch.ua.IP = batch.ua.IP[:4] - } - batch.ua.Port = int(addr.Port()) - var ( - n int - retried bool - ) -retry: - if c.txOffload.Load() { - n = c.coalesceMessages(batch.ua, buffs, batch.msgs) - } else { - for i := range buffs { - batch.msgs[i].Buffers[0] = buffs[i] - batch.msgs[i].Addr = batch.ua - batch.msgs[i].OOB = batch.msgs[i].OOB[:0] - } - n = len(buffs) - } - - err := c.writeBatch(batch.msgs[:n]) - if err != nil && c.txOffload.Load() && neterror.ShouldDisableUDPGSO(err) { - c.txOffload.Store(false) - retried = true - goto retry - } - if retried { - return neterror.ErrUDPGSODisabled{OnLaddr: c.pc.LocalAddr().String(), RetryErr: err} - } - return err -} - func (c *batchingUDPConn) writeBatch(msgs []ipv6.Message) error { var head int for { @@ -3554,205 +2497,12 @@ func tryUpgradeToBatchingUDPConn(pconn nettype.PacketConn, network string, batch return b } -// RebindingUDPConn is a UDP socket that can be re-bound. -// Unix has no notion of re-binding a socket, so we swap it out for a new one. -type RebindingUDPConn struct { - // pconnAtomic is a pointer to the value stored in pconn, but doesn't - // require acquiring mu. It's used for reads/writes and only upon failure - // do the reads/writes then check pconn (after acquiring mu) to see if - // there's been a rebind meanwhile. - // pconn isn't really needed, but makes some of the code simpler - // to keep it distinct. - // Neither is expected to be nil, sockets are bound on creation. - pconnAtomic atomic.Pointer[nettype.PacketConn] - - mu sync.Mutex // held while changing pconn (and pconnAtomic) - pconn nettype.PacketConn - port uint16 -} - -// setConnLocked sets the provided nettype.PacketConn. It should be called only -// after acquiring RebindingUDPConn.mu. It upgrades the provided -// nettype.PacketConn to a *batchingUDPConn when appropriate. This upgrade -// is intentionally pushed closest to where read/write ops occur in order to -// avoid disrupting surrounding code that assumes nettype.PacketConn is a -// *net.UDPConn. -func (c *RebindingUDPConn) setConnLocked(p nettype.PacketConn, network string, batchSize int) { - upc := tryUpgradeToBatchingUDPConn(p, network, batchSize) - c.pconn = upc - c.pconnAtomic.Store(&upc) - c.port = uint16(c.localAddrLocked().Port) -} - -// currentConn returns c's current pconn, acquiring c.mu in the process. 
-func (c *RebindingUDPConn) currentConn() nettype.PacketConn { - c.mu.Lock() - defer c.mu.Unlock() - return c.pconn -} - -func (c *RebindingUDPConn) readFromWithInitPconn(pconn nettype.PacketConn, b []byte) (int, netip.AddrPort, error) { - for { - n, addr, err := pconn.ReadFromUDPAddrPort(b) - if err != nil && pconn != c.currentConn() { - pconn = *c.pconnAtomic.Load() - continue - } - return n, addr, err - } -} - -// ReadFromUDPAddrPort reads a packet from c into b. -// It returns the number of bytes copied and the source address. -func (c *RebindingUDPConn) ReadFromUDPAddrPort(b []byte) (int, netip.AddrPort, error) { - return c.readFromWithInitPconn(*c.pconnAtomic.Load(), b) -} - -// WriteBatchTo writes buffs to addr. -func (c *RebindingUDPConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort) error { - for { - pconn := *c.pconnAtomic.Load() - b, ok := pconn.(*batchingUDPConn) - if !ok { - for _, buf := range buffs { - _, err := c.writeToUDPAddrPortWithInitPconn(pconn, buf, addr) - if err != nil { - return err - } - } - return nil - } - err := b.WriteBatchTo(buffs, addr) - if err != nil { - if pconn != c.currentConn() { - continue - } - return err - } - return err - } -} - -// ReadBatch reads messages from c into msgs. It returns the number of messages -// the caller should evaluate for nonzero len, as a zero len message may fall -// on either side of a nonzero. -func (c *RebindingUDPConn) ReadBatch(msgs []ipv6.Message, flags int) (int, error) { - for { - pconn := *c.pconnAtomic.Load() - b, ok := pconn.(*batchingUDPConn) - if !ok { - n, ap, err := c.readFromWithInitPconn(pconn, msgs[0].Buffers[0]) - if err == nil { - msgs[0].N = n - msgs[0].Addr = net.UDPAddrFromAddrPort(netaddr.Unmap(ap)) - return 1, nil - } - return 0, err - } - n, err := b.ReadBatch(msgs, flags) - if err != nil && pconn != c.currentConn() { - continue - } - return n, err - } -} - -func (c *RebindingUDPConn) Port() uint16 { - c.mu.Lock() - defer c.mu.Unlock() - return c.port -} - -func (c *RebindingUDPConn) LocalAddr() *net.UDPAddr { - c.mu.Lock() - defer c.mu.Unlock() - return c.localAddrLocked() -} - -func (c *RebindingUDPConn) localAddrLocked() *net.UDPAddr { - return c.pconn.LocalAddr().(*net.UDPAddr) -} - -// errNilPConn is returned by RebindingUDPConn.Close when there is no current pconn. -// It is for internal use only and should not be returned to users. -var errNilPConn = errors.New("nil pconn") - -func (c *RebindingUDPConn) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - return c.closeLocked() -} - -func (c *RebindingUDPConn) closeLocked() error { - if c.pconn == nil { - return errNilPConn - } - c.port = 0 - return c.pconn.Close() -} - -func (c *RebindingUDPConn) writeToUDPAddrPortWithInitPconn(pconn nettype.PacketConn, b []byte, addr netip.AddrPort) (int, error) { - for { - n, err := pconn.WriteToUDPAddrPort(b, addr) - if err != nil && pconn != c.currentConn() { - pconn = *c.pconnAtomic.Load() - continue - } - return n, err - } -} - -func (c *RebindingUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) { - return c.writeToUDPAddrPortWithInitPconn(*c.pconnAtomic.Load(), b, addr) -} - func newBlockForeverConn() *blockForeverConn { c := new(blockForeverConn) c.cond = sync.NewCond(&c.mu) return c } -// blockForeverConn is a net.PacketConn whose reads block until it is closed. 
-type blockForeverConn struct { - mu sync.Mutex - cond *sync.Cond - closed bool -} - -func (c *blockForeverConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) { - c.mu.Lock() - for !c.closed { - c.cond.Wait() - } - c.mu.Unlock() - return 0, netip.AddrPort{}, net.ErrClosed -} - -func (c *blockForeverConn) WriteToUDPAddrPort(p []byte, addr netip.AddrPort) (int, error) { - // Silently drop writes. - return len(p), nil -} - -func (c *blockForeverConn) LocalAddr() net.Addr { - // Return a *net.UDPAddr because lots of code assumes that it will. - return new(net.UDPAddr) -} - -func (c *blockForeverConn) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - if c.closed { - return net.ErrClosed - } - c.closed = true - c.cond.Broadcast() - return nil -} - -func (c *blockForeverConn) SetDeadline(t time.Time) error { return errors.New("unimplemented") } -func (c *blockForeverConn) SetReadDeadline(t time.Time) error { return errors.New("unimplemented") } -func (c *blockForeverConn) SetWriteDeadline(t time.Time) error { return errors.New("unimplemented") } - // simpleDur rounds d such that it stringifies to something short. func simpleDur(d time.Duration) time.Duration { if d < time.Second { @@ -3764,16 +2514,6 @@ func simpleDur(d time.Duration) time.Duration { return d.Round(time.Minute) } -func (c *Conn) derpRegionCodeOfIDLocked(regionID int) string { - if c.derpMap == nil { - return "" - } - if r, ok := c.derpMap.Regions[regionID]; ok { - return r.RegionCode - } - return "" -} - func (c *Conn) UpdateStatus(sb *ipnstate.StatusBuilder) { c.mu.Lock() defer c.mu.Unlock() @@ -3854,14 +2594,6 @@ const ( // try to upgrade to a better path. goodEnoughLatency = 5 * time.Millisecond - // derpInactiveCleanupTime is how long a non-home DERP connection - // needs to be idle (last written to) before we close it. - derpInactiveCleanupTime = 60 * time.Second - - // derpCleanStaleInterval is how often cleanStaleDerp runs when there - // are potentially-stale DERP connections to close. - derpCleanStaleInterval = 15 * time.Second - // endpointsFreshEnoughDuration is how long we consider a // STUN-derived endpoint valid for. UDP NAT mappings typically // expire at 30 seconds, so this is a few seconds shy of that. @@ -3955,22 +2687,6 @@ type discoInfo struct { lastPingTime time.Time } -// derpAddrFamSelector is the derphttp.AddressFamilySelector we pass -// to derphttp.Client.SetAddressFamilySelector. -// -// It provides the hint as to whether in an IPv4-vs-IPv6 race that -// IPv4 should be held back a bit to give IPv6 a better-than-50/50 -// chance of winning. We only return true when we believe IPv6 will -// work anyway, so we don't artificially delay the connection speed. -type derpAddrFamSelector struct{ c *Conn } - -func (s derpAddrFamSelector) PreferIPv6() bool { - if r := s.c.lastNetCheckReport.Load(); r != nil { - return r.IPv6 - } - return false -} - type endpointTrackerEntry struct { endpoint tailcfg.Endpoint until time.Time diff --git a/wgengine/magicsock/rebinding_conn.go b/wgengine/magicsock/rebinding_conn.go new file mode 100644 index 000000000..161f52062 --- /dev/null +++ b/wgengine/magicsock/rebinding_conn.go @@ -0,0 +1,168 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package magicsock + +import ( + "errors" + "net" + "net/netip" + "sync" + "sync/atomic" + + "golang.org/x/net/ipv6" + "tailscale.com/net/netaddr" + "tailscale.com/types/nettype" +) + +// RebindingUDPConn is a UDP socket that can be re-bound. 
+// Unix has no notion of re-binding a socket, so we swap it out for a new one. +type RebindingUDPConn struct { + // pconnAtomic is a pointer to the value stored in pconn, but doesn't + // require acquiring mu. It's used for reads/writes and only upon failure + // do the reads/writes then check pconn (after acquiring mu) to see if + // there's been a rebind meanwhile. + // pconn isn't really needed, but makes some of the code simpler + // to keep it distinct. + // Neither is expected to be nil, sockets are bound on creation. + pconnAtomic atomic.Pointer[nettype.PacketConn] + + mu sync.Mutex // held while changing pconn (and pconnAtomic) + pconn nettype.PacketConn + port uint16 +} + +// setConnLocked sets the provided nettype.PacketConn. It should be called only +// after acquiring RebindingUDPConn.mu. It upgrades the provided +// nettype.PacketConn to a *batchingUDPConn when appropriate. This upgrade +// is intentionally pushed closest to where read/write ops occur in order to +// avoid disrupting surrounding code that assumes nettype.PacketConn is a +// *net.UDPConn. +func (c *RebindingUDPConn) setConnLocked(p nettype.PacketConn, network string, batchSize int) { + upc := tryUpgradeToBatchingUDPConn(p, network, batchSize) + c.pconn = upc + c.pconnAtomic.Store(&upc) + c.port = uint16(c.localAddrLocked().Port) +} + +// currentConn returns c's current pconn, acquiring c.mu in the process. +func (c *RebindingUDPConn) currentConn() nettype.PacketConn { + c.mu.Lock() + defer c.mu.Unlock() + return c.pconn +} + +func (c *RebindingUDPConn) readFromWithInitPconn(pconn nettype.PacketConn, b []byte) (int, netip.AddrPort, error) { + for { + n, addr, err := pconn.ReadFromUDPAddrPort(b) + if err != nil && pconn != c.currentConn() { + pconn = *c.pconnAtomic.Load() + continue + } + return n, addr, err + } +} + +// ReadFromUDPAddrPort reads a packet from c into b. +// It returns the number of bytes copied and the source address. +func (c *RebindingUDPConn) ReadFromUDPAddrPort(b []byte) (int, netip.AddrPort, error) { + return c.readFromWithInitPconn(*c.pconnAtomic.Load(), b) +} + +// WriteBatchTo writes buffs to addr. +func (c *RebindingUDPConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort) error { + for { + pconn := *c.pconnAtomic.Load() + b, ok := pconn.(*batchingUDPConn) + if !ok { + for _, buf := range buffs { + _, err := c.writeToUDPAddrPortWithInitPconn(pconn, buf, addr) + if err != nil { + return err + } + } + return nil + } + err := b.WriteBatchTo(buffs, addr) + if err != nil { + if pconn != c.currentConn() { + continue + } + return err + } + return err + } +} + +// ReadBatch reads messages from c into msgs. It returns the number of messages +// the caller should evaluate for nonzero len, as a zero len message may fall +// on either side of a nonzero. 
+func (c *RebindingUDPConn) ReadBatch(msgs []ipv6.Message, flags int) (int, error) { + for { + pconn := *c.pconnAtomic.Load() + b, ok := pconn.(*batchingUDPConn) + if !ok { + n, ap, err := c.readFromWithInitPconn(pconn, msgs[0].Buffers[0]) + if err == nil { + msgs[0].N = n + msgs[0].Addr = net.UDPAddrFromAddrPort(netaddr.Unmap(ap)) + return 1, nil + } + return 0, err + } + n, err := b.ReadBatch(msgs, flags) + if err != nil && pconn != c.currentConn() { + continue + } + return n, err + } +} + +func (c *RebindingUDPConn) Port() uint16 { + c.mu.Lock() + defer c.mu.Unlock() + return c.port +} + +func (c *RebindingUDPConn) LocalAddr() *net.UDPAddr { + c.mu.Lock() + defer c.mu.Unlock() + return c.localAddrLocked() +} + +func (c *RebindingUDPConn) localAddrLocked() *net.UDPAddr { + return c.pconn.LocalAddr().(*net.UDPAddr) +} + +// errNilPConn is returned by RebindingUDPConn.Close when there is no current pconn. +// It is for internal use only and should not be returned to users. +var errNilPConn = errors.New("nil pconn") + +func (c *RebindingUDPConn) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.closeLocked() +} + +func (c *RebindingUDPConn) closeLocked() error { + if c.pconn == nil { + return errNilPConn + } + c.port = 0 + return c.pconn.Close() +} + +func (c *RebindingUDPConn) writeToUDPAddrPortWithInitPconn(pconn nettype.PacketConn, b []byte, addr netip.AddrPort) (int, error) { + for { + n, err := pconn.WriteToUDPAddrPort(b, addr) + if err != nil && pconn != c.currentConn() { + pconn = *c.pconnAtomic.Load() + continue + } + return n, err + } +} + +func (c *RebindingUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) { + return c.writeToUDPAddrPortWithInitPconn(*c.pconnAtomic.Load(), b, addr) +}
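
Aside for reviewers new to this code: the rebind-and-retry idiom that RebindingUDPConn implements above (an atomic pointer for the lock-free fast path, plus a mutex-guarded field that is only consulted after a failed read or write) can be seen in isolation in the standalone sketch below. This is illustrative only and is not part of the patch; the rebindingConn type, its rebind/readFrom helpers, and the main driver are hypothetical names invented for this example, and error handling is pared down to the minimum.

package main

import (
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// rebindingConn is a toy version of the rebind-and-retry pattern:
// readers load the current socket from an atomic pointer and, if a
// read fails because the socket was swapped out underneath them,
// retry against the new socket instead of returning the error.
type rebindingConn struct {
	ptr atomic.Pointer[net.UDPConn] // lock-free fast path for reads/writes

	mu   sync.Mutex   // held while swapping the socket
	conn *net.UDPConn // current socket; same value as ptr
}

// current returns the socket under the lock, used only after a failure
// to decide whether the error came from a rebind.
func (c *rebindingConn) current() *net.UDPConn {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.conn
}

// rebind installs a freshly bound socket and closes the old one,
// which unblocks any reader parked on it.
func (c *rebindingConn) rebind() error {
	newConn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)})
	if err != nil {
		return err
	}
	c.mu.Lock()
	old := c.conn
	c.conn = newConn
	c.ptr.Store(newConn)
	c.mu.Unlock()
	if old != nil {
		old.Close()
	}
	return nil
}

// readFrom keeps retrying on the latest socket whenever the one it was
// using turns out to be stale, i.e. a rebind happened mid-read.
// (Assumes rebind has been called at least once.)
func (c *rebindingConn) readFrom(b []byte) (int, *net.UDPAddr, error) {
	conn := c.ptr.Load()
	for {
		n, addr, err := conn.ReadFromUDP(b)
		if err != nil && conn != c.current() {
			conn = c.ptr.Load() // socket was rebound; retry on the new one
			continue
		}
		return n, addr, err
	}
}

func main() {
	c := new(rebindingConn)
	if err := c.rebind(); err != nil {
		panic(err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		buf := make([]byte, 1500)
		n, addr, err := c.readFrom(buf)
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Printf("got %q from %v\n", buf[:n], addr)
	}()

	// Rebind while the reader is (likely) blocked; the reader survives it.
	time.Sleep(100 * time.Millisecond)
	if err := c.rebind(); err != nil {
		panic(err)
	}

	// Send a packet to the *new* socket so the retried read completes.
	dst := c.current().LocalAddr().(*net.UDPAddr)
	sender, err := net.DialUDP("udp4", nil, dst)
	if err != nil {
		panic(err)
	}
	defer sender.Close()
	if _, err := sender.Write([]byte("hello after rebind")); err != nil {
		panic(err)
	}
	<-done
}

The design point mirrored here is the one called out in the pconnAtomic comment above: the hot read/write path never takes the mutex, and only a failure triggers the locked check for whether a rebind happened, in which case the operation retries on the new socket rather than surfacing the stale socket's error to the caller.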