mirror of https://github.com/tailscale/tailscale/
wgengine/netlog: embed node information in network flow logs (#17668)
This rewrites the netlog package to support embedding node information in network flow logs. Some of the complexity comes from pre-computing the expected size of each log message after JSON serialization, so that we can respect the maximum body limits when uploading logs. We also fix a bug in tstun, where we were recording the IP address after SNAT, which resulted in nonsensical connection flows being logged.

Updates tailscale/corp#33352

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
parent fcb614a53e
commit 478342a642
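To illustrate the size pre-computation described in the commit message, here is a minimal, hypothetical sketch (not the actual implementation; maxLogSize, connEntry, sizeBoundedRecord, and the flush callback are assumed names): each connection entry's JSON-encoded size is estimated before it is added to the pending record, and the record is flushed early whenever the next entry would push the serialized message past the upload limit.

// Hypothetical sketch of size-bounded flushing; names and the limit are
// illustrative assumptions, not the actual netlog implementation.
package main

import (
	"encoding/json"
	"fmt"
)

const maxLogSize = 256 << 10 // assumed maximum upload body size

type connEntry struct {
	Proto   int    `json:"proto"`
	Src     string `json:"src"`
	Dst     string `json:"dst"`
	TxBytes uint64 `json:"txBytes"`
}

type sizeBoundedRecord struct {
	entries []connEntry
	estSize int // running estimate of the serialized message size
}

// add appends an entry, flushing first if the serialized message would
// exceed maxLogSize. Over-estimating is safe: we may flush early, but we
// never produce an oversized upload body.
func (r *sizeBoundedRecord) add(e connEntry, flush func([]connEntry)) {
	n := encodedSize(e)
	if len(r.entries) > 0 && r.estSize+n > maxLogSize {
		flush(r.entries)
		r.entries, r.estSize = nil, 0
	}
	r.entries = append(r.entries, e)
	r.estSize += n
}

// encodedSize reports the JSON-encoded size of e, plus one byte for the
// separating comma within the enclosing array.
func encodedSize(e connEntry) int {
	b, _ := json.Marshal(e)
	return len(b) + 1
}

func main() {
	var r sizeBoundedRecord
	flush := func(es []connEntry) { fmt.Println("flushing", len(es), "entries") }
	r.add(connEntry{Proto: 6, Src: "100.1.2.3:80", Dst: "100.1.2.4:1812", TxBytes: 278}, flush)
	fmt.Println("pending entries:", len(r.entries), "estimated bytes:", r.estSize)
}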
@@ -0,0 +1,236 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

//go:build !ts_omit_netlog && !ts_omit_logtail

package netlog

import (
	"encoding/binary"
	"math/rand/v2"
	"net/netip"
	"sync"
	"testing"
	"testing/synctest"
	"time"

	jsonv2 "github.com/go-json-experiment/json"
	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"tailscale.com/tailcfg"
	"tailscale.com/types/bools"
	"tailscale.com/types/ipproto"
	"tailscale.com/types/netlogtype"
	"tailscale.com/types/netmap"
	"tailscale.com/wgengine/router"
)

func TestEmbedNodeInfo(t *testing.T) {
	// Initialize the logger with a particular view of the netmap.
	var logger Logger
	logger.ReconfigNetworkMap(&netmap.NetworkMap{
		SelfNode: (&tailcfg.Node{
			StableID:  "n123456CNTL",
			ID:        123456,
			Name:      "test.tail123456.ts.net",
			Addresses: []netip.Prefix{prefix("100.1.2.3")},
			Tags:      []string{"tag:foo", "tag:bar"},
		}).View(),
		Peers: []tailcfg.NodeView{
			(&tailcfg.Node{
				StableID:  "n123457CNTL",
				ID:        123457,
				Name:      "peer1.tail123456.ts.net",
				Addresses: []netip.Prefix{prefix("100.1.2.4")},
				Tags:      []string{"tag:peer"},
			}).View(),
			(&tailcfg.Node{
				StableID:  "n123458CNTL",
				ID:        123458,
				Name:      "peer2.tail123456.ts.net",
				Addresses: []netip.Prefix{prefix("100.1.2.5")},
				User:      54321,
			}).View(),
		},
		UserProfiles: map[tailcfg.UserID]tailcfg.UserProfileView{
			54321: (&tailcfg.UserProfile{ID: 54321, LoginName: "peer@example.com"}).View(),
		},
	})
	logger.ReconfigRoutes(&router.Config{
		SubnetRoutes: []netip.Prefix{
			prefix("172.16.1.1/16"),
			prefix("192.168.1.1/24"),
		},
	})

	// Update the counters for a few connections.
	var group sync.WaitGroup
	defer group.Wait()
	conns := []struct {
		virt               bool
		proto              ipproto.Proto
		src, dst           netip.AddrPort
		txP, txB, rxP, rxB int
	}{
		{true, 0x6, addrPort("100.1.2.3:80"), addrPort("100.1.2.4:1812"), 88, 278, 34, 887},
		{true, 0x6, addrPort("100.1.2.3:443"), addrPort("100.1.2.5:1742"), 96, 635, 23, 790},
		{true, 0x6, addrPort("100.1.2.3:443"), addrPort("100.1.2.6:1175"), 48, 94, 86, 618}, // unknown peer (in Tailscale IP space, but not a known peer)
		{true, 0x6, addrPort("100.1.2.3:80"), addrPort("192.168.1.241:713"), 43, 154, 66, 883},
		{true, 0x6, addrPort("100.1.2.3:80"), addrPort("192.168.2.241:713"), 43, 154, 66, 883}, // not in the subnet, must be exit traffic
		{true, 0x6, addrPort("100.1.2.3:80"), addrPort("172.16.5.18:713"), 7, 243, 40, 59},
		{true, 0x6, addrPort("100.1.2.3:80"), addrPort("172.20.5.18:713"), 61, 753, 42, 492}, // not in the subnet, must be exit traffic
		{true, 0x6, addrPort("192.168.1.241:713"), addrPort("100.1.2.3:80"), 43, 154, 66, 883},
		{true, 0x6, addrPort("192.168.2.241:713"), addrPort("100.1.2.3:80"), 43, 154, 66, 883}, // not in the subnet, must be exit traffic
		{true, 0x6, addrPort("172.16.5.18:713"), addrPort("100.1.2.3:80"), 7, 243, 40, 59},
		{true, 0x6, addrPort("172.20.5.18:713"), addrPort("100.1.2.3:80"), 61, 753, 42, 492}, // not in the subnet, must be exit traffic
		{true, 0x6, addrPort("14.255.192.128:39230"), addrPort("243.42.106.193:48206"), 81, 791, 79, 316}, // unknown connection
		{false, 0x6, addrPort("100.1.2.4:0"), addrPort("35.92.180.165:9743"), 63, 136, 61, 409}, // physical traffic with peer1
		{false, 0x6, addrPort("100.1.2.5:0"), addrPort("131.19.35.17:9743"), 88, 452, 2, 716}, // physical traffic with peer2
	}
	for range 10 {
		for _, conn := range conns {
			update := bools.IfElse(conn.virt, logger.updateVirtConn, logger.updatePhysConn)
			group.Go(func() { update(conn.proto, conn.src, conn.dst, conn.txP, conn.txB, false) })
			group.Go(func() { update(conn.proto, conn.src, conn.dst, conn.rxP, conn.rxB, true) })
		}
	}
	group.Wait()

	// Verify that the counters match.
	got := logger.record.toMessage(false, false)
	got.Start = time.Time{} // avoid flakiness
	want := netlogtype.Message{
		NodeID: "n123456CNTL",
		SrcNode: netlogtype.Node{
			NodeID:    "n123456CNTL",
			Name:      "test.tail123456.ts.net",
			Addresses: []netip.Addr{addr("100.1.2.3")},
			Tags:      []string{"tag:bar", "tag:foo"},
		},
		DstNodes: []netlogtype.Node{{
			NodeID:    "n123457CNTL",
			Name:      "peer1.tail123456.ts.net",
			Addresses: []netip.Addr{addr("100.1.2.4")},
			Tags:      []string{"tag:peer"},
		}, {
			NodeID:    "n123458CNTL",
			Name:      "peer2.tail123456.ts.net",
			Addresses: []netip.Addr{addr("100.1.2.5")},
			User:      "peer@example.com",
		}},
		VirtualTraffic: []netlogtype.ConnectionCounts{
			{Connection: conn(0x6, "100.1.2.3:80", "100.1.2.4:1812"), Counts: counts(880, 2780, 340, 8870)},
			{Connection: conn(0x6, "100.1.2.3:443", "100.1.2.5:1742"), Counts: counts(960, 6350, 230, 7900)},
		},
		SubnetTraffic: []netlogtype.ConnectionCounts{
			{Connection: conn(0x6, "100.1.2.3:80", "172.16.5.18:713"), Counts: counts(70, 2430, 400, 590)},
			{Connection: conn(0x6, "100.1.2.3:80", "192.168.1.241:713"), Counts: counts(430, 1540, 660, 8830)},
			{Connection: conn(0x6, "172.16.5.18:713", "100.1.2.3:80"), Counts: counts(70, 2430, 400, 590)},
			{Connection: conn(0x6, "192.168.1.241:713", "100.1.2.3:80"), Counts: counts(430, 1540, 660, 8830)},
		},
		ExitTraffic: []netlogtype.ConnectionCounts{
			{Connection: conn(0x6, "14.255.192.128:39230", "243.42.106.193:48206"), Counts: counts(810, 7910, 790, 3160)},
			{Connection: conn(0x6, "100.1.2.3:80", "172.20.5.18:713"), Counts: counts(610, 7530, 420, 4920)},
			{Connection: conn(0x6, "100.1.2.3:80", "192.168.2.241:713"), Counts: counts(430, 1540, 660, 8830)},
			{Connection: conn(0x6, "100.1.2.3:443", "100.1.2.6:1175"), Counts: counts(480, 940, 860, 6180)},
			{Connection: conn(0x6, "172.20.5.18:713", "100.1.2.3:80"), Counts: counts(610, 7530, 420, 4920)},
			{Connection: conn(0x6, "192.168.2.241:713", "100.1.2.3:80"), Counts: counts(430, 1540, 660, 8830)},
		},
		PhysicalTraffic: []netlogtype.ConnectionCounts{
			{Connection: conn(0x6, "100.1.2.4:0", "35.92.180.165:9743"), Counts: counts(630, 1360, 610, 4090)},
			{Connection: conn(0x6, "100.1.2.5:0", "131.19.35.17:9743"), Counts: counts(880, 4520, 20, 7160)},
		},
	}
	if d := cmp.Diff(got, want, cmpopts.EquateComparable(netip.Addr{}, netip.AddrPort{})); d != "" {
		t.Errorf("Message (-got +want):\n%s", d)
	}
}

func TestUpdateRace(t *testing.T) {
	var logger Logger
	logger.recordsChan = make(chan record, 1)
	go func(recordsChan chan record) {
		for range recordsChan {
		}
	}(logger.recordsChan)

	var group sync.WaitGroup
	defer group.Wait()
	for i := range 1000 {
		group.Go(func() {
			src, dst := randAddrPort(), randAddrPort()
			for j := range 1000 {
				if i%2 == 0 {
					logger.updateVirtConn(0x1, src, dst, rand.IntN(10), rand.IntN(1000), j%2 == 0)
				} else {
					logger.updatePhysConn(0x1, src, dst, rand.IntN(10), rand.IntN(1000), j%2 == 0)
				}
			}
		})
		group.Go(func() {
			for range 1000 {
				logger.ReconfigNetworkMap(new(netmap.NetworkMap))
			}
		})
		group.Go(func() {
			for range 1000 {
				logger.ReconfigRoutes(new(router.Config))
			}
		})
	}

	group.Wait()
	logger.mu.Lock()
	close(logger.recordsChan)
	logger.mu.Unlock()
}

func randAddrPort() netip.AddrPort {
	var b [4]uint8
	binary.LittleEndian.PutUint32(b[:], rand.Uint32())
	return netip.AddrPortFrom(netip.AddrFrom4(b), uint16(rand.Uint32()))
}

func TestAutoFlushMaxConns(t *testing.T) {
	var logger Logger
	logger.recordsChan = make(chan record, 1)
	for i := 0; len(logger.recordsChan) == 0; i++ {
		logger.updateVirtConn(0, netip.AddrPortFrom(netip.Addr{}, uint16(i)), netip.AddrPort{}, 1, 1, false)
	}
	// Serialize the flushed record's message to check it against the size limit.
	b, _ := jsonv2.Marshal((<-logger.recordsChan).toMessage(false, false))
	if len(b) > maxLogSize {
		t.Errorf("len(Message) = %v, want <= %d", len(b), maxLogSize)
	}
}

func TestAutoFlushTimeout(t *testing.T) {
	var logger Logger
	logger.recordsChan = make(chan record, 1)
	synctest.Test(t, func(t *testing.T) {
		logger.updateVirtConn(0, netip.AddrPort{}, netip.AddrPort{}, 1, 1, false)
		time.Sleep(pollPeriod)
	})
	rec := <-logger.recordsChan
	if d := rec.end.Sub(rec.start); d != pollPeriod {
		t.Errorf("window = %v, want %v", d, pollPeriod)
	}
	if len(rec.virtConns) != 1 {
		t.Errorf("len(virtConns) = %d, want 1", len(rec.virtConns))
	}
}

func BenchmarkUpdateSameConn(b *testing.B) {
	var logger Logger
	b.ReportAllocs()
	for range b.N {
		logger.updateVirtConn(0, netip.AddrPort{}, netip.AddrPort{}, 1, 1, false)
	}
}

func BenchmarkUpdateNewConns(b *testing.B) {
	var logger Logger
	b.ReportAllocs()
	for i := range b.N {
		logger.updateVirtConn(0, netip.AddrPortFrom(netip.Addr{}, uint16(i)), netip.AddrPort{}, 1, 1, false)
	}
}
@@ -1,222 +0,0 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

//go:build !ts_omit_netlog && !ts_omit_logtail

package netlog

import (
	"context"
	"net/netip"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
	"tailscale.com/net/packet"
	"tailscale.com/net/tsaddr"
	"tailscale.com/types/ipproto"
	"tailscale.com/types/netlogtype"
)

// statistics maintains counters for every connection.
// All methods are safe for concurrent use.
// The zero value is ready for use.
type statistics struct {
	maxConns int // immutable once set

	mu sync.Mutex
	connCnts

	connCntsCh  chan connCnts
	shutdownCtx context.Context
	shutdown    context.CancelFunc
	group       errgroup.Group
}

type connCnts struct {
	start    time.Time
	end      time.Time
	virtual  map[netlogtype.Connection]netlogtype.Counts
	physical map[netlogtype.Connection]netlogtype.Counts
}

// newStatistics creates a data structure for tracking connection statistics
// that periodically dumps the virtual and physical connection counts
// depending on whether the maxPeriod or maxConns is exceeded.
// The dump function is called from a single goroutine.
// Shutdown must be called to clean up resources.
func newStatistics(maxPeriod time.Duration, maxConns int, dump func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts)) *statistics {
	s := &statistics{maxConns: maxConns}
	s.connCntsCh = make(chan connCnts, 256)
	s.shutdownCtx, s.shutdown = context.WithCancel(context.Background())
	s.group.Go(func() error {
		// TODO(joetsai): Using a ticker is problematic on mobile platforms
		// where waking up a process every maxPeriod when there is no activity
		// is a drain on battery life. Switch this to instead use
		// a time.Timer that is triggered upon network activity.
		ticker := new(time.Ticker)
		if maxPeriod > 0 {
			ticker = time.NewTicker(maxPeriod)
			defer ticker.Stop()
		}

		for {
			var cc connCnts
			select {
			case cc = <-s.connCntsCh:
			case <-ticker.C:
				cc = s.extract()
			case <-s.shutdownCtx.Done():
				cc = s.extract()
			}
			if len(cc.virtual)+len(cc.physical) > 0 && dump != nil {
				dump(cc.start, cc.end, cc.virtual, cc.physical)
			}
			if s.shutdownCtx.Err() != nil {
				return nil
			}
		}
	})
	return s
}

// UpdateTxVirtual updates the counters for a transmitted IP packet.
// The source and destination of the packet directly correspond with
// the source and destination in netlogtype.Connection.
func (s *statistics) UpdateTxVirtual(b []byte) {
	var p packet.Parsed
	p.Decode(b)
	s.UpdateVirtual(p.IPProto, p.Src, p.Dst, 1, len(b), false)
}

// UpdateRxVirtual updates the counters for a received IP packet.
// The source and destination of the packet are inverted with respect to
// the source and destination in netlogtype.Connection.
func (s *statistics) UpdateRxVirtual(b []byte) {
	var p packet.Parsed
	p.Decode(b)
	s.UpdateVirtual(p.IPProto, p.Dst, p.Src, 1, len(b), true)
}

var (
	tailscaleServiceIPv4 = tsaddr.TailscaleServiceIP()
	tailscaleServiceIPv6 = tsaddr.TailscaleServiceIPv6()
)

func (s *statistics) UpdateVirtual(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, receive bool) {
	// Network logging is defined as traffic between two Tailscale nodes.
	// Traffic with the internal Tailscale service is not with another node
	// and should not be logged. It also happens to be a high volume
	// of discrete traffic flows (e.g., DNS lookups).
	switch dst.Addr() {
	case tailscaleServiceIPv4, tailscaleServiceIPv6:
		return
	}

	conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst}

	s.mu.Lock()
	defer s.mu.Unlock()
	cnts, found := s.virtual[conn]
	if !found && !s.preInsertConn() {
		return
	}
	if receive {
		cnts.RxPackets += uint64(packets)
		cnts.RxBytes += uint64(bytes)
	} else {
		cnts.TxPackets += uint64(packets)
		cnts.TxBytes += uint64(bytes)
	}
	s.virtual[conn] = cnts
}

// UpdateTxPhysical updates the counters for zero or more transmitted wireguard packets.
// The src is always a Tailscale IP address, representing some remote peer.
// The dst is a remote IP address and port that corresponds
// with some physical peer backing the Tailscale IP address.
func (s *statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) {
	s.UpdatePhysical(0, netip.AddrPortFrom(src, 0), dst, packets, bytes, false)
}

// UpdateRxPhysical updates the counters for zero or more received wireguard packets.
// The src is always a Tailscale IP address, representing some remote peer.
// The dst is a remote IP address and port that corresponds
// with some physical peer backing the Tailscale IP address.
func (s *statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) {
	s.UpdatePhysical(0, netip.AddrPortFrom(src, 0), dst, packets, bytes, true)
}

func (s *statistics) UpdatePhysical(proto ipproto.Proto, src, dst netip.AddrPort, packets, bytes int, receive bool) {
	conn := netlogtype.Connection{Proto: proto, Src: src, Dst: dst}

	s.mu.Lock()
	defer s.mu.Unlock()
	cnts, found := s.physical[conn]
	if !found && !s.preInsertConn() {
		return
	}
	if receive {
		cnts.RxPackets += uint64(packets)
		cnts.RxBytes += uint64(bytes)
	} else {
		cnts.TxPackets += uint64(packets)
		cnts.TxBytes += uint64(bytes)
	}
	s.physical[conn] = cnts
}

// preInsertConn updates the maps to handle insertion of a new connection.
// It reports false if insertion is not allowed (i.e., after shutdown).
func (s *statistics) preInsertConn() bool {
	// Check whether insertion of a new connection will exceed maxConns.
	if len(s.virtual)+len(s.physical) == s.maxConns && s.maxConns > 0 {
		// Extract the current statistics and send it to the serializer.
		// Avoid blocking the network packet handling path.
		select {
		case s.connCntsCh <- s.extractLocked():
		default:
			// TODO(joetsai): Log that we are dropping an entire connCounts.
		}
	}

	// Initialize the maps if nil.
	if s.virtual == nil && s.physical == nil {
		s.start = time.Now().UTC()
		s.virtual = make(map[netlogtype.Connection]netlogtype.Counts)
		s.physical = make(map[netlogtype.Connection]netlogtype.Counts)
	}

	return s.shutdownCtx.Err() == nil
}

func (s *statistics) extract() connCnts {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.extractLocked()
}

func (s *statistics) extractLocked() connCnts {
	if len(s.virtual)+len(s.physical) == 0 {
		return connCnts{}
	}
	s.end = time.Now().UTC()
	cc := s.connCnts
	s.connCnts = connCnts{}
	return cc
}

// TestExtract synchronously extracts the current network statistics map
// and resets the counters. This should only be used for testing purposes.
func (s *statistics) TestExtract() (virtual, physical map[netlogtype.Connection]netlogtype.Counts) {
	cc := s.extract()
	return cc.virtual, cc.physical
}

// Shutdown performs a final flush of statistics.
// Statistics for any subsequent calls to Update will be dropped.
// It is safe to call Shutdown concurrently and repeatedly.
func (s *statistics) Shutdown(context.Context) error {
	s.shutdown()
	return s.group.Wait()
}
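For reference, the removed statistics API above was driven roughly as in this sketch (a hypothetical usage example based solely on the declarations above; the period, connection limit, and pkt argument are illustrative stand-ins):

// exampleStatisticsUsage sketches how the removed statistics type was used:
// per-connection counts accumulate until dump fires, either because
// maxPeriod elapsed or because maxConns distinct connections accumulated.
func exampleStatisticsUsage(pkt []byte) {
	stats := newStatistics(5*time.Second, 4096,
		func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts) {
			// Upload or record the [start, end) window of counts here.
		})
	defer stats.Shutdown(context.Background()) // final flush; later updates are dropped

	stats.UpdateTxVirtual(pkt) // count one transmitted IP packet
	stats.UpdateRxVirtual(pkt) // count one received IP packet
}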
@@ -1,235 +0,0 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package netlog

import (
	"context"
	"encoding/binary"
	"fmt"
	"math/rand"
	"net/netip"
	"runtime"
	"sync"
	"testing"
	"time"

	qt "github.com/frankban/quicktest"
	"tailscale.com/cmd/testwrapper/flakytest"
	"tailscale.com/types/ipproto"
	"tailscale.com/types/netlogtype"
)

func testPacketV4(proto ipproto.Proto, srcAddr, dstAddr [4]byte, srcPort, dstPort, size uint16) (out []byte) {
	var ipHdr [20]byte
	ipHdr[0] = 4<<4 | 5
	binary.BigEndian.PutUint16(ipHdr[2:], size)
	ipHdr[9] = byte(proto)
	*(*[4]byte)(ipHdr[12:]) = srcAddr
	*(*[4]byte)(ipHdr[16:]) = dstAddr
	out = append(out, ipHdr[:]...)
	switch proto {
	case ipproto.TCP:
		var tcpHdr [20]byte
		binary.BigEndian.PutUint16(tcpHdr[0:], srcPort)
		binary.BigEndian.PutUint16(tcpHdr[2:], dstPort)
		out = append(out, tcpHdr[:]...)
	case ipproto.UDP:
		var udpHdr [8]byte
		binary.BigEndian.PutUint16(udpHdr[0:], srcPort)
		binary.BigEndian.PutUint16(udpHdr[2:], dstPort)
		out = append(out, udpHdr[:]...)
	default:
		panic(fmt.Sprintf("unknown proto: %d", proto))
	}
	return append(out, make([]byte, int(size)-len(out))...)
}

// TestInterval ensures that we receive at least one call to `dump` using only
// maxPeriod.
func TestInterval(t *testing.T) {
	c := qt.New(t)

	const maxPeriod = 10 * time.Millisecond
	const maxConns = 2048

	gotDump := make(chan struct{}, 1)
	stats := newStatistics(maxPeriod, maxConns, func(_, _ time.Time, _, _ map[netlogtype.Connection]netlogtype.Counts) {
		select {
		case gotDump <- struct{}{}:
		default:
		}
	})
	defer stats.Shutdown(context.Background())

	srcAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))})
	dstAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))})
	srcPort := uint16(rand.Intn(16))
	dstPort := uint16(rand.Intn(16))
	size := uint16(64 + rand.Intn(1024))
	p := testPacketV4(ipproto.TCP, srcAddr.As4(), dstAddr.As4(), srcPort, dstPort, size)
	stats.UpdateRxVirtual(p)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	select {
	case <-ctx.Done():
		c.Fatal("didn't receive dump within context deadline")
	case <-gotDump:
	}
}

func TestConcurrent(t *testing.T) {
	flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/7030")
	c := qt.New(t)

	const maxPeriod = 10 * time.Millisecond
	const maxConns = 10
	virtualAggregate := make(map[netlogtype.Connection]netlogtype.Counts)
	stats := newStatistics(maxPeriod, maxConns, func(start, end time.Time, virtual, physical map[netlogtype.Connection]netlogtype.Counts) {
		c.Assert(start.IsZero(), qt.IsFalse)
		c.Assert(end.IsZero(), qt.IsFalse)
		c.Assert(end.Before(start), qt.IsFalse)
		c.Assert(len(virtual) > 0 && len(virtual) <= maxConns, qt.IsTrue)
		c.Assert(len(physical) == 0, qt.IsTrue)
		for conn, cnts := range virtual {
			virtualAggregate[conn] = virtualAggregate[conn].Add(cnts)
		}
	})
	defer stats.Shutdown(context.Background())
	var wants []map[netlogtype.Connection]netlogtype.Counts
	gots := make([]map[netlogtype.Connection]netlogtype.Counts, runtime.NumCPU())
	var group sync.WaitGroup
	for i := range gots {
		group.Add(1)
		go func(i int) {
			defer group.Done()
			gots[i] = make(map[netlogtype.Connection]netlogtype.Counts)
			rn := rand.New(rand.NewSource(time.Now().UnixNano()))
			var p []byte
			var t netlogtype.Connection
			for j := 0; j < 1000; j++ {
				delay := rn.Intn(10000)
				if p == nil || rn.Intn(64) == 0 {
					proto := ipproto.TCP
					if rn.Intn(2) == 0 {
						proto = ipproto.UDP
					}
					srcAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))})
					dstAddr := netip.AddrFrom4([4]byte{192, 168, 0, byte(rand.Intn(16))})
					srcPort := uint16(rand.Intn(16))
					dstPort := uint16(rand.Intn(16))
					size := uint16(64 + rand.Intn(1024))
					p = testPacketV4(proto, srcAddr.As4(), dstAddr.As4(), srcPort, dstPort, size)
					t = netlogtype.Connection{Proto: proto, Src: netip.AddrPortFrom(srcAddr, srcPort), Dst: netip.AddrPortFrom(dstAddr, dstPort)}
				}
				t2 := t
				receive := rn.Intn(2) == 0
				if receive {
					t2.Src, t2.Dst = t2.Dst, t2.Src
				}

				cnts := gots[i][t2]
				if receive {
					stats.UpdateRxVirtual(p)
					cnts.RxPackets++
					cnts.RxBytes += uint64(len(p))
				} else {
					cnts.TxPackets++
					cnts.TxBytes += uint64(len(p))
					stats.UpdateTxVirtual(p)
				}
				gots[i][t2] = cnts
				time.Sleep(time.Duration(rn.Intn(1 + delay)))
			}
		}(i)
	}
	group.Wait()
	c.Assert(stats.Shutdown(context.Background()), qt.IsNil)
	wants = append(wants, virtualAggregate)

	got := make(map[netlogtype.Connection]netlogtype.Counts)
	want := make(map[netlogtype.Connection]netlogtype.Counts)
	mergeMaps(got, gots...)
	mergeMaps(want, wants...)
	c.Assert(got, qt.DeepEquals, want)
}

func mergeMaps(dst map[netlogtype.Connection]netlogtype.Counts, srcs ...map[netlogtype.Connection]netlogtype.Counts) {
	for _, src := range srcs {
		for conn, cnts := range src {
			dst[conn] = dst[conn].Add(cnts)
		}
	}
}

func Benchmark(b *testing.B) {
	// TODO: Test IPv6 packets?
	b.Run("SingleRoutine/SameConn", func(b *testing.B) {
		p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
		b.ResetTimer()
		b.ReportAllocs()
		for range b.N {
			s := newStatistics(0, 0, nil)
			for j := 0; j < 1e3; j++ {
				s.UpdateTxVirtual(p)
			}
		}
	})
	b.Run("SingleRoutine/UniqueConns", func(b *testing.B) {
		p := testPacketV4(ipproto.UDP, [4]byte{}, [4]byte{}, 0, 0, 789)
		b.ResetTimer()
		b.ReportAllocs()
		for range b.N {
			s := newStatistics(0, 0, nil)
			for j := 0; j < 1e3; j++ {
				binary.BigEndian.PutUint32(p[20:], uint32(j)) // unique port combination
				s.UpdateTxVirtual(p)
			}
		}
	})
	b.Run("MultiRoutine/SameConn", func(b *testing.B) {
		p := testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 123, 456, 789)
		b.ResetTimer()
		b.ReportAllocs()
		for range b.N {
			s := newStatistics(0, 0, nil)
			var group sync.WaitGroup
			for j := 0; j < runtime.NumCPU(); j++ {
				group.Add(1)
				go func() {
					defer group.Done()
					for k := 0; k < 1e3; k++ {
						s.UpdateTxVirtual(p)
					}
				}()
			}
			group.Wait()
		}
	})
	b.Run("MultiRoutine/UniqueConns", func(b *testing.B) {
		ps := make([][]byte, runtime.NumCPU())
		for i := range ps {
			ps[i] = testPacketV4(ipproto.UDP, [4]byte{192, 168, 0, 1}, [4]byte{192, 168, 0, 2}, 0, 0, 789)
		}
		b.ResetTimer()
		b.ReportAllocs()
		for range b.N {
			s := newStatistics(0, 0, nil)
			var group sync.WaitGroup
			for j := 0; j < runtime.NumCPU(); j++ {
				group.Add(1)
				go func(j int) {
					defer group.Done()
					p := ps[j]
					j *= 1e3
					for k := 0; k < 1e3; k++ {
						binary.BigEndian.PutUint32(p[20:], uint32(j+k)) // unique port combination
						s.UpdateTxVirtual(p)
					}
				}(j)
			}
			group.Wait()
		}
	})
}