@ -7,19 +7,25 @@
package prober
import (
"container/ring"
"context"
"errors"
"fmt"
"hash/fnv"
"log"
"maps"
"math/rand"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"tailscale.com/tsweb"
)
// recentHistSize is the number of recent probe results and latencies to keep
// in memory.
const recentHistSize = 10
// ProbeClass defines a probe of a specific type: a probing function that will
// be regularly ran, and metric labels that will be added automatically to all
// probes using this class.
@ -106,6 +112,14 @@ func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc Prob
l [ k ] = v
}
probe := newProbe ( p , name , interval , l , pc )
p . probes [ name ] = probe
go probe . loop ( )
return probe
}
// newProbe creates a new Probe with the given parameters, but does not start it.
func newProbe ( p * Prober , name string , interval time . Duration , l prometheus . Labels , pc ProbeClass ) * Probe {
ctx , cancel := context . WithCancel ( context . Background ( ) )
probe := & Probe {
prober : p ,
@ -117,6 +131,9 @@ func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc Prob
probeClass : pc ,
interval : interval ,
initialDelay : initialDelay ( name , interval ) ,
successHist : ring . New ( recentHistSize ) ,
latencyHist : ring . New ( recentHistSize ) ,
metrics : prometheus . NewRegistry ( ) ,
metricLabels : l ,
mInterval : prometheus . NewDesc ( "interval_secs" , "Probe interval in seconds" , nil , l ) ,
@ -131,15 +148,14 @@ func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc Prob
Name : "seconds_total" , Help : "Total amount of time spent executing the probe" , ConstLabels : l ,
} , [ ] string { "status" } ) ,
}
prometheus . WrapRegistererWithPrefix ( p . namespace + "_" , p . metrics ) . MustRegister ( probe . metrics )
if p . metrics != nil {
prometheus . WrapRegistererWithPrefix ( p . namespace + "_" , p . metrics ) . MustRegister ( probe . metrics )
}
probe . metrics . MustRegister ( probe )
p . probes [ name ] = probe
go probe . loop ( )
return probe
}
// unregister removes a probe from the prober's internal state.
func ( p * Prober ) unregister ( probe * Probe ) {
p . mu . Lock ( )
defer p . mu . Unlock ( )
@ -206,6 +222,7 @@ type Probe struct {
ctx context . Context
cancel context . CancelFunc // run to initiate shutdown
stopped chan struct { } // closed when shutdown is complete
runMu sync . Mutex // ensures only one probe runs at a time
name string
probeClass ProbeClass
@ -232,6 +249,10 @@ type Probe struct {
latency time . Duration // last successful probe latency
succeeded bool // whether the last doProbe call succeeded
lastErr error
// History of recent probe results and latencies.
successHist * ring . Ring
latencyHist * ring . Ring
}
// Close shuts down the Probe and unregisters it from its Prober.
@ -278,13 +299,17 @@ func (p *Probe) loop() {
}
}
// run invokes fun and records the results.
// run invokes the probe function and records the result. It returns the probe
// result and an error if the probe failed.
//
// fun is invoked with a timeout slightly less than interval, so that
// the probe either succeeds or fails before the next cycle is
// scheduled to start.
func ( p * Probe ) run ( ) {
start := p . recordStart ( )
// The probe function is invoked with a timeout slightly less than interval, so
// that the probe either succeeds or fails before the next cycle is scheduled to
// start.
func ( p * Probe ) run ( ) ( pi ProbeInfo , err error ) {
p . runMu . Lock ( )
defer p . runMu . Unlock ( )
p . recordStart ( )
defer func ( ) {
// Prevent a panic within one probe function from killing the
// entire prober, so that a single buggy probe doesn't destroy
@ -293,29 +318,30 @@ func (p *Probe) run() {
// alert for debugging.
if r := recover ( ) ; r != nil {
log . Printf ( "probe %s panicked: %v" , p . name , r )
p . recordEnd ( start , errors . New ( "panic" ) )
err = fmt . Errorf ( "panic: %v" , r )
p . recordEnd ( err )
}
} ( )
timeout := time . Duration ( float64 ( p . interval ) * 0.8 )
ctx , cancel := context . WithTimeout ( p . ctx , timeout )
defer cancel ( )
err : = p . probeClass . Probe ( ctx )
p . recordEnd ( start, err)
err = p . probeClass . Probe ( ctx )
p . recordEnd ( err)
if err != nil {
log . Printf ( "probe %s: %v" , p . name , err )
}
pi = p . probeInfoLocked ( )
return
}
func ( p * Probe ) recordStart ( ) time . Time {
st := p . prober . now ( )
func ( p * Probe ) recordStart ( ) {
p . mu . Lock ( )
defer p . mu . Unlock ( )
p . start = st
return st
p . start = p . prober . now ( )
p . mu . Unlock ( )
}
func ( p * Probe ) recordEnd ( start time . Time , err error ) {
func ( p * Probe ) recordEnd ( err error ) {
end := p . prober . now ( )
p . mu . Lock ( )
defer p . mu . Unlock ( )
@ -327,22 +353,55 @@ func (p *Probe) recordEnd(start time.Time, err error) {
p . latency = latency
p . mAttempts . WithLabelValues ( "ok" ) . Inc ( )
p . mSeconds . WithLabelValues ( "ok" ) . Add ( latency . Seconds ( ) )
p . latencyHist . Value = latency
p . latencyHist = p . latencyHist . Next ( )
} else {
p . latency = 0
p . mAttempts . WithLabelValues ( "fail" ) . Inc ( )
p . mSeconds . WithLabelValues ( "fail" ) . Add ( latency . Seconds ( ) )
}
p . successHist . Value = p . succeeded
p . successHist = p . successHist . Next ( )
}
// ProbeInfo is the state of a Probe.
// ProbeInfo is a snapshot of the configuration and state of a Probe.
type ProbeInfo struct {
Start time . Time
End time . Time
Latency string
Result bool
Error string
Name string
Class string
Interval time . Duration
Labels map [ string ] string
Start time . Time
End time . Time
Latency time . Duration
Result bool
Error string
RecentResults [ ] bool
RecentLatencies [ ] time . Duration
}
// RecentSuccessRatio returns the success ratio of the probe in the recent history.
func ( pb ProbeInfo ) RecentSuccessRatio ( ) float64 {
if len ( pb . RecentResults ) == 0 {
return 0
}
var sum int
for _ , r := range pb . RecentResults {
if r {
sum ++
}
}
return float64 ( sum ) / float64 ( len ( pb . RecentResults ) )
}
// RecentMedianLatency returns the median latency of the probe in the recent history.
func ( pb ProbeInfo ) RecentMedianLatency ( ) time . Duration {
if len ( pb . RecentLatencies ) == 0 {
return 0
}
return pb . RecentLatencies [ len ( pb . RecentLatencies ) / 2 ]
}
// ProbeInfo returns the state of all probes.
func ( p * Prober ) ProbeInfo ( ) map [ string ] ProbeInfo {
out := map [ string ] ProbeInfo { }
@ -352,26 +411,69 @@ func (p *Prober) ProbeInfo() map[string]ProbeInfo {
probes = append ( probes , probe )
}
p . mu . Unlock ( )
for _ , probe := range probes {
probe . mu . Lock ( )
inf := ProbeInfo {
Start : probe . start ,
End : probe . end ,
Result : probe . succeeded ,
}
if probe . lastErr != nil {
inf . Error = probe . lastErr . Error ( )
}
if probe . latency > 0 {
inf . Latency = probe . latency . String ( )
}
out [ probe . name ] = inf
out [ probe . name ] = probe . probeInfoLocked ( )
probe . mu . Unlock ( )
}
return out
}
// probeInfoLocked returns the state of the probe.
func ( probe * Probe ) probeInfoLocked ( ) ProbeInfo {
inf := ProbeInfo {
Name : probe . name ,
Class : probe . probeClass . Class ,
Interval : probe . interval ,
Labels : probe . metricLabels ,
Start : probe . start ,
End : probe . end ,
Result : probe . succeeded ,
}
if probe . lastErr != nil {
inf . Error = probe . lastErr . Error ( )
}
if probe . latency > 0 {
inf . Latency = probe . latency
}
probe . latencyHist . Do ( func ( v any ) {
if l , ok := v . ( time . Duration ) ; ok {
inf . RecentLatencies = append ( inf . RecentLatencies , l )
}
} )
probe . successHist . Do ( func ( v any ) {
if r , ok := v . ( bool ) ; ok {
inf . RecentResults = append ( inf . RecentResults , r )
}
} )
return inf
}
// RunHandler runs a probe by name and returns the result as an HTTP response.
func ( p * Prober ) RunHandler ( w http . ResponseWriter , r * http . Request ) error {
// Look up prober by name.
name := r . FormValue ( "name" )
if name == "" {
return tsweb . Error ( http . StatusBadRequest , "missing name parameter" , nil )
}
p . mu . Lock ( )
probe , ok := p . probes [ name ]
prevInfo := probe . probeInfoLocked ( )
p . mu . Unlock ( )
if ! ok {
return tsweb . Error ( http . StatusNotFound , fmt . Sprintf ( "unknown probe %q" , name ) , nil )
}
info , err := probe . run ( )
stats := fmt . Sprintf ( "Previous runs: success rate %d%%, median latency %v" ,
int ( prevInfo . RecentSuccessRatio ( ) * 100 ) , prevInfo . RecentMedianLatency ( ) )
if err != nil {
return tsweb . Error ( http . StatusFailedDependency , fmt . Sprintf ( "Probe failed: %s\n%s" , err . Error ( ) , stats ) , err )
}
w . WriteHeader ( respStatus )
w . Write ( [ ] byte ( fmt . Sprintf ( "Probe succeeded in %v\n%s" , info . Latency , stats ) ) )
return nil
}
// Describe implements prometheus.Collector.
func ( p * Probe ) Describe ( ch chan <- * prometheus . Desc ) {
ch <- p . mInterval