From 0a33aae823eb5604f7698ce1dad99605eaed97c2 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 10 Oct 2025 09:03:38 -0700 Subject: [PATCH] util/eventbus: run subscriber functions in a goroutine (#17510) With a channel subscriber, the subscription processing always occurs on another goroutine. The SubscriberFunc (prior to this commit) runs its callbacks on the client's own goroutine. This changes the semantics, though: In addition to more directly pushing back on the publisher, a publisher and subscriber can deadlock in a SubscriberFunc but succeed on a Subscriber. They should behave equivalently regardless which interface they use. Arguably the caller should deal with this by creating its own goroutine if it needs to. However, that loses much of the benefit of the SubscriberFunc API, as it will need to manage the lifecycle of that goroutine. So, for practical ergonomics, let's make the SubscriberFunc do this management on the user's behalf. (We discussed doing this in #17432, but decided not to do it yet). We can optimize this approach further, if we need to, without changing the API. Updates #17487 Change-Id: I19ea9e8f246f7b406711f5a16518ef7ff21a1ac9 Signed-off-by: M. J. Fromberger --- util/eventbus/subscribe.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 56da413ef..c35c7e7f0 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -214,7 +214,7 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent t := vals.Peek().Event.(T) for { // Keep the cases in this select in sync with subscribeState.pump - // above. The only different should be that this select + // above. The only difference should be that this select // delivers a value on s.read. select { case s.read <- t: @@ -282,20 +282,30 @@ func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFo // dispatch implements part of the subscriber interface. func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { + t := vals.Peek().Event.(T) + callDone := make(chan struct{}) + go s.runCallback(t, callDone) // Keep the cases in this select in sync with subscribeState.pump - // above. The only different should be that this select + // above. The only difference should be that this select // delivers a value by calling s.read. - select { - case val := <-acceptCh(): - vals.Add(val) - case <-ctx.Done(): - return false - case ch := <-snapshot: - ch <- vals.Snapshot() - default: + for { + select { + case <-callDone: + vals.Drop() + return true + case val := <-acceptCh(): + vals.Add(val) + case <-ctx.Done(): + return false + case ch := <-snapshot: + ch <- vals.Snapshot() + } } - t := vals.Peek().Event.(T) - s.read(t) - vals.Drop() - return true +} + +// runCallback invokes the callback on v and closes ch when it returns. +// This should be run in a goroutine. +func (s *SubscriberFunc[T]) runCallback(v T, ch chan struct{}) { + defer close(ch) + s.read(v) }