From 1b4e4cc1e83562ab83f8a1941436acd9b7133925 Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Wed, 12 Oct 2022 11:57:13 -0700 Subject: [PATCH] wgengine/netlog: new package for traffic flow logging (#5864) The Logger type managers a logtail.Logger for extracting statistics from a tstun.Wrapper. So long as Shutdown is called, it ensures that logtail and statistic gathering resources are properly cleared up. Signed-off-by: Joe Tsai --- net/tstun/wrap.go | 4 +- wgengine/netlog/logger.go | 250 +++++++++++++++++++++++++++++++++ wgengine/netlog/logger_test.go | 67 +++++++++ 3 files changed, 320 insertions(+), 1 deletion(-) create mode 100644 wgengine/netlog/logger.go create mode 100644 wgengine/netlog/logger_test.go diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index dc03af561..c730c8272 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -844,7 +844,9 @@ func (t *Wrapper) Unwrap() tun.Device { } // SetStatisticsEnabled enables per-connections packet counters. -// ExtractStatistics must be called periodically to avoid unbounded memory use. +// Disabling statistics gathering does not reset the counters. +// ExtractStatistics must be called to reset the counters and +// be periodically called while enabled to avoid unbounded memory use. func (t *Wrapper) SetStatisticsEnabled(enable bool) { t.stats.enabled.Store(enable) } diff --git a/wgengine/netlog/logger.go b/wgengine/netlog/logger.go new file mode 100644 index 000000000..7b1bb00d5 --- /dev/null +++ b/wgengine/netlog/logger.go @@ -0,0 +1,250 @@ +// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package netlog provides a logger that monitors a TUN device and +// periodically records any traffic into a log stream. +package netlog + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/netip" + "sync" + "time" + + "golang.org/x/sync/errgroup" + "tailscale.com/logpolicy" + "tailscale.com/logtail" + "tailscale.com/net/flowtrack" + "tailscale.com/net/tsaddr" + "tailscale.com/net/tunstats" + "tailscale.com/smallzstd" + "tailscale.com/wgengine/router" +) + +// pollPeriod specifies how often to poll for network traffic. +const pollPeriod = 5 * time.Second + +// Device is an abstraction over a tunnel device. +// *tstun.Wrapper implements this interface. +type Device interface { + SetStatisticsEnabled(bool) + ExtractStatistics() map[flowtrack.Tuple]tunstats.Counts +} + +// Logger logs statistics about every connection. +// At present, it only logs connections within a tailscale network. +// Exit node traffic is not logged for privacy reasons. +// The zero value is ready for use. +type Logger struct { + mu sync.Mutex + + nodeID logtail.PrivateID + domainID logtail.PrivateID + logger *logtail.Logger + + addrs map[netip.Addr]bool + prefixes map[netip.Prefix]bool + + group errgroup.Group + cancel context.CancelFunc +} + +// Running reports whether the logger is running. +func (nl *Logger) Running() bool { + nl.mu.Lock() + defer nl.mu.Unlock() + return nl.logger != nil +} + +var testClient *http.Client + +// Startup starts an asynchronous network logger that monitors +// statistics for the provided tun device. +// The provided cfg is used to classify the types of connections. +func (nl *Logger) Startup(nodeID, domainID logtail.PrivateID, tun Device, cfg *router.Config) error { + nl.mu.Lock() + defer nl.mu.Unlock() + if nl.logger != nil { + return fmt.Errorf("network logger already running for %v", nl.nodeID.Public()) + } + + httpc := &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost)} + if testClient != nil { + httpc = testClient + } + logger := logtail.NewLogger(logtail.Config{ + Collection: "tailtraffic.log.tailscale.io", + PrivateID: nodeID, + CopyPrivateID: domainID, + Stderr: io.Discard, + // TODO(joetsai): Set Buffer? Use an in-memory buffer for now. + NewZstdEncoder: func() logtail.Encoder { + w, err := smallzstd.NewEncoder(nil) + if err != nil { + panic(err) + } + return w + }, + HTTPC: httpc, + + // Include process sequence numbers to identify missing samples. + IncludeProcID: true, + IncludeProcSequence: true, + }, log.Printf) + nl.logger = logger + + nl.addrs, nl.prefixes = makeRouteMaps(cfg) + + ctx, cancel := context.WithCancel(context.Background()) + nl.cancel = cancel + nl.group.Go(func() error { + tun.SetStatisticsEnabled(true) + defer tun.SetStatisticsEnabled(false) + tun.ExtractStatistics() // clear out any stale statistics + + start := time.Now() + ticker := time.NewTicker(pollPeriod) + for { + var end time.Time + select { + case <-ctx.Done(): + tun.SetStatisticsEnabled(false) + end = time.Now() + case end = <-ticker.C: + } + + tunStats := tun.ExtractStatistics() + if len(tunStats) > 0 { + nl.mu.Lock() + addrs := nl.addrs + prefixes := nl.prefixes + nl.mu.Unlock() + recordStatistics(logger, start, end, tunStats, addrs, prefixes) + } + + if ctx.Err() != nil { + break + } + start = end.Add(time.Nanosecond) + } + return nil + }) + return nil +} + +func recordStatistics(logger *logtail.Logger, start, end time.Time, tunStats map[flowtrack.Tuple]tunstats.Counts, addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) { + classifyAddr := func(a netip.Addr) (isTailscale, withinRoute bool) { + // NOTE: There could be mis-classifications where an address is treated + // as a Tailscale IP address because the subnet range overlaps with + // the subnet range that Tailscale IP addresses are allocated from. + withinRoute = addrs[a] + for p := range prefixes { + if p.Contains(a) && p.Bits() > 0 { + withinRoute = true + } + } + return withinRoute && tsaddr.IsTailscaleIP(a), withinRoute && !tsaddr.IsTailscaleIP(a) + } + + type tupleCounts struct { + flowtrack.Tuple + tunstats.Counts + } + + var virtualTraffic, subnetTraffic, exitTraffic []tupleCounts + for conn, cnts := range tunStats { + srcIsTailscaleIP, srcWithinSubnet := classifyAddr(conn.Src.Addr()) + dstIsTailscaleIP, dstWithinSubnet := classifyAddr(conn.Dst.Addr()) + switch { + case srcIsTailscaleIP && dstIsTailscaleIP: + virtualTraffic = append(virtualTraffic, tupleCounts{conn, cnts}) + case srcWithinSubnet || dstWithinSubnet: + subnetTraffic = append(subnetTraffic, tupleCounts{conn, cnts}) + default: + const anonymize = true + if anonymize { + if len(exitTraffic) == 0 { + exitTraffic = []tupleCounts{{}} + } + exitTraffic[0].Counts = exitTraffic[0].Counts.Add(cnts) + } else { + exitTraffic = append(exitTraffic, tupleCounts{conn, cnts}) + } + } + } + + if len(virtualTraffic)+len(subnetTraffic)+len(exitTraffic) == 0 { + return // nothing to report + } + if b, err := json.Marshal(struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + VirtualTraffic []tupleCounts `json:"virtualTraffic,omitempty"` + SubnetTraffic []tupleCounts `json:"subnetTraffic,omitempty"` + ExitTraffic []tupleCounts `json:"exitTraffic,omitempty"` + }{start.UTC(), end.UTC(), virtualTraffic, subnetTraffic, exitTraffic}); err != nil { + logger.Logf("json.Marshal error: %v", err) + } else { + logger.Logf("%s", b) + } +} + +func makeRouteMaps(cfg *router.Config) (addrs map[netip.Addr]bool, prefixes map[netip.Prefix]bool) { + addrs = make(map[netip.Addr]bool) + for _, p := range cfg.LocalAddrs { + if p.IsSingleIP() { + addrs[p.Addr()] = true + } + } + prefixes = make(map[netip.Prefix]bool) + insertPrefixes := func(rs []netip.Prefix) { + for _, p := range rs { + if p.IsSingleIP() { + addrs[p.Addr()] = true + } else { + prefixes[p] = true + } + } + } + insertPrefixes(cfg.Routes) + insertPrefixes(cfg.SubnetRoutes) + return addrs, prefixes +} + +// ReconfigRoutes configures the network logger with updated routes. +func (nl *Logger) ReconfigRoutes(cfg *router.Config) { + nl.mu.Lock() + defer nl.mu.Unlock() + // TODO(joetsai): There is a race where deleted routes are not known at + // the time of extraction. We need to keep old routes around for a bit. + nl.addrs, nl.prefixes = makeRouteMaps(cfg) +} + +// Shutdown shuts down the network logger. +// This attempts to flush out all pending log messages. +func (nl *Logger) Shutdown(ctx context.Context) error { + nl.mu.Lock() + defer nl.mu.Unlock() + if nl.logger == nil { + return nil + } + nl.cancel() + nl.mu.Unlock() + nl.group.Wait() // do not hold lock while waiting + nl.mu.Lock() + err := nl.logger.Shutdown(ctx) + + nl.nodeID = logtail.PrivateID{} + nl.domainID = logtail.PrivateID{} + nl.logger = nil + nl.addrs = nil + nl.prefixes = nil + nl.cancel = nil + return err +} diff --git a/wgengine/netlog/logger_test.go b/wgengine/netlog/logger_test.go new file mode 100644 index 000000000..ec72d11af --- /dev/null +++ b/wgengine/netlog/logger_test.go @@ -0,0 +1,67 @@ +// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package netlog + +import ( + "context" + "net/http" + "testing" + + qt "github.com/frankban/quicktest" + "tailscale.com/logtail" + "tailscale.com/net/flowtrack" + "tailscale.com/net/tunstats" + "tailscale.com/tstest" + "tailscale.com/util/must" + "tailscale.com/wgengine/router" +) + +func init() { + testClient = &http.Client{Transport: &roundTripper} +} + +var roundTripper roundTripperFunc + +type roundTripperFunc struct { + F func(*http.Request) (*http.Response, error) +} + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f.F(r) +} + +type fakeDevice struct { + toggled int // even => disabled, odd => enabled +} + +func (d *fakeDevice) SetStatisticsEnabled(enable bool) { + if enabled := d.toggled%2 == 1; enabled != enable { + d.toggled++ + } + +} +func (fakeDevice) ExtractStatistics() map[flowtrack.Tuple]tunstats.Counts { + // TODO(dsnet): Add a test that verifies that statistics are correctly + // extracted from the device and uploaded. Unfortunately, + // we can't reliably run this test until we fix http://go/oss/5856. + return nil +} + +func TestResourceCheck(t *testing.T) { + roundTripper.F = func(r *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: 200}, nil + } + + c := qt.New(t) + tstest.ResourceCheck(t) + var l Logger + var d fakeDevice + for i := 0; i < 10; i++ { + must.Do(l.Startup(logtail.PrivateID{}, logtail.PrivateID{}, &d, &router.Config{})) + l.ReconfigRoutes(&router.Config{}) + must.Do(l.Shutdown(context.Background())) + c.Assert(d.toggled, qt.Equals, 2*(i+1)) + } +}