From 0968b2d55aaefa0a8aafbb8ccab1b9697a4aa53f Mon Sep 17 00:00:00 2001 From: Dave Anderson Date: Tue, 22 Mar 2022 13:45:11 -0700 Subject: [PATCH] prober: support adding key/value labels to probes. (#4250) prober: add labels to Probe instances. This allows especially dynamically-registered probes to have a bunch more dimensions along which they can be sliced in Prometheus. Signed-off-by: David Anderson --- prober/prober.go | 196 ++++++++++++++++++++++++++++-------------- prober/prober_test.go | 168 ++++++++++++++++++++++++------------ 2 files changed, 245 insertions(+), 119 deletions(-) diff --git a/prober/prober.go b/prober/prober.go index 4640c468b..fbe302910 100644 --- a/prober/prober.go +++ b/prober/prober.go @@ -9,13 +9,16 @@ package prober import ( "context" + "encoding/json" "errors" + "expvar" "fmt" + "io" "log" + "sort" + "strings" "sync" "time" - - "tailscale.com/metrics" ) // ProbeFunc is a function that probes something and reports whether @@ -29,33 +32,6 @@ type Prober struct { now func() time.Time newTicker func(time.Duration) ticker - // lastStart is the time, in seconds since epoch, of the last time - // each probe started a probe cycle. - lastStart metrics.LabelMap - // lastEnd is the time, in seconds since epoch, of the last time - // each probe finished a probe cycle. - lastEnd metrics.LabelMap - // lastResult records whether probes succeeded. A successful probe - // is recorded as 1, a failure as 0. - lastResult metrics.LabelMap - // lastLatency records how long the last probe cycle took for each - // probe, in milliseconds. - lastLatency metrics.LabelMap - // probeInterval records the time in seconds between successive - // runs of each probe. - // - // This is to help Prometheus figure out how long a probe should - // be failing before it fires an alert for it. 
To avoid random - // background noise, you want it to wait for more than 1 - // datapoint, but you also can't use a fixed interval because some - // probes might run every few seconds, while e.g. TLS certificate - // expiry might only run once a day. - // - // So, for each probe, the prober tells Prometheus how often it - // runs, so that the alert can autotune itself to eliminate noise - // without being excessively delayed. - probeInterval metrics.LabelMap - mu sync.Mutex // protects all following fields probes map[string]*Probe } @@ -67,32 +43,21 @@ func New() *Prober { func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Prober { return &Prober{ - now: now, - newTicker: newTicker, - lastStart: metrics.LabelMap{Label: "probe"}, - lastEnd: metrics.LabelMap{Label: "probe"}, - lastResult: metrics.LabelMap{Label: "probe"}, - lastLatency: metrics.LabelMap{Label: "probe"}, - probeInterval: metrics.LabelMap{Label: "probe"}, - probes: map[string]*Probe{}, + now: now, + newTicker: newTicker, + probes: map[string]*Probe{}, } } // Expvar returns the metrics for running probes. -func (p *Prober) Expvar() *metrics.Set { - ret := new(metrics.Set) - ret.Set("start_secs", &p.lastStart) - ret.Set("end_secs", &p.lastEnd) - ret.Set("result", &p.lastResult) - ret.Set("latency_millis", &p.lastLatency) - ret.Set("interval_secs", &p.probeInterval) - return ret +func (p *Prober) Expvar() expvar.Var { + return varExporter{p} } // Run executes fun every interval, and exports probe results under probeName. // // Registering a probe under an already-registered name panics. 
-func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) *Probe { +func (p *Prober) Run(name string, interval time.Duration, labels map[string]string, fun ProbeFunc) *Probe { p.mu.Lock() defer p.mu.Unlock() if _, ok := p.probes[name]; ok { @@ -111,9 +76,9 @@ func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) *Probe doProbe: fun, interval: interval, tick: ticker, + labels: labels, } p.probes[name] = probe - p.probeInterval.Get(name).Set(int64(interval.Seconds())) go probe.loop() return probe } @@ -123,11 +88,6 @@ func (p *Prober) unregister(probe *Probe) { defer p.mu.Unlock() name := probe.name delete(p.probes, name) - p.lastStart.Delete(name) - p.lastEnd.Delete(name) - p.lastResult.Delete(name) - p.lastLatency.Delete(name) - p.probeInterval.Delete(name) } // Reports the number of registered probes. For tests only. @@ -149,6 +109,12 @@ type Probe struct { doProbe ProbeFunc interval time.Duration tick ticker + labels map[string]string + + mu sync.Mutex + start time.Time // last time doProbe started + end time.Time // last time doProbe returned + result bool // whether the last doProbe call succeeded } // Close shuts down the Probe and unregisters it from its Prober. @@ -183,7 +149,7 @@ func (p *Probe) loop() { // the probe either succeeds or fails before the next cycle is // scheduled to start. func (p *Probe) run() { - start := p.start() + start := p.recordStart() defer func() { // Prevent a panic within one probe function from killing the // entire prober, so that a single buggy probe doesn't destroy @@ -192,7 +158,7 @@ func (p *Probe) run() { // alert for debugging. 
if r := recover(); r != nil { log.Printf("probe %s panicked: %v", p.name, r) - p.end(start, errors.New("panic")) + p.recordEnd(start, errors.New("panic")) } }() timeout := time.Duration(float64(p.interval) * 0.8) @@ -200,27 +166,131 @@ func (p *Probe) run() { defer cancel() err := p.doProbe(ctx) - p.end(start, err) + p.recordEnd(start, err) if err != nil { log.Printf("probe %s: %v", p.name, err) } } -func (p *Probe) start() time.Time { +func (p *Probe) recordStart() time.Time { st := p.prober.now() - p.prober.lastStart.Get(p.name).Set(st.Unix()) + p.mu.Lock() + defer p.mu.Unlock() + p.start = st return st } -func (p *Probe) end(start time.Time, err error) { +func (p *Probe) recordEnd(start time.Time, err error) { end := p.prober.now() - p.prober.lastEnd.Get(p.name).Set(end.Unix()) - p.prober.lastLatency.Get(p.name).Set(end.Sub(start).Milliseconds()) - v := int64(1) + p.mu.Lock() + defer p.mu.Unlock() + p.end = end + p.result = err == nil +} + +type varExporter struct { + p *Prober +} + +// probeInfo is the state of a Probe. Used in expvar-format debug +// data. +type probeInfo struct { + Labels map[string]string + Start time.Time + End time.Time + Latency string // as a string because time.Duration doesn't encode readably to JSON + Result bool +} + +// String implements expvar.Var, returning the prober's state as an +// encoded JSON map of probe name to its probeInfo. 
+func (v varExporter) String() string { + out := map[string]probeInfo{} + + v.p.mu.Lock() + probes := make([]*Probe, 0, len(v.p.probes)) + for _, probe := range v.p.probes { + probes = append(probes, probe) + } + v.p.mu.Unlock() + + for _, probe := range probes { + probe.mu.Lock() + inf := probeInfo{ + Labels: probe.labels, + Start: probe.start, + End: probe.end, + Result: probe.result, + } + if probe.end.After(probe.start) { + inf.Latency = probe.end.Sub(probe.start).String() + } + out[probe.name] = inf + probe.mu.Unlock() + } + + bs, err := json.Marshal(out) if err != nil { - v = 0 + return fmt.Sprintf(`{"error": %q}`, err) + } + return string(bs) +} + +// WritePrometheus writes the state of all probes to w. +// +// For each probe, WritePrometheus exports 5 variables: +// - <prefix>_interval_secs, how frequently the probe runs. +// - <prefix>_start_secs, when the probe last started running, in seconds since epoch. +// - <prefix>_end_secs, when the probe last finished running, in seconds since epoch. +// - <prefix>_latency_millis, how long the last probe cycle took, in +// milliseconds. This is just (end_secs-start_secs) in an easier to +// graph form. +// - <prefix>_result, 1 if the last probe succeeded, 0 if it failed. +// +// Each probe has a set of static key/value labels (defined once at +// probe creation), which are added as Prometheus metric labels to +// that probe's variables. 
+func (v varExporter) WritePrometheus(w io.Writer, prefix string) { + v.p.mu.Lock() + probes := make([]*Probe, 0, len(v.p.probes)) + for _, probe := range v.p.probes { + probes = append(probes, probe) + } + v.p.mu.Unlock() + + sort.Slice(probes, func(i, j int) bool { + return probes[i].name < probes[j].name + }) + for _, probe := range probes { + probe.mu.Lock() + keys := make([]string, 0, len(probe.labels)) + for k := range probe.labels { + keys = append(keys, k) + } + sort.Strings(keys) + var sb strings.Builder + fmt.Fprintf(&sb, "name=%q", probe.name) + for _, k := range keys { + fmt.Fprintf(&sb, ",%s=%q", k, probe.labels[k]) + } + labels := sb.String() + + fmt.Fprintf(w, "%s_interval_secs{%s} %f\n", prefix, labels, probe.interval.Seconds()) + if !probe.start.IsZero() { + fmt.Fprintf(w, "%s_start_secs{%s} %d\n", prefix, labels, probe.start.Unix()) + } + if !probe.end.IsZero() { + fmt.Fprintf(w, "%s_end_secs{%s} %d\n", prefix, labels, probe.end.Unix()) + // Start is always present if end is. + fmt.Fprintf(w, "%s_latency_millis{%s} %d\n", prefix, labels, probe.end.Sub(probe.start).Milliseconds()) + if probe.result { + fmt.Fprintf(w, "%s_result{%s} 1\n", prefix, labels) + } else { + fmt.Fprintf(w, "%s_result{%s} 0\n", prefix, labels) + } + } + probe.mu.Unlock() } - p.prober.lastResult.Get(p.name).Set(v) } // ticker wraps a time.Ticker in a way that can be faked for tests. 
diff --git a/prober/prober_test.go b/prober/prober_test.go index 6cc3ca058..930f7bef6 100644 --- a/prober/prober_test.go +++ b/prober/prober_test.go @@ -5,6 +5,7 @@ package prober import ( + "bytes" "context" "encoding/json" "errors" @@ -14,8 +15,10 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "tailscale.com/syncs" "tailscale.com/tstest" + "tailscale.com/tsweb" ) const ( @@ -24,6 +27,7 @@ const ( quarterProbeInterval = probeInterval / 4 convergenceTimeout = time.Second convergenceSleep = time.Millisecond + aFewMillis = 20 * time.Millisecond ) var epoch = time.Unix(0, 0) @@ -51,7 +55,7 @@ func TestProberTiming(t *testing.T) { } } - p.Run("test-probe", probeInterval, func(context.Context) error { + p.Run("test-probe", probeInterval, nil, func(context.Context) error { invoked <- struct{}{} return nil }) @@ -83,7 +87,7 @@ func TestProberRun(t *testing.T) { var probes []*Probe for i := 0; i < startingProbes; i++ { - probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, func(context.Context) error { + probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, nil, func(context.Context) error { mu.Lock() defer mu.Unlock() cnt++ @@ -92,6 +96,7 @@ func TestProberRun(t *testing.T) { } checkCnt := func(want int) { + t.Helper() err := tstest.WaitFor(convergenceTimeout, func() error { mu.Lock() defer mu.Unlock() @@ -126,9 +131,8 @@ func TestExpvar(t *testing.T) { clk := newFakeTime() p := newForTest(clk.Now, clk.NewTicker) - const aFewMillis = 20 * time.Millisecond var succeed syncs.AtomicBool - p.Run("probe", probeInterval, func(context.Context) error { + p.Run("probe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error { clk.Advance(aFewMillis) if succeed.Get() { return nil @@ -138,20 +142,106 @@ func TestExpvar(t *testing.T) { waitActiveProbes(t, p, 1) - waitExpInt(t, p, "start_secs/probe", 0) - waitExpInt(t, p, "end_secs/probe", 0) - waitExpInt(t, p, "interval_secs/probe", 
int(probeInterval.Seconds())) - waitExpInt(t, p, "latency_millis/probe", int(aFewMillis.Milliseconds())) - waitExpInt(t, p, "result/probe", 0) + check := func(name string, want probeInfo) { + t.Helper() + err := tstest.WaitFor(convergenceTimeout, func() error { + vars := probeExpvar(t, p) + if got, want := len(vars), 1; got != want { + return fmt.Errorf("wrong probe count in expvar, got %d want %d", got, want) + } + for k, v := range vars { + if k != name { + return fmt.Errorf("wrong probe name in expvar, got %q want %q", k, name) + } + if diff := cmp.Diff(v, &want); diff != "" { + return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) + } + } + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + check("probe", probeInfo{ + Labels: map[string]string{"label": "value"}, + Start: epoch, + End: epoch.Add(aFewMillis), + Latency: aFewMillis.String(), + Result: false, + }) succeed.Set(true) clk.Advance(probeInterval + halfProbeInterval) - waitExpInt(t, p, "start_secs/probe", int((probeInterval + halfProbeInterval).Seconds())) - waitExpInt(t, p, "end_secs/probe", int((probeInterval + halfProbeInterval).Seconds())) - waitExpInt(t, p, "interval_secs/probe", int(probeInterval.Seconds())) - waitExpInt(t, p, "latency_millis/probe", int(aFewMillis.Milliseconds())) - waitExpInt(t, p, "result/probe", 1) + st := epoch.Add(probeInterval + halfProbeInterval + aFewMillis) + check("probe", probeInfo{ + Labels: map[string]string{"label": "value"}, + Start: st, + End: st.Add(aFewMillis), + Latency: aFewMillis.String(), + Result: true, + }) +} + +func TestPrometheus(t *testing.T) { + clk := newFakeTime() + p := newForTest(clk.Now, clk.NewTicker) + + var succeed syncs.AtomicBool + p.Run("testprobe", probeInterval, map[string]string{"label": "value"}, func(context.Context) error { + clk.Advance(aFewMillis) + if succeed.Get() { + return nil + } + return errors.New("failing, as instructed by test") + }) + + waitActiveProbes(t, p, 1) + + err := 
tstest.WaitFor(convergenceTimeout, func() error { + var b bytes.Buffer + p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe") + want := strings.TrimSpace(fmt.Sprintf(` +probe_interval_secs{name="testprobe",label="value"} %f +probe_start_secs{name="testprobe",label="value"} %d +probe_end_secs{name="testprobe",label="value"} %d +probe_latency_millis{name="testprobe",label="value"} %d +probe_result{name="testprobe",label="value"} 0 +`, probeInterval.Seconds(), epoch.Unix(), epoch.Add(aFewMillis).Unix(), aFewMillis.Milliseconds())) + if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" { + return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + succeed.Set(true) + clk.Advance(probeInterval + halfProbeInterval) + + err = tstest.WaitFor(convergenceTimeout, func() error { + var b bytes.Buffer + p.Expvar().(tsweb.PrometheusVar).WritePrometheus(&b, "probe") + start := epoch.Add(probeInterval + halfProbeInterval) + end := start.Add(aFewMillis) + want := strings.TrimSpace(fmt.Sprintf(` +probe_interval_secs{name="testprobe",label="value"} %f +probe_start_secs{name="testprobe",label="value"} %d +probe_end_secs{name="testprobe",label="value"} %d +probe_latency_millis{name="testprobe",label="value"} %d +probe_result{name="testprobe",label="value"} 1 +`, probeInterval.Seconds(), start.Unix(), end.Unix(), aFewMillis.Milliseconds())) + if diff := cmp.Diff(strings.TrimSpace(b.String()), want); diff != "" { + return fmt.Errorf("wrong probe stats (-got+want):\n%s", diff) + } + return nil + }) + if err != nil { + t.Fatal(err) + } } type fakeTicker struct { @@ -185,7 +275,9 @@ func (t *fakeTicker) fire(now time.Time) { case t.ch <- now: default: } - t.next = now.Add(t.interval) + for now.After(t.next) { + t.next = t.next.Add(t.interval) + } } type fakeTime struct { @@ -200,7 +292,6 @@ func newFakeTime() *fakeTime { curTime: epoch, } ret.Cond = &sync.Cond{L: &ret.Mutex} - 
ret.Advance(time.Duration(1)) // so that Now never IsZero return ret } @@ -208,8 +299,6 @@ func (t *fakeTime) Now() time.Time { t.Lock() defer t.Unlock() ret := t.curTime - // so that time always seems to advance for the program under test - t.curTime = t.curTime.Add(time.Microsecond) return ret } @@ -237,47 +326,14 @@ func (t *fakeTime) Advance(d time.Duration) { } } -func waitExpInt(t *testing.T, p *Prober, path string, want int) { - t.Helper() - err := tstest.WaitFor(convergenceTimeout, func() error { - got, ok := getExpInt(t, p, path) - if !ok { - return fmt.Errorf("expvar %q did not get set", path) - } - if got != want { - return fmt.Errorf("expvar %q is %d, want %d", path, got, want) - } - return nil - }) - if err != nil { - t.Fatal(err) - } -} - -func getExpInt(t *testing.T, p *Prober, path string) (ret int, ok bool) { +func probeExpvar(t *testing.T, p *Prober) map[string]*probeInfo { t.Helper() s := p.Expvar().String() - dec := map[string]interface{}{} - if err := json.Unmarshal([]byte(s), &dec); err != nil { - t.Fatalf("couldn't unmarshal expvar data: %v", err) + ret := map[string]*probeInfo{} + if err := json.Unmarshal([]byte(s), &ret); err != nil { + t.Fatalf("expvar json decode failed: %v", err) } - var v interface{} = dec - for _, d := range strings.Split(path, "/") { - m, ok := v.(map[string]interface{}) - if !ok { - t.Fatalf("expvar path %q ended early with a leaf value", path) - } - child, ok := m[d] - if !ok { - return 0, false - } - v = child - } - f, ok := v.(float64) - if !ok { - return 0, false - } - return int(f), true + return ret } func waitActiveProbes(t *testing.T, p *Prober, want int) {