diff --git a/logtail/logtail.go b/logtail/logtail.go index 57591e5e4..fbccc28ea 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -66,13 +66,12 @@ type Config struct { // that's safe to embed in a JSON string literal without further escaping. MetricsDelta func() string - // FlushDelay is how long to wait to accumulate logs before - // uploading them. + // FlushDelayFn, if non-nil is a func that returns how long to wait to + // accumulate logs before uploading them. 0 or negative means to upload + // immediately. // - // If zero, a default value is used. (currently 2 seconds) - // - // Negative means to upload immediately. - FlushDelay time.Duration + // If nil, a default value is used. (currently 2 seconds) + FlushDelayFn func() time.Duration // IncludeProcID, if true, results in an ephemeral process identifier being // included in logs. The ID is random and not guaranteed to be globally @@ -118,13 +117,13 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { } } if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" { - var err error - cfg.FlushDelay, err = time.ParseDuration(s) - if err != nil { + if delay, err := time.ParseDuration(s); err == nil { + cfg.FlushDelayFn = func() time.Duration { return delay } + } else { log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err) } - } else if cfg.FlushDelay == 0 && !envknob.Bool("IN_TS_TEST") { - cfg.FlushDelay = defaultFlushDelay + } else if cfg.FlushDelayFn == nil && envknob.Bool("IN_TS_TEST") { + cfg.FlushDelayFn = func() time.Duration { return 0 } } stdLogf := func(f string, a ...any) { @@ -145,7 +144,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { skipClientTime: cfg.SkipClientTime, drainWake: make(chan struct{}, 1), sentinel: make(chan int32, 16), - flushDelay: cfg.FlushDelay, + flushDelayFn: cfg.FlushDelayFn, timeNow: cfg.TimeNow, bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second), metricsDelta: cfg.MetricsDelta, @@ -179,8 +178,8 @@ type Logger struct { skipClientTime bool linkMonitor *monitor.Mon buffer Buffer - drainWake chan struct{} // signal to speed up drain - flushDelay time.Duration // negative or zero to upload agressively, or >0 to batch at this delay + drainWake chan struct{} // signal to speed up drain + flushDelayFn func() time.Duration // negative or zero return value to upload aggressively, or >0 to batch at this delay flushPending atomic.Bool sentinel chan int32 timeNow func() time.Time @@ -500,12 +499,16 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) { n, err := l.buffer.Write(jsonBlob) - if l.flushDelay > 0 { + flushDelay := defaultFlushDelay + if l.flushDelayFn != nil { + flushDelay = l.flushDelayFn() + } + if flushDelay > 0 { if l.flushPending.CompareAndSwap(false, true) { if l.flushTimer == nil { - l.flushTimer = time.AfterFunc(l.flushDelay, l.tryDrainWake) + l.flushTimer = time.AfterFunc(flushDelay, l.tryDrainWake) } else { - l.flushTimer.Reset(l.flushDelay) + l.flushTimer.Reset(flushDelay) } } } else {