all: implement pcap streaming for datapath debugging

Updates: tailscale/corp#8470

Signed-off-by: Tom DNetto <tom@tailscale.com>
pull/7187/head^2
Tom DNetto 2 years ago committed by Tom
parent 1acdcff63e
commit 99b9d7a621

@ -1028,6 +1028,27 @@ func (lc *LocalClient) DebugSetExpireIn(ctx context.Context, d time.Duration) er
return err
}
// StreamDebugCapture streams a pcap-formatted packet capture.
//
// The provided context does not determine the lifetime of the
// returned io.ReadCloser.
func (lc *LocalClient) StreamDebugCapture(ctx context.Context) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, "POST", "http://"+apitype.LocalAPIHost+"/localapi/v0/debug-capture", nil)
if err != nil {
return nil, err
}
res, err := lc.doLocalRequestNiceError(req)
if err != nil {
res.Body.Close()
return nil, err
}
if res.StatusCode != 200 {
res.Body.Close()
return nil, errors.New(res.Status)
}
return res.Body, nil
}
// WatchIPNBus subscribes to the IPN notification bus. It returns a watcher
// once the bus is connected successfully.
//

@ -20,6 +20,7 @@ import (
"net/netip"
"net/url"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
@ -39,6 +40,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/must"
"tailscale.com/wgengine/capture"
)
var debugCmd = &ffcli.Command{
@ -189,6 +191,16 @@ var debugCmd = &ffcli.Command{
Exec: runDebugDERP,
ShortHelp: "test a DERP configuration",
},
{
Name: "capture",
Exec: runCapture,
ShortHelp: "streams pcaps for debugging",
FlagSet: (func() *flag.FlagSet {
fs := newFlagSet("capture")
fs.StringVar(&captureArgs.outFile, "o", "", "path to stream the pcap (or - for stdout), leave empty to start wireshark")
return fs
})(),
},
},
}
@ -733,3 +745,47 @@ func runSetExpire(ctx context.Context, args []string) error {
}
return localClient.DebugSetExpireIn(ctx, setExpireArgs.in)
}
var captureArgs struct {
outFile string
}
func runCapture(ctx context.Context, args []string) error {
stream, err := localClient.StreamDebugCapture(ctx)
if err != nil {
return err
}
defer stream.Close()
switch captureArgs.outFile {
case "-":
fmt.Fprintln(os.Stderr, "Press Ctrl-C to stop the capture.")
_, err = io.Copy(os.Stdout, stream)
return err
case "":
lua, err := os.CreateTemp("", "ts-dissector")
if err != nil {
return err
}
defer os.Remove(lua.Name())
lua.Write([]byte(capture.DissectorLua))
if err := lua.Close(); err != nil {
return err
}
wireshark := exec.CommandContext(ctx, "wireshark", "-X", "lua_script:"+lua.Name(), "-k", "-i", "-")
wireshark.Stdin = stream
wireshark.Stdout = os.Stdout
wireshark.Stderr = os.Stderr
return wireshark.Run()
}
f, err := os.OpenFile(captureArgs.outFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer f.Close()
fmt.Fprintln(os.Stderr, "Press Ctrl-C to stop the capture.")
_, err = io.Copy(f, stream)
return err
}

@ -115,11 +115,12 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
tailscale.com/util/multierr from tailscale.com/control/controlhttp+
tailscale.com/util/must from tailscale.com/cmd/tailscale/cli
tailscale.com/util/quarantine from tailscale.com/cmd/tailscale/cli
tailscale.com/util/set from tailscale.com/health
tailscale.com/util/set from tailscale.com/health+
tailscale.com/util/singleflight from tailscale.com/net/dnscache
💣 tailscale.com/util/winutil from tailscale.com/hostinfo+
tailscale.com/version from tailscale.com/cmd/tailscale/cli+
tailscale.com/version/distro from tailscale.com/cmd/tailscale/cli+
tailscale.com/wgengine/capture from tailscale.com/cmd/tailscale/cli
tailscale.com/wgengine/filter from tailscale.com/types/netmap
golang.org/x/crypto/argon2 from tailscale.com/tka
golang.org/x/crypto/blake2b from golang.org/x/crypto/nacl/box+

@ -310,6 +310,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
tailscale.com/version/distro from tailscale.com/hostinfo+
W tailscale.com/wf from tailscale.com/cmd/tailscaled
tailscale.com/wgengine from tailscale.com/ipn/ipnlocal+
tailscale.com/wgengine/capture from tailscale.com/ipn/ipnlocal+
tailscale.com/wgengine/filter from tailscale.com/control/controlclient+
💣 tailscale.com/wgengine/magicsock from tailscale.com/ipn/ipnlocal+
tailscale.com/wgengine/monitor from tailscale.com/control/controlclient+

@ -76,6 +76,7 @@ import (
"tailscale.com/version"
"tailscale.com/version/distro"
"tailscale.com/wgengine"
"tailscale.com/wgengine/capture"
"tailscale.com/wgengine/filter"
"tailscale.com/wgengine/magicsock"
"tailscale.com/wgengine/router"
@ -147,6 +148,7 @@ type LocalBackend struct {
em *expiryManager // non-nil
sshAtomicBool atomic.Bool
shutdownCalled bool // if Shutdown has been called
debugSink *capture.Sink
// lastProfileID tracks the last profile we've seen from the ProfileManager.
// It's used to detect when the user has changed their profile.
@ -516,6 +518,11 @@ func (b *LocalBackend) Shutdown() {
b.sshServer = nil
}
b.closePeerAPIListenersLocked()
if b.debugSink != nil {
b.e.InstallCaptureHook(nil)
b.debugSink.Close()
b.debugSink = nil
}
b.mu.Unlock()
b.unregisterLinkMon()
@ -4837,3 +4844,45 @@ func (b *LocalBackend) ResetAuth() error {
}
return b.resetForProfileChangeLockedOnEntry()
}
// StreamDebugCapture writes a pcap stream of packets traversing
// tailscaled to the provided response writer.
func (b *LocalBackend) StreamDebugCapture(ctx context.Context, w io.Writer) error {
var s *capture.Sink
b.mu.Lock()
if b.debugSink == nil {
s = capture.New()
b.debugSink = s
b.e.InstallCaptureHook(s.LogPacket)
} else {
s = b.debugSink
}
b.mu.Unlock()
unregister := s.RegisterOutput(w)
select {
case <-ctx.Done():
case <-s.WaitCh():
}
unregister()
// Shut down & uninstall the sink if there are no longer
// any outputs on it.
b.mu.Lock()
defer b.mu.Unlock()
select {
case <-b.ctx.Done():
return nil
default:
}
if b.debugSink != nil && b.debugSink.NumOutputs() == 0 {
s := b.debugSink
b.e.InstallCaptureHook(nil)
b.debugSink = nil
return s.Close()
}
return nil
}

@ -68,6 +68,7 @@ var handler = map[string]localAPIHandler{
"debug-derp-region": (*Handler).serveDebugDERPRegion,
"debug-packet-filter-matches": (*Handler).serveDebugPacketFilterMatches,
"debug-packet-filter-rules": (*Handler).serveDebugPacketFilterRules,
"debug-capture": (*Handler).serveDebugCapture,
"derpmap": (*Handler).serveDERPMap,
"dev-set-state-store": (*Handler).serveDevSetStateStore,
"set-push-device-token": (*Handler).serveSetPushDeviceToken,
@ -1556,6 +1557,21 @@ func defBool(a string, def bool) bool {
return v
}
func (h *Handler) serveDebugCapture(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "debug access denied", http.StatusForbidden)
return
}
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
w.WriteHeader(200)
w.(http.Flusher).Flush()
h.b.StreamDebugCapture(r.Context(), w)
}
var (
metricInvalidRequests = clientmetric.NewCounter("localapi_invalid_requests")

@ -30,6 +30,7 @@ import (
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/util/clientmetric"
"tailscale.com/wgengine/capture"
"tailscale.com/wgengine/filter"
)
@ -68,6 +69,12 @@ var parsedPacketPool = sync.Pool{New: func() any { return new(packet.Parsed) }}
// It must not hold onto the packet struct, as its backing storage will be reused.
type FilterFunc func(*packet.Parsed, *Wrapper) filter.Response
// CaptureFunc describes a callback to record packets when
// debugging packet-capture. Such callbacks must not take
// ownership of the provided data slice: it may only copy
// out of it within the lifetime of the function.
type CaptureFunc func(capture.Path, time.Time, []byte)
// Wrapper augments a tun.Device with packet filtering and injection.
type Wrapper struct {
logf logger.Logf
@ -173,6 +180,8 @@ type Wrapper struct {
// stats maintains per-connection counters.
stats atomic.Pointer[connstats.Statistics]
captureHook syncs.AtomicValue[CaptureFunc]
}
// tunInjectedRead is an injected packet pretending to be a tun.Read().
@ -568,6 +577,9 @@ func (t *Wrapper) Read(buffs [][]byte, sizes []int, offset int) (int, error) {
fn()
}
}
if capt := t.captureHook.Load(); capt != nil {
capt(capture.FromLocal, time.Now(), data[res.dataOffset:])
}
if !t.disableFilter {
response := t.filterOut(p)
if response != filter.Accept {
@ -631,6 +643,10 @@ func (t *Wrapper) injectedRead(res tunInjectedRead, buf []byte, offset int) (int
}
func (t *Wrapper) filterIn(p *packet.Parsed) filter.Response {
if capt := t.captureHook.Load(); capt != nil {
capt(capture.FromPeer, time.Now(), p.Buffer())
}
if p.IPProto == ipproto.TSMP {
if pingReq, ok := p.AsTSMPPing(); ok {
t.noteActivity()
@ -788,6 +804,10 @@ func (t *Wrapper) InjectInboundPacketBuffer(pkt stack.PacketBufferPtr) error {
}
pkt.DecRef()
if capt := t.captureHook.Load(); capt != nil {
capt(capture.SynthesizedToLocal, time.Now(), buf[PacketStartOffset:])
}
return t.InjectInboundDirect(buf, PacketStartOffset)
}
@ -886,6 +906,11 @@ func (t *Wrapper) InjectOutboundPacketBuffer(packet stack.PacketBufferPtr) error
packet.DecRef()
return nil
}
if capt := t.captureHook.Load(); capt != nil {
b := packet.ToBuffer()
capt(capture.SynthesizedToPeer, time.Now(), b.Flatten())
}
t.injectOutbound(tunInjectedRead{packet: packet})
return nil
}
@ -916,3 +941,7 @@ var (
metricPacketOutDropFilter = clientmetric.NewCounter("tstun_out_to_wg_drop_filter")
metricPacketOutDropSelfDisco = clientmetric.NewCounter("tstun_out_to_wg_drop_self_disco")
)
func (t *Wrapper) InstallCaptureHook(cb CaptureFunc) {
t.captureHook.Store(cb)
}

@ -0,0 +1,201 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package capture formats packet logging into a debug pcap stream.
package capture
import (
"bytes"
"context"
"encoding/binary"
"io"
"net/http"
"sync"
"time"
_ "embed"
"tailscale.com/util/set"
)
//go:embed ts-dissector.lua
var DissectorLua string
var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}
const flushPeriod = 100 * time.Millisecond
func writePcapHeader(w io.Writer) {
binary.Write(w, binary.LittleEndian, uint32(0xA1B2C3D4)) // pcap magic number
binary.Write(w, binary.LittleEndian, uint16(2)) // version major
binary.Write(w, binary.LittleEndian, uint16(4)) // version minor
binary.Write(w, binary.LittleEndian, uint32(0)) // this zone
binary.Write(w, binary.LittleEndian, uint32(0)) // zone significant figures
binary.Write(w, binary.LittleEndian, uint32(65535)) // max packet len
binary.Write(w, binary.LittleEndian, uint32(147)) // link-layer ID - USER0
}
func writePktHeader(w *bytes.Buffer, when time.Time, length int) {
s := when.Unix()
us := when.UnixMicro() - (s * 1000000)
binary.Write(w, binary.LittleEndian, uint32(s)) // timestamp in seconds
binary.Write(w, binary.LittleEndian, uint32(us)) // timestamp microseconds
binary.Write(w, binary.LittleEndian, uint32(length)) // length present
binary.Write(w, binary.LittleEndian, uint32(length)) // total length
}
// Path describes where in the data path the packet was captured.
type Path uint8
// Valid Path values.
const (
// FromLocal indicates the packet was logged as it traversed the FromLocal path:
// i.e.: A packet from the local system into the TUN.
FromLocal Path = 0
// FromPeer indicates the packet was logged upon reception from a remote peer.
FromPeer Path = 1
// SynthesizedToLocal indicates the packet was generated from within tailscaled,
// and is being routed to the local machine's network stack.
SynthesizedToLocal Path = 2
// SynthesizedToPeer indicates the packet was generated from within tailscaled,
// and is being routed to a remote Wireguard peer.
SynthesizedToPeer Path = 3
)
// New creates a new capture sink.
func New() *Sink {
ctx, c := context.WithCancel(context.Background())
return &Sink{
ctx: ctx,
ctxCancel: c,
}
}
// Type Sink handles callbacks with packets to be logged,
// formatting them into a pcap stream which is mirrored to
// all registered outputs.
type Sink struct {
ctx context.Context
ctxCancel context.CancelFunc
mu sync.Mutex
outputs set.HandleSet[io.Writer]
flushTimer *time.Timer // or nil if none running
}
// RegisterOutput connects an output to this sink, which
// will be written to with a pcap stream as packets are logged.
// A function is returned which unregisters the output when
// called.
//
// If w implements io.Closer, it will be closed upon error
// or when the sink is closed. If w implements http.Flusher,
// it will be flushed periodically.
func (s *Sink) RegisterOutput(w io.Writer) (unregister func()) {
select {
case <-s.ctx.Done():
return func() {}
default:
}
writePcapHeader(w)
s.mu.Lock()
hnd := s.outputs.Add(w)
s.mu.Unlock()
return func() {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.outputs, hnd)
}
}
// NumOutputs returns the number of outputs registered with the sink.
func (s *Sink) NumOutputs() int {
s.mu.Lock()
defer s.mu.Unlock()
return len(s.outputs)
}
// Close shuts down the sink. Future calls to LogPacket
// are ignored, and any registered output that implements
// io.Closer is closed.
func (s *Sink) Close() error {
s.ctxCancel()
s.mu.Lock()
defer s.mu.Unlock()
if s.flushTimer != nil {
s.flushTimer.Stop()
s.flushTimer = nil
}
for _, o := range s.outputs {
if o, ok := o.(io.Closer); ok {
o.Close()
}
}
s.outputs = nil
return nil
}
// WaitCh returns a channel which blocks untill
// the sink is closed.
func (s *Sink) WaitCh() <-chan struct{} {
return s.ctx.Done()
}
// LogPacket is called to insert a packet into the capture.
//
// This function does not take ownership of the provided data slice.
func (s *Sink) LogPacket(path Path, when time.Time, data []byte) {
select {
case <-s.ctx.Done():
return
default:
}
b := bufferPool.Get().(*bytes.Buffer)
b.Reset()
b.Grow(16 + 2 + len(data)) // 16b pcap header + 2b custom data + len
defer bufferPool.Put(b)
writePktHeader(b, when, len(data)+2)
// Custom tailscale debugging data
binary.Write(b, binary.LittleEndian, uint16(path))
b.Write(data)
s.mu.Lock()
defer s.mu.Unlock()
var hadError []set.Handle
for hnd, o := range s.outputs {
if _, err := o.Write(b.Bytes()); err != nil {
hadError = append(hadError, hnd)
continue
}
}
for _, hnd := range hadError {
if o, ok := s.outputs[hnd].(io.Closer); ok {
o.Close()
}
delete(s.outputs, hnd)
}
if s.flushTimer == nil {
s.flushTimer = time.AfterFunc(flushPeriod, func() {
s.mu.Lock()
defer s.mu.Unlock()
for _, o := range s.outputs {
if f, ok := o.(http.Flusher); ok {
f.Flush()
}
}
s.flushTimer = nil
})
}
}

@ -0,0 +1,27 @@
tsdebug_ll = Proto("tsdebug", "Tailscale debug")
PATH = ProtoField.string("tsdebug.PATH","PATH", base.ASCII)
tsdebug_ll.fields = {PATH}
function tsdebug_ll.dissector(buffer, pinfo, tree)
pinfo.cols.protocol = tsdebug_ll.name
packet_length = buffer:len()
local offset = 0
local subtree = tree:add(tsdebug_ll, buffer(), "Tailscale packet")
-- -- Get path UINT16
local path_id = buffer:range(offset, 2):le_uint()
if path_id == 0 then subtree:add(PATH, "FromLocal")
elseif path_id == 1 then subtree:add(PATH, "FromPeer")
elseif path_id == 2 then subtree:add(PATH, "Synthesized (Inbound / ToLocal)")
elseif path_id == 3 then subtree:add(PATH, "Synthesized (Outbound / ToPeer)")
end
offset = offset + 2
-- -- Handover rest of data to ip dissector
local data_buffer = buffer:range(offset, packet_length-offset):tvb()
Dissector.get("ip"):call(data_buffer, pinfo, tree)
end
-- Install the dissector on link-layer ID 147 (User-defined protocol 0)
local eth_table = DissectorTable.get("wtap_encap")
eth_table:add(wtap.USER0, tsdebug_ll)

@ -1579,3 +1579,7 @@ var (
metricNumMajorChanges = clientmetric.NewCounter("wgengine_major_changes")
metricNumMinorChanges = clientmetric.NewCounter("wgengine_minor_changes")
)
func (e *userspaceEngine) InstallCaptureHook(cb CaptureCallback) {
e.tundev.InstallCaptureHook(tstun.CaptureFunc(cb))
}

@ -200,3 +200,7 @@ func (e *watchdogEngine) PeerForIP(ip netip.Addr) (ret PeerForIP, ok bool) {
func (e *watchdogEngine) Wait() {
e.wrap.Wait()
}
func (e *watchdogEngine) InstallCaptureHook(cb CaptureCallback) {
e.wrap.InstallCaptureHook(cb)
}

@ -13,6 +13,7 @@ import (
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/netmap"
"tailscale.com/wgengine/capture"
"tailscale.com/wgengine/filter"
"tailscale.com/wgengine/monitor"
"tailscale.com/wgengine/router"
@ -42,6 +43,12 @@ type NetInfoCallback func(*tailcfg.NetInfo)
// into network map updates.
type NetworkMapCallback func(*netmap.NetworkMap)
// CaptureCallback is the type used to record packets when
// debugging packet-capture. This function must not take
// ownership of the provided data slice: it may only copy
// out of it within the lifetime of the function.
type CaptureCallback func(capture.Path, time.Time, []byte)
// someHandle is allocated so its pointer address acts as a unique
// map key handle. (It needs to have non-zero size for Go to guarantee
// the pointer is unique.)
@ -171,4 +178,9 @@ type Engine interface {
// WhoIsIPPort looks up an IP:port in the temporary registrations,
// and returns a matching Tailscale IP, if it exists.
WhoIsIPPort(netip.AddrPort) (netip.Addr, bool)
// InstallCaptureHook registers a function to be called to capture
// packets traversing the data path. The hook can be uninstalled by
// calling this function with a nil value.
InstallCaptureHook(CaptureCallback)
}

Loading…
Cancel
Save