@ -89,13 +89,19 @@ func (t timestampSource) String() string {
}
}
type result struct {
at time . Time
// resultKey contains the stable dimensions and their values for a given
// timeseries, i.e. not time and not rtt/timeout.
//
// resultKey is used as the key type of the timeouts map
// (map[resultKey]uint64), so two probes of the same series must produce equal
// keys. It therefore must not carry per-probe data: in particular a
// *time.Duration field would be compared by pointer address, making every
// probe's key distinct and preventing timeout counts from ever accumulating.
// The measured RTT lives on result instead.
type resultKey struct {
	meta            nodeMeta        // node the series describes
	timestampSource timestampSource // timestamp source used for the measurement
	connStability   connStability   // whether a stable (reused) conn was used
	dstPort         int             // destination port probed
}
// result is the outcome of a single probe: the series it belongs to, the
// time associated with it, and the measured round trip time (if any).
type result struct {
	// key identifies the timeseries this result belongs to.
	key resultKey
	// at is the time associated with the probe; it is used as the sample
	// timestamp when the result is exported.
	at time.Time
	// rtt is the measured round trip time. A nil rtt signifies failure,
	// e.g. timeout.
	rtt *time.Duration
}
func measureRTT ( conn io . ReadWriteCloser , dst * net . UDPAddr ) ( rtt time . Duration , err error ) {
@ -149,6 +155,10 @@ type nodeMeta struct {
// measureFn is the signature of a function that measures round trip time to
// dst over conn. measureRTT has this signature, and probe accepts a measureFn
// so that callers can choose which measurement implementation to use.
type measureFn func(
	conn io.ReadWriteCloser,
	dst *net.UDPAddr,
) (rtt time.Duration, err error)
// probe measures STUN round trip time for the node described by meta over
// conn against dstPort. It may return a nil duration and nil error if the
// STUN request timed out. A non-nil error indicates an unrecoverable or
// non-temporary error.
func probe ( meta nodeMeta , conn io . ReadWriteCloser , fn measureFn , dstPort int ) ( * time . Duration , error ) {
ua := & net . UDPAddr {
IP : net . IP ( meta . addr . AsSlice ( ) ) ,
@ -162,10 +172,15 @@ func probe(meta nodeMeta, conn io.ReadWriteCloser, fn measureFn, dstPort int) (*
log . Printf ( "temp error measuring RTT to %s(%s): %v" , meta . hostname , ua . String ( ) , err )
return nil , nil
}
return nil , err
}
return & rtt , nil
}
// nodeMetaFromDERPMap parses the provided DERP map in order to update nodeMeta
// in the provided nodeMetaByAddr. It returns a slice of nodeMeta containing
// the nodes that are no longer seen in the DERP map, but were previously held
// in nodeMetaByAddr.
func nodeMetaFromDERPMap ( dm * tailcfg . DERPMap , nodeMetaByAddr map [ netip . Addr ] nodeMeta , ipv6 bool ) ( stale [ ] nodeMeta , err error ) {
// Parse the new derp map before making any state changes in nodeMetaByAddr.
// If parse fails we just stick with the old state.
@ -271,10 +286,12 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad
doProbe := func ( conn io . ReadWriteCloser , meta nodeMeta , source timestampSource , dstPort int ) {
defer wg . Done ( )
r := result {
at : at ,
meta : meta ,
timestampSource : source ,
dstPort : dstPort ,
key : resultKey {
meta : meta ,
timestampSource : source ,
dstPort : dstPort ,
} ,
at : at ,
}
if conn == nil {
var err error
@ -293,7 +310,7 @@ func probeNodes(nodeMetaByAddr map[netip.Addr]nodeMeta, stableConns map[netip.Ad
}
defer conn . Close ( )
} else {
r . connStability = stableConn
r . key. connStability = stableConn
}
fn := measureRTT
if source == timestampSourceKernel {
@ -373,7 +390,12 @@ const (
stableConn connStability = true
)
func timeSeriesLabels ( meta nodeMeta , instance string , source timestampSource , stability connStability , dstPort int ) [ ] prompb . Label {
// Metric names used as the "__name__" label value of exported timeseries.
const (
	// rttMetricName names the series carrying measured STUN round trip times.
	rttMetricName = "stunstamp_derp_stun_rtt_ns"
	// timeoutsMetricName names the series carrying running timeout counts.
	timeoutsMetricName = "stunstamp_derp_stun_timeouts_total"
)
func timeSeriesLabels ( metricName string , meta nodeMeta , instance string , source timestampSource , stability connStability , dstPort int ) [ ] prompb . Label {
addressFamily := "ipv4"
if meta . addr . Is6 ( ) {
addressFamily = "ipv6"
@ -409,7 +431,7 @@ func timeSeriesLabels(meta nodeMeta, instance string, source timestampSource, st
} )
labels = append ( labels , prompb . Label {
Name : "__name__" ,
Value : "stunstamp_derp_stun_rtt_ns" ,
Value : metricName ,
} )
labels = append ( labels , prompb . Label {
Name : "timestamp_source" ,
@ -443,20 +465,36 @@ func staleMarkersFromNodeMeta(stale []nodeMeta, instance string, dstPorts []int)
} ,
}
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( s, instance , timestampSourceUserspace , unstableConn , dstPort ) ,
Labels : timeSeriesLabels ( rttMetricName, s, instance , timestampSourceUserspace , unstableConn , dstPort ) ,
Samples : samples ,
} )
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( s , instance , timestampSourceUserspace , stableConn , dstPort ) ,
Labels : timeSeriesLabels ( rttMetricName , s , instance , timestampSourceUserspace , stableConn , dstPort ) ,
Samples : samples ,
} )
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( timeoutsMetricName , s , instance , timestampSourceUserspace , unstableConn , dstPort ) ,
Samples : samples ,
} )
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( timeoutsMetricName , s , instance , timestampSourceUserspace , stableConn , dstPort ) ,
Samples : samples ,
} )
if supportsKernelTS ( ) {
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( s , instance , timestampSourceKernel , unstableConn , dstPort ) ,
Labels : timeSeriesLabels ( rttMetricName , s , instance , timestampSourceKernel , unstableConn , dstPort ) ,
Samples : samples ,
} )
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( rttMetricName , s , instance , timestampSourceKernel , stableConn , dstPort ) ,
Samples : samples ,
} )
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( timeoutsMetricName , s , instance , timestampSourceKernel , unstableConn , dstPort ) ,
Samples : samples ,
} )
staleMarkers = append ( staleMarkers , prompb . TimeSeries {
Labels : timeSeriesLabels ( s , instance , timestampSourceKernel , stableConn , dstPort ) ,
Labels : timeSeriesLabels ( timeoutsMetricName, s, instance , timestampSourceKernel , stableConn , dstPort ) ,
Samples : samples ,
} )
}
@ -465,25 +503,47 @@ func staleMarkersFromNodeMeta(stale []nodeMeta, instance string, dstPorts []int)
return staleMarkers
}
func resultToPromTimeSeries ( r result , instance string ) prompb . TimeSeries {
labels := timeSeriesLabels ( r . meta , instance , r . timestampSource , r . connStability , r . dstPort )
samples := make ( [ ] prompb . Sample , 1 )
samples [ 0 ] . Timestamp = r . at . UnixMilli ( )
if r . rtt != nil {
samples [ 0 ] . Value = float64 ( * r . rtt )
} else {
samples [ 0 ] . Value = math . NaN ( )
// TODO: timeout counter
}
ts := prompb . TimeSeries {
Labels : labels ,
Samples : samples ,
}
slices . SortFunc ( ts . Labels , func ( a , b prompb . Label ) int {
// prometheus remote-write spec requires lexicographically sorted label names
return cmp . Compare ( a . Name , b . Name )
} )
return ts
// resultsToPromTimeSeries converts results into prometheus TimeSeries for the
// given instance. Each result contributes two series, appended in this order:
//
//   - an RTT series whose single sample holds the RTT as a float64 count of
//     nanoseconds, or NaN when the result has no RTT (nil rtt);
//   - a timeouts series whose single sample holds the running timeout count
//     for the result's key.
//
// Both samples are stamped with the result's time in Unix milliseconds.
//
// timeouts is mutated in place: every key present in results is written back
// (incremented when that result has a nil rtt, created at zero otherwise), and
// every key that does not appear in results is deleted.
//
// NOTE(review): labels are used exactly as returned by timeSeriesLabels. The
// prometheus remote-write spec requires lexicographically sorted label names;
// confirm timeSeriesLabels returns them sorted.
func resultsToPromTimeSeries(results []result, instance string, timeouts map[resultKey]uint64) []prompb.TimeSeries {
	series := make([]prompb.TimeSeries, 0, 2*len(results))
	seen := make(map[resultKey]struct{})

	for _, res := range results {
		k := res.key
		count := timeouts[k] // zero if k has not been counted before
		seen[k] = struct{}{}
		atMillis := res.at.UnixMilli()

		rttLabels := timeSeriesLabels(rttMetricName, k.meta, instance, k.timestampSource, k.connStability, k.dstPort)
		var rttValue float64
		if res.rtt == nil {
			// No RTT means the probe failed (e.g. timed out): report NaN
			// for the RTT and bump the timeout counter.
			rttValue = math.NaN()
			count++
		} else {
			rttValue = float64(*res.rtt)
		}
		series = append(series, prompb.TimeSeries{
			Labels:  rttLabels,
			Samples: []prompb.Sample{{Timestamp: atMillis, Value: rttValue}},
		})

		// Always store the count so the key exists even without a timeout.
		timeouts[k] = count

		timeoutsLabels := timeSeriesLabels(timeoutsMetricName, k.meta, instance, k.timestampSource, k.connStability, k.dstPort)
		series = append(series, prompb.TimeSeries{
			Labels:  timeoutsLabels,
			Samples: []prompb.Sample{{Timestamp: atMillis, Value: float64(count)}},
		})
	}

	// Drop counters for keys that did not show up in this batch.
	for k := range timeouts {
		if _, ok := seen[k]; !ok {
			delete(timeouts, k)
		}
	}
	return series
}
type remoteWriteClient struct {
@ -719,6 +779,10 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
// comes into play.
stableConns := make ( map [ netip . Addr ] map [ int ] [ 2 ] io . ReadWriteCloser )
// timeouts holds counts of timeout events. Values are persisted for the
// lifetime of the related node in the DERP map.
timeouts := make ( map [ resultKey ] uint64 )
derpMapTicker := time . NewTicker ( time . Minute * 5 )
defer derpMapTicker . Stop ( )
probeTicker := time . NewTicker ( * flagInterval )
@ -744,10 +808,7 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
shutdown ( )
return
}
ts := make ( [ ] prompb . TimeSeries , 0 , len ( results ) )
for _ , r := range results {
ts = append ( ts , resultToPromTimeSeries ( r , * flagInstance ) )
}
ts := resultsToPromTimeSeries ( results , * flagInstance , timeouts )
select {
case tsCh <- ts :
default :
@ -766,11 +827,11 @@ CREATE TABLE IF NOT EXISTS rtt(at_unix INT, region_id INT, hostname TEXT, af INT
}
for _ , result := range results {
af := 4
if result . meta. addr . Is6 ( ) {
if result . key. meta. addr . Is6 ( ) {
af = 6
}
_ , err = tx . Exec ( "INSERT INTO rtt(at_unix, region_id, hostname, af, address, timestamp_source, stable_conn, dst_port, rtt_ns) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)" ,
result . at . Unix ( ) , result . meta. regionID , result . meta. hostname , af , result . meta . addr . String ( ) , result . timestampSource, result . connStability, result . dstPort , result . rtt )
result . at . Unix ( ) , result . key. meta. regionID , result . key. meta. hostname , af , result . key . meta . addr . String ( ) , result . key. timestampSource, result . key. connStability, result . key . dstPort , result . rtt )
if err != nil {
tx . Rollback ( )
log . Printf ( "error adding result to tx: %v" , err )