From e1bcecc393be02982a3b650d4cbd857c4df7f5ed Mon Sep 17 00:00:00 2001 From: Claire Wang Date: Fri, 21 Jul 2023 13:10:39 -0400 Subject: [PATCH] logtail: use tstime (#8607) Updates #8587 Signed-off-by: Claire Wang --- logtail/backoff/backoff.go | 12 +++++------ logtail/logtail.go | 44 +++++++++++++++++++------------------- logtail/logtail_test.go | 11 +++++----- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/logtail/backoff/backoff.go b/logtail/backoff/backoff.go index ffec64ecd..72831f592 100644 --- a/logtail/backoff/backoff.go +++ b/logtail/backoff/backoff.go @@ -9,6 +9,7 @@ import ( "math/rand" "time" + "tailscale.com/tstime" "tailscale.com/types/logger" ) @@ -23,9 +24,8 @@ type Backoff struct { // logf is the function used for log messages when backing off. logf logger.Logf - // NewTimer is the function that acts like time.NewTimer. - // It's for use in unit tests. - NewTimer func(time.Duration) *time.Timer + // tstime.Clock.NewTimer is used instead time.NewTimer. + Clock tstime.Clock // LogLongerThan sets the minimum time of a single backoff interval // before we mention it in the log. @@ -40,7 +40,7 @@ func NewBackoff(name string, logf logger.Logf, maxBackoff time.Duration) *Backof name: name, logf: logf, maxBackoff: maxBackoff, - NewTimer: time.NewTimer, + Clock: tstime.StdClock{}, } } @@ -72,10 +72,10 @@ func (b *Backoff) BackOff(ctx context.Context, err error) { if d >= b.LogLongerThan { b.logf("%s: [v1] backoff: %d msec", b.name, d.Milliseconds()) } - t := b.NewTimer(d) + t, tChannel := b.Clock.NewTimer(d) select { case <-ctx.Done(): t.Stop() - case <-t.C: + case <-tChannel: } } diff --git a/logtail/logtail.go b/logtail/logtail.go index 0ca028145..096f96422 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -49,18 +49,18 @@ type Encoder interface { } type Config struct { - Collection string // collection name, a domain name - PrivateID logid.PrivateID // private ID for the primary log stream - CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream - BaseURL string // if empty defaults to "https://log.tailscale.io" - HTTPC *http.Client // if empty defaults to http.DefaultClient - SkipClientTime bool // if true, client_time is not written to logs - LowMemory bool // if true, logtail minimizes memory use - TimeNow func() time.Time // if set, substitutes uses of time.Now - Stderr io.Writer // if set, logs are sent here instead of os.Stderr - StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only - Buffer Buffer // temp storage, if nil a MemoryBuffer - NewZstdEncoder func() Encoder // if set, used to compress logs for transmission + Collection string // collection name, a domain name + PrivateID logid.PrivateID // private ID for the primary log stream + CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream + BaseURL string // if empty defaults to "https://log.tailscale.io" + HTTPC *http.Client // if empty defaults to http.DefaultClient + SkipClientTime bool // if true, client_time is not written to logs + LowMemory bool // if true, logtail minimizes memory use + Clock tstime.Clock // if set, Clock.Now substitutes uses of time.Now + Stderr io.Writer // if set, logs are sent here instead of os.Stderr + StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only + Buffer Buffer // temp storage, if nil a MemoryBuffer + NewZstdEncoder func() Encoder // if set, used to compress logs for transmission // MetricsDelta, if non-nil, is a func that returns an encoding // delta in clientmetrics to upload alongside existing logs. @@ -94,8 +94,8 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { if cfg.HTTPC == nil { cfg.HTTPC = http.DefaultClient } - if cfg.TimeNow == nil { - cfg.TimeNow = time.Now + if cfg.Clock == nil { + cfg.Clock = tstime.StdClock{} } if cfg.Stderr == nil { cfg.Stderr = os.Stderr @@ -144,7 +144,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { drainWake: make(chan struct{}, 1), sentinel: make(chan int32, 16), flushDelayFn: cfg.FlushDelayFn, - timeNow: cfg.TimeNow, + clock: cfg.Clock, metricsDelta: cfg.MetricsDelta, procID: procID, @@ -181,7 +181,7 @@ type Logger struct { flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay flushPending atomic.Bool sentinel chan int32 - timeNow func() time.Time + clock tstime.Clock zstdEncoder Encoder uploadCancel func() explainedRaw bool @@ -195,7 +195,7 @@ type Logger struct { writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls procSequence uint64 - flushTimer *time.Timer // used when flushDelay is >0 + flushTimer tstime.TimerController // used when flushDelay is >0 shutdownStartMu sync.Mutex // guards the closing of shutdownStart shutdownStart chan struct{} // closed when shutdown begins @@ -380,7 +380,7 @@ func (l *Logger) uploading(ctx context.Context) { retryAfter, err := l.upload(ctx, body, origlen) if err != nil { numFailures++ - firstFailure = time.Now() + firstFailure = l.clock.Now() if !l.internetUp() { fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n") @@ -403,7 +403,7 @@ func (l *Logger) uploading(ctx context.Context) { } else { // Only print a success message after recovery. if numFailures > 0 { - fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, time.Since(firstFailure).Round(time.Second)) + fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second)) } break } @@ -545,7 +545,7 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) { if flushDelay > 0 { if l.flushPending.CompareAndSwap(false, true) { if l.flushTimer == nil { - l.flushTimer = time.AfterFunc(flushDelay, l.tryDrainWake) + l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake) } else { l.flushTimer.Reset(flushDelay) } @@ -559,7 +559,7 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) { // TODO: instead of allocating, this should probably just append // directly into the output log buffer. func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte { - now := l.timeNow() + now := l.clock.Now() // Factor in JSON encoding overhead to try to only do one alloc // in the make below (so appends don't resize the buffer). @@ -674,7 +674,7 @@ func (l *Logger) encodeLocked(buf []byte, level int) []byte { return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path } - now := l.timeNow() + now := l.clock.Now() obj := make(map[string]any) if err := json.Unmarshal(buf, &obj); err != nil { diff --git a/logtail/logtail_test.go b/logtail/logtail_test.go index d531eb983..f145d7a35 100644 --- a/logtail/logtail_test.go +++ b/logtail/logtail_test.go @@ -15,6 +15,7 @@ import ( "time" "tailscale.com/tstest" + "tailscale.com/tstime" ) func TestFastShutdown(t *testing.T) { @@ -212,7 +213,7 @@ func TestEncodeSpecialCases(t *testing.T) { var sink []byte func TestLoggerEncodeTextAllocs(t *testing.T) { - lg := &Logger{timeNow: time.Now} + lg := &Logger{clock: tstime.StdClock{}} inBuf := []byte("some text to encode") procID := uint32(0x24d32ee9) procSequence := uint64(0x12346) @@ -226,8 +227,8 @@ func TestLoggerEncodeTextAllocs(t *testing.T) { func TestLoggerWriteLength(t *testing.T) { lg := &Logger{ - timeNow: time.Now, - buffer: NewMemoryBuffer(1024), + clock: tstime.StdClock{}, + buffer: NewMemoryBuffer(1024), } inBuf := []byte("some text to encode") n, err := lg.Write(inBuf) @@ -309,7 +310,7 @@ func unmarshalOne(t *testing.T, body []byte) map[string]any { } func TestEncodeTextTruncation(t *testing.T) { - lg := &Logger{timeNow: time.Now, lowMem: true} + lg := &Logger{clock: tstime.StdClock{}, lowMem: true} in := bytes.Repeat([]byte("a"), 5120) b := lg.encodeText(in, true, 0, 0, 0) got := string(b) @@ -363,7 +364,7 @@ func TestEncode(t *testing.T) { for _, tt := range tests { buf := new(simpleMemBuf) lg := &Logger{ - timeNow: func() time.Time { return time.Unix(123, 456).UTC() }, + clock: tstest.NewClock(tstest.ClockOpts{Start: time.Unix(123, 456).UTC()}), buffer: buf, procID: 7, procSequence: 1,