diff --git a/derp/derp_server.go b/derp/derp_server.go index 2e17cbfe5..07b4092e4 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -46,6 +46,7 @@ import ( "tailscale.com/tstime/rate" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/util/lru" "tailscale.com/util/mak" "tailscale.com/util/set" "tailscale.com/util/slicesx" @@ -154,11 +155,14 @@ type Server struct { verifyClientsURL string verifyClientsURLFailOpen bool - mu sync.Mutex - closed bool - netConns map[Conn]chan struct{} // chan is closed when conn closes - clients map[key.NodePublic]*clientSet - watchers set.Set[*sclient] // mesh peers + mu sync.Mutex + closed bool + flow map[flowKey]*flow + flows []*flow // slice of values of flow map + flowCleanIndex int + netConns map[Conn]chan struct{} // chan is closed when conn closes + clients map[key.NodePublic]*clientSet + watchers set.Set[*sclient] // mesh peers // clientsMesh tracks all clients in the cluster, both locally // and to mesh peers. If the value is nil, that means the // peer is only local (and thus in the clients Map, but not @@ -341,6 +345,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { packetsDroppedType: metrics.LabelMap{Label: "type"}, clients: map[key.NodePublic]*clientSet{}, clientsMesh: map[key.NodePublic]PacketForwarder{}, + flow: map[flowKey]*flow{}, netConns: map[Conn]chan struct{}{}, memSys0: ms.Sys, watchers: set.Set[*sclient]{}, @@ -866,9 +871,20 @@ func (s *Server) debugLogf(format string, v ...any) { } } +// onRunLoopDone is called when the run loop is done +// to clean up. +// +// It must only be called from the [slient.run] goroutine. +func (c *sclient) onRunLoopDone() { + c.flows.ForEach(func(k key.NodePublic, peer flowAndClientSet) { + peer.f.ref.Add(-1) + }) +} + // run serves the client until there's an error. // If the client hangs up or the server is closed, run returns nil, otherwise run returns an error. func (c *sclient) run(ctx context.Context) error { + defer c.onRunLoopDone() // Launch sender, but don't return from run until sender goroutine is done. var grp errgroup.Group sendCtx, cancelSender := context.WithCancel(ctx) @@ -1028,6 +1044,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { var dst *sclient s.mu.Lock() + flo := s.getMakeFlowLocked(srcKey, dstKey) if set, ok := s.clients[dstKey]; ok { dstLen = set.Len() dst = set.activeClient.Load() @@ -1050,7 +1067,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { return c.sendPkt(dst, pkt{ bs: contents, enqueuedAt: c.s.clock.Now(), - src: srcKey, + flow: flo, }) } @@ -1063,22 +1080,13 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { return fmt.Errorf("client %v: recvPacket: %v", c.key, err) } - var fwd PacketForwarder - var dstLen int - var dst *sclient - - s.mu.Lock() - if set, ok := s.clients[dstKey]; ok { - dstLen = set.Len() - dst = set.activeClient.Load() - } - if dst == nil && dstLen < 1 { - fwd = s.clientsMesh[dstKey] - } - s.mu.Unlock() + flo, dst, fwd := c.lookupDest(dstKey) + flo.noteActivity() if dst == nil { if fwd != nil { + flo.pktSendRegion.Add(1) + flo.byteSendRegion.Add(1) s.packetsForwardedOut.Add(1) err := fwd.ForwardPacket(c.key, dstKey, contents) c.debugLogf("SendPacket for %s, forwarding via %s: %v", dstKey.ShortString(), fwd, err) @@ -1088,22 +1096,22 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } return nil } + flo.dropUnknownDest.Add(1) reason := dropReasonUnknownDest - if dstLen > 1 { - reason = dropReasonDupClient - } else { - c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) - } + c.requestPeerGoneWriteLimited(dstKey, contents, PeerGoneReasonNotHere) s.recordDrop(contents, c.key, dstKey, reason) c.debugLogf("SendPacket for %s, dropping with reason=%s", dstKey.ShortString(), reason) return nil } c.debugLogf("SendPacket for %s, sending directly", dstKey.ShortString()) + flo.pktSendLocal.Add(1) + flo.byteSendLocal.Add(1) + p := pkt{ bs: contents, enqueuedAt: c.s.clock.Now(), - src: c.key, + flow: flo, } return c.sendPkt(dst, p) } @@ -1151,6 +1159,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r } func (c *sclient) sendPkt(dst *sclient, p pkt) error { + // TODO(bradfitz): bump metrics on p.flow s := c.s dstKey := dst.key @@ -1511,6 +1520,7 @@ type sclient struct { br *bufio.Reader connectedAt time.Time preferred bool + flows lru.Cache[key.NodePublic, flowAndClientSet] // keyed by dest // Owned by sendLoop, not thread-safe. sawSrc map[key.NodePublic]set.Handle @@ -1563,8 +1573,12 @@ type pkt struct { // The memory is owned by pkt. bs []byte - // src is the who's the sender of the packet. - src key.NodePublic + // flow is the flow stats from the src to the dest. + flow *flow +} + +func (p pkt) src() key.NodePublic { + return p.flow.flowKey.Value().src } // peerGoneMsg is a request to write a peerGone frame to an sclient @@ -1635,14 +1649,13 @@ func (c *sclient) onSendLoopDone() { for { select { case pkt := <-c.sendQueue: - c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) + c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected) case pkt := <-c.discoSendQueue: - c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGoneDisconnected) + c.s.recordDrop(pkt.bs, pkt.src(), c.key, dropReasonGoneDisconnected) default: return } } - } func (c *sclient) sendLoop(ctx context.Context) error { @@ -1669,11 +1682,11 @@ func (c *sclient) sendLoop(ctx context.Context) error { werr = c.sendMeshUpdates() continue case msg := <-c.sendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) continue case msg := <-c.discoSendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) continue case msg := <-c.sendPongCh: @@ -1700,10 +1713,10 @@ func (c *sclient) sendLoop(ctx context.Context) error { werr = c.sendMeshUpdates() continue case msg := <-c.sendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) case msg := <-c.discoSendQueue: - werr = c.sendPacket(msg.src, msg.bs) + werr = c.sendPacket(msg.src(), msg.bs) c.recordQueueTime(msg.enqueuedAt) case msg := <-c.sendPongCh: werr = c.sendPong(msg) diff --git a/derp/flow.go b/derp/flow.go new file mode 100644 index 000000000..95f81cd6d --- /dev/null +++ b/derp/flow.go @@ -0,0 +1,182 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package derp + +import ( + "sync/atomic" + "time" + "unique" + + "tailscale.com/types/key" +) + +type flowKey struct { + src, dst key.NodePublic +} + +// flow tracks metadata about a directional flow of packets from a source +// node to a destination node. The public keys of the src is known +// by the caller. +type flow struct { + createdUnixNano int64 // int64 instead of time.Time to keep flow smaller + index int // index in Server.flows slice or -1 if not; guarded by Server.mu + flowKey unique.Handle[flowKey] // TODO: make this a unique handle of two unique handles for each NodePublic? + + roughActivityUnixTime atomic.Int64 // unix sec of recent activity, updated at most once a minute + pktSendRegion atomic.Int64 + byteSendRegion atomic.Int64 + pktSendLocal atomic.Int64 + byteSendLocal atomic.Int64 + dropUnknownDest atomic.Int64 // no local or region client for dest + dropGone atomic.Int64 + + // ref is the reference count of things (*Server, *sclient) holding on + // to this flow. As of 2024-09-18 it is currently only informational + // and not used for anything. The Server adds/removes a ref count when + // it's remove from its map and each 0, 1 or more sclients for a given + // recently active flow also add/remove a ref count. + // + // This might be used in the future as an alternate Server.flow eviction + // strategy but for now it's just a debug tool. We do want to keep flow + // stats surviving a brief client disconnections, so we do want Server + // to keep at least a momentary ref count alive. + ref atomic.Int64 +} + +// noteActivity updates f.recentActivityUnixTime if it's been +// more than a minute. +func (f *flow) noteActivity() { + now := time.Now().Unix() + if now-f.roughActivityUnixTime.Load() > 60 { + f.roughActivityUnixTime.Store(now) + } +} + +// getMakeFlow either gets or makes a new flow for the given source and +// destination nodes. +func (s *Server) getMakeFlow(src, dst key.NodePublic) *flow { + s.mu.Lock() + defer s.mu.Unlock() + return s.getMakeFlowLocked(src, dst) +} + +func (s *Server) getMakeFlowLocked(src, dst key.NodePublic) *flow { + k := flowKey{src, dst} + f, ok := s.flow[k] + if ok { + return f + } + now := time.Now() + f = &flow{ + createdUnixNano: now.UnixNano(), + index: len(s.flows), + flowKey: unique.Make(k), + } + f.roughActivityUnixTime.Store(now.Unix()) + f.ref.Add(1) // for Server's ref in the s.flows map itself + + // As penance for the one flow we're about to add to the map and slice + // above, check two old flows for removal. We roll around and around the + // flows slice, so this is a simple way to eventually check everything for + // removal before we double in size. + for range 2 { + s.maybeCleanOldFlowLocked() + } + + s.flow[k] = f + s.flows = append(s.flows, f) + + return f +} + +func (s *Server) maybeCleanOldFlowLocked() { + if len(s.flows) == 0 { + return + } + s.flowCleanIndex++ + if s.flowCleanIndex >= len(s.flows) { + s.flowCleanIndex = 0 + } + f := s.flows[s.flowCleanIndex] + + now := time.Now().Unix() + ageSec := now - f.roughActivityUnixTime.Load() + if ageSec > 3600 { + // No activity in an hour. Remove it. + delete(s.flow, f.flowKey.Value()) + holeIdx := f.index + s.flows[holeIdx] = s.flows[len(s.flows)-1] + s.flows[holeIdx].index = holeIdx + s.flows = s.flows[:len(s.flows)-1] + f.ref.Add(-1) + return + } +} + +type flowAndClientSet struct { + f *flow // always non-nil + cs *clientSet // may be nil if peer not connected/known +} + +// lookupDest returns the flow (always non-nil) and sclient and/or +// PacketForwarder (at least one of which will be nil, possibly both) for the +// given destination node. + +// It must only be called from the [sclient.run] goroutine. +func (c *sclient) lookupDest(dst key.NodePublic) (_ *flow, _ *sclient, fwd PacketForwarder) { + peer, ok := c.flows.GetOk(dst) + if ok && peer.cs != nil { + if c := peer.cs.activeClient.Load(); c != nil { + // Common case for hot flows within the same node: we know the + // clientSet and no mutex is needed. + return peer.f, c, nil + } + } + + if peer.f == nil { + peer.f = c.s.getMakeFlow(c.key, dst) + peer.f.ref.Add(1) + // At least store the flow in the map, even if we don't find the + // clientSet later. In theory we could coallesce this map write with a + // possible one later, but they should be rare and uncontended so we + // don't care as of 2024-09-18. + c.flows.Set(dst, peer) + c.maybeCleanFlows() + } + + srv := c.s + srv.mu.Lock() + set, ok := srv.clients[dst] + if ok { + if c := set.activeClient.Load(); c != nil { + srv.mu.Unlock() + peer.cs = set + c.flows.Set(dst, peer) + c.maybeCleanFlows() + return peer.f, c, nil + } + fwd = srv.clientsMesh[dst] + } + srv.mu.Unlock() + return peer.f, nil, fwd // fwd may be nil too +} + +// maybeCleanFlows cleans the oldest element from the client flows cache if +// it's too big. +// +// It must only be called from the [sclient.run] goroutine. +func (c *sclient) maybeCleanFlows() { + const maxClientFlowTrack = 100 + if c.flows.Len() <= maxClientFlowTrack { + return + } + + oldest, _ := c.flows.OldestKey() + facs, ok := c.flows.PeekOk(oldest) + if !ok { + panic("lookupDest: OldestKey lied") + } + facs.f.ref.Add(-1) + c.flows.Delete(oldest) +} diff --git a/derp/flow_test.go b/derp/flow_test.go new file mode 100644 index 000000000..b3a740bc3 --- /dev/null +++ b/derp/flow_test.go @@ -0,0 +1,52 @@ +package derp + +import ( + "testing" + "unique" + + "go4.org/mem" + "tailscale.com/types/key" +) + +func BenchmarkUnique(b *testing.B) { + var keys [100]key.NodePublic + for i := range keys { + keys[i] = key.NodePublicFromRaw32(mem.B([]byte{31: byte(i)})) + } + b.Run("raw", func(b *testing.B) { + m := map[flowKey]bool{} + for range b.N { + for _, k := range keys { + key := flowKey{k, k} + if _, ok := m[key]; !ok { + m[key] = true + } + } + } + }) + b.Run("unique-tightmake", func(b *testing.B) { + m := map[unique.Handle[flowKey]]bool{} + for range b.N { + for _, k := range keys { + key := unique.Make(flowKey{k, k}) + if _, ok := m[key]; !ok { + m[key] = true + } + } + } + }) + b.Run("unique-makeonce", func(b *testing.B) { + m := map[unique.Handle[flowKey]]bool{} + ukeys := make([]unique.Handle[flowKey], len(keys)) + for i, k := range keys { + ukeys[i] = unique.Make(flowKey{k, k}) + } + for range b.N { + for _, key := range ukeys { + if _, ok := m[key]; !ok { + m[key] = true + } + } + } + }) +} diff --git a/util/lru/lru.go b/util/lru/lru.go index 8e4dd417b..d4e836cf1 100644 --- a/util/lru/lru.go +++ b/util/lru/lru.go @@ -133,6 +133,15 @@ func (c *Cache[K, V]) DeleteOldest() { } } +// OldestKey returns the oldest key, without bumping it to the head. +// If the cache is empty, it returns ok false. +func (c *Cache[K, V]) OldestKey() (key K, ok bool) { + if c.head == nil { + return key, false + } + return c.head.prev.key, true +} + // Len returns the number of items in the cache. func (c *Cache[K, V]) Len() int { return len(c.lookup) }