diff --git a/wgengine/monitor/monitor.go b/wgengine/monitor/monitor.go index 355da7da9..75441b088 100644 --- a/wgengine/monitor/monitor.go +++ b/wgengine/monitor/monitor.go @@ -143,17 +143,24 @@ func (m *Mon) Close() error { return err } +func (m *Mon) stopped() bool { + select { + case <-m.stop: + return true + default: + return false + } +} + // pump continuously retrieves messages from the connection, notifying // the change channel of changes, and stopping when a stop is issued. func (m *Mon) pump() { defer m.goroutines.Done() - for { + for !m.stopped() { msg, err := m.om.Receive() if err != nil { - select { - case <-m.stop: + if m.stopped() { return - default: } // Keep retrying while we're not closed. m.logf("error from link monitor: %v", err) @@ -165,8 +172,10 @@ func (m *Mon) pump() { } select { case m.change <- struct{}{}: - case <-m.stop: - return + default: + // Another change signal is already + // buffered. Debounce will wake up soon + // enough. } } } @@ -199,7 +208,7 @@ func (m *Mon) debounce() { select { case <-m.stop: return - case <-time.After(100 * time.Millisecond): + case <-time.After(250 * time.Millisecond): } } } diff --git a/wgengine/monitor/monitor_darwin.go b/wgengine/monitor/monitor_darwin.go index a2397617d..8385d1342 100644 --- a/wgengine/monitor/monitor_darwin.go +++ b/wgengine/monitor/monitor_darwin.go @@ -51,20 +51,47 @@ func (m *darwinRouteMon) Close() error { } func (m *darwinRouteMon) Receive() (message, error) { - n, err := unix.Read(m.fd, m.buf[:]) - if err != nil { - return nil, err - } - msgs, err := route.ParseRIB(route.RIBTypeRoute, m.buf[:n]) - if err != nil { - m.logf("read %d bytes (% 02x), failed to parse RIB: %v", n, m.buf[:n], err) + for { + n, err := unix.Read(m.fd, m.buf[:]) + if err != nil { + return nil, err + } + msgs, err := route.ParseRIB(route.RIBTypeRoute, m.buf[:n]) + if err != nil { + if debugRouteMessages { + m.logf("read %d bytes (% 02x), failed to parse RIB: %v", n, m.buf[:n], err) + } + return unspecifiedMessage{}, nil + } + if len(msgs) == 0 { + if debugRouteMessages { + m.logf("read %d bytes with no messages (% 02x)", n, m.buf[:n]) + } + continue + } + nSkip := 0 + for _, msg := range msgs { + if m.skipMessage(msg) { + nSkip++ + } + } + if debugRouteMessages { + m.logf("read %d bytes, %d messages (%d skipped)", n, len(msgs), nSkip) + m.logMessages(msgs) + } + if nSkip == len(msgs) { + continue + } return unspecifiedMessage{}, nil } - if debugRouteMessages { - m.logf("read: %d bytes, %d msgs", n, len(msgs)) - m.logMessages(msgs) +} + +func (m *darwinRouteMon) skipMessage(msg route.Message) bool { + switch msg.(type) { + case *route.InterfaceMulticastAddrMessage: + return true } - return unspecifiedMessage{}, nil + return false } func (m *darwinRouteMon) logMessages(msgs []route.Message) { diff --git a/wgengine/monitor/monitor_test.go b/wgengine/monitor/monitor_test.go index 9c9019eeb..f1e35439e 100644 --- a/wgengine/monitor/monitor_test.go +++ b/wgengine/monitor/monitor_test.go @@ -5,7 +5,10 @@ package monitor import ( + "flag" "testing" + + "tailscale.com/net/interfaces" ) func TestMonitorStartClose(t *testing.T) { @@ -28,3 +31,35 @@ func TestMonitorJustClose(t *testing.T) { t.Fatal(err) } } + +var monitor = flag.String("monitor", "", `go into monitor mode like 'route monitor'; test never terminates. Value can be either "raw" or "callback"`) + +func TestMonitorMode(t *testing.T) { + switch *monitor { + case "": + t.Skip("skipping non-test without --monitor") + case "raw", "callback": + default: + t.Skipf(`invalid --monitor value: must be "raw" or "callback"`) + } + mon, err := New(t.Logf) + if err != nil { + t.Fatal(err) + } + switch *monitor { + case "raw": + for { + msg, err := mon.om.Receive() + if err != nil { + t.Fatal(err) + } + t.Logf("msg: %#v", msg) + } + case "callback": + mon.RegisterChangeCallback(func(changed bool, st *interfaces.State) { + t.Logf("cb: changed=%v, ifSt=%v", changed, st) + }) + mon.Start() + select {} + } +}