wgengine/magicsock: gather physical-layer statistics (#5925)

There is utility in logging traffic statistics that occurs at the physical layer.
That is, in order to send packets virtually to a particular tailscale IP address,
what physical endpoints did we need to communicate with?

This functionality logs IP addresses identical to
what had always been logged in magicsock prior to #5823,
so there is no increase in PII being logged.

ExtractStatistics returns a mapping of connections to counts.
The source is always a Tailscale IP address (without port),
while the destination is some endpoint reachable on WAN or LAN.
As a special case, traffic routed through DERP will use 127.3.3.40
as the destination address with the port being the DERP region.

This entire feature is only enabled if data-plane audit logging
is enabled on the tailnet (by default it is disabled).

Example of type of information logged:

	------------------------------------  Tx[P/s]    Tx[B/s]  Rx[P/s]   Rx[B/s]
	PhysicalTraffic:                       25.80      3.39Ki   38.80     5.57Ki
	    100.1.2.3 -> 143.11.22.33:41641    15.40      2.00Ki   23.20     3.37Ki
	    100.4.5.6 -> 192.168.0.100:41641   10.20      1.38Ki   15.60     2.20Ki
	    100.7.8.9 -> 127.3.3.40:2           0.20      6.40      0.00     0.00

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
pull/6105/head
Joe Tsai 2 years ago committed by GitHub
parent 5e9e57ecf5
commit 81fd259133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -52,6 +52,7 @@ import (
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/netlogtype"
"tailscale.com/types/netmap"
"tailscale.com/types/nettype"
"tailscale.com/util/clientmetric"
@ -337,6 +338,21 @@ type Conn struct {
// port is the preferred port from opts.Port; 0 means auto.
port atomic.Uint32
// stats maintains per-connection counters.
// See SetStatisticsEnabled and ExtractStatistics for details.
stats struct {
enabled atomic.Bool
// TODO(joetsai): A per-Conn map of connections is easiest to implement.
// Since every packet occurs within the context of an endpoint,
// we could track the counts within the endpoint itself,
// and then merge the results when ExtractStatistics is called.
// That would avoid a map lookup for every packet.
mu sync.Mutex
m map[netlogtype.Connection]netlogtype.Counts
}
// ============================================================
// mu guards all following fields; see userspaceEngine lock
// ordering rules against the engine. For derphttp, mu must
@ -1750,6 +1766,9 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache,
ep = de
}
ep.noteRecvActivity()
if c.stats.enabled.Load() {
c.updateStats(ep.nodeAddr, ipp, netlogtype.Counts{RxPackets: 1, RxBytes: uint64(len(b))})
}
return ep, true
}
@ -1805,6 +1824,9 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en
}
ep.noteRecvActivity()
if c.stats.enabled.Load() {
c.updateStats(ep.nodeAddr, ipp, netlogtype.Counts{RxPackets: 1, RxBytes: uint64(dm.n)})
}
return n, ep
}
@ -2406,6 +2428,9 @@ func (c *Conn) SetNetworkMap(nm *netmap.NetworkMap) {
endpointState: map[netip.AddrPort]*endpointState{},
heartbeatDisabled: heartbeatDisabled,
}
if len(n.Addresses) > 0 {
ep.nodeAddr = n.Addresses[0].Addr()
}
if !n.DiscoKey.IsZero() {
ep.discoKey = n.DiscoKey
ep.discoShort = n.DiscoKey.ShortString()
@ -3298,6 +3323,39 @@ func (c *Conn) UpdateStatus(sb *ipnstate.StatusBuilder) {
})
}
// updateStats updates the statistics counters with the src, dst, and cnts.
// It is the caller's responsibility to check whether logging is enabled.
func (c *Conn) updateStats(src netip.Addr, dst netip.AddrPort, cnts netlogtype.Counts) {
conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst}
c.stats.mu.Lock()
defer c.stats.mu.Unlock()
mak.Set(&c.stats.m, conn, c.stats.m[conn].Add(cnts))
}
// SetStatisticsEnabled enables per-connection packet counters.
// Disabling statistics gathering does not reset the counters.
// ExtractStatistics must be called to reset the counters and
// be periodically called while enabled to avoid unbounded memory use.
func (c *Conn) SetStatisticsEnabled(enable bool) {
c.stats.enabled.Store(enable)
}
// ExtractStatistics extracts and resets the counters for all active connections.
// It must be called periodically otherwise the memory used is unbounded.
//
// The source is always a peer's tailscale IP address,
// while the destination is the peer's physical IP address and port.
// As a special case, packets routed through DERP use a destination address
// of 127.3.3.40 with the port being the DERP region.
// This node's tailscale IP address never appears in the returned map.
func (c *Conn) ExtractStatistics() map[netlogtype.Connection]netlogtype.Counts {
c.stats.mu.Lock()
defer c.stats.mu.Unlock()
m := c.stats.m
c.stats.m = nil
return m
}
func ippDebugString(ua netip.AddrPort) string {
if ua.Addr() == derpMagicIPAddr {
return fmt.Sprintf("derp-%d", ua.Port())
@ -3332,6 +3390,7 @@ type endpoint struct {
publicKey key.NodePublic // peer public key (for WireGuard + DERP)
publicKeyHex string // cached output of publicKey.UntypedHexString
fakeWGAddr netip.AddrPort // the UDP address we tell wireguard-go we're using
nodeAddr netip.Addr // the node's first tailscale address (only used for logging)
// mu protects all following fields.
mu sync.Mutex // Lock ordering: Conn.mu, then endpoint.mu
@ -3676,13 +3735,21 @@ func (de *endpoint) send(b []byte) error {
var err error
if udpAddr.IsValid() {
_, err = de.c.sendAddr(udpAddr, de.publicKey, b)
if err == nil && de.c.stats.enabled.Load() {
de.c.updateStats(de.nodeAddr, udpAddr, netlogtype.Counts{TxPackets: 1, TxBytes: uint64(len(b))})
}
}
if derpAddr.IsValid() {
if ok, _ := de.c.sendAddr(derpAddr, de.publicKey, b); ok && err != nil {
if ok, _ := de.c.sendAddr(derpAddr, de.publicKey, b); ok {
if de.c.stats.enabled.Load() {
de.c.updateStats(de.nodeAddr, derpAddr, netlogtype.Counts{TxPackets: 1, TxBytes: uint64(len(b))})
}
if err != nil {
// UDP failed but DERP worked, so good enough:
return nil
}
}
}
return err
}

@ -28,6 +28,7 @@ import (
"unsafe"
"go4.org/mem"
"golang.org/x/exp/maps"
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/tuntest"
"tailscale.com/derp"
@ -42,6 +43,7 @@ import (
"tailscale.com/tstest/natlab"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/netlogtype"
"tailscale.com/types/netmap"
"tailscale.com/types/nettype"
"tailscale.com/util/cibuild"
@ -1093,17 +1095,45 @@ func testTwoDevicePing(t *testing.T, d *devices) {
}
}
m1.conn.SetStatisticsEnabled(true)
m2.conn.SetStatisticsEnabled(true)
checkStats := func(t *testing.T, m *magicStack, wantConns []netlogtype.Connection) {
stats := m.conn.ExtractStatistics()
for _, conn := range wantConns {
if _, ok := stats[conn]; ok {
return
}
}
t.Helper()
t.Errorf("missing any connection to %s from %s", wantConns, maps.Keys(stats))
}
addrPort := netip.MustParseAddrPort
m1Conns := []netlogtype.Connection{
{Src: addrPort("1.0.0.2:0"), Dst: m2.conn.pconn4.LocalAddr().AddrPort()},
{Src: addrPort("1.0.0.2:0"), Dst: addrPort("127.3.3.40:1")},
}
m2Conns := []netlogtype.Connection{
{Src: addrPort("1.0.0.1:0"), Dst: m1.conn.pconn4.LocalAddr().AddrPort()},
{Src: addrPort("1.0.0.1:0"), Dst: addrPort("127.3.3.40:1")},
}
outerT := t
t.Run("ping 1.0.0.1", func(t *testing.T) {
setT(t)
defer setT(outerT)
ping1(t)
checkStats(t, m1, m1Conns)
checkStats(t, m2, m2Conns)
})
t.Run("ping 1.0.0.2", func(t *testing.T) {
setT(t)
defer setT(outerT)
ping2(t)
checkStats(t, m1, m1Conns)
checkStats(t, m2, m2Conns)
})
t.Run("ping 1.0.0.2 via SendPacket", func(t *testing.T) {
@ -1120,6 +1150,8 @@ func testTwoDevicePing(t *testing.T, d *devices) {
if err := sendWithTimeout(msg1to2, in, send); err != nil {
t.Error(err)
}
checkStats(t, m1, m1Conns)
checkStats(t, m2, m2Conns)
})
t.Run("no-op dev1 reconfig", func(t *testing.T) {
@ -1130,6 +1162,8 @@ func testTwoDevicePing(t *testing.T, d *devices) {
}
ping1(t)
ping2(t)
checkStats(t, m1, m1Conns)
checkStats(t, m2, m2Conns)
})
}

@ -31,8 +31,7 @@ const pollPeriod = 5 * time.Second
// Device is an abstraction over a tunnel device or a magic socket.
// *tstun.Wrapper implements this interface.
//
// TODO(joetsai): Make *magicsock.Conn implement this interface.
// *magicsock.Conn implements this interface.
type Device interface {
SetStatisticsEnabled(bool)
ExtractStatistics() map[netlogtype.Connection]netlogtype.Counts

@ -953,7 +953,7 @@ func (e *userspaceEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config,
nid := cfg.NetworkLogging.NodeID
tid := cfg.NetworkLogging.DomainID
e.logf("wgengine: Reconfig: starting up network logger (node:%s tailnet:%s)", nid.Public(), tid.Public())
if err := e.networkLogger.Startup(nid, tid, e.tundev, nil); err != nil {
if err := e.networkLogger.Startup(nid, tid, e.tundev, e.magicConn); err != nil {
e.logf("wgengine: Reconfig: error starting up network logger: %v", err)
}
}

Loading…
Cancel
Save