From 9da4181606f7e1ba13012d24c00d408ac0326fed Mon Sep 17 00:00:00 2001
From: Josh Bleecher Snyder
Date: Wed, 21 Jul 2021 17:23:38 -0700
Subject: [PATCH] tstime/rate: new package

This is a simplified rate limiter geared for exactly our needs:
A fast, mono.Time-based rate limiter for use in tstun.
It was generated by stripping down the x/time/rate rate limiter
to just our needs and switching it to use mono.Time.

It removes one time.Now call per packet.

Signed-off-by: Josh Bleecher Snyder
---
 cmd/tailscale/depaware.txt     |   1 +
 cmd/tailscaled/depaware.txt    |   1 +
 tstime/rate/rate.go            |  89 ++++++++++++
 tstime/rate/rate_test.go       | 246 +++++++++++++++++++++++++++++++++
 wgengine/filter/filter.go      |   2 +-
 wgengine/filter/filter_test.go |   2 +-
 6 files changed, 339 insertions(+), 2 deletions(-)
 create mode 100644 tstime/rate/rate.go
 create mode 100644 tstime/rate/rate_test.go

diff --git a/cmd/tailscale/depaware.txt b/cmd/tailscale/depaware.txt
index c0befd81f..37989e550 100644
--- a/cmd/tailscale/depaware.txt
+++ b/cmd/tailscale/depaware.txt
@@ -50,6 +50,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
    tailscale.com/tailcfg from tailscale.com/cmd/tailscale/cli+
 W  tailscale.com/tsconst from tailscale.com/net/interfaces
 💣 tailscale.com/tstime/mono from tailscale.com/cmd/tailscale/cli+
+   tailscale.com/tstime/rate from tailscale.com/wgengine/filter
    tailscale.com/types/empty from tailscale.com/ipn
    tailscale.com/types/ipproto from tailscale.com/net/flowtrack+
    tailscale.com/types/key from tailscale.com/derp+
diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt
index 3460d2430..627eaadb4 100644
--- a/cmd/tailscaled/depaware.txt
+++ b/cmd/tailscaled/depaware.txt
@@ -131,6 +131,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 W  tailscale.com/tsconst from tailscale.com/net/interfaces
    tailscale.com/tstime from tailscale.com/wgengine/magicsock
 💣 tailscale.com/tstime/mono from tailscale.com/net/tstun+
+   tailscale.com/tstime/rate from tailscale.com/wgengine/filter
    tailscale.com/types/empty from tailscale.com/control/controlclient+
    tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled
    tailscale.com/types/ipproto from tailscale.com/net/flowtrack+
diff --git a/tstime/rate/rate.go b/tstime/rate/rate.go
new file mode 100644
index 000000000..f06562838
--- /dev/null
+++ b/tstime/rate/rate.go
@@ -0,0 +1,89 @@
+// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This is a modified, simplified version of code from golang.org/x/time/rate.
+
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package rate provides a rate limiter.
+package rate
+
+import (
+	"sync"
+	"time"
+
+	"tailscale.com/tstime/mono"
+)
+
+// Limit defines the maximum frequency of some events.
+// Limit is represented as number of events per second.
+// A zero Limit is invalid.
+type Limit float64
+
+// Every converts a minimum time interval between events to a Limit.
+func Every(interval time.Duration) Limit {
+	if interval <= 0 {
+		panic("invalid interval")
+	}
+	return 1 / Limit(interval.Seconds())
+}
+
+// A Limiter controls how frequently events are allowed to happen.
+// It implements a "token bucket" of size b, initially full and refilled
+// at rate r tokens per second.
+// Informally, in any large enough time interval, the Limiter limits the
+// rate to r tokens per second, with a maximum burst size of b events.
+// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
+// Use NewLimiter to create non-zero Limiters.
+type Limiter struct {
+	limit  Limit
+	burst  float64
+	mu     sync.Mutex // protects following fields
+	tokens float64    // number of tokens currently in bucket
+	last   mono.Time  // the last time the limiter's tokens field was updated
+}
+
+// NewLimiter returns a new Limiter that allows events up to rate r and permits
+// bursts of at most b tokens.
+func NewLimiter(r Limit, b int) *Limiter {
+	if b < 1 {
+		panic("bad burst, must be at least 1")
+	}
+	return &Limiter{limit: r, burst: float64(b)}
+}
+
+// Allow reports whether an event may happen now.
+func (lim *Limiter) Allow() bool {
+	return lim.allow(mono.Now())
+}
+
+func (lim *Limiter) allow(now mono.Time) bool {
+	lim.mu.Lock()
+	defer lim.mu.Unlock()
+
+	// If time has moved backwards, look around awkwardly and pretend nothing happened.
+	if now.Before(lim.last) {
+		lim.last = now
+	}
+
+	// Calculate the new number of tokens available due to the passage of time.
+	elapsed := now.Sub(lim.last)
+	tokens := lim.tokens + float64(lim.limit)*elapsed.Seconds()
+	if tokens > lim.burst {
+		tokens = lim.burst
+	}
+
+	// Consume a token.
+	tokens--
+
+	// Update state.
+	ok := tokens >= 0
+	if ok {
+		lim.last = now
+		lim.tokens = tokens
+	}
+	return ok
+}
diff --git a/tstime/rate/rate_test.go b/tstime/rate/rate_test.go
new file mode 100644
index 000000000..0a8a55f50
--- /dev/null
+++ b/tstime/rate/rate_test.go
@@ -0,0 +1,246 @@
+// Copyright (c) 2021 Tailscale Inc & AUTHORS All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This is a modified, simplified version of code from golang.org/x/time/rate.
+
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.7
+// +build go1.7
+
+package rate
+
+import (
+	"context"
+	"math"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"tailscale.com/tstime/mono"
+)
+
+func closeEnough(a, b Limit) bool {
+	return math.Abs(float64(a)/float64(b)-1.0) < 1e-9
+}
+
+func TestEvery(t *testing.T) {
+	cases := []struct {
+		interval time.Duration
+		lim      Limit
+	}{
+		{1 * time.Nanosecond, Limit(1e9)},
+		{1 * time.Microsecond, Limit(1e6)},
+		{1 * time.Millisecond, Limit(1e3)},
+		{10 * time.Millisecond, Limit(100)},
+		{100 * time.Millisecond, Limit(10)},
+		{1 * time.Second, Limit(1)},
+		{2 * time.Second, Limit(0.5)},
+		{time.Duration(2.5 * float64(time.Second)), Limit(0.4)},
+		{4 * time.Second, Limit(0.25)},
+		{10 * time.Second, Limit(0.1)},
+		{time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))},
+	}
+	for _, tc := range cases {
+		lim := Every(tc.interval)
+		if !closeEnough(lim, tc.lim) {
+			t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim)
+		}
+	}
+}
+
+const (
+	d = 100 * time.Millisecond
+)
+
+var (
+	t0 = mono.Now()
+	t1 = t0.Add(time.Duration(1) * d)
+	t2 = t0.Add(time.Duration(2) * d)
+	t3 = t0.Add(time.Duration(3) * d)
+	t4 = t0.Add(time.Duration(4) * d)
+	t5 = t0.Add(time.Duration(5) * d)
+	t9 = t0.Add(time.Duration(9) * d)
+)
+
+type allow struct {
+	t  mono.Time
+	ok bool
+}
+
+func run(t *testing.T, lim *Limiter, allows []allow) {
+	t.Helper()
+	for i, allow := range allows {
+		ok := lim.allow(allow.t)
+		if ok != allow.ok {
+			t.Errorf("step %d: lim.allow(%v) = %v want %v",
+				i, allow.t, ok, allow.ok)
+		}
+	}
+}
+
+func TestLimiterBurst1(t *testing.T) {
+	run(t, NewLimiter(10, 1), []allow{
+		{t0, true},
+		{t0, false},
+		{t0, false},
+		{t1, true},
+		{t1, false},
+		{t1, false},
+		{t2, true},
+		{t2, false},
+	})
+}
+
+func TestLimiterJumpBackwards(t *testing.T) {
+	run(t, NewLimiter(10, 3), []allow{
+		{t1, true}, // start at t1
+		{t0, true}, // jump back to t0, two tokens remain
+		{t0, true},
+		{t0, false},
+		{t0, false},
+		{t1, true}, // got a token
+		{t1, false},
+		{t1, false},
+		{t2, true}, // got another token
+		{t2, false},
+		{t2, false},
+	})
+}
+
+// Ensure that tokensFromDuration doesn't produce
+// rounding errors by truncating nanoseconds.
+// See golang.org/issues/34861.
+func TestLimiter_noTruncationErrors(t *testing.T) {
+	if !NewLimiter(0.7692307692307693, 1).Allow() {
+		t.Fatal("expected true")
+	}
+}
+
+func TestSimultaneousRequests(t *testing.T) {
+	const (
+		limit       = 1
+		burst       = 5
+		numRequests = 15
+	)
+	var (
+		wg    sync.WaitGroup
+		numOK = uint32(0)
+	)
+
+	// Very slow replenishing bucket.
+	lim := NewLimiter(limit, burst)
+
+	// Tries to take a token, atomically updates the counter and decreases the wait
+	// group counter.
+	f := func() {
+		defer wg.Done()
+		if ok := lim.Allow(); ok {
+			atomic.AddUint32(&numOK, 1)
+		}
+	}
+
+	wg.Add(numRequests)
+	for i := 0; i < numRequests; i++ {
+		go f()
+	}
+	wg.Wait()
+	if numOK != burst {
+		t.Errorf("numOK = %d, want %d", numOK, burst)
+	}
+}
+
+func TestLongRunningQPS(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in short mode")
+	}
+	if runtime.GOOS == "openbsd" {
+		t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)")
+		return
+	}
+
+	// The test runs for a few seconds executing many requests and then checks
+	// that overall number of requests is reasonable.
+	const (
+		limit = 100
+		burst = 100
+	)
+	var numOK = int32(0)
+
+	lim := NewLimiter(limit, burst)
+
+	var wg sync.WaitGroup
+	f := func() {
+		if ok := lim.Allow(); ok {
+			atomic.AddInt32(&numOK, 1)
+		}
+		wg.Done()
+	}
+
+	start := time.Now()
+	end := start.Add(5 * time.Second)
+	for time.Now().Before(end) {
+		wg.Add(1)
+		go f()
+
+		// This will still offer ~500 requests per second, but won't consume
+		// an outrageous amount of CPU.
+		time.Sleep(2 * time.Millisecond)
+	}
+	wg.Wait()
+	elapsed := time.Since(start)
+	ideal := burst + (limit * float64(elapsed) / float64(time.Second))
+
+	// We should never get more requests than allowed.
+	if want := int32(ideal + 1); numOK > want {
+		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
+	}
+	// We should get very close to the number of requests allowed.
+	if want := int32(0.999 * ideal); numOK < want {
+		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
+	}
+}
+
+type request struct {
+	t   time.Time
+	n   int
+	act time.Time
+	ok  bool
+}
+
+// dFromDuration converts a duration to a multiple of the global constant d.
+func dFromDuration(dur time.Duration) int {
+	// Adding a millisecond to be swallowed by the integer division
+	// because we don't care about small inaccuracies.
+	return int((dur + time.Millisecond) / d)
+}
+
+// dSince returns multiples of d since t0.
+func dSince(t mono.Time) int {
+	return dFromDuration(t.Sub(t0))
+}
+
+type wait struct {
+	name   string
+	ctx    context.Context
+	n      int
+	delay  int // in multiples of d
+	nilErr bool
+}
+
+func BenchmarkAllow(b *testing.B) {
+	lim := NewLimiter(Every(1*time.Second), 1)
+	now := mono.Now()
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			lim.allow(now)
+		}
+	})
+}
diff --git a/wgengine/filter/filter.go b/wgengine/filter/filter.go
index e502502c3..5e7ea3ed2 100644
--- a/wgengine/filter/filter.go
+++ b/wgengine/filter/filter.go
@@ -11,10 +11,10 @@ import (
 	"sync"
 	"time"
 
-	"golang.org/x/time/rate"
 	"inet.af/netaddr"
 	"tailscale.com/net/flowtrack"
 	"tailscale.com/net/packet"
+	"tailscale.com/tstime/rate"
 	"tailscale.com/types/ipproto"
 	"tailscale.com/types/logger"
 )
diff --git a/wgengine/filter/filter_test.go b/wgengine/filter/filter_test.go
index aef95334d..e2a077d25 100644
--- a/wgengine/filter/filter_test.go
+++ b/wgengine/filter/filter_test.go
@@ -13,11 +13,11 @@ import (
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
-	"golang.org/x/time/rate"
 	"inet.af/netaddr"
 	"tailscale.com/net/packet"
 	"tailscale.com/net/tsaddr"
 	"tailscale.com/tailcfg"
+	"tailscale.com/tstime/rate"
 	"tailscale.com/types/ipproto"
 	"tailscale.com/types/logger"
 )
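
Not part of the patch, but a minimal usage sketch of the new API (illustrative only; it assumes a Go module that can import tailscale.com): a Limiter created with NewLimiter(10, 5) starts with a full bucket of 5 tokens and refills at 10 tokens per second, so a burst of immediate Allow calls succeeds roughly 5 times and then fails until time passes.

    package main

    import (
    	"fmt"
    	"time"

    	"tailscale.com/tstime/rate"
    )

    func main() {
    	// Refill at 10 tokens per second, allowing bursts of up to 5 events.
    	lim := rate.NewLimiter(10, 5)

    	allowed := 0
    	for i := 0; i < 20; i++ {
    		if lim.Allow() {
    			allowed++
    		}
    	}
    	// The bucket starts full, so roughly the first 5 calls succeed.
    	fmt.Printf("allowed %d of 20 immediate events\n", allowed)

    	// After 200ms, about 2 tokens have been refilled.
    	time.Sleep(200 * time.Millisecond)
    	fmt.Println("after 200ms:", lim.Allow()) // true
    }

For the filter's purposes the behavior matches x/time/rate's Allow, but the limiter reads mono.Time internally, which is where the commit's saving of one time.Now call per packet comes from.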