logtail: use tstime (#8607)

Updates #8587
Signed-off-by: Claire Wang <claire@tailscale.com>
clairew/use-tstime-etc
Claire Wang 1 year ago committed by GitHub
parent bb4b35e923
commit e1bcecc393
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,6 +9,7 @@ import (
"math/rand" "math/rand"
"time" "time"
"tailscale.com/tstime"
"tailscale.com/types/logger" "tailscale.com/types/logger"
) )
@ -23,9 +24,8 @@ type Backoff struct {
// logf is the function used for log messages when backing off. // logf is the function used for log messages when backing off.
logf logger.Logf logf logger.Logf
// NewTimer is the function that acts like time.NewTimer. // tstime.Clock.NewTimer is used instead time.NewTimer.
// It's for use in unit tests. Clock tstime.Clock
NewTimer func(time.Duration) *time.Timer
// LogLongerThan sets the minimum time of a single backoff interval // LogLongerThan sets the minimum time of a single backoff interval
// before we mention it in the log. // before we mention it in the log.
@ -40,7 +40,7 @@ func NewBackoff(name string, logf logger.Logf, maxBackoff time.Duration) *Backof
name: name, name: name,
logf: logf, logf: logf,
maxBackoff: maxBackoff, maxBackoff: maxBackoff,
NewTimer: time.NewTimer, Clock: tstime.StdClock{},
} }
} }
@ -72,10 +72,10 @@ func (b *Backoff) BackOff(ctx context.Context, err error) {
if d >= b.LogLongerThan { if d >= b.LogLongerThan {
b.logf("%s: [v1] backoff: %d msec", b.name, d.Milliseconds()) b.logf("%s: [v1] backoff: %d msec", b.name, d.Milliseconds())
} }
t := b.NewTimer(d) t, tChannel := b.Clock.NewTimer(d)
select { select {
case <-ctx.Done(): case <-ctx.Done():
t.Stop() t.Stop()
case <-t.C: case <-tChannel:
} }
} }

@ -49,18 +49,18 @@ type Encoder interface {
} }
type Config struct { type Config struct {
Collection string // collection name, a domain name Collection string // collection name, a domain name
PrivateID logid.PrivateID // private ID for the primary log stream PrivateID logid.PrivateID // private ID for the primary log stream
CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream CopyPrivateID logid.PrivateID // private ID for a log stream that is a superset of this log stream
BaseURL string // if empty defaults to "https://log.tailscale.io" BaseURL string // if empty defaults to "https://log.tailscale.io"
HTTPC *http.Client // if empty defaults to http.DefaultClient HTTPC *http.Client // if empty defaults to http.DefaultClient
SkipClientTime bool // if true, client_time is not written to logs SkipClientTime bool // if true, client_time is not written to logs
LowMemory bool // if true, logtail minimizes memory use LowMemory bool // if true, logtail minimizes memory use
TimeNow func() time.Time // if set, substitutes uses of time.Now Clock tstime.Clock // if set, Clock.Now substitutes uses of time.Now
Stderr io.Writer // if set, logs are sent here instead of os.Stderr Stderr io.Writer // if set, logs are sent here instead of os.Stderr
StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only
Buffer Buffer // temp storage, if nil a MemoryBuffer Buffer Buffer // temp storage, if nil a MemoryBuffer
NewZstdEncoder func() Encoder // if set, used to compress logs for transmission NewZstdEncoder func() Encoder // if set, used to compress logs for transmission
// MetricsDelta, if non-nil, is a func that returns an encoding // MetricsDelta, if non-nil, is a func that returns an encoding
// delta in clientmetrics to upload alongside existing logs. // delta in clientmetrics to upload alongside existing logs.
@ -94,8 +94,8 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
if cfg.HTTPC == nil { if cfg.HTTPC == nil {
cfg.HTTPC = http.DefaultClient cfg.HTTPC = http.DefaultClient
} }
if cfg.TimeNow == nil { if cfg.Clock == nil {
cfg.TimeNow = time.Now cfg.Clock = tstime.StdClock{}
} }
if cfg.Stderr == nil { if cfg.Stderr == nil {
cfg.Stderr = os.Stderr cfg.Stderr = os.Stderr
@ -144,7 +144,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
drainWake: make(chan struct{}, 1), drainWake: make(chan struct{}, 1),
sentinel: make(chan int32, 16), sentinel: make(chan int32, 16),
flushDelayFn: cfg.FlushDelayFn, flushDelayFn: cfg.FlushDelayFn,
timeNow: cfg.TimeNow, clock: cfg.Clock,
metricsDelta: cfg.MetricsDelta, metricsDelta: cfg.MetricsDelta,
procID: procID, procID: procID,
@ -181,7 +181,7 @@ type Logger struct {
flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay
flushPending atomic.Bool flushPending atomic.Bool
sentinel chan int32 sentinel chan int32
timeNow func() time.Time clock tstime.Clock
zstdEncoder Encoder zstdEncoder Encoder
uploadCancel func() uploadCancel func()
explainedRaw bool explainedRaw bool
@ -195,7 +195,7 @@ type Logger struct {
writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
procSequence uint64 procSequence uint64
flushTimer *time.Timer // used when flushDelay is >0 flushTimer tstime.TimerController // used when flushDelay is >0
shutdownStartMu sync.Mutex // guards the closing of shutdownStart shutdownStartMu sync.Mutex // guards the closing of shutdownStart
shutdownStart chan struct{} // closed when shutdown begins shutdownStart chan struct{} // closed when shutdown begins
@ -380,7 +380,7 @@ func (l *Logger) uploading(ctx context.Context) {
retryAfter, err := l.upload(ctx, body, origlen) retryAfter, err := l.upload(ctx, body, origlen)
if err != nil { if err != nil {
numFailures++ numFailures++
firstFailure = time.Now() firstFailure = l.clock.Now()
if !l.internetUp() { if !l.internetUp() {
fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n") fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
@ -403,7 +403,7 @@ func (l *Logger) uploading(ctx context.Context) {
} else { } else {
// Only print a success message after recovery. // Only print a success message after recovery.
if numFailures > 0 { if numFailures > 0 {
fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, time.Since(firstFailure).Round(time.Second)) fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, l.clock.Since(firstFailure).Round(time.Second))
} }
break break
} }
@ -545,7 +545,7 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
if flushDelay > 0 { if flushDelay > 0 {
if l.flushPending.CompareAndSwap(false, true) { if l.flushPending.CompareAndSwap(false, true) {
if l.flushTimer == nil { if l.flushTimer == nil {
l.flushTimer = time.AfterFunc(flushDelay, l.tryDrainWake) l.flushTimer = l.clock.AfterFunc(flushDelay, l.tryDrainWake)
} else { } else {
l.flushTimer.Reset(flushDelay) l.flushTimer.Reset(flushDelay)
} }
@ -559,7 +559,7 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
// TODO: instead of allocating, this should probably just append // TODO: instead of allocating, this should probably just append
// directly into the output log buffer. // directly into the output log buffer.
func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte { func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
now := l.timeNow() now := l.clock.Now()
// Factor in JSON encoding overhead to try to only do one alloc // Factor in JSON encoding overhead to try to only do one alloc
// in the make below (so appends don't resize the buffer). // in the make below (so appends don't resize the buffer).
@ -674,7 +674,7 @@ func (l *Logger) encodeLocked(buf []byte, level int) []byte {
return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
} }
now := l.timeNow() now := l.clock.Now()
obj := make(map[string]any) obj := make(map[string]any)
if err := json.Unmarshal(buf, &obj); err != nil { if err := json.Unmarshal(buf, &obj); err != nil {

@ -15,6 +15,7 @@ import (
"time" "time"
"tailscale.com/tstest" "tailscale.com/tstest"
"tailscale.com/tstime"
) )
func TestFastShutdown(t *testing.T) { func TestFastShutdown(t *testing.T) {
@ -212,7 +213,7 @@ func TestEncodeSpecialCases(t *testing.T) {
var sink []byte var sink []byte
func TestLoggerEncodeTextAllocs(t *testing.T) { func TestLoggerEncodeTextAllocs(t *testing.T) {
lg := &Logger{timeNow: time.Now} lg := &Logger{clock: tstime.StdClock{}}
inBuf := []byte("some text to encode") inBuf := []byte("some text to encode")
procID := uint32(0x24d32ee9) procID := uint32(0x24d32ee9)
procSequence := uint64(0x12346) procSequence := uint64(0x12346)
@ -226,8 +227,8 @@ func TestLoggerEncodeTextAllocs(t *testing.T) {
func TestLoggerWriteLength(t *testing.T) { func TestLoggerWriteLength(t *testing.T) {
lg := &Logger{ lg := &Logger{
timeNow: time.Now, clock: tstime.StdClock{},
buffer: NewMemoryBuffer(1024), buffer: NewMemoryBuffer(1024),
} }
inBuf := []byte("some text to encode") inBuf := []byte("some text to encode")
n, err := lg.Write(inBuf) n, err := lg.Write(inBuf)
@ -309,7 +310,7 @@ func unmarshalOne(t *testing.T, body []byte) map[string]any {
} }
func TestEncodeTextTruncation(t *testing.T) { func TestEncodeTextTruncation(t *testing.T) {
lg := &Logger{timeNow: time.Now, lowMem: true} lg := &Logger{clock: tstime.StdClock{}, lowMem: true}
in := bytes.Repeat([]byte("a"), 5120) in := bytes.Repeat([]byte("a"), 5120)
b := lg.encodeText(in, true, 0, 0, 0) b := lg.encodeText(in, true, 0, 0, 0)
got := string(b) got := string(b)
@ -363,7 +364,7 @@ func TestEncode(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
buf := new(simpleMemBuf) buf := new(simpleMemBuf)
lg := &Logger{ lg := &Logger{
timeNow: func() time.Time { return time.Unix(123, 456).UTC() }, clock: tstest.NewClock(tstest.ClockOpts{Start: time.Unix(123, 456).UTC()}),
buffer: buf, buffer: buf,
procID: 7, procID: 7,
procSequence: 1, procSequence: 1,

Loading…
Cancel
Save