From 73280595a8880bdca12d406fb3b18e6e96e8bf73 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 30 Aug 2021 11:16:11 -0700 Subject: [PATCH] derp: accept dup clients without closing prior's connection A public key should only have max one connection to a given DERP node (or really: one connection to a node in a region). But if people clone their machine keys (e.g. clone their VM, Raspbery Pi SD card, etc), then we can get into a situation where a public key is connected multiple times. Originally, the DERP server handled this by just kicking out a prior connections whenever a new one came. But this led to reconnect fights where 2+ nodes were in hard loops trying to reconnect and kicking out their peer. Then a909d37a59f6e36f47209db4e6b16497715f8de9 tried to add rate limiting to how often that dup-kicking can happen, but empirically it just doesn't work and ~leaks a bunch of goroutines and TCP connections, tying them up for hour+ while more and more accumulate and waste memory. Mostly because we were doing a time.Sleep forever while not reading from their TCP connections. Instead, just accept multiple connections per public key but track which is the most recent. And if two both are writing back & forth, then optionally disable them both. That last part is only enabled in tests for now. The current default policy is just last-sender-wins while we gather the next round of stats. Updates #2751 Signed-off-by: Brad Fitzpatrick --- derp/derp_server.go | 367 +++++++++++++++++++++++++++++++------- derp/derp_test.go | 333 +++++++++++++++++++++++----------- derp/dropreason_string.go | 5 +- 3 files changed, 529 insertions(+), 176 deletions(-) diff --git a/derp/derp_server.go b/derp/derp_server.go index e0443319b..9fe36a524 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -41,6 +41,7 @@ import ( "tailscale.com/client/tailscale" "tailscale.com/disco" "tailscale.com/metrics" + "tailscale.com/syncs" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/types/pad32" @@ -77,6 +78,21 @@ const ( writeTimeout = 2 * time.Second ) +// dupPolicy is a temporary (2021-08-30) mechanism to change the policy +// of how duplicate connection for the same key are handled. +type dupPolicy int8 + +const ( + // lastWriterIsActive is a dupPolicy where the connection + // to send traffic for a peer is the active one. + lastWriterIsActive dupPolicy = iota + + // disableFighters is a dupPolicy that detects if peers + // are trying to send interleaved with each other and + // then disables all of them. + disableFighters +) + // Server is a DERP server. type Server struct { // WriteTimeout, if non-zero, specifies how long to wait @@ -90,9 +106,9 @@ type Server struct { meshKey string limitedLogf logger.Logf metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate + dupPolicy dupPolicy // Counters: - _ pad32.Four packetsSent, bytesSent expvar.Int packetsRecv, bytesRecv expvar.Int packetsRecvByKind metrics.LabelMap @@ -112,9 +128,9 @@ type Server struct { accepts expvar.Int curClients expvar.Int curHomeClients expvar.Int // ones with preferred - clientsReplaced expvar.Int - clientsReplaceLimited expvar.Int - clientsReplaceSleeping expvar.Int + dupClientKeys expvar.Int // current number of public keys we have 2+ connections for + dupClientConns expvar.Int // current number of connections sharing a public key + dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed unknownFrames expvar.Int homeMovesIn expvar.Int // established clients announce home server moves in homeMovesOut expvar.Int // established clients announce home server moves out @@ -130,7 +146,7 @@ type Server struct { mu sync.Mutex closed bool netConns map[Conn]chan struct{} // chan is closed when conn closes - clients map[key.Public]*sclient + clients map[key.Public]clientSet watchers map[*sclient]bool // mesh peer -> true // clientsMesh tracks all clients in the cluster, both locally // and to mesh peers. If the value is nil, that means the @@ -148,6 +164,112 @@ type Server struct { keyOfAddr map[netaddr.IPPort]key.Public } +// clientSet represents 1 or more *sclients. +// +// The two implementations are singleClient and *dupClientSet. +// +// In the common case, client should only have one connection to the +// DERP server for a given key. When they're connected multiple times, +// we record their set of connections in dupClientSet and keep their +// connections open to make them happy (to keep them from spinning, +// etc) and keep track of which is the latest connection. If only the last +// is sending traffic, that last one is the active connection and it +// gets traffic. Otherwise, in the case of a cloned node key, the +// whole set of dups doesn't receive data frames. +// +// All methods should only be called while holding Server.mu. +// +// TODO(bradfitz): Issue 2746: in the future we'll send some sort of +// "health_error" frame to them that'll communicate to the end users +// that they cloned a device key, and we'll also surface it in the +// admin panel, etc. +type clientSet interface { + // ActiveClient returns the most recently added client to + // the set, as long as it hasn't been disabled, in which + // case it returns nil. + ActiveClient() *sclient + + // Len returns the number of clients in the set. + Len() int + + // ForeachClient calls f for each client in the set. + ForeachClient(f func(*sclient)) +} + +// singleClient is a clientSet of a single connection. +// This is the common case. +type singleClient struct{ c *sclient } + +func (s singleClient) ActiveClient() *sclient { return s.c } +func (s singleClient) Len() int { return 1 } +func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) } + +// A dupClientSet is a clientSet of more than 1 connection. +// +// This can occur in some reasonable cases (temporarily while users +// are changing networks) or in the case of a cloned key. In the +// cloned key case, both peers are speaking and the clients get +// disabled. +// +// All fields are guarded by Server.mu. +type dupClientSet struct { + // set is the set of connected clients for sclient.key. + // The values are all true. + set map[*sclient]bool + + // last is the most recent addition to set, or nil if the most + // recent one has since disconnected and nobody else has send + // data since. + last *sclient + + // sendHistory is a log of which members of set have sent + // frames to the derp server, with adjacent duplicates + // removed. When a member of set is removed, the same + // element(s) are removed from sendHistory. + sendHistory []*sclient +} + +func (s *dupClientSet) ActiveClient() *sclient { + if s.last != nil && !s.last.isDisabled.Get() { + return s.last + } + return nil +} +func (s *dupClientSet) Len() int { return len(s.set) } +func (s *dupClientSet) ForeachClient(f func(*sclient)) { + for c := range s.set { + f(c) + } +} + +// removeClient removes c from s and reports whether it was in s +// to begin with. +func (s *dupClientSet) removeClient(c *sclient) bool { + n := len(s.set) + delete(s.set, c) + if s.last == c { + s.last = nil + } + if len(s.set) == n { + return false + } + + trim := s.sendHistory[:0] + for _, v := range s.sendHistory { + if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) { + trim = append(trim, v) + } + } + for i := len(trim); i < len(s.sendHistory); i++ { + s.sendHistory[i] = nil + } + s.sendHistory = trim + if s.last == nil && len(s.sendHistory) > 0 { + s.last = s.sendHistory[len(s.sendHistory)-1] + } + return true +} + // PacketForwarder is something that can forward packets. // // It's mostly an interface for circular dependency reasons; the @@ -184,7 +306,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { packetsRecvByKind: metrics.LabelMap{Label: "kind"}, packetsDroppedReason: metrics.LabelMap{Label: "reason"}, packetsDroppedType: metrics.LabelMap{Label: "type"}, - clients: map[key.Public]*sclient{}, + clients: map[key.Public]clientSet{}, clientsMesh: map[key.Public]PacketForwarder{}, netConns: map[Conn]chan struct{}{}, memSys0: ms.Sys, @@ -344,39 +466,48 @@ func (s *Server) MetaCert() []byte { return s.metaCert } // registerClient notes that client c is now authenticated and ready for packets. // -// If c's public key was already connected with a different -// connection, the prior one is closed, unless it's fighting rapidly -// with another client with the same key, in which case the returned -// ok is false, and the caller should wait the provided duration -// before trying again. -func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) { +// If c.key is connected more than once, the earlier connection(s) are +// placed in a non-active state where we read from them (primarily to +// observe EOFs/timeouts) but won't send them frames on the assumption +// that they're dead. +func (s *Server) registerClient(c *sclient) { s.mu.Lock() defer s.mu.Unlock() - old := s.clients[c.key] - if old == nil { - c.logf("adding connection") - } else { - // Take over the old rate limiter, discarding the one - // our caller just made. - c.replaceLimiter = old.replaceLimiter - if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() { - if d := rr.DelayFrom(timeNow()); d > 0 { - s.clientsReplaceLimited.Add(1) - return false, d - } + + set := s.clients[c.key] + switch set := set.(type) { + case nil: + s.clients[c.key] = singleClient{c} + case singleClient: + s.dupClientKeys.Add(1) + s.dupClientConns.Add(2) // both old and new count + s.dupClientConnTotal.Add(1) + old := set.ActiveClient() + old.isDup.Set(true) + c.isDup.Set(true) + s.clients[c.key] = &dupClientSet{ + last: c, + set: map[*sclient]bool{ + old: true, + c: true, + }, + sendHistory: []*sclient{old}, } - s.clientsReplaced.Add(1) - c.logf("adding connection, replacing %s", old.remoteAddr) - go old.nc.Close() + case *dupClientSet: + s.dupClientConns.Add(1) // the gauge + s.dupClientConnTotal.Add(1) // the counter + c.isDup.Set(true) + set.set[c] = true + set.last = c + set.sendHistory = append(set.sendHistory, c) } - s.clients[c.key] = c + if _, ok := s.clientsMesh[c.key]; !ok { s.clientsMesh[c.key] = nil // just for varz of total users in cluster } s.keyOfAddr[c.remoteIPPort] = c.key s.curClients.Add(1) s.broadcastPeerStateChangeLocked(c.key, true) - return true, 0 } // broadcastPeerStateChangeLocked enqueues a message to all watchers @@ -395,8 +526,12 @@ func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) { func (s *Server) unregisterClient(c *sclient) { s.mu.Lock() defer s.mu.Unlock() - cur := s.clients[c.key] - if cur == c { + + set := s.clients[c.key] + switch set := set.(type) { + case nil: + c.logf("[unexpected]; clients map is empty") + case singleClient: c.logf("removing connection") delete(s.clients, c.key) if v, ok := s.clientsMesh[c.key]; ok && v == nil { @@ -404,7 +539,28 @@ func (s *Server) unregisterClient(c *sclient) { s.notePeerGoneFromRegionLocked(c.key) } s.broadcastPeerStateChangeLocked(c.key, false) + case *dupClientSet: + if set.removeClient(c) { + s.dupClientConns.Add(-1) + } else { + c.logf("[unexpected]; dup client set didn't shrink") + } + if set.Len() == 1 { + s.dupClientConns.Add(-1) // again; for the original one's + s.dupClientKeys.Add(-1) + var remain *sclient + for remain = range set.set { + break + } + if remain == nil { + panic("unexpected nil remain from single element dup set") + } + remain.isDisabled.Set(false) + remain.isDup.Set(false) + s.clients[c.key] = singleClient{remain} + } } + if c.canMesh { delete(s.watchers, c) } @@ -431,9 +587,15 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.Public) { // or move them over to the active client (in case a replaced client // connection is being unregistered). for pubKey, connNum := range s.sentTo[key] { - if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum { - go peer.requestPeerGoneWrite(key) + set, ok := s.clients[pubKey] + if !ok { + continue } + set.ForeachClient(func(peer *sclient) { + if peer.connNum == connNum { + go peer.requestPeerGoneWrite(key) + } + }) } delete(s.sentTo, key) } @@ -503,12 +665,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN discoSendQueue: make(chan pkt, perClientSendQueueDepth), peerGone: make(chan key.Public), canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, - - // Allow kicking out previous connections once a - // minute, with a very high burst of 100. Once a - // minute is less than the client's 2 minute - // inactivity timeout. - replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100), } if c.canMesh { @@ -518,15 +674,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN c.info = *clientInfo } - for { - ok, d := s.registerClient(c) - if ok { - break - } - s.clientsReplaceSleeping.Add(1) - timeSleep(d) - s.clientsReplaceSleeping.Add(-1) - } + s.registerClient(c) defer s.unregisterClient(c) err = s.sendServerInfo(c.bw, clientKey) @@ -570,6 +718,7 @@ func (c *sclient) run(ctx context.Context) error { } return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err) } + c.s.noteClientActivity(c) switch ft { case frameNotePreferred: err = c.handleFrameNotePreferred(ft, fl) @@ -634,9 +783,15 @@ func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error { s.mu.Lock() defer s.mu.Unlock() - if target, ok := s.clients[targetKey]; ok { - c.logf("frameClosePeer closing peer %x", targetKey) - go target.nc.Close() + if set, ok := s.clients[targetKey]; ok { + if set.Len() == 1 { + c.logf("frameClosePeer closing peer %x", targetKey) + } else { + c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len()) + } + set.ForeachClient(func(target *sclient) { + go target.nc.Close() + }) } else { c.logf("frameClosePeer failed to find peer %x", targetKey) } @@ -658,15 +813,25 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { } s.packetsForwardedIn.Add(1) + var dstLen int + var dst *sclient + s.mu.Lock() - dst := s.clients[dstKey] + if set, ok := s.clients[dstKey]; ok { + dstLen = set.Len() + dst = set.ActiveClient() + } if dst != nil { s.notePeerSendLocked(srcKey, dst) } s.mu.Unlock() if dst == nil { - s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd) + reason := dropReasonUnknownDestOnFwd + if dstLen > 1 { + reason = dropReasonDupClient + } + s.recordDrop(contents, srcKey, dstKey, reason) return nil } @@ -699,12 +864,18 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } var fwd PacketForwarder + var dstLen int + var dst *sclient + s.mu.Lock() - dst := s.clients[dstKey] - if dst == nil { - fwd = s.clientsMesh[dstKey] - } else { + if set, ok := s.clients[dstKey]; ok { + dstLen = set.Len() + dst = set.ActiveClient() + } + if dst != nil { s.notePeerSendLocked(c.key, dst) + } else if dstLen < 1 { + fwd = s.clientsMesh[dstKey] } s.mu.Unlock() @@ -717,7 +888,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } return nil } - s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest) + reason := dropReasonUnknownDest + if dstLen > 1 { + reason = dropReasonDupClient + } + s.recordDrop(contents, c.key, dstKey, reason) return nil } @@ -741,6 +916,7 @@ const ( dropReasonQueueHead // destination queue is full, dropped packet at queue head dropReasonQueueTail // destination queue is full, dropped packet at queue tail dropReasonWriteError // OS write() failed + dropReasonDupClient // the public key is connected 2+ times (active/active, fighting) ) func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) { @@ -850,6 +1026,57 @@ func (s *Server) sendServerKey(lw *lazyBufioWriter) error { return err } +func (s *Server) noteClientActivity(c *sclient) { + if !c.isDup.Get() { + // Fast path for clients that aren't in a dup set. + return + } + if c.isDisabled.Get() { + // If they're already disabled, no point checking more. + return + } + s.mu.Lock() + defer s.mu.Unlock() + + ds, ok := s.clients[c.key].(*dupClientSet) + if !ok { + // It became unduped in between the isDup fast path check above + // and the mutex check. Nothing to do. + return + } + + if s.dupPolicy == lastWriterIsActive { + ds.last = c + } else if ds.last == nil { + // If we didn't have a primary, let the current + // speaker be the primary. + ds.last = c + } + + if sh := ds.sendHistory; len(sh) != 0 && sh[len(sh)-1] == c { + // The client c was the last client to make activity + // in this set and it was already recorded. Nothing to + // do. + return + } + + // If we saw this connection send previously, then consider + // the group fighting and disable them all. + if s.dupPolicy == disableFighters { + for _, prior := range ds.sendHistory { + if prior == c { + ds.ForeachClient(func(c *sclient) { + c.isDisabled.Set(true) + }) + break + } + } + } + + // Append this client to the list of clients who spoke last. + ds.sendHistory = append(ds.sendHistory, c) +} + type serverInfo struct { Version int `json:"version,omitempty"` } @@ -979,14 +1206,16 @@ type sclient struct { key key.Public info clientInfo logf logger.Logf - done <-chan struct{} // closed when connection closes - remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() - remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. - sendQueue chan pkt // packets queued to this client; never closed - discoSendQueue chan pkt // important packets queued to this client; never closed - peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) - meshUpdate chan struct{} // write request to write peerStateChange - canMesh bool // clientInfo had correct mesh token for inter-region routing + done <-chan struct{} // closed when connection closes + remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() + remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. + sendQueue chan pkt // packets queued to this client; never closed + discoSendQueue chan pkt // important packets queued to this client; never closed + peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) + meshUpdate chan struct{} // write request to write peerStateChange + canMesh bool // clientInfo had correct mesh token for inter-region routing + isDup syncs.AtomicBool // whether more than 1 sclient for key is connected + isDisabled syncs.AtomicBool // whether sends to this peer are disabled due to active/active dups // replaceLimiter controls how quickly two connections with // the same client key can kick each other off the server by @@ -1399,10 +1628,10 @@ func (s *Server) ExpVar() expvar.Var { m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) })) m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) })) m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) })) + m.Set("gauge_current_dup_client_keys", &s.dupClientKeys) + m.Set("gauge_current_dup_client_conns", &s.dupClientConns) + m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal) m.Set("accepts", &s.accepts) - m.Set("clients_replaced", &s.clientsReplaced) - m.Set("clients_replace_limited", &s.clientsReplaceLimited) - m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping) m.Set("bytes_received", &s.bytesRecv) m.Set("bytes_sent", &s.bytesSent) m.Set("packets_dropped", &s.packetsDropped) diff --git a/derp/derp_test.go b/derp/derp_test.go index 1aa45363b..5521fdc09 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -662,7 +662,7 @@ func pubAll(b byte) (ret key.Public) { func TestForwarderRegistration(t *testing.T) { s := &Server{ - clients: make(map[key.Public]*sclient), + clients: make(map[key.Public]clientSet), clientsMesh: map[key.Public]PacketForwarder{}, } want := func(want map[key.Public]PacketForwarder) { @@ -745,7 +745,7 @@ func TestForwarderRegistration(t *testing.T) { key: u1, logf: logger.Discard, } - s.clients[u1] = u1c + s.clients[u1] = singleClient{u1c} s.RemovePacketForwarder(u1, testFwd(100)) want(map[key.Public]PacketForwarder{ u1: nil, @@ -765,7 +765,7 @@ func TestForwarderRegistration(t *testing.T) { // Now pretend u1 was already connected locally (so clientsMesh[u1] is nil), and then we heard // that they're also connected to a peer of ours. That sholdn't transition the forwarder // from nil to the new one, not a multiForwarder. - s.clients[u1] = u1c + s.clients[u1] = singleClient{u1c} s.clientsMesh[u1] = nil want(map[key.Public]PacketForwarder{ u1: nil, @@ -851,125 +851,248 @@ func TestClientSendPong(t *testing.T) { } -func TestServerReplaceClients(t *testing.T) { - defer func() { - timeSleep = time.Sleep - timeNow = time.Now - }() - - var ( - mu sync.Mutex - now = time.Unix(123, 0) - sleeps int - slept time.Duration - ) - timeSleep = func(d time.Duration) { - mu.Lock() - defer mu.Unlock() - sleeps++ - slept += d - now = now.Add(d) +func TestServerDupClients(t *testing.T) { + serverPriv := newPrivateKey(t) + var s *Server + + clientPriv := newPrivateKey(t) + clientPub := clientPriv.Public() + + var c1, c2, c3 *sclient + var clientName map[*sclient]string + + // run starts a new test case and resets clients back to their zero values. + run := func(name string, dupPolicy dupPolicy, f func(t *testing.T)) { + s = NewServer(serverPriv, t.Logf) + s.dupPolicy = dupPolicy + c1 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c1: ")} + c2 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c2: ")} + c3 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c3: ")} + clientName = map[*sclient]string{ + c1: "c1", + c2: "c2", + c3: "c3", + } + t.Run(name, f) } - timeNow = func() time.Time { - mu.Lock() - defer mu.Unlock() - return now + runBothWays := func(name string, f func(t *testing.T)) { + run(name+"_disablefighters", disableFighters, f) + run(name+"_lastwriteractive", lastWriterIsActive, f) } - - serverPrivateKey := newPrivateKey(t) - var logger logger.Logf = logger.Discard - const debug = false - if debug { - logger = t.Logf + wantSingleClient := func(t *testing.T, want *sclient) { + t.Helper() + switch s := s.clients[want.key].(type) { + case singleClient: + if s.c != want { + t.Error("wrong single client") + return + } + if want.isDup.Get() { + t.Errorf("unexpected isDup on singleClient") + } + if want.isDisabled.Get() { + t.Errorf("unexpected isDisabled on singleClient") + } + case nil: + t.Error("no clients for key") + case *dupClientSet: + t.Error("unexpected multiple clients for key") + } } - - s := NewServer(serverPrivateKey, logger) - defer s.Close() - - priv := newPrivateKey(t) - - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) + wantNoClient := func(t *testing.T) { + t.Helper() + switch s := s.clients[clientPub].(type) { + case nil: + // Good. + return + default: + t.Errorf("got %T; want empty", s) + } } - defer ln.Close() - - connNum := 0 - connect := func() *Client { - connNum++ - cout, err := net.Dial("tcp", ln.Addr().String()) - if err != nil { - t.Fatal(err) + wantDupSet := func(t *testing.T) *dupClientSet { + t.Helper() + switch s := s.clients[clientPub].(type) { + case *dupClientSet: + return s + default: + t.Fatalf("wanted dup set; got %T", s) + return nil } - - cin, err := ln.Accept() - if err != nil { - t.Fatal(err) + } + wantActive := func(t *testing.T, want *sclient) { + t.Helper() + set, ok := s.clients[clientPub] + if !ok { + t.Error("no set for key") + return } - - brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin)) - go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", connNum)) - - brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout)) - c, err := NewClient(priv, cout, brw, logger) - if err != nil { - t.Fatalf("client %d: %v", connNum, err) + got := set.ActiveClient() + if got != want { + t.Errorf("active client = %q; want %q", clientName[got], clientName[want]) } - return c } - - wantVar := func(v *expvar.Int, want int64) { + checkDup := func(t *testing.T, c *sclient, want bool) { t.Helper() - if got := v.Value(); got != want { - t.Errorf("got %d; want %d", got, want) + if got := c.isDup.Get(); got != want { + t.Errorf("client %q isDup = %v; want %v", clientName[c], got, want) } } - - wantClosed := func(c *Client) { + checkDisabled := func(t *testing.T, c *sclient, want bool) { t.Helper() - for { - m, err := c.Recv() - if err != nil { - t.Logf("got expected error: %v", err) - return - } - switch m.(type) { - case ServerInfoMessage: - continue - default: - t.Fatalf("client got %T; wanted an error", m) - } + if got := c.isDisabled.Get(); got != want { + t.Errorf("client %q isDisabled = %v; want %v", clientName[c], got, want) } } - - c1 := connect() - waitConnect(t, c1) - c2 := connect() - waitConnect(t, c2) - wantVar(&s.clientsReplaced, 1) - wantClosed(c1) - - for i := 0; i < 100+5; i++ { - c := connect() - defer c.nc.Close() - if s.clientsReplaceLimited.Value() == 0 && i < 90 { - continue + wantDupConns := func(t *testing.T, want int) { + t.Helper() + if got := s.dupClientConns.Value(); got != int64(want) { + t.Errorf("dupClientConns = %v; want %v", got, want) } - t.Logf("for %d: replaced=%d, limited=%d, sleeping=%d", i, - s.clientsReplaced.Value(), - s.clientsReplaceLimited.Value(), - s.clientsReplaceSleeping.Value(), - ) - } - - mu.Lock() - defer mu.Unlock() - if sleeps == 0 { - t.Errorf("no sleeps") } - if slept == 0 { - t.Errorf("total sleep duration was 0") + wantDupKeys := func(t *testing.T, want int) { + t.Helper() + if got := s.dupClientKeys.Value(); got != int64(want) { + t.Errorf("dupClientKeys = %v; want %v", got, want) + } } + + // Common case: a single client comes and goes, with no dups. + runBothWays("one_comes_and_goes", func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + wantSingleClient(t, c1) + s.unregisterClient(c1) + wantNoClient(t) + }) + + // A still somewhat common case: a single client was + // connected and then their wifi dies or laptop closes + // or they switch networks and connect from a + // different network. They have two connections but + // it's not very bad. Only their new one is + // active. The last one, being dead, doesn't send and + // thus the new one doesn't get disabled. + runBothWays("small_overlap_replacement", func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + wantSingleClient(t, c1) + wantActive(t, c1) + wantDupKeys(t, 0) + wantDupKeys(t, 0) + + s.registerClient(c2) // wifi dies; c2 replacement connects + wantDupSet(t) + wantDupConns(t, 2) + wantDupKeys(t, 1) + checkDup(t, c1, true) + checkDup(t, c2, true) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + wantActive(t, c2) // sends go to the replacement + + s.unregisterClient(c1) // c1 finally times out + wantSingleClient(t, c2) + checkDup(t, c2, false) // c2 is longer a dup + wantActive(t, c2) + wantDupConns(t, 0) + wantDupKeys(t, 0) + }) + + // Key cloning situation with concurrent clients, both trying + // to write. + run("concurrent_dups_get_disabled", disableFighters, func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + wantSingleClient(t, c1) + wantActive(t, c1) + s.registerClient(c2) + wantDupSet(t) + wantDupKeys(t, 1) + wantDupConns(t, 2) + wantActive(t, c2) + checkDup(t, c1, true) + checkDup(t, c2, true) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + + s.noteClientActivity(c2) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + s.noteClientActivity(c1) + checkDisabled(t, c1, true) + checkDisabled(t, c2, true) + wantActive(t, nil) + + s.registerClient(c3) + wantActive(t, c3) + checkDisabled(t, c3, false) + wantDupKeys(t, 1) + wantDupConns(t, 3) + + s.unregisterClient(c3) + wantActive(t, nil) + wantDupKeys(t, 1) + wantDupConns(t, 2) + + s.unregisterClient(c2) + wantSingleClient(t, c1) + wantDupKeys(t, 0) + wantDupConns(t, 0) + }) + + // Key cloning with an A->B->C->A series instead. + run("concurrent_dups_three_parties", disableFighters, func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + s.registerClient(c2) + s.registerClient(c3) + s.noteClientActivity(c1) + checkDisabled(t, c1, true) + checkDisabled(t, c2, true) + checkDisabled(t, c3, true) + wantActive(t, nil) + }) + + run("activity_promotes_primary_when_nil", disableFighters, func(t *testing.T) { + wantNoClient(t) + + // Last registered client is the active one... + s.registerClient(c1) + wantActive(t, c1) + s.registerClient(c2) + wantActive(t, c2) + s.registerClient(c3) + s.noteClientActivity(c2) + wantActive(t, c3) + + // But if the last one goes away, the one with the + // most recent activity wins. + s.unregisterClient(c3) + wantActive(t, c2) + }) + + run("concurrent_dups_three_parties_last_writer", lastWriterIsActive, func(t *testing.T) { + wantNoClient(t) + + s.registerClient(c1) + wantActive(t, c1) + s.registerClient(c2) + wantActive(t, c2) + + s.noteClientActivity(c1) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + wantActive(t, c1) + + s.noteClientActivity(c2) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + wantActive(t, c2) + + s.unregisterClient(c2) + checkDisabled(t, c1, false) + wantActive(t, c1) + }) } func TestLimiter(t *testing.T) { diff --git a/derp/dropreason_string.go b/derp/dropreason_string.go index 46c1383e4..5ed41a26b 100644 --- a/derp/dropreason_string.go +++ b/derp/dropreason_string.go @@ -18,11 +18,12 @@ func _() { _ = x[dropReasonQueueHead-3] _ = x[dropReasonQueueTail-4] _ = x[dropReasonWriteError-5] + _ = x[dropReasonDupClient-6] } -const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteError" +const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient" -var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59} +var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68} func (i dropReason) String() string { if i < 0 || i >= dropReason(len(_dropReason_index)-1) {