logtail: avoid racing eventbus subscriptions with Shutdown (#17639)

When the eventbus is enabled, set up the subscription for change deltas at the
beginning when the client is created, rather than waiting for the first
awaitInternetUp check.

Otherwise, it is possible for a check to race with the client close in
Shutdown, which triggers a panic.

Updates #17638

Change-Id: I461c07939eca46699072b14b1814ecf28eec750c
Signed-off-by: M. J. Fromberger <fromberger@tailscale.com>
(cherry picked from commit 4346615d77)
pull/17689/head
M. J. Fromberger 1 month ago committed by Nick Khyl
parent e6a3669277
commit 597acd8663
No known key found for this signature in database

@ -124,6 +124,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
if cfg.Bus != nil {
l.eventClient = cfg.Bus.Client("logtail.Logger")
l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
}
l.SetSockstatsLabel(sockstats.LabelLogtailLogger)
l.compressLogs = cfg.CompressLogs
@ -162,6 +163,7 @@ type Logger struct {
httpDoCalls atomic.Int32
sockstatsLabel atomicSocktatsLabel
eventClient *eventbus.Client
changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta]
procID uint32
includeProcSequence bool
@ -427,8 +429,23 @@ func (l *Logger) internetUp() bool {
func (l *Logger) awaitInternetUp(ctx context.Context) {
if l.eventClient != nil {
l.awaitInternetUpBus(ctx)
return
for {
if l.internetUp() {
return
}
select {
case <-ctx.Done():
return // give up
case <-l.changeDeltaSub.Done():
return // give up (closing down)
case delta := <-l.changeDeltaSub.Events():
if delta.New.AnyInterfaceUp() || l.internetUp() {
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
return
}
fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
}
}
}
upc := make(chan bool, 1)
defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
@ -449,24 +466,6 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
}
}
func (l *Logger) awaitInternetUpBus(ctx context.Context) {
if l.internetUp() {
return
}
sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient)
defer sub.Close()
select {
case delta := <-sub.Events():
if delta.New.AnyInterfaceUp() {
fmt.Fprintf(l.stderr, "logtail: internet back up\n")
return
}
fmt.Fprintf(l.stderr, "logtail: network changed, but is not up")
case <-ctx.Done():
return
}
}
// upload uploads body to the log server.
// origlen indicates the pre-compression body length.
// origlen of -1 indicates that the body is not compressed.

Loading…
Cancel
Save