diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index fb00fb2b4..fe7e0ad07 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -49,7 +49,6 @@ import ( "tailscale.com/types/logger" "tailscale.com/types/nettype" "tailscale.com/types/opt" - "tailscale.com/types/structs" "tailscale.com/types/wgkey" "tailscale.com/version" ) @@ -147,21 +146,21 @@ type Conn struct { // TODO(danderson): now that we have global rate-limiting, is this still useful? sendLogLimit *rate.Limiter - // bufferedIPv4From and bufferedIPv4Packet are owned by - // ReceiveIPv4, and used when both a DERP and IPv4 packet arrive - // at the same time. It stores the IPv4 packet for use in the next call. - bufferedIPv4From netaddr.IPPort // if non-zero, then bufferedIPv4Packet is valid - bufferedIPv4Packet []byte // the received packet (reused, owned by ReceiveIPv4) - // stunReceiveFunc holds the current STUN packet processing func. // Its Loaded value is always non-nil. stunReceiveFunc atomic.Value // of func(p []byte, fromAddr *net.UDPAddr) - // udpRecvCh and derpRecvCh are used by ReceiveIPv4 to multiplex - // reads from DERP and the pconn4. - udpRecvCh chan udpReadResult + // derpRecvCh is used by ReceiveIPv4 to read DERP messages. derpRecvCh chan derpReadResult + // derpRecvCountAtomic is atomically incremented by runDerpReader whenever + // a DERP message arrives. It's incremented before runDerpReader is interrupted. + derpRecvCountAtomic int64 + // derpRecvCountLast is used by ReceiveIPv4 to compare against + // its last read value of derpRecvCountAtomic to determine + // whether a DERP channel read should be done. + derpRecvCountLast int64 // owned by ReceiveIPv4 + // ============================================================ mu sync.Mutex // guards all following fields; see userspaceEngine lock ordering rules muCond *sync.Cond @@ -421,7 +420,6 @@ func newConn() *Conn { addrsByUDP: make(map[netaddr.IPPort]*addrSet), addrsByKey: make(map[key.Public]*addrSet), derpRecvCh: make(chan derpReadResult), - udpRecvCh: make(chan udpReadResult), derpStarted: make(chan struct{}), peerLastDerp: make(map[key.Public]int), endpointOfDisco: make(map[tailcfg.DiscoKey]*discoEndpoint), @@ -1379,7 +1377,16 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr netaddr.IPPort, d // Ignore. // TODO: handle endpoint notification messages. continue + } + // Before we wake up ReceiveIPv4 with SetReadDeadline, + // note that a DERP packet has arrived. ReceiveIPv4 + // will read this field to note that its UDP read + // error is due to us. + atomic.AddInt64(&c.derpRecvCountAtomic, 1) + // Cancel the pconn read goroutine. + c.pconn4.SetReadDeadline(aLongTimeAgo) + select { case <-ctx.Done(): return @@ -1445,54 +1452,16 @@ func (c *Conn) findEndpoint(ipp netaddr.IPPort, addr *net.UDPAddr, packet []byte } } + if addr == nil { + addr = ipp.UDPAddr() + } return c.findLegacyEndpointLocked(ipp, addr, packet) } -type udpReadResult struct { - _ structs.Incomparable - n int - err error - addr *net.UDPAddr - ipp netaddr.IPPort -} - // aLongTimeAgo is a non-zero time, far in the past, used for // immediate cancellation of network operations. var aLongTimeAgo = time.Unix(233431200, 0) -// awaitUDP4 reads a single IPv4 UDP packet (or an error) and sends it -// to c.udpRecvCh, skipping over (but handling) any STUN replies. -func (c *Conn) awaitUDP4(b []byte) { - for { - n, pAddr, err := c.pconn4.ReadFrom(b) - if err != nil { - select { - case c.udpRecvCh <- udpReadResult{err: err}: - case <-c.donec: - } - return - } - addr := pAddr.(*net.UDPAddr) - ipp, ok := netaddr.FromStdAddr(addr.IP, addr.Port, addr.Zone) - if !ok { - continue - } - if stun.Is(b[:n]) { - c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b[:n], ipp) - continue - } - if c.handleDiscoMessage(b[:n], ipp) { - continue - } - - select { - case c.udpRecvCh <- udpReadResult{n: n, addr: addr, ipp: ipp}: - case <-c.donec: - } - return - } -} - // noteRecvActivityFromEndpoint calls the c.noteRecvActivity hook if // e is a discovery-capable peer and this is the first receive activity // it's got in awhile (in last 10 seconds). @@ -1505,128 +1474,88 @@ func (c *Conn) noteRecvActivityFromEndpoint(e conn.Endpoint) { } } -func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) { - var addr *net.UDPAddr -Top: - // First, process any buffered packet from earlier. - if from := c.bufferedIPv4From; !from.IsZero() { - c.bufferedIPv4From = netaddr.IPPort{} - addr = from.UDPAddr() - ep := c.findEndpoint(from, addr, c.bufferedIPv4Packet) - if ep == nil { - goto Top - } - c.noteRecvActivityFromEndpoint(ep) - return copy(b, c.bufferedIPv4Packet), ep, nil +func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, error) { + if c.pconn6 == nil { + return 0, nil, syscall.EAFNOSUPPORT } - - go c.awaitUDP4(b) - - // Once the above goroutine has started, it owns b until it writes - // to udpRecvCh. The code below must not access b until it's - // completed a successful receive on udpRecvCh. - - var ipp netaddr.IPPort - - select { - case dm := <-c.derpRecvCh: - // Cancel the pconn read goroutine - c.pconn4.SetReadDeadline(aLongTimeAgo) - // Wait for the UDP-reading goroutine to be done, since it's currently - // the owner of the b []byte buffer: - select { - case um := <-c.udpRecvCh: - if um.err != nil { - // The normal case. The SetReadDeadline interrupted - // the read and we get an error which we now ignore. - } else { - // The pconn.ReadFrom succeeded and was about to send, - // but DERP sent first. So now we have both ready. - // Save the UDP packet away for use by the next - // ReceiveIPv4 call. - c.bufferedIPv4From = um.ipp - c.bufferedIPv4Packet = append(c.bufferedIPv4Packet[:0], b[:um.n]...) - } - c.pconn4.SetReadDeadline(time.Time{}) - case <-c.donec: - return 0, nil, errors.New("Conn closed") - } - var regionID int - n, regionID = dm.n, dm.regionID - ncopy := dm.copyBuf(b) - if ncopy != n { - err = fmt.Errorf("received DERP packet of length %d that's too big for WireGuard ReceiveIPv4 buf size %d", n, ncopy) - c.logf("magicsock: %v", err) + for { + n, pAddr, err := c.pconn6.ReadFrom(b) + if err != nil { return 0, nil, err } - - ipp = netaddr.IPPort{IP: derpMagicIPAddr, Port: uint16(regionID)} - if c.handleDiscoMessage(b[:n], ipp) { - goto Top + if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr)); ok { + return n, ep, nil } + } +} - var ( - didNoteRecvActivity bool - discoEp *discoEndpoint - asEp *addrSet - ) - c.mu.Lock() - if dk, ok := c.discoOfNode[tailcfg.NodeKey(dm.src)]; ok { - discoEp = c.endpointOfDisco[dk] - // If we know about the node (it's in discoOfNode) but don't know about the - // endpoint, that's because it's an idle peer that doesn't yet exist in the - // wireguard config. So run the receive hook, if defined, which should - // create the wireguard peer. - if discoEp == nil && c.noteRecvActivity != nil { - didNoteRecvActivity = true - c.mu.Unlock() // release lock before calling noteRecvActivity - c.noteRecvActivity(dk) // (calls back into CreateEndpoint) - // Now require the lock. No invariants need to be rechecked; just - // 1-2 map lookups follow that are harmless if, say, the peer has - // been deleted during this time. In that case we'll treate it as a - // legacy pre-disco UDP receive and hand it to wireguard which'll - // likely just drop it. - c.mu.Lock() - - discoEp = c.endpointOfDisco[dk] - c.logf("magicsock: DERP packet received from idle peer %v; created=%v", dm.src.ShortString(), ep != nil) - } - } - if !c.disableLegacy { - asEp = c.addrsByKey[dm.src] - } - c.mu.Unlock() +func (c *Conn) derpPacketArrived() bool { + rc := atomic.LoadInt64(&c.derpRecvCountAtomic) + if rc != c.derpRecvCountLast { + c.derpRecvCountLast = rc + return true + } + return false +} - if discoEp != nil { - ep = discoEp - } else if asEp != nil { - ep = asEp - } else { - key := wgkey.Key(dm.src) - c.logf("magicsock: DERP packet from unknown key: %s", key.ShortString()) - ep = c.findEndpoint(ipp, addr, b[:n]) - if ep == nil { - goto Top +// ReceiveIPv4 is called by wireguard-go to receive an IPv4 packet. +// In Tailscale's case, that packet might also arrive via DERP. A DERP packet arrival +// aborts the pconn4 read deadline to make it fail. +func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) { + for { + n, pAddr, err := c.pconn4.ReadFrom(b) + if err != nil { + // If the pconn4 read failed, the likely reason is a DERP reader received + // a packet and interrupted us. + // It's possible for ReadFrom to return a non deadline exceeded error + // and for there to have also had a DERP packet arrive, but that's fine: + // we'll get the same error from ReadFrom later. + if c.derpPacketArrived() { + c.pconn4.SetReadDeadline(time.Time{}) // restore + n, ep, err = c.receiveIPv4DERP(b) + if err == errLoopAgain { + continue + } + return n, ep, err } - } - - if !didNoteRecvActivity { - c.noteRecvActivityFromEndpoint(ep) - } - return n, ep, nil - - case um := <-c.udpRecvCh: - if um.err != nil { return 0, nil, err } - n, addr, ipp = um.n, um.addr, um.ipp - ep = c.findEndpoint(ipp, addr, b[:n]) - if ep == nil { - goto Top + if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr)); ok { + return n, ep, nil } - c.noteRecvActivityFromEndpoint(ep) - return n, ep, nil + } +} +// receiveIP is the shared bits of ReceiveIPv4 and ReceiveIPv6. +func (c *Conn) receiveIP(b []byte, ua *net.UDPAddr) (ep conn.Endpoint, ok bool) { + ipp, ok := netaddr.FromStdAddr(ua.IP, ua.Port, ua.Zone) + if !ok { + return + } + if stun.Is(b) { + c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b, ipp) + return + } + if c.handleDiscoMessage(b, ipp) { + return + } + ep = c.findEndpoint(ipp, ua, b) + if ep == nil { + return + } + c.noteRecvActivityFromEndpoint(ep) + return ep, true +} + +var errLoopAgain = errors.New("received packet was not a wireguard-go packet or no endpoint found") + +// receiveIPv4DERP reads a packet from c.derpRecvCh into b and returns the associated endpoint. +// +// If the packet was a disco message or the peer endpoint wasn't +// found, the returned error is errLoopAgain. +func (c *Conn) receiveIPv4DERP(b []byte) (n int, ep conn.Endpoint, err error) { + var dm derpReadResult + select { case <-c.donec: // Socket has been shut down. All the producers of packets // respond to the context cancellation and go away, so we have @@ -1639,38 +1568,71 @@ Top: // Clos() on magicsock, and expects ReceiveIPv4 to unblock // with an error so it can clean up. return 0, nil, errors.New("socket closed") + case dm = <-c.derpRecvCh: + // Below. } -} -func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, error) { - if c.pconn6 == nil { - return 0, nil, syscall.EAFNOSUPPORT + var regionID int + n, regionID = dm.n, dm.regionID + ncopy := dm.copyBuf(b) + if ncopy != n { + err = fmt.Errorf("received DERP packet of length %d that's too big for WireGuard ReceiveIPv4 buf size %d", n, ncopy) + c.logf("magicsock: %v", err) + return 0, nil, err } - for { - n, pAddr, err := c.pconn6.ReadFrom(b) - if err != nil { - return 0, nil, err - } - addr := pAddr.(*net.UDPAddr) - ipp, ok := netaddr.FromStdAddr(addr.IP, addr.Port, addr.Zone) - if !ok { - continue - } - if stun.Is(b[:n]) { - c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b[:n], ipp) - continue - } - if c.handleDiscoMessage(b[:n], ipp) { - continue + + ipp := netaddr.IPPort{IP: derpMagicIPAddr, Port: uint16(regionID)} + if c.handleDiscoMessage(b[:n], ipp) { + return 0, nil, errLoopAgain + } + + var ( + didNoteRecvActivity bool + discoEp *discoEndpoint + asEp *addrSet + ) + c.mu.Lock() + if dk, ok := c.discoOfNode[tailcfg.NodeKey(dm.src)]; ok { + discoEp = c.endpointOfDisco[dk] + // If we know about the node (it's in discoOfNode) but don't know about the + // endpoint, that's because it's an idle peer that doesn't yet exist in the + // wireguard config. So run the receive hook, if defined, which should + // create the wireguard peer. + if discoEp == nil && c.noteRecvActivity != nil { + didNoteRecvActivity = true + c.mu.Unlock() // release lock before calling noteRecvActivity + c.noteRecvActivity(dk) // (calls back into CreateEndpoint) + // Now require the lock. No invariants need to be rechecked; just + // 1-2 map lookups follow that are harmless if, say, the peer has + // been deleted during this time. + c.mu.Lock() + + discoEp = c.endpointOfDisco[dk] + c.logf("magicsock: DERP packet received from idle peer %v; created=%v", dm.src.ShortString(), ep != nil) } + } + if !c.disableLegacy { + asEp = c.addrsByKey[dm.src] + } + c.mu.Unlock() - ep := c.findEndpoint(ipp, addr, b[:n]) + if discoEp != nil { + ep = discoEp + } else if asEp != nil { + ep = asEp + } else { + key := wgkey.Key(dm.src) + c.logf("magicsock: DERP packet from unknown key: %s", key.ShortString()) + ep = c.findEndpoint(ipp, nil, b[:n]) if ep == nil { - continue + return 0, nil, errLoopAgain } + } + + if !didNoteRecvActivity { c.noteRecvActivityFromEndpoint(ep) - return n, ep, nil } + return n, ep, nil } // discoLogLevel controls the verbosity of discovery log messages. diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index 73d5f1468..2a5c0d3da 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" "unsafe" @@ -1381,7 +1382,7 @@ func stringifyConfig(cfg wgcfg.Config) string { return string(j) } -func TestDiscoEndpointAlignment(t *testing.T) { +func Test32bitAlignment(t *testing.T) { var de discoEndpoint off := unsafe.Offsetof(de.lastRecvUnixAtomic) if off%8 != 0 { @@ -1393,6 +1394,8 @@ func TestDiscoEndpointAlignment(t *testing.T) { if de.isFirstRecvActivityInAwhile() { t.Error("expected false on second call") } + var c Conn + atomic.AddInt64(&c.derpRecvCountAtomic, 1) } func BenchmarkReceiveFrom(b *testing.B) {