From 061e6266cf4e9c9a0f06b0d60d4d7840f6b7678d Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Thu, 30 Oct 2025 14:40:57 -0700 Subject: [PATCH] util/eventbus: allow logging of slow subscribers (#17705) Add options to the eventbus.Bus to plumb in a logger. Route that logger in to the subscriber machinery, and trigger a log message to it when a subscriber fails to respond to its delivered events for 5s or more. The log message includes the package, filename, and line number of the call site that created the subscription. Add tests that verify this works. Updates #17680 Change-Id: I0546516476b1e13e6a9cf79f19db2fe55e56c698 Signed-off-by: M. J. Fromberger --- flake.nix | 2 +- go.mod | 2 +- go.mod.sri | 2 +- go.sum | 4 +-- shell.nix | 2 +- util/eventbus/bus.go | 35 ++++++++++++++++-- util/eventbus/bus_test.go | 73 ++++++++++++++++++++++++++++++++++++++ util/eventbus/client.go | 7 ++-- util/eventbus/debug.go | 36 +++++++++++++++++++ util/eventbus/subscribe.go | 35 ++++++++++++++++-- 10 files changed, 185 insertions(+), 13 deletions(-) diff --git a/flake.nix b/flake.nix index da4c87a0b..e50f39638 100644 --- a/flake.nix +++ b/flake.nix @@ -151,5 +151,5 @@ }); }; } -# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= +# nix-direnv cache busting line: sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw= diff --git a/go.mod b/go.mod index 12f7946b8..836810fc0 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/golang/snappy v0.0.4 github.com/golangci/golangci-lint v1.57.1 - github.com/google/go-cmp v0.6.0 + github.com/google/go-cmp v0.7.0 github.com/google/go-containerregistry v0.20.3 github.com/google/go-tpm v0.9.4 github.com/google/gopacket v1.1.19 diff --git a/go.mod.sri b/go.mod.sri index c9f537473..108423f4e 100644 --- a/go.mod.sri +++ b/go.mod.sri @@ -1 +1 @@ -sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= +sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw= diff --git a/go.sum b/go.sum index eea0d6c7d..a0d9461ec 100644 --- a/go.sum +++ b/go.sum @@ -492,8 +492,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-containerregistry v0.20.3 h1:oNx7IdTI936V8CQRveCjaxOiegWwvM7kqkbXTpyiovI= github.com/google/go-containerregistry v0.20.3/go.mod h1:w00pIgBRDVUDFM6bq+Qx8lwNWK+cxgCuX1vd3PIBDNI= github.com/google/go-github/v66 v66.0.0 h1:ADJsaXj9UotwdgK8/iFZtv7MLc8E8WBl62WLd/D/9+M= diff --git a/shell.nix b/shell.nix index 99cfbd243..6b579b455 100644 --- a/shell.nix +++ b/shell.nix @@ -16,4 +16,4 @@ ) { src = ./.; }).shellNix -# nix-direnv cache busting line: sha256-pZCy1KHUe7f7cjm816OwA+bjGrSRnSTxkvCmB4cmWqw= +# nix-direnv cache busting line: sha256-D0znIEcy9d822snZbdNCNLoN47cOP1F2SKmfwSFRvXw= diff --git a/util/eventbus/bus.go b/util/eventbus/bus.go index d1507d8e6..b1639136a 100644 --- a/util/eventbus/bus.go +++ b/util/eventbus/bus.go @@ -5,10 +5,12 @@ package eventbus import ( "context" + "log" "reflect" "slices" "sync" + "tailscale.com/types/logger" "tailscale.com/util/set" ) @@ -30,6 +32,7 @@ type Bus struct { write chan PublishedEvent snapshot chan chan []PublishedEvent routeDebug hook[RoutedEvent] + logf logger.Logf topicsMu sync.Mutex topics map[reflect.Type][]*subscribeState @@ -40,19 +43,42 @@ type Bus struct { clients set.Set[*Client] } -// New returns a new bus. Use [Publish] to make event publishers, -// and [Subscribe] and [SubscribeFunc] to make event subscribers. -func New() *Bus { +// New returns a new bus with default options. It is equivalent to +// calling [NewWithOptions] with zero [BusOptions]. +func New() *Bus { return NewWithOptions(BusOptions{}) } + +// NewWithOptions returns a new [Bus] with the specified [BusOptions]. +// Use [Bus.Client] to construct clients on the bus. +// Use [Publish] to make event publishers. +// Use [Subscribe] and [SubscribeFunc] to make event subscribers. +func NewWithOptions(opts BusOptions) *Bus { ret := &Bus{ write: make(chan PublishedEvent), snapshot: make(chan chan []PublishedEvent), topics: map[reflect.Type][]*subscribeState{}, clients: set.Set[*Client]{}, + logf: opts.logger(), } ret.router = runWorker(ret.pump) return ret } +// BusOptions are optional parameters for a [Bus]. A zero value is ready for +// use and provides defaults as described. +type BusOptions struct { + // Logf, if non-nil, is used for debug logs emitted by the bus and clients, + // publishers, and subscribers under its care. If it is nil, logs are sent + // to [log.Printf]. + Logf logger.Logf +} + +func (o BusOptions) logger() logger.Logf { + if o.Logf == nil { + return log.Printf + } + return o.Logf +} + // Client returns a new client with no subscriptions. Use [Subscribe] // to receive events, and [Publish] to emit events. // @@ -166,6 +192,9 @@ func (b *Bus) pump(ctx context.Context) { } } +// logger returns a [logger.Logf] to which logs related to bus activity should be written. +func (b *Bus) logger() logger.Logf { return b.logf } + func (b *Bus) dest(t reflect.Type) []*subscribeState { b.topicsMu.Lock() defer b.topicsMu.Unlock() diff --git a/util/eventbus/bus_test.go b/util/eventbus/bus_test.go index de292cf1a..1e0cd8abf 100644 --- a/util/eventbus/bus_test.go +++ b/util/eventbus/bus_test.go @@ -4,8 +4,11 @@ package eventbus_test import ( + "bytes" "errors" "fmt" + "log" + "regexp" "testing" "testing/synctest" "time" @@ -436,6 +439,76 @@ func TestMonitor(t *testing.T) { t.Run("Wait", testMon(t, func(c *eventbus.Client, m eventbus.Monitor) { c.Close(); m.Wait() })) } +func TestSlowSubs(t *testing.T) { + swapLogBuf := func(t *testing.T) *bytes.Buffer { + logBuf := new(bytes.Buffer) + save := log.Writer() + log.SetOutput(logBuf) + t.Cleanup(func() { log.SetOutput(save) }) + return logBuf + } + + t.Run("Subscriber", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + buf := swapLogBuf(t) + + b := eventbus.New() + defer b.Close() + + pc := b.Client("pub") + p := eventbus.Publish[EventA](pc) + + sc := b.Client("sub") + s := eventbus.Subscribe[EventA](sc) + + go func() { + time.Sleep(6 * time.Second) // trigger the slow check at 5s. + t.Logf("Subscriber accepted %v", <-s.Events()) + }() + + p.Publish(EventA{12345}) + + time.Sleep(7 * time.Second) // advance time... + synctest.Wait() // subscriber is done + + want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` + + `subscriber for eventbus_test.EventA is slow.*`) + if got := buf.String(); !want.MatchString(got) { + t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want) + } + }) + }) + + t.Run("SubscriberFunc", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + buf := swapLogBuf(t) + + b := eventbus.New() + defer b.Close() + + pc := b.Client("pub") + p := eventbus.Publish[EventB](pc) + + sc := b.Client("sub") + eventbus.SubscribeFunc[EventB](sc, func(e EventB) { + time.Sleep(6 * time.Second) // trigger the slow check at 5s. + t.Logf("SubscriberFunc processed %v", e) + }) + + p.Publish(EventB{67890}) + + time.Sleep(7 * time.Second) // advance time... + synctest.Wait() // subscriber is done + + want := regexp.MustCompile(`^.* tailscale.com/util/eventbus_test bus_test.go:\d+: ` + + `subscriber for eventbus_test.EventB is slow.*`) + if got := buf.String(); !want.MatchString(got) { + t.Errorf("Wrong log output\ngot: %q\nwant: %s", got, want) + } + }) + }) +} + func TestRegression(t *testing.T) { bus := eventbus.New() t.Cleanup(bus.Close) diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 9e3f3ee76..c119c67a9 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -7,6 +7,7 @@ import ( "reflect" "sync" + "tailscale.com/types/logger" "tailscale.com/util/set" ) @@ -29,6 +30,8 @@ type Client struct { func (c *Client) Name() string { return c.name } +func (c *Client) logger() logger.Logf { return c.bus.logger() } + // Close closes the client. It implicitly closes all publishers and // subscribers obtained from this client. func (c *Client) Close() { @@ -142,7 +145,7 @@ func Subscribe[T any](c *Client) *Subscriber[T] { } r := c.subscribeStateLocked() - s := newSubscriber[T](r) + s := newSubscriber[T](r, logfForCaller(c.logger())) r.addSubscriber(s) return s } @@ -165,7 +168,7 @@ func SubscribeFunc[T any](c *Client, f func(T)) *SubscriberFunc[T] { } r := c.subscribeStateLocked() - s := newSubscriberFunc[T](r, f) + s := newSubscriberFunc[T](r, f, logfForCaller(c.logger())) r.addSubscriber(s) return s } diff --git a/util/eventbus/debug.go b/util/eventbus/debug.go index 6d5463bec..2f2c9589a 100644 --- a/util/eventbus/debug.go +++ b/util/eventbus/debug.go @@ -6,12 +6,22 @@ package eventbus import ( "cmp" "fmt" + "path/filepath" "reflect" + "runtime" "slices" + "strings" "sync" "sync/atomic" + "time" + + "tailscale.com/types/logger" ) +// slowSubscriberTimeout is a timeout after which a subscriber that does not +// accept a pending event will be flagged as being slow. +const slowSubscriberTimeout = 5 * time.Second + // A Debugger offers access to a bus's privileged introspection and // debugging facilities. // @@ -204,3 +214,29 @@ type DebugTopic struct { Publisher string Subscribers []string } + +// logfForCaller returns a [logger.Logf] that prefixes its output with the +// package, filename, and line number of the caller's caller. +// If logf == nil, it returns [logger.Discard]. +// If the caller location could not be determined, it returns logf unmodified. +func logfForCaller(logf logger.Logf) logger.Logf { + if logf == nil { + return logger.Discard + } + pc, fpath, line, _ := runtime.Caller(2) // +1 for my caller, +1 for theirs + if f := runtime.FuncForPC(pc); f != nil { + return logger.WithPrefix(logf, fmt.Sprintf("%s %s:%d: ", funcPackageName(f.Name()), filepath.Base(fpath), line)) + } + return logf +} + +func funcPackageName(funcName string) string { + ls := max(strings.LastIndex(funcName, "/"), 0) + for { + i := strings.LastIndex(funcName, ".") + if i <= ls { + return funcName + } + funcName = funcName[:i] + } +} diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index c35c7e7f0..0b821b3f5 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -8,6 +8,9 @@ import ( "fmt" "reflect" "sync" + "time" + + "tailscale.com/types/logger" ) type DeliveredEvent struct { @@ -182,12 +185,18 @@ type Subscriber[T any] struct { stop stopFlag read chan T unregister func() + logf logger.Logf + slow *time.Timer // used to detect slow subscriber service } -func newSubscriber[T any](r *subscribeState) *Subscriber[T] { +func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { + slow := time.NewTimer(0) + slow.Stop() // reset in dispatch return &Subscriber[T]{ read: make(chan T), unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + logf: logf, + slow: slow, } } @@ -212,6 +221,11 @@ func (s *Subscriber[T]) monitor(debugEvent T) { func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { t := vals.Peek().Event.(T) + + start := time.Now() + s.slow.Reset(slowSubscriberTimeout) + defer s.slow.Stop() + for { // Keep the cases in this select in sync with subscribeState.pump // above. The only difference should be that this select @@ -226,6 +240,9 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent return false case ch := <-snapshot: ch <- vals.Snapshot() + case <-s.slow.C: + s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) + s.slow.Reset(slowSubscriberTimeout) } } } @@ -260,12 +277,18 @@ type SubscriberFunc[T any] struct { stop stopFlag read func(T) unregister func() + logf logger.Logf + slow *time.Timer // used to detect slow subscriber service } -func newSubscriberFunc[T any](r *subscribeState, f func(T)) *SubscriberFunc[T] { +func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { + slow := time.NewTimer(0) + slow.Stop() // reset in dispatch return &SubscriberFunc[T]{ read: f, unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, + logf: logf, + slow: slow, } } @@ -285,6 +308,11 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE t := vals.Peek().Event.(T) callDone := make(chan struct{}) go s.runCallback(t, callDone) + + start := time.Now() + s.slow.Reset(slowSubscriberTimeout) + defer s.slow.Stop() + // Keep the cases in this select in sync with subscribeState.pump // above. The only difference should be that this select // delivers a value by calling s.read. @@ -299,6 +327,9 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE return false case ch := <-snapshot: ch <- vals.Snapshot() + case <-s.slow.C: + s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) + s.slow.Reset(slowSubscriberTimeout) } } }