Commit Graph

12 Commits (main)

Author SHA1 Message Date
Nick Khyl 1ccece0f78 util/eventbus: use unbounded event queues for DeliveredEvents in subscribers
Bounded DeliveredEvent queues reduce memory usage, but they can deadlock under load.
Two common scenarios trigger deadlocks when the number of events published in a short
period exceeds twice the queue capacity (there's a PublishedEvent queue of the same size):
 - a subscriber tries to acquire the same mutex as held by a publisher, or
 - a subscriber for A events publishes B events

Avoiding these scenarios is not practical and would limit eventbus usefulness and reduce its adoption,
pushing us back to callbacks and other legacy mechanisms. These deadlocks already occurred in customer
devices, dev machines, and tests. They also make it harder to identify and fix slow subscribers and similar
issues we have been seeing recently.

Choosing an arbitrary large fixed queue capacity would only mask the problem. A client running
on a sufficiently large and complex customer environment can exceed any meaningful constant limit,
since event volume depends on the number of peers and other factors. Behavior also changes
based on scheduling of publishers and subscribers by the Go runtime, OS, and hardware, as the issue
is essentially a race between publishers and subscribers. Additionally, on lower-end devices,
an unreasonably high constant capacity is practically the same as using unbounded queues.

Therefore, this PR changes the event queue implementation to be unbounded by default.
The PublishedEvent queue keeps its existing capacity of 16 items, while subscribers'
DeliveredEvent queues become unbounded.

This change fixes known deadlocks and makes the system stable under load,
at the cost of higher potential memory usage, including cases where a queue grows
during an event burst and does not shrink when load decreases.

Further improvements can be implemented in the future as needed.

Fixes #17973
Fixes #18012

Signed-off-by: Nick Khyl <nickk@tailscale.com>
2 weeks ago
Nick Khyl 3780f25d51 util/eventbus: add tests for a subscriber publishing events
As of 2025-11-20, publishing more events than the eventbus's
internal queues can hold may deadlock if a subscriber tries
to publish events itself.

This commit adds a test that demonstrates this deadlock,
and skips it until the bug is fixed.

Updates #18012

Signed-off-by: Nick Khyl <nickk@tailscale.com>
2 weeks ago
Nick Khyl 016ccae2da util/eventbus: add tests for a subscriber trying to acquire the same mutex as a publisher
As of 2025-11-20, publishing more events than the eventbus's
internal queues can hold may deadlock if a subscriber tries
to acquire a mutex that can also be held by a publisher.

This commit adds a test that demonstrates this deadlock,
and skips it until the bug is fixed.

Updates #17973

Signed-off-by: Nick Khyl <nickk@tailscale.com>
2 weeks ago
M. J. Fromberger 4c856078e4
util/eventbus: block for the subscriber during SubscribeFunc close (#17642)
Prior to this change a SubscriberFunc treated the call to the subscriber's
function as the completion of delivery. But that means when we are closing the
subscriber, that callback could continue to execute for some time after the
close returns.

For channel-based subscribers that works OK because the close takes effect
before the subscriber ever sees the event. To make the two subscriber types
symmetric, we should also wait for the callback to finish before returning.
This ensures that a Close of the client means the same thing with both kinds of
subscriber.

Updates #17638

Change-Id: I82fd31bcaa4e92fab07981ac0e57e6e3a7d9d60b
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
1 month ago
M. J. Fromberger 061e6266cf
util/eventbus: allow logging of slow subscribers (#17705)
Add options to the eventbus.Bus to plumb in a logger.

Route that logger in to the subscriber machinery, and trigger a log message to
it when a subscriber fails to respond to its delivered events for 5s or more.

The log message includes the package, filename, and line number of the call
site that created the subscription.

Add tests that verify this works.

Updates #17680

Change-Id: I0546516476b1e13e6a9cf79f19db2fe55e56c698
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
1 month ago
M. J. Fromberger ad6cf2f8f3
util/eventbus: add a function-based subscriber type (#17432)
Originally proposed by @bradfitz in #17413.

In practice, a lot of subscribers have only one event type of interest, or a
small number of mostly independent ones. In that case, the overhead of running
and maintaining a goroutine to select on multiple channels winds up being more
noisy than we'd like for the user of the API.

For this common case, add a new SubscriberFunc[T] type that delivers events to
a callback owned by the subscriber, directly on the goroutine belonging to the
client itself. This frees the consumer from the need to maintain their own
goroutine to pull events from the channel, and to watch for closure of the
subscriber.

Before:

     s := eventbus.Subscribe[T](eventClient)
     go func() {
       for {
          select {
          case <-s.Done():
            return
          case e := <-s.Events():
            doSomethingWith(e)
          }
       }
     }()
     // ...
     s.Close()

After:

     func doSomethingWithT(e T) { ... }
     s := eventbus.SubscribeFunc(eventClient, doSomethingWithT)
     // ...
     s.Close()

Moreover, unless the caller wants to explicitly stop the subscriber separately
from its governing client, it need not capture the SubscriberFunc value at all.

One downside of this approach is that a slow or deadlocked callback could block
client's service routine and thus stall all other subscriptions on that client,
However, this can already happen more broadly if a subscriber fails to service
its delivery channel in a timely manner, it just feeds back more immediately.

Updates #17487

Change-Id: I64592d786005177aa9fd445c263178ed415784d5
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2 months ago
M. J. Fromberger df747f1c1b
util/eventbus: add a Done method to the Monitor type (#17263)
Some systems need to tell whether the monitored goroutine has finished
alongside other channel operations (notably in this case the relay server, but
there seem likely to be others similarly situated).

Updates #15160

Change-Id: I5f0f3fae827b07f9b7102a3b08f60cda9737fe28
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2 months ago
M. J. Fromberger e59fbaab64
util/eventbus: give a nicer error when attempting to use a closed client (#17208)
It is a programming error to Publish or Subscribe on a closed Client, but now
the way you discover that is by getting a panic from down in the machinery of
the bus after the client state has been cleaned up.

To provide a more helpful error, let's panic explicitly when that happens and
say what went wrong ("the client is closed"), by preventing subscriptions from
interleaving with closure of the client. With this change, either an attachment
fails outright (because the client is already closed) or completes and then
shuts down in good order in the normal course.

This does not change the semantics of the client, publishers, or subscribers,
it's just making the failure more eager so we can attach explanatory text.

Updates #15160

Change-Id: Ia492f4c1dea7535aec2cdcc2e5ea5410ed5218d2
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
2 months ago
M. J. Fromberger ca9d795006
util/eventbus: add a Monitor type to manage subscriber goroutines (#17127)
A common pattern in event bus usage is to run a goroutine to service a
collection of subscribers on a single bus client. To have an orderly shutdown,
however, we need a way to wait for such a goroutine to be finished.

This commit adds a Monitor type that makes this pattern easier to wire up:
rather than having to track all the subscribers and an extra channel, the
component need only track the client and the monitor.  For example:

   cli := bus.Client("example")
   m := cli.Monitor(func(c *eventbus.Client) {
     s1 := eventbus.Subscribe[T](cli)
     s2 := eventbus.Subscribe[U](cli)
     for {
       select {
       case <-c.Done():
         return
       case t := <-s1.Events():
          processT(t)
       case u := <-s2.Events():
          processU(u)
       }
     }
   })

To shut down the client and wait for the goroutine, the caller can write:

   m.Close()

which closes cli and waits for the goroutine to finish. Or, separately:

   cli.Close()
   // do other stuff
   m.Wait()

While the goroutine management is not explicitly tied to subscriptions, it is a
common enough pattern that this seems like a useful simplification in use.

Updates #15160

Change-Id: I657afda1cfaf03465a9dce1336e9fd518a968bca
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
3 months ago
M. J. Fromberger 5b5ae2b2ee
util/eventbus: add a Done channel to the Client (#17118)
Subscribers already have a Done channel that the caller can use to detect when
the subscriber has been closed. Typically this happens when the governing
Client closes, which in turn is typically because the Bus closed.

But clients and subscribers can stop at other times too, and a caller has no
good way to tell the difference between "this subscriber closed but the rest
are OK" and "the client closed and all these subscribers are finished".

We've worked around this in practice by knowing the closure of one subscriber
implies the fate of the rest, but we can do better: Add a Done method to the
Client that allows us to tell when that has been closed explicitly, after all
the publishers and subscribers associated with that client have been closed.
This allows the caller to be sure that, by the time that occurs, no further
pending events are forthcoming on that client.

Updates #15160

Change-Id: Id601a79ba043365ecdb47dd035f1fdadd984f303
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
3 months ago
David Anderson 3e18434595 util/eventbus: rework to have a Client abstraction
The Client carries both publishers and subscribers for a single
actor. This makes the APIs for publish and subscribe look more
similar, and this structure is a better fit for upcoming debug
facilities.

Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
9 months ago
David Anderson ef906763ee util/eventbus: initial implementation of an in-process event bus
Updates #15160

Signed-off-by: David Anderson <dave@tailscale.com>
Co-authored-by: M. J. Fromberger <fromberger@tailscale.com>
9 months ago