diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 56da413ef..c35c7e7f0 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -214,7 +214,7 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent t := vals.Peek().Event.(T) for { // Keep the cases in this select in sync with subscribeState.pump - // above. The only different should be that this select + // above. The only difference should be that this select // delivers a value on s.read. select { case s.read <- t: @@ -282,20 +282,30 @@ func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFo // dispatch implements part of the subscriber interface. func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { + t := vals.Peek().Event.(T) + callDone := make(chan struct{}) + go s.runCallback(t, callDone) // Keep the cases in this select in sync with subscribeState.pump - // above. The only different should be that this select + // above. The only difference should be that this select // delivers a value by calling s.read. - select { - case val := <-acceptCh(): - vals.Add(val) - case <-ctx.Done(): - return false - case ch := <-snapshot: - ch <- vals.Snapshot() - default: + for { + select { + case <-callDone: + vals.Drop() + return true + case val := <-acceptCh(): + vals.Add(val) + case <-ctx.Done(): + return false + case ch := <-snapshot: + ch <- vals.Snapshot() + } } - t := vals.Peek().Event.(T) - s.read(t) - vals.Drop() - return true +} + +// runCallback invokes the callback on v and closes ch when it returns. +// This should be run in a goroutine. +func (s *SubscriberFunc[T]) runCallback(v T, ch chan struct{}) { + defer close(ch) + s.read(v) }