From 55b4993256c647e7ed8b4845849d187bb8f49a62 Mon Sep 17 00:00:00 2001 From: James Tucker Date: Fri, 7 Nov 2025 17:43:39 -0800 Subject: [PATCH] util/latencyqueue: add a bounded-latency queue Sometimes you do not know how many elements are resasonable, but you do know how stale elements can be at maximum, for example when streaming optimistic synchronization data. latencyqueue provides a primitive that makes it easier to keep such an unbounded queue, but abort and drop the queue if the consumer is not able to keep up with the desired bound. Updates tailscale/corp#34129 Signed-off-by: James Tucker --- util/latencyqueue/latencyqueue.go | 332 ++++++++++++ util/latencyqueue/latencyqueue_test.go | 724 +++++++++++++++++++++++++ 2 files changed, 1056 insertions(+) create mode 100644 util/latencyqueue/latencyqueue.go create mode 100644 util/latencyqueue/latencyqueue_test.go diff --git a/util/latencyqueue/latencyqueue.go b/util/latencyqueue/latencyqueue.go new file mode 100644 index 000000000..65f62a304 --- /dev/null +++ b/util/latencyqueue/latencyqueue.go @@ -0,0 +1,332 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package latencyqueue provides a latency-bounded FIFO queue for asynchronous processing. +package latencyqueue + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" +) + +var ( + // ErrClosed is returned by context.Cause() when Close() has been called. + ErrClosed = errors.New("queue closed") + + // ErrAborted is returned by context.Cause() when Abort() has been called. + ErrAborted = errors.New("queue processing aborted") + + // ErrLagged is returned by context.Cause() when the lag threshold was exceeded. + ErrLagged = errors.New("queue lag threshold exceeded") +) + +// ErrPanic wraps a panic value recovered from the processor function. +type ErrPanic struct { + Panic any +} + +func (e *ErrPanic) Error() string { + return fmt.Sprintf("processor panic: %v", e.Panic) +} + +// Queue is a latency-bounded FIFO queue for asynchronous processing. +// +// The queue is unbounded by item count or storage size, but bounded by the age +// of the oldest item. When an item exceeds the configured lag threshold, +// the queue's context is cancelled with ErrLagged. +// +// # Delivery Semantics +// +// During normal operation, each item is delivered exactly once to the processor, +// in the order enqueued. Items are processed one batch at a time, with each batch +// processed on a separate goroutine. +// +// On abnormal termination (lag threshold exceeded, processor panic, abort, or +// explicit close), unprocessed items are lost and any pending barriers are released. +type Queue[T any] struct { + ctx context.Context + cancel context.CancelCauseFunc + + mu sync.Mutex + items []queueItem[T] + wakeup chan struct{} + started bool + + maxLag time.Duration + + numEnqueued atomic.Uint64 + numProcessed atomic.Uint64 + + done chan struct{} +} + +type itemKind uint8 + +const ( + kindBatch itemKind = iota + kindBarrier +) + +type queueItem[T any] struct { + kind itemKind + batch []T + enqueued time.Time + barrier chan struct{} +} + +// QueueCounters contains observability metrics for the queue. +type QueueCounters struct { + Enqueued uint64 + Processed uint64 +} + +// New creates a bounded-latency queue that processes items asynchronously. +// The parent context is used for lifecycle management. If maxLag is > 0, +// items that remain in the queue longer than maxLag will cause the context +// to be cancelled with ErrLagged. +func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] { + ctx, cancel := context.WithCancelCause(parent) + q := &Queue[T]{ + ctx: ctx, + cancel: cancel, + items: make([]queueItem[T], 0, 128), + wakeup: make(chan struct{}, 1), + maxLag: maxLag, + done: make(chan struct{}), + } + return q +} + +// Start begins processing queued items with the given processor function. +// The processor receives a context (with lag deadline if applicable) and an item. +// The processor is considered infallible; errors should be handled within the processor. +// Must be called before Enqueue. Can only be called once. +func (q *Queue[T]) Start(processor func(context.Context, T)) { + q.mu.Lock() + if q.started { + q.mu.Unlock() + panic("Start called multiple times") + } + q.started = true + q.mu.Unlock() + + go q.run(processor) +} + +// Close stops processing and releases resources. +// Unprocessed items are discarded and barriers are released. +// Blocks until processing stops. +func (q *Queue[T]) Close() { + q.cancel(ErrClosed) + <-q.done +} + +// Abort stops processing immediately. Unprocessed items are discarded +// and barriers are released. The context will be cancelled with ErrAborted. +// Non-blocking. +func (q *Queue[T]) Abort() { + q.cancel(ErrAborted) +} + +// Enqueue adds a batch of items to the queue. +// Returns false if the queue has terminated (closed, lagged, or aborted). +func (q *Queue[T]) Enqueue(batch []T) bool { + if len(batch) == 0 { + return true + } + + now := time.Now() + item := queueItem[T]{ + kind: kindBatch, + batch: batch, + enqueued: now, + } + + q.mu.Lock() + + select { + case <-q.ctx.Done(): + return false + default: + } + + q.items = append(q.items, item) + q.numEnqueued.Add(uint64(len(batch))) + q.mu.Unlock() + + q.wake() + return true +} + +// Barrier returns a channel that closes when all previously enqueued items +// have been processed. Returns an immediately-closed channel if the queue +// has terminated. +func (q *Queue[T]) Barrier() <-chan struct{} { + q.mu.Lock() + + ch := make(chan struct{}) + + select { + case <-q.ctx.Done(): + close(ch) + return ch + default: + } + + item := queueItem[T]{ + kind: kindBarrier, + barrier: ch, + } + q.items = append(q.items, item) + q.mu.Unlock() + + q.wake() + return ch +} + +// Done returns a channel that closes when processing stops. +func (q *Queue[T]) Done() <-chan struct{} { + return q.done +} + +// Context returns the queue's context, which is cancelled when the queue stops. +func (q *Queue[T]) Context() context.Context { + return q.ctx +} + +// Counters returns current queue metrics. +func (q *Queue[T]) Counters() QueueCounters { + return QueueCounters{ + Enqueued: q.numEnqueued.Load(), + Processed: q.numProcessed.Load(), + } +} + +func (q *Queue[T]) wake() { + select { + case q.wakeup <- struct{}{}: + default: + } +} + +func (q *Queue[T]) run(processor func(context.Context, T)) { + defer close(q.done) + defer q.drainAndReleaseBarriers() + + var ( + processingCh = make(chan error, 1) + processing chan error // nil when not processing, points to processingCh when processing + itemCtx context.Context + itemCancel context.CancelFunc + ) + + for { + if processing == nil { + q.mu.Lock() + hasItems := len(q.items) > 0 + var item queueItem[T] + if hasItems { + item = q.items[0] + q.items = q.items[1:] + } + q.mu.Unlock() + + if !hasItems { + select { + case <-q.ctx.Done(): + return + case <-q.wakeup: + continue + } + } + + if item.kind == kindBarrier { + close(item.barrier) + continue + } + + itemCtx = q.ctx + itemCancel = nil + if q.maxLag > 0 { + deadline := item.enqueued.Add(q.maxLag) + remaining := time.Until(deadline) + if remaining <= 0 { + q.cancel(ErrLagged) + return + } + var cancel context.CancelFunc + itemCtx, cancel = context.WithDeadline(q.ctx, deadline) + itemCancel = cancel + } + + batch := item.batch + processing = processingCh + go func() { + defer func() { + if r := recover(); r != nil { + processingCh <- &ErrPanic{Panic: r} + } else { + processingCh <- nil + } + }() + for _, data := range batch { + if itemCtx.Err() != nil { + return + } + processor(itemCtx, data) + q.numProcessed.Add(1) + } + }() + } + + select { + case <-q.ctx.Done(): + if itemCancel != nil { + itemCancel() + } + if processing != nil { + <-processing + } + return + + case err := <-processing: + // Check lag BEFORE cancelling to distinguish deadline from manual cancel + lagDetected := itemCtx.Err() == context.DeadlineExceeded + + if itemCancel != nil { + itemCancel() + itemCancel = nil + } + + if err != nil { + q.cancel(err) + return + } + + if lagDetected { + q.cancel(ErrLagged) + return + } + + processing = nil + + case <-q.wakeup: + } + } +} + +func (q *Queue[T]) drainAndReleaseBarriers() { + q.mu.Lock() + defer q.mu.Unlock() + + for _, item := range q.items { + if item.kind == kindBarrier { + close(item.barrier) + } + } + q.items = nil +} diff --git a/util/latencyqueue/latencyqueue_test.go b/util/latencyqueue/latencyqueue_test.go new file mode 100644 index 000000000..6a6051ee1 --- /dev/null +++ b/util/latencyqueue/latencyqueue_test.go @@ -0,0 +1,724 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package latencyqueue + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "testing/synctest" + "time" +) + +func TestBasicEnqueueDequeue(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + processed := make([]int, 0) + var mu sync.Mutex + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + mu.Lock() + processed = append(processed, val) + mu.Unlock() + }) + defer q.Close() + + q.Enqueue([]int{0, 1, 2, 3, 4}) + + barrier := q.Barrier() + <-barrier + + if err := context.Cause(q.Context()); err != nil { + t.Errorf("expected no error after successful processing, got %v", err) + } + + mu.Lock() + defer mu.Unlock() + if len(processed) != 5 { + t.Errorf("expected 5 items processed, got %d", len(processed)) + } + for i := range 5 { + if processed[i] != i { + t.Errorf("expected processed[%d] = %d, got %d", i, i, processed[i]) + } + } + }) +} + +func TestLagThresholdExceeded(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 50*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + time.Sleep(30 * time.Millisecond) + }) + defer q.Close() + + batch := make([]int, 10) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged, got %v", err) + } + }) +} + +func TestFastProcessingNoLag(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + }) + defer q.Close() + + batch := make([]int, 100) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + barrier := q.Barrier() + <-barrier + + if err := context.Cause(q.Context()); err != nil { + t.Errorf("expected no error, got %v", err) + } + + if processed.Load() != 100 { + t.Errorf("expected 100 items processed, got %d", processed.Load()) + } + }) +} + +func TestMultipleBarriers(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + time.Sleep(5 * time.Millisecond) + }) + defer q.Close() + + barrier1 := q.Barrier() + <-barrier1 + count1 := processed.Load() + if count1 > 0 { + t.Errorf("barrier1: nothing enqueued before it, but got %d processed", count1) + } + + q.Enqueue([]int{0, 1, 2, 3, 4}) + barrier2 := q.Barrier() + <-barrier2 + count2 := processed.Load() + if count2 < 5 { + t.Errorf("barrier2: expected at least 5 processed, got %d", count2) + } + + q.Enqueue([]int{5, 6, 7, 8, 9}) + barrier3 := q.Barrier() + <-barrier3 + count3 := processed.Load() + if count3 != 10 { + t.Errorf("barrier3: expected exactly 10 processed (all items), got %d", count3) + } + }) +} + +func TestCloseStopsProcessing(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + time.Sleep(10 * time.Millisecond) + }) + + batch := make([]int, 1000) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + time.Sleep(20 * time.Millisecond) + q.Close() + + processedCount := processed.Load() + if processedCount >= 1000 { + t.Error("expected some items to be dropped after close") + } + + if q.Enqueue([]int{9999}) { + t.Error("enqueue after close should return false") + } + + if err := context.Cause(q.Context()); err != ErrClosed { + t.Errorf("expected ErrClosed, got %v", err) + } + }) +} + +func TestBatchesShareEnqueueTime(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 50*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + time.Sleep(10 * time.Millisecond) + }) + defer q.Close() + + batch := make([]int, 10) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged - batch items share enqueue time, got %v", err) + } + }) +} + +func TestAbortStopsProcessing(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 200*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + if val == 3 { + q.Abort() + } + time.Sleep(10 * time.Millisecond) + }) + + q.Enqueue([]int{1, 2, 3, 4, 5}) + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted, got %v", err) + } + + count := processed.Load() + if count > 3 { + t.Errorf("expected at most 3 items processed after abort, got %d", count) + } + if count == 0 { + t.Error("expected at least one item to be processed") + } + }) +} + +func TestConcurrentEnqueuers(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 5*time.Second) + + var processed []int + var mu sync.Mutex + q.Start(func(ctx context.Context, val int) { + mu.Lock() + processed = append(processed, val) + mu.Unlock() + }) + defer q.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + q.Enqueue([]int{100, 101, 102}) + }() + + go func() { + defer wg.Done() + q.Enqueue([]int{200, 201, 202}) + }() + + wg.Wait() + barrier := q.Barrier() + <-barrier + + mu.Lock() + defer mu.Unlock() + + if len(processed) != 6 { + t.Errorf("expected 6 items, got %d", len(processed)) + } + + has100 := false + has200 := false + idx100, idx200 := -1, -1 + + for i, v := range processed { + if v == 100 { + has100 = true + idx100 = i + } + if v == 200 { + has200 = true + idx200 = i + } + } + + if !has100 || !has200 { + t.Fatal("both batches should be processed") + } + + if idx100+2 < len(processed) { + if processed[idx100] != 100 || processed[idx100+1] != 101 || processed[idx100+2] != 102 { + t.Errorf("batch [100,101,102] not in order at position %d", idx100) + } + } + + if idx200+2 < len(processed) { + if processed[idx200] != 200 || processed[idx200+1] != 201 || processed[idx200+2] != 202 { + t.Errorf("batch [200,201,202] not in order at position %d", idx200) + } + } + }) +} + +func TestProcessorReceivesContextCancellation(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 50*time.Millisecond) + + processorStarted := make(chan struct{}) + contextCancelledDuringProcessing := atomic.Bool{} + + q.Start(func(ctx context.Context, val int) { + close(processorStarted) + for i := 0; i < 10; i++ { + select { + case <-ctx.Done(): + contextCancelledDuringProcessing.Store(true) + return + default: + time.Sleep(20 * time.Millisecond) + } + } + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + + <-processorStarted + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged, got %v", err) + } + + if !contextCancelledDuringProcessing.Load() { + t.Error("expected processor to observe context cancellation during processing") + } + }) +} + +func TestProcessorReceivesAbortCancellation(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 500*time.Millisecond) + + processorStarted := make(chan struct{}) + contextCancelledDuringProcessing := atomic.Bool{} + + q.Start(func(ctx context.Context, val int) { + if val == 1 { + close(processorStarted) + } + for i := 0; i < 10; i++ { + select { + case <-ctx.Done(): + contextCancelledDuringProcessing.Store(true) + return + default: + time.Sleep(10 * time.Millisecond) + } + } + }) + + q.Enqueue([]int{1, 2, 3, 4, 5}) + + <-processorStarted + q.Abort() + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted, got %v", err) + } + + if !contextCancelledDuringProcessing.Load() { + t.Error("expected processor to observe context cancellation during processing") + } + }) +} + +func TestEnqueueFailsAfterLag(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 30*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + time.Sleep(20 * time.Millisecond) + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + + <-q.Done() + + if q.Enqueue([]int{999}) { + t.Error("enqueue after lag should return false") + } + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged, got %v", err) + } + }) +} + +func TestContextCause(t *testing.T) { + t.Parallel() + tests := []struct { + name string + setup func(*Queue[int]) + expectErr error + }{ + { + name: "close", + setup: func(q *Queue[int]) { + q.Start(func(ctx context.Context, val int) {}) + q.Close() + }, + expectErr: ErrClosed, + }, + { + name: "abort", + setup: func(q *Queue[int]) { + q.Start(func(ctx context.Context, val int) {}) + q.Abort() + <-q.Done() + }, + expectErr: ErrAborted, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + tt.setup(q) + + if err := context.Cause(q.Context()); err != tt.expectErr { + t.Errorf("expected %v, got %v", tt.expectErr, err) + } + }) + }) + } +} + +func TestBarrierWithContextDistinction(t *testing.T) { + t.Parallel() + tests := []struct { + name string + setup func(*Queue[int]) <-chan struct{} + expectErr error + description string + }{ + { + name: "normal completion", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) {}) + q.Enqueue([]int{1, 2, 3}) + return q.Barrier() + }, + expectErr: nil, + description: "barrier completes normally when items are processed", + }, + { + name: "close", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) { + time.Sleep(100 * time.Millisecond) + }) + q.Enqueue([]int{1, 2, 3, 4, 5}) + b := q.Barrier() + time.Sleep(10 * time.Millisecond) + q.Close() + return b + }, + expectErr: ErrClosed, + description: "barrier released when queue is closed", + }, + { + name: "abort", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) { + time.Sleep(100 * time.Millisecond) + }) + q.Enqueue([]int{1, 2, 3, 4, 5}) + b := q.Barrier() + time.Sleep(10 * time.Millisecond) + q.Abort() + return b + }, + expectErr: ErrAborted, + description: "barrier released when queue is aborted", + }, + { + name: "lag", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) { + time.Sleep(30 * time.Millisecond) + }) + q.Enqueue([]int{1, 2, 3, 4, 5}) + return q.Barrier() + }, + expectErr: ErrLagged, + description: "barrier released when lag threshold is exceeded", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var q *Queue[int] + if tt.name == "lag" { + q = New[int](context.Background(), 50*time.Millisecond) + } else { + q = New[int](context.Background(), 5*time.Second) + } + defer q.Close() + + barrier := tt.setup(q) + <-barrier + + if err := context.Cause(q.Context()); err != tt.expectErr { + t.Errorf("%s: expected %v, got %v", tt.description, tt.expectErr, err) + } + }) + }) + } +} + +func TestFirstStopWins(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) {}) + + q.Abort() + q.Close() + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted (first error wins), got %v", err) + } + }) +} + +func TestMultipleCloseCallsSafe(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) {}) + + q.Close() + q.Close() + + if err := context.Cause(q.Context()); err != ErrClosed { + t.Errorf("expected ErrClosed, got %v", err) + } + }) +} + +func TestMultipleAbortCallsSafe(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) {}) + + q.Abort() + q.Abort() + q.Abort() + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted, got %v", err) + } + }) +} + +func TestCounters(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 500*time.Millisecond) + + processed := make(chan struct{}, 10) + q.Start(func(ctx context.Context, val int) { + time.Sleep(5 * time.Millisecond) + processed <- struct{}{} + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + + counters := q.Counters() + if counters.Enqueued != 3 { + t.Errorf("expected 3 enqueued, got %d", counters.Enqueued) + } + + q.Enqueue([]int{4, 5}) + + counters = q.Counters() + if counters.Enqueued != 5 { + t.Errorf("expected 5 enqueued total, got %d", counters.Enqueued) + } + + <-processed + <-processed + + counters = q.Counters() + if counters.Processed < 2 { + t.Errorf("expected at least 2 processed, got %d", counters.Processed) + } + if counters.Processed > counters.Enqueued { + t.Errorf("processed (%d) cannot exceed enqueued (%d)", counters.Processed, counters.Enqueued) + } + + barrier := q.Barrier() + <-barrier + + counters = q.Counters() + if counters.Enqueued != 5 { + t.Errorf("expected 5 enqueued total, got %d", counters.Enqueued) + } + if counters.Processed != 5 { + t.Errorf("expected 5 processed, got %d", counters.Processed) + } + }) +} + +func TestPanicRecovery(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 500*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + if val == 2 { + panic("test panic") + } + }) + + q.Enqueue([]int{1, 2, 3}) + + <-q.Done() + + err := context.Cause(q.Context()) + if err == nil { + t.Fatal("expected panic error, got nil") + } + + panicErr, ok := err.(*ErrPanic) + if !ok { + t.Fatalf("expected *ErrPanic, got %T: %v", err, err) + } + + if panicErr.Panic != "test panic" { + t.Errorf("expected panic value 'test panic', got %v", panicErr.Panic) + } + }) +} + +func TestContextPropagation(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + parentCtx, parentCancel := context.WithCancel(context.Background()) + defer parentCancel() + + q := New[int](parentCtx, 500*time.Millisecond) + + var receivedCtx context.Context + var mu sync.Mutex + q.Start(func(ctx context.Context, val int) { + mu.Lock() + receivedCtx = ctx + mu.Unlock() + time.Sleep(10 * time.Millisecond) + }) + defer q.Close() + + q.Enqueue([]int{1}) + time.Sleep(5 * time.Millisecond) + + mu.Lock() + ctx := receivedCtx + mu.Unlock() + + if ctx == nil { + t.Fatal("expected context to be passed to processor") + } + + parentCancel() + <-q.Done() + + if err := q.Context().Err(); err != context.Canceled { + t.Errorf("expected context.Canceled when parent cancelled, got %v", err) + } + }) +} + +func TestZeroMaxLag(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 0) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + time.Sleep(10 * time.Millisecond) + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + barrier := q.Barrier() + <-barrier + + if processed.Load() != 3 { + t.Errorf("expected 3 items processed with zero maxLag, got %d", processed.Load()) + } + + if err := context.Cause(q.Context()); err != nil { + t.Errorf("expected no error with zero maxLag, got %v", err) + } + }) +}