diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index aa6880d01..880e075cc 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -120,7 +120,14 @@ func (b *Bus) Close() { } func (b *Bus) pump(ctx context.Context) { - var vals queue[PublishedEvent] + // Limit how many published events we can buffer in the PublishedEvent queue. + // + // Subscribers have unbounded DeliveredEvent queues (see tailscale/tailscale#18020), + // so this queue doesn't need to be unbounded. Keeping it bounded may also help + // catch cases where subscribers stop pumping events completely, such as due to a bug + // in [subscribeState.pump], [Subscriber.dispatch], or [SubscriberFunc.dispatch]. + const maxPublishedEvents = 16 + vals := queue[PublishedEvent]{capacity: maxPublishedEvents} acceptCh := func() chan PublishedEvent { if vals.Full() { return nil diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 23fe633f3..88e11e719 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -594,23 +594,8 @@ func TestRegression(t *testing.T) { }) } -const ( - maxQueuedItems = 16 // same as in queue.go - totalMaxQueuedItems = maxQueuedItems * 2 // both publisher and subscriber sides -) - func TestPublishWithMutex(t *testing.T) { - t.Run("FewEvents", func(t *testing.T) { - // As of 2025-11-20, publishing up to [totalMaxQueuedItems] is fine. - testPublishWithMutex(t, totalMaxQueuedItems) - }) - t.Run("ManyEvents", func(t *testing.T) { - // As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock. - t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/17973") - - const N = 3 // N larger than one increases the chance of deadlock. 
- testPublishWithMutex(t, totalMaxQueuedItems+N) - }) + testPublishWithMutex(t, 1024) // arbitrary large number of events } // testPublishWithMutex publishes the specified number of events, @@ -637,13 +622,10 @@ func testPublishWithMutex(t *testing.T, n int) { var mu sync.Mutex eventbus.SubscribeFunc[EventA](c, func(e EventA) { // Acquire the same mutex as the publisher. - // As of 2025-11-20, this can deadlock if n is large enough - // and event queues fill up. mu.Lock() mu.Unlock() // Mark event as received, so we can check for lost events. - // Not required for the deadlock to occur. exp.Got(e) }) @@ -666,17 +648,7 @@ func testPublishWithMutex(t *testing.T, n int) { } func TestPublishFromSubscriber(t *testing.T) { - t.Run("FewEvents", func(t *testing.T) { - // Publishing up to [totalMaxQueuedItems]-1 is fine. - testPublishFromSubscriber(t, totalMaxQueuedItems-1) - }) - t.Run("ManyEvents", func(t *testing.T) { - // As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock. - t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/18012") - - // Using 2x to increase chance of deadlock. - testPublishFromSubscriber(t, totalMaxQueuedItems*2) - }) + testPublishFromSubscriber(t, 1024) // arbitrary large number of events } // testPublishFromSubscriber publishes the specified number of EventA events. @@ -702,8 +674,6 @@ func testPublishFromSubscriber(t *testing.T, n int) { eventbus.SubscribeFunc[EventA](c, func(e EventA) { // Upon receiving EventA, publish EventB. - // As of 2025-11-20, this can deadlock if n is large enough - // and event queues fill up. pubB.Publish(EventB{Counter: e.Counter}) }) eventbus.SubscribeFunc[EventB](c, func(e EventB) { diff --git a/util/eventbus/queue.go b/util/eventbus/queue.go index a62bf3c62..2589b75ce 100644 --- a/util/eventbus/queue.go +++ b/util/eventbus/queue.go @@ -7,18 +7,18 @@ import ( "slices" ) -const maxQueuedItems = 16 - -// queue is an ordered queue of length up to maxQueuedItems. 
+// queue is an ordered queue of length up to capacity, +// if capacity is non-zero. Otherwise it is unbounded. type queue[T any] struct { - vals []T - start int + vals []T + start int + capacity int // zero means unbounded } // canAppend reports whether a value can be appended to q.vals without // shifting values around. func (q *queue[T]) canAppend() bool { - return cap(q.vals) < maxQueuedItems || len(q.vals) < cap(q.vals) + return q.capacity == 0 || cap(q.vals) < q.capacity || len(q.vals) < cap(q.vals) } func (q *queue[T]) Full() bool {