diff --git a/prober/prober.go b/prober/prober.go
index 268a79dc0..4640c468b 100644
--- a/prober/prober.go
+++ b/prober/prober.go
@@ -18,9 +18,9 @@ import (
 	"tailscale.com/metrics"
 )
 
-// ProbeFunc is a function that probes something and reports whether the
-// probe succeeded. The provided context must be used to ensure timely
-// cancellation and timeout behavior.
+// ProbeFunc is a function that probes something and reports whether
+// the probe succeeded. The provided context's deadline must be obeyed
+// for correct probe scheduling.
 type ProbeFunc func(context.Context) error
 
 // a Prober manages a set of probes and keeps track of their results.
@@ -56,8 +56,8 @@ type Prober struct {
 	// without being excessively delayed.
 	probeInterval metrics.LabelMap
 
-	mu            sync.Mutex // protects all following fields
-	activeProbeCh map[string]chan struct{}
+	mu     sync.Mutex // protects all following fields
+	probes map[string]*Probe
 }
 
 // New returns a new Prober.
@@ -74,7 +74,7 @@ func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Pro
 		lastResult:    metrics.LabelMap{Label: "probe"},
 		lastLatency:   metrics.LabelMap{Label: "probe"},
 		probeInterval: metrics.LabelMap{Label: "probe"},
-		activeProbeCh: map[string]chan struct{}{},
+		probes:        map[string]*Probe{},
 	}
 }
 
@@ -91,60 +91,112 @@ func (p *Prober) Expvar() *metrics.Set {
 
 // Run executes fun every interval, and exports probe results under probeName.
 //
-// fun is given a context.Context that, if obeyed, ensures that fun
-// ends within interval. If fun disregards the context, it will not be
-// run again until it does finish, and metrics will reflect that the
-// probe function is stuck.
-//
-// Run returns a context.CancelFunc that stops the probe when
-// invoked. Probe shutdown and removal happens-before the CancelFunc
-// returns.
-//
+// Run returns a Probe that keeps running until its Close method is
+// called.
+//
 // Registering a probe under an already-registered name panics.
-func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) context.CancelFunc {
+func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) *Probe {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	ticker := p.registerLocked(name, interval)
+	if _, ok := p.probes[name]; ok {
+		panic(fmt.Sprintf("probe named %q already registered", name))
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	go p.probeLoop(ctx, name, interval, ticker, fun)
-
-	return func() {
-		p.mu.Lock()
-		stopped := p.activeProbeCh[name]
-		p.mu.Unlock()
-		cancel()
-		<-stopped
+	ticker := p.newTicker(interval)
+	probe := &Probe{
+		prober:  p,
+		ctx:     ctx,
+		cancel:  cancel,
+		stopped: make(chan struct{}),
+
+		name:     name,
+		doProbe:  fun,
+		interval: interval,
+		tick:     ticker,
 	}
+	p.probes[name] = probe
+	p.probeInterval.Get(name).Set(int64(interval.Seconds()))
+	go probe.loop()
+	return probe
+}
+
+// unregister removes probe from p's registry and deletes its metrics.
+// It does nothing if probe is not the probe currently registered
+// under its name, so that a repeated Close cannot tear down a newer
+// probe that reuses the name.
+func (p *Prober) unregister(probe *Probe) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	name := probe.name
+	if p.probes[name] != probe {
+		return
+	}
+	delete(p.probes, name)
+	p.lastStart.Delete(name)
+	p.lastEnd.Delete(name)
+	p.lastResult.Delete(name)
+	p.lastLatency.Delete(name)
+	p.probeInterval.Delete(name)
+}
+
+// Reports the number of registered probes. For tests only.
+func (p *Prober) activeProbes() int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return len(p.probes)
+}
+
+// Probe is a probe that healthchecks something and updates Prometheus
+// metrics with the results.
+type Probe struct {
+	prober  *Prober
+	ctx     context.Context
+	cancel  context.CancelFunc // run to initiate shutdown
+	stopped chan struct{}      // closed when shutdown is complete
+
+	name     string
+	doProbe  ProbeFunc
+	interval time.Duration
+	tick     ticker
+}
+
+// Close shuts down the Probe and unregisters it from its Prober.
+// It is safe to Run a new probe of the same name after Close returns.
+func (p *Probe) Close() error {
+	p.cancel()
+	<-p.stopped
+	p.prober.unregister(p)
+	return nil
 }
 
-// probeLoop invokes runProbe on fun every interval. The first probe
-// is run after interval.
-func (p *Prober) probeLoop(ctx context.Context, name string, interval time.Duration, tick ticker, fun ProbeFunc) {
-	defer func() {
-		p.unregister(name)
-		tick.Stop()
-	}()
+// loop invokes run every interval until the probe's context is
+// canceled. The first probe is run immediately.
+func (p *Probe) loop() {
+	// Deferred calls run last-in-first-out: stop the ticker first, then
+	// signal Close (via stopped) that shutdown is complete.
+	defer close(p.stopped)
+	defer p.tick.Stop()
 
 	// Do a first probe right away, so that the prober immediately exports results for everything.
-	p.runProbe(ctx, name, interval, fun)
+	p.run()
 	for {
 		select {
-		case <-tick.Chan():
-			p.runProbe(ctx, name, interval, fun)
-		case <-ctx.Done():
+		case <-p.tick.Chan():
+			p.run()
+		case <-p.ctx.Done():
 			return
 		}
 	}
 }
 
-// runProbe invokes fun and records the results.
+// run invokes fun and records the results.
 //
 // fun is invoked with a timeout slightly less than interval, so that
 // the probe either succeeds or fails before the next cycle is
 // scheduled to start.
-func (p *Prober) runProbe(ctx context.Context, name string, interval time.Duration, fun ProbeFunc) {
-	start := p.start(name)
+func (p *Probe) run() {
+	start := p.start()
 	defer func() {
 		// Prevent a panic within one probe function from killing the
 		// entire prober, so that a single buggy probe doesn't destroy
@@ -152,70 +204,36 @@ func (p *Prober) runProbe(ctx context.Context, name string, interval time.Durati
 		// as a probe failure, so panicking probes will trigger an
 		// alert for debugging.
 		if r := recover(); r != nil {
-			log.Printf("probe %s panicked: %v", name, r)
-			p.end(name, start, errors.New("panic"))
+			log.Printf("probe %s panicked: %v", p.name, r)
+			p.end(start, errors.New("panic"))
 		}
 	}()
-	timeout := time.Duration(float64(interval) * 0.8)
-	ctx, cancel := context.WithTimeout(ctx, timeout)
+	timeout := time.Duration(float64(p.interval) * 0.8)
+	ctx, cancel := context.WithTimeout(p.ctx, timeout)
 	defer cancel()
 
-	err := fun(ctx)
-	p.end(name, start, err)
+	err := p.doProbe(ctx)
+	p.end(start, err)
 	if err != nil {
-		log.Printf("probe %s: %v", name, err)
-	}
-}
-
-func (p *Prober) registerLocked(name string, interval time.Duration) ticker {
-	if _, ok := p.activeProbeCh[name]; ok {
-		panic(fmt.Sprintf("probe named %q already registered", name))
+		log.Printf("probe %s: %v", p.name, err)
 	}
-
-	stoppedCh := make(chan struct{})
-	p.activeProbeCh[name] = stoppedCh
-	p.probeInterval.Get(name).Set(int64(interval.Seconds()))
-	// Create and return a ticker from here, while Prober is
-	// locked. This ensures that our fake time in tests always sees
-	// the new fake ticker being created before seeing that a new
-	// probe is registered.
-	return p.newTicker(interval)
-}
-
-func (p *Prober) unregister(name string) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	close(p.activeProbeCh[name])
-	delete(p.activeProbeCh, name)
-	p.lastStart.Delete(name)
-	p.lastEnd.Delete(name)
-	p.lastResult.Delete(name)
-	p.lastLatency.Delete(name)
-	p.probeInterval.Delete(name)
 }
 
-func (p *Prober) start(name string) time.Time {
-	st := p.now()
-	p.lastStart.Get(name).Set(st.Unix())
+func (p *Probe) start() time.Time {
+	st := p.prober.now()
+	p.prober.lastStart.Get(p.name).Set(st.Unix())
 	return st
 }
 
-func (p *Prober) end(name string, start time.Time, err error) {
-	end := p.now()
-	p.lastEnd.Get(name).Set(end.Unix())
-	p.lastLatency.Get(name).Set(end.Sub(start).Milliseconds())
+func (p *Probe) end(start time.Time, err error) {
+	end := p.prober.now()
+	p.prober.lastEnd.Get(p.name).Set(end.Unix())
+	p.prober.lastLatency.Get(p.name).Set(end.Sub(start).Milliseconds())
 	v := int64(1)
 	if err != nil {
 		v = 0
 	}
-	p.lastResult.Get(name).Set(v)
-}
-
-// Reports the number of registered probes. For tests only.
-func (p *Prober) activeProbes() int {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	return len(p.activeProbeCh)
+	p.prober.lastResult.Get(p.name).Set(v)
 }
 
 // ticker wraps a time.Ticker in a way that can be faked for tests.
diff --git a/prober/prober_test.go b/prober/prober_test.go
index 8813bc95f..6cc3ca058 100644
--- a/prober/prober_test.go
+++ b/prober/prober_test.go
@@ -80,10 +80,10 @@ func TestProberRun(t *testing.T) {
 	)
 
 	const startingProbes = 100
-	cancels := []context.CancelFunc{}
+	var probes []*Probe
 
 	for i := 0; i < startingProbes; i++ {
-		cancels = append(cancels, p.Run(fmt.Sprintf("probe%d", i), probeInterval, func(context.Context) error {
+		probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, func(context.Context) error {
 			mu.Lock()
 			defer mu.Unlock()
 			cnt++
@@ -114,7 +114,9 @@ func TestProberRun(t *testing.T) {
 
 	keep := startingProbes / 2
 	for i := keep; i < startingProbes; i++ {
-		cancels[i]()
+		if err := probes[i].Close(); err != nil {
+			t.Fatalf("closing probe%d: %v", i, err)
+		}
 	}
 	waitActiveProbes(t, p, keep)
 