@ -8,6 +8,7 @@ import (
"cmp"
"context"
crand "crypto/rand"
"encoding/binary"
"encoding/json"
"errors"
"expvar"
@ -17,6 +18,7 @@ import (
"net"
"net/http"
"net/netip"
"slices"
"strconv"
"strings"
"sync"
@ -53,6 +55,10 @@ type derpProber struct {
bwProbeSize int64
bwTUNIPv4Prefix * netip . Prefix // or nil to not use TUN
// Optional queuing delay probing.
qdPacketsPerSecond int // in packets per second
qdPacketTimeout time . Duration
// Optionally restrict probes to a single regionCode.
regionCode string
@ -64,6 +70,7 @@ type derpProber struct {
udpProbeFn func ( string , int ) ProbeClass
meshProbeFn func ( string , string ) ProbeClass
bwProbeFn func ( string , string , int64 ) ProbeClass
qdProbeFn func ( string , string , int , time . Duration ) ProbeClass
sync . Mutex
lastDERPMap * tailcfg . DERPMap
@ -93,6 +100,16 @@ func WithBandwidthProbing(interval time.Duration, size int64, tunAddress string)
}
}
// WithQueuingDelayProbing enables/disables queuing delay probing. qdSendRate
// is the number of packets sent per second. qdTimeout is the amount of time
// after which a sent packet is considered to have timed out.
func WithQueuingDelayProbing ( qdPacketsPerSecond int , qdPacketTimeout time . Duration ) DERPOpt {
return func ( d * derpProber ) {
d . qdPacketsPerSecond = qdPacketsPerSecond
d . qdPacketTimeout = qdPacketTimeout
}
}
// WithMeshProbing enables mesh probing. When enabled, a small message will be
// transferred through each DERP server and each pair of DERP servers.
func WithMeshProbing ( interval time . Duration ) DERPOpt {
@ -147,6 +164,7 @@ func DERP(p *Prober, derpMapURL string, opts ...DERPOpt) (*derpProber, error) {
d . udpProbeFn = d . ProbeUDP
d . meshProbeFn = d . probeMesh
d . bwProbeFn = d . probeBandwidth
d . qdProbeFn = d . probeQueuingDelay
return d , nil
}
@ -213,7 +231,7 @@ func (d *derpProber) probeMapFn(ctx context.Context) error {
}
}
if d . bwInterval > 0 && d . bwProbeSize > 0 {
if d . bwInterval != 0 && d . bwProbeSize > 0 {
n := fmt . Sprintf ( "derp/%s/%s/%s/bw" , region . RegionCode , server . Name , to . Name )
wantProbes [ n ] = true
if d . probes [ n ] == nil {
@ -225,6 +243,15 @@ func (d *derpProber) probeMapFn(ctx context.Context) error {
d . probes [ n ] = d . p . Run ( n , d . bwInterval , labels , d . bwProbeFn ( server . Name , to . Name , d . bwProbeSize ) )
}
}
if d . qdPacketsPerSecond > 0 {
n := fmt . Sprintf ( "derp/%s/%s/%s/qd" , region . RegionCode , server . Name , to . Name )
wantProbes [ n ] = true
if d . probes [ n ] == nil {
log . Printf ( "adding DERP queuing delay probe for %s->%s (%s)" , server . Name , to . Name , region . RegionName )
d . probes [ n ] = d . p . Run ( n , - 10 * time . Second , labels , d . qdProbeFn ( server . Name , to . Name , d . qdPacketsPerSecond , d . qdPacketTimeout ) )
}
}
}
}
}
@ -240,7 +267,7 @@ func (d *derpProber) probeMapFn(ctx context.Context) error {
return nil
}
// probeMesh retur s a probe class that sends a test packet through a pair of DERP
// probeMesh retur n s a probe class that sends a test packet through a pair of DERP
// servers (or just one server, if 'from' and 'to' are the same). 'from' and 'to'
// are expected to be names (DERPNode.Name) of two DERP servers in the same region.
func ( d * derpProber ) probeMesh ( from , to string ) ProbeClass {
@ -263,7 +290,7 @@ func (d *derpProber) probeMesh(from, to string) ProbeClass {
}
}
// probeBandwidth retur s a probe class that sends a payload of a given size
// probeBandwidth retur n s a probe class that sends a payload of a given size
// through a pair of DERP servers (or just one server, if 'from' and 'to' are
// the same). 'from' and 'to' are expected to be names (DERPNode.Name) of two
// DERP servers in the same region.
@ -295,6 +322,193 @@ func (d *derpProber) probeBandwidth(from, to string, size int64) ProbeClass {
}
}
// probeQueuingDelay returns a probe class that continuously sends packets
// through a pair of DERP servers (or just one server, if 'from' and 'to' are
// the same) at a rate of `packetsPerSecond` packets per second in order to
// measure queuing delays. Packets arriving after `packetTimeout` don't contribute
// to the queuing delay measurement and are recorded as dropped. 'from' and 'to' are
// expected to be names (DERPNode.Name) of two DERP servers in the same region,
// and may refer to the same server.
func ( d * derpProber ) probeQueuingDelay ( from , to string , packetsPerSecond int , packetTimeout time . Duration ) ProbeClass {
derpPath := "mesh"
if from == to {
derpPath = "single"
}
var packetsDropped expvar . Float
qdh := newHistogram ( [ ] float64 { .005 , .01 , .025 , .05 , .1 , .25 , .5 , 1 } )
return ProbeClass {
Probe : func ( ctx context . Context ) error {
fromN , toN , err := d . getNodePair ( from , to )
if err != nil {
return err
}
return derpProbeQueuingDelay ( ctx , d . lastDERPMap , fromN , toN , packetsPerSecond , packetTimeout , & packetsDropped , qdh )
} ,
Class : "derp_qd" ,
Labels : Labels { "derp_path" : derpPath } ,
Metrics : func ( l prometheus . Labels ) [ ] prometheus . Metric {
qdh . mx . Lock ( )
result := [ ] prometheus . Metric {
prometheus . MustNewConstMetric ( prometheus . NewDesc ( "derp_qd_probe_dropped_packets" , "Total packets dropped" , nil , l ) , prometheus . CounterValue , float64 ( packetsDropped . Value ( ) ) ) ,
prometheus . MustNewConstHistogram ( prometheus . NewDesc ( "derp_qd_probe_delays_seconds" , "Distribution of queuing delays" , nil , l ) , qdh . count , qdh . sum , qdh . bucketedCounts ) ,
}
qdh . mx . Unlock ( )
return result
} ,
}
}
// derpProbeQueuingDelay continuously sends data between two local DERP clients
// connected to two DERP servers in order to measure queuing delays. From and to
// can be the same server.
func derpProbeQueuingDelay ( ctx context . Context , dm * tailcfg . DERPMap , from , to * tailcfg . DERPNode , packetsPerSecond int , packetTimeout time . Duration , packetsDropped * expvar . Float , qdh * histogram ) ( err error ) {
// This probe uses clients with isProber=false to avoid spamming the derper
// logs with every packet sent by the queuing delay probe.
fromc , err := newConn ( ctx , dm , from , false )
if err != nil {
return err
}
defer fromc . Close ( )
toc , err := newConn ( ctx , dm , to , false )
if err != nil {
return err
}
defer toc . Close ( )
// Wait a bit for from's node to hear about to existing on the
// other node in the region, in the case where the two nodes
// are different.
if from . Name != to . Name {
time . Sleep ( 100 * time . Millisecond ) // pretty arbitrary
}
if err := runDerpProbeQueuingDelayContinously ( ctx , from , to , fromc , toc , packetsPerSecond , packetTimeout , packetsDropped , qdh ) ; err != nil {
// Record pubkeys on failed probes to aid investigation.
return fmt . Errorf ( "%s -> %s: %w" ,
fromc . SelfPublicKey ( ) . ShortString ( ) ,
toc . SelfPublicKey ( ) . ShortString ( ) , err )
}
return nil
}
func runDerpProbeQueuingDelayContinously ( ctx context . Context , from , to * tailcfg . DERPNode , fromc , toc * derphttp . Client , packetsPerSecond int , packetTimeout time . Duration , packetsDropped * expvar . Float , qdh * histogram ) error {
// Make sure all goroutines have finished.
var wg sync . WaitGroup
defer wg . Wait ( )
// Close the clients to make sure goroutines that are reading/writing from them terminate.
defer fromc . Close ( )
defer toc . Close ( )
type txRecord struct {
at time . Time
seq uint64
}
// txRecords is sized to hold enough transmission records to keep timings
// for packets up to their timeout. As records age out of the front of this
// list, if the associated packet arrives, we won't have a txRecord for it
// and will consider it to have timed out.
txRecords := make ( [ ] txRecord , 0 , packetsPerSecond * int ( packetTimeout . Seconds ( ) ) )
var txRecordsMu sync . Mutex
// Send the packets.
sendErrC := make ( chan error , 1 )
// TODO: construct a disco CallMeMaybe in the same fashion as magicsock, e.g. magic bytes, src pub, seal payload.
// DERP server handling of disco may vary from non-disco, and we may want to measure queue delay of both.
pkt := make ( [ ] byte , 260 ) // the same size as a CallMeMaybe packet observed on a Tailscale client.
crand . Read ( pkt )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
t := time . NewTicker ( time . Second / time . Duration ( packetsPerSecond ) )
defer t . Stop ( )
seq := uint64 ( 0 )
for {
select {
case <- ctx . Done ( ) :
return
case <- t . C :
txRecordsMu . Lock ( )
if len ( txRecords ) == cap ( txRecords ) {
txRecords = slices . Delete ( txRecords , 0 , 1 )
packetsDropped . Add ( 1 )
}
txRecords = append ( txRecords , txRecord { time . Now ( ) , seq } )
txRecordsMu . Unlock ( )
binary . BigEndian . PutUint64 ( pkt , seq )
seq ++
if err := fromc . Send ( toc . SelfPublicKey ( ) , pkt ) ; err != nil {
sendErrC <- fmt . Errorf ( "sending packet %w" , err )
return
}
}
}
} ( )
// Receive the packets.
recvFinishedC := make ( chan error , 1 )
wg . Add ( 1 )
go func ( ) {
defer wg . Done ( )
defer close ( recvFinishedC ) // to break out of 'select' below.
for {
m , err := toc . Recv ( )
if err != nil {
recvFinishedC <- err
return
}
switch v := m . ( type ) {
case derp . ReceivedPacket :
now := time . Now ( )
if v . Source != fromc . SelfPublicKey ( ) {
recvFinishedC <- fmt . Errorf ( "got data packet from unexpected source, %v" , v . Source )
return
}
seq := binary . BigEndian . Uint64 ( v . Data )
txRecordsMu . Lock ( )
findTxRecord :
for i , record := range txRecords {
switch {
case record . seq == seq :
rtt := now . Sub ( record . at )
qdh . add ( rtt . Seconds ( ) )
txRecords = slices . Delete ( txRecords , i , i + 1 )
break findTxRecord
case record . seq > seq :
// No sent time found, probably a late arrival already
// recorded as drop by sender when deleted.
break findTxRecord
case record . seq < seq :
continue
}
}
txRecordsMu . Unlock ( )
case derp . KeepAliveMessage :
// Silently ignore.
default :
log . Printf ( "%v: ignoring Recv frame type %T" , to . Name , v )
// Loop.
}
}
} ( )
select {
case <- ctx . Done ( ) :
return fmt . Errorf ( "timeout: %w" , ctx . Err ( ) )
case err := <- sendErrC :
return fmt . Errorf ( "error sending via %q: %w" , from . Name , err )
case err := <- recvFinishedC :
if err != nil {
return fmt . Errorf ( "error receiving from %q: %w" , to . Name , err )
}
}
return nil
}
// getNodePair returns DERPNode objects for two DERP servers based on their
// short names.
func ( d * derpProber ) getNodePair ( n1 , n2 string ) ( ret1 , ret2 * tailcfg . DERPNode , _ error ) {
@ -573,6 +787,8 @@ func runDerpProbeNodePair(ctx context.Context, from, to *tailcfg.DERPNode, fromc
recvc <- fmt . Errorf ( "got data packet %d from unexpected source, %v" , idx , v . Source )
return
}
// This assumes that the packets are received reliably and in order.
// The DERP protocol does not guarantee this, but this probe assumes it.
if got , want := v . Data , pkts [ idx ] ; ! bytes . Equal ( got , want ) {
recvc <- fmt . Errorf ( "unexpected data packet %d (out of %d)" , idx , len ( pkts ) )
return