net/netmon: factor out debounce loop, simplify polling impl

This simplifies some netmon code in prep for other changes.

It breaks up Monitor.debounce into a helper method so locking is
easier to read and things unindent, and then it simplifies the polling
netmon implementation to remove the redundant stuff that the caller
(the Monitor.debounce loop) was already basically doing.

Updates #9040

Change-Id: Idcfb45201d00ae64017042a7bdee6ef86ad37a9f
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/9058/head
Brad Fitzpatrick 9 months ago committed by Brad Fitzpatrick
parent 9ea3942b1a
commit e881c1caec

@ -51,8 +51,8 @@ type osMon interface {
// Monitor represents a monitoring instance.
type Monitor struct {
logf logger.Logf
om osMon // nil means not supported on this platform
change chan struct{}
om osMon // nil means not supported on this platform
change chan bool // send false to wake poller, true to also force ChangeDeltas be sent
stop chan struct{} // closed on Stop
mu sync.Mutex // guards all following fields
@ -110,7 +110,7 @@ func New(logf logger.Logf) (*Monitor, error) {
logf = logger.WithPrefix(logf, "monitor: ")
m := &Monitor{
logf: logf,
change: make(chan struct{}, 1),
change: make(chan bool, 1),
stop: make(chan struct{}),
lastWall: wallTime(),
}
@ -253,7 +253,7 @@ func (m *Monitor) Close() error {
// period (under a fraction of a second).
func (m *Monitor) InjectEvent() {
select {
case m.change <- struct{}{}:
case m.change <- true:
default:
// Another change signal is already
// buffered. Debounce will wake up soon
@ -261,6 +261,18 @@ func (m *Monitor) InjectEvent() {
}
}
// Poll forces the monitor to pretend there was a network
// change and re-check the state of the network.
//
// This is like InjectEvent but only fires ChangeFunc callbacks
// if the network state differed at all.
func (m *Monitor) Poll() {
select {
case m.change <- false:
default:
}
}
func (m *Monitor) stopped() bool {
select {
case <-m.stop:
@ -292,7 +304,7 @@ func (m *Monitor) pump() {
if msg.ignore() {
continue
}
m.InjectEvent()
m.Poll()
}
}
@ -316,46 +328,17 @@ func (m *Monitor) isInterestingInterface(i interfaces.Interface, ips []netip.Pre
func (m *Monitor) debounce() {
defer m.goroutines.Done()
for {
var forceCallbacks bool
select {
case <-m.stop:
return
case <-m.change:
case forceCallbacks = <-m.change:
}
if curState, err := m.interfaceStateUncached(); err != nil {
if newState, err := m.interfaceStateUncached(); err != nil {
m.logf("interfaces.State: %v", err)
} else {
m.mu.Lock()
delta := &ChangeDelta{
Old: m.ifState,
New: curState,
}
delta.Major = !delta.New.EqualFiltered(delta.Old, m.isInterestingInterface, interfaces.UseInterestingIPs)
if delta.Major {
m.gwValid = false
m.ifState = curState
if s1, s2 := delta.Old.String(), delta.New.String(); s1 == s2 {
m.logf("[unexpected] network state changed, but stringification didn't: %v", s1)
m.logf("[unexpected] old: %s", jsonSummary(delta.Old))
m.logf("[unexpected] new: %s", jsonSummary(delta.New))
}
}
// See if we have a queued or new time jump signal.
if shouldMonitorTimeJump && m.checkWallTimeAdvanceLocked() {
m.resetTimeJumpedLocked()
delta.TimeJumped = true
if !delta.Major {
// Only log if it wasn't an interesting change.
m.logf("time jumped (probably wake from sleep); synthesizing major change event")
delta.Major = true
}
}
for _, cb := range m.cbs {
go cb(delta)
}
m.mu.Unlock()
m.handlePotentialChange(newState, forceCallbacks)
}
select {
@ -366,6 +349,51 @@ func (m *Monitor) debounce() {
}
}
// handlePotentialChange considers whether newState is different enough to wake
// up callers and updates the monitor's state if so.
//
// If forceCallbacks is true, they're always notified.
func (m *Monitor) handlePotentialChange(newState *interfaces.State, forceCallbacks bool) {
m.mu.Lock()
defer m.mu.Unlock()
oldState := m.ifState
timeJumped := shouldMonitorTimeJump && m.checkWallTimeAdvanceLocked()
if !timeJumped && !forceCallbacks && oldState.EqualFiltered(newState, interfaces.UseAllInterfaces, interfaces.UseAllIPs) {
// Exactly equal. Nothing to do.
return
}
delta := &ChangeDelta{
Old: oldState,
New: newState,
TimeJumped: timeJumped,
}
delta.Major = !newState.EqualFiltered(oldState, m.isInterestingInterface, interfaces.UseInterestingIPs)
if delta.Major {
m.gwValid = false
m.ifState = newState
if s1, s2 := oldState.String(), delta.New.String(); s1 == s2 {
m.logf("[unexpected] network state changed, but stringification didn't: %v", s1)
m.logf("[unexpected] old: %s", jsonSummary(oldState))
m.logf("[unexpected] new: %s", jsonSummary(newState))
}
}
// See if we have a queued or new time jump signal.
if timeJumped {
m.resetTimeJumpedLocked()
if !delta.Major {
// Only log if it wasn't an interesting change.
m.logf("time jumped (probably wake from sleep); synthesizing major change event")
delta.Major = true
}
}
for _, cb := range m.cbs {
go cb(delta)
}
}
func jsonSummary(x any) any {
j, err := json.Marshal(x)
if err != nil {

@ -13,7 +13,6 @@ import (
"sync"
"time"
"tailscale.com/net/interfaces"
"tailscale.com/types/logger"
)
@ -68,22 +67,18 @@ func (pm *pollingMon) Receive() (message, error) {
// so this can go very slowly there, to save battery.
// https://github.com/tailscale/tailscale/issues/1427
d = 10 * time.Minute
}
if pm.isCloudRun() {
} else if pm.isCloudRun() {
// Cloud Run routes never change at runtime. the containers are killed within
// 15 minutes by default, set the interval long enough to be effectively infinite.
pm.logf("monitor polling: Cloud Run detected, reduce polling interval to 24h")
d = 24 * time.Hour
}
ticker := time.NewTicker(d)
defer ticker.Stop()
base := pm.m.InterfaceState()
timer := time.NewTimer(d)
defer timer.Stop()
for {
if cur, err := pm.m.interfaceStateUncached(); err == nil && !cur.EqualFiltered(base, interfaces.UseInterestingInterfaces, interfaces.UseInterestingIPs) {
return unspecifiedMessage{}, nil
}
select {
case <-ticker.C:
case <-timer.C:
return unspecifiedMessage{}, nil
case <-pm.stop:
return nil, errors.New("stopped")
}

Loading…
Cancel
Save