wgengine/magicsock: add single element IPPort->endpoint cache in receive path

name           old time/op  new time/op  delta
ReceiveFrom-4  21.8µs ± 2%  20.9µs ± 2%  -4.27%  (p=0.000 n=10+10)

Updates #414

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/1166/head
Brad Fitzpatrick 3 years ago committed by Brad Fitzpatrick
parent da4ec54756
commit 51bd1feae4

@ -161,6 +161,11 @@ type Conn struct {
// whether a DERP channel read should be done. // whether a DERP channel read should be done.
derpRecvCountLast int64 // owned by ReceiveIPv4 derpRecvCountLast int64 // owned by ReceiveIPv4
// ippEndpoint4 and ippEndpoint6 are owned by ReceiveIPv4 and
// ReceiveIPv6, respectively, to cache an IPPort->endpoint for
// hot flows.
ippEndpoint4, ippEndpoint6 ippEndpointCache
// ============================================================ // ============================================================
mu sync.Mutex // guards all following fields; see userspaceEngine lock ordering rules mu sync.Mutex // guards all following fields; see userspaceEngine lock ordering rules
muCond *sync.Cond muCond *sync.Cond
@ -1483,7 +1488,7 @@ func (c *Conn) ReceiveIPv6(b []byte) (int, conn.Endpoint, error) {
if err != nil { if err != nil {
return 0, nil, err return 0, nil, err
} }
if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr)); ok { if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr), &c.ippEndpoint6); ok {
return n, ep, nil return n, ep, nil
} }
} }
@ -1520,14 +1525,14 @@ func (c *Conn) ReceiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
} }
return 0, nil, err return 0, nil, err
} }
if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr)); ok { if ep, ok := c.receiveIP(b[:n], pAddr.(*net.UDPAddr), &c.ippEndpoint4); ok {
return n, ep, nil return n, ep, nil
} }
} }
} }
// receiveIP is the shared bits of ReceiveIPv4 and ReceiveIPv6. // receiveIP is the shared bits of ReceiveIPv4 and ReceiveIPv6.
func (c *Conn) receiveIP(b []byte, ua *net.UDPAddr) (ep conn.Endpoint, ok bool) { func (c *Conn) receiveIP(b []byte, ua *net.UDPAddr, cache *ippEndpointCache) (ep conn.Endpoint, ok bool) {
ipp, ok := netaddr.FromStdAddr(ua.IP, ua.Port, ua.Zone) ipp, ok := netaddr.FromStdAddr(ua.IP, ua.Port, ua.Zone)
if !ok { if !ok {
return return
@ -1539,9 +1544,18 @@ func (c *Conn) receiveIP(b []byte, ua *net.UDPAddr) (ep conn.Endpoint, ok bool)
if c.handleDiscoMessage(b, ipp) { if c.handleDiscoMessage(b, ipp) {
return return
} }
ep = c.findEndpoint(ipp, ua, b) if cache.ipp == ipp && cache.de != nil && cache.gen == cache.de.numStopAndReset() {
if ep == nil { ep = cache.de
return } else {
ep = c.findEndpoint(ipp, ua, b)
if ep == nil {
return
}
if de, ok := ep.(*discoEndpoint); ok {
cache.ipp = ipp
cache.de = de
cache.gen = de.numStopAndReset()
}
} }
c.noteRecvActivityFromEndpoint(ep) c.noteRecvActivityFromEndpoint(ep)
return ep, true return ep, true
@ -2781,7 +2795,8 @@ func udpAddrDebugString(ua net.UDPAddr) string {
// advertise a DiscoKey and participate in active discovery. // advertise a DiscoKey and participate in active discovery.
type discoEndpoint struct { type discoEndpoint struct {
// atomically accessed; declared first for alignment reasons // atomically accessed; declared first for alignment reasons
lastRecvUnixAtomic int64 lastRecvUnixAtomic int64
numStopAndResetAtomic int64
// These fields are initialized once and never modified. // These fields are initialized once and never modified.
c *Conn c *Conn
@ -3420,6 +3435,7 @@ func (de *discoEndpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
// It's called when a discovery endpoint is no longer present in the NetworkMap, // It's called when a discovery endpoint is no longer present in the NetworkMap,
// or when magicsock is transition from running to stopped state (via SetPrivateKey(zero)) // or when magicsock is transition from running to stopped state (via SetPrivateKey(zero))
func (de *discoEndpoint) stopAndReset() { func (de *discoEndpoint) stopAndReset() {
atomic.AddInt64(&de.numStopAndResetAtomic, 1)
de.mu.Lock() de.mu.Lock()
defer de.mu.Unlock() defer de.mu.Unlock()
@ -3447,6 +3463,10 @@ func (de *discoEndpoint) stopAndReset() {
de.pendingCLIPings = nil de.pendingCLIPings = nil
} }
func (de *discoEndpoint) numStopAndReset() int64 {
return atomic.LoadInt64(&de.numStopAndResetAtomic)
}
// derpStr replaces DERP IPs in s with "derp-". // derpStr replaces DERP IPs in s with "derp-".
func derpStr(s string) string { return strings.ReplaceAll(s, "127.3.3.40:", "derp-") } func derpStr(s string) string { return strings.ReplaceAll(s, "127.3.3.40:", "derp-") }
@ -3468,3 +3488,11 @@ func (c *Conn) WhoIs(ip netaddr.IP) (n *tailcfg.Node, u tailcfg.UserProfile, ok
} }
return nil, u, false return nil, u, false
} }
// ippEndpointCache is a mutex-free single-element cache, mapping from
// a single netaddr.IPPort to a single endpoint.
type ippEndpointCache struct {
ipp netaddr.IPPort
gen int64
de *discoEndpoint
}

Loading…
Cancel
Save