diff --git a/prober/prober.go b/prober/prober.go index 36afb1a39..fec571e61 100644 --- a/prober/prober.go +++ b/prober/prober.go @@ -7,19 +7,25 @@ package prober import ( + "container/ring" "context" - "errors" "fmt" "hash/fnv" "log" "maps" "math/rand" + "net/http" "sync" "time" "github.com/prometheus/client_golang/prometheus" + "tailscale.com/tsweb" ) +// recentHistSize is the number of recent probe results and latencies to keep +// in memory. +const recentHistSize = 10 + // ProbeClass defines a probe of a specific type: a probing function that will // be regularly ran, and metric labels that will be added automatically to all // probes using this class. @@ -106,6 +112,14 @@ func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc Prob l[k] = v } + probe := newProbe(p, name, interval, l, pc) + p.probes[name] = probe + go probe.loop() + return probe +} + +// newProbe creates a new Probe with the given parameters, but does not start it. +func newProbe(p *Prober, name string, interval time.Duration, l prometheus.Labels, pc ProbeClass) *Probe { ctx, cancel := context.WithCancel(context.Background()) probe := &Probe{ prober: p, @@ -117,6 +131,9 @@ func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc Prob probeClass: pc, interval: interval, initialDelay: initialDelay(name, interval), + successHist: ring.New(recentHistSize), + latencyHist: ring.New(recentHistSize), + metrics: prometheus.NewRegistry(), metricLabels: l, mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, l), @@ -131,15 +148,14 @@ func (p *Prober) Run(name string, interval time.Duration, labels Labels, pc Prob Name: "seconds_total", Help: "Total amount of time spent executing the probe", ConstLabels: l, }, []string{"status"}), } - - prometheus.WrapRegistererWithPrefix(p.namespace+"_", p.metrics).MustRegister(probe.metrics) + if p.metrics != nil { + prometheus.WrapRegistererWithPrefix(p.namespace+"_", p.metrics).MustRegister(probe.metrics) + } probe.metrics.MustRegister(probe) - - p.probes[name] = probe - go probe.loop() return probe } +// unregister removes a probe from the prober's internal state. func (p *Prober) unregister(probe *Probe) { p.mu.Lock() defer p.mu.Unlock() @@ -206,6 +222,7 @@ type Probe struct { ctx context.Context cancel context.CancelFunc // run to initiate shutdown stopped chan struct{} // closed when shutdown is complete + runMu sync.Mutex // ensures only one probe runs at a time name string probeClass ProbeClass @@ -232,6 +249,10 @@ type Probe struct { latency time.Duration // last successful probe latency succeeded bool // whether the last doProbe call succeeded lastErr error + + // History of recent probe results and latencies. + successHist *ring.Ring + latencyHist *ring.Ring } // Close shuts down the Probe and unregisters it from its Prober. @@ -278,13 +299,17 @@ func (p *Probe) loop() { } } -// run invokes fun and records the results. +// run invokes the probe function and records the result. It returns the probe +// result and an error if the probe failed. // -// fun is invoked with a timeout slightly less than interval, so that -// the probe either succeeds or fails before the next cycle is -// scheduled to start. -func (p *Probe) run() { - start := p.recordStart() +// The probe function is invoked with a timeout slightly less than interval, so +// that the probe either succeeds or fails before the next cycle is scheduled to +// start. +func (p *Probe) run() (pi ProbeInfo, err error) { + p.runMu.Lock() + defer p.runMu.Unlock() + + p.recordStart() defer func() { // Prevent a panic within one probe function from killing the // entire prober, so that a single buggy probe doesn't destroy @@ -293,29 +318,30 @@ func (p *Probe) run() { // alert for debugging. if r := recover(); r != nil { log.Printf("probe %s panicked: %v", p.name, r) - p.recordEnd(start, errors.New("panic")) + err = fmt.Errorf("panic: %v", r) + p.recordEnd(err) } }() timeout := time.Duration(float64(p.interval) * 0.8) ctx, cancel := context.WithTimeout(p.ctx, timeout) defer cancel() - err := p.probeClass.Probe(ctx) - p.recordEnd(start, err) + err = p.probeClass.Probe(ctx) + p.recordEnd(err) if err != nil { log.Printf("probe %s: %v", p.name, err) } + pi = p.probeInfoLocked() + return } -func (p *Probe) recordStart() time.Time { - st := p.prober.now() +func (p *Probe) recordStart() { p.mu.Lock() - defer p.mu.Unlock() - p.start = st - return st + p.start = p.prober.now() + p.mu.Unlock() } -func (p *Probe) recordEnd(start time.Time, err error) { +func (p *Probe) recordEnd(err error) { end := p.prober.now() p.mu.Lock() defer p.mu.Unlock() @@ -327,22 +353,55 @@ func (p *Probe) recordEnd(start time.Time, err error) { p.latency = latency p.mAttempts.WithLabelValues("ok").Inc() p.mSeconds.WithLabelValues("ok").Add(latency.Seconds()) + p.latencyHist.Value = latency + p.latencyHist = p.latencyHist.Next() } else { p.latency = 0 p.mAttempts.WithLabelValues("fail").Inc() p.mSeconds.WithLabelValues("fail").Add(latency.Seconds()) } + p.successHist.Value = p.succeeded + p.successHist = p.successHist.Next() } -// ProbeInfo is the state of a Probe. +// ProbeInfo is a snapshot of the configuration and state of a Probe. type ProbeInfo struct { - Start time.Time - End time.Time - Latency string - Result bool - Error string + Name string + Class string + Interval time.Duration + Labels map[string]string + Start time.Time + End time.Time + Latency time.Duration + Result bool + Error string + RecentResults []bool + RecentLatencies []time.Duration } +// RecentSuccessRatio returns the success ratio of the probe in the recent history. +func (pb ProbeInfo) RecentSuccessRatio() float64 { + if len(pb.RecentResults) == 0 { + return 0 + } + var sum int + for _, r := range pb.RecentResults { + if r { + sum++ + } + } + return float64(sum) / float64(len(pb.RecentResults)) +} + +// RecentMedianLatency returns the median latency of the probe in the recent history. +func (pb ProbeInfo) RecentMedianLatency() time.Duration { + if len(pb.RecentLatencies) == 0 { + return 0 + } + return pb.RecentLatencies[len(pb.RecentLatencies)/2] +} + +// ProbeInfo returns the state of all probes. func (p *Prober) ProbeInfo() map[string]ProbeInfo { out := map[string]ProbeInfo{} @@ -352,26 +411,69 @@ func (p *Prober) ProbeInfo() map[string]ProbeInfo { probes = append(probes, probe) } p.mu.Unlock() - for _, probe := range probes { probe.mu.Lock() - inf := ProbeInfo{ - Start: probe.start, - End: probe.end, - Result: probe.succeeded, - } - if probe.lastErr != nil { - inf.Error = probe.lastErr.Error() - } - if probe.latency > 0 { - inf.Latency = probe.latency.String() - } - out[probe.name] = inf + out[probe.name] = probe.probeInfoLocked() probe.mu.Unlock() } return out } +// probeInfoLocked returns the state of the probe. +func (probe *Probe) probeInfoLocked() ProbeInfo { + inf := ProbeInfo{ + Name: probe.name, + Class: probe.probeClass.Class, + Interval: probe.interval, + Labels: probe.metricLabels, + Start: probe.start, + End: probe.end, + Result: probe.succeeded, + } + if probe.lastErr != nil { + inf.Error = probe.lastErr.Error() + } + if probe.latency > 0 { + inf.Latency = probe.latency + } + probe.latencyHist.Do(func(v any) { + if l, ok := v.(time.Duration); ok { + inf.RecentLatencies = append(inf.RecentLatencies, l) + } + }) + probe.successHist.Do(func(v any) { + if r, ok := v.(bool); ok { + inf.RecentResults = append(inf.RecentResults, r) + } + }) + return inf +} + +// RunHandler runs a probe by name and returns the result as an HTTP response. +func (p *Prober) RunHandler(w http.ResponseWriter, r *http.Request) error { + // Look up prober by name. + name := r.FormValue("name") + if name == "" { + return tsweb.Error(http.StatusBadRequest, "missing name parameter", nil) + } + p.mu.Lock() + probe, ok := p.probes[name] + prevInfo := probe.probeInfoLocked() + p.mu.Unlock() + if !ok { + return tsweb.Error(http.StatusNotFound, fmt.Sprintf("unknown probe %q", name), nil) + } + info, err := probe.run() + stats := fmt.Sprintf("Previous runs: success rate %d%%, median latency %v", + int(prevInfo.RecentSuccessRatio()*100), prevInfo.RecentMedianLatency()) + if err != nil { + return tsweb.Error(http.StatusFailedDependency, fmt.Sprintf("Probe failed: %s\n%s", err.Error(), stats), err) + } + w.WriteHeader(respStatus) + w.Write([]byte(fmt.Sprintf("Probe succeeded in %v\n%s", info.Latency, stats))) + return nil +} + // Describe implements prometheus.Collector. func (p *Probe) Describe(ch chan<- *prometheus.Desc) { ch <- p.mInterval diff --git a/prober/prober_test.go b/prober/prober_test.go index af645ef00..a194e6f5c 100644 --- a/prober/prober_test.go +++ b/prober/prober_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/prometheus/client_golang/prometheus/testutil" "tailscale.com/tstest" ) @@ -292,6 +294,173 @@ func TestOnceMode(t *testing.T) { } } +func TestProberProbeInfo(t *testing.T) { + clk := newFakeTime() + p := newForTest(clk.Now, clk.NewTicker).WithOnce(true) + + p.Run("probe1", probeInterval, nil, FuncProbe(func(context.Context) error { + clk.Advance(500 * time.Millisecond) + return nil + })) + p.Run("probe2", probeInterval, nil, FuncProbe(func(context.Context) error { return fmt.Errorf("error2") })) + p.Wait() + + info := p.ProbeInfo() + wantInfo := map[string]ProbeInfo{ + "probe1": { + Name: "probe1", + Interval: probeInterval, + Labels: map[string]string{"class": "", "name": "probe1"}, + Latency: 500 * time.Millisecond, + Result: true, + RecentResults: []bool{true}, + RecentLatencies: []time.Duration{500 * time.Millisecond}, + }, + "probe2": { + Name: "probe2", + Interval: probeInterval, + Labels: map[string]string{"class": "", "name": "probe2"}, + Error: "error2", + RecentResults: []bool{false}, + RecentLatencies: nil, // no latency for failed probes + }, + } + + if diff := cmp.Diff(wantInfo, info, cmpopts.IgnoreFields(ProbeInfo{}, "Start", "End")); diff != "" { + t.Fatalf("unexpected ProbeInfo (-want +got):\n%s", diff) + } +} + +func TestProbeInfoRecent(t *testing.T) { + type probeResult struct { + latency time.Duration + err error + } + tests := []struct { + name string + results []probeResult + wantProbeInfo ProbeInfo + wantRecentSuccessRatio float64 + wantRecentMedianLatency time.Duration + }{ + { + name: "no_runs", + wantProbeInfo: ProbeInfo{}, + wantRecentSuccessRatio: 0, + wantRecentMedianLatency: 0, + }, + { + name: "single_success", + results: []probeResult{{latency: 100 * time.Millisecond, err: nil}}, + wantProbeInfo: ProbeInfo{ + Latency: 100 * time.Millisecond, + Result: true, + RecentResults: []bool{true}, + RecentLatencies: []time.Duration{100 * time.Millisecond}, + }, + wantRecentSuccessRatio: 1, + wantRecentMedianLatency: 100 * time.Millisecond, + }, + { + name: "single_failure", + results: []probeResult{{latency: 100 * time.Millisecond, err: errors.New("error123")}}, + wantProbeInfo: ProbeInfo{ + Result: false, + RecentResults: []bool{false}, + RecentLatencies: nil, + Error: "error123", + }, + wantRecentSuccessRatio: 0, + wantRecentMedianLatency: 0, + }, + { + name: "recent_mix", + results: []probeResult{ + {latency: 10 * time.Millisecond, err: errors.New("error1")}, + {latency: 20 * time.Millisecond, err: nil}, + {latency: 30 * time.Millisecond, err: nil}, + {latency: 40 * time.Millisecond, err: errors.New("error4")}, + {latency: 50 * time.Millisecond, err: nil}, + {latency: 60 * time.Millisecond, err: nil}, + {latency: 70 * time.Millisecond, err: errors.New("error7")}, + {latency: 80 * time.Millisecond, err: nil}, + }, + wantProbeInfo: ProbeInfo{ + Result: true, + Latency: 80 * time.Millisecond, + RecentResults: []bool{false, true, true, false, true, true, false, true}, + RecentLatencies: []time.Duration{ + 20 * time.Millisecond, + 30 * time.Millisecond, + 50 * time.Millisecond, + 60 * time.Millisecond, + 80 * time.Millisecond, + }, + }, + wantRecentSuccessRatio: 0.625, + wantRecentMedianLatency: 50 * time.Millisecond, + }, + { + name: "only_last_10", + results: []probeResult{ + {latency: 10 * time.Millisecond, err: errors.New("old_error")}, + {latency: 20 * time.Millisecond, err: nil}, + {latency: 30 * time.Millisecond, err: nil}, + {latency: 40 * time.Millisecond, err: nil}, + {latency: 50 * time.Millisecond, err: nil}, + {latency: 60 * time.Millisecond, err: nil}, + {latency: 70 * time.Millisecond, err: nil}, + {latency: 80 * time.Millisecond, err: nil}, + {latency: 90 * time.Millisecond, err: nil}, + {latency: 100 * time.Millisecond, err: nil}, + {latency: 110 * time.Millisecond, err: nil}, + }, + wantProbeInfo: ProbeInfo{ + Result: true, + Latency: 110 * time.Millisecond, + RecentResults: []bool{true, true, true, true, true, true, true, true, true, true}, + RecentLatencies: []time.Duration{ + 20 * time.Millisecond, + 30 * time.Millisecond, + 40 * time.Millisecond, + 50 * time.Millisecond, + 60 * time.Millisecond, + 70 * time.Millisecond, + 80 * time.Millisecond, + 90 * time.Millisecond, + 100 * time.Millisecond, + 110 * time.Millisecond, + }, + }, + wantRecentSuccessRatio: 1, + wantRecentMedianLatency: 70 * time.Millisecond, + }, + } + + clk := newFakeTime() + p := newForTest(clk.Now, clk.NewTicker).WithOnce(true) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + probe := newProbe(p, "", probeInterval, nil, FuncProbe(func(context.Context) error { return nil })) + for _, r := range tt.results { + probe.recordStart() + clk.Advance(r.latency) + probe.recordEnd(r.err) + } + info := probe.probeInfoLocked() + if diff := cmp.Diff(tt.wantProbeInfo, info, cmpopts.IgnoreFields(ProbeInfo{}, "Start", "End", "Interval")); diff != "" { + t.Fatalf("unexpected ProbeInfo (-want +got):\n%s", diff) + } + if got := info.RecentSuccessRatio(); got != tt.wantRecentSuccessRatio { + t.Errorf("recentSuccessRatio() = %v, want %v", got, tt.wantRecentSuccessRatio) + } + if got := info.RecentMedianLatency(); got != tt.wantRecentMedianLatency { + t.Errorf("recentMedianLatency() = %v, want %v", got, tt.wantRecentMedianLatency) + } + }) + } +} + type fakeTicker struct { ch chan time.Time interval time.Duration