diff --git a/logtail/logtail.go b/logtail/logtail.go index 675422890..52823fedf 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -124,6 +124,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { if cfg.Bus != nil { l.eventClient = cfg.Bus.Client("logtail.Logger") + l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) } l.SetSockstatsLabel(sockstats.LabelLogtailLogger) l.compressLogs = cfg.CompressLogs @@ -162,6 +163,7 @@ type Logger struct { httpDoCalls atomic.Int32 sockstatsLabel atomicSocktatsLabel eventClient *eventbus.Client + changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] procID uint32 includeProcSequence bool @@ -427,8 +429,23 @@ func (l *Logger) internetUp() bool { func (l *Logger) awaitInternetUp(ctx context.Context) { if l.eventClient != nil { - l.awaitInternetUpBus(ctx) - return + for { + if l.internetUp() { + return + } + select { + case <-ctx.Done(): + return // give up + case <-l.changeDeltaSub.Done(): + return // give up (closing down) + case delta := <-l.changeDeltaSub.Events(): + if delta.New.AnyInterfaceUp() || l.internetUp() { + fmt.Fprintf(l.stderr, "logtail: internet back up\n") + return + } + fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") + } + } } upc := make(chan bool, 1) defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) { @@ -449,24 +466,6 @@ func (l *Logger) awaitInternetUp(ctx context.Context) { } } -func (l *Logger) awaitInternetUpBus(ctx context.Context) { - if l.internetUp() { - return - } - sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) - defer sub.Close() - select { - case delta := <-sub.Events(): - if delta.New.AnyInterfaceUp() { - fmt.Fprintf(l.stderr, "logtail: internet back up\n") - return - } - fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") - case <-ctx.Done(): - return - } -} - // upload uploads body to the log server. // origlen indicates the pre-compression body length. // origlen of -1 indicates that the body is not compressed.