types/logger: rate limited: more hysteresis, better messages.

- Switch to our own simpler token bucket, since x/time/rate is missing
  necessary stuff (can't provide your own time func; can't check the
  current bucket contents) and it's overkill anyway.

- Add tests that actually include advancing time.

- Don't remove the rate limit on a message until there's enough room to
  print at least two more of them. When we do, we'll also print how
  many we dropped, as a contextual reminder that some were previously
  lost. (This is more like how the Linux kernel does it.)

- Reformat the [RATE LIMITED] messages to be shorter, and to not
  corrupt original message. Instead, we print the message, then print
  its format string.

- Use %q instead of \"%s\", for more accurate parsing later, if the
  format string contained quotes.

Fixes #1772

Signed-off-by: Avery Pennarun <apenwarr@tailscale.com>
apenwarr/statefix
Avery Pennarun 3 years ago
parent 20e04418ff
commit 19c3e6cc9e

@ -85,7 +85,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
golang.org/x/text/transform from golang.org/x/text/secure/bidirule+ golang.org/x/text/transform from golang.org/x/text/secure/bidirule+
golang.org/x/text/unicode/bidi from golang.org/x/net/idna+ golang.org/x/text/unicode/bidi from golang.org/x/net/idna+
golang.org/x/text/unicode/norm from golang.org/x/net/idna golang.org/x/text/unicode/norm from golang.org/x/net/idna
golang.org/x/time/rate from tailscale.com/types/logger+ golang.org/x/time/rate from tailscale.com/cmd/tailscale/cli+
bufio from compress/flate+ bufio from compress/flate+
bytes from bufio+ bytes from bufio+
compress/flate from compress/gzip+ compress/flate from compress/gzip+

@ -188,7 +188,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
golang.org/x/text/transform from golang.org/x/text/secure/bidirule+ golang.org/x/text/transform from golang.org/x/text/secure/bidirule+
golang.org/x/text/unicode/bidi from golang.org/x/net/idna+ golang.org/x/text/unicode/bidi from golang.org/x/net/idna+
golang.org/x/text/unicode/norm from golang.org/x/net/idna golang.org/x/text/unicode/norm from golang.org/x/net/idna
golang.org/x/time/rate from tailscale.com/types/logger+ golang.org/x/time/rate from inet.af/netstack/tcpip/stack+
bufio from compress/flate+ bufio from compress/flate+
bytes from bufio+ bytes from bufio+
compress/flate from compress/gzip+ compress/flate from compress/gzip+

@ -18,8 +18,6 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"golang.org/x/time/rate"
) )
// Logf is the basic Tailscale logger type: a printf-like func. // Logf is the basic Tailscale logger type: a printf-like func.
@ -57,9 +55,9 @@ func Discard(string, ...interface{}) {}
// limitData is used to keep track of each format string's associated // limitData is used to keep track of each format string's associated
// rate-limiting data. // rate-limiting data.
type limitData struct { type limitData struct {
lim *rate.Limiter // the token bucket associated with this string bucket *tokenBucket // the token bucket associated with this string
msgBlocked bool // whether a "duplicate error" message has already been logged nBlocked int // number of messages skipped
ele *list.Element // list element used to access this string in the cache ele *list.Element // list element used to access this string in the cache
} }
var disableRateLimit = os.Getenv("TS_DEBUG_LOG_RATE") == "all" var disableRateLimit = os.Getenv("TS_DEBUG_LOG_RATE") == "all"
@ -71,31 +69,34 @@ var rateFree = []string{
"magicsock: CreateEndpoint:", "magicsock: CreateEndpoint:",
} }
// RateLimitedFn returns a rate-limiting Logf wrapping the given logf. // RateLimitedFn is a wrapper for RateLimitedFnWithClock that includes the
// Messages are allowed through at a maximum of one message every f (where f is a time.Duration), in // current time automatically. This is mainly for backward compatibility.
// bursts of up to burst messages at a time. Up to maxCache strings will be held at a time.
func RateLimitedFn(logf Logf, f time.Duration, burst int, maxCache int) Logf { func RateLimitedFn(logf Logf, f time.Duration, burst int, maxCache int) Logf {
return RateLimitedFnWithClock(logf, f, burst, maxCache, time.Now)
}
// RateLimitedFnWithClock returns a rate-limiting Logf wrapping the given
// logf. Messages are allowed through at a maximum of one message every f
// (where f is a time.Duration), in bursts of up to burst messages at a
// time. Up to maxCache format strings will be tracked separately.
// timeNow is a function that returns the current time, used for calculating
// rate limits.
func RateLimitedFnWithClock(logf Logf, f time.Duration, burst int, maxCache int, timeNow func() time.Time) Logf {
if disableRateLimit { if disableRateLimit {
return logf return logf
} }
r := rate.Every(f)
var ( var (
mu sync.Mutex mu sync.Mutex
msgLim = make(map[string]*limitData) // keyed by logf format msgLim = make(map[string]*limitData) // keyed by logf format
msgCache = list.New() // a rudimentary LRU that limits the size of the map msgCache = list.New() // a rudimentary LRU that limits the size of the map
) )
type verdict int return func(format string, args ...interface{}) {
const ( // Shortcut for formats with no rate limit
allow verdict = iota
warn
block
)
judge := func(format string) verdict {
for _, sub := range rateFree { for _, sub := range rateFree {
if strings.Contains(format, sub) { if strings.Contains(format, sub) {
return allow logf(format, args...)
return
} }
} }
@ -106,8 +107,8 @@ func RateLimitedFn(logf Logf, f time.Duration, burst int, maxCache int) Logf {
msgCache.MoveToFront(rl.ele) msgCache.MoveToFront(rl.ele)
} else { } else {
rl = &limitData{ rl = &limitData{
lim: rate.NewLimiter(r, burst), bucket: newTokenBucket(f, burst, timeNow()),
ele: msgCache.PushFront(format), ele: msgCache.PushFront(format),
} }
msgLim[format] = rl msgLim[format] = rl
if msgCache.Len() > maxCache { if msgCache.Len() > maxCache {
@ -115,24 +116,39 @@ func RateLimitedFn(logf Logf, f time.Duration, burst int, maxCache int) Logf {
msgCache.Remove(msgCache.Back()) msgCache.Remove(msgCache.Back())
} }
} }
if rl.lim.Allow() {
rl.msgBlocked = false
return allow
}
if !rl.msgBlocked {
rl.msgBlocked = true
return warn
}
return block
}
return func(format string, args ...interface{}) { rl.bucket.AdvanceTo(timeNow())
switch judge(format) {
case allow: // Make sure there's enough room for at least a few
// more logs before we unblock, so we don't alternate
// between blocking and unblocking.
if rl.nBlocked > 0 && rl.bucket.remaining >= 2 {
// Only print this if we dropped more than 1
// message. Otherwise we'd *increase* the total
// number of log lines printed.
if rl.nBlocked > 1 {
logf("[RATELIMIT] format(%q) (%d dropped)",
format, rl.nBlocked-1)
}
rl.nBlocked = 0
}
if rl.nBlocked == 0 && rl.bucket.Get() {
logf(format, args...) logf(format, args...)
case warn: if rl.bucket.remaining == 0 {
// For the warning, log the specific format string // Enter "blocked" mode immediately after
logf("[RATE LIMITED] format string \"%s\" (example: \"%s\")", format, strings.TrimSpace(fmt.Sprintf(format, args...))) // reaching the burst limit. We want to
// always accompany the format() message
// with an example of the format, which is
// effectively the same as printing the
// message anyway. But this way they can
// be on two separate lines and we don't
// corrupt the original message.
logf("[RATELIMIT] format(%q)", format)
rl.nBlocked = 1
}
return
} else {
rl.nBlocked++
} }
} }
} }

@ -44,16 +44,27 @@ func TestRateLimiter(t *testing.T) {
"boring string with constant formatting (constant)", "boring string with constant formatting (constant)",
"templated format string no. 0", "templated format string no. 0",
"boring string with constant formatting (constant)", "boring string with constant formatting (constant)",
"[RATELIMIT] format(\"boring string with constant formatting %s\")",
"templated format string no. 1", "templated format string no. 1",
"[RATE LIMITED] format string \"boring string with constant formatting %s\" (example: \"boring string with constant formatting (constant)\")", "[RATELIMIT] format(\"templated format string no. %d\")",
"[RATE LIMITED] format string \"templated format string no. %d\" (example: \"templated format string no. 2\")",
"Make sure this string makes it through the rest (that are blocked) 4", "Make sure this string makes it through the rest (that are blocked) 4",
"4 shouldn't get filtered.", "4 shouldn't get filtered.",
"hello 1",
"hello 2",
"[RATELIMIT] format(\"hello %v\")",
"[RATELIMIT] format(\"hello %v\") (2 dropped)",
"hello 5",
"hello 6",
"[RATELIMIT] format(\"hello %v\")",
"hello 7",
} }
var now time.Time
nowf := func() time.Time { return now }
testsRun := 0 testsRun := 0
lgtest := logTester(want, t, &testsRun) lgtest := logTester(want, t, &testsRun)
lg := RateLimitedFn(lgtest, 1*time.Minute, 2, 50) lg := RateLimitedFnWithClock(lgtest, 1*time.Minute, 2, 50, nowf)
var prefixed Logf var prefixed Logf
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
lg("boring string with constant formatting %s", "(constant)") lg("boring string with constant formatting %s", "(constant)")
@ -64,6 +75,19 @@ func TestRateLimiter(t *testing.T) {
prefixed(" shouldn't get filtered.") prefixed(" shouldn't get filtered.")
} }
} }
lg("hello %v", 1)
lg("hello %v", 2) // printed, but rate limit starts
lg("hello %v", 3) // rate limited (not printed)
now = now.Add(1 * time.Minute)
lg("hello %v", 4) // still limited (not printed)
now = now.Add(1 * time.Minute)
lg("hello %v", 5) // restriction lifted; prints drop count + message
lg("hello %v", 6) // printed, but rate limit starts
now = now.Add(2 * time.Minute)
lg("hello %v", 7) // restriction lifted; no drop count needed
if testsRun < len(want) { if testsRun < len(want) {
t.Fatalf("Tests after %s weren't logged.", want[testsRun]) t.Fatalf("Tests after %s weren't logged.", want[testsRun])
} }

@ -0,0 +1,63 @@
// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package logger
import (
"time"
)
// tokenBucket is a simple token bucket style rate limiter.
// It's similar in function to golang.org/x/time/rate.Limiter, which we
// can't use because:
// - It doesn't give access to the number of accumulated tokens, which we
// need for implementing hysteresis;
// - It doesn't let us provide our own time function, which we need for
// implementing proper unit tests.
// rate.Limiter is also much more complex than necessary, but that wouldn't
// be enough to disqualify it on its own.
//
// Unlike rate.Limiter, this token bucket does not attempt to
// do any locking of its own. Don't try to access it re-entrantly.
// That's fine inside this types/logger package because we already have
// locking at a higher level.
type tokenBucket struct {
remaining int
max int
tick time.Duration
t time.Time
}
func newTokenBucket(tick time.Duration, max int, now time.Time) *tokenBucket {
return &tokenBucket{max, max, tick, now}
}
func (tb *tokenBucket) Get() bool {
if tb.remaining > 0 {
tb.remaining--
return true
}
return false
}
func (tb *tokenBucket) Refund(n int) {
b := tb.remaining + n
if b > tb.max {
tb.remaining = tb.max
} else {
tb.remaining = b
}
}
func (tb *tokenBucket) AdvanceTo(t time.Time) {
diff := t.Sub(tb.t)
// only use up whole ticks. The remainder will be used up
// next time.
ticks := int(diff / tb.tick)
tb.t = tb.t.Add(time.Duration(ticks) * tb.tick)
tb.Refund(ticks)
}
Loading…
Cancel
Save