diff --git a/derp/derp.go b/derp/derp.go
index 87d714a6c..5e979503a 100644
--- a/derp/derp.go
+++ b/derp/derp.go
@@ -74,6 +74,13 @@ const (
 	frameRecvPacket = frameType(0x05) // v0/1: packet bytes, v2: 32B src pub key + packet bytes
 	frameKeepAlive = frameType(0x06) // no payload, no-op (to be replaced with ping/pong)
 	frameNotePreferred = frameType(0x07) // 1 byte payload: 0x01 or 0x00 for whether this is client's home node
+
+	// framePeerGone is sent from server to client to signal that
+	// a previous sender is no longer connected. That is, if A
+	// sent to B, and then if A disconnects, the server sends
+	// framePeerGone to B so B can forget that a reverse path
+	// exists on that connection to get back to A.
+	framePeerGone = frameType(0x08) // 32B pub key of peer that's gone
 )
 
 var bin = binary.BigEndian
diff --git a/derp/derp_client.go b/derp/derp_client.go
index 07231ca1d..b4692c483 100644
--- a/derp/derp_client.go
+++ b/derp/derp_client.go
@@ -204,6 +204,13 @@ type ReceivedPacket struct {
 
 func (ReceivedPacket) msg() {}
 
+// PeerGoneMessage is a ReceivedMessage that indicates that the client
+// identified by the underlying public key had previously sent you a
+// packet but has now disconnected from the server.
+type PeerGoneMessage key.Public
+
+func (PeerGoneMessage) msg() {}
+
 // Recv reads a message from the DERP server.
 // The provided buffer must be large enough to receive a complete packet,
 // which in practice are are 1.5-4 KB, but can be up to 64 KB.
@@ -232,6 +239,15 @@ func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) {
 			// TODO: eventually we'll have server->client pings that
 			// require ack pongs.
 			continue
+		case framePeerGone:
+			if n < keyLen {
+				c.logf("[unexpected] dropping short peerGone frame from DERP server")
+				continue
+			}
+			var pg PeerGoneMessage
+			copy(pg[:], b[:keyLen])
+			return pg, nil
+
 		case frameRecvPacket:
 			var rp ReceivedPacket
 			if c.protoVersion < protocolSrcAddrs {
diff --git a/derp/derp_server.go b/derp/derp_server.go
index f9350ab1c..274d00234 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -58,6 +58,7 @@ type Server struct {
 	packetsDroppedQueueHead *expvar.Int // queue full, drop head packet
 	packetsDroppedQueueTail *expvar.Int // queue full, drop tail packet
 	packetsDroppedWrite *expvar.Int // error writing to dst conn
+	peerGoneFrames expvar.Int // number of peer gone frames sent
 	accepts expvar.Int
 	curClients expvar.Int
 	curHomeClients expvar.Int // ones with preferred
@@ -151,8 +152,9 @@ func (s *Server) isClosed() bool {
 func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
 	closed := make(chan struct{})
 
-	s.accepts.Add(1)
 	s.mu.Lock()
+	s.accepts.Add(1) // while holding s.mu for connNum read on next line
+	connNum := s.accepts.Value() // expvar sadly doesn't return new value on Add(1)
 	s.netConns[nc] = closed
 	s.mu.Unlock()
 
@@ -165,7 +167,7 @@ func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
 		s.mu.Unlock()
 	}()
 
-	if err := s.accept(nc, brw, remoteAddr); err != nil && !s.isClosed() {
+	if err := s.accept(nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() {
 		s.logf("derp: %s: %v", remoteAddr, err)
 	}
 }
@@ -202,9 +204,17 @@ func (s *Server) unregisterClient(c *sclient) {
 	if c.preferred {
 		s.curHomeClients.Add(-1)
 	}
+
+	// Find still-connected peers to notify that we've gone away
+	// so they can drop their route entries to us. (issue 150)
+	for pubKey, connNum := range c.sentTo {
+		if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
+			go peer.requestPeerGoneWrite(c.key)
+		}
+	}
 }
 
-func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error {
+func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
 	br, bw := brw.Reader, brw.Writer
 	nc.SetDeadline(time.Now().Add(10 * time.Second))
 	if err := s.sendServerKey(bw); err != nil {
@@ -226,6 +236,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
 	defer cancel()
 
 	c := &sclient{
+		connNum: connNum,
 		s: s,
 		key: clientKey,
 		nc: nc,
@@ -236,6 +247,8 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
 		remoteAddr: remoteAddr,
 		connectedAt: time.Now(),
 		sendQueue: make(chan pkt, perClientSendQueueDepth),
+		peerGone: make(chan key.Public),
+		sentTo: make(map[key.Public]int64),
 	}
 	if clientInfo != nil {
 		c.info = *clientInfo
@@ -330,6 +343,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 		return nil
 	}
 
+	// Track that we've sent to this peer, so if/when we
+	// disconnect first, the server can inform all our old
+	// recipients that we're gone. (Issue 150 optimization)
+	c.sentTo[dstKey] = dst.connNum
+
 	p := pkt{
 		bs: contents,
 	}
@@ -378,6 +396,16 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	return nil
 }
 
+// requestPeerGoneWrite sends a request to write a "peer gone" frame
+// that the provided peer has disconnected. It blocks until either the
+// write request is scheduled, or the client has closed.
+func (c *sclient) requestPeerGoneWrite(peer key.Public) {
+	select {
+	case c.peerGone <- peer:
+	case <-c.done:
+	}
+}
+
 func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error {
 	// TODO(crawshaw): implement policy constraints on who can use the DERP server
 	// TODO(bradfitz): ... and at what rate.
@@ -483,6 +511,7 @@ func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Publi
 // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
 type sclient struct {
 	// Static after construction.
+	connNum int64 // process-wide unique counter, incremented each Accept
 	s *Server
 	nc Conn
 	key key.Public
@@ -491,11 +520,15 @@ type sclient struct {
 	done <-chan struct{} // closed when connection closes
 	remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
 	sendQueue chan pkt // packets queued to this client; never closed
+	peerGone chan key.Public // write request that a previous sender has disconnected
 
 	// Owned by run, not thread-safe.
 	br *bufio.Reader
 	connectedAt time.Time
 	preferred bool
+	// sentTo tracks all the peers this client has ever sent a packet to, and at which
+	// connection number.
+	sentTo map[key.Public]int64 // recipient => rcpt's latest sclient.connNum
 
 	// Owned by sender, not thread-safe.
 	bw *bufio.Writer
@@ -577,6 +610,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			return nil
+		case peer := <-c.peerGone:
+			werr = c.sendPeerGone(peer)
+			continue
 		case msg := <-c.sendQueue:
 			werr = c.sendPacket(msg.src, msg.bs)
 			continue
@@ -595,6 +631,8 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			return nil
+		case peer := <-c.peerGone:
+			werr = c.sendPeerGone(peer)
 		case msg := <-c.sendQueue:
 			werr = c.sendPacket(msg.src, msg.bs)
 		case <-keepAliveTick.C:
@@ -613,6 +651,17 @@ func (c *sclient) sendKeepAlive() error {
 	return writeFrameHeader(c.bw, frameKeepAlive, 0)
 }
 
+// sendPeerGone sends a peerGone frame, without flushing.
+func (c *sclient) sendPeerGone(peer key.Public) error {
+	c.s.peerGoneFrames.Add(1)
+	c.setWriteDeadline()
+	if err := writeFrameHeader(c.bw, framePeerGone, keyLen); err != nil {
+		return err
+	}
+	_, err := c.bw.Write(peer[:])
+	return err
+}
+
 // sendPacket writes contents to the client in a RecvPacket frame. If
 // srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
 // DERPv2. The bytes of contents are only valid until this function
@@ -678,5 +727,6 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("unknown_frames", &s.unknownFrames)
 	m.Set("home_moves_in", &s.homeMovesIn)
 	m.Set("home_moves_out", &s.homeMovesOut)
+	m.Set("peer_gone_frames", &s.peerGoneFrames)
 	return m
 }
diff --git a/derp/derp_test.go b/derp/derp_test.go
index de67e58c6..337a866c8 100644
--- a/derp/derp_test.go
+++ b/derp/derp_test.go
@@ -9,6 +9,7 @@ import (
 	"context"
 	crand "crypto/rand"
 	"errors"
+	"expvar"
 	"fmt"
 	"io"
 	"net"
@@ -80,6 +81,8 @@ func TestSendRecv(t *testing.T) {
 		t.Logf("Connected client %d.", i)
 	}
 
+	var peerGoneCount expvar.Int
+
 	t.Logf("Starting read loops")
 	for i := 0; i < numClients; i++ {
 		go func(i int) {
@@ -94,6 +97,8 @@ func TestSendRecv(t *testing.T) {
 				default:
 					t.Errorf("unexpected message type %T", m)
 					continue
+				case PeerGoneMessage:
+					peerGoneCount.Add(1)
 				case ReceivedPacket:
 					if m.Source.IsZero() {
 						t.Errorf("zero Source address in ReceivedPacket")
@@ -138,6 +143,18 @@ func TestSendRecv(t *testing.T) {
 		t.Errorf("total/home=%v/%v; want %v/%v", gotTotal, gotHome, total, home)
 	}
 
+	wantClosedPeers := func(want int64) {
+		t.Helper()
+		var got int64
+		dl := time.Now().Add(5 * time.Second)
+		for time.Now().Before(dl) {
+			if got = peerGoneCount.Value(); got == want {
+				return
+			}
+		}
+		t.Errorf("peer gone count = %v; want %v", got, want)
+	}
+
 	msg1 := []byte("hello 0->1\n")
 	if err := clients[0].Send(clientKeys[1], msg1); err != nil {
 		t.Fatal(err)
@@ -167,15 +184,18 @@ func TestSendRecv(t *testing.T) {
 	wantActive(3, 1)
 	connsOut[1].Close()
 	wantActive(2, 0)
+	wantClosedPeers(1)
 	clients[2].NotePreferred(true)
 	wantActive(2, 1)
 	clients[2].NotePreferred(false)
 	wantActive(2, 0)
 	connsOut[2].Close()
 	wantActive(1, 0)
+	wantClosedPeers(1)
 
 	t.Logf("passed")
 	s.Close()
+
 }
 
 func TestSendFreeze(t *testing.T) {
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index 7536d7946..c28afe2e8 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -104,6 +104,38 @@ type Conn struct {
 	activeDerp map[int]activeDerp
 	prevDerp map[int]*syncs.WaitGroupChan
 	derpTLSConfig *tls.Config // normally nil; used by tests
+	derpRoute map[key.Public]derpRoute
+}
+
+// derpRoute is a route entry for a public key, saying that a certain
+// peer should be available at DERP node derpID, as long as the
+// current connection for that derpID is dc. (but dc should not be
+// used to write directly; it's owned by the read/write loops)
+type derpRoute struct {
+	derpID int
+	dc *derphttp.Client // don't use directly; see comment above
+}
+
+// removeDerpPeerRoute removes a DERP route entry previously added by addDerpPeerRoute.
+func (c *Conn) removeDerpPeerRoute(peer key.Public, derpID int, dc *derphttp.Client) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	r2 := derpRoute{derpID, dc}
+	if r, ok := c.derpRoute[peer]; ok && r == r2 {
+		delete(c.derpRoute, peer)
+	}
+}
+
+// addDerpPeerRoute adds a DERP route entry, noting that peer was seen
+// on DERP node derpID, at least on the connection identified by dc.
+// See issue 150 for details.
+func (c *Conn) addDerpPeerRoute(peer key.Public, derpID int, dc *derphttp.Client) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.derpRoute == nil {
+		c.derpRoute = make(map[key.Public]derpRoute)
+	}
+	c.derpRoute[peer] = derpRoute{derpID, dc}
 }
 
 // DerpMagicIP is a fake WireGuard endpoint IP address that means
@@ -395,7 +427,7 @@ func (c *Conn) setNearestDERP(derpNum int) (wantDERP bool) {
 		go ad.c.NotePreferred(i == c.myDerp)
 	}
 	if derpNum != 0 {
-		go c.derpWriteChanOfAddr(&net.UDPAddr{IP: derpMagicIP, Port: derpNum})
+		go c.derpWriteChanOfAddr(&net.UDPAddr{IP: derpMagicIP, Port: derpNum}, key.Public{})
 	}
 	return true
 }
@@ -650,7 +682,7 @@ func (c *Conn) sendAddr(addr *net.UDPAddr, pubKey key.Public, b []byte) error {
 		return c.sendUDP(addr, b)
 	}
 
-	ch := c.derpWriteChanOfAddr(addr)
+	ch := c.derpWriteChanOfAddr(addr, pubKey)
 	if ch == nil {
 		return nil
 	}
@@ -681,10 +713,18 @@ func (c *Conn) sendAddr(addr *net.UDPAddr, pubKey key.Public, b []byte) error {
 // TODO: this is currently arbitrary. Figure out something better?
 const bufferedDerpWritesBeforeDrop = 32
 
+// debugUseDerpRoute temporarily (2020-03-22) controls whether DERP
+// reverse routing is enabled (Issue 150). It will become always true
+// later.
+var debugUseDerpRoute, _ = strconv.ParseBool(os.Getenv("TS_DEBUG_ENABLE_DERP_ROUTE"))
+
 // derpWriteChanOfAddr returns a DERP client for fake UDP addresses that
 // represent DERP servers, creating them as necessary. For real UDP
 // addresses, it returns nil.
-func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
+//
+// If peer is non-zero, it can be used to find an active reverse
+// path, without using addr.
+func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr, peer key.Public) chan<- derpWriteRequest {
 	if !addr.IP.Equal(derpMagicIP) {
 		return nil
 	}
@@ -700,12 +740,31 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
 		return nil
 	}
 
+	// See if we have a connection open to that DERP node ID
+	// first. If so, might as well use it. (It's a little
+	// arbitrary whether we use this one vs. the reverse route
+	// below when we have both.)
 	ad, ok := c.activeDerp[nodeID]
 	if ok {
 		*ad.lastWrite = time.Now()
 		return ad.writeCh
 	}
 
+	// If we don't have an open connection to the peer's home DERP
+	// node, see if we have an open connection to a DERP node
+	// where we'd heard from that peer already. For instance,
+	// perhaps peer's home is Frankfurt, but they dialed our home DERP
+	// node in SF to reach us, so we can reply to them using our
+	// SF connection rather than dialing Frankfurt. (Issue 150)
+	if !peer.IsZero() && debugUseDerpRoute {
+		if r, ok := c.derpRoute[peer]; ok {
+			if ad, ok := c.activeDerp[r.derpID]; ok && ad.c == r.dc {
+				*ad.lastWrite = time.Now()
+				return ad.writeCh
+			}
+		}
+	}
+
 	if c.activeDerp == nil {
 		c.activeDerp = make(map[int]activeDerp)
 		c.prevDerp = make(map[int]*syncs.WaitGroupChan)
@@ -722,6 +781,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
 		c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err)
 		return nil
 	}
+
 	dc.NotePreferred(c.myDerp == nodeID)
 	dc.DNSCache = dnscache.Get()
 	dc.TLSConfig = c.derpTLSConfig
@@ -796,12 +856,21 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc
 		return n
 	}
 
+	// peerPresent is the set of senders we know are present on this
+	// connection, based on messages we've received from the server.
+	peerPresent := map[key.Public]bool{}
+
 	for {
 		msg, err := dc.Recv(buf[:])
 		if err == derphttp.ErrClientClosed {
 			return
 		}
 		if err != nil {
+			// Forget that all these peers have routes.
+			for peer := range peerPresent {
+				delete(peerPresent, peer)
+				c.removeDerpPeerRoute(peer, derpFakeAddr.Port, dc)
+			}
 			select {
 			case <-ctx.Done():
 				return
@@ -819,6 +888,12 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc
 			if logDerpVerbose {
 				c.logf("got derp %v packet: %q", derpFakeAddr, m.Data)
 			}
+			// If this is a new sender we hadn't seen before, remember it and
+			// register a route for this peer.
+			if _, ok := peerPresent[m.Source]; !ok {
+				peerPresent[m.Source] = true
+				c.addDerpPeerRoute(m.Source, derpFakeAddr.Port, dc)
+			}
 		default:
 			// Ignore.
 			// TODO: handle endpoint notification messages.