util/eventbus: run subscriber functions in a goroutine (#17510)

With a channel subscriber, the subscription processing always occurs on another
goroutine. The SubscriberFunc (prior to this commit) runs its callbacks on the
client's own goroutine. This changes the semantics, though: In addition to more
directly pushing back on the publisher, a publisher and subscriber can deadlock
in a SubscriberFunc but succeed on a Subscriber. They should behave
equivalently regardless which interface they use.

Arguably the caller should deal with this by creating its own goroutine if it
needs to. However, that loses much of the benefit of the SubscriberFunc API, as
it will need to manage the lifecycle of that goroutine. So, for practical
ergonomics, let's make the SubscriberFunc do this management on the user's
behalf. (We discussed doing this in #17432, but decided not to do it yet).  We
can optimize this approach further, if we need to, without changing the API.

Updates #17487

Change-Id: I19ea9e8f246f7b406711f5a16518ef7ff21a1ac9
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
pull/17281/head
M. J. Fromberger 2 months ago committed by GitHub
parent f157f3288d
commit 0a33aae823
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -214,7 +214,7 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent
t := vals.Peek().Event.(T)
for {
// Keep the cases in this select in sync with subscribeState.pump
// above. The only different should be that this select
// above. The only difference should be that this select
// delivers a value on s.read.
select {
case s.read <- t:
@ -282,20 +282,30 @@ func (s *SubscriberFunc[T]) subscribeType() reflect.Type { return reflect.TypeFo
// dispatch implements part of the subscriber interface.
func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
t := vals.Peek().Event.(T)
callDone := make(chan struct{})
go s.runCallback(t, callDone)
// Keep the cases in this select in sync with subscribeState.pump
// above. The only different should be that this select
// above. The only difference should be that this select
// delivers a value by calling s.read.
for {
select {
case <-callDone:
vals.Drop()
return true
case val := <-acceptCh():
vals.Add(val)
case <-ctx.Done():
return false
case ch := <-snapshot:
ch <- vals.Snapshot()
default:
}
t := vals.Peek().Event.(T)
s.read(t)
vals.Drop()
return true
}
}
// runCallback invokes the callback on v and closes ch when it returns.
// This should be run in a goroutine.
func (s *SubscriberFunc[T]) runCallback(v T, ch chan struct{}) {
defer close(ch)
s.read(v)
}

Loading…
Cancel
Save