diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index eb53f4f15..c02f1cc78 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -677,15 +677,18 @@ var tstunNew = tstun.New func tryEngine(logf logger.Logf, sys *tsd.System, name string) (onlyNetstack bool, err error) { conf := wgengine.Config{ - ListenPort: args.port, - NetMon: sys.NetMon.Get(), - HealthTracker: sys.HealthTracker(), - Dialer: sys.Dialer.Get(), - SetSubsystem: sys.Set, - ControlKnobs: sys.ControlKnobs(), - DriveForLocal: driveimpl.NewFileSystemForLocal(logf), + ListenPort: args.port, + NetMon: sys.NetMon.Get(), + HealthTracker: sys.HealthTracker(), + UserMetricsRegistry: sys.UserMetricsRegistry(), + Dialer: sys.Dialer.Get(), + SetSubsystem: sys.Set, + ControlKnobs: sys.ControlKnobs(), + DriveForLocal: driveimpl.NewFileSystemForLocal(logf), } + sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry()) + onlyNetstack = name == "userspace-networking" netstackSubnetRouter := onlyNetstack // but mutated later on some platforms netns.SetEnabled(!onlyNetstack) diff --git a/go.mod b/go.mod index 8c46faa6c..07cfa465c 100644 --- a/go.mod +++ b/go.mod @@ -320,7 +320,7 @@ require ( github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polyfloyd/go-errorlint v1.4.1 // indirect - github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/client_model v0.5.0 github.com/prometheus/procfs v0.12.0 // indirect github.com/quasilyte/go-ruleguard v0.3.19 // indirect github.com/quasilyte/gogrep v0.5.0 // indirect diff --git a/health/health.go b/health/health.go index 7bb9d18e9..cfa1c792a 100644 --- a/health/health.go +++ b/health/health.go @@ -20,6 +20,7 @@ import ( "time" "tailscale.com/envknob" + "tailscale.com/metrics" "tailscale.com/tailcfg" "tailscale.com/types/opt" "tailscale.com/util/cibuild" @@ -111,6 +112,8 @@ type Tracker struct { 
lastLoginErr error localLogConfigErr error tlsConnectionErrors map[string]error // map[ServerName]error + + metricHealthMessage *metrics.MultiLabelMap[metricHealthMessageLabel] } // Subsystem is the name of a subsystem whose health can be monitored. @@ -317,6 +320,33 @@ func (w *Warnable) IsVisible(ws *warningState) bool { return time.Since(ws.BrokenSince) >= w.TimeToVisible } +// SetMetricsRegistry sets up the metrics for the Tracker. It takes +// a usermetric.Registry and registers the metrics there. +func (t *Tracker) SetMetricsRegistry(reg *usermetric.Registry) { + if reg == nil || t.metricHealthMessage != nil { + return + } + + t.metricHealthMessage = usermetric.NewMultiLabelMap[metricHealthMessageLabel]( + reg, + "tailscaled_health_messages", + "gauge", + "Number of health messages broken down by type.", + ) + + t.metricHealthMessage.Set(metricHealthMessageLabel{ + Type: "warning", + }, expvar.Func(func() any { + if t.nil() { + return 0 + } + t.mu.Lock() + defer t.mu.Unlock() + t.updateBuiltinWarnablesLocked() + return int64(len(t.stringsLocked())) + })) +} + // SetUnhealthy sets a warningState for the given Warnable with the provided Args, and should be // called when a Warnable becomes unhealthy, or its unhealthy status needs to be updated. // SetUnhealthy takes ownership of args. The args can be nil if no additional information is @@ -1205,18 +1235,6 @@ func (t *Tracker) ReceiveFuncStats(which ReceiveFunc) *ReceiveFuncStats { } func (t *Tracker) doOnceInit() { - metricHealthMessage.Set(metricHealthMessageLabel{ - Type: "warning", - }, expvar.Func(func() any { - if t.nil() { - return 0 - } - t.mu.Lock() - defer t.mu.Unlock() - t.updateBuiltinWarnablesLocked() - return int64(len(t.stringsLocked())) - })) - for i := range t.MagicSockReceiveFuncs { f := &t.MagicSockReceiveFuncs[i] f.name = (ReceiveFunc(i)).String() @@ -1252,9 +1270,3 @@ type metricHealthMessageLabel struct { // TODO: break down by warnable.severity as well? 
Type string } - -var metricHealthMessage = usermetric.NewMultiLabelMap[metricHealthMessageLabel]( - "tailscaled_health_messages", - "gauge", - "Number of health messages broken down by type.", -) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 9a57776a0..05acc7d37 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -118,9 +118,6 @@ import ( "tailscale.com/wgengine/wgcfg/nmcfg" ) -var metricAdvertisedRoutes = usermetric.NewGauge( - "tailscaled_advertised_routes", "Number of advertised network routes (e.g. by a subnet router)") - var controlDebugFlags = getControlDebugFlags() func getControlDebugFlags() []string { @@ -183,6 +180,7 @@ type LocalBackend struct { statsLogf logger.Logf // for printing peers stats on change sys *tsd.System health *health.Tracker // always non-nil + metrics metrics e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys store ipn.StateStore // non-nil; TODO(bradfitz): remove; use sys dialer *tsdial.Dialer // non-nil; TODO(bradfitz): remove; use sys @@ -376,6 +374,11 @@ func (b *LocalBackend) HealthTracker() *health.Tracker { return b.health } +// UserMetricsRegistry returns the usermetrics registry for the backend +func (b *LocalBackend) UserMetricsRegistry() *usermetric.Registry { + return b.sys.UserMetricsRegistry() +} + // NetMon returns the network monitor for the backend. func (b *LocalBackend) NetMon() *netmon.Monitor { return b.sys.NetMon.Get() @@ -385,6 +388,21 @@ type updateStatus struct { started bool } +type metrics struct { + // advertisedRoutes is a metric that counts the number of network routes that are advertised by the local node. + // This informs the user of how many routes are being advertised by the local node, including exit routes. + advertisedRoutes *usermetric.Gauge + + // approvedRoutes is a metric that counts the number of network routes served by the local node and approved + // by the control server.
+ approvedRoutes *usermetric.Gauge + + // primaryRoutes is a metric that counts the number of primary network routes served by the local node. + // A route being a primary route implies that the route is currently served by this node, and not by another + // subnet router in a high availability configuration. + primaryRoutes *usermetric.Gauge +} + // clientGen is a func that creates a control plane client. // It's the type used by LocalBackend.SetControlClientGetterForTesting. type clientGen func(controlclient.Options) (controlclient.Client, error) @@ -428,6 +446,15 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo captiveCtx, captiveCancel := context.WithCancel(ctx) captiveCancel() + m := metrics{ + advertisedRoutes: sys.UserMetricsRegistry().NewGauge( + "tailscaled_advertised_routes", "Number of advertised network routes (e.g. by a subnet router)"), + approvedRoutes: sys.UserMetricsRegistry().NewGauge( + "tailscaled_approved_routes", "Number of approved network routes (e.g. 
by a subnet router)"), + primaryRoutes: sys.UserMetricsRegistry().NewGauge( + "tailscaled_primary_routes", "Number of network routes for which this node is a primary router (in high availability configuration)"), + } + b := &LocalBackend{ ctx: ctx, ctxCancel: cancel, @@ -436,6 +463,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo statsLogf: logger.LogOnChange(logf, 5*time.Minute, clock.Now), sys: sys, health: sys.HealthTracker(), + metrics: m, e: e, dialer: dialer, store: store, @@ -4685,14 +4713,7 @@ func (b *LocalBackend) applyPrefsToHostinfoLocked(hi *tailcfg.Hostinfo, prefs ip hi.ShieldsUp = prefs.ShieldsUp() hi.AllowsUpdate = envknob.AllowsRemoteUpdate() || prefs.AutoUpdate().Apply.EqualBool(true) - // count routes without exit node routes - var routes int64 - for _, route := range hi.RoutableIPs { - if route.Bits() != 0 { - routes++ - } - } - metricAdvertisedRoutes.Set(float64(routes)) + b.metrics.advertisedRoutes.Set(float64(len(hi.RoutableIPs))) var sshHostKeys []string if prefs.RunSSH() && envknob.CanSSHD() { @@ -5317,6 +5338,11 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) { b.setTCPPortsInterceptedFromNetmapAndPrefsLocked(b.pm.CurrentPrefs()) if nm == nil { b.nodeByAddr = nil + + // If there is no netmap, the client is going into a "turned off" + // state so reset the metrics. 
+ b.metrics.approvedRoutes.Set(0) + b.metrics.primaryRoutes.Set(0) return } @@ -5337,6 +5363,14 @@ func (b *LocalBackend) setNetMapLocked(nm *netmap.NetworkMap) { } if nm.SelfNode.Valid() { addNode(nm.SelfNode) + var approved float64 + for _, route := range nm.SelfNode.AllowedIPs().All() { + if !slices.Contains(nm.SelfNode.Addresses().AsSlice(), route) { + approved++ + } + } + b.metrics.approvedRoutes.Set(approved) + b.metrics.primaryRoutes.Set(float64(nm.SelfNode.PrimaryRoutes().Len())) } for _, p := range nm.Peers { addNode(p) diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index bf4a28ff9..edac1b206 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -432,7 +432,7 @@ func newTestLocalBackend(t testing.TB) *LocalBackend { sys := new(tsd.System) store := new(mem.Store) sys.Set(store) - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/ipn/ipnlocal/loglines_test.go b/ipn/ipnlocal/loglines_test.go index d05436e6d..f70987c0e 100644 --- a/ipn/ipnlocal/loglines_test.go +++ b/ipn/ipnlocal/loglines_test.go @@ -50,7 +50,7 @@ func TestLocalLogLines(t *testing.T) { sys := new(tsd.System) store := new(mem.Store) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) if err != nil { t.Fatal(err) } diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index 8497b38e2..ff9b62769 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -35,6 +35,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/types/netmap" "tailscale.com/util/must" + "tailscale.com/util/usermetric" "tailscale.com/wgengine" "tailscale.com/wgengine/filter" ) @@ 
-643,7 +644,8 @@ func TestPeerAPIReplyToDNSQueries(t *testing.T) { h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") ht := new(health.Tracker) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) h.ps = &peerAPIServer{ b: &LocalBackend{ @@ -694,7 +696,8 @@ func TestPeerAPIPrettyReplyCNAME(t *testing.T) { h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") ht := new(health.Tracker) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -767,7 +770,8 @@ func TestPeerAPIReplyToDNSQueriesAreObserved(t *testing.T) { rc := &appctest.RouteCollector{} ht := new(health.Tracker) - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) + reg := new(usermetric.Registry) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { @@ -830,8 +834,9 @@ func TestPeerAPIReplyToDNSQueriesAreObservedWithCNAMEFlattening(t *testing.T) { h.remoteAddr = netip.MustParseAddrPort("100.150.151.152:12345") ht := new(health.Tracker) + reg := new(usermetric.Registry) rc := &appctest.RouteCollector{} - eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht) + eng, _ := wgengine.NewFakeUserspaceEngine(logger.Discard, 0, ht, reg) pm := must.Get(newProfileManager(new(mem.Store), t.Logf, ht)) var a *appc.AppConnector if shouldStore { diff --git a/ipn/ipnlocal/serve_test.go b/ipn/ipnlocal/serve_test.go index e43de1765..c72dac678 100644 --- a/ipn/ipnlocal/serve_test.go +++ b/ipn/ipnlocal/serve_test.go @@ -682,8 +682,9 @@ func newTestBackend(t 
*testing.T) *LocalBackend { sys := &tsd.System{} e, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ - SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + SetSubsystem: sys.Set, + HealthTracker: sys.HealthTracker(), + UserMetricsRegistry: sys.UserMetricsRegistry(), }) if err != nil { t.Fatal(err) diff --git a/ipn/ipnlocal/state_test.go b/ipn/ipnlocal/state_test.go index 20dde81f1..bebd0152b 100644 --- a/ipn/ipnlocal/state_test.go +++ b/ipn/ipnlocal/state_test.go @@ -298,7 +298,7 @@ func TestStateMachine(t *testing.T) { sys := new(tsd.System) store := new(testStateStorage) sys.Set(store) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } @@ -931,7 +931,7 @@ func TestEditPrefsHasNoKeys(t *testing.T) { logf := tstest.WhileTestRunningLogger(t) sys := new(tsd.System) sys.Set(new(mem.Store)) - e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) + e, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 01dc064cf..f45eab922 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -62,7 +62,6 @@ import ( "tailscale.com/util/progresstracking" "tailscale.com/util/rands" "tailscale.com/util/testenv" - "tailscale.com/util/usermetric" "tailscale.com/version" "tailscale.com/wgengine/magicsock" ) @@ -578,7 +577,7 @@ func (h *Handler) serveUserMetrics(w http.ResponseWriter, r *http.Request) { http.Error(w, "usermetrics debug flag not enabled", http.StatusForbidden) return } - usermetric.Handler(w, r) + h.b.UserMetricsRegistry().Handler(w, r) } func (h *Handler) serveDebug(w http.ResponseWriter, r *http.Request) { diff --git 
a/ipn/localapi/localapi_test.go b/ipn/localapi/localapi_test.go index 5ec873b3b..fa54a1e75 100644 --- a/ipn/localapi/localapi_test.go +++ b/ipn/localapi/localapi_test.go @@ -356,7 +356,7 @@ func newTestLocalBackend(t testing.TB) *ipnlocal.LocalBackend { sys := new(tsd.System) store := new(mem.Store) sys.Set(store) - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) if err != nil { t.Fatalf("NewFakeUserspaceEngine: %v", err) } diff --git a/metrics/multilabelmap.go b/metrics/multilabelmap.go index df2ae5073..325e9856f 100644 --- a/metrics/multilabelmap.go +++ b/metrics/multilabelmap.go @@ -97,7 +97,12 @@ type KeyValue[T comparable] struct { } func (v *MultiLabelMap[T]) String() string { - return `"MultiLabelMap"` + var sb strings.Builder + sb.WriteString("MultiLabelMap:\n") + v.Do(func(kv KeyValue[T]) { + fmt.Fprintf(&sb, "\t%v: %v\n", kv.Key, kv.Value) + }) + return sb.String() } // WritePrometheus writes v to w in Prometheus exposition format. @@ -281,3 +286,16 @@ func (v *MultiLabelMap[T]) Do(f func(KeyValue[T])) { f(KeyValue[T]{e.key, e.val}) } } + +// ResetAllForTest resets all values for metrics to zero. +// Should only be used in tests. +func (v *MultiLabelMap[T]) ResetAllForTest() { + v.Do(func(kv KeyValue[T]) { + switch v := kv.Value.(type) { + case *expvar.Int: + v.Set(0) + case *expvar.Float: + v.Set(0) + } + }) +} diff --git a/net/connstats/stats.go b/net/connstats/stats.go index dbcd946b8..9164897c9 100644 --- a/net/connstats/stats.go +++ b/net/connstats/stats.go @@ -135,19 +135,19 @@ func (s *Statistics) updateVirtual(b []byte, receive bool) { // The src is always a Tailscale IP address, representing some remote peer. // The dst is a remote IP address and port that corresponds // with some physical peer backing the Tailscale IP address. 
-func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, n int) { - s.updatePhysical(src, dst, n, false) +func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, n int) { + s.updatePhysical(src, dst, packets, n, false) } // UpdateRxPhysical updates the counters for a received wireguard packet. // The src is always a Tailscale IP address, representing some remote peer. // The dst is a remote IP address and port that corresponds // with some physical peer backing the Tailscale IP address. -func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, n int) { - s.updatePhysical(src, dst, n, true) +func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, n int) { + s.updatePhysical(src, dst, packets, n, true) } -func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, n int, receive bool) { +func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, packets, n int, receive bool) { conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst} s.mu.Lock() @@ -157,10 +157,10 @@ func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, n int, r return } if receive { - cnts.RxPackets++ + cnts.RxPackets += uint64(packets) cnts.RxBytes += uint64(n) } else { - cnts.TxPackets++ + cnts.TxPackets += uint64(packets) cnts.TxBytes += uint64(n) } s.physical[conn] = cnts diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index 514ebcaaf..17bf7749d 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -24,6 +24,7 @@ import ( "go4.org/mem" "gvisor.dev/gvisor/pkg/tcpip/stack" "tailscale.com/disco" + "tailscale.com/metrics" "tailscale.com/net/connstats" "tailscale.com/net/packet" "tailscale.com/net/packet/checksum" @@ -209,6 +210,30 @@ type Wrapper struct { stats atomic.Pointer[connstats.Statistics] captureHook syncs.AtomicValue[capture.Callback] + + metric *metricWrapper +} + +type metricWrapper struct { + inboundDroppedPacketsTotal 
*metrics.MultiLabelMap[dropPacketLabel] + outboundDroppedPacketsTotal *metrics.MultiLabelMap[dropPacketLabel] +} + +func registerMetrics(reg *usermetric.Registry) *metricWrapper { + return &metricWrapper{ + inboundDroppedPacketsTotal: usermetric.NewMultiLabelMap[dropPacketLabel]( + reg, + "tailscaled_inbound_dropped_packets_total", + "counter", + "Counts the number of dropped packets received by the node from other peers", + ), + outboundDroppedPacketsTotal: usermetric.NewMultiLabelMap[dropPacketLabel]( + reg, + "tailscaled_outbound_dropped_packets_total", + "counter", + "Counts the number of packets dropped while being sent to other peers", + ), + } } // tunInjectedRead is an injected packet pretending to be a tun.Read(). @@ -248,15 +273,15 @@ func (w *Wrapper) Start() { close(w.startCh) } -func WrapTAP(logf logger.Logf, tdev tun.Device) *Wrapper { - return wrap(logf, tdev, true) +func WrapTAP(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper { + return wrap(logf, tdev, true, m) } -func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper { - return wrap(logf, tdev, false) +func Wrap(logf logger.Logf, tdev tun.Device, m *usermetric.Registry) *Wrapper { + return wrap(logf, tdev, false, m) } -func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper { +func wrap(logf logger.Logf, tdev tun.Device, isTAP bool, m *usermetric.Registry) *Wrapper { logf = logger.WithPrefix(logf, "tstun: ") w := &Wrapper{ logf: logf, @@ -274,6 +299,7 @@ func wrap(logf logger.Logf, tdev tun.Device, isTAP bool) *Wrapper { // TODO(dmytro): (highly rate-limited) hexdumps should happen on unknown packets. 
filterFlags: filter.LogAccepts | filter.LogDrops, startCh: make(chan struct{}), + metric: registerMetrics(m), } w.vectorBuffer = make([][]byte, tdev.BatchSize()) @@ -872,7 +898,7 @@ func (t *Wrapper) filterPacketOutboundToWireGuard(p *packet.Parsed, pc *peerConf if filt.RunOut(p, t.filterFlags) != filter.Accept { metricPacketOutDropFilter.Add(1) - metricOutboundDroppedPacketsTotal.Add(dropPacketLabel{ + t.metric.outboundDroppedPacketsTotal.Add(dropPacketLabel{ Reason: DropReasonACL, }, 1) return filter.Drop, gro @@ -1144,7 +1170,7 @@ func (t *Wrapper) filterPacketInboundFromWireGuard(p *packet.Parsed, captHook ca if outcome != filter.Accept { metricPacketInDropFilter.Add(1) - metricInboundDroppedPacketsTotal.Add(dropPacketLabel{ + t.metric.inboundDroppedPacketsTotal.Add(dropPacketLabel{ Reason: DropReasonACL, }, 1) @@ -1225,7 +1251,7 @@ func (t *Wrapper) Write(buffs [][]byte, offset int) (int, error) { t.noteActivity() _, err := t.tdevWrite(buffs, offset) if err != nil { - metricInboundDroppedPacketsTotal.Add(dropPacketLabel{ + t.metric.inboundDroppedPacketsTotal.Add(dropPacketLabel{ Reason: DropReasonError, }, int64(len(buffs))) } @@ -1482,19 +1508,6 @@ type dropPacketLabel struct { Reason DropReason } -var ( - metricInboundDroppedPacketsTotal = usermetric.NewMultiLabelMap[dropPacketLabel]( - "tailscaled_inbound_dropped_packets_total", - "counter", - "Counts the number of dropped packets received by the node from other peers", - ) - metricOutboundDroppedPacketsTotal = usermetric.NewMultiLabelMap[dropPacketLabel]( - "tailscaled_outbound_dropped_packets_total", - "counter", - "Counts the number of packets dropped while being sent to other peers", - ) -) - func (t *Wrapper) InstallCaptureHook(cb capture.Callback) { t.captureHook.Store(cb) } diff --git a/net/tstun/wrap_test.go b/net/tstun/wrap_test.go index f93192102..b7132612b 100644 --- a/net/tstun/wrap_test.go +++ b/net/tstun/wrap_test.go @@ -38,6 +38,7 @@ import ( "tailscale.com/types/ptr" 
"tailscale.com/types/views" "tailscale.com/util/must" + "tailscale.com/util/usermetric" "tailscale.com/wgengine/capture" "tailscale.com/wgengine/filter" "tailscale.com/wgengine/wgcfg" @@ -173,7 +174,8 @@ func setfilter(logf logger.Logf, tun *Wrapper) { func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper) { chtun := tuntest.NewChannelTUN() - tun := Wrap(logf, chtun.TUN()) + reg := new(usermetric.Registry) + tun := Wrap(logf, chtun.TUN(), reg) if secure { setfilter(logf, tun) } else { @@ -185,7 +187,8 @@ func newChannelTUN(logf logger.Logf, secure bool) (*tuntest.ChannelTUN, *Wrapper func newFakeTUN(logf logger.Logf, secure bool) (*fakeTUN, *Wrapper) { ftun := NewFake() - tun := Wrap(logf, ftun) + reg := new(usermetric.Registry) + tun := Wrap(logf, ftun, reg) if secure { setfilter(logf, tun) } else { @@ -315,12 +318,6 @@ func mustHexDecode(s string) []byte { } func TestFilter(t *testing.T) { - // Reset the metrics before test. These are global - // so the different tests might have affected them. 
- metricInboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonACL}, 0) - metricInboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonError}, 0) - metricOutboundDroppedPacketsTotal.SetInt(dropPacketLabel{Reason: DropReasonACL}, 0) - chtun, tun := newChannelTUN(t.Logf, true) defer tun.Close() @@ -435,22 +432,6 @@ func TestFilter(t *testing.T) { } }) } - - inACL := metricInboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL}) - inError := metricInboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonError}) - outACL := metricOutboundDroppedPacketsTotal.Get(dropPacketLabel{Reason: DropReasonACL}) - - assertMetricPackets(t, "inACL", "3", inACL.String()) - assertMetricPackets(t, "inError", "0", inError.String()) - assertMetricPackets(t, "outACL", "1", outACL.String()) - -} - -func assertMetricPackets(t *testing.T, metricName, want, got string) { - t.Helper() - if want != got { - t.Errorf("%s got unexpected value, got %s, want %s", metricName, got, want) - } } func TestAllocs(t *testing.T) { @@ -512,6 +493,7 @@ func TestAtomic64Alignment(t *testing.T) { } func TestPeerAPIBypass(t *testing.T) { + reg := new(usermetric.Registry) wrapperWithPeerAPI := &Wrapper{ PeerAPIPort: func(ip netip.Addr) (port uint16, ok bool) { if ip == netip.MustParseAddr("100.64.1.2") { @@ -519,6 +501,7 @@ func TestPeerAPIBypass(t *testing.T) { } return }, + metric: registerMetrics(reg), } tests := []struct { @@ -534,13 +517,16 @@ func TestPeerAPIBypass(t *testing.T) { PeerAPIPort: func(netip.Addr) (port uint16, ok bool) { return 60000, true }, + metric: registerMetrics(reg), }, pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000), want: filter.Drop, }, { - name: "reject_with_filter", - w: &Wrapper{}, + name: "reject_with_filter", + w: &Wrapper{ + metric: registerMetrics(reg), + }, filter: filter.NewAllowNone(logger.Discard, new(netipx.IPSet)), pkt: tcp4syn("1.2.3.4", "100.64.1.2", 1234, 60000), want: filter.Drop, diff --git 
a/ssh/tailssh/tailssh_test.go b/ssh/tailssh/tailssh_test.go index bfc670814..cdeaa4a05 100644 --- a/ssh/tailssh/tailssh_test.go +++ b/ssh/tailssh/tailssh_test.go @@ -826,7 +826,7 @@ func TestSSHAuthFlow(t *testing.T) { func TestSSH(t *testing.T) { var logf logger.Logf = t.Logf sys := &tsd.System{} - eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker()) + eng, err := wgengine.NewFakeUserspaceEngine(logf, sys.Set, sys.HealthTracker(), sys.UserMetricsRegistry()) if err != nil { t.Fatal(err) } diff --git a/tsd/tsd.go b/tsd/tsd.go index 2b5e65626..cee63e844 100644 --- a/tsd/tsd.go +++ b/tsd/tsd.go @@ -32,6 +32,7 @@ import ( "tailscale.com/net/tstun" "tailscale.com/proxymap" "tailscale.com/types/netmap" + "tailscale.com/util/usermetric" "tailscale.com/wgengine" "tailscale.com/wgengine/magicsock" "tailscale.com/wgengine/router" @@ -65,7 +66,8 @@ type System struct { controlKnobs controlknobs.Knobs proxyMap proxymap.Mapper - healthTracker health.Tracker + userMetricsRegistry usermetric.Registry + healthTracker health.Tracker } // NetstackImpl is the interface that *netstack.Impl implements. @@ -142,6 +144,11 @@ func (s *System) HealthTracker() *health.Tracker { return &s.healthTracker } +// UserMetricsRegistry returns the system usermetrics. +func (s *System) UserMetricsRegistry() *usermetric.Registry { + return &s.userMetricsRegistry +} + // SubSystem represents some subsystem of the Tailscale node daemon. // // A subsystem can be set to a value, and then later retrieved. 
A subsystem diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index 8be54bb73..c92d013b1 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -532,18 +532,20 @@ func (s *Server) start() (reterr error) { s.dialer = &tsdial.Dialer{Logf: tsLogf} // mutated below (before used) eng, err := wgengine.NewUserspaceEngine(tsLogf, wgengine.Config{ - ListenPort: s.Port, - NetMon: s.netMon, - Dialer: s.dialer, - SetSubsystem: sys.Set, - ControlKnobs: sys.ControlKnobs(), - HealthTracker: sys.HealthTracker(), + ListenPort: s.Port, + NetMon: s.netMon, + Dialer: s.dialer, + SetSubsystem: sys.Set, + ControlKnobs: sys.ControlKnobs(), + HealthTracker: sys.HealthTracker(), + UserMetricsRegistry: sys.UserMetricsRegistry(), }) if err != nil { return err } closePool.add(s.dialer) sys.Set(eng) + sys.HealthTracker().SetMetricsRegistry(sys.UserMetricsRegistry()) // TODO(oxtoacart): do we need to support Taildrive on tsnet, and if so, how? ns, err := netstack.Create(tsLogf, sys.Tun.Get(), eng, sys.MagicSock.Get(), s.dialer, sys.DNSManager.Get(), sys.ProxyMapper(), nil) diff --git a/tsnet/tsnet_test.go b/tsnet/tsnet_test.go index 7f6fb00c0..ed84cc93a 100644 --- a/tsnet/tsnet_test.go +++ b/tsnet/tsnet_test.go @@ -5,6 +5,7 @@ package tsnet import ( "bufio" + "bytes" "context" "crypto/ecdsa" "crypto/elliptic" @@ -31,8 +32,10 @@ import ( "testing" "time" - "github.com/google/go-cmp/cmp" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "golang.org/x/net/proxy" + "tailscale.com/client/tailscale" "tailscale.com/cmd/testwrapper/flakytest" "tailscale.com/health" "tailscale.com/ipn" @@ -757,6 +760,10 @@ func TestUDPConn(t *testing.T) { s1, s1ip, _ := startServer(t, ctx, controlURL, "s1") s2, s2ip, _ := startServer(t, ctx, controlURL, "s2") + assertUDPConn(t, ctx, s1, s2, s1ip, s2ip) +} + +func assertUDPConn(t *testing.T, ctx context.Context, s1, s2 *Server, s1ip, s2ip netip.Addr) { lc2, err := s2.LocalClient() if err != nil { t.Fatal(err) @@ -818,65 +825,373 @@ func 
TestUDPConn(t *testing.T) { } } +// sendData sends a given amount of bytes from s1 to s2. +func sendData(logf func(format string, args ...any), ctx context.Context, bytesCount int, s1, s2 *Server, s1ip, s2ip netip.Addr) error { + l := must.Get(s1.Listen("tcp", fmt.Sprintf("%s:8081", s1ip))) + defer l.Close() + + // Dial to s1 from s2 + w, err := s2.Dial(ctx, "tcp", fmt.Sprintf("%s:8081", s1ip)) + if err != nil { + return err + } + defer w.Close() + + stopReceive := make(chan struct{}) + defer close(stopReceive) + allReceived := make(chan error) + defer close(allReceived) + + go func() { + conn, err := l.Accept() + if err != nil { + allReceived <- err + return + } + conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) + + total := 0 + recvStart := time.Now() + for { + got := make([]byte, bytesCount) + n, err := conn.Read(got) + if n != bytesCount { + logf("read %d bytes, want %d", n, bytesCount) + } + + select { + case <-stopReceive: + return + default: + } + + if err != nil { + allReceived <- fmt.Errorf("failed reading packet, %s", err) + return + } + + total += n + logf("received %d/%d bytes, %.2f %%", total, bytesCount, (float64(total) / (float64(bytesCount)) * 100)) + if total == bytesCount { + break + } + } + + logf("all received, took: %s", time.Since(recvStart).String()) + allReceived <- nil + }() + + sendStart := time.Now() + w.SetWriteDeadline(time.Now().Add(30 * time.Second)) + if _, err := w.Write(bytes.Repeat([]byte("A"), bytesCount)); err != nil { + stopReceive <- struct{}{} + return err + } + + logf("all sent (%s), waiting for all packets (%d) to be received", time.Since(sendStart).String(), bytesCount) + err, _ = <-allReceived + if err != nil { + return err + } + + return nil +} + +// testWarnable is a Warnable that is used within this package for testing purposes only. 
+var testWarnable = health.Register(&health.Warnable{ + Code: "test-warnable-tsnet", + Title: "Test warnable", + Severity: health.SeverityLow, + Text: func(args health.Args) string { + return args[health.ArgError] + }, +}) + +func parseMetrics(m []byte) (map[string]float64, error) { + metrics := make(map[string]float64) + + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(bytes.NewReader(m)) + if err != nil { + return nil, err + } + + for _, f := range mf { + for _, ff := range f.Metric { + val := float64(0) + + switch f.GetType() { + case dto.MetricType_COUNTER: + val = ff.GetCounter().GetValue() + case dto.MetricType_GAUGE: + val = ff.GetGauge().GetValue() + } + + metrics[f.GetName()+","+promMetricLabelsStr(ff.GetLabel())] = val + } + } + + return metrics, nil +} + +func promMetricLabelsStr(labels []*dto.LabelPair) string { + var b strings.Builder + for _, l := range labels { + b.WriteString(fmt.Sprintf("%s=%s,", l.GetName(), l.GetValue())) + } + return b.String() +} + +// TestUserMetrics tests the user-facing metrics exposed by tailscaled. func TestUserMetrics(t *testing.T) { + flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/13420") tstest.ResourceCheck(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // testWarnable is a Warnable that is used within this package for testing purposes only. 
- var testWarnable = health.Register(&health.Warnable{ - Code: "test-warnable-tsnet", - Title: "Test warnable", - Severity: health.SeverityLow, - Text: func(args health.Args) string { - return args[health.ArgError] - }, - }) - controlURL, c := startControl(t) - s1, _, s1PubKey := startServer(t, ctx, controlURL, "s1") + s1, s1ip, s1PubKey := startServer(t, ctx, controlURL, "s1") + s2, s2ip, _ := startServer(t, ctx, controlURL, "s2") s1.lb.EditPrefs(&ipn.MaskedPrefs{ Prefs: ipn.Prefs{ AdvertiseRoutes: []netip.Prefix{ netip.MustParsePrefix("192.0.2.0/24"), netip.MustParsePrefix("192.0.3.0/24"), + netip.MustParsePrefix("192.0.5.1/32"), + netip.MustParsePrefix("0.0.0.0/0"), }, }, AdvertiseRoutesSet: true, }) - c.SetSubnetRoutes(s1PubKey, []netip.Prefix{netip.MustParsePrefix("192.0.2.0/24")}) + c.SetSubnetRoutes(s1PubKey, []netip.Prefix{ + netip.MustParsePrefix("192.0.2.0/24"), + netip.MustParsePrefix("192.0.5.1/32"), + netip.MustParsePrefix("0.0.0.0/0"), + }) lc1, err := s1.LocalClient() if err != nil { t.Fatal(err) } + lc2, err := s2.LocalClient() + if err != nil { + t.Fatal(err) + } + + // ping to make sure the connection is up. + res, err := lc2.Ping(ctx, s1ip, tailcfg.PingICMP) + if err != nil { + t.Fatalf("pinging: %s", err) + } + t.Logf("ping success: %#+v", res) + ht := s1.lb.HealthTracker() ht.SetUnhealthy(testWarnable, health.Args{"Text": "Hello world 1"}) + // Force an update to the netmap to ensure that the metrics are up-to-date. + s1.lb.DebugForceNetmapUpdate() + s2.lb.DebugForceNetmapUpdate() + + mustDirect(t, t.Logf, lc1, lc2) + + // Wait for the routes to be propagated to node 1 to ensure + // that the metrics are up-to-date. 
+ waitForCondition(t, 30*time.Second, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + status1, err := lc1.Status(ctx) + if err != nil { + t.Logf("getting status: %s", err) + return false + } + return status1.Self.PrimaryRoutes != nil && status1.Self.PrimaryRoutes.Len() == 3 + }) + + // 50 megabytes 50 * 1024 kilobyte packets + bytesToSend := 50 * 1024 * 1024 + + // This asserts generates some traffic, it is factored out + // of TestUDPConn. + start := time.Now() + err = sendData(t.Logf, ctx, bytesToSend, s1, s2, s1ip, s2ip) + if err != nil { + t.Fatalf("Failed to send packets: %v", err) + } + t.Logf("Sent %d bytes from s1 to s2 in %s", bytesToSend, time.Since(start).String()) + + time.Sleep(5 * time.Second) + metrics1, err := lc1.UserMetrics(ctx) if err != nil { t.Fatal(err) } - // Note that this test will check for two warnings because the health - // tracker will have two warnings: one from the testWarnable, added in - // this test, and one because we are running the dev/unstable version - // of tailscale. - want := `# TYPE tailscaled_advertised_routes gauge -# HELP tailscaled_advertised_routes Number of advertised network routes (e.g. by a subnet router) -tailscaled_advertised_routes 2 -# TYPE tailscaled_health_messages gauge -# HELP tailscaled_health_messages Number of health messages broken down by type. 
-tailscaled_health_messages{type="warning"} 2 -# TYPE tailscaled_inbound_dropped_packets_total counter -# HELP tailscaled_inbound_dropped_packets_total Counts the number of dropped packets received by the node from other peers -# TYPE tailscaled_outbound_dropped_packets_total counter -# HELP tailscaled_outbound_dropped_packets_total Counts the number of packets dropped while being sent to other peers -` + status1, err := lc1.Status(ctx) + if err != nil { + t.Fatal(err) + } + + parsedMetrics1, err := parseMetrics(metrics1) + if err != nil { + t.Fatal(err) + } + + t.Logf("Metrics1:\n%s\n", metrics1) + + // The node is advertising 4 routes: + // - 192.0.2.0/24 + // - 192.0.3.0/24 + // - 192.0.5.1/32 + // - 0.0.0.0/0 + if got, want := parsedMetrics1["tailscaled_advertised_routes,"], 4.0; got != want { + t.Errorf("metrics1, tailscaled_advertised_routes: got %v, want %v", got, want) + } + + // The control has approved 3 routes: + // - 192.0.2.0/24 + // - 192.0.5.1/32 + // - 0.0.0.0/0 + if got, want := parsedMetrics1["tailscaled_approved_routes,"], 3.0; got != want { + t.Errorf("metrics1, tailscaled_approved_routes: got %v, want %v", got, want) + } + + // Validate the health counter metric against the status of the node + if got, want := parsedMetrics1["tailscaled_health_messages,type=warning,"], float64(len(status1.Health)); got != want { + t.Errorf("metrics1, tailscaled_health_messages: got %v, want %v", got, want) + } + + // The node is the primary subnet router for 3 routes: + // - 192.0.2.0/24 + // - 192.0.5.1/32 + // - 0.0.0.0/0 + if got, want := parsedMetrics1["tailscaled_primary_routes,"], 3.0; got != want { + t.Errorf("metrics1, tailscaled_primary_routes: got %v, want %v", got, want) + } + + // Verify that the amount of data recorded in bytes is higher than the + // 50 megabytes sent. 
+ inboundBytes1 := parsedMetrics1["tailscaled_inbound_bytes_total,path=direct_ipv4,"] + if inboundBytes1 < float64(bytesToSend) { + t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", bytesToSend, inboundBytes1) + } + + // But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room. + if inboundBytes1 > float64(bytesToSend)*1.2 { + t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(bytesToSend)*1.2, inboundBytes1) + } + + // // Verify that the packet count recorded is higher than the + // // 50 000 1KB packages we sent. + // inboundPackets1 := parsedMetrics1["tailscaled_inbound_packets_total,path=direct_ipv4,"] + // if inboundPackets1 < float64(packets) { + // t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", packets, inboundPackets1) + // } - if diff := cmp.Diff(want, string(metrics1)); diff != "" { - t.Fatalf("unexpected metrics (-want +got):\n%s", diff) + // // But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room. 
+ // if inboundPackets1 > float64(packets)*1.2 { + // t.Errorf("metrics1, tailscaled_inbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(packets)*1.1, inboundPackets1) + // } + + metrics2, err := lc2.UserMetrics(ctx) + if err != nil { + t.Fatal(err) + } + + status2, err := lc2.Status(ctx) + if err != nil { + t.Fatal(err) + } + + parsedMetrics2, err := parseMetrics(metrics2) + if err != nil { + t.Fatal(err) + } + + t.Logf("Metrics2:\n%s\n", metrics2) + + // The node is advertising 0 routes + if got, want := parsedMetrics2["tailscaled_advertised_routes,"], 0.0; got != want { + t.Errorf("metrics2, tailscaled_advertised_routes: got %v, want %v", got, want) + } + + // The control has approved 0 routes + if got, want := parsedMetrics2["tailscaled_approved_routes,"], 0.0; got != want { + t.Errorf("metrics2, tailscaled_approved_routes: got %v, want %v", got, want) + } + + // Validate the health counter metric against the status of the node + if got, want := parsedMetrics2["tailscaled_health_messages,type=warning,"], float64(len(status2.Health)); got != want { + t.Errorf("metrics2, tailscaled_health_messages: got %v, want %v", got, want) + } + + // The node is the primary subnet router for 0 routes + if got, want := parsedMetrics2["tailscaled_primary_routes,"], 0.0; got != want { + t.Errorf("metrics2, tailscaled_primary_routes: got %v, want %v", got, want) + } + + // Verify that the amount of data recorded in bytes is higher than the + // 50 megabytes sent. + outboundBytes2 := parsedMetrics2["tailscaled_outbound_bytes_total,path=direct_ipv4,"] + if outboundBytes2 < float64(bytesToSend) { + t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", bytesToSend, outboundBytes2) + } + + // But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room. 
+ if outboundBytes2 > float64(bytesToSend)*1.2 { + t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(bytesToSend)*1.2, outboundBytes2) + } + + // // Verify that the packet count recorded is higher than the + // // 50 000 1KB packages we sent. + // outboundPackets2 := parsedMetrics2["tailscaled_outbound_packets_total,path=direct_ipv4,"] + // if outboundPackets2 < float64(packets) { + // t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected higher than %d, got: %f", packets, outboundPackets2) + // } + + // // But ensure that it is not too much higher than the 50 megabytes sent, given 20% wiggle room. + // if outboundPackets2 > float64(packets)*1.2 { + // t.Errorf("metrics2, tailscaled_outbound_bytes_total,path=direct_ipv4: expected lower than %f, got: %f", float64(packets)*1.1, outboundPackets2) + // } +} + +func waitForCondition(t *testing.T, waitTime time.Duration, f func() bool) { + t.Helper() + for deadline := time.Now().Add(waitTime); time.Now().Before(deadline); time.Sleep(1 * time.Second) { + if f() { + return + } + } + t.Fatalf("waiting for condition") +} + +// mustDirect ensures there is a direct connection between LocalClient 1 and 2 +func mustDirect(t *testing.T, logf logger.Logf, lc1, lc2 *tailscale.LocalClient) { + t.Helper() + lastLog := time.Now().Add(-time.Minute) + // See https://github.com/tailscale/tailscale/issues/654 + // and https://github.com/tailscale/tailscale/issues/3247 for discussions of this deadline. 
+ for deadline := time.Now().Add(30 * time.Second); time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + status1, err := lc1.Status(ctx) + if err != nil { + continue + } + status2, err := lc2.Status(ctx) + if err != nil { + continue + } + pst := status1.Peer[status2.Self.PublicKey] + if pst.CurAddr != "" { + logf("direct link %s->%s found with addr %s", status1.Self.HostName, status2.Self.HostName, pst.CurAddr) + return + } + if now := time.Now(); now.Sub(lastLog) > time.Second { + logf("no direct path %s->%s yet, addrs %v", status1.Self.HostName, status2.Self.HostName, pst.Addrs) + lastLog = now + } } + t.Error("magicsock did not find a direct path from lc1 to lc2") } diff --git a/tstest/integration/testcontrol/testcontrol.go b/tstest/integration/testcontrol/testcontrol.go index 44ed2da06..847f66de7 100644 --- a/tstest/integration/testcontrol/testcontrol.go +++ b/tstest/integration/testcontrol/testcontrol.go @@ -1018,6 +1018,7 @@ func (s *Server) MapResponse(req *tailcfg.MapRequest) (res *tailcfg.MapResponse, s.mu.Lock() defer s.mu.Unlock() + res.Node.PrimaryRoutes = s.nodeSubnetRoutes[nk] res.Node.AllowedIPs = append(res.Node.Addresses, s.nodeSubnetRoutes[nk]...) // Consume a PingRequest while protected by mutex if it exists diff --git a/util/usermetric/usermetric.go b/util/usermetric/usermetric.go index cb3f66ea9..7c6783f50 100644 --- a/util/usermetric/usermetric.go +++ b/util/usermetric/usermetric.go @@ -10,12 +10,16 @@ import ( "fmt" "io" "net/http" + "strings" "tailscale.com/metrics" "tailscale.com/tsweb/varz" ) -var vars expvar.Map +// Registry tracks user-facing metrics of various Tailscale subsystems. +type Registry struct { + vars expvar.Map +} // NewMultiLabelMap creates and register a new // MultiLabelMap[T] variable with the given name and returns it. 
@@ -24,15 +28,18 @@ var vars expvar.Map // Note that usermetric are not protected against duplicate // metrics name. It is the caller's responsibility to ensure that // the name is unique. -func NewMultiLabelMap[T comparable](name string, promType, helpText string) *metrics.MultiLabelMap[T] { - m := &metrics.MultiLabelMap[T]{ +func NewMultiLabelMap[T comparable](m *Registry, name string, promType, helpText string) *metrics.MultiLabelMap[T] { + if m == nil { + return nil + } + ml := &metrics.MultiLabelMap[T]{ Type: promType, Help: helpText, } var zero T _ = metrics.LabelString(zero) // panic early if T is invalid - vars.Set(name, m) - return m + m.vars.Set(name, ml) + return ml } // Gauge is a gauge metric with no labels. @@ -42,20 +49,29 @@ type Gauge struct { } // NewGauge creates and register a new gauge metric with the given name and help text. -func NewGauge(name, help string) *Gauge { +func (m *Registry) NewGauge(name, help string) *Gauge { + if m == nil { + return nil + } g := &Gauge{&expvar.Float{}, help} - vars.Set(name, g) + m.vars.Set(name, g) return g } // Set sets the gauge to the given value. func (g *Gauge) Set(v float64) { + if g == nil { + return + } g.m.Set(v) } // String returns the string of the underlying expvar.Float. // This satisfies the expvar.Var interface. func (g *Gauge) String() string { + if g == nil { + return "" + } return g.m.String() } @@ -79,6 +95,15 @@ func (g *Gauge) WritePrometheus(w io.Writer, name string) { // Handler returns a varz.Handler that serves the userfacing expvar contained // in this package. 
-func Handler(w http.ResponseWriter, r *http.Request) { - varz.ExpvarDoHandler(vars.Do)(w, r) +func (m *Registry) Handler(w http.ResponseWriter, r *http.Request) { + varz.ExpvarDoHandler(m.vars.Do)(w, r) +} + +func (m *Registry) String() string { + var sb strings.Builder + m.vars.Do(func(kv expvar.KeyValue) { + fmt.Fprintf(&sb, "%s: %v\n", kv.Key, kv.Value) + }) + + return sb.String() } diff --git a/util/usermetric/usermetric_test.go b/util/usermetric/usermetric_test.go index aa0e82ea6..e92db5bfc 100644 --- a/util/usermetric/usermetric_test.go +++ b/util/usermetric/usermetric_test.go @@ -9,7 +9,8 @@ import ( ) func TestGauge(t *testing.T) { - g := NewGauge("test_gauge", "This is a test gauge") + var reg Registry + g := reg.NewGauge("test_gauge", "This is a test gauge") g.Set(15) var buf bytes.Buffer diff --git a/wgengine/magicsock/derp.go b/wgengine/magicsock/derp.go index 69c5cbc90..04050b108 100644 --- a/wgengine/magicsock/derp.go +++ b/wgengine/magicsock/derp.go @@ -690,7 +690,9 @@ func (c *connBind) receiveDERP(buffs [][]byte, sizes []int, eps []conn.Endpoint) // No data read occurred. Wait for another packet. 
continue } - metricRecvDataDERP.Add(1) + metricRecvDataPacketsDERP.Add(1) + c.metricInboundPacketsTotal.Add(pathLabel{Path: PathDERP}, 1) + c.metricInboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(n)) sizes[0] = n eps[0] = ep return 1, nil @@ -728,7 +730,7 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en ep.noteRecvActivity(ipp, mono.Now()) if stats := c.stats.Load(); stats != nil { - stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n) + stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, dm.n) } return n, ep } diff --git a/wgengine/magicsock/endpoint.go b/wgengine/magicsock/endpoint.go index 53ecb84de..ae7221f20 100644 --- a/wgengine/magicsock/endpoint.go +++ b/wgengine/magicsock/endpoint.go @@ -950,6 +950,8 @@ func (de *endpoint) send(buffs [][]byte) error { return errNoUDPOrDERP } var err error + // TODO(kradalby): for paring, why is this not an if-else? Do we send to + // both DERP and UDP at the same time if we have both? if udpAddr.IsValid() { _, err = de.c.sendUDPBatch(udpAddr, buffs) @@ -960,13 +962,23 @@ func (de *endpoint) send(buffs [][]byte) error { de.noteBadEndpoint(udpAddr) } + var txBytes int + for _, b := range buffs { + txBytes += len(b) + } + + switch { + case udpAddr.Addr().Is4(): + de.c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(len(buffs))) + de.c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(txBytes)) + case udpAddr.Addr().Is6(): + de.c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(len(buffs))) + de.c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(txBytes)) + } + // TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends. 
if stats := de.c.stats.Load(); err == nil && stats != nil { - var txBytes int - for _, b := range buffs { - txBytes += len(b) - } - stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes) + stats.UpdateTxPhysical(de.nodeAddr, udpAddr, len(buffs), txBytes) } } if derpAddr.IsValid() { @@ -974,8 +986,11 @@ func (de *endpoint) send(buffs [][]byte) error { for _, buff := range buffs { ok, _ := de.c.sendAddr(derpAddr, de.publicKey, buff) if stats := de.c.stats.Load(); stats != nil { - stats.UpdateTxPhysical(de.nodeAddr, derpAddr, len(buff)) + stats.UpdateTxPhysical(de.nodeAddr, derpAddr, 1, len(buff)) } + // TODO(kradalby): Is this the correct place for this? Do we need an Error version? + de.c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDERP}, 1) + de.c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDERP}, int64(len(buff))) if !ok { allOk = false } diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index de6b13fc1..8e6857320 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -33,6 +33,7 @@ import ( "tailscale.com/health" "tailscale.com/hostinfo" "tailscale.com/ipn/ipnstate" + "tailscale.com/metrics" "tailscale.com/net/connstats" "tailscale.com/net/netcheck" "tailscale.com/net/neterror" @@ -60,6 +61,7 @@ import ( "tailscale.com/util/set" "tailscale.com/util/testenv" "tailscale.com/util/uniq" + "tailscale.com/util/usermetric" "tailscale.com/wgengine/capture" "tailscale.com/wgengine/wgint" ) @@ -320,6 +322,11 @@ type Conn struct { // responsibility to ensure that traffic from these endpoints is routed // to the node. staticEndpoints views.Slice[netip.AddrPort] + + metricInboundPacketsTotal *metrics.MultiLabelMap[pathLabel] + metricOutboundPacketsTotal *metrics.MultiLabelMap[pathLabel] + metricInboundBytesTotal *metrics.MultiLabelMap[pathLabel] + metricOutboundBytesTotal *metrics.MultiLabelMap[pathLabel] } // SetDebugLoggingEnabled controls whether spammy debug logging is enabled. 
@@ -386,6 +393,9 @@ type Options struct { // report errors and warnings to. HealthTracker *health.Tracker + // UserMetricsRegistry specifies the metrics registry to record metrics to. + UserMetricsRegistry *usermetric.Registry + // ControlKnobs are the set of control knobs to use. // If nil, they're ignored and not updated. ControlKnobs *controlknobs.Knobs @@ -466,6 +476,10 @@ func NewConn(opts Options) (*Conn, error) { return nil, errors.New("magicsock.Options.NetMon must be non-nil") } + if opts.UserMetricsRegistry == nil { + return nil, errors.New("magicsock.Options.UserMetrics must be non-nil") + } + c := newConn(opts.logf()) c.port.Store(uint32(opts.Port)) c.controlKnobs = opts.ControlKnobs @@ -505,6 +519,32 @@ func NewConn(opts Options) (*Conn, error) { UseDNSCache: true, } + // TODO(kradalby): factor out to a func + c.metricInboundBytesTotal = usermetric.NewMultiLabelMap[pathLabel]( + opts.UserMetricsRegistry, + "tailscaled_inbound_bytes_total", + "counter", + "Counts the number of bytes received from other peers", + ) + c.metricInboundPacketsTotal = usermetric.NewMultiLabelMap[pathLabel]( + opts.UserMetricsRegistry, + "tailscaled_inbound_packets_total", + "counter", + "Counts the number of packets received from other peers", + ) + c.metricOutboundBytesTotal = usermetric.NewMultiLabelMap[pathLabel]( + opts.UserMetricsRegistry, + "tailscaled_outbound_bytes_total", + "counter", + "Counts the number of bytes sent to other peers", + ) + c.metricOutboundPacketsTotal = usermetric.NewMultiLabelMap[pathLabel]( + opts.UserMetricsRegistry, + "tailscaled_outbound_packets_total", + "counter", + "Counts the number of packets sent to other peers", + ) + if d4, err := c.listenRawDisco("ip4"); err == nil { c.logf("[v1] using BPF disco receiver for IPv4") c.closeDisco4 = d4 @@ -1145,6 +1185,25 @@ func (c *Conn) sendUDP(ipp netip.AddrPort, b []byte) (sent bool, err error) { } else { if sent { metricSendUDP.Add(1) + + // TODO(kradalby): Do we need error variants of these? 
+ switch { + case ipp.Addr().Is4(): + c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, 1) + c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, int64(len(b))) + case ipp.Addr().Is6(): + c.metricOutboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, 1) + c.metricOutboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, int64(len(b))) + } + + if stats := c.stats.Load(); stats != nil { + c.mu.Lock() + ep, ok := c.peerMap.endpointForIPPort(ipp) + c.mu.Unlock() + if ok { + stats.UpdateTxPhysical(ep.nodeAddr, ipp, 1, len(b)) + } + } } } return @@ -1266,17 +1325,29 @@ func (c *Conn) putReceiveBatch(batch *receiveBatch) { // receiveIPv4 creates an IPv4 ReceiveFunc reading from c.pconn4. func (c *Conn) receiveIPv4() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), metricRecvDataIPv4) + return c.mkReceiveFunc(&c.pconn4, c.health.ReceiveFuncStats(health.ReceiveIPv4), + func(i int64) { + metricRecvDataPacketsIPv4.Add(i) + c.metricInboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv4}, i) + }, func(i int64) { + c.metricInboundBytesTotal.Add(pathLabel{Path: PathDirectIPv4}, i) + }) } // receiveIPv6 creates an IPv6 ReceiveFunc reading from c.pconn6. func (c *Conn) receiveIPv6() conn.ReceiveFunc { - return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), metricRecvDataIPv6) + return c.mkReceiveFunc(&c.pconn6, c.health.ReceiveFuncStats(health.ReceiveIPv6), + func(i int64) { + metricRecvDataPacketsIPv6.Add(i) + c.metricInboundPacketsTotal.Add(pathLabel{Path: PathDirectIPv6}, i) + }, func(i int64) { + c.metricInboundBytesTotal.Add(pathLabel{Path: PathDirectIPv6}, i) + }) } // mkReceiveFunc creates a ReceiveFunc reading from ruc. -// The provided healthItem and metric are updated if non-nil. 
-func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, metric *clientmetric.Metric) conn.ReceiveFunc { +// The provided healthItem and metrics are updated if non-nil. +func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFuncStats, packetMetricFunc, bytesMetricFunc func(int64)) conn.ReceiveFunc { // epCache caches an IPPort->endpoint for hot flows. var epCache ippEndpointCache @@ -1313,8 +1384,11 @@ func (c *Conn) mkReceiveFunc(ruc *RebindingUDPConn, healthItem *health.ReceiveFu } ipp := msg.Addr.(*net.UDPAddr).AddrPort() if ep, ok := c.receiveIP(msg.Buffers[0][:msg.N], ipp, &epCache); ok { - if metric != nil { - metric.Add(1) + if packetMetricFunc != nil { + packetMetricFunc(1) + } + if bytesMetricFunc != nil { + bytesMetricFunc(int64(msg.N)) } eps[i] = ep sizes[i] = msg.N @@ -1370,7 +1444,7 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache) ep.lastRecvUDPAny.StoreAtomic(now) ep.noteRecvActivity(ipp, now) if stats := c.stats.Load(); stats != nil { - stats.UpdateRxPhysical(ep.nodeAddr, ipp, len(b)) + stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, len(b)) } return ep, true } @@ -2924,9 +2998,9 @@ var ( // Data packets (non-disco) metricSendData = clientmetric.NewCounter("magicsock_send_data") metricSendDataNetworkDown = clientmetric.NewCounter("magicsock_send_data_network_down") - metricRecvDataDERP = clientmetric.NewCounter("magicsock_recv_data_derp") - metricRecvDataIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4") - metricRecvDataIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6") + metricRecvDataPacketsDERP = clientmetric.NewCounter("magicsock_recv_data_derp") + metricRecvDataPacketsIPv4 = clientmetric.NewCounter("magicsock_recv_data_ipv4") + metricRecvDataPacketsIPv6 = clientmetric.NewCounter("magicsock_recv_data_ipv6") // Disco packets metricSendDiscoUDP = clientmetric.NewCounter("magicsock_disco_send_udp") @@ -3064,3 +3138,19 @@ func (le *lazyEndpoint) 
GetPeerEndpoint(peerPublicKey [32]byte) conn.Endpoint { le.c.logf("magicsock: lazyEndpoint.GetPeerEndpoint(%v) found: %v", pubKey.ShortString(), ep.nodeAddr) return ep } + +type Path string + +const ( + PathDirectIPv4 Path = "direct_ipv4" + PathDirectIPv6 Path = "direct_ipv6" + PathDERP Path = "derp" +) + +type pathLabel struct { + // Path indicates the path that the packet took: + // - direct_ipv4 + // - direct_ipv6 + // - derp + Path Path +} diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index be1b43f56..091241f3a 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -10,6 +10,7 @@ import ( "crypto/tls" "encoding/binary" "errors" + "expvar" "fmt" "io" "math/rand" @@ -28,6 +29,7 @@ import ( "time" "unsafe" + "github.com/google/go-cmp/cmp" wgconn "github.com/tailscale/wireguard-go/conn" "github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/tun/tuntest" @@ -64,6 +66,7 @@ import ( "tailscale.com/util/cibuild" "tailscale.com/util/racebuild" "tailscale.com/util/set" + "tailscale.com/util/usermetric" "tailscale.com/wgengine/filter" "tailscale.com/wgengine/wgcfg" "tailscale.com/wgengine/wgcfg/nmcfg" @@ -156,6 +159,7 @@ type magicStack struct { dev *device.Device // the wireguard-go Device that connects the previous things wgLogger *wglog.Logger // wireguard-go log wrapper netMon *netmon.Monitor // always non-nil + metrics *usermetric.Registry } // newMagicStack builds and initializes an idle magicsock and @@ -174,6 +178,8 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen t.Fatalf("netmon.New: %v", err) } + var reg usermetric.Registry + epCh := make(chan []tailcfg.Endpoint, 100) // arbitrary conn, err := NewConn(Options{ NetMon: netMon, @@ -183,6 +189,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen EndpointsFunc: func(eps []tailcfg.Endpoint) { epCh <- eps }, + UserMetricsRegistry: ®, }) if err 
!= nil { t.Fatalf("constructing magicsock: %v", err) @@ -193,7 +200,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen } tun := tuntest.NewChannelTUN() - tsTun := tstun.Wrap(logf, tun.TUN()) + tsTun := tstun.Wrap(logf, tun.TUN(), nil) tsTun.SetFilter(filter.NewAllowAllForTest(logf)) tsTun.Start() @@ -219,6 +226,7 @@ func newMagicStackWithKey(t testing.TB, logf logger.Logf, l nettype.PacketListen dev: dev, wgLogger: wgLogger, netMon: netMon, + metrics: ®, } } @@ -392,11 +400,12 @@ func TestNewConn(t *testing.T) { port := pickPort(t) conn, err := NewConn(Options{ - Port: port, - DisablePortMapper: true, - EndpointsFunc: epFunc, - Logf: t.Logf, - NetMon: netMon, + Port: port, + DisablePortMapper: true, + EndpointsFunc: epFunc, + Logf: t.Logf, + NetMon: netMon, + UserMetricsRegistry: new(usermetric.Registry), }) if err != nil { t.Fatal(err) @@ -519,10 +528,12 @@ func TestDeviceStartStop(t *testing.T) { } defer netMon.Close() + reg := new(usermetric.Registry) conn, err := NewConn(Options{ - EndpointsFunc: func(eps []tailcfg.Endpoint) {}, - Logf: t.Logf, - NetMon: netMon, + EndpointsFunc: func(eps []tailcfg.Endpoint) {}, + Logf: t.Logf, + NetMon: netMon, + UserMetricsRegistry: reg, }) if err != nil { t.Fatal(err) @@ -1181,6 +1192,100 @@ func testTwoDevicePing(t *testing.T, d *devices) { checkStats(t, m1, m1Conns) checkStats(t, m2, m2Conns) }) + + t.Run("compare-metrics-stats", func(t *testing.T) { + setT(t) + defer setT(outerT) + m1.conn.resetMetricsForTest() + m1.stats.TestExtract() + m2.conn.resetMetricsForTest() + m2.stats.TestExtract() + t.Logf("Metrics before: %s\n", m1.metrics.String()) + ping1(t) + ping2(t) + assertConnStatsAndUserMetricsEqual(t, m1) + assertConnStatsAndUserMetricsEqual(t, m2) + t.Logf("Metrics after: %s\n", m1.metrics.String()) + }) +} + +func (c *Conn) resetMetricsForTest() { + c.metricInboundBytesTotal.ResetAllForTest() + c.metricInboundPacketsTotal.ResetAllForTest() + 
c.metricOutboundBytesTotal.ResetAllForTest() + c.metricOutboundPacketsTotal.ResetAllForTest() +} + +func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { + _, phys := ms.stats.TestExtract() + + physIPv4RxBytes := int64(0) + physIPv4TxBytes := int64(0) + physDERPRxBytes := int64(0) + physDERPTxBytes := int64(0) + physIPv4RxPackets := int64(0) + physIPv4TxPackets := int64(0) + physDERPRxPackets := int64(0) + physDERPTxPackets := int64(0) + for conn, count := range phys { + t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String()) + if conn.Dst.String() == "127.3.3.40:1" { + physDERPRxBytes += int64(count.RxBytes) + physDERPTxBytes += int64(count.TxBytes) + physDERPRxPackets += int64(count.RxPackets) + physDERPTxPackets += int64(count.TxPackets) + } else { + physIPv4RxBytes += int64(count.RxBytes) + physIPv4TxBytes += int64(count.TxBytes) + physIPv4RxPackets += int64(count.RxPackets) + physIPv4TxPackets += int64(count.TxPackets) + } + } + + var metricIPv4RxBytes, metricIPv4TxBytes, metricDERPRxBytes, metricDERPTxBytes int64 + var metricIPv4RxPackets, metricIPv4TxPackets, metricDERPRxPackets, metricDERPTxPackets int64 + + if m, ok := ms.conn.metricInboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4RxBytes = m.Value() + } + if m, ok := ms.conn.metricOutboundBytesTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4TxBytes = m.Value() + } + if m, ok := ms.conn.metricInboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPRxBytes = m.Value() + } + if m, ok := ms.conn.metricOutboundBytesTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPTxBytes = m.Value() + } + if m, ok := ms.conn.metricInboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4RxPackets = m.Value() + } + if m, ok := ms.conn.metricOutboundPacketsTotal.Get(pathLabel{Path: PathDirectIPv4}).(*expvar.Int); ok { + metricIPv4TxPackets = m.Value() + 
} + if m, ok := ms.conn.metricInboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPRxPackets = m.Value() + } + if m, ok := ms.conn.metricOutboundPacketsTotal.Get(pathLabel{Path: PathDERP}).(*expvar.Int); ok { + metricDERPTxPackets = m.Value() + } + + assertEqual(t, "derp bytes inbound", physDERPRxBytes, metricDERPRxBytes) + assertEqual(t, "derp bytes outbound", physDERPTxBytes, metricDERPTxBytes) + assertEqual(t, "ipv4 bytes inbound", physIPv4RxBytes, metricIPv4RxBytes) + assertEqual(t, "ipv4 bytes outbound", physIPv4TxBytes, metricIPv4TxBytes) + assertEqual(t, "derp packets inbound", physDERPRxPackets, metricDERPRxPackets) + assertEqual(t, "derp packets outbound", physDERPTxPackets, metricDERPTxPackets) + assertEqual(t, "ipv4 packets inbound", physIPv4RxPackets, metricIPv4RxPackets) + assertEqual(t, "ipv4 packets outbound", physIPv4TxPackets, metricIPv4TxPackets) +} + +func assertEqual(t *testing.T, name string, a, b any) { + t.Helper() + t.Logf("assertEqual %s: %v == %v", name, a, b) + if diff := cmp.Diff(a, b); diff != "" { + t.Errorf("%s mismatch (-want +got):\n%s", name, diff) + } } func TestDiscoMessage(t *testing.T) { @@ -1275,6 +1380,7 @@ func newTestConn(t testing.TB) *Conn { conn, err := NewConn(Options{ NetMon: netMon, HealthTracker: new(health.Tracker), + UserMetricsRegistry: new(usermetric.Registry), DisablePortMapper: true, Logf: t.Logf, Port: port, diff --git a/wgengine/netstack/netstack_test.go b/wgengine/netstack/netstack_test.go index 6be61cd58..701e70e9b 100644 --- a/wgengine/netstack/netstack_test.go +++ b/wgengine/netstack/netstack_test.go @@ -46,10 +46,11 @@ func TestInjectInboundLeak(t *testing.T) { } sys := new(tsd.System) eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ - Tun: tunDev, - Dialer: dialer, - SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + Tun: tunDev, + Dialer: dialer, + SetSubsystem: sys.Set, + HealthTracker: sys.HealthTracker(), + UserMetricsRegistry: 
sys.UserMetricsRegistry(), }) if err != nil { t.Fatal(err) @@ -103,10 +104,11 @@ func makeNetstack(tb testing.TB, config func(*Impl)) *Impl { dialer := new(tsdial.Dialer) logf := tstest.WhileTestRunningLogger(tb) eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ - Tun: tunDev, - Dialer: dialer, - SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + Tun: tunDev, + Dialer: dialer, + SetSubsystem: sys.Set, + HealthTracker: sys.HealthTracker(), + UserMetricsRegistry: sys.UserMetricsRegistry(), }) if err != nil { tb.Fatal(err) diff --git a/wgengine/userspace.go b/wgengine/userspace.go index f6b4586cb..b5f4fc902 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -49,6 +49,7 @@ import ( "tailscale.com/util/mak" "tailscale.com/util/set" "tailscale.com/util/testenv" + "tailscale.com/util/usermetric" "tailscale.com/version" "tailscale.com/wgengine/capture" "tailscale.com/wgengine/filter" @@ -195,6 +196,9 @@ type Config struct { // HealthTracker, if non-nil, is the health tracker to use. HealthTracker *health.Tracker + // UserMetricsRegistry, if non-nil, is the usermetrics registry to use. + UserMetricsRegistry *usermetric.Registry + // Dialer is the dialer to use for outbound connections. // If nil, a new Dialer is created. 
Dialer *tsdial.Dialer @@ -249,6 +253,8 @@ func NewFakeUserspaceEngine(logf logger.Logf, opts ...any) (Engine, error) { conf.ControlKnobs = v case *health.Tracker: conf.HealthTracker = v + case *usermetric.Registry: + conf.UserMetricsRegistry = v default: return nil, fmt.Errorf("unknown option type %T", v) } @@ -289,9 +295,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) var tsTUNDev *tstun.Wrapper if conf.IsTAP { - tsTUNDev = tstun.WrapTAP(logf, conf.Tun) + tsTUNDev = tstun.WrapTAP(logf, conf.Tun, conf.UserMetricsRegistry) } else { - tsTUNDev = tstun.Wrap(logf, conf.Tun) + tsTUNDev = tstun.Wrap(logf, conf.Tun, conf.UserMetricsRegistry) } closePool.add(tsTUNDev) @@ -379,17 +385,18 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) } } magicsockOpts := magicsock.Options{ - Logf: logf, - Port: conf.ListenPort, - EndpointsFunc: endpointsFn, - DERPActiveFunc: e.RequestStatus, - IdleFunc: e.tundev.IdleDuration, - NoteRecvActivity: e.noteRecvActivity, - NetMon: e.netMon, - HealthTracker: e.health, - ControlKnobs: conf.ControlKnobs, - OnPortUpdate: onPortUpdate, - PeerByKeyFunc: e.PeerByKey, + Logf: logf, + Port: conf.ListenPort, + EndpointsFunc: endpointsFn, + DERPActiveFunc: e.RequestStatus, + IdleFunc: e.tundev.IdleDuration, + NoteRecvActivity: e.noteRecvActivity, + NetMon: e.netMon, + HealthTracker: e.health, + UserMetricsRegistry: conf.UserMetricsRegistry, + ControlKnobs: conf.ControlKnobs, + OnPortUpdate: onPortUpdate, + PeerByKeyFunc: e.PeerByKey, } var err error diff --git a/wgengine/userspace_ext_test.go b/wgengine/userspace_ext_test.go index 6610f1e92..91ea6d2b3 100644 --- a/wgengine/userspace_ext_test.go +++ b/wgengine/userspace_ext_test.go @@ -20,8 +20,9 @@ func TestIsNetstack(t *testing.T) { e, err := wgengine.NewUserspaceEngine( tstest.WhileTestRunningLogger(t), wgengine.Config{ - SetSubsystem: sys.Set, - HealthTracker: sys.HealthTracker(), + SetSubsystem: sys.Set, + HealthTracker: 
sys.HealthTracker(), + UserMetricsRegistry: sys.UserMetricsRegistry(), }, ) if err != nil { @@ -72,6 +73,7 @@ func TestIsNetstackRouter(t *testing.T) { conf := tt.conf conf.SetSubsystem = sys.Set conf.HealthTracker = sys.HealthTracker() + conf.UserMetricsRegistry = sys.UserMetricsRegistry() e, err := wgengine.NewUserspaceEngine(logger.Discard, conf) if err != nil { t.Fatal(err) diff --git a/wgengine/userspace_test.go b/wgengine/userspace_test.go index 8763a84a1..051421862 100644 --- a/wgengine/userspace_test.go +++ b/wgengine/userspace_test.go @@ -25,6 +25,7 @@ import ( "tailscale.com/types/key" "tailscale.com/types/netmap" "tailscale.com/types/opt" + "tailscale.com/util/usermetric" "tailscale.com/wgengine/router" "tailscale.com/wgengine/wgcfg" ) @@ -100,7 +101,8 @@ func nodeViews(v []*tailcfg.Node) []tailcfg.NodeView { func TestUserspaceEngineReconfig(t *testing.T) { ht := new(health.Tracker) - e, err := NewFakeUserspaceEngine(t.Logf, 0, ht) + reg := new(usermetric.Registry) + e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg) if err != nil { t.Fatal(err) } @@ -167,9 +169,10 @@ func TestUserspaceEnginePortReconfig(t *testing.T) { // Keep making a wgengine until we find an unused port var ue *userspaceEngine ht := new(health.Tracker) + reg := new(usermetric.Registry) for i := range 100 { attempt := uint16(defaultPort + i) - e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht) + e, err := NewFakeUserspaceEngine(t.Logf, attempt, &knobs, ht, reg) if err != nil { t.Fatal(err) } @@ -249,7 +252,8 @@ func TestUserspaceEnginePeerMTUReconfig(t *testing.T) { var knobs controlknobs.Knobs ht := new(health.Tracker) - e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht) + reg := new(usermetric.Registry) + e, err := NewFakeUserspaceEngine(t.Logf, 0, &knobs, ht, reg) if err != nil { t.Fatal(err) } diff --git a/wgengine/watchdog_test.go b/wgengine/watchdog_test.go index 0d4fcd8c1..b05cd421f 100644 --- a/wgengine/watchdog_test.go +++ b/wgengine/watchdog_test.go @@ -9,6 
+9,7 @@ import ( "time" "tailscale.com/health" + "tailscale.com/util/usermetric" ) func TestWatchdog(t *testing.T) { @@ -24,7 +25,8 @@ func TestWatchdog(t *testing.T) { t.Run("default watchdog does not fire", func(t *testing.T) { t.Parallel() ht := new(health.Tracker) - e, err := NewFakeUserspaceEngine(t.Logf, 0, ht) + reg := new(usermetric.Registry) + e, err := NewFakeUserspaceEngine(t.Logf, 0, ht, reg) if err != nil { t.Fatal(err) }