prober: migrate to Prometheus metric library

This provides an example of using native Prometheus metrics with tsweb.

Prober library seems to be the only user of PrometheusVar, so I am
removing support for it in tsweb.

Updates https://github.com/tailscale/corp/issues/10205

Signed-off-by: Anton Tolchanov <anton@tailscale.com>
pull/7846/head
Anton Tolchanov 2 years ago committed by Anton Tolchanov
parent 11e6247d2a
commit c153e6ae2f

@ -5,7 +5,6 @@
package main package main
import ( import (
"expvar"
"flag" "flag"
"fmt" "fmt"
"html" "html"
@ -30,7 +29,7 @@ var (
func main() { func main() {
flag.Parse() flag.Parse()
p := prober.New().WithSpread(*spread).WithOnce(*probeOnce) p := prober.New().WithSpread(*spread).WithOnce(*probeOnce).WithMetricNamespace("derpprobe")
dp, err := prober.DERP(p, *derpMapURL, *interval, *interval, *interval) dp, err := prober.DERP(p, *derpMapURL, *interval, *interval, *interval)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
@ -53,7 +52,6 @@ func main() {
mux := http.NewServeMux() mux := http.NewServeMux()
tsweb.Debugger(mux) tsweb.Debugger(mux)
expvar.Publish("derpprobe", p.Expvar())
mux.HandleFunc("/", http.HandlerFunc(serveFunc(p))) mux.HandleFunc("/", http.HandlerFunc(serveFunc(p)))
log.Fatal(http.ListenAndServe(*listen, mux)) log.Fatal(http.ListenAndServe(*listen, mux))
} }

@ -8,18 +8,15 @@ package prober
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"expvar"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"io"
"log" "log"
"math/rand" "math/rand"
"sort"
"strings"
"sync" "sync"
"time" "time"
"github.com/prometheus/client_golang/prometheus"
) )
// ProbeFunc is a function that probes something and reports whether // ProbeFunc is a function that probes something and reports whether
@ -42,6 +39,9 @@ type Prober struct {
mu sync.Mutex // protects all following fields mu sync.Mutex // protects all following fields
probes map[string]*Probe probes map[string]*Probe
namespace string
metrics *prometheus.Registry
} }
// New returns a new Prober. // New returns a new Prober.
@ -50,21 +50,15 @@ func New() *Prober {
} }
func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober { func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober {
return &Prober{ p := &Prober{
now: now, now: now,
newTicker: newTicker, newTicker: newTicker,
probes: map[string]*Probe{}, probes: map[string]*Probe{},
metrics: prometheus.NewRegistry(),
namespace: "prober",
} }
} prometheus.DefaultRegisterer.MustRegister(p.metrics)
return p
// Expvar returns the metrics for running probes.
func (p *Prober) Expvar() expvar.Var {
return varExporter{p}
}
// ProbeInfo returns information about most recent probe runs.
func (p *Prober) ProbeInfo() map[string]ProbeInfo {
return varExporter{p}.probeInfo()
} }
// Run executes fun every interval, and exports probe results under probeName. // Run executes fun every interval, and exports probe results under probeName.
@ -77,6 +71,11 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri
panic(fmt.Sprintf("probe named %q already registered", name)) panic(fmt.Sprintf("probe named %q already registered", name))
} }
l := prometheus.Labels{"name": name}
for k, v := range labels {
l[k] = v
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
probe := &Probe{ probe := &Probe{
prober: p, prober: p,
@ -88,8 +87,17 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri
doProbe: fun, doProbe: fun,
interval: interval, interval: interval,
initialDelay: initialDelay(name, interval), initialDelay: initialDelay(name, interval),
labels: labels, metrics: prometheus.NewRegistry(),
mInterval: prometheus.NewDesc("interval_secs", "Probe interval in seconds", nil, l),
mStartTime: prometheus.NewDesc("start_secs", "Latest probe start time (seconds since epoch)", nil, l),
mEndTime: prometheus.NewDesc("end_secs", "Latest probe end time (seconds since epoch)", nil, l),
mLatency: prometheus.NewDesc("latency_millis", "Latest probe latency (ms)", nil, l),
mResult: prometheus.NewDesc("result", "Latest probe result (1 = success, 0 = failure)", nil, l),
} }
prometheus.WrapRegistererWithPrefix(p.namespace+"_", p.metrics).MustRegister(probe.metrics)
probe.metrics.MustRegister(probe)
p.probes[name] = probe p.probes[name] = probe
go probe.loop() go probe.loop()
return probe return probe
@ -98,6 +106,8 @@ func (p *Prober) Run(name string, interval time.Duration, labels map[string]stri
func (p *Prober) unregister(probe *Probe) { func (p *Prober) unregister(probe *Probe) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
probe.metrics.Unregister(probe)
p.metrics.Unregister(probe.metrics)
name := probe.name name := probe.name
delete(p.probes, name) delete(p.probes, name)
} }
@ -116,6 +126,12 @@ func (p *Prober) WithOnce(s bool) *Prober {
return p return p
} }
// WithMetricNamespace allows changing metric name prefix from the default `prober`.
func (p *Prober) WithMetricNamespace(n string) *Prober {
p.namespace = n
return p
}
// Wait blocks until all probes have finished execution. It should typically // Wait blocks until all probes have finished execution. It should typically
// be used with the `once` mode to wait for probes to finish before collecting // be used with the `once` mode to wait for probes to finish before collecting
// their results. // their results.
@ -159,7 +175,16 @@ type Probe struct {
interval time.Duration interval time.Duration
initialDelay time.Duration initialDelay time.Duration
tick ticker tick ticker
labels map[string]string
// metrics is a Prometheus metrics registry for metrics exported by this probe.
// Using a separate registry allows cleanly removing metrics exported by this
// probe when it gets unregistered.
metrics *prometheus.Registry
mInterval *prometheus.Desc
mStartTime *prometheus.Desc
mEndTime *prometheus.Desc
mLatency *prometheus.Desc
mResult *prometheus.Desc
mu sync.Mutex mu sync.Mutex
start time.Time // last time doProbe started start time.Time // last time doProbe started
@ -264,35 +289,28 @@ func (p *Probe) recordEnd(start time.Time, err error) {
} }
} }
type varExporter struct { // ProbeInfo is the state of a Probe.
p *Prober
}
// ProbeInfo is the state of a Probe. Used in expvar-format debug
// data.
type ProbeInfo struct { type ProbeInfo struct {
Labels map[string]string
Start time.Time Start time.Time
End time.Time End time.Time
Latency string // as a string because time.Duration doesn't encode readably to JSON Latency string
Result bool Result bool
Error string Error string
} }
func (v varExporter) probeInfo() map[string]ProbeInfo { func (p *Prober) ProbeInfo() map[string]ProbeInfo {
out := map[string]ProbeInfo{} out := map[string]ProbeInfo{}
v.p.mu.Lock() p.mu.Lock()
probes := make([]*Probe, 0, len(v.p.probes)) probes := make([]*Probe, 0, len(p.probes))
for _, probe := range v.p.probes { for _, probe := range p.probes {
probes = append(probes, probe) probes = append(probes, probe)
} }
v.p.mu.Unlock() p.mu.Unlock()
for _, probe := range probes { for _, probe := range probes {
probe.mu.Lock() probe.mu.Lock()
inf := ProbeInfo{ inf := ProbeInfo{
Labels: probe.labels,
Start: probe.start, Start: probe.start,
End: probe.end, End: probe.end,
Result: probe.succeeded, Result: probe.succeeded,
@ -309,71 +327,34 @@ func (v varExporter) probeInfo() map[string]ProbeInfo {
return out return out
} }
// String implements expvar.Var, returning the prober's state as an // Describe implements prometheus.Collector.
// encoded JSON map of probe name to its ProbeInfo. func (p *Probe) Describe(ch chan<- *prometheus.Desc) {
func (v varExporter) String() string { ch <- p.mInterval
bs, err := json.Marshal(v.probeInfo()) ch <- p.mStartTime
if err != nil { ch <- p.mEndTime
return fmt.Sprintf(`{"error": %q}`, err) ch <- p.mResult
} ch <- p.mLatency
return string(bs)
} }
// WritePrometheus writes the state of all probes to w. // Collect implements prometheus.Collector.
// func (p *Probe) Collect(ch chan<- prometheus.Metric) {
// For each probe, WritePrometheus exports 5 variables: p.mu.Lock()
// - <prefix>_interval_secs, how frequently the probe runs. defer p.mu.Unlock()
// - <prefix>_start_secs, when the probe last started running, in seconds since epoch. ch <- prometheus.MustNewConstMetric(p.mInterval, prometheus.GaugeValue, p.interval.Seconds())
// - <prefix>_end_secs, when the probe last finished running, in seconds since epoch. if !p.start.IsZero() {
// - <prefix>_latency_millis, how long the last probe cycle took, in ch <- prometheus.MustNewConstMetric(p.mStartTime, prometheus.GaugeValue, float64(p.start.Unix()))
// milliseconds. This is just (end_secs-start_secs) in an easier to
// graph form.
// - <prefix>_result, 1 if the last probe succeeded, 0 if it failed.
//
// Each probe has a set of static key/value labels (defined once at
// probe creation), which are added as Prometheus metric labels to
// that probe's variables.
func (v varExporter) WritePrometheus(w io.Writer, prefix string) {
v.p.mu.Lock()
probes := make([]*Probe, 0, len(v.p.probes))
for _, probe := range v.p.probes {
probes = append(probes, probe)
}
v.p.mu.Unlock()
sort.Slice(probes, func(i, j int) bool {
return probes[i].name < probes[j].name
})
for _, probe := range probes {
probe.mu.Lock()
keys := make([]string, 0, len(probe.labels))
for k := range probe.labels {
keys = append(keys, k)
}
sort.Strings(keys)
var sb strings.Builder
fmt.Fprintf(&sb, "name=%q", probe.name)
for _, k := range keys {
fmt.Fprintf(&sb, ",%s=%q", k, probe.labels[k])
}
labels := sb.String()
fmt.Fprintf(w, "%s_interval_secs{%s} %f\n", prefix, labels, probe.interval.Seconds())
if !probe.start.IsZero() {
fmt.Fprintf(w, "%s_start_secs{%s} %d\n", prefix, labels, probe.start.Unix())
} }
if !probe.end.IsZero() { if p.end.IsZero() {
fmt.Fprintf(w, "%s_end_secs{%s} %d\n", prefix, labels, probe.end.Unix()) return
if probe.latency > 0 {
fmt.Fprintf(w, "%s_latency_millis{%s} %d\n", prefix, labels, probe.latency.Milliseconds())
} }
if probe.succeeded { ch <- prometheus.MustNewConstMetric(p.mEndTime, prometheus.GaugeValue, float64(p.end.Unix()))
fmt.Fprintf(w, "%s_result{%s} 1\n", prefix, labels) if p.succeeded {
ch <- prometheus.MustNewConstMetric(p.mResult, prometheus.GaugeValue, 1)
} else { } else {
fmt.Fprintf(w, "%s_result{%s} 0\n", prefix, labels) ch <- prometheus.MustNewConstMetric(p.mResult, prometheus.GaugeValue, 0)
} }
} if p.latency > 0 {
probe.mu.Unlock() ch <- prometheus.MustNewConstMetric(p.mLatency, prometheus.GaugeValue, float64(p.latency.Milliseconds()))
} }
} }

@ -4,9 +4,7 @@
package prober package prober
import ( import (
"bytes"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
@ -15,9 +13,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/prometheus/client_golang/prometheus/testutil"
"tailscale.com/tstest" "tailscale.com/tstest"
"tailscale.com/tsweb"
) )
const ( const (
@ -187,6 +184,9 @@ func TestProberRun(t *testing.T) {
checkCnt(startingProbes) checkCnt(startingProbes)
clk.Advance(probeInterval + halfProbeInterval) clk.Advance(probeInterval + halfProbeInterval)
checkCnt(startingProbes) checkCnt(startingProbes)
if c, err := testutil.GatherAndCount(p.metrics, "prober_result"); c != startingProbes || err != nil {
t.Fatalf("expected %d prober_result metrics; got %d (error %s)", startingProbes, c, err)
}
keep := startingProbes / 2 keep := startingProbes / 2
@ -197,69 +197,14 @@ func TestProberRun(t *testing.T) {
clk.Advance(probeInterval) clk.Advance(probeInterval)
checkCnt(keep) checkCnt(keep)
} if c, err := testutil.GatherAndCount(p.metrics, "prober_result"); c != keep || err != nil {
t.Fatalf("expected %d prober_result metrics; got %d (error %s)", keep, c, err)
func TestExpvar(t *testing.T) {
clk := newFakeTime()
p := newForTest(clk.Now, clk.NewTicker)
var succeed atomic.Bool
p.Run("probe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error {
clk.Advance(aFewMillis)
if succeed.Load() {
return nil
} }
return errors.New("failing, as instructed by test")
})
waitActiveProbes(t, p, clk, 1)
check := func(name string, want ProbeInfo) {
t.Helper()
err := tstest.WaitFor(convergenceTimeout, func() error {
vars := probeExpvar(t, p)
if got, want := len(vars), 1; got != want {
return fmt.Errorf("wrong probe count in expvar, got %d want %d", got, want)
}
for k, v := range vars {
if k != name {
return fmt.Errorf("wrong probe name in expvar, got %q want %q", k, name)
}
if diff := cmp.Diff(v, &want); diff != "" {
return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff)
}
}
return nil
})
if err != nil {
t.Fatal(err)
}
}
check("probe", ProbeInfo{
Labels: map[string]string{"label": "value"},
Start: epoch,
End: epoch.Add(aFewMillis),
Result: false,
Error: "failing, as instructed by test",
})
succeed.Store(true)
clk.Advance(probeInterval + halfProbeInterval)
st := epoch.Add(probeInterval + halfProbeInterval + aFewMillis)
check("probe", ProbeInfo{
Labels: map[string]string{"label": "value"},
Start: st,
End: st.Add(aFewMillis),
Latency: aFewMillis.String(),
Result: true,
})
} }
func TestPrometheus(t *testing.T) { func TestPrometheus(t *testing.T) {
clk := newFakeTime() clk := newFakeTime()
p := newForTest(clk.Now, clk.NewTicker) p := newForTest(clk.Now, clk.NewTicker).WithMetricNamespace("probe")
var succeed atomic.Bool var succeed atomic.Bool
p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error { p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error {
@ -273,18 +218,22 @@ func TestPrometheus(t *testing.T) {
waitActiveProbes(t, p, clk, 1) waitActiveProbes(t, p, clk, 1)
err := tstest.WaitFor(convergenceTimeout, func() error { err := tstest.WaitFor(convergenceTimeout, func() error {
var b bytes.Buffer want := fmt.Sprintf(`
p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe") # HELP probe_interval_secs Probe interval in seconds
want := strings.TrimSpace(fmt.Sprintf(` # TYPE probe_interval_secs gauge
probe_interval_secs{name="testprobe",label="value"} %f probe_interval_secs{label="value",name="testprobe"} %f
probe_start_secs{name="testprobe",label="value"} %d # HELP probe_start_secs Latest probe start time (seconds since epoch)
probe_end_secs{name="testprobe",label="value"} %d # TYPE probe_start_secs gauge
probe_result{name="testprobe",label="value"} 0 probe_start_secs{label="value",name="testprobe"} %d
`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix())) # HELP probe_end_secs Latest probe end time (seconds since epoch)
if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" { # TYPE probe_end_secs gauge
return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) probe_end_secs{label="value",name="testprobe"} %d
} # HELP probe_result Latest probe result (1 = success, 0 = failure)
return nil # TYPE probe_result gauge
probe_result{label="value",name="testprobe"} 0
`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix())
return testutil.GatherAndCompare(p.metrics, strings.NewReader(want),
"probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_result")
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -294,21 +243,27 @@ probe_result{name="testprobe",label="value"} 0
clk.Advance(probeInterval + halfProbeInterval) clk.Advance(probeInterval + halfProbeInterval)
err = tstest.WaitFor(convergenceTimeout, func() error { err = tstest.WaitFor(convergenceTimeout, func() error {
var b bytes.Buffer
p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe")
start := epoch.Add(probeInterval + halfProbeInterval) start := epoch.Add(probeInterval + halfProbeInterval)
end := start.Add(aFewMillis) end := start.Add(aFewMillis)
want := strings.TrimSpace(fmt.Sprintf(` want := fmt.Sprintf(`
probe_interval_secs{name="testprobe",label="value"} %f # HELP probe_interval_secs Probe interval in seconds
probe_start_secs{name="testprobe",label="value"} %d # TYPE probe_interval_secs gauge
probe_end_secs{name="testprobe",label="value"} %d probe_interval_secs{label="value",name="testprobe"} %f
probe_latency_millis{name="testprobe",label="value"} %d # HELP probe_start_secs Latest probe start time (seconds since epoch)
probe_result{name="testprobe",label="value"} 1 # TYPE probe_start_secs gauge
`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds())) probe_start_secs{label="value",name="testprobe"} %d
if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" { # HELP probe_end_secs Latest probe end time (seconds since epoch)
return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) # TYPE probe_end_secs gauge
} probe_end_secs{label="value",name="testprobe"} %d
return nil # HELP probe_latency_millis Latest probe latency (ms)
# TYPE probe_latency_millis gauge
probe_latency_millis{label="value",name="testprobe"} %d
# HELP probe_result Latest probe result (1 = success, 0 = failure)
# TYPE probe_result gauge
probe_result{label="value",name="testprobe"} 1
`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds())
return testutil.GatherAndCompare(p.metrics, strings.NewReader(want),
"probe_interval_secs", "probe_start_secs", "probe_end_secs", "probe_latency_millis", "probe_result")
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -329,13 +284,10 @@ func TestOnceMode(t *testing.T) {
}) })
p.Wait() p.Wait()
info := p.ProbeInfo() wantCount := 4
if len(info) != 4 { for _, metric := range []string{"prober_result", "prober_end_secs"} {
t.Errorf("expected 4 probe results, got %+v", info) if c, err := testutil.GatherAndCount(p.metrics, metric); c != wantCount || err != nil {
} t.Fatalf("expected %d %s metrics; got %d (error %s)", wantCount, metric, c, err)
for _, p := range info {
if p.End.IsZero() {
t.Errorf("expected all probes to finish; got %+v", p)
} }
} }
} }
@ -433,16 +385,6 @@ func (t *fakeTime) activeTickers() (count int) {
return return
} }
func probeExpvar(t *testing.T, p *Prober) map[string]*ProbeInfo {
t.Helper()
s := p.Expvar().String()
ret := map[string]*ProbeInfo{}
if err := json.Unmarshal([]byte(s), &ret); err != nil {
t.Fatalf("expvar json decode failed: %v", err)
}
return ret
}
func waitActiveProbes(t *testing.T, p *Prober, clk *fakeTime, want int) { func waitActiveProbes(t *testing.T, p *Prober, clk *fakeTime, want int) {
t.Helper() t.Helper()
err := tstest.WaitFor(convergenceTimeout, func() error { err := tstest.WaitFor(convergenceTimeout, func() error {

@ -448,15 +448,6 @@ func Error(code int, msg string, err error) HTTPError {
return HTTPError{Code: code, Msg: msg, Err: err} return HTTPError{Code: code, Msg: msg, Err: err}
} }
// PrometheusVar is a value that knows how to format itself into
// Prometheus metric syntax.
type PrometheusVar interface {
// WritePrometheus writes the value of the var to w, in Prometheus
// metric syntax. All variables names written out must start with
// prefix (or write out a single variable named exactly prefix)
WritePrometheus(w io.Writer, prefix string)
}
// WritePrometheusExpvar writes kv to w in Prometheus metrics format. // WritePrometheusExpvar writes kv to w in Prometheus metrics format.
// //
// See VarzHandler for conventions. This is exported primarily for // See VarzHandler for conventions. This is exported primarily for
@ -510,9 +501,6 @@ func writePromExpVar(w io.Writer, prefix string, kv expvar.KeyValue) {
name, typ, label := prometheusMetric(prefix, key) name, typ, label := prometheusMetric(prefix, key)
switch v := kv.Value.(type) { switch v := kv.Value.(type) {
case PrometheusVar:
v.WritePrometheus(w, name)
return
case *expvar.Int: case *expvar.Int:
if typ == "" { if typ == "" {
typ = "counter" typ = "counter"

@ -9,7 +9,6 @@ import (
"errors" "errors"
"expvar" "expvar"
"fmt" "fmt"
"io"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -570,12 +569,6 @@ foo_totalY 4
expvar.Func(func() any { return 123 }), expvar.Func(func() any { return 123 }),
"num_goroutines 123\n", "num_goroutines 123\n",
}, },
{
"var_that_exports_itself",
"custom_var",
promWriter{},
"custom_var_value 42\n",
},
{ {
"string_version_var", "string_version_var",
"foo_version", "foo_version",
@ -694,16 +687,6 @@ func (a expvarAdapter2) PrometheusMetricsReflectRoot() any {
return a.st return a.st
} }
type promWriter struct{}
func (promWriter) WritePrometheus(w io.Writer, prefix string) {
fmt.Fprintf(w, "%s_value 42\n", prefix)
}
func (promWriter) String() string {
return ""
}
func TestAcceptsEncoding(t *testing.T) { func TestAcceptsEncoding(t *testing.T) {
tests := []struct { tests := []struct {
in, enc string in, enc string

Loading…
Cancel
Save