diff --git a/logtail/logtail.go b/logtail/logtail.go index 6cb910ada..04cc0bf38 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -13,6 +13,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "os" "strconv" @@ -21,6 +22,7 @@ import ( "sync/atomic" "time" + "tailscale.com/envknob" "tailscale.com/logtail/backoff" "tailscale.com/net/interfaces" tslogger "tailscale.com/types/logger" @@ -62,9 +64,9 @@ type Config struct { // that's safe to embed in a JSON string literal without further escaping. MetricsDelta func() string - // DrainLogs, if non-nil, disables automatic uploading of new logs, - // so that logs are only uploaded when a token is sent to DrainLogs. - DrainLogs <-chan struct{} + // FlushDelay, if non-zero, is how long to wait to accumulate logs before + // uploading them. + FlushDelay time.Duration // IncludeProcID, if true, results in an ephemeral process identifier being // included in logs. The ID is random and not guaranteed to be globally @@ -109,6 +111,13 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { procID = 7 } } + if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" { + var err error + cfg.FlushDelay, err = time.ParseDuration(s) + if err != nil { + log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err) + } + } stdLogf := func(f string, a ...any) { fmt.Fprintf(cfg.Stderr, strings.TrimSuffix(f, "\n")+"\n", a...) @@ -126,9 +135,9 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { lowMem: cfg.LowMemory, buffer: cfg.Buffer, skipClientTime: cfg.SkipClientTime, - sent: make(chan struct{}, 1), + drainWake: make(chan struct{}, 1), sentinel: make(chan int32, 16), - drainLogs: cfg.DrainLogs, + flushDelay: cfg.FlushDelay, timeNow: cfg.TimeNow, bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second), metricsDelta: cfg.MetricsDelta, @@ -162,8 +171,9 @@ type Logger struct { skipClientTime bool linkMonitor *monitor.Mon buffer Buffer - sent chan struct{} // signal to speed up drain - drainLogs <-chan struct{} // if non-nil, external signal to attempt a drain + drainWake chan struct{} // signal to speed up drain + flushDelay time.Duration // 0 to upload agressively, or >0 to batch at this delay + flushPending atomic.Bool sentinel chan int32 timeNow func() time.Time bo *backoff.Backoff @@ -172,12 +182,14 @@ type Logger struct { explainedRaw bool metricsDelta func() string // or nil privateID PrivateID + httpDoCalls atomic.Int32 procID uint32 includeProcSequence bool - writeLock sync.Mutex // guards increments of procSequence + writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls procSequence uint64 + flushTimer *time.Timer // used when flushDelay non-zero shutdownStart chan struct{} // closed when shutdown begins shutdownDone chan struct{} // closed when shutdown complete @@ -241,24 +253,16 @@ func (l *Logger) Close() { // drainBlock is called by drainPending when there are no logs to drain. // -// In typical operation, every call to the Write method unblocks and triggers -// a buffer.TryReadline, so logs are written with very low latency. +// In typical operation, every call to the Write method unblocks and triggers a +// buffer.TryReadline, so logs are written with very low latency. // -// If the caller provides a DrainLogs channel, then unblock-drain-on-Write -// is disabled, and it is up to the caller to trigger unblock the drain. +// If the caller specified FlushInterface, drainWake is only sent to +// periodically. func (l *Logger) drainBlock() (shuttingDown bool) { - if l.drainLogs == nil { - select { - case <-l.shutdownStart: - return true - case <-l.sent: - } - } else { - select { - case <-l.shutdownStart: - return true - case <-l.drainLogs: - } + select { + case <-l.shutdownStart: + return true + case <-l.drainWake: } return false } @@ -426,6 +430,7 @@ func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded compressedNote = "compressed" } + l.httpDoCalls.Add(1) resp, err := l.httpc.Do(req) if err != nil { return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err) @@ -463,16 +468,40 @@ func Disable() { logtailDisabled.Store(true) } +var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES") + +// tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup. +// It does not block. +func (l *Logger) tryDrainWake() { + l.flushPending.Store(false) + if debugWakesAndUploads() { + // Using println instead of log.Printf here to avoid recursing back into + // ourselves. + println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load()) + } + select { + case l.drainWake <- struct{}{}: + default: + } +} + func (l *Logger) sendLocked(jsonBlob []byte) (int, error) { if logtailDisabled.Load() { return len(jsonBlob), nil } + n, err := l.buffer.Write(jsonBlob) - if l.drainLogs == nil { - select { - case l.sent <- struct{}{}: - default: + + if l.flushDelay > 0 { + if l.flushPending.CompareAndSwap(false, true) { + if l.flushTimer == nil { + l.flushTimer = time.AfterFunc(l.flushDelay, l.tryDrainWake) + } else { + l.flushTimer.Reset(l.flushDelay) + } } + } else { + l.tryDrainWake() } return n, err }