From e881c1caec019f545a01836314e59904e1377b5b Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 23 Aug 2023 15:04:58 -0700 Subject: [PATCH] net/netmon: factor out debounce loop, simplify polling impl This simplifies some netmon code in prep for other changes. It breaks up Monitor.debounce into a helper method so locking is easier to read and things unindent, and then it simplifies the polling netmon implementation to remove the redundant stuff that the caller (the Monitor.debounce loop) was already basically doing. Updates #9040 Change-Id: Idcfb45201d00ae64017042a7bdee6ef86ad37a9f Signed-off-by: Brad Fitzpatrick --- net/netmon/netmon.go | 104 +++++++++++++++++++++++++++--------------- net/netmon/polling.go | 15 ++---- 2 files changed, 71 insertions(+), 48 deletions(-) diff --git a/net/netmon/netmon.go b/net/netmon/netmon.go index cb79d27c0..a555a7c95 100644 --- a/net/netmon/netmon.go +++ b/net/netmon/netmon.go @@ -51,8 +51,8 @@ type osMon interface { // Monitor represents a monitoring instance. type Monitor struct { logf logger.Logf - om osMon // nil means not supported on this platform - change chan struct{} + om osMon // nil means not supported on this platform + change chan bool // send false to wake poller, true to also force ChangeDeltas be sent stop chan struct{} // closed on Stop mu sync.Mutex // guards all following fields @@ -110,7 +110,7 @@ func New(logf logger.Logf) (*Monitor, error) { logf = logger.WithPrefix(logf, "monitor: ") m := &Monitor{ logf: logf, - change: make(chan struct{}, 1), + change: make(chan bool, 1), stop: make(chan struct{}), lastWall: wallTime(), } @@ -253,7 +253,7 @@ func (m *Monitor) Close() error { // period (under a fraction of a second). func (m *Monitor) InjectEvent() { select { - case m.change <- struct{}{}: + case m.change <- true: default: // Another change signal is already // buffered. Debounce will wake up soon @@ -261,6 +261,18 @@ func (m *Monitor) InjectEvent() { } } +// Poll forces the monitor to pretend there was a network +// change and re-check the state of the network. +// +// This is like InjectEvent but only fires ChangeFunc callbacks +// if the network state differed at all. +func (m *Monitor) Poll() { + select { + case m.change <- false: + default: + } +} + func (m *Monitor) stopped() bool { select { case <-m.stop: @@ -292,7 +304,7 @@ func (m *Monitor) pump() { if msg.ignore() { continue } - m.InjectEvent() + m.Poll() } } @@ -316,46 +328,17 @@ func (m *Monitor) isInterestingInterface(i interfaces.Interface, ips []netip.Pre func (m *Monitor) debounce() { defer m.goroutines.Done() for { + var forceCallbacks bool select { case <-m.stop: return - case <-m.change: + case forceCallbacks = <-m.change: } - if curState, err := m.interfaceStateUncached(); err != nil { + if newState, err := m.interfaceStateUncached(); err != nil { m.logf("interfaces.State: %v", err) } else { - m.mu.Lock() - - delta := &ChangeDelta{ - Old: m.ifState, - New: curState, - } - delta.Major = !delta.New.EqualFiltered(delta.Old, m.isInterestingInterface, interfaces.UseInterestingIPs) - if delta.Major { - m.gwValid = false - m.ifState = curState - - if s1, s2 := delta.Old.String(), delta.New.String(); s1 == s2 { - m.logf("[unexpected] network state changed, but stringification didn't: %v", s1) - m.logf("[unexpected] old: %s", jsonSummary(delta.Old)) - m.logf("[unexpected] new: %s", jsonSummary(delta.New)) - } - } - // See if we have a queued or new time jump signal. - if shouldMonitorTimeJump && m.checkWallTimeAdvanceLocked() { - m.resetTimeJumpedLocked() - delta.TimeJumped = true - if !delta.Major { - // Only log if it wasn't an interesting change. - m.logf("time jumped (probably wake from sleep); synthesizing major change event") - delta.Major = true - } - } - for _, cb := range m.cbs { - go cb(delta) - } - m.mu.Unlock() + m.handlePotentialChange(newState, forceCallbacks) } select { @@ -366,6 +349,51 @@ func (m *Monitor) debounce() { } } +// handlePotentialChange considers whether newState is different enough to wake +// up callers and updates the monitor's state if so. +// +// If forceCallbacks is true, they're always notified. +func (m *Monitor) handlePotentialChange(newState *interfaces.State, forceCallbacks bool) { + m.mu.Lock() + defer m.mu.Unlock() + oldState := m.ifState + timeJumped := shouldMonitorTimeJump && m.checkWallTimeAdvanceLocked() + if !timeJumped && !forceCallbacks && oldState.EqualFiltered(newState, interfaces.UseAllInterfaces, interfaces.UseAllIPs) { + // Exactly equal. Nothing to do. + return + } + + delta := &ChangeDelta{ + Old: oldState, + New: newState, + TimeJumped: timeJumped, + } + + delta.Major = !newState.EqualFiltered(oldState, m.isInterestingInterface, interfaces.UseInterestingIPs) + if delta.Major { + m.gwValid = false + m.ifState = newState + + if s1, s2 := oldState.String(), delta.New.String(); s1 == s2 { + m.logf("[unexpected] network state changed, but stringification didn't: %v", s1) + m.logf("[unexpected] old: %s", jsonSummary(oldState)) + m.logf("[unexpected] new: %s", jsonSummary(newState)) + } + } + // See if we have a queued or new time jump signal. + if timeJumped { + m.resetTimeJumpedLocked() + if !delta.Major { + // Only log if it wasn't an interesting change. + m.logf("time jumped (probably wake from sleep); synthesizing major change event") + delta.Major = true + } + } + for _, cb := range m.cbs { + go cb(delta) + } +} + func jsonSummary(x any) any { j, err := json.Marshal(x) if err != nil { diff --git a/net/netmon/polling.go b/net/netmon/polling.go index 9332bdde9..ce1618ed6 100644 --- a/net/netmon/polling.go +++ b/net/netmon/polling.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "tailscale.com/net/interfaces" "tailscale.com/types/logger" ) @@ -68,22 +67,18 @@ func (pm *pollingMon) Receive() (message, error) { // so this can go very slowly there, to save battery. // https://github.com/tailscale/tailscale/issues/1427 d = 10 * time.Minute - } - if pm.isCloudRun() { + } else if pm.isCloudRun() { // Cloud Run routes never change at runtime. the containers are killed within // 15 minutes by default, set the interval long enough to be effectively infinite. pm.logf("monitor polling: Cloud Run detected, reduce polling interval to 24h") d = 24 * time.Hour } - ticker := time.NewTicker(d) - defer ticker.Stop() - base := pm.m.InterfaceState() + timer := time.NewTimer(d) + defer timer.Stop() for { - if cur, err := pm.m.interfaceStateUncached(); err == nil && !cur.EqualFiltered(base, interfaces.UseInterestingInterfaces, interfaces.UseInterestingIPs) { - return unspecifiedMessage{}, nil - } select { - case <-ticker.C: + case <-timer.C: + return unspecifiedMessage{}, nil case <-pm.stop: return nil, errors.New("stopped") }