derp: update peerGone code to work with regional DERP mesh clusters too

Updates #150
Updates #388
Branch: pull/497/head
Author: Brad Fitzpatrick
Parent: de5f6d70a8
Commit: 6fbd1abcd3

@@ -84,6 +84,11 @@ type Server struct {
 	// peer is only local (and thus in the clients Map). If the
 	// value is non-nil, it's only remote.
 	clientsMesh map[key.Public]PacketForwarder
+	// sentTo tracks which peers have sent to which other peers,
+	// and at which connection number. This isn't on sclient
+	// because it includes intra-region forwarded packets as the
+	// src.
+	sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum
 }
 
 // PacketForwarder is something that can forward packets.
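Aside from the diff itself: the index lives on Server rather than on each sclient because, with forwarded packets, the src key may have no local sclient at all. A minimal, stdlib-only sketch of the shape (nodeKey is a toy stand-in for key.Public; the entries are invented):

package main

import "fmt"

// nodeKey stands in for key.Public in this toy; the real type is a
// 32-byte public key.
type nodeKey string

func main() {
	// src => dst => dst's latest sclient.connNum, mirroring the new
	// Server.sentTo field above.
	sentTo := map[nodeKey]map[nodeKey]int64{}

	// A locally connected client "a" sent to local client "b",
	// whose current connection number is 7.
	sentTo["a"] = map[nodeKey]int64{"b": 7}

	// A client "c" known only via mesh forwarding (no local sclient)
	// also sent to "b"; it gets an entry in the same server-wide index.
	sentTo["c"] = map[nodeKey]int64{"b": 7}

	fmt.Println(sentTo) // map[a:map[b:7] c:map[b:7]]
}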
@@ -119,12 +124,13 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
 		publicKey:            privateKey.Public(),
 		logf:                 logf,
 		packetsDroppedReason: metrics.LabelMap{Label: "reason"},
-		clients:              make(map[key.Public]*sclient),
-		clientsEver:          make(map[key.Public]bool),
+		clients:              map[key.Public]*sclient{},
+		clientsEver:          map[key.Public]bool{},
 		clientsMesh:          map[key.Public]PacketForwarder{},
-		netConns:             make(map[Conn]chan struct{}),
+		netConns:             map[Conn]chan struct{}{},
 		memSys0:              ms.Sys,
 		watchers:             map[*sclient]bool{},
+		sentTo:               map[key.Public]map[key.Public]int64{},
 	}
 	s.packetsDroppedUnknown = s.packetsDroppedReason.Get("unknown_dest")
 	s.packetsDroppedFwdUnknown = s.packetsDroppedReason.Get("unknown_dest_on_fwd")
@@ -259,37 +265,41 @@ func (s *Server) unregisterClient(c *sclient) {
 	if cur == c {
 		c.logf("removing connection")
 		delete(s.clients, c.key)
+		if v, ok := s.clientsMesh[c.key]; ok && v == nil {
+			delete(s.clientsMesh, c.key)
+			s.notePeerGoneFromRegionLocked(c.key)
+		}
+		s.broadcastPeerStateChangeLocked(c.key, false)
 	}
 	if c.canMesh {
 		delete(s.watchers, c)
 	}
-	if v, ok := s.clientsMesh[c.key]; ok && v == nil {
-		delete(s.clientsMesh, c.key)
-	}
-	s.broadcastPeerStateChangeLocked(c.key, false)
 
 	s.curClients.Add(-1)
 	if c.preferred {
 		s.curHomeClients.Add(-1)
 	}
+}
+
+// notePeerGoneFromRegionLocked sends peerGone frames to parties that
+// key has sent to previously (whether those sends were from a local
+// client or forwarded). It must only be called after the key has
+// been removed from clientsMesh.
+func (s *Server) notePeerGoneFromRegionLocked(key key.Public) {
+	if _, ok := s.clientsMesh[key]; ok {
+		panic("usage")
+	}
 
 	// Find still-connected peers and either notify that we've gone away
 	// so they can drop their route entries to us (issue 150)
 	// or move them over to the active client (in case a replaced client
 	// connection is being unregistered).
-	for pubKey, connNum := range c.sentTo {
+	for pubKey, connNum := range s.sentTo[key] {
 		if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
-			if cur == c {
-				go peer.requestPeerGoneWrite(c.key)
-			} else {
-				// Only if the current client has not already accepted a newer
-				// connection from the peer.
-				if _, ok := cur.sentTo[pubKey]; !ok {
-					cur.sentTo[pubKey] = connNum
-				}
-			}
+			go peer.requestPeerGoneWrite(key)
 		}
 	}
+	delete(s.sentTo, key)
 }
 
 func (s *Server) addWatcher(c *sclient) {
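To make the new disconnect path concrete, here is a hedged, self-contained model of what notePeerGoneFromRegionLocked decides (toy types and names; the real code sends a peerGone frame via peer.requestPeerGoneWrite while holding s.mu): notify only recipients that are still connected under the connection number that was recorded, then drop the departed key's entry.

package main

import "fmt"

type nodeKey string

// conn is a toy stand-in for *sclient: just the connection number.
type conn struct{ connNum int64 }

// server models only the two maps this diff touches.
type server struct {
	clients map[nodeKey]*conn             // currently connected clients
	sentTo  map[nodeKey]map[nodeKey]int64 // src => dst => recorded connNum
}

// notePeerGone mirrors notePeerGoneFromRegionLocked: tell each recorded
// recipient that src is gone, but only if that recipient is still the
// same connection we recorded; a reconnect bumps connNum, so a stale
// record must not trigger a peerGone to the new connection.
func (s *server) notePeerGone(src nodeKey) {
	for dst, connNum := range s.sentTo[src] {
		if peer, ok := s.clients[dst]; ok && peer.connNum == connNum {
			fmt.Printf("peerGone(%s) -> %s\n", src, dst)
		}
	}
	delete(s.sentTo, src)
}

func main() {
	s := &server{
		clients: map[nodeKey]*conn{
			"b": {connNum: 7},
			"c": {connNum: 9}, // "c" reconnected since connNum 8 was recorded
		},
		sentTo: map[nodeKey]map[nodeKey]int64{
			"a": {"b": 7, "c": 8},
		},
	}
	s.notePeerGone("a") // prints only: peerGone(a) -> b
}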
@@ -351,7 +361,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 		connectedAt: time.Now(),
 		sendQueue:   make(chan pkt, perClientSendQueueDepth),
 		peerGone:    make(chan key.Public),
-		sentTo:      make(map[key.Public]int64),
 		canMesh:     clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
 	}
 	if c.canMesh {
@@ -461,8 +470,9 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	s.mu.Lock()
 	dst := s.clients[dstKey]
-	// TODO(bradfitz): think about the sentTo/Issue 150 optimization
-	// in the context of DERP meshes.
+	if dst != nil {
+		s.notePeerSendLocked(srcKey, dst)
+	}
 	s.mu.Unlock()
 
 	if dst == nil {
@@ -480,6 +490,18 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	})
 }
 
+// notePeerSendLocked records that src sent to dst. We keep track of
+// that so when src disconnects, we can tell dst (if it's still
+// around) that src is gone (a peerGone frame).
+func (s *Server) notePeerSendLocked(src key.Public, dst *sclient) {
+	m, ok := s.sentTo[src]
+	if !ok {
+		m = map[key.Public]int64{}
+		s.sentTo[src] = m
+	}
+	m[dst.key] = dst.connNum
+}
+
 // handleFrameSendPacket reads a "send packet" frame from the client.
 func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	s := c.s
@@ -495,10 +517,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	if dst == nil {
 		fwd = s.clientsMesh[dstKey]
 	} else {
-		// Track that we've sent to this peer, so if/when we
-		// disconnect first, the server can inform all our old
-		// recipients that we're gone. (Issue 150 optimization)
-		c.sentTo[dstKey] = dst.connNum
+		s.notePeerSendLocked(c.key, dst)
 	}
 	s.mu.Unlock()
@@ -746,12 +765,6 @@ type sclient struct {
 	// Owned by sender, not thread-safe.
 	bw *bufio.Writer
 
-	// Guarded by s.mu.
-	//
-	// sentTo tracks all the peers this client has ever sent a packet to, and at which
-	// connection number.
-	sentTo map[key.Public]int64 // recipient => rcpt's latest sclient.connNum
-
 	// Guarded by s.mu
 	//
 	// peerStateChange is used by mesh peers (a set of regional
@@ -1066,6 +1079,7 @@ func (s *Server) RemovePacketForwarder(dst key.Public, fwd PacketForwarder) {
 		s.clientsMesh[dst] = nil
 	} else {
 		delete(s.clientsMesh, dst)
+		s.notePeerGoneFromRegionLocked(dst)
 	}
 }
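The call added here handles the purely mesh-forwarded case: once a key has no local connection and its last forwarder is removed, it has left the region, so its recorded recipients get the same cleanup. A rough sketch of that branch under stated assumptions (the surrounding condition isn't visible in this hunk; the toy assumes it checks for a remaining local connection, and it omits the connNum freshness check shown earlier):

package main

import "fmt"

type nodeKey string

// server models just enough state for this hunk: local clients, the
// mesh table (nil value => also connected locally, non-nil => only
// reachable through that forwarder), and the sentTo index.
type server struct {
	clients     map[nodeKey]bool
	clientsMesh map[nodeKey]any
	sentTo      map[nodeKey]map[nodeKey]int64
}

// removePacketForwarder sketches the changed else branch: a key that is
// neither local nor reachable via any mesh peer has left the region, so
// recorded recipients are told it is gone and its entry is dropped.
func (s *server) removePacketForwarder(dst nodeKey) {
	if s.clients[dst] {
		s.clientsMesh[dst] = nil // still connected locally; keep a nil marker
		return
	}
	delete(s.clientsMesh, dst)
	for rcpt := range s.sentTo[dst] {
		fmt.Printf("peerGone(%s) -> %s\n", dst, rcpt)
	}
	delete(s.sentTo, dst)
}

func main() {
	s := &server{
		clients:     map[nodeKey]bool{},
		clientsMesh: map[nodeKey]any{"n": "derp-2"}, // "n" known only via a mesh peer
		sentTo:      map[nodeKey]map[nodeKey]int64{"n": {"b": 7}},
	}
	s.removePacketForwarder("n") // prints: peerGone(n) -> b
}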
