diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index 1e0cd8abf..61728fbfd 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -89,6 +89,61 @@ func TestSubscriberFunc(t *testing.T) { } }) + t.Run("CloseWait", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + b := eventbus.New() + defer b.Close() + + c := b.Client(t.Name()) + + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + time.Sleep(2 * time.Second) + }) + + p := eventbus.Publish[EventA](c) + p.Publish(EventA{12345}) + + synctest.Wait() // subscriber has the event + c.Close() + + // If close does not wait for the subscriber, the test will fail + // because an active goroutine remains in the bubble. + }) + }) + + t.Run("CloseWait/Belated", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + buf := swapLogBuf(t) + + b := eventbus.New() + defer b.Close() + + c := b.Client(t.Name()) + + // This subscriber stalls for a long time, so that when we try to + // close the client it gives up and returns in the timeout condition. + eventbus.SubscribeFunc[EventA](c, func(e EventA) { + time.Sleep(time.Minute) // notably, longer than the wait period + }) + + p := eventbus.Publish[EventA](c) + p.Publish(EventA{12345}) + + synctest.Wait() // subscriber has the event + c.Close() + + // Verify that the logger recorded that Close gave up on the slowpoke. + want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` + + `giving up on subscriber for eventbus_test.EventA after \d+s at close.*`) + if got := buf.String(); !want.MatchString(got) { + t.Errorf("Wrong log output\ngot: %q\nwant %s", got, want) + } + + // Wait for the subscriber to actually finish to clean up the goroutine. + time.Sleep(2 * time.Minute) + }) + }) + t.Run("SubscriberPublishes", func(t *testing.T) { synctest.Test(t, func(t *testing.T) { b := eventbus.New() @@ -440,14 +495,6 @@ func TestMonitor(t *testing.T) { } func TestSlowSubs(t *testing.T) { - swapLogBuf := func(t *testing.T) *bytes.Buffer { - logBuf := new(bytes.Buffer) - save := log.Writer() - log.SetOutput(logBuf) - t.Cleanup(func() { log.SetOutput(save) }) - return logBuf - } - t.Run("Subscriber", func(t *testing.T) { synctest.Test(t, func(t *testing.T) { buf := swapLogBuf(t) @@ -571,3 +618,11 @@ func (q *queueChecker) Got(v any) { func (q *queueChecker) Empty() bool { return len(q.want) == 0 } + +func swapLogBuf(t *testing.T) *bytes.Buffer { + logBuf := new(bytes.Buffer) + save := log.Writer() + log.SetOutput(logBuf) + t.Cleanup(func() { log.SetOutput(save) }) + return logBuf +} diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 0b821b3f5..03d577f27 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -324,6 +324,13 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE case val := <-acceptCh(): vals.Add(val) case <-ctx.Done(): + // Wait for the callback to be complete, but not forever. + s.slow.Reset(5 * slowSubscriberTimeout) + select { + case <-s.slow.C: + s.logf("giving up on subscriber for %T after %v at close", t, time.Since(start)) + case <-callDone: + } return false case ch := <-snapshot: ch <- vals.Snapshot()