From a1a43ed26605b7d1f151b642ab4830f25d2fc33b Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Thu, 13 Oct 2022 10:46:29 -0700 Subject: [PATCH] wgengine/netlog: add support for magicsock statistics (#5913) This sets up Logger to handle statistics at the magicsock layer, where we can correlate traffic between a particular tailscale IP address and any number of physical endpoints used to contact the node that hosts that tailscale address. We also export Message and TupleCounts to better document the JSON format that is being sent to the logging infrastructure. This commit does NOT yet enable the actual logging of magicsock statistics. That will be a future commit. Signed-off-by: Joe Tsai --- wgengine/netlog/logger.go | 117 ++++++++++++++++++++++++--------- wgengine/netlog/logger_test.go | 2 +- wgengine/userspace.go | 2 +- 3 files changed, 87 insertions(+), 34 deletions(-) diff --git a/wgengine/netlog/logger.go b/wgengine/netlog/logger.go index 22d1a7583..2d0284b2a 100644 --- a/wgengine/netlog/logger.go +++ b/wgengine/netlog/logger.go @@ -30,13 +30,36 @@ import ( // pollPeriod specifies how often to poll for network traffic. const pollPeriod = 5 * time.Second -// Device is an abstraction over a tunnel device. +// Device is an abstraction over a tunnel device or a magic socket. // *tstun.Wrapper implements this interface. +// +// TODO(joetsai): Make *magicsock.Conn implement this interface. type Device interface { SetStatisticsEnabled(bool) ExtractStatistics() map[flowtrack.Tuple]tunstats.Counts } +type noopDevice struct{} + +func (noopDevice) SetStatisticsEnabled(bool) {} +func (noopDevice) ExtractStatistics() map[flowtrack.Tuple]tunstats.Counts { return nil } + +// Message is the log message that captures network traffic. +type Message struct { + Start time.Time `json:"start"` // inclusive + End time.Time `json:"end"` // inclusive + VirtualTraffic []TupleCounts `json:"virtualTraffic,omitempty"` + SubnetTraffic []TupleCounts `json:"subnetTraffic,omitempty"` + ExitTraffic []TupleCounts `json:"exitTraffic,omitempty"` + PhysicalTraffic []TupleCounts `json:"physicalTraffic,omitempty"` +} + +// TupleCounts is a flattened struct of both a connection and counts. +type TupleCounts struct { + flowtrack.Tuple + tunstats.Counts +} + // Logger logs statistics about every connection. // At present, it only logs connections within a tailscale network. // Exit node traffic is not logged for privacy reasons. @@ -63,14 +86,41 @@ func (nl *Logger) Running() bool { var testClient *http.Client // Startup starts an asynchronous network logger that monitors -// statistics for the provided tun device. -// The provided cfg is used to classify the types of connections. -func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun Device) error { +// statistics for the provided tun and/or sock device. +// +// The tun Device captures packets within the tailscale network, +// where at least one address is a tailscale IP address. +// The source is always from the perspective of the current node. +// If one of the other endpoint is not a tailscale IP address, +// then it suggests the use of a subnet router or exit node. +// For example, when using a subnet router, the source address is +// the tailscale IP address of the current node, and +// the destination address is an IP address within the subnet range. +// In contrast, when acting as a subnet router, the source address is +// an IP address within the subnet range, and the destination is a +// tailscale IP address that initiated the subnet proxy connection. +// In this case, the node acting as a subnet router is acting on behalf +// of some remote endpoint within the subnet range. +// The tun is used to populate the VirtualTraffic, SubnetTraffic, +// and ExitTraffic fields in Message. +// +// The sock Device captures packets at the magicsock layer. +// The source is always a tailscale IP address and the destination +// is a non-tailscale IP address to contact for that particular tailscale node. +// The IP protocol and source port are always zero. +// The sock is used to populated the PhysicalTraffic field in Message. +func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun, sock Device) error { nl.mu.Lock() defer nl.mu.Unlock() if nl.logger != nil { return fmt.Errorf("network logger already running for %v", nl.logger.PrivateID().Public()) } + if tun == nil { + tun = noopDevice{} + } + if sock == nil { + sock = noopDevice{} + } httpc := &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)} if testClient != nil { @@ -104,6 +154,10 @@ func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun Device) error defer tun.SetStatisticsEnabled(false) tun.ExtractStatistics() // clear out any stale statistics + sock.SetStatisticsEnabled(true) + defer sock.SetStatisticsEnabled(false) + sock.ExtractStatistics() // clear out any stale statistics + start := time.Now() ticker := time.NewTicker(pollPeriod) for { @@ -115,13 +169,17 @@ func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun Device) error case end = <-ticker.C: } + // NOTE: tunStats and sockStats will always be slightly out-of-sync. + // It is impossible to have an atomic snapshot of statistics + // at both layers without a global mutex that spans all layers. tunStats := tun.ExtractStatistics() - if len(tunStats) > 0 { + sockStats := sock.ExtractStatistics() + if len(tunStats)+len(sockStats) > 0 { nl.mu.Lock() addrs := nl.addrs prefixes := nl.prefixes nl.mu.Unlock() - recordStatistics(logger, start, end, tunStats, addrs, prefixes) + recordStatistics(logger, start, end, tunStats, sockStats, addrs, prefixes) } if ctx.Err() != nil { @@ -134,11 +192,14 @@ func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun Device) error return nil } -func recordStatistics(logger *logtail.Logger, start, end time.Time, tunStats map[flowtrack.Tuple]tunstats.Counts, addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) { +func recordStatistics(logger *logtail.Logger, start, end time.Time, tunStats, sockStats map[flowtrack.Tuple]tunstats.Counts, addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) { + m := Message{Start: start.UTC(), End: end.UTC()} + classifyAddr := func(a netip.Addr) (isTailscale, withinRoute bool) { // NOTE: There could be mis-classifications where an address is treated // as a Tailscale IP address because the subnet range overlaps with // the subnet range that Tailscale IP addresses are allocated from. + // This should never happen for IPv6, but could happen for IPv4. withinRoute = addrs[a] for p := range prefixes { if p.Contains(a) && p.Bits() > 0 { @@ -148,46 +209,36 @@ func recordStatistics(logger *logtail.Logger, start, end time.Time, tunStats map return withinRoute && tsaddr.IsTailscaleIP(a), withinRoute && !tsaddr.IsTailscaleIP(a) } - type tupleCounts struct { - flowtrack.Tuple - tunstats.Counts - } - - var virtualTraffic, subnetTraffic, exitTraffic []tupleCounts for conn, cnts := range tunStats { srcIsTailscaleIP, srcWithinSubnet := classifyAddr(conn.Src.Addr()) dstIsTailscaleIP, dstWithinSubnet := classifyAddr(conn.Dst.Addr()) switch { case srcIsTailscaleIP && dstIsTailscaleIP: - virtualTraffic = append(virtualTraffic, tupleCounts{conn, cnts}) + m.VirtualTraffic = append(m.VirtualTraffic, TupleCounts{conn, cnts}) case srcWithinSubnet || dstWithinSubnet: - subnetTraffic = append(subnetTraffic, tupleCounts{conn, cnts}) + m.SubnetTraffic = append(m.SubnetTraffic, TupleCounts{conn, cnts}) default: const anonymize = true if anonymize { - if len(exitTraffic) == 0 { - exitTraffic = []tupleCounts{{}} + if len(m.ExitTraffic) == 0 { + m.ExitTraffic = []TupleCounts{{}} } - exitTraffic[0].Counts = exitTraffic[0].Counts.Add(cnts) + m.ExitTraffic[0].Counts = m.ExitTraffic[0].Counts.Add(cnts) } else { - exitTraffic = append(exitTraffic, tupleCounts{conn, cnts}) + m.ExitTraffic = append(m.ExitTraffic, TupleCounts{conn, cnts}) } } } - - if len(virtualTraffic)+len(subnetTraffic)+len(exitTraffic) == 0 { - return // nothing to report + for conn, cnts := range sockStats { + m.PhysicalTraffic = append(m.PhysicalTraffic, TupleCounts{conn, cnts}) } - if b, err := json.Marshal(struct { - Start time.Time `json:"start"` - End time.Time `json:"end"` - VirtualTraffic []tupleCounts `json:"virtualTraffic,omitempty"` - SubnetTraffic []tupleCounts `json:"subnetTraffic,omitempty"` - ExitTraffic []tupleCounts `json:"exitTraffic,omitempty"` - }{start.UTC(), end.UTC(), virtualTraffic, subnetTraffic, exitTraffic}); err != nil { - logger.Logf("json.Marshal error: %v", err) - } else { - logger.Logf("%s", b) + + if len(m.VirtualTraffic)+len(m.SubnetTraffic)+len(m.ExitTraffic)+len(m.PhysicalTraffic) > 0 { + if b, err := json.Marshal(m); err != nil { + logger.Logf("json.Marshal error: %v", err) + } else { + logger.Logf("%s", b) + } } } @@ -214,6 +265,8 @@ func makeRouteMaps(cfg *router.Config) (addrs map[netip.Addr]bool, prefixes map[ } // ReconfigRoutes configures the network logger with updated routes. +// The cfg is used to classify the types of connections captured by +// the tun Device passed to Startup. func (nl *Logger) ReconfigRoutes(cfg *router.Config) { nl.mu.Lock() defer nl.mu.Unlock() diff --git a/wgengine/netlog/logger_test.go b/wgengine/netlog/logger_test.go index 34d81c0e9..654379315 100644 --- a/wgengine/netlog/logger_test.go +++ b/wgengine/netlog/logger_test.go @@ -59,7 +59,7 @@ func TestResourceCheck(t *testing.T) { var l Logger var d fakeDevice for i := 0; i < 10; i++ { - must.Do(l.Startup(logtail.PrivateID{}, logtail.PrivateID{}, &d)) + must.Do(l.Startup(logtail.PrivateID{}, logtail.PrivateID{}, &d, nil)) l.ReconfigRoutes(&router.Config{}) must.Do(l.Shutdown(context.Background())) c.Assert(d.toggled, qt.Equals, 2*(i+1)) diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 6bf7cee7b..7cb5151ba 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -953,7 +953,7 @@ func (e *userspaceEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config, nid := cfg.NetworkLogging.NodeID tid := cfg.NetworkLogging.DomainID e.logf("wgengine: Reconfig: starting up network logger (node:%s tailnet:%s)", nid.Public(), tid.Public()) - if err := e.networkLogger.Startup(nid, tid, e.tundev); err != nil { + if err := e.networkLogger.Startup(nid, tid, e.tundev, nil); err != nil { e.logf("wgengine: Reconfig: error starting up network logger: %v", err) } }