diff --git a/util/latencyqueue/latencyqueue.go b/util/latencyqueue/latencyqueue.go new file mode 100644 index 000000000..65f62a304 --- /dev/null +++ b/util/latencyqueue/latencyqueue.go @@ -0,0 +1,332 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +// Package latencyqueue provides a latency-bounded FIFO queue for asynchronous processing. +package latencyqueue + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" +) + +var ( + // ErrClosed is returned by context.Cause() when Close() has been called. + ErrClosed = errors.New("queue closed") + + // ErrAborted is returned by context.Cause() when Abort() has been called. + ErrAborted = errors.New("queue processing aborted") + + // ErrLagged is returned by context.Cause() when the lag threshold was exceeded. + ErrLagged = errors.New("queue lag threshold exceeded") +) + +// ErrPanic wraps a panic value recovered from the processor function. +type ErrPanic struct { + Panic any +} + +func (e *ErrPanic) Error() string { + return fmt.Sprintf("processor panic: %v", e.Panic) +} + +// Queue is a latency-bounded FIFO queue for asynchronous processing. +// +// The queue is unbounded by item count or storage size, but bounded by the age +// of the oldest item. When an item exceeds the configured lag threshold, +// the queue's context is cancelled with ErrLagged. +// +// # Delivery Semantics +// +// During normal operation, each item is delivered exactly once to the processor, +// in the order enqueued. Items are processed one batch at a time, with each batch +// processed on a separate goroutine. +// +// On abnormal termination (lag threshold exceeded, processor panic, abort, or +// explicit close), unprocessed items are lost and any pending barriers are released. +type Queue[T any] struct { + ctx context.Context + cancel context.CancelCauseFunc + + mu sync.Mutex + items []queueItem[T] + wakeup chan struct{} + started bool + + maxLag time.Duration + + numEnqueued atomic.Uint64 + numProcessed atomic.Uint64 + + done chan struct{} +} + +type itemKind uint8 + +const ( + kindBatch itemKind = iota + kindBarrier +) + +type queueItem[T any] struct { + kind itemKind + batch []T + enqueued time.Time + barrier chan struct{} +} + +// QueueCounters contains observability metrics for the queue. +type QueueCounters struct { + Enqueued uint64 + Processed uint64 +} + +// New creates a bounded-latency queue that processes items asynchronously. +// The parent context is used for lifecycle management. If maxLag is > 0, +// items that remain in the queue longer than maxLag will cause the context +// to be cancelled with ErrLagged. +func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] { + ctx, cancel := context.WithCancelCause(parent) + q := &Queue[T]{ + ctx: ctx, + cancel: cancel, + items: make([]queueItem[T], 0, 128), + wakeup: make(chan struct{}, 1), + maxLag: maxLag, + done: make(chan struct{}), + } + return q +} + +// Start begins processing queued items with the given processor function. +// The processor receives a context (with lag deadline if applicable) and an item. +// The processor is considered infallible; errors should be handled within the processor. +// Must be called before Enqueue. Can only be called once. +func (q *Queue[T]) Start(processor func(context.Context, T)) { + q.mu.Lock() + if q.started { + q.mu.Unlock() + panic("Start called multiple times") + } + q.started = true + q.mu.Unlock() + + go q.run(processor) +} + +// Close stops processing and releases resources. +// Unprocessed items are discarded and barriers are released. +// Blocks until processing stops. +func (q *Queue[T]) Close() { + q.cancel(ErrClosed) + <-q.done +} + +// Abort stops processing immediately. Unprocessed items are discarded +// and barriers are released. The context will be cancelled with ErrAborted. +// Non-blocking. +func (q *Queue[T]) Abort() { + q.cancel(ErrAborted) +} + +// Enqueue adds a batch of items to the queue. +// Returns false if the queue has terminated (closed, lagged, or aborted). +func (q *Queue[T]) Enqueue(batch []T) bool { + if len(batch) == 0 { + return true + } + + now := time.Now() + item := queueItem[T]{ + kind: kindBatch, + batch: batch, + enqueued: now, + } + + q.mu.Lock() + + select { + case <-q.ctx.Done(): + return false + default: + } + + q.items = append(q.items, item) + q.numEnqueued.Add(uint64(len(batch))) + q.mu.Unlock() + + q.wake() + return true +} + +// Barrier returns a channel that closes when all previously enqueued items +// have been processed. Returns an immediately-closed channel if the queue +// has terminated. +func (q *Queue[T]) Barrier() <-chan struct{} { + q.mu.Lock() + + ch := make(chan struct{}) + + select { + case <-q.ctx.Done(): + close(ch) + return ch + default: + } + + item := queueItem[T]{ + kind: kindBarrier, + barrier: ch, + } + q.items = append(q.items, item) + q.mu.Unlock() + + q.wake() + return ch +} + +// Done returns a channel that closes when processing stops. +func (q *Queue[T]) Done() <-chan struct{} { + return q.done +} + +// Context returns the queue's context, which is cancelled when the queue stops. +func (q *Queue[T]) Context() context.Context { + return q.ctx +} + +// Counters returns current queue metrics. +func (q *Queue[T]) Counters() QueueCounters { + return QueueCounters{ + Enqueued: q.numEnqueued.Load(), + Processed: q.numProcessed.Load(), + } +} + +func (q *Queue[T]) wake() { + select { + case q.wakeup <- struct{}{}: + default: + } +} + +func (q *Queue[T]) run(processor func(context.Context, T)) { + defer close(q.done) + defer q.drainAndReleaseBarriers() + + var ( + processingCh = make(chan error, 1) + processing chan error // nil when not processing, points to processingCh when processing + itemCtx context.Context + itemCancel context.CancelFunc + ) + + for { + if processing == nil { + q.mu.Lock() + hasItems := len(q.items) > 0 + var item queueItem[T] + if hasItems { + item = q.items[0] + q.items = q.items[1:] + } + q.mu.Unlock() + + if !hasItems { + select { + case <-q.ctx.Done(): + return + case <-q.wakeup: + continue + } + } + + if item.kind == kindBarrier { + close(item.barrier) + continue + } + + itemCtx = q.ctx + itemCancel = nil + if q.maxLag > 0 { + deadline := item.enqueued.Add(q.maxLag) + remaining := time.Until(deadline) + if remaining <= 0 { + q.cancel(ErrLagged) + return + } + var cancel context.CancelFunc + itemCtx, cancel = context.WithDeadline(q.ctx, deadline) + itemCancel = cancel + } + + batch := item.batch + processing = processingCh + go func() { + defer func() { + if r := recover(); r != nil { + processingCh <- &ErrPanic{Panic: r} + } else { + processingCh <- nil + } + }() + for _, data := range batch { + if itemCtx.Err() != nil { + return + } + processor(itemCtx, data) + q.numProcessed.Add(1) + } + }() + } + + select { + case <-q.ctx.Done(): + if itemCancel != nil { + itemCancel() + } + if processing != nil { + <-processing + } + return + + case err := <-processing: + // Check lag BEFORE cancelling to distinguish deadline from manual cancel + lagDetected := itemCtx.Err() == context.DeadlineExceeded + + if itemCancel != nil { + itemCancel() + itemCancel = nil + } + + if err != nil { + q.cancel(err) + return + } + + if lagDetected { + q.cancel(ErrLagged) + return + } + + processing = nil + + case <-q.wakeup: + } + } +} + +func (q *Queue[T]) drainAndReleaseBarriers() { + q.mu.Lock() + defer q.mu.Unlock() + + for _, item := range q.items { + if item.kind == kindBarrier { + close(item.barrier) + } + } + q.items = nil +} diff --git a/util/latencyqueue/latencyqueue_test.go b/util/latencyqueue/latencyqueue_test.go new file mode 100644 index 000000000..6a6051ee1 --- /dev/null +++ b/util/latencyqueue/latencyqueue_test.go @@ -0,0 +1,724 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package latencyqueue + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "testing/synctest" + "time" +) + +func TestBasicEnqueueDequeue(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + processed := make([]int, 0) + var mu sync.Mutex + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + mu.Lock() + processed = append(processed, val) + mu.Unlock() + }) + defer q.Close() + + q.Enqueue([]int{0, 1, 2, 3, 4}) + + barrier := q.Barrier() + <-barrier + + if err := context.Cause(q.Context()); err != nil { + t.Errorf("expected no error after successful processing, got %v", err) + } + + mu.Lock() + defer mu.Unlock() + if len(processed) != 5 { + t.Errorf("expected 5 items processed, got %d", len(processed)) + } + for i := range 5 { + if processed[i] != i { + t.Errorf("expected processed[%d] = %d, got %d", i, i, processed[i]) + } + } + }) +} + +func TestLagThresholdExceeded(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 50*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + time.Sleep(30 * time.Millisecond) + }) + defer q.Close() + + batch := make([]int, 10) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged, got %v", err) + } + }) +} + +func TestFastProcessingNoLag(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + }) + defer q.Close() + + batch := make([]int, 100) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + barrier := q.Barrier() + <-barrier + + if err := context.Cause(q.Context()); err != nil { + t.Errorf("expected no error, got %v", err) + } + + if processed.Load() != 100 { + t.Errorf("expected 100 items processed, got %d", processed.Load()) + } + }) +} + +func TestMultipleBarriers(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + time.Sleep(5 * time.Millisecond) + }) + defer q.Close() + + barrier1 := q.Barrier() + <-barrier1 + count1 := processed.Load() + if count1 > 0 { + t.Errorf("barrier1: nothing enqueued before it, but got %d processed", count1) + } + + q.Enqueue([]int{0, 1, 2, 3, 4}) + barrier2 := q.Barrier() + <-barrier2 + count2 := processed.Load() + if count2 < 5 { + t.Errorf("barrier2: expected at least 5 processed, got %d", count2) + } + + q.Enqueue([]int{5, 6, 7, 8, 9}) + barrier3 := q.Barrier() + <-barrier3 + count3 := processed.Load() + if count3 != 10 { + t.Errorf("barrier3: expected exactly 10 processed (all items), got %d", count3) + } + }) +} + +func TestCloseStopsProcessing(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + time.Sleep(10 * time.Millisecond) + }) + + batch := make([]int, 1000) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + time.Sleep(20 * time.Millisecond) + q.Close() + + processedCount := processed.Load() + if processedCount >= 1000 { + t.Error("expected some items to be dropped after close") + } + + if q.Enqueue([]int{9999}) { + t.Error("enqueue after close should return false") + } + + if err := context.Cause(q.Context()); err != ErrClosed { + t.Errorf("expected ErrClosed, got %v", err) + } + }) +} + +func TestBatchesShareEnqueueTime(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 50*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + time.Sleep(10 * time.Millisecond) + }) + defer q.Close() + + batch := make([]int, 10) + for i := range batch { + batch[i] = i + } + q.Enqueue(batch) + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged - batch items share enqueue time, got %v", err) + } + }) +} + +func TestAbortStopsProcessing(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 200*time.Millisecond) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + if val == 3 { + q.Abort() + } + time.Sleep(10 * time.Millisecond) + }) + + q.Enqueue([]int{1, 2, 3, 4, 5}) + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted, got %v", err) + } + + count := processed.Load() + if count > 3 { + t.Errorf("expected at most 3 items processed after abort, got %d", count) + } + if count == 0 { + t.Error("expected at least one item to be processed") + } + }) +} + +func TestConcurrentEnqueuers(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 5*time.Second) + + var processed []int + var mu sync.Mutex + q.Start(func(ctx context.Context, val int) { + mu.Lock() + processed = append(processed, val) + mu.Unlock() + }) + defer q.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + q.Enqueue([]int{100, 101, 102}) + }() + + go func() { + defer wg.Done() + q.Enqueue([]int{200, 201, 202}) + }() + + wg.Wait() + barrier := q.Barrier() + <-barrier + + mu.Lock() + defer mu.Unlock() + + if len(processed) != 6 { + t.Errorf("expected 6 items, got %d", len(processed)) + } + + has100 := false + has200 := false + idx100, idx200 := -1, -1 + + for i, v := range processed { + if v == 100 { + has100 = true + idx100 = i + } + if v == 200 { + has200 = true + idx200 = i + } + } + + if !has100 || !has200 { + t.Fatal("both batches should be processed") + } + + if idx100+2 < len(processed) { + if processed[idx100] != 100 || processed[idx100+1] != 101 || processed[idx100+2] != 102 { + t.Errorf("batch [100,101,102] not in order at position %d", idx100) + } + } + + if idx200+2 < len(processed) { + if processed[idx200] != 200 || processed[idx200+1] != 201 || processed[idx200+2] != 202 { + t.Errorf("batch [200,201,202] not in order at position %d", idx200) + } + } + }) +} + +func TestProcessorReceivesContextCancellation(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 50*time.Millisecond) + + processorStarted := make(chan struct{}) + contextCancelledDuringProcessing := atomic.Bool{} + + q.Start(func(ctx context.Context, val int) { + close(processorStarted) + for i := 0; i < 10; i++ { + select { + case <-ctx.Done(): + contextCancelledDuringProcessing.Store(true) + return + default: + time.Sleep(20 * time.Millisecond) + } + } + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + + <-processorStarted + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged, got %v", err) + } + + if !contextCancelledDuringProcessing.Load() { + t.Error("expected processor to observe context cancellation during processing") + } + }) +} + +func TestProcessorReceivesAbortCancellation(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 500*time.Millisecond) + + processorStarted := make(chan struct{}) + contextCancelledDuringProcessing := atomic.Bool{} + + q.Start(func(ctx context.Context, val int) { + if val == 1 { + close(processorStarted) + } + for i := 0; i < 10; i++ { + select { + case <-ctx.Done(): + contextCancelledDuringProcessing.Store(true) + return + default: + time.Sleep(10 * time.Millisecond) + } + } + }) + + q.Enqueue([]int{1, 2, 3, 4, 5}) + + <-processorStarted + q.Abort() + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted, got %v", err) + } + + if !contextCancelledDuringProcessing.Load() { + t.Error("expected processor to observe context cancellation during processing") + } + }) +} + +func TestEnqueueFailsAfterLag(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 30*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + time.Sleep(20 * time.Millisecond) + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + + <-q.Done() + + if q.Enqueue([]int{999}) { + t.Error("enqueue after lag should return false") + } + + if err := context.Cause(q.Context()); err != ErrLagged { + t.Errorf("expected ErrLagged, got %v", err) + } + }) +} + +func TestContextCause(t *testing.T) { + t.Parallel() + tests := []struct { + name string + setup func(*Queue[int]) + expectErr error + }{ + { + name: "close", + setup: func(q *Queue[int]) { + q.Start(func(ctx context.Context, val int) {}) + q.Close() + }, + expectErr: ErrClosed, + }, + { + name: "abort", + setup: func(q *Queue[int]) { + q.Start(func(ctx context.Context, val int) {}) + q.Abort() + <-q.Done() + }, + expectErr: ErrAborted, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + tt.setup(q) + + if err := context.Cause(q.Context()); err != tt.expectErr { + t.Errorf("expected %v, got %v", tt.expectErr, err) + } + }) + }) + } +} + +func TestBarrierWithContextDistinction(t *testing.T) { + t.Parallel() + tests := []struct { + name string + setup func(*Queue[int]) <-chan struct{} + expectErr error + description string + }{ + { + name: "normal completion", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) {}) + q.Enqueue([]int{1, 2, 3}) + return q.Barrier() + }, + expectErr: nil, + description: "barrier completes normally when items are processed", + }, + { + name: "close", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) { + time.Sleep(100 * time.Millisecond) + }) + q.Enqueue([]int{1, 2, 3, 4, 5}) + b := q.Barrier() + time.Sleep(10 * time.Millisecond) + q.Close() + return b + }, + expectErr: ErrClosed, + description: "barrier released when queue is closed", + }, + { + name: "abort", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) { + time.Sleep(100 * time.Millisecond) + }) + q.Enqueue([]int{1, 2, 3, 4, 5}) + b := q.Barrier() + time.Sleep(10 * time.Millisecond) + q.Abort() + return b + }, + expectErr: ErrAborted, + description: "barrier released when queue is aborted", + }, + { + name: "lag", + setup: func(q *Queue[int]) <-chan struct{} { + q.Start(func(ctx context.Context, val int) { + time.Sleep(30 * time.Millisecond) + }) + q.Enqueue([]int{1, 2, 3, 4, 5}) + return q.Barrier() + }, + expectErr: ErrLagged, + description: "barrier released when lag threshold is exceeded", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var q *Queue[int] + if tt.name == "lag" { + q = New[int](context.Background(), 50*time.Millisecond) + } else { + q = New[int](context.Background(), 5*time.Second) + } + defer q.Close() + + barrier := tt.setup(q) + <-barrier + + if err := context.Cause(q.Context()); err != tt.expectErr { + t.Errorf("%s: expected %v, got %v", tt.description, tt.expectErr, err) + } + }) + }) + } +} + +func TestFirstStopWins(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) {}) + + q.Abort() + q.Close() + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted (first error wins), got %v", err) + } + }) +} + +func TestMultipleCloseCallsSafe(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) {}) + + q.Close() + q.Close() + + if err := context.Cause(q.Context()); err != ErrClosed { + t.Errorf("expected ErrClosed, got %v", err) + } + }) +} + +func TestMultipleAbortCallsSafe(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 100*time.Millisecond) + + q.Start(func(ctx context.Context, val int) {}) + + q.Abort() + q.Abort() + q.Abort() + + <-q.Done() + + if err := context.Cause(q.Context()); err != ErrAborted { + t.Errorf("expected ErrAborted, got %v", err) + } + }) +} + +func TestCounters(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 500*time.Millisecond) + + processed := make(chan struct{}, 10) + q.Start(func(ctx context.Context, val int) { + time.Sleep(5 * time.Millisecond) + processed <- struct{}{} + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + + counters := q.Counters() + if counters.Enqueued != 3 { + t.Errorf("expected 3 enqueued, got %d", counters.Enqueued) + } + + q.Enqueue([]int{4, 5}) + + counters = q.Counters() + if counters.Enqueued != 5 { + t.Errorf("expected 5 enqueued total, got %d", counters.Enqueued) + } + + <-processed + <-processed + + counters = q.Counters() + if counters.Processed < 2 { + t.Errorf("expected at least 2 processed, got %d", counters.Processed) + } + if counters.Processed > counters.Enqueued { + t.Errorf("processed (%d) cannot exceed enqueued (%d)", counters.Processed, counters.Enqueued) + } + + barrier := q.Barrier() + <-barrier + + counters = q.Counters() + if counters.Enqueued != 5 { + t.Errorf("expected 5 enqueued total, got %d", counters.Enqueued) + } + if counters.Processed != 5 { + t.Errorf("expected 5 processed, got %d", counters.Processed) + } + }) +} + +func TestPanicRecovery(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 500*time.Millisecond) + + q.Start(func(ctx context.Context, val int) { + if val == 2 { + panic("test panic") + } + }) + + q.Enqueue([]int{1, 2, 3}) + + <-q.Done() + + err := context.Cause(q.Context()) + if err == nil { + t.Fatal("expected panic error, got nil") + } + + panicErr, ok := err.(*ErrPanic) + if !ok { + t.Fatalf("expected *ErrPanic, got %T: %v", err, err) + } + + if panicErr.Panic != "test panic" { + t.Errorf("expected panic value 'test panic', got %v", panicErr.Panic) + } + }) +} + +func TestContextPropagation(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + parentCtx, parentCancel := context.WithCancel(context.Background()) + defer parentCancel() + + q := New[int](parentCtx, 500*time.Millisecond) + + var receivedCtx context.Context + var mu sync.Mutex + q.Start(func(ctx context.Context, val int) { + mu.Lock() + receivedCtx = ctx + mu.Unlock() + time.Sleep(10 * time.Millisecond) + }) + defer q.Close() + + q.Enqueue([]int{1}) + time.Sleep(5 * time.Millisecond) + + mu.Lock() + ctx := receivedCtx + mu.Unlock() + + if ctx == nil { + t.Fatal("expected context to be passed to processor") + } + + parentCancel() + <-q.Done() + + if err := q.Context().Err(); err != context.Canceled { + t.Errorf("expected context.Canceled when parent cancelled, got %v", err) + } + }) +} + +func TestZeroMaxLag(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + q := New[int](context.Background(), 0) + + processed := atomic.Int32{} + q.Start(func(ctx context.Context, val int) { + processed.Add(1) + time.Sleep(10 * time.Millisecond) + }) + defer q.Close() + + q.Enqueue([]int{1, 2, 3}) + barrier := q.Barrier() + <-barrier + + if processed.Load() != 3 { + t.Errorf("expected 3 items processed with zero maxLag, got %d", processed.Load()) + } + + if err := context.Cause(q.Context()); err != nil { + t.Errorf("expected no error with zero maxLag, got %v", err) + } + }) +}