From 4db60a84368af9a47b92cdc0080797048e86cb3b Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 6 Jul 2020 16:36:57 -0700 Subject: [PATCH] wgengine/monitor: parse Linux netlink messages, ignore our own events Fixes tailscale/corp#412 ("flood of link change events at start-up") Signed-off-by: Brad Fitzpatrick --- go.mod | 1 + wgengine/monitor/monitor.go | 21 +++--- wgengine/monitor/monitor_freebsd.go | 14 +++- wgengine/monitor/monitor_linux.go | 88 ++++++++++++++++++++++--- wgengine/monitor/monitor_unsupported.go | 4 +- 5 files changed, 106 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index b12a5c1f9..f14884bd4 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e github.com/google/go-cmp v0.4.0 github.com/goreleaser/nfpm v1.1.10 + github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4 github.com/klauspost/compress v1.10.10 github.com/kr/pty v1.1.1 github.com/mdlayher/netlink v1.1.0 diff --git a/wgengine/monitor/monitor.go b/wgengine/monitor/monitor.go index c644b5229..38cdbaa54 100644 --- a/wgengine/monitor/monitor.go +++ b/wgengine/monitor/monitor.go @@ -3,7 +3,8 @@ // license that can be found in the LICENSE file. // Package monitor provides facilities for monitoring network -// interface changes. +// interface and route changes. It primarily exists to know when +// portable devices move between different networks. package monitor import ( @@ -14,10 +15,10 @@ import ( ) // message represents a message returned from an osMon. -// -// TODO: currently messages are being discarded, so the properties of -// the message haven't been defined. -type message interface{} +type message interface { + // Ignore is whether we should ignore this message. + ignore() bool +} // osMon is the interface that each operating system-specific // implementation of the link monitor must implement. @@ -52,7 +53,8 @@ type Mon struct { // are propagated to the callback function. // The returned monitor is inactive until it's started by the Start method. func New(logf logger.Logf, callback ChangeFunc) (*Mon, error) { - om, err := newOSMon() + logf = logger.WithPrefix(logf, "monitor: ") + om, err := newOSMon(logf) if err != nil { return nil, err } @@ -100,7 +102,7 @@ func (m *Mon) Close() error { func (m *Mon) pump() { defer m.goroutines.Done() for { - _, err := m.om.Receive() + msg, err := m.om.Receive() if err != nil { select { case <-m.stop: @@ -108,10 +110,13 @@ func (m *Mon) pump() { default: } // Keep retrying while we're not closed. - m.logf("Error receiving from connection: %v", err) + m.logf("error receiving from connection: %v", err) time.Sleep(time.Second) continue } + if msg.ignore() { + continue + } select { case m.change <- struct{}{}: case <-m.stop: diff --git a/wgengine/monitor/monitor_freebsd.go b/wgengine/monitor/monitor_freebsd.go index 7079cb94c..cb77ac095 100644 --- a/wgengine/monitor/monitor_freebsd.go +++ b/wgengine/monitor/monitor_freebsd.go @@ -9,6 +9,8 @@ import ( "fmt" "net" "strings" + + "tailscale.com/types/logger" ) // devdConn implements osMon using devd(8). @@ -16,7 +18,7 @@ type devdConn struct { conn net.Conn } -func newOSMon() (osMon, error) { +func newOSMon(logf logger.Logf) (osMon, error) { conn, err := net.Dial("unixpacket", "/var/run/devd.seqpacket.pipe") if err != nil { return nil, fmt.Errorf("devd dial error: %v", err) @@ -41,8 +43,14 @@ func (c *devdConn) Receive() (message, error) { if !strings.Contains(msg, "system=IFNET") { continue } - // TODO(]|[): this is where the devd-specific message would + // TODO: this is where the devd-specific message would // get converted into a "standard" event message and returned. - return nil, nil + return unspecifiedMessage{}, nil } } + +// unspecifiedMessage is a minimal message implementation that should not +// be ignored. TODO: make specific messages like monitor_linux.go. +type unspecifiedMessage struct{} + +func (unspecifiedMessage) ignore() bool { return false } diff --git a/wgengine/monitor/monitor_linux.go b/wgengine/monitor/monitor_linux.go index ef3706b01..d67cac381 100644 --- a/wgengine/monitor/monitor_linux.go +++ b/wgengine/monitor/monitor_linux.go @@ -8,10 +8,15 @@ package monitor import ( "fmt" + "net" "time" + "github.com/jsimonetti/rtnetlink" "github.com/mdlayher/netlink" "golang.org/x/sys/unix" + "inet.af/netaddr" + "tailscale.com/net/tsaddr" + "tailscale.com/types/logger" ) // nlConn wraps a *netlink.Conn and returns a monitor.Message @@ -20,10 +25,12 @@ import ( // on the type of event, this provides the capability of handling // each architecture-specific message in a generic fashion. type nlConn struct { - conn *netlink.Conn + logf logger.Logf + conn *netlink.Conn + buffered []netlink.Message } -func newOSMon() (osMon, error) { +func newOSMon(logf logger.Logf) (osMon, error) { conn, err := netlink.Dial(unix.NETLINK_ROUTE, &netlink.Config{ // IPv4 address and route changes. Routes get us most of the // events of interest, but we need address as well to cover @@ -35,7 +42,7 @@ func newOSMon() (osMon, error) { if err != nil { return nil, fmt.Errorf("dialing netlink socket: %v", err) } - return &nlConn{conn}, nil + return &nlConn{logf: logf, conn: conn}, nil } func (c *nlConn) Close() error { @@ -44,12 +51,73 @@ func (c *nlConn) Close() error { } func (c *nlConn) Receive() (message, error) { - // currently ignoring the message - _, err := c.conn.Receive() - if err != nil { - return nil, err + if len(c.buffered) == 0 { + var err error + c.buffered, err = c.conn.Receive() + if err != nil { + return nil, err + } + if len(c.buffered) == 0 { + // Unexpected. Not seen in wild, but sleep defensively. + time.Sleep(time.Second) + return nil, nil + } + } + msg := c.buffered[0] + c.buffered = c.buffered[1:] + + // See https://github.com/torvalds/linux/blob/master/include/uapi/linux/rtnetlink.h + // And https://man7.org/linux/man-pages/man7/rtnetlink.7.html + switch msg.Header.Type { + case unix.RTM_NEWADDR: + var rmsg rtnetlink.AddressMessage + if err := rmsg.UnmarshalBinary(msg.Data); err != nil { + c.logf("RTM_NEWADDR: failed to parse: %v", err) + return nil, nil + } + return &newAddrMessage{ + Label: rmsg.Attributes.Label, + Addr: netaddrIP(rmsg.Attributes.Local), + }, nil + case unix.RTM_NEWROUTE: + var rmsg rtnetlink.RouteMessage + if err := rmsg.UnmarshalBinary(msg.Data); err != nil { + c.logf("RTM_NEWROUTE: failed to parse: %v", err) + return nil, nil + } + return &newRouteMessage{ + Table: rmsg.Table, + Src: netaddrIP(rmsg.Attributes.Src), + Dst: netaddrIP(rmsg.Attributes.Dst), + Gateway: netaddrIP(rmsg.Attributes.Gateway), + }, nil + default: + c.logf("netlink msg %+v, %q", msg.Header, msg.Data) + return nil, nil } - // TODO(]|[): this is where the NetLink-specific message would - // get converted into a "standard" event message and returned. - return nil, nil +} + +func netaddrIP(std net.IP) netaddr.IP { + ip, _ := netaddr.FromStdIP(std) + return ip +} + +// newRouteMessage is a message for a new route being added. +type newRouteMessage struct { + Src, Dst, Gateway netaddr.IP + Table uint8 +} + +func (m *newRouteMessage) ignore() bool { + return m.Table == 88 || tsaddr.IsTailscaleIP(m.Dst) +} + +// newAddrMessage is a message for a new address being added. +type newAddrMessage struct { + Addr netaddr.IP + Label string // netlink Label attribute (e.g. "tailscale0") +} + +func (m *newAddrMessage) ignore() bool { + return tsaddr.IsTailscaleIP(m.Addr) } diff --git a/wgengine/monitor/monitor_unsupported.go b/wgengine/monitor/monitor_unsupported.go index f3a5a1c48..03eee07b6 100644 --- a/wgengine/monitor/monitor_unsupported.go +++ b/wgengine/monitor/monitor_unsupported.go @@ -6,4 +6,6 @@ package monitor -func newOSMon() (osMon, error) { return nil, nil } +import "tailscale.com/types/logger" + +func newOSMon(logger.Logf) (osMon, error) { return nil, nil }