From 597acd86630ac51ebb6932809b121ec872b7a9c3 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 24 Oct 2025 14:08:47 -0700 Subject: [PATCH] logtail: avoid racing eventbus subscriptions with Shutdown (#17639) When the eventbus is enabled, set up the subscription for change deltas at the beginning when the client is created, rather than waiting for the first awaitInternetUp check. Otherwise, it is possible for a check to race with the client close in Shutdown, which triggers a panic. Updates #17638 Change-Id: I461c07939eca46699072b14b1814ecf28eec750c Signed-off-by: M. J. Fromberger (cherry picked from commit 4346615d77a6de16854c6e78f9d49375d6424e6e) --- logtail/logtail.go | 39 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/logtail/logtail.go b/logtail/logtail.go index 675422890..52823fedf 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -124,6 +124,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { if cfg.Bus != nil { l.eventClient = cfg.Bus.Client("logtail.Logger") + l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) } l.SetSockstatsLabel(sockstats.LabelLogtailLogger) l.compressLogs = cfg.CompressLogs @@ -162,6 +163,7 @@ type Logger struct { httpDoCalls atomic.Int32 sockstatsLabel atomicSocktatsLabel eventClient *eventbus.Client + changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] procID uint32 includeProcSequence bool @@ -427,8 +429,23 @@ func (l *Logger) internetUp() bool { func (l *Logger) awaitInternetUp(ctx context.Context) { if l.eventClient != nil { - l.awaitInternetUpBus(ctx) - return + for { + if l.internetUp() { + return + } + select { + case <-ctx.Done(): + return // give up + case <-l.changeDeltaSub.Done(): + return // give up (closing down) + case delta := <-l.changeDeltaSub.Events(): + if delta.New.AnyInterfaceUp() || l.internetUp() { + fmt.Fprintf(l.stderr, "logtail: internet back up\n") + return + } + fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") + } + } } upc := make(chan bool, 1) defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) { @@ -449,24 +466,6 @@ func (l *Logger) awaitInternetUp(ctx context.Context) { } } -func (l *Logger) awaitInternetUpBus(ctx context.Context) { - if l.internetUp() { - return - } - sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) - defer sub.Close() - select { - case delta := <-sub.Events(): - if delta.New.AnyInterfaceUp() { - fmt.Fprintf(l.stderr, "logtail: internet back up\n") - return - } - fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") - case <-ctx.Done(): - return - } -} - // upload uploads body to the log server. // origlen indicates the pre-compression body length. // origlen of -1 indicates that the body is not compressed.