From f2100e2e1d2a9228b3bde5667d7dbaa272aa888c Mon Sep 17 00:00:00 2001 From: Nick Khyl Date: Thu, 20 Nov 2025 11:04:54 -0600 Subject: [PATCH] util/eventbus: add tests for a subscriber publishing events As of 2025-11-20, publishing more events than the eventbus's internal queues can hold may deadlock if a subscriber tries to publish events itself. This commit adds a test that demonstrates this deadlock, and skips it until the bug is fixed. Updates #18012 Signed-off-by: Nick Khyl (cherry picked from commit 3780f25d51522f7148ae11d5b28b066d292e06e4) --- util/eventbus/bus_test.go | 60 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 4ebefcab8..c78ee23de 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -589,6 +589,7 @@ func testPublishWithMutex(t *testing.T, n int) { var mu sync.Mutex eventbus.SubscribeFunc[EventA](c, func(e EventA) { + // Acquire the same mutex as the publisher. // As of 2025-11-20, this can deadlock if n is large enough // and event queues fill up. mu.Lock() @@ -601,6 +602,7 @@ func testPublishWithMutex(t *testing.T, n int) { p := eventbus.Publish[EventA](c) go func() { + // Publish events, acquiring the mutex around each publish. for i := range n { mu.Lock() p.Publish(EventA{Counter: i}) @@ -616,6 +618,64 @@ func testPublishWithMutex(t *testing.T, n int) { }) } +func TestPublishFromSubscriber(t *testing.T) { + t.Run("FewEvents", func(t *testing.T) { + // Publishing up to [totalMaxQueuedItems]-1 is fine. + testPublishFromSubscriber(t, totalMaxQueuedItems-1) + }) + t.Run("ManyEvents", func(t *testing.T) { + // As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock. + t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/18012") + + // Using 2x to increase chance of deadlock. + testPublishFromSubscriber(t, totalMaxQueuedItems*2) + }) +} + +// testPublishFromSubscriber publishes the specified number of EventA events. +// Each EventA causes the subscriber to publish an EventB. +// The test fails if it loses any events or if a deadlock occurs. +func testPublishFromSubscriber(t *testing.T, n int) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client("TestClient") + + // Ultimately we expect to receive n EventB events + // published as a result of receiving n EventA events. + evts := make([]any, n) + for i := range evts { + evts[i] = EventB{Counter: i} + } + exp := expectEvents(t, evts...) + + pubA := eventbus.Publish[EventA](c) + pubB := eventbus.Publish[EventB](c) + + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + // Upon receiving EventA, publish EventB. + // As of 2025-11-20, this can deadlock if n is large enough + // and event queues fill up. + pubB.Publish(EventB{Counter: e.Counter}) + }) + eventbus.SubscribeFunc[EventB](c, func(e EventB) { + // Mark EventB as received. + exp.Got(e) + }) + + for i := range n { + pubA.Publish(EventA{Counter: i}) + } + + synctest.Wait() + + if !exp.Empty() { + t.Errorf("unexpected extra events: %+v", exp.want) + } + }) +} + type queueChecker struct { t *testing.T want []any