util/eventbus: add tests for a subscriber publishing events

As of 2025-11-20, publishing more events than the eventbus's
internal queues can hold may deadlock if a subscriber tries
to publish events itself.

This commit adds a test that demonstrates this deadlock,
and skips it until the bug is fixed.

Updates #18012

Signed-off-by: Nick Khyl <nickk@tailscale.com>
(cherry picked from commit 3780f25d51)
pull/18054/head
Nick Khyl 2 weeks ago committed by Nick Khyl
parent 6bc07d3c24
commit f2100e2e1d

@ -589,6 +589,7 @@ func testPublishWithMutex(t *testing.T, n int) {
var mu sync.Mutex
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
// Acquire the same mutex as the publisher.
// As of 2025-11-20, this can deadlock if n is large enough
// and event queues fill up.
mu.Lock()
@ -601,6 +602,7 @@ func testPublishWithMutex(t *testing.T, n int) {
p := eventbus.Publish[EventA](c)
go func() {
// Publish events, acquiring the mutex around each publish.
for i := range n {
mu.Lock()
p.Publish(EventA{Counter: i})
@ -616,6 +618,64 @@ func testPublishWithMutex(t *testing.T, n int) {
})
}
func TestPublishFromSubscriber(t *testing.T) {
t.Run("FewEvents", func(t *testing.T) {
// Publishing up to [totalMaxQueuedItems]-1 is fine.
testPublishFromSubscriber(t, totalMaxQueuedItems-1)
})
t.Run("ManyEvents", func(t *testing.T) {
// As of 2025-11-20, publishing more than [totalMaxQueuedItems] may deadlock.
t.Skip("TODO: fix deadlock in https://github.com/tailscale/tailscale/issues/18012")
// Using 2x to increase chance of deadlock.
testPublishFromSubscriber(t, totalMaxQueuedItems*2)
})
}
// testPublishFromSubscriber publishes the specified number of EventA events.
// Each EventA causes the subscriber to publish an EventB.
// The test fails if it loses any events or if a deadlock occurs.
func testPublishFromSubscriber(t *testing.T, n int) {
synctest.Test(t, func(t *testing.T) {
b := eventbus.New()
defer b.Close()
c := b.Client("TestClient")
// Ultimately we expect to receive n EventB events
// published as a result of receiving n EventA events.
evts := make([]any, n)
for i := range evts {
evts[i] = EventB{Counter: i}
}
exp := expectEvents(t, evts...)
pubA := eventbus.Publish[EventA](c)
pubB := eventbus.Publish[EventB](c)
eventbus.SubscribeFunc[EventA](c, func(e EventA) {
// Upon receiving EventA, publish EventB.
// As of 2025-11-20, this can deadlock if n is large enough
// and event queues fill up.
pubB.Publish(EventB{Counter: e.Counter})
})
eventbus.SubscribeFunc[EventB](c, func(e EventB) {
// Mark EventB as received.
exp.Got(e)
})
for i := range n {
pubA.Publish(EventA{Counter: i})
}
synctest.Wait()
if !exp.Empty() {
t.Errorf("unexpected extra events: %+v", exp.want)
}
})
}
type queueChecker struct {
t *testing.T
want []any

Loading…
Cancel
Save