From 0c69b4e00d74399ca954c8210b211b09fd5b9b2d Mon Sep 17 00:00:00 2001 From: Wendi Yu <45118370+wendi-yu@users.noreply.github.com> Date: Fri, 8 May 2020 13:21:36 -0600 Subject: [PATCH] Implement rate limiting on log messages (#356) Implement rate limiting on log messages Addresses issue #317, where logs can get spammed with the same message nonstop. Created a rate limiting closure on logging functions, which limits the number of messages being logged per second based on format string. To keep memory usage as constant as possible, the previous cache purging at periodic time intervals has been replaced by an LRU that discards the oldest string when the capacity of the cache is reached. Signed-off-by: Wendi Yu --- cmd/tailscaled/tailscaled.go | 2 ++ ipn/e2e_test.go | 11 +++---- ipn/ipnserver/server_test.go | 6 +++- types/logger/logger.go | 59 ++++++++++++++++++++++++++++++++++++ types/logger/logger_test.go | 41 +++++++++++++++++++++++++ 5 files changed, 112 insertions(+), 7 deletions(-) diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index 7cded7647..63b4d3a0d 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -21,6 +21,7 @@ import ( "tailscale.com/ipn/ipnserver" "tailscale.com/logpolicy" "tailscale.com/paths" + "tailscale.com/types/logger" "tailscale.com/wgengine" "tailscale.com/wgengine/magicsock" ) @@ -51,6 +52,7 @@ func main() { socketpath := getopt.StringLong("socket", 's', paths.DefaultTailscaledSocket(), "Path of the service unix socket") logf := wgengine.RusagePrefixLog(log.Printf) + logf = logger.RateLimitedFn(logf, 1, 1, 100) err := fixconsole.FixConsoleIfNeeded() if err != nil { diff --git a/ipn/e2e_test.go b/ipn/e2e_test.go index f1bdcf1e9..ac652b11e 100644 --- a/ipn/e2e_test.go +++ b/ipn/e2e_test.go @@ -23,6 +23,7 @@ import ( "tailscale.com/control/controlclient" "tailscale.com/tailcfg" "tailscale.com/tstest" + "tailscale.com/types/logger" "tailscale.com/wgengine" "tailscale.com/wgengine/magicsock" "tailscale.com/wgengine/router" @@ -191,12 +192,10 @@ type testNode struct { // Create a new IPN node. func newNode(t *testing.T, prefix string, https *httptest.Server, weirdPrefs bool) testNode { t.Helper() - logfe := func(fmt string, args ...interface{}) { - t.Logf(prefix+".e: "+fmt, args...) - } - logf := func(fmt string, args ...interface{}) { - t.Logf(prefix+": "+fmt, args...) - } + + logfe := logger.WithPrefix(t.Logf, prefix+"e: ") + + logf := logger.WithPrefix(t.Logf, prefix+": ") var err error httpc := https.Client() diff --git a/ipn/ipnserver/server_test.go b/ipn/ipnserver/server_test.go index 549b617c3..582f53a53 100644 --- a/ipn/ipnserver/server_test.go +++ b/ipn/ipnserver/server_test.go @@ -13,6 +13,8 @@ import ( "strings" "testing" + "tailscale.com/types/logger" + "tailscale.com/ipn" "tailscale.com/ipn/ipnserver" "tailscale.com/safesocket" @@ -32,12 +34,14 @@ func TestRunMultipleAccepts(t *testing.T) { defer os.RemoveAll(td) socketPath := filepath.Join(td, "tailscale.sock") - logf := func(format string, args ...interface{}) { + ulogf := func(format string, args ...interface{}) { format = strings.TrimRight(format, "\n") println(fmt.Sprintf(format, args...)) t.Logf(format, args...) } + logf := logger.RateLimitedFn(ulogf, 1, 1, 100) + connect := func() { for i := 1; i <= 2; i++ { logf("connect %d ...", i) diff --git a/types/logger/logger.go b/types/logger/logger.go index 51b69fcf8..c60282ddb 100644 --- a/types/logger/logger.go +++ b/types/logger/logger.go @@ -8,12 +8,18 @@ package logger import ( + "container/list" + "fmt" "io" "log" + "sync" + + "golang.org/x/time/rate" ) // Logf is the basic Tailscale logger type: a printf-like func. // Like log.Printf, the format need not end in a newline. +// Logf functions should be safe for concurrent use. type Logf func(format string, args ...interface{}) // WithPrefix wraps f, prefixing each format with the provided prefix. @@ -42,3 +48,56 @@ func (w funcWriter) Write(p []byte) (int, error) { // Discard is a Logf that throws away the logs given to it. func Discard(string, ...interface{}) {} + +// limitData is used to keep track of each format string's associated +// rate-limiting data. +type limitData struct { + lim *rate.Limiter // the token bucket associated with this string + msgBlocked bool // whether a "duplicate error" message has already been logged + ele *list.Element // list element used to access this string in the cache +} + +// RateLimitedFn implements rate limiting by fstring on a given Logf. +// Messages are allowed through at a maximum of f messages/second, in +// bursts of up to b messages at a time. Up to m strings will be held at a time. +func RateLimitedFn(logf Logf, f float64, b int, m int) Logf { + r := rate.Limit(f) + msgLim := make(map[string]*limitData) + msgCache := list.New() // a rudimentary LRU that limits the size of the map + mu := &sync.Mutex{} + + return func(format string, args ...interface{}) { + mu.Lock() + rl, ok := msgLim[format] + if ok { + msgCache.MoveToFront(rl.ele) + if rl.lim.Allow() { + mu.Lock() + rl.msgBlocked = false + mu.Unlock() + logf(format, args...) + } else { + if !rl.msgBlocked { + rl.msgBlocked = true + mu.Unlock() + logf("Repeated messages were suppressed by rate limiting. Original message: %s", + fmt.Sprintf(format, args...)) + } else { + mu.Unlock() + } + } + } else { + msgLim[format] = &limitData{rate.NewLimiter(r, b), false, msgCache.PushFront(format)} + msgLim[format].lim.Allow() + mu.Unlock() + logf(format, args...) + } + + mu.Lock() + if msgCache.Len() > m { + delete(msgLim, msgCache.Back().Value.(string)) + msgCache.Remove(msgCache.Back()) + } + mu.Unlock() + } +} diff --git a/types/logger/logger_test.go b/types/logger/logger_test.go index b6ecee403..f1a5be2cf 100644 --- a/types/logger/logger_test.go +++ b/types/logger/logger_test.go @@ -5,6 +5,7 @@ package logger import ( + "fmt" "log" "testing" ) @@ -19,3 +20,43 @@ func TestStdLogger(t *testing.T) { lg := StdLogger(t.Logf) lg.Printf("plumbed through") } + +func TestRateLimiter(t *testing.T) { + + // Testing function. args[0] should indicate what should + logTester := func(want []string) Logf { + i := 0 + return func(format string, args ...interface{}) { + got := fmt.Sprintf(format, args...) + if i >= len(want) { + t.Fatalf("Logging continued past end of expected input: %s", got) + } + if got != want[i] { + t.Fatalf("wanted: %s \n got: %s", want[i], got) + } + i++ + } + } + + want := []string{ + "boring string with constant formatting (constant)", + "templated format string no. 0", + "Repeated messages were suppressed by rate limiting. Original message: boring string with constant formatting (constant)", + "Repeated messages were suppressed by rate limiting. Original message: templated format string no. 1", + "Make sure this string makes it through the rest (that are blocked) 4", + "4 shouldn't get filtered.", + } + + lg := RateLimitedFn(logTester(want), 1, 1, 50) + var prefixed Logf + for i := 0; i < 10; i++ { + lg("boring string with constant formatting %s", "(constant)") + lg("templated format string no. %d", i) + if i == 4 { + lg("Make sure this string makes it through the rest (that are blocked) %d", i) + prefixed = WithPrefix(lg, string('0'+i)) + prefixed(" shouldn't get filtered.") + } + } + +}