Commit Graph

13 Commits (b6eabd403862f9ccc2fd11ecd330fe85592be583)

Author SHA1 Message Date
M. J. Fromberger b6eabd4038 util/eventbus: allow logging of slow subscribers (#17705)
Add options to the eventbus.Bus to plumb in a logger.

Route that logger in to the subscriber machinery, and trigger a log message to
it when a subscriber fails to respond to its delivered events for 5s or more.

The log message includes the package, filename, and line number of the call
site that created the subscription.

Add tests that verify this works.

Updates #17680

Change-Id: I0546516476b1e13e6a9cf79f19db2fe55e56c698
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
(cherry picked from commit 061e6266cf)
1 month ago
M. J. Fromberger 0a33aae823
util/eventbus: run subscriber functions in a goroutine (#17510)
With a channel subscriber, the subscription processing always occurs on another
goroutine. The SubscriberFunc (prior to this commit) runs its callbacks on the
client's own goroutine. This changes the semantics, though: In addition to more
directly pushing back on the publisher, a publisher and subscriber can deadlock
in a SubscriberFunc but succeed on a Subscriber. They should behave
equivalently regardless which interface they use.

Arguably the caller should deal with this by creating its own goroutine if it
needs to. However, that loses much of the benefit of the SubscriberFunc API, as
it will need to manage the lifecycle of that goroutine. So, for practical
ergonomics, let's make the SubscriberFunc do this management on the user's
behalf. (We discussed doing this in #17432, but decided not to do it yet).  We
can optimize this approach further, if we need to, without changing the API.

Updates #17487

Change-Id: I19ea9e8f246f7b406711f5a16518ef7ff21a1ac9
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2 months ago
M. J. Fromberger ad6cf2f8f3
util/eventbus: add a function-based subscriber type (#17432)
Originally proposed by @bradfitz in #17413.

In practice, a lot of subscribers have only one event type of interest, or a
small number of mostly independent ones. In that case, the overhead of running
and maintaining a goroutine to select on multiple channels winds up being more
noisy than we'd like for the user of the API.

For this common case, add a new SubscriberFunc[T] type that delivers events to
a callback owned by the subscriber, directly on the goroutine belonging to the
client itself. This frees the consumer from the need to maintain their own
goroutine to pull events from the channel, and to watch for closure of the
subscriber.

Before:

     s := eventbus.Subscribe[T](eventClient)
     go func() {
       for {
          select {
          case <-s.Done():
            return
          case e := <-s.Events():
            doSomethingWith(e)
          }
       }
     }()
     // ...
     s.Close()

After:

     func doSomethingWithT(e T) { ... }
     s := eventbus.SubscribeFunc(eventClient, doSomethingWithT)
     // ...
     s.Close()

Moreover, unless the caller wants to explicitly stop the subscriber separately
from its governing client, it need not capture the SubscriberFunc value at all.

One downside of this approach is that a slow or deadlocked callback could block
client's service routine and thus stall all other subscriptions on that client,
However, this can already happen more broadly if a subscriber fails to service
its delivery channel in a timely manner, it just feeds back more immediately.

Updates #17487

Change-Id: I64592d786005177aa9fd445c263178ed415784d5
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2 months ago
Brad Fitzpatrick a40f23ad4a util/eventbus: flesh out docs a bit
Updates #cleanup

Change-Id: Ia6b0e4b0426be1dd10a777aff0a81d4dd6b69b01
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2 months ago
Nick Khyl 866614202c util/eventbus: remove redundant code from eventbus.Publish
eventbus.Publish() calls newPublisher(), which in turn invokes (*Client).addPublisher().
That method adds the new publisher to c.pub, so we don’t need to add it again in eventbus.Publish.

Updates #cleanup

Signed-off-by: Nick Khyl <nickk@tailscale.com>
6 months ago
David Anderson 346a35f612 util/eventbus: add debugger methods to list pub/sub types
This lets debug tools list the types that clients are wielding, so
that they can build a dataflow graph and other debugging views.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson 853abf8661 util/eventbus: initial debugging facilities for the event bus
Enables monitoring events as they flow, listing bus clients, and
snapshotting internal queues to troubleshoot stalls.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson e80d2b4ad1 util/eventbus: add debug hooks to snoop on bus traffic
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson cf5c788cf1 util/eventbus: track additional event context in subscribe queue
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson bf40bc4fa0 util/eventbus: make internal queue a generic type
In preparation for making the queues carry additional event metadata.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson 24d4846f00 util/eventbus: adjust worker goroutine management helpers
This makes the helpers closer in behavior to cancelable contexts
and taskgroup.Single, and makes the worker code use a more normal
and easier to reason about context.Context for shutdown.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson 3e18434595 util/eventbus: rework to have a Client abstraction
The Client carries both publishers and subscribers for a single
actor. This makes the APIs for publish and subscribe look more
similar, and this structure is a better fit for upcoming debug
facilities.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson ef906763ee util/eventbus: initial implementation of an in-process event bus
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
Co-authored-by: M. J. Fromberger <fromberger@tailscale.com>
9 months ago