diff --git a/util/latencyqueue/latencyqueue.go b/util/latencyqueue/latencyqueue.go
index 65f62a304..26d7b4db4 100644
--- a/util/latencyqueue/latencyqueue.go
+++ b/util/latencyqueue/latencyqueue.go
@@ -11,6 +11,8 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"tailscale.com/util/ringbuffer"
 )
 
 var (
@@ -52,7 +54,7 @@ type Queue[T any] struct {
 	cancel context.CancelCauseFunc
 
 	mu      sync.Mutex
-	items   []queueItem[T]
+	items   *ringbuffer.RingBuffer[queueItem[T]]
 	wakeup  chan struct{}
 	started bool
 
@@ -93,7 +95,7 @@ func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] {
 	q := &Queue[T]{
 		ctx:    ctx,
 		cancel: cancel,
-		items:  make([]queueItem[T], 0, 128),
+		items:  ringbuffer.New[queueItem[T]](),
 		wakeup: make(chan struct{}, 1),
 		maxLag: maxLag,
 		done:   make(chan struct{}),
@@ -154,7 +156,7 @@ func (q *Queue[T]) Enqueue(batch []T) bool {
 	default:
 	}
 
-	q.items = append(q.items, item)
+	q.items.Push(item)
 	q.numEnqueued.Add(uint64(len(batch)))
 	q.mu.Unlock()
 
@@ -181,7 +183,7 @@ func (q *Queue[T]) Barrier() <-chan struct{} {
 		kind:    kindBarrier,
 		barrier: ch,
 	}
-	q.items = append(q.items, item)
+	q.items.Push(item)
 	q.mu.Unlock()
 
 	q.wake()
@@ -227,12 +229,7 @@ func (q *Queue[T]) run(processor func(context.Context, T)) {
 	for {
 		if processing == nil {
 			q.mu.Lock()
-			hasItems := len(q.items) > 0
-			var item queueItem[T]
-			if hasItems {
-				item = q.items[0]
-				q.items = q.items[1:]
-			}
+			item, hasItems := q.items.Pop()
 			q.mu.Unlock()
 
 			if !hasItems {
@@ -323,10 +320,14 @@ func (q *Queue[T]) drainAndReleaseBarriers() {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
-	for _, item := range q.items {
+	for {
+		item, ok := q.items.Pop()
+		if !ok {
+			break
+		}
 		if item.kind == kindBarrier {
 			close(item.barrier)
 		}
 	}
-	q.items = nil
+	q.items.Clear()
 }
diff --git a/util/latencyqueue/latencyqueue_test.go b/util/latencyqueue/latencyqueue_test.go
index 6a6051ee1..dcf4b38d5 100644
--- a/util/latencyqueue/latencyqueue_test.go
+++ b/util/latencyqueue/latencyqueue_test.go
@@ -722,3 +722,101 @@ func TestZeroMaxLag(t *testing.T) {
 		}
 	})
 }
+
+// BenchmarkVariableLoad tests memory efficiency under variable load patterns.
+// The ringbuffer-based implementation should efficiently handle:
+// - Bursts of enqueues followed by processing
+// - Growing and shrinking queue sizes
+// - Memory compaction during idle periods
+func BenchmarkVariableLoad(b *testing.B) {
+	q := New[int](context.Background(), 10*time.Second)
+
+	var processed atomic.Int64
+	q.Start(func(ctx context.Context, val int) {
+		processed.Add(1)
+		// Simulate some processing time.
+		time.Sleep(10 * time.Microsecond)
+	})
+	defer q.Close()
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		// Simulate bursty traffic: enqueue in variable-size batches.
+		batchSize := 10 + (i % 50) // batch sizes range from 10 to 59
+		batch := make([]int, batchSize)
+		for j := range batch {
+			batch[j] = i*100 + j
+		}
+		q.Enqueue(batch)
+
+		// Occasionally wait for processing to catch up.
+		if i%100 == 99 {
+			barrier := q.Barrier()
+			<-barrier
+		}
+	}
+
+	// Final barrier to ensure all items are processed.
+	barrier := q.Barrier()
+	<-barrier
+
+	b.ReportMetric(float64(processed.Load())/float64(b.N), "items/op")
+}
+
+// BenchmarkSteadyState measures throughput under steady-state conditions: single-item enqueues drained as fast as they arrive.
+func BenchmarkSteadyState(b *testing.B) {
+	q := New[int](context.Background(), 10*time.Second)
+
+	q.Start(func(ctx context.Context, val int) {
+		// No-op processor: measures pure queue overhead.
+	})
+	defer q.Close()
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		q.Enqueue([]int{i})
+	}
+
+	barrier := q.Barrier()
+	<-barrier
+}
+
+// BenchmarkBurstThenDrain tests memory efficiency in burst-then-drain scenarios.
+// This pattern exposes a weakness of slice-based implementations, whose
+// backing array never shrinks. The ring buffer should compact after each drain.
+func BenchmarkBurstThenDrain(b *testing.B) {
+	q := New[int](context.Background(), 10*time.Second)
+
+	var processDelay atomic.Bool
+	q.Start(func(ctx context.Context, val int) {
+		if processDelay.Load() {
+			time.Sleep(100 * time.Microsecond)
+		}
+	})
+	defer q.Close()
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		// Burst phase: enqueue many items while processing is slow.
+		processDelay.Store(true)
+		largeBatch := make([]int, 1000)
+		for j := range largeBatch {
+			largeBatch[j] = i*1000 + j
+		}
+		q.Enqueue(largeBatch)
+
+		// Let the queue fill up a bit.
+		time.Sleep(500 * time.Microsecond)
+
+		// Drain phase: speed processing back up and wait for the queue to empty.
+		processDelay.Store(false)
+		barrier := q.Barrier()
+		<-barrier
+
+		// Allow time for any compaction to occur.
+		time.Sleep(100 * time.Microsecond)
+	}
+}
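
Note for reviewers: the call sites in this diff assume a queue-style API on tailscale.com/util/ringbuffer (a zero-argument New, plus Push, Pop, and Clear). The sketch below is one plausible shape for that interface, included only so the diff can be read in context; it is an assumption for illustration, not the package's actual implementation, and the minCap constant and grow/shrink thresholds are invented.

// Package ringbuffer: a minimal sketch of the queue-style API the call
// sites above rely on. Hypothetical illustration only; the real
// tailscale.com/util/ringbuffer may differ in names and behavior.
package ringbuffer

const minCap = 16 // assumed smallest backing-array size

// RingBuffer is a growable FIFO queue backed by a circular slice.
type RingBuffer[T any] struct {
	buf   []T
	head  int // index of the oldest element
	count int // number of elements currently stored
}

// New returns an empty ring buffer with a small initial capacity.
func New[T any]() *RingBuffer[T] {
	return &RingBuffer[T]{buf: make([]T, minCap)}
}

// Push appends v at the tail, doubling the backing slice when full.
func (r *RingBuffer[T]) Push(v T) {
	if r.count == len(r.buf) {
		r.resize(2 * len(r.buf))
	}
	r.buf[(r.head+r.count)%len(r.buf)] = v
	r.count++
}

// Pop removes and returns the oldest element; the second result is
// false when the buffer is empty. Halving the backing slice when it is
// at most a quarter full is the "compaction" the benchmarks probe.
func (r *RingBuffer[T]) Pop() (T, bool) {
	var zero T
	if r.count == 0 {
		return zero, false
	}
	v := r.buf[r.head]
	r.buf[r.head] = zero // drop the reference so the GC can reclaim it
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	if len(r.buf) > minCap && r.count <= len(r.buf)/4 {
		r.resize(len(r.buf) / 2)
	}
	return v, true
}

// Clear drops all elements and resets to the initial capacity,
// playing the role that `q.items = nil` did for the slice.
func (r *RingBuffer[T]) Clear() {
	r.buf = make([]T, minCap)
	r.head = 0
	r.count = 0
}

// resize copies the live elements into a fresh slice of length n.
func (r *RingBuffer[T]) resize(n int) {
	if n < minCap {
		n = minCap
	}
	newBuf := make([]T, n)
	for i := 0; i < r.count; i++ {
		newBuf[i] = r.buf[(r.head+i)%len(r.buf)]
	}
	r.buf = newBuf
	r.head = 0
}

Under that reading, drainAndReleaseBarriers needs only Pop's second result to detect exhaustion, and Clear releases the backing storage on shutdown. Concurrency stays in latencyqueue itself: every Push/Pop in the diff happens under q.mu, so the buffer needs no locking of its own.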