util/eventbus: add tests for a subscriber trying to acquire the same mutex as a publisher

As of 2025-11-20, publishing more events than the eventbus's
internal queues can hold may deadlock if a subscriber tries
to acquire a mutex that can also be held by a publisher.

This commit adds a test that demonstrates this deadlock,
and skips it until the bug is fixed.

Updates #17973

Signed-off-by: Nick Khyl <nickk@tailscale.com>
(cherry picked from commit 016ccae2da)
pull/18054/head
Nick Khyl 2 weeks ago committed by Nick Khyl
parent ccf4f3c7ce
commit 6bc07d3c24

@ -9,6 +9,7 @@ import (
"fmt"
"log"
"regexp"
"sync"
"testing"
"testing/synctest"
"time"
@ -546,6 +547,75 @@ func TestRegression(t *testing.T) {
})
}
const (
maxQueuedItems = 16 // same as in queue.go
totalMaxQueuedItems = maxQueuedItems * 2 // both publisher and subscriber sides
)
func TestPublishWithMutex(t *testing.T) {
t.Run("FewEvents", func(t *testing.T) {
// As of 2025-11-20, publishing up to [totalMaxQueuedItems] is fine.
testPublishWithMutex(t, totalMaxQueuedItems)
})
t.Run("ManyEvents", func(t *testing.T) {
// As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock.
t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/17973")
const N = 3 // N larger than one increases the chance of deadlock.
testPublishWithMutex(t, totalMaxQueuedItems+N)
})
}
// testPublishWithMutex publishes the specified number of events,
// acquiring and releasing a mutex around each publish and each
// subscriber event receive.
//
// The test fails if it loses any events or times out due to a deadlock.
// Unfortunately, a goroutine waiting on a mutex held by a durably blocked
// goroutine is not itself considered durably blocked, so [synctest] cannot
// detect this deadlock on its own.
func testPublishWithMutex(t *testing.T, n int) {
synctest.Test(t, func(t *testing.T) {
b := eventbus.New()
defer b.Close()
c := b.Client("TestClient")
evts := make([]any, n)
for i := range evts {
evts[i] = EventA{Counter: i}
}
exp := expectEvents(t, evts...)
var mu sync.Mutex
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
// As of 2025-11-20, this can deadlock if n is large enough
// and event queues fill up.
mu.Lock()
mu.Unlock()
// Mark event as received, so we can check for lost events.
// Not required for the deadlock to occur.
exp.Got(e)
})
p := eventbus.Publish[EventA](c)
go func() {
for i := range n {
mu.Lock()
p.Publish(EventA{Counter: i})
mu.Unlock()
}
}()
synctest.Wait()
if !exp.Empty() {
t.Errorf("unexpected extra events: %+v", exp.want)
}
})
}
type queueChecker struct {
t *testing.T
want []any

Loading…
Cancel
Save