wgengine/magicsock: bunch of misc discovery path cleanups

* fix tailscale status for peers using discovery
* as part of that, pull out disco address selection into reusable
  and testable discoEndpoint.addrForSendLocked
* truncate ping/pong logged hex txids in half to eliminate noise
* move a bunch of random time constants into named constants
  with docs
* track a history of per-endpoint pong replies for future use &
  status display
* add "send" and " got" prefix to discovery message logging
  immediately before the frame type so it's easier to read than
  searching for the "<-" or "->" arrows earlier in the line; but keep
  those as the more reasily machine readable part for later.

Updates #483
pull/522/head
Brad Fitzpatrick 4 years ago
parent e9643ae724
commit f5f3885b5b

@ -157,9 +157,9 @@ func parsePong(ver uint8, p []byte) (m *Pong, err error) {
func MessageSummary(m Message) string { func MessageSummary(m Message) string {
switch m := m.(type) { switch m := m.(type) {
case *Ping: case *Ping:
return fmt.Sprintf("ping tx=%x", m.TxID) return fmt.Sprintf("ping tx=%x", m.TxID[:6])
case *Pong: case *Pong:
return fmt.Sprintf("pong tx=%x", m.TxID) return fmt.Sprintf("pong tx=%x", m.TxID[:6])
case CallMeMaybe: case CallMeMaybe:
return "call-me-maybe" return "call-me-maybe"
default: default:

@ -14,6 +14,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"math"
"math/rand" "math/rand"
"net" "net"
"os" "os"
@ -1411,7 +1412,7 @@ func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey key.Public, dstDisco
pkt = box.SealAfterPrecomputation(pkt, m.AppendMarshal(nil), &nonce, sharedKey) pkt = box.SealAfterPrecomputation(pkt, m.AppendMarshal(nil), &nonce, sharedKey)
sent, err = c.sendAddr(dst, dstKey, pkt) sent, err = c.sendAddr(dst, dstKey, pkt)
if sent { if sent {
c.logf("magicsock: disco: %v->%v (%v, %v): %v", c.discoShort, dstDisco.ShortString(), dstKey.ShortString(), derpStr(dst.String()), disco.MessageSummary(m)) c.logf("magicsock: disco: %v->%v (%v, %v) sent %v", c.discoShort, dstDisco.ShortString(), dstKey.ShortString(), derpStr(dst.String()), disco.MessageSummary(m))
} else if err == nil { } else if err == nil {
// Can't send. (e.g. no IPv6 locally) // Can't send. (e.g. no IPv6 locally)
} else { } else {
@ -1511,6 +1512,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool {
c.logf("[unexpected] CallMeMaybe packets should only come via DERP") c.logf("[unexpected] CallMeMaybe packets should only come via DERP")
return true return true
} }
c.logf("magicsock: disco: %v<-%v (%v, %v) got call-me-maybe", c.discoShort, de.discoShort, de.publicKey.ShortString(), derpStr(src.String()))
go de.handleCallMeMaybe() go de.handleCallMeMaybe()
} }
@ -1518,7 +1520,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) bool {
} }
func (c *Conn) handlePingLocked(dm *disco.Ping, de *discoEndpoint, src netaddr.IPPort) { func (c *Conn) handlePingLocked(dm *disco.Ping, de *discoEndpoint, src netaddr.IPPort) {
c.logf("magicsock: disco: %v<-%v (%v, %v) ping tx=%x", c.discoShort, de.discoKey.ShortString(), de.publicKey.ShortString(), src, dm.TxID) c.logf("magicsock: disco: %v<-%v (%v, %v) got ping tx=%x", c.discoShort, de.discoShort, de.publicKey.ShortString(), src, dm.TxID[:6])
// Remember this this route if not present. // Remember this this route if not present.
c.setAddrToDiscoLocked(src, de.discoKey, nil) c.setAddrToDiscoLocked(src, de.discoKey, nil)
@ -2568,6 +2570,15 @@ func (c *Conn) UpdateStatus(sb *ipnstate.StatusBuilder) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
for dk, de := range c.endpointOfDisco {
ps := &ipnstate.PeerStatus{InMagicSock: true}
if node, ok := c.nodeOfDisco[dk]; ok {
ps.Addrs = append(ps.Addrs, node.Endpoints...)
}
de.populatePeerStatus(ps)
sb.AddPeer(de.publicKey, ps)
}
// Old-style (pre-disco) peers:
for k, as := range c.addrsByKey { for k, as := range c.addrsByKey {
ps := &ipnstate.PeerStatus{ ps := &ipnstate.PeerStatus{
InMagicSock: true, InMagicSock: true,
@ -2629,10 +2640,41 @@ type discoEndpoint struct {
timers map[*time.Timer]bool timers map[*time.Timer]bool
} }
const (
// discoPingInterval is the minimum time between pings
// to an endpoint. (Except in the case of CallMeMaybe frames
// resetting the counter, as the first pings likely didn't through
// the firewall)
discoPingInterval = 5 * time.Second
// pingTimeoutDuration is how long we wait for a pong reply before
// assuming it's never coming.
pingTimeoutDuration = 5 * time.Second
// trustUDPAddrDuration is how long we trust a UDP address as the exclusive
// path (without using DERP) without having heard a Pong reply.
trustUDPAddrDuration = 5 * time.Second
)
// endpointState is some state and history for a specific endpoint of
// a discoEndpoint. (The subject is the discoEndpoint.endpointState
// map key)
type endpointState struct { type endpointState struct {
lastPing time.Time // all fields guarded by discoEndpoint.mu:
// TODO: lastPong time.Time lastPing time.Time
index int // index in nodecfg.Node.Endpoints recentPongs []pongReply // ring buffer up to pongHistoryCount entries
recentPong uint16 // index into recentPongs of most recent; older , wrapped
index int16 // index in nodecfg.Node.Endpoints
}
// pongHistoryCount is how many pongReply values we keep per endpointState
const pongHistoryCount = 64
type pongReply struct {
latency time.Duration
pongAt time.Time // when we received the pong
from netaddr.IPPort // the pong's src (usually same as endpoint map key)
pongSrc netaddr.IPPort // what they reported they heard
} }
type sentPing struct { type sentPing struct {
@ -2683,35 +2725,44 @@ func (de *discoEndpoint) UpdateDst(addr *net.UDPAddr) error {
return nil return nil
} }
// addrForSendLocked returns the address(es) that should be used for
// sending the next packet. Zero, one, or both of UDP address and DERP
// addr may be non-zero.
//
// de.mu must be held.
func (de *discoEndpoint) addrForSendLocked(now time.Time) (udpAddr, derpAddr netaddr.IPPort) {
udpAddr = de.bestAddr
if udpAddr.IsZero() || now.After(de.trustBestAddrUntil) {
// We had a bestAddr but it expired so send both to it
// and DERP.
derpAddr = de.derpAddr
}
return
}
func (de *discoEndpoint) send(b []byte) error { func (de *discoEndpoint) send(b []byte) error {
now := time.Now() now := time.Now()
de.mu.Lock() de.mu.Lock()
de.lastSend = now de.lastSend = now
derpAddr := de.derpAddr udpAddr, derpAddr := de.addrForSendLocked(now)
haveDerp := !derpAddr.IsZero() if udpAddr.IsZero() || now.After(de.trustBestAddrUntil) {
bestAddr := de.bestAddr
bestOld := now.After(de.trustBestAddrUntil)
if bestAddr.IsZero() || bestOld {
de.sendPingsLocked(now, true) de.sendPingsLocked(now, true)
} }
de.mu.Unlock() de.mu.Unlock()
var didDerp bool if udpAddr.IsZero() && derpAddr.IsZero() {
if bestAddr.IsZero() { return errors.New("no UDP or DERP addr")
if !haveDerp {
return errors.New("no DERP addr")
}
bestAddr = derpAddr
} else if bestOld && haveDerp {
// We have a bestAddr, but it hasn't been confirmed in a while,
// so let's not entirely trust it and also send via DERP.
didDerp, _ = de.c.sendAddr(derpAddr, de.publicKey, b)
} }
var err error
_, err := de.c.sendAddr(bestAddr, de.publicKey, b) if !udpAddr.IsZero() {
if didDerp { _, err = de.c.sendAddr(udpAddr, de.publicKey, b)
return nil }
if !derpAddr.IsZero() {
if ok, _ := de.c.sendAddr(derpAddr, de.publicKey, b); ok && err != nil {
// UDP failed but DERP worked, so good enough:
return nil
}
} }
return err return err
} }
@ -2765,14 +2816,14 @@ func (de *discoEndpoint) sendPingsLocked(now time.Time, sendCallMeMaybe bool) {
var sentAny bool var sentAny bool
for ep, st := range de.endpointState { for ep, st := range de.endpointState {
ep := ep ep := ep
if !st.lastPing.IsZero() && now.Sub(st.lastPing) < 5*time.Second { if !st.lastPing.IsZero() && now.Sub(st.lastPing) < discoPingInterval {
continue continue
} }
st.lastPing = now st.lastPing = now
txid := stun.NewTxID() txid := stun.NewTxID()
t := de.newTimerLocked(5*time.Second, func() { t := de.newTimerLocked(pingTimeoutDuration, func() {
de.c.logf("magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid, ep, de.publicKey.ShortString(), de.discoShort) de.c.logf("magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], ep, de.publicKey.ShortString(), de.discoShort)
de.forgetPing(txid) de.forgetPing(txid)
}) })
de.sentPing[txid] = sentPing{ de.sentPing[txid] = sentPing{
@ -2824,15 +2875,19 @@ func (de *discoEndpoint) updateFromNode(n *tailcfg.Node) {
st.index = -1 // assume deleted until updated in next loop st.index = -1 // assume deleted until updated in next loop
} }
for i, epStr := range n.Endpoints { for i, epStr := range n.Endpoints {
if i > math.MaxInt16 {
// Seems unlikely.
continue
}
ipp, err := netaddr.ParseIPPort(epStr) ipp, err := netaddr.ParseIPPort(epStr)
if err != nil { if err != nil {
de.c.logf("magicsock: bogus netmap endpoint %q", epStr) de.c.logf("magicsock: bogus netmap endpoint %q", epStr)
continue continue
} }
if st, ok := de.endpointState[ipp]; ok { if st, ok := de.endpointState[ipp]; ok {
st.index = i st.index = int16(i)
} else { } else {
de.endpointState[ipp] = &endpointState{index: i} de.endpointState[ipp] = &endpointState{index: int16(i)}
} }
} }
// Now delete anything that wasn't updated. // Now delete anything that wasn't updated.
@ -2878,36 +2933,58 @@ func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort)
} }
de.removeSentPingLocked(m.TxID, sp) de.removeSentPingLocked(m.TxID, sp)
st, ok := de.endpointState[sp.to]
if !ok {
// This is no longer an endpoint we care about.
return
}
de.c.setAddrToDiscoLocked(src, de.discoKey, de) de.c.setAddrToDiscoLocked(src, de.discoKey, de)
now := time.Now() now := time.Now()
delay := now.Sub(sp.at) latency := now.Sub(sp.at)
st.addPongReplyLocked(pongReply{
latency: latency,
pongAt: now,
from: src,
pongSrc: m.Src,
})
de.c.logf("magicsock: disco: %v<-%v (%v, %v) pong tx=%x latency=%v pong.src=%v%v", de.c.discoShort, de.discoShort, de.publicKey.ShortString(), src, m.TxID, delay.Round(time.Millisecond), m.Src, logger.ArgWriter(func(bw *bufio.Writer) { de.c.logf("magicsock: disco: %v<-%v (%v, %v) got pong tx=%x latency=%v pong.src=%v%v", de.c.discoShort, de.discoShort, de.publicKey.ShortString(), src, m.TxID[:6], latency.Round(time.Millisecond), m.Src, logger.ArgWriter(func(bw *bufio.Writer) {
if sp.to != src { if sp.to != src {
fmt.Fprintf(bw, " ping.to=%v", sp.to) fmt.Fprintf(bw, " ping.to=%v", sp.to)
} }
})) }))
// Expire our best address if we haven't heard from it in awhile.
tooOld := now.Add(-15 * time.Second)
if !de.bestAddr.IsZero() && de.bestAddrAt.Before(tooOld) {
de.bestAddr = netaddr.IPPort{}
}
// Promote this pong response to our current best address if it's lower latency. // Promote this pong response to our current best address if it's lower latency.
// TODO(bradfitz): decide how latency vs. preference order affects decision // TODO(bradfitz): decide how latency vs. preference order affects decision
if de.bestAddr.IsZero() || delay < de.bestAddrLatency { if de.bestAddr.IsZero() || latency < de.bestAddrLatency {
if de.bestAddr != sp.to { if de.bestAddr != sp.to {
de.c.logf("magicsock: disco: node %v %v now using %v", de.publicKey.ShortString(), de.discoShort, sp.to) de.c.logf("magicsock: disco: node %v %v now using %v", de.publicKey.ShortString(), de.discoShort, sp.to)
de.bestAddr = sp.to de.bestAddr = sp.to
} }
} }
if de.bestAddr == sp.to { if de.bestAddr == sp.to {
de.bestAddrLatency = delay de.bestAddrLatency = latency
de.bestAddrAt = now de.bestAddrAt = now
de.trustBestAddrUntil = now.Add(5 * time.Second) de.trustBestAddrUntil = now.Add(trustUDPAddrDuration)
}
}
// discoEndpoint.mu must be held.
func (st *endpointState) addPongReplyLocked(r pongReply) {
if n := len(st.recentPongs); n < pongHistoryCount {
st.recentPong = uint16(n)
st.recentPongs = append(st.recentPongs, r)
return
} }
i := st.recentPong + 1
if i == pongHistoryCount {
i = 0
}
st.recentPongs[i] = r
st.recentPong = i
} }
// handleCallMeMaybe handles a CallMeMaybe discovery message via // handleCallMeMaybe handles a CallMeMaybe discovery message via
@ -2926,6 +3003,20 @@ func (de *discoEndpoint) handleCallMeMaybe() {
de.sendPingsLocked(time.Now(), false) de.sendPingsLocked(time.Now(), false)
} }
func (de *discoEndpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
de.mu.Lock()
defer de.mu.Unlock()
if de.lastSend.IsZero() {
return
}
now := time.Now()
if udpAddr, _ := de.addrForSendLocked(now); !udpAddr.IsZero() {
ps.CurAddr = udpAddr.String()
}
}
// cleanup is called when a discovery endpoint is no longer present in the NetworkMap. // cleanup is called when a discovery endpoint is no longer present in the NetworkMap.
// This is where we can do cleanup such as closing goroutines or canceling timers. // This is where we can do cleanup such as closing goroutines or canceling timers.
func (de *discoEndpoint) cleanup() { func (de *discoEndpoint) cleanup() {

Loading…
Cancel
Save