prober: refactor probe state into a Probe struct.

Signed-off-by: David Anderson <danderson@tailscale.com>
pull/4251/head
David Anderson 2 years ago committed by Dave Anderson
parent 94aaec5c66
commit a09c30aac2

@ -18,9 +18,9 @@ import (
"tailscale.com/metrics" "tailscale.com/metrics"
) )
// ProbeFunc is a function that probes something and reports whether the // ProbeFunc is a function that probes something and reports whether
// probe succeeded. The provided context must be used to ensure timely // the probe succeeded. The provided context's deadline must be obeyed
// cancellation and timeout behavior. // for correct probe scheduling.
type ProbeFunc func(context.Context) error type ProbeFunc func(context.Context) error
// a Prober manages a set of probes and keeps track of their results. // a Prober manages a set of probes and keeps track of their results.
@ -56,8 +56,8 @@ type Prober struct {
// without being excessively delayed. // without being excessively delayed.
probeInterval metrics.LabelMap probeInterval metrics.LabelMap
mu sync.Mutex // protects all following fields mu sync.Mutex // protects all following fields
activeProbeCh map[string]chan struct{} probes map[string]*Probe
} }
// New returns a new Prober. // New returns a new Prober.
@ -74,7 +74,7 @@ func newForTest(now func() time.Time, newTicker func(time.Duration) ticker) *Pro
lastResult: metrics.LabelMap{Label: "probe"}, lastResult: metrics.LabelMap{Label: "probe"},
lastLatency: metrics.LabelMap{Label: "probe"}, lastLatency: metrics.LabelMap{Label: "probe"},
probeInterval: metrics.LabelMap{Label: "probe"}, probeInterval: metrics.LabelMap{Label: "probe"},
activeProbeCh: map[string]chan struct{}{}, probes: map[string]*Probe{},
} }
} }
@ -91,60 +91,99 @@ func (p *Prober) Expvar() *metrics.Set {
// Run executes fun every interval, and exports probe results under probeName. // Run executes fun every interval, and exports probe results under probeName.
// //
// fun is given a context.Context that, if obeyed, ensures that fun
// ends within interval. If fun disregards the context, it will not be
// run again until it does finish, and metrics will reflect that the
// probe function is stuck.
//
// Run returns a context.CancelFunc that stops the probe when
// invoked. Probe shutdown and removal happens-before the CancelFunc
// returns.
//
// Registering a probe under an already-registered name panics. // Registering a probe under an already-registered name panics.
func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) context.CancelFunc { func (p *Prober) Run(name string, interval time.Duration, fun ProbeFunc) *Probe {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
ticker := p.registerLocked(name, interval) if _, ok := p.probes[name]; ok {
panic(fmt.Sprintf("probe named %q already registered", name))
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go p.probeLoop(ctx, name, interval, ticker, fun) ticker := p.newTicker(interval)
probe := &Probe{
return func() { prober: p,
p.mu.Lock() ctx: ctx,
stopped := p.activeProbeCh[name] cancel: cancel,
p.mu.Unlock() stopped: make(chan struct{}),
cancel()
<-stopped name: name,
doProbe: fun,
interval: interval,
tick: ticker,
} }
p.probes[name] = probe
p.probeInterval.Get(name).Set(int64(interval.Seconds()))
go probe.loop()
return probe
}
func (p *Prober) unregister(probe *Probe) {
p.mu.Lock()
defer p.mu.Unlock()
name := probe.name
delete(p.probes, name)
p.lastStart.Delete(name)
p.lastEnd.Delete(name)
p.lastResult.Delete(name)
p.lastLatency.Delete(name)
p.probeInterval.Delete(name)
}
// Reports the number of registered probes. For tests only.
func (p *Prober) activeProbes() int {
p.mu.Lock()
defer p.mu.Unlock()
return len(p.probes)
}
// Probe is a probe that healthchecks something and updates Prometheus
// metrics with the results.
type Probe struct {
prober *Prober
ctx context.Context
cancel context.CancelFunc // run to initiate shutdown
stopped chan struct{} // closed when shutdown is complete
name string
doProbe ProbeFunc
interval time.Duration
tick ticker
}
// Close shuts down the Probe and unregisters it from its Prober.
// It is safe to Run a new probe of the same name after Close returns.
func (p *Probe) Close() error {
p.cancel()
<-p.stopped
p.prober.unregister(p)
return nil
} }
// probeLoop invokes runProbe on fun every interval. The first probe // probeLoop invokes runProbe on fun every interval. The first probe
// is run after interval. // is run after interval.
func (p *Prober) probeLoop(ctx context.Context, name string, interval time.Duration, tick ticker, fun ProbeFunc) { func (p *Probe) loop() {
defer func() { defer close(p.stopped)
p.unregister(name)
tick.Stop()
}()
// Do a first probe right away, so that the prober immediately exports results for everything. // Do a first probe right away, so that the prober immediately exports results for everything.
p.runProbe(ctx, name, interval, fun) p.run()
for { for {
select { select {
case <-tick.Chan(): case <-p.tick.Chan():
p.runProbe(ctx, name, interval, fun) p.run()
case <-ctx.Done(): case <-p.ctx.Done():
return return
} }
} }
} }
// runProbe invokes fun and records the results. // run invokes fun and records the results.
// //
// fun is invoked with a timeout slightly less than interval, so that // fun is invoked with a timeout slightly less than interval, so that
// the probe either succeeds or fails before the next cycle is // the probe either succeeds or fails before the next cycle is
// scheduled to start. // scheduled to start.
func (p *Prober) runProbe(ctx context.Context, name string, interval time.Duration, fun ProbeFunc) { func (p *Probe) run() {
start := p.start(name) start := p.start()
defer func() { defer func() {
// Prevent a panic within one probe function from killing the // Prevent a panic within one probe function from killing the
// entire prober, so that a single buggy probe doesn't destroy // entire prober, so that a single buggy probe doesn't destroy
@ -152,70 +191,36 @@ func (p *Prober) runProbe(ctx context.Context, name string, interval time.Durati
// as a probe failure, so panicking probes will trigger an // as a probe failure, so panicking probes will trigger an
// alert for debugging. // alert for debugging.
if r := recover(); r != nil { if r := recover(); r != nil {
log.Printf("probe %s panicked: %v", name, r) log.Printf("probe %s panicked: %v", p.name, r)
p.end(name, start, errors.New("panic")) p.end(start, errors.New("panic"))
} }
}() }()
timeout := time.Duration(float64(interval) * 0.8) timeout := time.Duration(float64(p.interval) * 0.8)
ctx, cancel := context.WithTimeout(ctx, timeout) ctx, cancel := context.WithTimeout(p.ctx, timeout)
defer cancel() defer cancel()
err := fun(ctx) err := p.doProbe(ctx)
p.end(name, start, err) p.end(start, err)
if err != nil { if err != nil {
log.Printf("probe %s: %v", name, err) log.Printf("probe %s: %v", p.name, err)
}
}
func (p *Prober) registerLocked(name string, interval time.Duration) ticker {
if _, ok := p.activeProbeCh[name]; ok {
panic(fmt.Sprintf("probe named %q already registered", name))
} }
stoppedCh := make(chan struct{})
p.activeProbeCh[name] = stoppedCh
p.probeInterval.Get(name).Set(int64(interval.Seconds()))
// Create and return a ticker from here, while Prober is
// locked. This ensures that our fake time in tests always sees
// the new fake ticker being created before seeing that a new
// probe is registered.
return p.newTicker(interval)
}
func (p *Prober) unregister(name string) {
p.mu.Lock()
defer p.mu.Unlock()
close(p.activeProbeCh[name])
delete(p.activeProbeCh, name)
p.lastStart.Delete(name)
p.lastEnd.Delete(name)
p.lastResult.Delete(name)
p.lastLatency.Delete(name)
p.probeInterval.Delete(name)
} }
func (p *Prober) start(name string) time.Time { func (p *Probe) start() time.Time {
st := p.now() st := p.prober.now()
p.lastStart.Get(name).Set(st.Unix()) p.prober.lastStart.Get(p.name).Set(st.Unix())
return st return st
} }
func (p *Prober) end(name string, start time.Time, err error) { func (p *Probe) end(start time.Time, err error) {
end := p.now() end := p.prober.now()
p.lastEnd.Get(name).Set(end.Unix()) p.prober.lastEnd.Get(p.name).Set(end.Unix())
p.lastLatency.Get(name).Set(end.Sub(start).Milliseconds()) p.prober.lastLatency.Get(p.name).Set(end.Sub(start).Milliseconds())
v := int64(1) v := int64(1)
if err != nil { if err != nil {
v = 0 v = 0
} }
p.lastResult.Get(name).Set(v) p.prober.lastResult.Get(p.name).Set(v)
}
// Reports the number of registered probes. For tests only.
func (p *Prober) activeProbes() int {
p.mu.Lock()
defer p.mu.Unlock()
return len(p.activeProbeCh)
} }
// ticker wraps a time.Ticker in a way that can be faked for tests. // ticker wraps a time.Ticker in a way that can be faked for tests.

@ -80,10 +80,10 @@ func TestProberRun(t *testing.T) {
) )
const startingProbes = 100 const startingProbes = 100
cancels := []context.CancelFunc{} var probes []*Probe
for i := 0; i < startingProbes; i++ { for i := 0; i < startingProbes; i++ {
cancels = append(cancels, p.Run(fmt.Sprintf("probe%d", i), probeInterval, func(context.Context) error { probes = append(probes, p.Run(fmt.Sprintf("probe%d", i), probeInterval, func(context.Context) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
cnt++ cnt++
@ -114,7 +114,7 @@ func TestProberRun(t *testing.T) {
keep := startingProbes / 2 keep := startingProbes / 2
for i := keep; i < startingProbes; i++ { for i := keep; i < startingProbes; i++ {
cancels[i]() probes[i].Close()
} }
waitActiveProbes(t, p, keep) waitActiveProbes(t, p, keep)

Loading…
Cancel
Save