util/latencyqueue: add a bounded-latency queue

Sometimes you do not know how many elements are reasonable, but you do
know how stale elements may get at most, for example when streaming
optimistic synchronization data. latencyqueue provides a primitive that
makes it easier to keep such an unbounded queue, aborting and dropping
the queue if the consumer cannot keep up with the desired bound.
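
A sketch of intended use (Update and apply are illustrative placeholders):

    q := latencyqueue.New[Update](ctx, 5*time.Second)
    q.Start(func(ctx context.Context, u Update) { apply(ctx, u) })
    if !q.Enqueue(updates) {
        // the consumer fell behind or the queue stopped; resynchronize
    }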

Updates tailscale/corp#34129

Signed-off-by: James Tucker <james@tailscale.com>

@@ -0,0 +1,332 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

// Package latencyqueue provides a latency-bounded FIFO queue for asynchronous processing.
package latencyqueue

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var (
    // ErrClosed is returned by context.Cause() when Close() has been called.
    ErrClosed = errors.New("queue closed")
    // ErrAborted is returned by context.Cause() when Abort() has been called.
    ErrAborted = errors.New("queue processing aborted")
    // ErrLagged is returned by context.Cause() when the lag threshold was exceeded.
    ErrLagged = errors.New("queue lag threshold exceeded")
)

// ErrPanic wraps a panic value recovered from the processor function.
type ErrPanic struct {
    Panic any
}

func (e *ErrPanic) Error() string {
    return fmt.Sprintf("processor panic: %v", e.Panic)
}

// Queue is a latency-bounded FIFO queue for asynchronous processing.
//
// The queue is unbounded by item count or storage size, but bounded by the age
// of the oldest item. When an item exceeds the configured lag threshold,
// the queue's context is cancelled with ErrLagged.
//
// # Delivery Semantics
//
// During normal operation, each item is delivered exactly once to the processor,
// in the order enqueued. Items are processed one batch at a time, with each batch
// processed on a separate goroutine.
//
// On abnormal termination (lag threshold exceeded, processor panic, abort, or
// explicit close), unprocessed items are lost and any pending barriers are released.
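//
// # Example
//
// A minimal usage sketch (Update and handleUpdate are illustrative placeholders):
//
//    q := New[Update](ctx, 5*time.Second)
//    q.Start(func(ctx context.Context, u Update) { handleUpdate(ctx, u) })
//    q.Enqueue(updates)
//    <-q.Barrier() // wait for the items above to be processed
//    q.Close()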
type Queue[T any] struct {
    ctx          context.Context
    cancel       context.CancelCauseFunc
    mu           sync.Mutex
    items        []queueItem[T]
    wakeup       chan struct{}
    started      bool
    maxLag       time.Duration
    numEnqueued  atomic.Uint64
    numProcessed atomic.Uint64
    done         chan struct{}
}

type itemKind uint8

const (
    kindBatch itemKind = iota
    kindBarrier
)

type queueItem[T any] struct {
    kind     itemKind
    batch    []T
    enqueued time.Time
    barrier  chan struct{}
}

// QueueCounters contains observability metrics for the queue.
type QueueCounters struct {
    Enqueued  uint64
    Processed uint64
}

// New creates a bounded-latency queue that processes items asynchronously.
// The parent context is used for lifecycle management. If maxLag is > 0,
// items that remain in the queue longer than maxLag will cause the context
// to be cancelled with ErrLagged.
func New[T any](parent context.Context, maxLag time.Duration) *Queue[T] {
    ctx, cancel := context.WithCancelCause(parent)
    q := &Queue[T]{
        ctx:    ctx,
        cancel: cancel,
        items:  make([]queueItem[T], 0, 128),
        wakeup: make(chan struct{}, 1),
        maxLag: maxLag,
        done:   make(chan struct{}),
    }
    return q
}

// Start begins processing queued items with the given processor function.
// The processor receives a context (with lag deadline if applicable) and an item.
// The processor is considered infallible; errors should be handled within the processor.
// Must be called before Enqueue. Can only be called once.
func (q *Queue[T]) Start(processor func(context.Context, T)) {
    q.mu.Lock()
    if q.started {
        q.mu.Unlock()
        panic("Start called multiple times")
    }
    q.started = true
    q.mu.Unlock()
    go q.run(processor)
}

// Close stops processing and releases resources.
// Unprocessed items are discarded and barriers are released.
// Blocks until processing stops.
func (q *Queue[T]) Close() {
    q.cancel(ErrClosed)
    <-q.done
}

// Abort stops processing immediately. Unprocessed items are discarded
// and barriers are released. The context will be cancelled with ErrAborted.
// Non-blocking.
func (q *Queue[T]) Abort() {
    q.cancel(ErrAborted)
}

// Enqueue adds a batch of items to the queue.
// Returns false if the queue has terminated (closed, lagged, or aborted).
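//
// Callers that stream data should stop on a false return and inspect the
// cause, for example (sketch):
//
//    if !q.Enqueue(batch) {
//        log.Printf("queue stopped: %v", context.Cause(q.Context()))
//    }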
func (q *Queue[T]) Enqueue(batch []T) bool {
    if len(batch) == 0 {
        return true
    }
    now := time.Now()
    item := queueItem[T]{
        kind:     kindBatch,
        batch:    batch,
        enqueued: now,
    }
    q.mu.Lock()
    select {
    case <-q.ctx.Done():
        q.mu.Unlock()
        return false
    default:
    }
    q.items = append(q.items, item)
    q.numEnqueued.Add(uint64(len(batch)))
    q.mu.Unlock()
    q.wake()
    return true
}

// Barrier returns a channel that closes when all previously enqueued items
// have been processed. Returns an immediately-closed channel if the queue
// has terminated.
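//
// Because a barrier is also released on termination, callers that need to
// distinguish a completed flush from an interrupted one can check the cause
// afterwards, for example (sketch):
//
//    <-q.Barrier()
//    if err := context.Cause(q.Context()); err != nil {
//        // released by termination (Close/Abort/lag), not by completion
//    }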
func (q *Queue[T]) Barrier() <-chan struct{} {
    q.mu.Lock()
    ch := make(chan struct{})
    select {
    case <-q.ctx.Done():
        q.mu.Unlock()
        close(ch)
        return ch
    default:
    }
    item := queueItem[T]{
        kind:    kindBarrier,
        barrier: ch,
    }
    q.items = append(q.items, item)
    q.mu.Unlock()
    q.wake()
    return ch
}

// Done returns a channel that closes when processing stops.
func (q *Queue[T]) Done() <-chan struct{} {
    return q.done
}

// Context returns the queue's context, which is cancelled when the queue stops.
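//
// Once Done is closed, context.Cause reports why processing stopped, for
// example (sketch):
//
//    <-q.Done()
//    switch context.Cause(q.Context()) {
//    case ErrClosed, ErrAborted, ErrLagged:
//        // terminal states set by Close, Abort, or lag detection
//    default:
//        // parent context cancellation or a processor panic (*ErrPanic)
//    }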
func (q *Queue[T]) Context() context.Context {
    return q.ctx
}

// Counters returns current queue metrics.
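//
// Both counters are monotonic, so their difference approximates the number
// of items still pending, for example (sketch):
//
//    c := q.Counters()
//    pending := c.Enqueued - c.Processed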
func (q *Queue[T]) Counters() QueueCounters {
    return QueueCounters{
        Enqueued:  q.numEnqueued.Load(),
        Processed: q.numProcessed.Load(),
    }
}

// wake nudges the run loop without blocking; the buffered channel coalesces
// concurrent wakeups.
func (q *Queue[T]) wake() {
    select {
    case q.wakeup <- struct{}{}:
    default:
    }
}

// run is the consumer loop: it pops one queued item at a time, releases
// barriers inline, and processes each batch on a fresh goroutine so that
// processor panics can be recovered and reported via the context cause.
func (q *Queue[T]) run(processor func(context.Context, T)) {
    defer close(q.done)
    defer q.drainAndReleaseBarriers()
    var (
        processingCh = make(chan error, 1)
        processing   chan error // nil when not processing, points to processingCh when processing
        itemCtx      context.Context
        itemCancel   context.CancelFunc
    )
    for {
        if processing == nil {
            q.mu.Lock()
            hasItems := len(q.items) > 0
            var item queueItem[T]
            if hasItems {
                item = q.items[0]
                q.items = q.items[1:]
            }
            q.mu.Unlock()
            if !hasItems {
                select {
                case <-q.ctx.Done():
                    return
                case <-q.wakeup:
                    continue
                }
            }
            if item.kind == kindBarrier {
                close(item.barrier)
                continue
            }
            itemCtx = q.ctx
            itemCancel = nil
            if q.maxLag > 0 {
                deadline := item.enqueued.Add(q.maxLag)
                remaining := time.Until(deadline)
                if remaining <= 0 {
                    q.cancel(ErrLagged)
                    return
                }
                var cancel context.CancelFunc
                itemCtx, cancel = context.WithDeadline(q.ctx, deadline)
                itemCancel = cancel
            }
            batch := item.batch
            processing = processingCh
            go func() {
                defer func() {
                    if r := recover(); r != nil {
                        processingCh <- &ErrPanic{Panic: r}
                    } else {
                        processingCh <- nil
                    }
                }()
                for _, data := range batch {
                    if itemCtx.Err() != nil {
                        return
                    }
                    processor(itemCtx, data)
                    q.numProcessed.Add(1)
                }
            }()
        }
        select {
        case <-q.ctx.Done():
            if itemCancel != nil {
                itemCancel()
            }
            if processing != nil {
                <-processing
            }
            return
        case err := <-processing:
            // Check lag BEFORE cancelling to distinguish deadline from manual cancel.
            lagDetected := itemCtx.Err() == context.DeadlineExceeded
            if itemCancel != nil {
                itemCancel()
                itemCancel = nil
            }
            if err != nil {
                q.cancel(err)
                return
            }
            if lagDetected {
                q.cancel(ErrLagged)
                return
            }
            processing = nil
        case <-q.wakeup:
        }
    }
}

// drainAndReleaseBarriers discards any unprocessed items and closes all
// pending barrier channels so that waiters are not left blocked.
func (q *Queue[T]) drainAndReleaseBarriers() {
    q.mu.Lock()
    defer q.mu.Unlock()
    for _, item := range q.items {
        if item.kind == kindBarrier {
            close(item.barrier)
        }
    }
    q.items = nil
}

@@ -0,0 +1,724 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause

package latencyqueue

import (
    "context"
    "sync"
    "sync/atomic"
    "testing"
    "testing/synctest"
    "time"
)

// These tests run under testing/synctest, so timers and sleeps use a
// virtualized clock rather than real time.

func TestBasicEnqueueDequeue(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        processed := make([]int, 0)
        var mu sync.Mutex
        q := New[int](context.Background(), 100*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {
            mu.Lock()
            processed = append(processed, val)
            mu.Unlock()
        })
        defer q.Close()
        q.Enqueue([]int{0, 1, 2, 3, 4})
        barrier := q.Barrier()
        <-barrier
        if err := context.Cause(q.Context()); err != nil {
            t.Errorf("expected no error after successful processing, got %v", err)
        }
        mu.Lock()
        defer mu.Unlock()
        if len(processed) != 5 {
            t.Errorf("expected 5 items processed, got %d", len(processed))
        }
        for i := range 5 {
            if processed[i] != i {
                t.Errorf("expected processed[%d] = %d, got %d", i, i, processed[i])
            }
        }
    })
}

func TestLagThresholdExceeded(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 50*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {
            time.Sleep(30 * time.Millisecond)
        })
        defer q.Close()
        batch := make([]int, 10)
        for i := range batch {
            batch[i] = i
        }
        q.Enqueue(batch)
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrLagged {
            t.Errorf("expected ErrLagged, got %v", err)
        }
    })
}

func TestFastProcessingNoLag(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 100*time.Millisecond)
        processed := atomic.Int32{}
        q.Start(func(ctx context.Context, val int) {
            processed.Add(1)
        })
        defer q.Close()
        batch := make([]int, 100)
        for i := range batch {
            batch[i] = i
        }
        q.Enqueue(batch)
        barrier := q.Barrier()
        <-barrier
        if err := context.Cause(q.Context()); err != nil {
            t.Errorf("expected no error, got %v", err)
        }
        if processed.Load() != 100 {
            t.Errorf("expected 100 items processed, got %d", processed.Load())
        }
    })
}

func TestMultipleBarriers(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 100*time.Millisecond)
        processed := atomic.Int32{}
        q.Start(func(ctx context.Context, val int) {
            processed.Add(1)
            time.Sleep(5 * time.Millisecond)
        })
        defer q.Close()
        barrier1 := q.Barrier()
        <-barrier1
        count1 := processed.Load()
        if count1 > 0 {
            t.Errorf("barrier1: nothing enqueued before it, but got %d processed", count1)
        }
        q.Enqueue([]int{0, 1, 2, 3, 4})
        barrier2 := q.Barrier()
        <-barrier2
        count2 := processed.Load()
        if count2 < 5 {
            t.Errorf("barrier2: expected at least 5 processed, got %d", count2)
        }
        q.Enqueue([]int{5, 6, 7, 8, 9})
        barrier3 := q.Barrier()
        <-barrier3
        count3 := processed.Load()
        if count3 != 10 {
            t.Errorf("barrier3: expected exactly 10 processed (all items), got %d", count3)
        }
    })
}

func TestCloseStopsProcessing(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 100*time.Millisecond)
        processed := atomic.Int32{}
        q.Start(func(ctx context.Context, val int) {
            processed.Add(1)
            time.Sleep(10 * time.Millisecond)
        })
        batch := make([]int, 1000)
        for i := range batch {
            batch[i] = i
        }
        q.Enqueue(batch)
        time.Sleep(20 * time.Millisecond)
        q.Close()
        processedCount := processed.Load()
        if processedCount >= 1000 {
            t.Error("expected some items to be dropped after close")
        }
        if q.Enqueue([]int{9999}) {
            t.Error("enqueue after close should return false")
        }
        if err := context.Cause(q.Context()); err != ErrClosed {
            t.Errorf("expected ErrClosed, got %v", err)
        }
    })
}

func TestBatchesShareEnqueueTime(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 50*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {
            time.Sleep(10 * time.Millisecond)
        })
        defer q.Close()
        batch := make([]int, 10)
        for i := range batch {
            batch[i] = i
        }
        q.Enqueue(batch)
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrLagged {
            t.Errorf("expected ErrLagged - batch items share enqueue time, got %v", err)
        }
    })
}

func TestAbortStopsProcessing(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 200*time.Millisecond)
        processed := atomic.Int32{}
        q.Start(func(ctx context.Context, val int) {
            processed.Add(1)
            if val == 3 {
                q.Abort()
            }
            time.Sleep(10 * time.Millisecond)
        })
        q.Enqueue([]int{1, 2, 3, 4, 5})
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrAborted {
            t.Errorf("expected ErrAborted, got %v", err)
        }
        count := processed.Load()
        if count > 3 {
            t.Errorf("expected at most 3 items processed after abort, got %d", count)
        }
        if count == 0 {
            t.Error("expected at least one item to be processed")
        }
    })
}

func TestConcurrentEnqueuers(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 5*time.Second)
        var processed []int
        var mu sync.Mutex
        q.Start(func(ctx context.Context, val int) {
            mu.Lock()
            processed = append(processed, val)
            mu.Unlock()
        })
        defer q.Close()
        var wg sync.WaitGroup
        wg.Add(2)
        go func() {
            defer wg.Done()
            q.Enqueue([]int{100, 101, 102})
        }()
        go func() {
            defer wg.Done()
            q.Enqueue([]int{200, 201, 202})
        }()
        wg.Wait()
        barrier := q.Barrier()
        <-barrier
        mu.Lock()
        defer mu.Unlock()
        if len(processed) != 6 {
            t.Errorf("expected 6 items, got %d", len(processed))
        }
        has100 := false
        has200 := false
        idx100, idx200 := -1, -1
        for i, v := range processed {
            if v == 100 {
                has100 = true
                idx100 = i
            }
            if v == 200 {
                has200 = true
                idx200 = i
            }
        }
        if !has100 || !has200 {
            t.Fatal("both batches should be processed")
        }
        if idx100+2 < len(processed) {
            if processed[idx100] != 100 || processed[idx100+1] != 101 || processed[idx100+2] != 102 {
                t.Errorf("batch [100,101,102] not in order at position %d", idx100)
            }
        }
        if idx200+2 < len(processed) {
            if processed[idx200] != 200 || processed[idx200+1] != 201 || processed[idx200+2] != 202 {
                t.Errorf("batch [200,201,202] not in order at position %d", idx200)
            }
        }
    })
}

func TestProcessorReceivesContextCancellation(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 50*time.Millisecond)
        processorStarted := make(chan struct{})
        contextCancelledDuringProcessing := atomic.Bool{}
        q.Start(func(ctx context.Context, val int) {
            close(processorStarted)
            for i := 0; i < 10; i++ {
                select {
                case <-ctx.Done():
                    contextCancelledDuringProcessing.Store(true)
                    return
                default:
                    time.Sleep(20 * time.Millisecond)
                }
            }
        })
        defer q.Close()
        q.Enqueue([]int{1, 2, 3})
        <-processorStarted
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrLagged {
            t.Errorf("expected ErrLagged, got %v", err)
        }
        if !contextCancelledDuringProcessing.Load() {
            t.Error("expected processor to observe context cancellation during processing")
        }
    })
}

func TestProcessorReceivesAbortCancellation(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 500*time.Millisecond)
        processorStarted := make(chan struct{})
        contextCancelledDuringProcessing := atomic.Bool{}
        q.Start(func(ctx context.Context, val int) {
            if val == 1 {
                close(processorStarted)
            }
            for i := 0; i < 10; i++ {
                select {
                case <-ctx.Done():
                    contextCancelledDuringProcessing.Store(true)
                    return
                default:
                    time.Sleep(10 * time.Millisecond)
                }
            }
        })
        q.Enqueue([]int{1, 2, 3, 4, 5})
        <-processorStarted
        q.Abort()
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrAborted {
            t.Errorf("expected ErrAborted, got %v", err)
        }
        if !contextCancelledDuringProcessing.Load() {
            t.Error("expected processor to observe context cancellation during processing")
        }
    })
}

func TestEnqueueFailsAfterLag(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 30*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {
            time.Sleep(20 * time.Millisecond)
        })
        defer q.Close()
        q.Enqueue([]int{1, 2, 3})
        <-q.Done()
        if q.Enqueue([]int{999}) {
            t.Error("enqueue after lag should return false")
        }
        if err := context.Cause(q.Context()); err != ErrLagged {
            t.Errorf("expected ErrLagged, got %v", err)
        }
    })
}

func TestContextCause(t *testing.T) {
    t.Parallel()
    tests := []struct {
        name      string
        setup     func(*Queue[int])
        expectErr error
    }{
        {
            name: "close",
            setup: func(q *Queue[int]) {
                q.Start(func(ctx context.Context, val int) {})
                q.Close()
            },
            expectErr: ErrClosed,
        },
        {
            name: "abort",
            setup: func(q *Queue[int]) {
                q.Start(func(ctx context.Context, val int) {})
                q.Abort()
                <-q.Done()
            },
            expectErr: ErrAborted,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            synctest.Test(t, func(t *testing.T) {
                q := New[int](context.Background(), 100*time.Millisecond)
                tt.setup(q)
                if err := context.Cause(q.Context()); err != tt.expectErr {
                    t.Errorf("expected %v, got %v", tt.expectErr, err)
                }
            })
        })
    }
}

func TestBarrierWithContextDistinction(t *testing.T) {
    t.Parallel()
    tests := []struct {
        name        string
        setup       func(*Queue[int]) <-chan struct{}
        expectErr   error
        description string
    }{
        {
            name: "normal completion",
            setup: func(q *Queue[int]) <-chan struct{} {
                q.Start(func(ctx context.Context, val int) {})
                q.Enqueue([]int{1, 2, 3})
                return q.Barrier()
            },
            expectErr:   nil,
            description: "barrier completes normally when items are processed",
        },
        {
            name: "close",
            setup: func(q *Queue[int]) <-chan struct{} {
                q.Start(func(ctx context.Context, val int) {
                    time.Sleep(100 * time.Millisecond)
                })
                q.Enqueue([]int{1, 2, 3, 4, 5})
                b := q.Barrier()
                time.Sleep(10 * time.Millisecond)
                q.Close()
                return b
            },
            expectErr:   ErrClosed,
            description: "barrier released when queue is closed",
        },
        {
            name: "abort",
            setup: func(q *Queue[int]) <-chan struct{} {
                q.Start(func(ctx context.Context, val int) {
                    time.Sleep(100 * time.Millisecond)
                })
                q.Enqueue([]int{1, 2, 3, 4, 5})
                b := q.Barrier()
                time.Sleep(10 * time.Millisecond)
                q.Abort()
                return b
            },
            expectErr:   ErrAborted,
            description: "barrier released when queue is aborted",
        },
        {
            name: "lag",
            setup: func(q *Queue[int]) <-chan struct{} {
                q.Start(func(ctx context.Context, val int) {
                    time.Sleep(30 * time.Millisecond)
                })
                q.Enqueue([]int{1, 2, 3, 4, 5})
                return q.Barrier()
            },
            expectErr:   ErrLagged,
            description: "barrier released when lag threshold is exceeded",
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            synctest.Test(t, func(t *testing.T) {
                var q *Queue[int]
                if tt.name == "lag" {
                    q = New[int](context.Background(), 50*time.Millisecond)
                } else {
                    q = New[int](context.Background(), 5*time.Second)
                }
                defer q.Close()
                barrier := tt.setup(q)
                <-barrier
                if err := context.Cause(q.Context()); err != tt.expectErr {
                    t.Errorf("%s: expected %v, got %v", tt.description, tt.expectErr, err)
                }
            })
        })
    }
}

func TestFirstStopWins(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 100*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {})
        q.Abort()
        q.Close()
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrAborted {
            t.Errorf("expected ErrAborted (first error wins), got %v", err)
        }
    })
}

func TestMultipleCloseCallsSafe(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 100*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {})
        q.Close()
        q.Close()
        if err := context.Cause(q.Context()); err != ErrClosed {
            t.Errorf("expected ErrClosed, got %v", err)
        }
    })
}

func TestMultipleAbortCallsSafe(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 100*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {})
        q.Abort()
        q.Abort()
        q.Abort()
        <-q.Done()
        if err := context.Cause(q.Context()); err != ErrAborted {
            t.Errorf("expected ErrAborted, got %v", err)
        }
    })
}

func TestCounters(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 500*time.Millisecond)
        processed := make(chan struct{}, 10)
        q.Start(func(ctx context.Context, val int) {
            time.Sleep(5 * time.Millisecond)
            processed <- struct{}{}
        })
        defer q.Close()
        q.Enqueue([]int{1, 2, 3})
        counters := q.Counters()
        if counters.Enqueued != 3 {
            t.Errorf("expected 3 enqueued, got %d", counters.Enqueued)
        }
        q.Enqueue([]int{4, 5})
        counters = q.Counters()
        if counters.Enqueued != 5 {
            t.Errorf("expected 5 enqueued total, got %d", counters.Enqueued)
        }
        <-processed
        <-processed
        counters = q.Counters()
        if counters.Processed < 2 {
            t.Errorf("expected at least 2 processed, got %d", counters.Processed)
        }
        if counters.Processed > counters.Enqueued {
            t.Errorf("processed (%d) cannot exceed enqueued (%d)", counters.Processed, counters.Enqueued)
        }
        barrier := q.Barrier()
        <-barrier
        counters = q.Counters()
        if counters.Enqueued != 5 {
            t.Errorf("expected 5 enqueued total, got %d", counters.Enqueued)
        }
        if counters.Processed != 5 {
            t.Errorf("expected 5 processed, got %d", counters.Processed)
        }
    })
}

func TestPanicRecovery(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 500*time.Millisecond)
        q.Start(func(ctx context.Context, val int) {
            if val == 2 {
                panic("test panic")
            }
        })
        q.Enqueue([]int{1, 2, 3})
        <-q.Done()
        err := context.Cause(q.Context())
        if err == nil {
            t.Fatal("expected panic error, got nil")
        }
        panicErr, ok := err.(*ErrPanic)
        if !ok {
            t.Fatalf("expected *ErrPanic, got %T: %v", err, err)
        }
        if panicErr.Panic != "test panic" {
            t.Errorf("expected panic value 'test panic', got %v", panicErr.Panic)
        }
    })
}

func TestContextPropagation(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        parentCtx, parentCancel := context.WithCancel(context.Background())
        defer parentCancel()
        q := New[int](parentCtx, 500*time.Millisecond)
        var receivedCtx context.Context
        var mu sync.Mutex
        q.Start(func(ctx context.Context, val int) {
            mu.Lock()
            receivedCtx = ctx
            mu.Unlock()
            time.Sleep(10 * time.Millisecond)
        })
        defer q.Close()
        q.Enqueue([]int{1})
        time.Sleep(5 * time.Millisecond)
        mu.Lock()
        ctx := receivedCtx
        mu.Unlock()
        if ctx == nil {
            t.Fatal("expected context to be passed to processor")
        }
        parentCancel()
        <-q.Done()
        if err := q.Context().Err(); err != context.Canceled {
            t.Errorf("expected context.Canceled when parent cancelled, got %v", err)
        }
    })
}

func TestZeroMaxLag(t *testing.T) {
    t.Parallel()
    synctest.Test(t, func(t *testing.T) {
        q := New[int](context.Background(), 0)
        processed := atomic.Int32{}
        q.Start(func(ctx context.Context, val int) {
            processed.Add(1)
            time.Sleep(10 * time.Millisecond)
        })
        defer q.Close()
        q.Enqueue([]int{1, 2, 3})
        barrier := q.Barrier()
        <-barrier
        if processed.Load() != 3 {
            t.Errorf("expected 3 items processed with zero maxLag, got %d", processed.Load())
        }
        if err := context.Cause(q.Context()); err != nil {
            t.Errorf("expected no error with zero maxLag, got %v", err)
        }
    })
}