diff --git a/net/netmon/netmon.go b/net/netmon/netmon.go index cb79d27c0..a555a7c95 100644 --- a/net/netmon/netmon.go +++ b/net/netmon/netmon.go @@ -51,8 +51,8 @@ type osMon interface { // Monitor represents a monitoring instance. type Monitor struct { logf logger.Logf - om osMon // nil means not supported on this platform - change chan struct{} + om osMon // nil means not supported on this platform + change chan bool // send false to wake poller, true to also force ChangeDeltas be sent stop chan struct{} // closed on Stop mu sync.Mutex // guards all following fields @@ -110,7 +110,7 @@ func New(logf logger.Logf) (*Monitor, error) { logf = logger.WithPrefix(logf, "monitor: ") m := &Monitor{ logf: logf, - change: make(chan struct{}, 1), + change: make(chan bool, 1), stop: make(chan struct{}), lastWall: wallTime(), } @@ -253,7 +253,7 @@ func (m *Monitor) Close() error { // period (under a fraction of a second). func (m *Monitor) InjectEvent() { select { - case m.change <- struct{}{}: + case m.change <- true: default: // Another change signal is already // buffered. Debounce will wake up soon @@ -261,6 +261,18 @@ func (m *Monitor) InjectEvent() { } } +// Poll forces the monitor to pretend there was a network +// change and re-check the state of the network. +// +// This is like InjectEvent but only fires ChangeFunc callbacks +// if the network state differed at all. +func (m *Monitor) Poll() { + select { + case m.change <- false: + default: + } +} + func (m *Monitor) stopped() bool { select { case <-m.stop: @@ -292,7 +304,7 @@ func (m *Monitor) pump() { if msg.ignore() { continue } - m.InjectEvent() + m.Poll() } } @@ -316,46 +328,17 @@ func (m *Monitor) isInterestingInterface(i interfaces.Interface, ips []netip.Pre func (m *Monitor) debounce() { defer m.goroutines.Done() for { + var forceCallbacks bool select { case <-m.stop: return - case <-m.change: + case forceCallbacks = <-m.change: } - if curState, err := m.interfaceStateUncached(); err != nil { + if newState, err := m.interfaceStateUncached(); err != nil { m.logf("interfaces.State: %v", err) } else { - m.mu.Lock() - - delta := &ChangeDelta{ - Old: m.ifState, - New: curState, - } - delta.Major = !delta.New.EqualFiltered(delta.Old, m.isInterestingInterface, interfaces.UseInterestingIPs) - if delta.Major { - m.gwValid = false - m.ifState = curState - - if s1, s2 := delta.Old.String(), delta.New.String(); s1 == s2 { - m.logf("[unexpected] network state changed, but stringification didn't: %v", s1) - m.logf("[unexpected] old: %s", jsonSummary(delta.Old)) - m.logf("[unexpected] new: %s", jsonSummary(delta.New)) - } - } - // See if we have a queued or new time jump signal. - if shouldMonitorTimeJump && m.checkWallTimeAdvanceLocked() { - m.resetTimeJumpedLocked() - delta.TimeJumped = true - if !delta.Major { - // Only log if it wasn't an interesting change. - m.logf("time jumped (probably wake from sleep); synthesizing major change event") - delta.Major = true - } - } - for _, cb := range m.cbs { - go cb(delta) - } - m.mu.Unlock() + m.handlePotentialChange(newState, forceCallbacks) } select { @@ -366,6 +349,51 @@ func (m *Monitor) debounce() { } } +// handlePotentialChange considers whether newState is different enough to wake +// up callers and updates the monitor's state if so. +// +// If forceCallbacks is true, they're always notified. +func (m *Monitor) handlePotentialChange(newState *interfaces.State, forceCallbacks bool) { + m.mu.Lock() + defer m.mu.Unlock() + oldState := m.ifState + timeJumped := shouldMonitorTimeJump && m.checkWallTimeAdvanceLocked() + if !timeJumped && !forceCallbacks && oldState.EqualFiltered(newState, interfaces.UseAllInterfaces, interfaces.UseAllIPs) { + // Exactly equal. Nothing to do. + return + } + + delta := &ChangeDelta{ + Old: oldState, + New: newState, + TimeJumped: timeJumped, + } + + delta.Major = !newState.EqualFiltered(oldState, m.isInterestingInterface, interfaces.UseInterestingIPs) + if delta.Major { + m.gwValid = false + m.ifState = newState + + if s1, s2 := oldState.String(), delta.New.String(); s1 == s2 { + m.logf("[unexpected] network state changed, but stringification didn't: %v", s1) + m.logf("[unexpected] old: %s", jsonSummary(oldState)) + m.logf("[unexpected] new: %s", jsonSummary(newState)) + } + } + // See if we have a queued or new time jump signal. + if timeJumped { + m.resetTimeJumpedLocked() + if !delta.Major { + // Only log if it wasn't an interesting change. + m.logf("time jumped (probably wake from sleep); synthesizing major change event") + delta.Major = true + } + } + for _, cb := range m.cbs { + go cb(delta) + } +} + func jsonSummary(x any) any { j, err := json.Marshal(x) if err != nil { diff --git a/net/netmon/polling.go b/net/netmon/polling.go index 9332bdde9..ce1618ed6 100644 --- a/net/netmon/polling.go +++ b/net/netmon/polling.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "tailscale.com/net/interfaces" "tailscale.com/types/logger" ) @@ -68,22 +67,18 @@ func (pm *pollingMon) Receive() (message, error) { // so this can go very slowly there, to save battery. // https://github.com/tailscale/tailscale/issues/1427 d = 10 * time.Minute - } - if pm.isCloudRun() { + } else if pm.isCloudRun() { // Cloud Run routes never change at runtime. the containers are killed within // 15 minutes by default, set the interval long enough to be effectively infinite. pm.logf("monitor polling: Cloud Run detected, reduce polling interval to 24h") d = 24 * time.Hour } - ticker := time.NewTicker(d) - defer ticker.Stop() - base := pm.m.InterfaceState() + timer := time.NewTimer(d) + defer timer.Stop() for { - if cur, err := pm.m.interfaceStateUncached(); err == nil && !cur.EqualFiltered(base, interfaces.UseInterestingInterfaces, interfaces.UseInterestingIPs) { - return unspecifiedMessage{}, nil - } select { - case <-ticker.C: + case <-timer.C: + return unspecifiedMessage{}, nil case <-pm.stop: return nil, errors.New("stopped") }