wgengine/monitor: change API to permit multiple independent callbakcks

Currently it assumes exactly 1 registered callback. This changes it to
support 0, 1, or more than 1.

This is a step towards plumbing wgengine/monitor into more places (and
moving some of wgengine's interface state fetching into monitor in a
later step)

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/1409/head
Brad Fitzpatrick 4 years ago
parent 0eea490724
commit dda03a911e

@ -69,13 +69,14 @@ func runMonitor(ctx context.Context) error {
j, _ := json.MarshalIndent(st, "", " ") j, _ := json.MarshalIndent(st, "", " ")
os.Stderr.Write(j) os.Stderr.Write(j)
} }
mon, err := monitor.New(log.Printf, func() { mon, err := monitor.New(log.Printf)
log.Printf("Link monitor fired. State:")
dump()
})
if err != nil { if err != nil {
return err return err
} }
mon.RegisterChangeCallback(func() {
log.Printf("Link monitor fired. State:")
dump()
})
log.Printf("Starting link change monitor; initial state:") log.Printf("Starting link change monitor; initial state:")
dump() dump()
mon.Start() mon.Start()

@ -36,23 +36,28 @@ type osMon interface {
// an interface status changes. // an interface status changes.
type ChangeFunc func() type ChangeFunc func()
// An allocated callbackHandle's address is the Mon.cbs map key.
type callbackHandle byte
// Mon represents a monitoring instance. // Mon represents a monitoring instance.
type Mon struct { type Mon struct {
logf logger.Logf logf logger.Logf
cb ChangeFunc
om osMon // nil means not supported on this platform om osMon // nil means not supported on this platform
change chan struct{} change chan struct{}
stop chan struct{} stop chan struct{}
mu sync.Mutex // guards cbs
cbs map[*callbackHandle]ChangeFunc
onceStart sync.Once onceStart sync.Once
started bool started bool
goroutines sync.WaitGroup goroutines sync.WaitGroup
} }
// New instantiates and starts a monitoring instance. Change notifications // New instantiates and starts a monitoring instance.
// are propagated to the callback function.
// The returned monitor is inactive until it's started by the Start method. // The returned monitor is inactive until it's started by the Start method.
func New(logf logger.Logf, callback ChangeFunc) (*Mon, error) { // Use RegisterChangeCallback to get notified of network changes.
func New(logf logger.Logf) (*Mon, error) {
logf = logger.WithPrefix(logf, "monitor: ") logf = logger.WithPrefix(logf, "monitor: ")
om, err := newOSMon(logf) om, err := newOSMon(logf)
if err != nil { if err != nil {
@ -60,13 +65,28 @@ func New(logf logger.Logf, callback ChangeFunc) (*Mon, error) {
} }
return &Mon{ return &Mon{
logf: logf, logf: logf,
cb: callback, cbs: map[*callbackHandle]ChangeFunc{},
om: om, om: om,
change: make(chan struct{}, 1), change: make(chan struct{}, 1),
stop: make(chan struct{}), stop: make(chan struct{}),
}, nil }, nil
} }
// RegisterChangeCallback adds callback to the set of parties to be
// notified (in their own goroutine) when the network state changes.
// To remove this callback, call unregister (or close the monitor).
func (m *Mon) RegisterChangeCallback(callback ChangeFunc) (unregister func()) {
handle := new(callbackHandle)
m.mu.Lock()
defer m.mu.Unlock()
m.cbs[handle] = callback
return func() {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.cbs, handle)
}
}
// Start starts the monitor. // Start starts the monitor.
// A monitor can only be started & closed once. // A monitor can only be started & closed once.
func (m *Mon) Start() { func (m *Mon) Start() {
@ -136,7 +156,11 @@ func (m *Mon) debounce() {
case <-m.change: case <-m.change:
} }
m.cb() m.mu.Lock()
for _, cb := range m.cbs {
go cb()
}
m.mu.Unlock()
select { select {
case <-m.stop: case <-m.stop:

@ -256,14 +256,16 @@ func newUserspaceEngineAdvanced(conf EngineConfig) (_ Engine, reterr error) {
e.linkState, _ = getLinkState() e.linkState, _ = getLinkState()
logf("link state: %+v", e.linkState) logf("link state: %+v", e.linkState)
mon, err := monitor.New(logf, func() { mon, err := monitor.New(logf)
e.LinkChange(false)
tshttpproxy.InvalidateCache()
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
closePool.add(mon) closePool.add(mon)
unregisterMonWatch := mon.RegisterChangeCallback(func() {
e.LinkChange(false)
tshttpproxy.InvalidateCache()
})
closePool.addFunc(unregisterMonWatch)
e.linkMon = mon e.linkMon = mon
endpointsFn := func(endpoints []string) { endpointsFn := func(endpoints []string) {

Loading…
Cancel
Save