From 6bc07d3c2484ce5cf5ec5845a3f4f3e592c33ea5 Mon Sep 17 00:00:00 2001 From: Nick Khyl Date: Wed, 19 Nov 2025 20:13:18 -0600 Subject: [PATCH] util/eventbus: add tests for a subscriber trying to acquire the same mutex as a publisher As of 2025-11-20, publishing more events than the eventbus's internal queues can hold may deadlock if a subscriber tries to acquire a mutex that can also be held by a publisher. This commit adds a test that demonstrates this deadlock, and skips it until the bug is fixed. Updates #17973 Signed-off-by: Nick Khyl (cherry picked from commit 016ccae2da9fae1f6d8ffb29c694f86cb78cca4a) --- util/eventbus/bus_test.go | 70 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 1e0cd8abf..4ebefcab8 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "regexp" + "sync" "testing" "testing/synctest" "time" @@ -546,6 +547,75 @@ func TestRegression(t *testing.T) { }) } +const ( + maxQueuedItems = 16 // same as in queue.go + totalMaxQueuedItems = maxQueuedItems * 2 // both publisher and subscriber sides +) + +func TestPublishWithMutex(t *testing.T) { + t.Run("FewEvents", func(t *testing.T) { + // As of 2025-11-20, publishing up to [totalMaxQueuedItems] is fine. + testPublishWithMutex(t, totalMaxQueuedItems) + }) + t.Run("ManyEvents", func(t *testing.T) { + // As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock. + t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/17973") + + const N = 3 // N larger than one increases the chance of deadlock. + testPublishWithMutex(t, totalMaxQueuedItems+N) + }) +} + +// testPublishWithMutex publishes the specified number of events, +// acquiring and releasing a mutex around each publish and each +// subscriber event receive. +// +// The test fails if it loses any events or times out due to a deadlock. +// Unfortunately, a goroutine waiting on a mutex held by a durably blocked +// goroutine is not itself considered durably blocked, so [synctest] cannot +// detect this deadlock on its own. +func testPublishWithMutex(t *testing.T, n int) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client("TestClient") + + evts := make([]any, n) + for i := range evts { + evts[i] = EventA{Counter: i} + } + exp := expectEvents(t, evts...) + + var mu sync.Mutex + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + // As of 2025-11-20, this can deadlock if n is large enough + // and event queues fill up. + mu.Lock() + mu.Unlock() + + // Mark event as received, so we can check for lost events. + // Not required for the deadlock to occur. + exp.Got(e) + }) + + p := eventbus.Publish[EventA](c) + go func() { + for i := range n { + mu.Lock() + p.Publish(EventA{Counter: i}) + mu.Unlock() + } + }() + + synctest.Wait() + + if !exp.Empty() { + t.Errorf("unexpected extra events: %+v", exp.want) + } + }) +} + type queueChecker struct { t *testing.T want []any