types/logger: simplify mutex locking in rate-limited logger

Updates #365

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
reviewable/pr373/r1
Brad Fitzpatrick 4 years ago committed by Brad Fitzpatrick
parent 874be6566d
commit 8eda667aa1

@ -19,7 +19,7 @@ import (
// Logf is the basic Tailscale logger type: a printf-like func.
// Like log.Printf, the format need not end in a newline.
// Logf functions should be safe for concurrent use.
// Logf functions must be safe for concurrent use.
type Logf func(format string, args ...interface{})
// WithPrefix wraps f, prefixing each format with the provided prefix.
@ -57,46 +57,57 @@ type limitData struct {
ele *list.Element // list element used to access this string in the cache
}
// RateLimitedFn implements rate limiting by fstring on a given Logf.
// RateLimitedFn returns a rate-limiting Logf wrapping the given logf.
// Messages are allowed through at a maximum of f messages/second, in
// bursts of up to b messages at a time. Up to m strings will be held at a time.
func RateLimitedFn(logf Logf, f float64, b int, m int) Logf {
// bursts of up to burst messages at a time. Up to maxCache strings will be held at a time.
func RateLimitedFn(logf Logf, f float64, burst int, maxCache int) Logf {
r := rate.Limit(f)
msgLim := make(map[string]*limitData)
msgCache := list.New() // a rudimentary LRU that limits the size of the map
mu := &sync.Mutex{}
var (
mu sync.Mutex
msgLim = make(map[string]*limitData) // keyed by logf format
msgCache = list.New() // a rudimentary LRU that limits the size of the map
)
return func(format string, args ...interface{}) {
type verdict int
const (
allow verdict = iota
warn
block
)
judge := func(format string) verdict {
mu.Lock()
defer mu.Unlock()
rl, ok := msgLim[format]
if ok {
msgCache.MoveToFront(rl.ele)
if rl.lim.Allow() {
rl.msgBlocked = false
mu.Unlock()
logf(format, args...)
} else {
if !rl.msgBlocked {
rl.msgBlocked = true
mu.Unlock()
logf("Repeated messages were suppressed by rate limiting. Original message: %s",
fmt.Sprintf(format, args...))
} else {
mu.Unlock()
}
}
} else {
msgLim[format] = &limitData{rate.NewLimiter(r, b), false, msgCache.PushFront(format)}
msgLim[format].lim.Allow()
mu.Unlock()
logf(format, args...)
rl = &limitData{lim: rate.NewLimiter(r, burst), ele: msgCache.PushFront(format)}
msgLim[format] = rl
if msgCache.Len() > maxCache {
delete(msgLim, msgCache.Back().Value.(string))
msgCache.Remove(msgCache.Back())
}
}
if rl.lim.Allow() {
rl.msgBlocked = false
return allow
}
if !rl.msgBlocked {
rl.msgBlocked = true
return warn
}
return block
}
return func(format string, args ...interface{}) {
switch judge(format) {
case allow:
logf(format, args...)
case warn:
logf("Repeated messages were suppressed by rate limiting. Original message: %s",
fmt.Sprintf(format, args...))
mu.Lock()
if msgCache.Len() > m {
delete(msgLim, msgCache.Back().Value.(string))
msgCache.Remove(msgCache.Back())
}
mu.Unlock()
}
}

Loading…
Cancel
Save