@ -66,13 +66,12 @@ type Config struct {
// that's safe to embed in a JSON string literal without further escaping.
// that's safe to embed in a JSON string literal without further escaping.
MetricsDelta func ( ) string
MetricsDelta func ( ) string
// FlushDelay is how long to wait to accumulate logs before
// FlushDelayFn, if non-nil is a func that returns how long to wait to
// uploading them.
// accumulate logs before uploading them. 0 or negative means to upload
// immediately.
//
//
// If zero, a default value is used. (currently 2 seconds)
// If nil, a default value is used. (currently 2 seconds)
//
FlushDelayFn func ( ) time . Duration
// Negative means to upload immediately.
FlushDelay time . Duration
// IncludeProcID, if true, results in an ephemeral process identifier being
// IncludeProcID, if true, results in an ephemeral process identifier being
// included in logs. The ID is random and not guaranteed to be globally
// included in logs. The ID is random and not guaranteed to be globally
@ -118,13 +117,13 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
}
}
}
}
if s := envknob . String ( "TS_DEBUG_LOGTAIL_FLUSHDELAY" ) ; s != "" {
if s := envknob . String ( "TS_DEBUG_LOGTAIL_FLUSHDELAY" ) ; s != "" {
var err error
if delay , err := time . ParseDuration ( s ) ; err == nil {
cfg . FlushDelay , err = time . ParseDuration ( s )
cfg . FlushDelayFn = func ( ) time . Duration { return delay }
if err != nil {
} else {
log . Fatalf ( "invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v" , err )
log . Fatalf ( "invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v" , err )
}
}
} else if cfg . FlushDelay == 0 && ! envknob . Bool ( "IN_TS_TEST" ) {
} else if cfg . FlushDelay Fn == nil && envknob . Bool ( "IN_TS_TEST" ) {
cfg . FlushDelay = defaultFlushDelay
cfg . FlushDelay Fn = func ( ) time . Duration { return 0 }
}
}
stdLogf := func ( f string , a ... any ) {
stdLogf := func ( f string , a ... any ) {
@ -145,7 +144,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
skipClientTime : cfg . SkipClientTime ,
skipClientTime : cfg . SkipClientTime ,
drainWake : make ( chan struct { } , 1 ) ,
drainWake : make ( chan struct { } , 1 ) ,
sentinel : make ( chan int32 , 16 ) ,
sentinel : make ( chan int32 , 16 ) ,
flushDelay : cfg . FlushDelay ,
flushDelay Fn : cfg . FlushDelay Fn ,
timeNow : cfg . TimeNow ,
timeNow : cfg . TimeNow ,
bo : backoff . NewBackoff ( "logtail" , stdLogf , 30 * time . Second ) ,
bo : backoff . NewBackoff ( "logtail" , stdLogf , 30 * time . Second ) ,
metricsDelta : cfg . MetricsDelta ,
metricsDelta : cfg . MetricsDelta ,
@ -180,7 +179,7 @@ type Logger struct {
linkMonitor * monitor . Mon
linkMonitor * monitor . Mon
buffer Buffer
buffer Buffer
drainWake chan struct { } // signal to speed up drain
drainWake chan struct { } // signal to speed up drain
flushDelay time . Duration // negative or zero to upload agressively, or >0 to batch at this delay
flushDelay Fn func ( ) time . Duration // negative or zero return value to upload ag gressively, or >0 to batch at this delay
flushPending atomic . Bool
flushPending atomic . Bool
sentinel chan int32
sentinel chan int32
timeNow func ( ) time . Time
timeNow func ( ) time . Time
@ -500,12 +499,16 @@ func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
n , err := l . buffer . Write ( jsonBlob )
n , err := l . buffer . Write ( jsonBlob )
if l . flushDelay > 0 {
flushDelay := defaultFlushDelay
if l . flushDelayFn != nil {
flushDelay = l . flushDelayFn ( )
}
if flushDelay > 0 {
if l . flushPending . CompareAndSwap ( false , true ) {
if l . flushPending . CompareAndSwap ( false , true ) {
if l . flushTimer == nil {
if l . flushTimer == nil {
l . flushTimer = time . AfterFunc ( l . flushDelay , l . tryDrainWake )
l . flushTimer = time . AfterFunc ( flushDelay, l . tryDrainWake )
} else {
} else {
l . flushTimer . Reset ( l. flushDelay)
l . flushTimer . Reset ( flushDelay)
}
}
}
}
} else {
} else {