From ce752b8a88214a2d45477aa8b77384175ebbdf18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Claus=20Lensb=C3=B8l?= Date: Wed, 1 Oct 2025 14:59:38 -0400 Subject: [PATCH] net/netmon: remove usage of direct callbacks from netmon (#17292) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The callback itself is not removed as it is used in other repos, making it simpler for those to slowly transition to the eventbus. Updates #15160 Signed-off-by: Claus Lensbøl --- cmd/tailscaled/debug.go | 33 ++++++++++++---- cmd/tailscaled/tailscaled.go | 11 +++++- cmd/tsconnect/wasm/wasm_js.go | 2 +- control/controlclient/controlclient_test.go | 2 + control/controlclient/direct_test.go | 8 +++- control/controlclient/noise_test.go | 3 ++ control/controlhttp/http_test.go | 7 +++- ipn/ipnlocal/local.go | 2 +- ipn/ipnlocal/local_test.go | 8 +++- ipn/ipnlocal/network-lock_test.go | 4 +- ipn/ipnlocal/state_test.go | 1 + log/sockstatlog/logger.go | 4 +- log/sockstatlog/logger_test.go | 2 +- logpolicy/logpolicy.go | 7 ++++ logtail/config.go | 2 + logtail/logtail.go | 31 +++++++++++++++ logtail/logtail_test.go | 7 +++- net/dns/manager.go | 4 +- net/dns/manager_tcp_test.go | 10 ++++- net/dns/manager_test.go | 10 ++++- net/dns/resolver/forwarder_test.go | 2 +- net/dns/resolver/tsdns_test.go | 7 +++- net/netmon/loghelper.go | 22 +++++++++-- net/netmon/loghelper_test.go | 21 ++++------ net/tsdial/tsdial.go | 43 +++++++++++++++++++++ tsnet/tsnet.go | 2 + wgengine/netlog/netlog.go | 4 +- wgengine/userspace.go | 6 ++- 28 files changed, 217 insertions(+), 48 deletions(-) diff --git a/cmd/tailscaled/debug.go b/cmd/tailscaled/debug.go index 96f98d9d6..bcc34fb0d 100644 --- a/cmd/tailscaled/debug.go +++ b/cmd/tailscaled/debug.go @@ -104,14 +104,10 @@ func runMonitor(ctx context.Context, loop bool) error { } defer mon.Close() - mon.RegisterChangeCallback(func(delta *netmon.ChangeDelta) { - if !delta.Major { - log.Printf("Network monitor fired; not a major change") - return - } - log.Printf("Network monitor fired. New state:") - dump(delta.New) - }) + eventClient := b.Client("debug.runMonitor") + m := eventClient.Monitor(changeDeltaWatcher(eventClient, ctx, dump)) + defer m.Close() + if loop { log.Printf("Starting link change monitor; initial state:") } @@ -124,6 +120,27 @@ func runMonitor(ctx context.Context, loop bool) error { select {} } +func changeDeltaWatcher(ec *eventbus.Client, ctx context.Context, dump func(st *netmon.State)) func(*eventbus.Client) { + changeSub := eventbus.Subscribe[netmon.ChangeDelta](ec) + return func(ec *eventbus.Client) { + for { + select { + case <-ctx.Done(): + return + case <-ec.Done(): + return + case delta := <-changeSub.Events(): + if !delta.Major { + log.Printf("Network monitor fired; not a major change") + return + } + log.Printf("Network monitor fired. New state:") + dump(delta.New) + } + } + } +} + func getURL(ctx context.Context, urlStr string) error { if urlStr == "login" { urlStr = "https://login.tailscale.com" diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index 8de473b7c..27fec05a3 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -433,7 +433,13 @@ func run() (err error) { var publicLogID logid.PublicID if buildfeatures.HasLogTail { - pol := logpolicy.New(logtail.CollectionNode, netMon, sys.HealthTracker.Get(), nil /* use log.Printf */) + + pol := logpolicy.Options{ + Collection: logtail.CollectionNode, + NetMon: netMon, + Health: sys.HealthTracker.Get(), + Bus: sys.Bus.Get(), + }.New() pol.SetVerbosityLevel(args.verbose) publicLogID = pol.PublicID logPol = pol @@ -470,7 +476,7 @@ func run() (err error) { // Always clean up, even if we're going to run the server. This covers cases // such as when a system was rebooted without shutting down, or tailscaled // crashed, and would for example restore system DNS configuration. - dns.CleanUp(logf, netMon, sys.HealthTracker.Get(), args.tunname) + dns.CleanUp(logf, netMon, sys.Bus.Get(), sys.HealthTracker.Get(), args.tunname) router.CleanUp(logf, netMon, args.tunname) // If the cleanUp flag was passed, then exit. if args.cleanUp { @@ -616,6 +622,7 @@ func getLocalBackend(ctx context.Context, logf logger.Logf, logID logid.PublicID } dialer := &tsdial.Dialer{Logf: logf} // mutated below (before used) + dialer.SetBus(sys.Bus.Get()) sys.Set(dialer) onlyNetstack, err := createEngine(logf, sys) diff --git a/cmd/tsconnect/wasm/wasm_js.go b/cmd/tsconnect/wasm/wasm_js.go index fbf7968a0..2e81fa4a8 100644 --- a/cmd/tsconnect/wasm/wasm_js.go +++ b/cmd/tsconnect/wasm/wasm_js.go @@ -104,6 +104,7 @@ func newIPN(jsConfig js.Value) map[string]any { sys := tsd.NewSystem() sys.Set(store) dialer := &tsdial.Dialer{Logf: logf} + dialer.SetBus(sys.Bus.Get()) eng, err := wgengine.NewUserspaceEngine(logf, wgengine.Config{ Dialer: dialer, SetSubsystem: sys.Set, @@ -463,7 +464,6 @@ func (s *jsSSHSession) Run() { cols = s.pendingResizeCols } err = session.RequestPty("xterm", rows, cols, ssh.TerminalModes{}) - if err != nil { writeError("Pseudo Terminal", err) return diff --git a/control/controlclient/controlclient_test.go b/control/controlclient/controlclient_test.go index 78646d76a..3914d10ef 100644 --- a/control/controlclient/controlclient_test.go +++ b/control/controlclient/controlclient_test.go @@ -223,6 +223,7 @@ func TestDirectProxyManual(t *testing.T) { dialer := &tsdial.Dialer{} dialer.SetNetMon(netmon.NewStatic()) + dialer.SetBus(bus) opts := Options{ Persist: persist.Persist{}, @@ -300,6 +301,7 @@ func testHTTPS(t *testing.T, withProxy bool) { dialer := &tsdial.Dialer{} dialer.SetNetMon(netmon.NewStatic()) + dialer.SetBus(bus) dialer.SetSystemDialerForTest(func(ctx context.Context, network, addr string) (net.Conn, error) { host, _, err := net.SplitHostPort(addr) if err != nil { diff --git a/control/controlclient/direct_test.go b/control/controlclient/direct_test.go index bba76d6f0..dd93dc7b3 100644 --- a/control/controlclient/direct_test.go +++ b/control/controlclient/direct_test.go @@ -27,13 +27,15 @@ func TestNewDirect(t *testing.T) { bus := eventbustest.NewBus(t) k := key.NewMachine() + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) opts := Options{ ServerURL: "https://example.com", Hostinfo: hi, GetMachinePrivateKey: func() (key.MachinePrivate, error) { return k, nil }, - Dialer: tsdial.NewDialer(netmon.NewStatic()), + Dialer: dialer, Bus: bus, } c, err := NewDirect(opts) @@ -105,13 +107,15 @@ func TestTsmpPing(t *testing.T) { bus := eventbustest.NewBus(t) k := key.NewMachine() + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) opts := Options{ ServerURL: "https://example.com", Hostinfo: hi, GetMachinePrivateKey: func() (key.MachinePrivate, error) { return k, nil }, - Dialer: tsdial.NewDialer(netmon.NewStatic()), + Dialer: dialer, Bus: bus, } diff --git a/control/controlclient/noise_test.go b/control/controlclient/noise_test.go index 4904016f2..d9c71cf27 100644 --- a/control/controlclient/noise_test.go +++ b/control/controlclient/noise_test.go @@ -22,6 +22,7 @@ import ( "tailscale.com/tstest/nettest" "tailscale.com/types/key" "tailscale.com/types/logger" + "tailscale.com/util/eventbus/eventbustest" ) // maxAllowedNoiseVersion is the highest we expect the Tailscale @@ -175,6 +176,7 @@ func (tt noiseClientTest) run(t *testing.T) { serverPrivate := key.NewMachine() clientPrivate := key.NewMachine() chalPrivate := key.NewChallenge() + bus := eventbustest.NewBus(t) const msg = "Hello, client" h2 := &http2.Server{} @@ -194,6 +196,7 @@ func (tt noiseClientTest) run(t *testing.T) { defer hs.Close() dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) if nettest.PreferMemNetwork() { dialer.SetSystemDialerForTest(nw.Dial) } diff --git a/control/controlhttp/http_test.go b/control/controlhttp/http_test.go index 6485761ac..648b9e5ed 100644 --- a/control/controlhttp/http_test.go +++ b/control/controlhttp/http_test.go @@ -149,6 +149,8 @@ func testControlHTTP(t *testing.T, param httpTestParam) { proxy := param.proxy client, server := key.NewMachine(), key.NewMachine() + bus := eventbustest.NewBus(t) + const testProtocolVersion = 1 const earlyWriteMsg = "Hello, world!" sch := make(chan serverResult, 1) @@ -218,6 +220,7 @@ func testControlHTTP(t *testing.T, param httpTestParam) { netMon := netmon.NewStatic() dialer := tsdial.NewDialer(netMon) + dialer.SetBus(bus) a := &Dialer{ Hostname: "localhost", HTTPPort: strconv.Itoa(httpLn.Addr().(*net.TCPAddr).Port), @@ -775,7 +778,7 @@ func runDialPlanTest(t *testing.T, plan *tailcfg.ControlDialPlan, want []netip.A if allowFallback { host = fallbackAddr.String() } - + bus := eventbustest.NewBus(t) a := &Dialer{ Hostname: host, HTTPPort: httpPort, @@ -790,7 +793,7 @@ func runDialPlanTest(t *testing.T, plan *tailcfg.ControlDialPlan, want []netip.A omitCertErrorLogging: true, testFallbackDelay: 50 * time.Millisecond, Clock: clock, - HealthTracker: health.NewTracker(eventbustest.NewBus(t)), + HealthTracker: health.NewTracker(bus), } start := time.Now() diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 5e738572f..af5a40550 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -526,7 +526,7 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo }() netMon := sys.NetMon.Get() - b.sockstatLogger, err = sockstatlog.NewLogger(logpolicy.LogsDir(logf), logf, logID, netMon, sys.HealthTracker.Get()) + b.sockstatLogger, err = sockstatlog.NewLogger(logpolicy.LogsDir(logf), logf, logID, netMon, sys.HealthTracker.Get(), sys.Bus.Get()) if err != nil { log.Printf("error setting up sockstat logger: %v", err) } diff --git a/ipn/ipnlocal/local_test.go b/ipn/ipnlocal/local_test.go index 571f472cc..ec65c67ee 100644 --- a/ipn/ipnlocal/local_test.go +++ b/ipn/ipnlocal/local_test.go @@ -480,7 +480,9 @@ func newTestLocalBackendWithSys(t testing.TB, sys *tsd.System) *LocalBackend { t.Log("Added fake userspace engine for testing") } if _, ok := sys.Dialer.GetOK(); !ok { - sys.Set(tsdial.NewDialer(netmon.NewStatic())) + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(sys.Bus.Get()) + sys.Set(dialer) t.Log("Added static dialer for testing") } lb, err := NewLocalBackend(logf, logid.PublicID{}, sys, 0) @@ -3108,12 +3110,14 @@ func TestAutoExitNodeSetNetInfoCallback(t *testing.T) { b.hostinfo = hi k := key.NewMachine() var cc *mockControl + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(sys.Bus.Get()) opts := controlclient.Options{ ServerURL: "https://example.com", GetMachinePrivateKey: func() (key.MachinePrivate, error) { return k, nil }, - Dialer: tsdial.NewDialer(netmon.NewStatic()), + Dialer: dialer, Logf: b.logf, PolicyClient: polc, } diff --git a/ipn/ipnlocal/network-lock_test.go b/ipn/ipnlocal/network-lock_test.go index 0d3f7db43..c7c4c905f 100644 --- a/ipn/ipnlocal/network-lock_test.go +++ b/ipn/ipnlocal/network-lock_test.go @@ -54,6 +54,8 @@ func fakeControlClient(t *testing.T, c *http.Client) (*controlclient.Auto, *even bus := eventbustest.NewBus(t) k := key.NewMachine() + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) opts := controlclient.Options{ ServerURL: "https://example.com", Hostinfo: hi, @@ -63,7 +65,7 @@ func fakeControlClient(t *testing.T, c *http.Client) (*controlclient.Auto, *even HTTPTestClient: c, NoiseTestClient: c, Observer: observerFunc(func(controlclient.Status) {}), - Dialer: tsdial.NewDialer(netmon.NewStatic()), + Dialer: dialer, Bus: bus, } diff --git a/ipn/ipnlocal/state_test.go b/ipn/ipnlocal/state_test.go index 347aaf8b8..a387af035 100644 --- a/ipn/ipnlocal/state_test.go +++ b/ipn/ipnlocal/state_test.go @@ -1668,6 +1668,7 @@ func newLocalBackendWithMockEngineAndControl(t *testing.T, enableLogging bool) ( sys := tsd.NewSystemWithBus(bus) sys.Set(dialer) sys.Set(dialer.NetMon()) + dialer.SetBus(bus) magicConn, err := magicsock.NewConn(magicsock.Options{ Logf: logf, diff --git a/log/sockstatlog/logger.go b/log/sockstatlog/logger.go index 4f8909725..e0744de0f 100644 --- a/log/sockstatlog/logger.go +++ b/log/sockstatlog/logger.go @@ -26,6 +26,7 @@ import ( "tailscale.com/net/sockstats" "tailscale.com/types/logger" "tailscale.com/types/logid" + "tailscale.com/util/eventbus" "tailscale.com/util/mak" ) @@ -97,7 +98,7 @@ func SockstatLogID(logID logid.PublicID) logid.PrivateID { // // The netMon parameter is optional. It should be specified in environments where // Tailscaled is manipulating the routing table. -func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *netmon.Monitor, health *health.Tracker) (*Logger, error) { +func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus) (*Logger, error) { if !sockstats.IsAvailable || !buildfeatures.HasLogTail { return nil, nil } @@ -127,6 +128,7 @@ func NewLogger(logdir string, logf logger.Logf, logID logid.PublicID, netMon *ne PrivateID: SockstatLogID(logID), Collection: "sockstats.log.tailscale.io", Buffer: filch, + Bus: bus, CompressLogs: true, FlushDelayFn: func() time.Duration { // set flush delay to 100 years so it never flushes automatically diff --git a/log/sockstatlog/logger_test.go b/log/sockstatlog/logger_test.go index 31fb17e46..e5c2feb29 100644 --- a/log/sockstatlog/logger_test.go +++ b/log/sockstatlog/logger_test.go @@ -24,7 +24,7 @@ func TestResourceCleanup(t *testing.T) { if err != nil { t.Fatal(err) } - lg, err := NewLogger(td, logger.Discard, id.Public(), nil, nil) + lg, err := NewLogger(td, logger.Discard, id.Public(), nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/logpolicy/logpolicy.go b/logpolicy/logpolicy.go index c1f3e553a..9c7e62ab0 100644 --- a/logpolicy/logpolicy.go +++ b/logpolicy/logpolicy.go @@ -50,6 +50,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/types/logid" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" "tailscale.com/util/must" "tailscale.com/util/racebuild" "tailscale.com/util/syspolicy/pkey" @@ -489,6 +490,11 @@ type Options struct { // If non-nil, it's used to construct the default HTTP client. Health *health.Tracker + // Bus is an optional parameter for communication on the eventbus. + // If non-nil, it's passed to logtail for use in interface monitoring. + // TODO(cmol): Make this non-optional when it's plumbed in by the clients. + Bus *eventbus.Bus + // Logf is an optional logger to use. // If nil, [log.Printf] will be used instead. Logf logger.Logf @@ -615,6 +621,7 @@ func (opts Options) init(disableLogging bool) (*logtail.Config, *Policy) { Stderr: logWriter{console}, CompressLogs: true, MaxUploadSize: opts.MaxUploadSize, + Bus: opts.Bus, } if opts.Collection == logtail.CollectionNode { conf.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta diff --git a/logtail/config.go b/logtail/config.go index a6c068c0c..bf47dd8aa 100644 --- a/logtail/config.go +++ b/logtail/config.go @@ -10,6 +10,7 @@ import ( "tailscale.com/tstime" "tailscale.com/types/logid" + "tailscale.com/util/eventbus" ) // DefaultHost is the default host name to upload logs to when @@ -34,6 +35,7 @@ type Config struct { LowMemory bool // if true, logtail minimizes memory use Clock tstime.Clock // if set, Clock.Now substitutes uses of time.Now Stderr io.Writer // if set, logs are sent here instead of os.Stderr + Bus *eventbus.Bus // if set, uses the eventbus for awaitInternetUp instead of callback StderrLevel int // max verbosity level to write to stderr; 0 means the non-verbose messages only Buffer Buffer // temp storage, if nil a MemoryBuffer CompressLogs bool // whether to compress the log uploads diff --git a/logtail/logtail.go b/logtail/logtail.go index 948c5a460..675422890 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -32,6 +32,7 @@ import ( "tailscale.com/tstime" tslogger "tailscale.com/types/logger" "tailscale.com/types/logid" + "tailscale.com/util/eventbus" "tailscale.com/util/set" "tailscale.com/util/truncate" "tailscale.com/util/zstdframe" @@ -120,6 +121,10 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { shutdownStart: make(chan struct{}), shutdownDone: make(chan struct{}), } + + if cfg.Bus != nil { + l.eventClient = cfg.Bus.Client("logtail.Logger") + } l.SetSockstatsLabel(sockstats.LabelLogtailLogger) l.compressLogs = cfg.CompressLogs @@ -156,6 +161,7 @@ type Logger struct { privateID logid.PrivateID httpDoCalls atomic.Int32 sockstatsLabel atomicSocktatsLabel + eventClient *eventbus.Client procID uint32 includeProcSequence bool @@ -221,6 +227,9 @@ func (l *Logger) Shutdown(ctx context.Context) error { l.httpc.CloseIdleConnections() }() + if l.eventClient != nil { + l.eventClient.Close() + } l.shutdownStartMu.Lock() select { case <-l.shutdownStart: @@ -417,6 +426,10 @@ func (l *Logger) internetUp() bool { } func (l *Logger) awaitInternetUp(ctx context.Context) { + if l.eventClient != nil { + l.awaitInternetUpBus(ctx) + return + } upc := make(chan bool, 1) defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) { if delta.New.AnyInterfaceUp() { @@ -436,6 +449,24 @@ func (l *Logger) awaitInternetUp(ctx context.Context) { } } +func (l *Logger) awaitInternetUpBus(ctx context.Context) { + if l.internetUp() { + return + } + sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) + defer sub.Close() + select { + case delta := <-sub.Events(): + if delta.New.AnyInterfaceUp() { + fmt.Fprintf(l.stderr, "logtail: internet back up\n") + return + } + fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") + case <-ctx.Done(): + return + } +} + // upload uploads body to the log server. // origlen indicates the pre-compression body length. // origlen of -1 indicates that the body is not compressed. diff --git a/logtail/logtail_test.go b/logtail/logtail_test.go index b8c46c448..a92f88b4b 100644 --- a/logtail/logtail_test.go +++ b/logtail/logtail_test.go @@ -17,6 +17,7 @@ import ( "github.com/go-json-experiment/json/jsontext" "tailscale.com/tstest" "tailscale.com/tstime" + "tailscale.com/util/eventbus/eventbustest" "tailscale.com/util/must" ) @@ -30,6 +31,7 @@ func TestFastShutdown(t *testing.T) { l := NewLogger(Config{ BaseURL: testServ.URL, + Bus: eventbustest.NewBus(t), }, t.Logf) err := l.Shutdown(ctx) if err != nil { @@ -62,7 +64,10 @@ func NewLogtailTestHarness(t *testing.T) (*LogtailTestServer, *Logger) { t.Cleanup(ts.srv.Close) - l := NewLogger(Config{BaseURL: ts.srv.URL}, t.Logf) + l := NewLogger(Config{ + BaseURL: ts.srv.URL, + Bus: eventbustest.NewBus(t), + }, t.Logf) // There is always an initial "logtail started" message body := <-ts.uploaded diff --git a/net/dns/manager.go b/net/dns/manager.go index edf156ece..de99fe646 100644 --- a/net/dns/manager.go +++ b/net/dns/manager.go @@ -30,6 +30,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/util/clientmetric" "tailscale.com/util/dnsname" + "tailscale.com/util/eventbus" "tailscale.com/util/slicesx" "tailscale.com/util/syspolicy/policyclient" ) @@ -600,7 +601,7 @@ func (m *Manager) FlushCaches() error { // No other state needs to be instantiated before this runs. // // health must not be nil -func CleanUp(logf logger.Logf, netMon *netmon.Monitor, health *health.Tracker, interfaceName string) { +func CleanUp(logf logger.Logf, netMon *netmon.Monitor, bus *eventbus.Bus, health *health.Tracker, interfaceName string) { if !buildfeatures.HasDNS { return } @@ -611,6 +612,7 @@ func CleanUp(logf logger.Logf, netMon *netmon.Monitor, health *health.Tracker, i } d := &tsdial.Dialer{Logf: logf} d.SetNetMon(netMon) + d.SetBus(bus) dns := NewManager(logf, oscfg, health, d, nil, nil, runtime.GOOS) if err := dns.Down(); err != nil { logf("dns down: %v", err) diff --git a/net/dns/manager_tcp_test.go b/net/dns/manager_tcp_test.go index 46883a1e7..dcdc88c7a 100644 --- a/net/dns/manager_tcp_test.go +++ b/net/dns/manager_tcp_test.go @@ -90,7 +90,10 @@ func TestDNSOverTCP(t *testing.T) { SearchDomains: fqdns("coffee.shop"), }, } - m := NewManager(t.Logf, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "") + bus := eventbustest.NewBus(t) + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) + m := NewManager(t.Logf, &f, health.NewTracker(bus), dialer, nil, nil, "") m.resolver.TestOnlySetHook(f.SetResolver) m.Set(Config{ Hosts: hosts( @@ -175,7 +178,10 @@ func TestDNSOverTCP_TooLarge(t *testing.T) { SearchDomains: fqdns("coffee.shop"), }, } - m := NewManager(log, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "") + bus := eventbustest.NewBus(t) + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) + m := NewManager(log, &f, health.NewTracker(bus), dialer, nil, nil, "") m.resolver.TestOnlySetHook(f.SetResolver) m.Set(Config{ Hosts: hosts("andrew.ts.com.", "1.2.3.4"), diff --git a/net/dns/manager_test.go b/net/dns/manager_test.go index b5a510862..92b660007 100644 --- a/net/dns/manager_test.go +++ b/net/dns/manager_test.go @@ -933,7 +933,10 @@ func TestManager(t *testing.T) { goos = "linux" } knobs := &controlknobs.Knobs{} - m := NewManager(t.Logf, &f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, knobs, goos) + bus := eventbustest.NewBus(t) + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) + m := NewManager(t.Logf, &f, health.NewTracker(bus), dialer, nil, knobs, goos) m.resolver.TestOnlySetHook(f.SetResolver) if err := m.Set(test.in); err != nil { @@ -1039,7 +1042,10 @@ func TestConfigRecompilation(t *testing.T) { SearchDomains: fqdns("foo.ts.net"), } - m := NewManager(t.Logf, f, health.NewTracker(eventbustest.NewBus(t)), tsdial.NewDialer(netmon.NewStatic()), nil, nil, "darwin") + bus := eventbustest.NewBus(t) + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) + m := NewManager(t.Logf, f, health.NewTracker(bus), dialer, nil, nil, "darwin") var managerConfig *resolver.Config m.resolver.TestOnlySetHook(func(cfg resolver.Config) { diff --git a/net/dns/resolver/forwarder_test.go b/net/dns/resolver/forwarder_test.go index b5cc7d018..ec491c581 100644 --- a/net/dns/resolver/forwarder_test.go +++ b/net/dns/resolver/forwarder_test.go @@ -122,7 +122,6 @@ func TestResolversWithDelays(t *testing.T) { } }) } - } func TestGetRCode(t *testing.T) { @@ -454,6 +453,7 @@ func runTestQuery(tb testing.TB, request []byte, modify func(*forwarder), ports var dialer tsdial.Dialer dialer.SetNetMon(netMon) + dialer.SetBus(bus) fwd := newForwarder(logf, netMon, nil, &dialer, health.NewTracker(bus), nil) if modify != nil { diff --git a/net/dns/resolver/tsdns_test.go b/net/dns/resolver/tsdns_test.go index 0823ea139..f0dbb48b3 100644 --- a/net/dns/resolver/tsdns_test.go +++ b/net/dns/resolver/tsdns_test.go @@ -353,10 +353,13 @@ func TestRDNSNameToIPv6(t *testing.T) { } func newResolver(t testing.TB) *Resolver { + bus := eventbustest.NewBus(t) + dialer := tsdial.NewDialer(netmon.NewStatic()) + dialer.SetBus(bus) return New(t.Logf, nil, // no link selector - tsdial.NewDialer(netmon.NewStatic()), - health.NewTracker(eventbustest.NewBus(t)), + dialer, + health.NewTracker(bus), nil, // no control knobs ) } diff --git a/net/netmon/loghelper.go b/net/netmon/loghelper.go index 96991644c..2e28e8cda 100644 --- a/net/netmon/loghelper.go +++ b/net/netmon/loghelper.go @@ -8,6 +8,7 @@ import ( "sync" "tailscale.com/types/logger" + "tailscale.com/util/eventbus" ) // LinkChangeLogLimiter returns a new [logger.Logf] that logs each unique @@ -17,13 +18,12 @@ import ( // done. func LinkChangeLogLimiter(ctx context.Context, logf logger.Logf, nm *Monitor) logger.Logf { var formatSeen sync.Map // map[string]bool - unregister := nm.RegisterChangeCallback(func(cd *ChangeDelta) { + nm.b.Monitor(nm.changeDeltaWatcher(nm.b, ctx, func(cd ChangeDelta) { // If we're in a major change or a time jump, clear the seen map. if cd.Major || cd.TimeJumped { formatSeen.Clear() } - }) - context.AfterFunc(ctx, unregister) + })) return func(format string, args ...any) { // We only store 'true' in the map, so if it's present then it @@ -42,3 +42,19 @@ func LinkChangeLogLimiter(ctx context.Context, logf logger.Logf, nm *Monitor) lo logf(format, args...) } } + +func (nm *Monitor) changeDeltaWatcher(ec *eventbus.Client, ctx context.Context, fn func(ChangeDelta)) func(*eventbus.Client) { + sub := eventbus.Subscribe[ChangeDelta](ec) + return func(ec *eventbus.Client) { + for { + select { + case <-ctx.Done(): + return + case <-sub.Done(): + return + case change := <-sub.Events(): + fn(change) + } + } + } +} diff --git a/net/netmon/loghelper_test.go b/net/netmon/loghelper_test.go index aeac9f031..ca3b1284c 100644 --- a/net/netmon/loghelper_test.go +++ b/net/netmon/loghelper_test.go @@ -11,6 +11,7 @@ import ( "testing/synctest" "tailscale.com/util/eventbus" + "tailscale.com/util/eventbus/eventbustest" ) func TestLinkChangeLogLimiter(t *testing.T) { synctest.Test(t, syncTestLinkChangeLogLimiter) } @@ -61,21 +62,15 @@ func syncTestLinkChangeLogLimiter(t *testing.T) { // string cache and allow the next log to write to our log buffer. // // InjectEvent doesn't work because it's not a major event, so we - // instead reach into the netmon and grab the callback, and then call - // it ourselves. - mon.mu.Lock() - var cb func(*ChangeDelta) - for _, c := range mon.cbs { - cb = c - break - } - mon.mu.Unlock() - - cb(&ChangeDelta{Major: true}) + // instead inject the event ourselves. + injector := eventbustest.NewInjector(t, bus) + eventbustest.Inject(injector, ChangeDelta{Major: true}) + synctest.Wait() logf("hello %s", "world") - if got := logBuffer.String(); got != "hello world\nother message\nhello world\n" { - t.Errorf("unexpected log buffer contents: %q", got) + want := "hello world\nother message\nhello world\n" + if got := logBuffer.String(); got != want { + t.Errorf("unexpected log buffer contents, got: %q, want, %q", got, want) } // Canceling the context we passed to LinkChangeLogLimiter should diff --git a/net/tsdial/tsdial.go b/net/tsdial/tsdial.go index e4e4e9e8b..bec196a2e 100644 --- a/net/tsdial/tsdial.go +++ b/net/tsdial/tsdial.go @@ -28,6 +28,7 @@ import ( "tailscale.com/types/logger" "tailscale.com/types/netmap" "tailscale.com/util/clientmetric" + "tailscale.com/util/eventbus" "tailscale.com/util/mak" "tailscale.com/util/testenv" "tailscale.com/version" @@ -86,6 +87,8 @@ type Dialer struct { dnsCache *dnscache.MessageCache // nil until first non-empty SetExitDNSDoH nextSysConnID int activeSysConns map[int]net.Conn // active connections not yet closed + eventClient *eventbus.Client + eventBusSubs eventbus.Monitor } // sysConn wraps a net.Conn that was created using d.SystemDial. @@ -158,6 +161,9 @@ func (d *Dialer) SetRoutes(routes, localRoutes []netip.Prefix) { } func (d *Dialer) Close() error { + if d.eventClient != nil { + d.eventBusSubs.Close() + } d.mu.Lock() defer d.mu.Unlock() d.closed = true @@ -186,6 +192,14 @@ func (d *Dialer) SetNetMon(netMon *netmon.Monitor) { d.netMonUnregister = nil } d.netMon = netMon + // Having multiple watchers could lead to problems, + // so remove the eventClient if it exists. + // This should really not happen, but better checking for it than not. + // TODO(cmol): Should this just be a panic? + if d.eventClient != nil { + d.eventBusSubs.Close() + d.eventClient = nil + } d.netMonUnregister = d.netMon.RegisterChangeCallback(d.linkChanged) } @@ -197,6 +211,35 @@ func (d *Dialer) NetMon() *netmon.Monitor { return d.netMon } +func (d *Dialer) SetBus(bus *eventbus.Bus) { + d.mu.Lock() + defer d.mu.Unlock() + if d.eventClient != nil { + panic("eventbus has already been set") + } + // Having multiple watchers could lead to problems, + // so unregister the callback if it exists. + if d.netMonUnregister != nil { + d.netMonUnregister() + } + d.eventClient = bus.Client("tsdial.Dialer") + d.eventBusSubs = d.eventClient.Monitor(d.linkChangeWatcher(d.eventClient)) +} + +func (d *Dialer) linkChangeWatcher(ec *eventbus.Client) func(*eventbus.Client) { + linkChangeSub := eventbus.Subscribe[netmon.ChangeDelta](ec) + return func(ec *eventbus.Client) { + for { + select { + case <-ec.Done(): + return + case cd := <-linkChangeSub.Events(): + d.linkChanged(&cd) + } + } + } +} + var ( metricLinkChangeConnClosed = clientmetric.NewCounter("tsdial_linkchange_closes") metricChangeDeltaNoDefaultRoute = clientmetric.NewCounter("tsdial_changedelta_no_default_route") diff --git a/tsnet/tsnet.go b/tsnet/tsnet.go index d14f1f16c..890193d0b 100644 --- a/tsnet/tsnet.go +++ b/tsnet/tsnet.go @@ -592,6 +592,7 @@ func (s *Server) start() (reterr error) { closePool.add(s.netMon) s.dialer = &tsdial.Dialer{Logf: tsLogf} // mutated below (before used) + s.dialer.SetBus(sys.Bus.Get()) eng, err := wgengine.NewUserspaceEngine(tsLogf, wgengine.Config{ EventBus: sys.Bus.Get(), ListenPort: s.Port, @@ -767,6 +768,7 @@ func (s *Server) startLogger(closePool *closeOnErrorPool, health *health.Tracker Stderr: io.Discard, // log everything to Buffer Buffer: s.logbuffer, CompressLogs: true, + Bus: s.sys.Bus.Get(), HTTPC: &http.Client{Transport: logpolicy.NewLogtailTransport(logtail.DefaultHost, s.netMon, health, tsLogf)}, MetricsDelta: clientmetric.EncodeLogTailMetricsDelta, } diff --git a/wgengine/netlog/netlog.go b/wgengine/netlog/netlog.go index 34b78a2b5..b7281e542 100644 --- a/wgengine/netlog/netlog.go +++ b/wgengine/netlog/netlog.go @@ -29,6 +29,7 @@ import ( "tailscale.com/tailcfg" "tailscale.com/types/logid" "tailscale.com/types/netlogtype" + "tailscale.com/util/eventbus" "tailscale.com/wgengine/router" ) @@ -95,7 +96,7 @@ var testClient *http.Client // The IP protocol and source port are always zero. // The sock is used to populated the PhysicalTraffic field in Message. // The netMon parameter is optional; if non-nil it's used to do faster interface lookups. -func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, logExitFlowEnabledEnabled bool) error { +func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID logid.PrivateID, tun, sock Device, netMon *netmon.Monitor, health *health.Tracker, bus *eventbus.Bus, logExitFlowEnabledEnabled bool) error { nl.mu.Lock() defer nl.mu.Unlock() if nl.logger != nil { @@ -112,6 +113,7 @@ func (nl *Logger) Startup(nodeID tailcfg.StableNodeID, nodeLogID, domainLogID lo Collection: "tailtraffic.log.tailscale.io", PrivateID: nodeLogID, CopyPrivateID: domainLogID, + Bus: bus, Stderr: io.Discard, CompressLogs: true, HTTPC: httpc, diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 049abcf17..30486f7a9 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -312,6 +312,9 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) } if conf.Dialer == nil { conf.Dialer = &tsdial.Dialer{Logf: logf} + if conf.EventBus != nil { + conf.Dialer.SetBus(conf.EventBus) + } } var tsTUNDev *tstun.Wrapper @@ -379,6 +382,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) tunName, _ := conf.Tun.Name() conf.Dialer.SetTUNName(tunName) conf.Dialer.SetNetMon(e.netMon) + conf.Dialer.SetBus(e.eventBus) e.dns = dns.NewManager(logf, conf.DNS, e.health, conf.Dialer, fwdDNSLinkSelector{e, tunName}, conf.ControlKnobs, runtime.GOOS) // TODO: there's probably a better place for this @@ -1035,7 +1039,7 @@ func (e *userspaceEngine) Reconfig(cfg *wgcfg.Config, routerCfg *router.Config, tid := cfg.NetworkLogging.DomainID logExitFlowEnabled := cfg.NetworkLogging.LogExitFlowEnabled e.logf("wgengine: Reconfig: starting up network logger (node:%s tailnet:%s)", nid.Public(), tid.Public()) - if err := e.networkLogger.Startup(cfg.NodeID, nid, tid, e.tundev, e.magicConn, e.netMon, e.health, logExitFlowEnabled); err != nil { + if err := e.networkLogger.Startup(cfg.NodeID, nid, tid, e.tundev, e.magicConn, e.netMon, e.health, e.eventBus, logExitFlowEnabled); err != nil { e.logf("wgengine: Reconfig: error starting up network logger: %v", err) } e.networkLogger.ReconfigRoutes(routerCfg)