diff --git a/cmd/derper/depaware.txt b/cmd/derper/depaware.txt index 6608faaf7..9c720fa60 100644 --- a/cmd/derper/depaware.txt +++ b/cmd/derper/depaware.txt @@ -2,6 +2,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa filippo.io/edwards25519 from github.com/hdevalence/ed25519consensus filippo.io/edwards25519/field from filippo.io/edwards25519 + github.com/axiomhq/hyperloglog from tailscale.com/derp/derpserver github.com/beorn7/perks/quantile from github.com/prometheus/client_golang/prometheus 💣 github.com/cespare/xxhash/v2 from github.com/prometheus/client_golang/prometheus github.com/coder/websocket from tailscale.com/cmd/derper+ @@ -9,6 +10,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa github.com/coder/websocket/internal/util from github.com/coder/websocket github.com/coder/websocket/internal/xsync from github.com/coder/websocket W 💣 github.com/dblohm7/wingoes from tailscale.com/util/winutil + github.com/dgryski/go-metro from github.com/axiomhq/hyperloglog github.com/fxamacker/cbor/v2 from tailscale.com/tka github.com/go-json-experiment/json from tailscale.com/types/opt+ github.com/go-json-experiment/json/internal from github.com/go-json-experiment/json+ diff --git a/derp/derpserver/derpserver.go b/derp/derpserver/derpserver.go index 0bbc66780..1879e0c53 100644 --- a/derp/derpserver/derpserver.go +++ b/derp/derpserver/derpserver.go @@ -36,6 +36,7 @@ import ( "sync/atomic" "time" + "github.com/axiomhq/hyperloglog" "go4.org/mem" "golang.org/x/sync/errgroup" "tailscale.com/client/local" @@ -1643,6 +1644,12 @@ type sclient struct { sawSrc map[key.NodePublic]set.Handle bw *lazyBufioWriter + // senderCardinality estimates the number of unique peers that have + // sent packets to this client. Owned by sendLoop, protected by + // senderCardinalityMu for reads from other goroutines. + senderCardinalityMu sync.Mutex + senderCardinality *hyperloglog.Sketch + // Guarded by s.mu // // peerStateChange is used by mesh peers (a set of regional @@ -1778,6 +1785,8 @@ func (c *sclient) onSendLoopDone() { func (c *sclient) sendLoop(ctx context.Context) error { defer c.onSendLoopDone() + c.senderCardinality = hyperloglog.New() + jitter := rand.N(5 * time.Second) keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(derp.KeepAlive + jitter) defer keepAliveTick.Stop() @@ -2000,6 +2009,11 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) if withKey { pktLen += key.NodePublicRawLen c.noteSendFromSrc(srcKey) + if c.senderCardinality != nil { + c.senderCardinalityMu.Lock() + c.senderCardinality.Insert(srcKey.AppendTo(nil)) + c.senderCardinalityMu.Unlock() + } } if err = derp.WriteFrameHeader(c.bw.bw(), derp.FrameRecvPacket, uint32(pktLen)); err != nil { return err @@ -2013,6 +2027,17 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error) return err } +// EstimatedUniqueSenders returns an estimate of the number of unique peers +// that have sent packets to this client. +func (c *sclient) EstimatedUniqueSenders() uint64 { + c.senderCardinalityMu.Lock() + defer c.senderCardinalityMu.Unlock() + if c.senderCardinality == nil { + return 0 + } + return c.senderCardinality.Estimate() +} + // noteSendFromSrc notes that we are about to write a packet // from src to sclient. // @@ -2295,7 +2320,8 @@ type BytesSentRecv struct { Sent uint64 Recv uint64 // Key is the public key of the client which sent/received these bytes. - Key key.NodePublic + Key key.NodePublic + UniqueSenders uint64 `json:",omitzero"` } // parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic. @@ -2349,6 +2375,11 @@ func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) { if prev.Sent < next.Sent || prev.Recv < next.Recv { if pkey, ok := s.keyOfAddr[k]; ok { next.Key = pkey + if cs, ok := s.clients[pkey]; ok { + if c := cs.activeClient.Load(); c != nil { + next.UniqueSenders = c.EstimatedUniqueSenders() + } + } if err := enc.Encode(next); err != nil { s.mu.Unlock() return diff --git a/derp/derpserver/derpserver_test.go b/derp/derpserver/derpserver_test.go index 2db5f25bc..1dd86f314 100644 --- a/derp/derpserver/derpserver_test.go +++ b/derp/derpserver/derpserver_test.go @@ -9,6 +9,7 @@ import ( "context" "crypto/x509" "encoding/asn1" + "encoding/binary" "expvar" "fmt" "log" @@ -20,6 +21,7 @@ import ( "testing" "time" + "github.com/axiomhq/hyperloglog" qt "github.com/frankban/quicktest" "go4.org/mem" "golang.org/x/time/rate" @@ -755,6 +757,35 @@ func TestParseSSOutput(t *testing.T) { } } +func TestServeDebugTrafficUniqueSenders(t *testing.T) { + s := New(key.NewNode(), t.Logf) + defer s.Close() + + clientKey := key.NewNode().Public() + c := &sclient{ + key: clientKey, + s: s, + logf: logger.Discard, + senderCardinality: hyperloglog.New(), + } + + for i := 0; i < 5; i++ { + c.senderCardinality.Insert(key.NewNode().Public().AppendTo(nil)) + } + + s.mu.Lock() + cs := &clientSet{} + cs.activeClient.Store(c) + s.clients[clientKey] = cs + s.mu.Unlock() + + estimate := c.EstimatedUniqueSenders() + t.Logf("Estimated unique senders: %d", estimate) + if estimate < 4 || estimate > 6 { + t.Errorf("EstimatedUniqueSenders() = %d, want ~5 (4-6 range)", estimate) + } +} + func TestGetPerClientSendQueueDepth(t *testing.T) { c := qt.New(t) envKey := "TS_DEBUG_DERP_PER_CLIENT_SEND_QUEUE_DEPTH" @@ -780,3 +811,167 @@ func TestGetPerClientSendQueueDepth(t *testing.T) { }) } } + +func TestSenderCardinality(t *testing.T) { + s := New(key.NewNode(), t.Logf) + defer s.Close() + + c := &sclient{ + key: key.NewNode().Public(), + s: s, + logf: logger.WithPrefix(t.Logf, "test client: "), + } + + if got := c.EstimatedUniqueSenders(); got != 0 { + t.Errorf("EstimatedUniqueSenders() before init = %d, want 0", got) + } + + c.senderCardinality = hyperloglog.New() + + if got := c.EstimatedUniqueSenders(); got != 0 { + t.Errorf("EstimatedUniqueSenders() with no senders = %d, want 0", got) + } + + senders := make([]key.NodePublic, 10) + for i := range senders { + senders[i] = key.NewNode().Public() + c.senderCardinality.Insert(senders[i].AppendTo(nil)) + } + + estimate := c.EstimatedUniqueSenders() + t.Logf("Estimated unique senders after 10 inserts: %d", estimate) + + if estimate < 8 || estimate > 12 { + t.Errorf("EstimatedUniqueSenders() = %d, want ~10 (8-12 range)", estimate) + } + + for i := 0; i < 5; i++ { + c.senderCardinality.Insert(senders[i].AppendTo(nil)) + } + + estimate2 := c.EstimatedUniqueSenders() + t.Logf("Estimated unique senders after duplicates: %d", estimate2) + + if estimate2 < 8 || estimate2 > 12 { + t.Errorf("EstimatedUniqueSenders() after duplicates = %d, want ~10 (8-12 range)", estimate2) + } +} + +func TestSenderCardinality100(t *testing.T) { + s := New(key.NewNode(), t.Logf) + defer s.Close() + + c := &sclient{ + key: key.NewNode().Public(), + s: s, + logf: logger.WithPrefix(t.Logf, "test client: "), + senderCardinality: hyperloglog.New(), + } + + numSenders := 100 + for i := 0; i < numSenders; i++ { + c.senderCardinality.Insert(key.NewNode().Public().AppendTo(nil)) + } + + estimate := c.EstimatedUniqueSenders() + t.Logf("Estimated unique senders for 100 actual senders: %d", estimate) + + if estimate < 85 || estimate > 115 { + t.Errorf("EstimatedUniqueSenders() = %d, want ~100 (85-115 range)", estimate) + } +} + +func TestSenderCardinalityTracking(t *testing.T) { + s := New(key.NewNode(), t.Logf) + defer s.Close() + + c := &sclient{ + key: key.NewNode().Public(), + s: s, + logf: logger.WithPrefix(t.Logf, "test client: "), + senderCardinality: hyperloglog.New(), + } + + zeroKey := key.NodePublic{} + if zeroKey != (key.NodePublic{}) { + c.senderCardinality.Insert(zeroKey.AppendTo(nil)) + } + + if estimate := c.EstimatedUniqueSenders(); estimate != 0 { + t.Errorf("EstimatedUniqueSenders() after zero key = %d, want 0", estimate) + } + + sender1 := key.NewNode().Public() + sender2 := key.NewNode().Public() + + if sender1 != (key.NodePublic{}) { + c.senderCardinality.Insert(sender1.AppendTo(nil)) + } + if sender2 != (key.NodePublic{}) { + c.senderCardinality.Insert(sender2.AppendTo(nil)) + } + + estimate := c.EstimatedUniqueSenders() + t.Logf("Estimated unique senders after 2 senders: %d", estimate) + + if estimate < 1 || estimate > 3 { + t.Errorf("EstimatedUniqueSenders() = %d, want ~2 (1-3 range)", estimate) + } +} + +func BenchmarkHyperLogLogInsert(b *testing.B) { + hll := hyperloglog.New() + sender := key.NewNode().Public() + senderBytes := sender.AppendTo(nil) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + hll.Insert(senderBytes) + } +} + +func BenchmarkHyperLogLogInsertUnique(b *testing.B) { + hll := hyperloglog.New() + + b.ResetTimer() + + buf := make([]byte, 32) + for i := 0; i < b.N; i++ { + binary.LittleEndian.PutUint64(buf, uint64(i)) + hll.Insert(buf) + } +} + +func BenchmarkHyperLogLogEstimate(b *testing.B) { + hll := hyperloglog.New() + + for i := 0; i < 100; i++ { + hll.Insert(key.NewNode().Public().AppendTo(nil)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = hll.Estimate() + } +} + +func BenchmarkSenderCardinalityOverhead(b *testing.B) { + hll := hyperloglog.New() + sender := key.NewNode().Public() + + b.Run("WithTracking", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + if hll != nil { + hll.Insert(sender.AppendTo(nil)) + } + } + }) + + b.Run("WithoutTracking", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = sender.AppendTo(nil) + } + }) +} diff --git a/flake.nix b/flake.nix index 505061a76..855ce555b 100644 --- a/flake.nix +++ b/flake.nix @@ -151,4 +151,4 @@ }); }; } -# nix-direnv cache busting line: sha256-jJSSXMyUqcJoZuqfSlBsKDQezyqS+jDkRglMMjG1K8g= +# nix-direnv cache busting line: sha256-IkodqRYdueML7U2Hh8vRw6Et7+WII+VXuPJ3jZ2xYx8= diff --git a/go.mod b/go.mod index a49a9724f..bd6fe441d 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.58 github.com/aws/aws-sdk-go-v2/service/s3 v1.75.3 github.com/aws/aws-sdk-go-v2/service/ssm v1.44.7 + github.com/axiomhq/hyperloglog v0.0.0-20240319100328-84253e514e02 github.com/bradfitz/go-tool-cache v0.0.0-20251113223507-0124e698e0bd github.com/bramvdbogaerde/go-scp v1.4.0 github.com/cilium/ebpf v0.15.0 @@ -149,6 +150,7 @@ require ( github.com/containerd/typeurl/v2 v2.2.3 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect github.com/deckarep/golang-set/v2 v2.8.0 // indirect + github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect diff --git a/go.mod.sri b/go.mod.sri index 66422652e..329fe9405 100644 --- a/go.mod.sri +++ b/go.mod.sri @@ -1 +1 @@ -sha256-jJSSXMyUqcJoZuqfSlBsKDQezyqS+jDkRglMMjG1K8g= +sha256-IkodqRYdueML7U2Hh8vRw6Et7+WII+VXuPJ3jZ2xYx8= diff --git a/go.sum b/go.sum index f70fe9159..111c99ac9 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.33.13 h1:3LXNnmtH3TURctC23hnC0p/39Q5 github.com/aws/aws-sdk-go-v2/service/sts v1.33.13/go.mod h1:7Yn+p66q/jt38qMoVfNvjbm3D89mGBnkwDcijgtih8w= github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/axiomhq/hyperloglog v0.0.0-20240319100328-84253e514e02 h1:bXAPYSbdYbS5VTy92NIUbeDI1qyggi+JYh5op9IFlcQ= +github.com/axiomhq/hyperloglog v0.0.0-20240319100328-84253e514e02/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -271,6 +273,8 @@ github.com/deckarep/golang-set/v2 v2.8.0 h1:swm0rlPCmdWn9mESxKOjWk8hXSqoxOp+Zlfu github.com/deckarep/golang-set/v2 v2.8.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/denis-tingaikin/go-header v0.5.0 h1:SRdnP5ZKvcO9KKRP1KJrhFR3RrlGuD+42t4429eC9k8= github.com/denis-tingaikin/go-header v0.5.0/go.mod h1:mMenU5bWrok6Wl2UsZjy+1okegmwQ3UgWl4V1D8gjlY= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc h1:8WFBn63wegobsYAX0YjD+8suexZDga5CctH4CCTx2+8= +github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/go-smbios v0.0.0-20180907143718-390a4f403a8e h1:vUmf0yezR0y7jJ5pceLHthLaYf4bA5T14B6q39S4q2Q= github.com/digitalocean/go-smbios v0.0.0-20180907143718-390a4f403a8e/go.mod h1:YTIHhz/QFSYnu/EhlF2SpU2Uk+32abacUYA5ZPljz1A= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= diff --git a/shell.nix b/shell.nix index d412693d9..28bdbdafb 100644 --- a/shell.nix +++ b/shell.nix @@ -16,4 +16,4 @@ ) { src = ./.; }).shellNix -# nix-direnv cache busting line: sha256-jJSSXMyUqcJoZuqfSlBsKDQezyqS+jDkRglMMjG1K8g= +# nix-direnv cache busting line: sha256-IkodqRYdueML7U2Hh8vRw6Et7+WII+VXuPJ3jZ2xYx8=