From 484b7fc9a3cb19e09624642fb220dc3722c32fec Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 1 Jun 2020 15:19:41 -0700 Subject: [PATCH] derp, cmd/derper: add frameWatchConns, framePeerPresent for inter-DERP routing This lets a trusted DERP client that knows a pre-shared key subscribe to the connection list. Upon subscribing, they get the current set of connected public keys, and then all changes over time. This lets a set of DERP server peers within a region all stay connected to each other and know which clients are connected to which nodes. Updates #388 Signed-off-by: Brad Fitzpatrick --- cmd/derper/derper.go | 31 ++++- cmd/derper/derper_test.go | 3 +- derp/derp.go | 21 ++- derp/derp_client.go | 55 +++++++- derp/derp_server.go | 156 ++++++++++++++++++++- derp/derp_test.go | 228 +++++++++++++++++++++++++++++++ derp/derphttp/derphttp_client.go | 15 +- 7 files changed, 498 insertions(+), 11 deletions(-) diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go index 758b27d0e..955c9dbfb 100644 --- a/cmd/derper/derper.go +++ b/cmd/derper/derper.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "regexp" + "strings" "time" "github.com/tailscale/wireguard-go/wgcfg" @@ -42,6 +43,8 @@ var ( hostname = flag.String("hostname", "derp.tailscale.com", "LetsEncrypt host name, if addr's port is :443") logCollection = flag.String("logcollection", "", "If non-empty, logtail collection to log to") runSTUN = flag.Bool("stun", false, "also run a STUN server") + meshPSKFile = flag.String("mesh-psk-file", defaultMeshPSKFile(), "if non-empty, path to file containing the mesh pre-shared key file. It should contain some hex string; whitespace is trimmed.") + meshWith = flag.String("mesh-with", "", "optional comma-separated list of hostnames to mesh with; the server's own hostname can be in the list, in which case it's ignored if its DNS resolves to an IP on the machine") ) type config struct { @@ -118,6 +121,23 @@ func main() { letsEncrypt := tsweb.IsProd443(*addr) s := derp.NewServer(key.Private(cfg.PrivateKey), log.Printf) + + if *meshPSKFile != "" { + b, err := ioutil.ReadFile(*meshPSKFile) + if err != nil { + log.Fatal(err) + } + key := strings.TrimSpace(string(b)) + if matched, _ := regexp.MatchString(`(?i)^[0-9a-f]{64,}$`, key); !matched { + log.Fatalf("key in %s must contain 64+ hex digits", *meshPSKFile) + } + s.SetMeshKey(key) + log.Printf("DERP mesh key configured") + } + + // TODO(bradfitz): parse & use the *meshWith + _ = *meshWith + expvar.Publish("derp", s.ExpVar()) // Create our own mux so we don't expose /debug/ stuff to the world. @@ -192,6 +212,7 @@ func debugHandler(s *derp.Server) http.Handler { `) f("
  • Hostname: %v
  • \n", *hostname) f("
  • Uptime: %v
  • \n", tsweb.Uptime()) + f("
  • Mesh Key: %v
  • \n", s.HasMeshKey()) f(`
  • /debug/vars (Go)
  • /debug/varz (Prometheus)
  • @@ -268,7 +289,7 @@ func serveSTUN() { } } -var validProdHostname = regexp.MustCompile(`^derp(\d+|\-\w+)?\.tailscale\.com\.?$`) +var validProdHostname = regexp.MustCompile(`^derp([^.]*)\.tailscale\.com\.?$`) func prodAutocertHostPolicy(_ context.Context, host string) error { if validProdHostname.MatchString(host) { @@ -276,3 +297,11 @@ func prodAutocertHostPolicy(_ context.Context, host string) error { } return errors.New("invalid hostname") } + +func defaultMeshPSKFile() string { + const def = "/home/derp/keys/derp-mesh.key" + if _, err := os.Stat(def); err == nil { + return def + } + return "" +} diff --git a/cmd/derper/derper_test.go b/cmd/derper/derper_test.go index 2808466a7..e26864a32 100644 --- a/cmd/derper/derper_test.go +++ b/cmd/derper/derper_test.go @@ -17,10 +17,11 @@ func TestProdAutocertHostPolicy(t *testing.T) { {"derp.tailscale.com", true}, {"derp.tailscale.com.", true}, {"derp1.tailscale.com", true}, + {"derp1b.tailscale.com", true}, {"derp2.tailscale.com", true}, {"derp02.tailscale.com", true}, {"derp-nyc.tailscale.com", true}, - {"derpfoo.tailscale.com", false}, + {"derpfoo.tailscale.com", true}, {"derp02.bar.tailscale.com", false}, {"example.net", false}, } diff --git a/derp/derp.go b/derp/derp.go index 7f0ccf24d..08e4f20ae 100644 --- a/derp/derp.go +++ b/derp/derp.go @@ -32,10 +32,11 @@ const MaxPacketSize = 64 << 10 const magic = "DERP🔑" // 8 bytes: 0x44 45 52 50 f0 9f 94 91 const ( - nonceLen = 24 - keyLen = 32 - maxInfoLen = 1 << 20 - keepAlive = 60 * time.Second + nonceLen = 24 + frameHeaderLen = 1 + 4 // frameType byte + 4 byte length + keyLen = 32 + maxInfoLen = 1 << 20 + keepAlive = 60 * time.Second ) // protocolVersion is bumped whenever there's a wire-incompatible change. @@ -81,6 +82,18 @@ const ( // framePeerGone to B so B can forget that a reverse path // exists on that connection to get back to A. framePeerGone = frameType(0x08) // 32B pub key of peer that's gone + + // framePeerPresent is like framePeerGone, but for other + // members of the DERP region when they're meshed up together. + framePeerPresent = frameType(0x09) // 32B pub key of peer that's connected + + // frameWatchConns is how one DERP node in a regional mesh + // subscribes to the others in the region. + // There's no payload. If the sender doesn't have permission, the connection + // is closed. Otherwise, the client is initially flooded with + // framePeerPresent for all connected nodes, and then a stream of + // framePeerPresent & framePeerGone has peers connect and disconnect. + frameWatchConns = frameType(0x10) ) var bin = binary.BigEndian diff --git a/derp/derp_client.go b/derp/derp_client.go index b4692c483..9e537ee32 100644 --- a/derp/derp_client.go +++ b/derp/derp_client.go @@ -27,6 +27,7 @@ type Client struct { logf logger.Logf nc Conn br *bufio.Reader + meshKey string wmu sync.Mutex // hold while writing to bw bw *bufio.Writer @@ -34,6 +35,15 @@ type Client struct { } func NewClient(privateKey key.Private, nc Conn, brw *bufio.ReadWriter, logf logger.Logf) (*Client, error) { + noMeshKey := "" + return NewMeshClient(privateKey, nc, brw, logf, noMeshKey) +} + +// NewMeshClient is the Client constructor for trusted clients that +// are a peer in a cluster mesh. +// +// An empty meshKey is equivalent to just using NewClient. +func NewMeshClient(privateKey key.Private, nc Conn, brw *bufio.ReadWriter, logf logger.Logf, meshKey string) (*Client, error) { c := &Client{ privateKey: privateKey, publicKey: privateKey.Public(), @@ -41,8 +51,8 @@ func NewClient(privateKey key.Private, nc Conn, brw *bufio.ReadWriter, logf logg nc: nc, br: brw.Reader, bw: brw.Writer, + meshKey: meshKey, } - if err := c.recvServerKey(); err != nil { return nil, fmt.Errorf("derp.Client: failed to receive server key: %v", err) } @@ -109,6 +119,12 @@ func (c *Client) recvServerInfo() (*serverInfo, error) { type clientInfo struct { Version int // `json:"version,omitempty"` + + // MeshKey optionally specifies a pre-shared key used by + // trusted clients. It's required to subscribe to the + // connection list & forward packets. It's empty for regular + // users. + MeshKey string // `json:"meshKey,omitempty"` } func (c *Client) sendClientKey() error { @@ -116,7 +132,10 @@ func (c *Client) sendClientKey() error { if _, err := crand.Read(nonce[:]); err != nil { return err } - msg, err := json.Marshal(clientInfo{Version: protocolVersion}) + msg, err := json.Marshal(clientInfo{ + Version: protocolVersion, + MeshKey: c.meshKey, + }) if err != nil { return err } @@ -186,6 +205,17 @@ func (c *Client) NotePreferred(preferred bool) (err error) { return c.bw.Flush() } +// WatchConnectionChanges sends a request to subscribe to the peer's connection list. +// It's a fatal error if the client wasn't created with NewMeshClient. +func (c *Client) WatchConnectionChanges() error { + c.wmu.Lock() + defer c.wmu.Unlock() + if err := writeFrameHeader(c.bw, frameWatchConns, 0); err != nil { + return err + } + return c.bw.Flush() +} + // ReceivedMessage represents a type returned by Client.Recv. Unless // otherwise documented, the returned message aliases the byte slice // provided to Recv and thus the message is only as good as that @@ -211,11 +241,21 @@ type PeerGoneMessage key.Public func (PeerGoneMessage) msg() {} +// PeerPresentMessage is a ReceivedMessage that indicates that the client +// is connected to the server. (Only used by trusted mesh clients) +type PeerPresentMessage key.Public + +func (PeerPresentMessage) msg() {} + // Recv reads a message from the DERP server. // The provided buffer must be large enough to receive a complete packet, // which in practice are are 1.5-4 KB, but can be up to 64 KB. // Once Recv returns an error, the Client is dead forever. func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) { + return c.recvTimeout(b, 120*time.Second) +} + +func (c *Client) recvTimeout(b []byte, timeout time.Duration) (m ReceivedMessage, err error) { if c.readErr != nil { return nil, c.readErr } @@ -227,7 +267,7 @@ func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) { }() for { - c.nc.SetReadDeadline(time.Now().Add(120 * time.Second)) + c.nc.SetReadDeadline(time.Now().Add(timeout)) t, n, err := readFrame(c.br, 1<<20, b) if err != nil { return nil, err @@ -248,6 +288,15 @@ func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) { copy(pg[:], b[:keyLen]) return pg, nil + case framePeerPresent: + if n < keyLen { + c.logf("[unexpected] dropping short peerPresent frame from DERP server") + continue + } + var pg PeerPresentMessage + copy(pg[:], b[:keyLen]) + return pg, nil + case frameRecvPacket: var rp ReceivedPacket if c.protoVersion < protocolSrcAddrs { diff --git a/derp/derp_server.go b/derp/derp_server.go index 530f87eb4..a95a7702d 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -47,6 +47,7 @@ type Server struct { publicKey key.Public logf logger.Logf memSys0 uint64 // runtime.MemStats.Sys at start (or early-ish) + meshKey string // Counters: packetsSent, bytesSent expvar.Int @@ -72,6 +73,7 @@ type Server struct { netConns map[Conn]chan struct{} // chan is closed when conn closes clients map[key.Public]*sclient clientsEver map[key.Public]bool // never deleted from, for stats; fine for now + watchers map[*sclient]bool // mesh peer -> true } // Conn is the subset of the underlying net.Conn the DERP Server needs. @@ -101,6 +103,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { clientsEver: make(map[key.Public]bool), netConns: make(map[Conn]chan struct{}), memSys0: ms.Sys, + watchers: map[*sclient]bool{}, } s.packetsDroppedUnknown = s.packetsDroppedReason.Get("unknown_dest") s.packetsDroppedGone = s.packetsDroppedReason.Get("gone") @@ -110,6 +113,16 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { return s } +// SetMesh sets the pre-shared key that regional DERP servers used to mesh +// amongst themselves. +// +// It must be called before serving begins. +func (s *Server) SetMeshKey(v string) { + s.meshKey = v +} + +func (s *Server) HasMeshKey() bool { return s.meshKey != "" } + // Close closes the server and waits for the connections to disconnect. func (s *Server) Close() error { s.mu.Lock() @@ -188,6 +201,19 @@ func (s *Server) registerClient(c *sclient) { s.clients[c.key] = c s.clientsEver[c.key] = true s.curClients.Add(1) + s.broadcastPeerStateChangeLocked(c.key, true) +} + +// broadcastPeerStateChangeLocked enqueues a message to all watchers +// (other DERP nodes in the region, or trusted clients) that peer's +// presence changed. +// +// s.mu must be held. +func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) { + for w := range s.watchers { + w.peerStateChange = append(w.peerStateChange, peerConnState{peer: peer, present: present}) + go w.requestMeshUpdate() + } } // unregisterClient removes a client from the server. @@ -199,6 +225,10 @@ func (s *Server) unregisterClient(c *sclient) { c.logf("removing connection") delete(s.clients, c.key) } + if c.canMesh { + delete(s.watchers, c) + } + s.broadcastPeerStateChangeLocked(c.key, false) s.curClients.Add(-1) if c.preferred { @@ -224,6 +254,26 @@ func (s *Server) unregisterClient(c *sclient) { } } +func (s *Server) addWatcher(c *sclient) { + if !c.canMesh { + panic("invariant: addWatcher called without permissions") + } + + s.mu.Lock() + defer s.mu.Unlock() + + // Queue messages for each already-connected client. + for peer := range s.clients { + c.peerStateChange = append(c.peerStateChange, peerConnState{peer: peer, present: true}) + } + + // And enroll the watcher in future updates (of both + // connections & disconnections). + s.watchers[c] = true + + go c.requestMeshUpdate() +} + func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error { br, bw := brw.Reader, brw.Writer nc.SetDeadline(time.Now().Add(10 * time.Second)) @@ -259,6 +309,10 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN sendQueue: make(chan pkt, perClientSendQueueDepth), peerGone: make(chan key.Public), sentTo: make(map[key.Public]int64), + canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, + } + if c.canMesh { + c.meshUpdate = make(chan struct{}) } if clientInfo != nil { c.info = *clientInfo @@ -307,6 +361,8 @@ func (c *sclient) run(ctx context.Context) error { err = c.handleFrameNotePreferred(ft, fl) case frameSendPacket: err = c.handleFrameSendPacket(ft, fl) + case frameWatchConns: + err = c.handleFrameWatchConns(ft, fl) default: err = c.handleUnknownFrame(ft, fl) } @@ -333,6 +389,17 @@ func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error { return nil } +func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error { + if fl != 0 { + return fmt.Errorf("handleFrameWatchConns wrong size") + } + if !c.canMesh { + return fmt.Errorf("insufficient permissions") + } + c.s.addWatcher(c) + return nil +} + func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { s := c.s @@ -418,6 +485,16 @@ func (c *sclient) requestPeerGoneWrite(peer key.Public) { } } +func (c *sclient) requestMeshUpdate() { + if !c.canMesh { + panic("unexpected requestMeshUpdate") + } + select { + case c.meshUpdate <- struct{}{}: + case <-c.done: + } +} + func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error { // TODO(crawshaw): implement policy constraints on who can use the DERP server // TODO(bradfitz): ... and at what rate. @@ -532,7 +609,9 @@ type sclient struct { done <-chan struct{} // closed when connection closes remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() sendQueue chan pkt // packets queued to this client; never closed - peerGone chan key.Public // write request that a previous sender has disconnected + peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) + meshUpdate chan struct{} // write request to write peerStateChange + canMesh bool // clientInfo had correct mesh token for inter-region routing // Owned by run, not thread-safe. br *bufio.Reader @@ -547,6 +626,21 @@ type sclient struct { // sentTo tracks all the peers this client has ever sent a packet to, and at which // connection number. sentTo map[key.Public]int64 // recipient => rcpt's latest sclient.connNum + + // Guarded by s.mu + // + // peerStateChange is used by mesh peers (a set of regional + // DERP servers) and contains records that need to be sent to + // the client for them to update their map of who's connected + // to this node. + peerStateChange []peerConnState +} + +// peerConnState represents whether a peer is connected to the server +// or not. +type peerConnState struct { + peer key.Public + present bool } // pkt is a request to write a data frame to an sclient. @@ -628,6 +722,9 @@ func (c *sclient) sendLoop(ctx context.Context) error { case peer := <-c.peerGone: werr = c.sendPeerGone(peer) continue + case <-c.meshUpdate: + werr = c.sendMeshUpdates() + continue case msg := <-c.sendQueue: werr = c.sendPacket(msg.src, msg.bs) continue @@ -648,6 +745,9 @@ func (c *sclient) sendLoop(ctx context.Context) error { return nil case peer := <-c.peerGone: werr = c.sendPeerGone(peer) + case <-c.meshUpdate: + werr = c.sendMeshUpdates() + continue case msg := <-c.sendQueue: werr = c.sendPacket(msg.src, msg.bs) case <-keepAliveTick.C: @@ -677,6 +777,59 @@ func (c *sclient) sendPeerGone(peer key.Public) error { return err } +// sendPeerPresent sends a peerPresent frame, without flushing. +func (c *sclient) sendPeerPresent(peer key.Public) error { + c.setWriteDeadline() + if err := writeFrameHeader(c.bw, framePeerPresent, keyLen); err != nil { + return err + } + _, err := c.bw.Write(peer[:]) + return err +} + +// sendMeshUpdates drains as many mesh peerStateChange entries as +// possible into the write buffer WITHOUT flushing or otherwise +// blocking (as it holds c.s.mu while working). If it can't drain them +// all, it schedules itself to be called again in the future. +func (c *sclient) sendMeshUpdates() error { + c.s.mu.Lock() + defer c.s.mu.Unlock() + + writes := 0 + for _, pcs := range c.peerStateChange { + if c.bw.Available() <= frameHeaderLen+keyLen { + break + } + var err error + if pcs.present { + err = c.sendPeerPresent(pcs.peer) + } else { + err = c.sendPeerGone(pcs.peer) + } + if err != nil { + // Shouldn't happen, though, as we're writing + // into available buffer space, not the + // network. + return err + } + writes++ + } + + remain := copy(c.peerStateChange, c.peerStateChange[writes:]) + c.peerStateChange = c.peerStateChange[:remain] + + // Did we manage to write them all into the bufio buffer without flushing? + if len(c.peerStateChange) == 0 { + if cap(c.peerStateChange) > 16 { + c.peerStateChange = nil + } + } else { + // Didn't finish in the buffer space provided; schedule a future run. + go c.requestMeshUpdate() + } + return nil +} + // sendPacket writes contents to the client in a RecvPacket frame. If // srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses // DERPv2. The bytes of contents are only valid until this function @@ -729,6 +882,7 @@ func (s *Server) ExpVar() expvar.Var { m := new(metrics.Set) m.Set("counter_unique_clients_ever", s.expVarFunc(func() interface{} { return len(s.clientsEver) })) m.Set("gauge_memstats_sys0", expvar.Func(func() interface{} { return int64(s.memSys0) })) + m.Set("gauge_watchers", s.expVarFunc(func() interface{} { return len(s.watchers) })) m.Set("gauge_current_connnections", &s.curClients) m.Set("gauge_current_home_connnections", &s.curHomeClients) m.Set("accepts", &s.accepts) diff --git a/derp/derp_test.go b/derp/derp_test.go index 26cf81e55..0f44f217c 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -13,11 +13,13 @@ import ( "fmt" "io" "net" + "sync" "testing" "time" "tailscale.com/net/nettest" "tailscale.com/types/key" + "tailscale.com/types/logger" ) func newPrivateKey(t *testing.T) (k key.Private) { @@ -391,3 +393,229 @@ func TestSendFreeze(t *testing.T) { } } } + +type testServer struct { + s *Server + ln net.Listener + logf logger.Logf + + mu sync.Mutex + pubName map[key.Public]string + clients map[*testClient]bool +} + +func (ts *testServer) addTestClient(c *testClient) { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.clients[c] = true +} + +func (ts *testServer) addKeyName(k key.Public, name string) { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.pubName[k] = name + ts.logf("test adding named key %q for %x", name, k) +} + +func (ts *testServer) keyName(k key.Public) string { + ts.mu.Lock() + defer ts.mu.Unlock() + if name, ok := ts.pubName[k]; ok { + return name + } + return k.ShortString() +} + +func (ts *testServer) close(t *testing.T) error { + ts.ln.Close() + ts.s.Close() + for c := range ts.clients { + c.close(t) + } + return nil +} + +func newTestServer(t *testing.T) *testServer { + t.Helper() + logf := logger.WithPrefix(t.Logf, "derp-server: ") + s := NewServer(newPrivateKey(t), logf) + s.SetMeshKey("mesh-key") + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + go func() { + i := 0 + for { + i++ + c, err := ln.Accept() + if err != nil { + return + } + // TODO: register c in ts so Close also closes it? + go func(i int) { + brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c)) + go s.Accept(c, brwServer, fmt.Sprintf("test-client-%d", i)) + }(i) + } + }() + return &testServer{ + s: s, + ln: ln, + logf: logf, + clients: map[*testClient]bool{}, + pubName: map[key.Public]string{}, + } +} + +type testClient struct { + name string + c *Client + nc net.Conn + pub key.Public + ts *testServer + closed bool +} + +func newTestClient(t *testing.T, ts *testServer, name string, newClient func(net.Conn, key.Private, logger.Logf) (*Client, error)) *testClient { + t.Helper() + nc, err := net.Dial("tcp", ts.ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + key := newPrivateKey(t) + ts.addKeyName(key.Public(), name) + c, err := newClient(nc, key, logger.WithPrefix(t.Logf, "client-"+name+": ")) + if err != nil { + t.Fatal(err) + } + tc := &testClient{ + name: name, + nc: nc, + c: c, + ts: ts, + pub: key.Public(), + } + ts.addTestClient(tc) + return tc +} + +func newRegularClient(t *testing.T, ts *testServer, name string) *testClient { + return newTestClient(t, ts, name, func(nc net.Conn, priv key.Private, logf logger.Logf) (*Client, error) { + brw := bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)) + return NewClient(priv, nc, brw, logf) + }) +} + +func newTestWatcher(t *testing.T, ts *testServer, name string) *testClient { + return newTestClient(t, ts, name, func(nc net.Conn, priv key.Private, logf logger.Logf) (*Client, error) { + brw := bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)) + c, err := NewMeshClient(priv, nc, brw, logf, "mesh-key") + if err != nil { + return nil, err + } + if err := c.WatchConnectionChanges(); err != nil { + return nil, err + } + return c, nil + }) +} + +func (tc *testClient) wantPresent(t *testing.T, peers ...key.Public) { + t.Helper() + want := map[key.Public]bool{} + for _, k := range peers { + want[k] = true + } + + var buf [64 << 10]byte + for { + m, err := tc.c.recvTimeout(buf[:], time.Second) + if err != nil { + t.Fatal(err) + } + switch m := m.(type) { + case PeerPresentMessage: + got := key.Public(m) + if !want[got] { + t.Fatalf("got peer present for %v; want present for %v", tc.ts.keyName(got), logger.ArgWriter(func(bw *bufio.Writer) { + for _, pub := range peers { + fmt.Fprintf(bw, "%s ", tc.ts.keyName(pub)) + } + })) + } + delete(want, got) + if len(want) == 0 { + return + } + default: + t.Fatalf("unexpected message type %T", m) + } + } +} + +func (tc *testClient) wantGone(t *testing.T, peer key.Public) { + t.Helper() + var buf [64 << 10]byte + m, err := tc.c.recvTimeout(buf[:], time.Second) + if err != nil { + t.Fatal(err) + } + switch m := m.(type) { + case PeerGoneMessage: + got := key.Public(m) + if peer != got { + t.Errorf("got gone message for %v; want gone for %v", tc.ts.keyName(got), tc.ts.keyName(peer)) + } + default: + t.Fatalf("unexpected message type %T", m) + } +} + +func (c *testClient) close(t *testing.T) { + t.Helper() + if c.closed { + return + } + c.closed = true + t.Logf("closing client %q (%x)", c.name, c.pub) + c.nc.Close() +} + +// TestWatch tests the connection watcher mechanism used by regional +// DERP nodes to mesh up with each other. +func TestWatch(t *testing.T) { + ts := newTestServer(t) + defer ts.close(t) + + w1 := newTestWatcher(t, ts, "w1") + w1.wantPresent(t, w1.pub) + + c1 := newRegularClient(t, ts, "c1") + w1.wantPresent(t, c1.pub) + + c2 := newRegularClient(t, ts, "c2") + w1.wantPresent(t, c2.pub) + + w2 := newTestWatcher(t, ts, "w2") + w1.wantPresent(t, w2.pub) + w2.wantPresent(t, w1.pub, w2.pub, c1.pub, c2.pub) + + c3 := newRegularClient(t, ts, "c3") + w1.wantPresent(t, c3.pub) + w2.wantPresent(t, c3.pub) + + c2.close(t) + w1.wantGone(t, c2.pub) + w2.wantGone(t, c2.pub) + + w3 := newTestWatcher(t, ts, "w3") + w1.wantPresent(t, w3.pub) + w2.wantPresent(t, w3.pub) + w3.wantPresent(t, c1.pub, c3.pub, w1.pub, w2.pub, w3.pub) + + c1.close(t) + w1.wantGone(t, c1.pub) + w2.wantGone(t, c1.pub) + w3.wantGone(t, c1.pub) +} diff --git a/derp/derphttp/derphttp_client.go b/derp/derphttp/derphttp_client.go index e0f4e83cd..95d679907 100644 --- a/derp/derphttp/derphttp_client.go +++ b/derp/derphttp/derphttp_client.go @@ -43,6 +43,7 @@ import ( type Client struct { TLSConfig *tls.Config // optional; nil means default DNSCache *dnscache.Resolver // optional; nil means no caching + MeshKey string // optional; for trusted clients privateKey key.Private logf logger.Logf @@ -272,7 +273,7 @@ func (c *Client) connect(ctx context.Context, caller string) (client *derp.Clien return nil, fmt.Errorf("GET failed: %v: %s", err, b) } - derpClient, err := derp.NewClient(c.privateKey, httpConn, brw, c.logf) + derpClient, err := derp.NewMeshClient(c.privateKey, httpConn, brw, c.logf, c.MeshKey) if err != nil { return nil, err } @@ -492,6 +493,18 @@ func (c *Client) NotePreferred(v bool) { } } +func (c *Client) WatchConnectionChanges() error { + client, err := c.connect(context.TODO(), "derphttp.Client.WatchConnectionChanges") + if err != nil { + return err + } + err = client.WatchConnectionChanges() + if err != nil { + c.closeForReconnect(client) + } + return err +} + func (c *Client) Recv(b []byte) (derp.ReceivedMessage, error) { client, err := c.connect(context.TODO(), "derphttp.Client.Recv") if err != nil {