logtail: change batched upload mechanism to not use CPU when idle

The mobile implementation had a 2 minute ticker going all the time
to do a channel send. Instead, schedule it as needed based on activity.

Then we can be actually idle for long periods of time.

Updates #3363

Change-Id: I0dba4150ea7b94f74382fbd10db54a82f7ef6c29
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/5935/head
Brad Fitzpatrick 2 years ago committed by Brad Fitzpatrick
parent d05dd41bc1
commit a315336287

@ -13,6 +13,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@ -21,6 +22,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"tailscale.com/envknob"
"tailscale.com/logtail/backoff" "tailscale.com/logtail/backoff"
"tailscale.com/net/interfaces" "tailscale.com/net/interfaces"
tslogger "tailscale.com/types/logger" tslogger "tailscale.com/types/logger"
@ -62,9 +64,9 @@ type Config struct {
// that's safe to embed in a JSON string literal without further escaping. // that's safe to embed in a JSON string literal without further escaping.
MetricsDelta func() string MetricsDelta func() string
// DrainLogs, if non-nil, disables automatic uploading of new logs, // FlushDelay, if non-zero, is how long to wait to accumulate logs before
// so that logs are only uploaded when a token is sent to DrainLogs. // uploading them.
DrainLogs <-chan struct{} FlushDelay time.Duration
// IncludeProcID, if true, results in an ephemeral process identifier being // IncludeProcID, if true, results in an ephemeral process identifier being
// included in logs. The ID is random and not guaranteed to be globally // included in logs. The ID is random and not guaranteed to be globally
@ -109,6 +111,13 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
procID = 7 procID = 7
} }
} }
if s := envknob.String("TS_DEBUG_LOGTAIL_FLUSHDELAY"); s != "" {
var err error
cfg.FlushDelay, err = time.ParseDuration(s)
if err != nil {
log.Fatalf("invalid TS_DEBUG_LOGTAIL_FLUSHDELAY: %v", err)
}
}
stdLogf := func(f string, a ...any) { stdLogf := func(f string, a ...any) {
fmt.Fprintf(cfg.Stderr, strings.TrimSuffix(f, "\n")+"\n", a...) fmt.Fprintf(cfg.Stderr, strings.TrimSuffix(f, "\n")+"\n", a...)
@ -126,9 +135,9 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
lowMem: cfg.LowMemory, lowMem: cfg.LowMemory,
buffer: cfg.Buffer, buffer: cfg.Buffer,
skipClientTime: cfg.SkipClientTime, skipClientTime: cfg.SkipClientTime,
sent: make(chan struct{}, 1), drainWake: make(chan struct{}, 1),
sentinel: make(chan int32, 16), sentinel: make(chan int32, 16),
drainLogs: cfg.DrainLogs, flushDelay: cfg.FlushDelay,
timeNow: cfg.TimeNow, timeNow: cfg.TimeNow,
bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second), bo: backoff.NewBackoff("logtail", stdLogf, 30*time.Second),
metricsDelta: cfg.MetricsDelta, metricsDelta: cfg.MetricsDelta,
@ -162,8 +171,9 @@ type Logger struct {
skipClientTime bool skipClientTime bool
linkMonitor *monitor.Mon linkMonitor *monitor.Mon
buffer Buffer buffer Buffer
sent chan struct{} // signal to speed up drain drainWake chan struct{} // signal to speed up drain
drainLogs <-chan struct{} // if non-nil, external signal to attempt a drain flushDelay time.Duration // 0 to upload agressively, or >0 to batch at this delay
flushPending atomic.Bool
sentinel chan int32 sentinel chan int32
timeNow func() time.Time timeNow func() time.Time
bo *backoff.Backoff bo *backoff.Backoff
@ -172,12 +182,14 @@ type Logger struct {
explainedRaw bool explainedRaw bool
metricsDelta func() string // or nil metricsDelta func() string // or nil
privateID PrivateID privateID PrivateID
httpDoCalls atomic.Int32
procID uint32 procID uint32
includeProcSequence bool includeProcSequence bool
writeLock sync.Mutex // guards increments of procSequence writeLock sync.Mutex // guards procSequence, flushTimer, buffer.Write calls
procSequence uint64 procSequence uint64
flushTimer *time.Timer // used when flushDelay non-zero
shutdownStart chan struct{} // closed when shutdown begins shutdownStart chan struct{} // closed when shutdown begins
shutdownDone chan struct{} // closed when shutdown complete shutdownDone chan struct{} // closed when shutdown complete
@ -241,24 +253,16 @@ func (l *Logger) Close() {
// drainBlock is called by drainPending when there are no logs to drain. // drainBlock is called by drainPending when there are no logs to drain.
// //
// In typical operation, every call to the Write method unblocks and triggers // In typical operation, every call to the Write method unblocks and triggers a
// a buffer.TryReadline, so logs are written with very low latency. // buffer.TryReadline, so logs are written with very low latency.
// //
// If the caller provides a DrainLogs channel, then unblock-drain-on-Write // If the caller specified FlushInterface, drainWake is only sent to
// is disabled, and it is up to the caller to trigger unblock the drain. // periodically.
func (l *Logger) drainBlock() (shuttingDown bool) { func (l *Logger) drainBlock() (shuttingDown bool) {
if l.drainLogs == nil { select {
select { case <-l.shutdownStart:
case <-l.shutdownStart: return true
return true case <-l.drainWake:
case <-l.sent:
}
} else {
select {
case <-l.shutdownStart:
return true
case <-l.drainLogs:
}
} }
return false return false
} }
@ -426,6 +430,7 @@ func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded
compressedNote = "compressed" compressedNote = "compressed"
} }
l.httpDoCalls.Add(1)
resp, err := l.httpc.Do(req) resp, err := l.httpc.Do(req)
if err != nil { if err != nil {
return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err) return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
@ -463,16 +468,40 @@ func Disable() {
logtailDisabled.Store(true) logtailDisabled.Store(true)
} }
var debugWakesAndUploads = envknob.RegisterBool("TS_DEBUG_LOGTAIL_WAKES")
// tryDrainWake tries to send to lg.drainWake, to cause an uploading wakeup.
// It does not block.
func (l *Logger) tryDrainWake() {
l.flushPending.Store(false)
if debugWakesAndUploads() {
// Using println instead of log.Printf here to avoid recursing back into
// ourselves.
println("logtail: try drain wake, numHTTP:", l.httpDoCalls.Load())
}
select {
case l.drainWake <- struct{}{}:
default:
}
}
func (l *Logger) sendLocked(jsonBlob []byte) (int, error) { func (l *Logger) sendLocked(jsonBlob []byte) (int, error) {
if logtailDisabled.Load() { if logtailDisabled.Load() {
return len(jsonBlob), nil return len(jsonBlob), nil
} }
n, err := l.buffer.Write(jsonBlob) n, err := l.buffer.Write(jsonBlob)
if l.drainLogs == nil {
select { if l.flushDelay > 0 {
case l.sent <- struct{}{}: if l.flushPending.CompareAndSwap(false, true) {
default: if l.flushTimer == nil {
l.flushTimer = time.AfterFunc(l.flushDelay, l.tryDrainWake)
} else {
l.flushTimer.Reset(l.flushDelay)
}
} }
} else {
l.tryDrainWake()
} }
return n, err return n, err
} }

Loading…
Cancel
Save