diff --git a/derp/derp_server.go b/derp/derp_server.go index e0443319b..9fe36a524 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -41,6 +41,7 @@ import ( "tailscale.com/client/tailscale" "tailscale.com/disco" "tailscale.com/metrics" + "tailscale.com/syncs" "tailscale.com/types/key" "tailscale.com/types/logger" "tailscale.com/types/pad32" @@ -77,6 +78,21 @@ const ( writeTimeout = 2 * time.Second ) +// dupPolicy is a temporary (2021-08-30) mechanism to change the policy +// of how duplicate connection for the same key are handled. +type dupPolicy int8 + +const ( + // lastWriterIsActive is a dupPolicy where the connection + // to send traffic for a peer is the active one. + lastWriterIsActive dupPolicy = iota + + // disableFighters is a dupPolicy that detects if peers + // are trying to send interleaved with each other and + // then disables all of them. + disableFighters +) + // Server is a DERP server. type Server struct { // WriteTimeout, if non-zero, specifies how long to wait @@ -90,9 +106,9 @@ type Server struct { meshKey string limitedLogf logger.Logf metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate + dupPolicy dupPolicy // Counters: - _ pad32.Four packetsSent, bytesSent expvar.Int packetsRecv, bytesRecv expvar.Int packetsRecvByKind metrics.LabelMap @@ -112,9 +128,9 @@ type Server struct { accepts expvar.Int curClients expvar.Int curHomeClients expvar.Int // ones with preferred - clientsReplaced expvar.Int - clientsReplaceLimited expvar.Int - clientsReplaceSleeping expvar.Int + dupClientKeys expvar.Int // current number of public keys we have 2+ connections for + dupClientConns expvar.Int // current number of connections sharing a public key + dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed unknownFrames expvar.Int homeMovesIn expvar.Int // established clients announce home server moves in homeMovesOut expvar.Int // established clients announce home server moves out @@ -130,7 +146,7 @@ type Server struct { mu sync.Mutex closed bool netConns map[Conn]chan struct{} // chan is closed when conn closes - clients map[key.Public]*sclient + clients map[key.Public]clientSet watchers map[*sclient]bool // mesh peer -> true // clientsMesh tracks all clients in the cluster, both locally // and to mesh peers. If the value is nil, that means the @@ -148,6 +164,112 @@ type Server struct { keyOfAddr map[netaddr.IPPort]key.Public } +// clientSet represents 1 or more *sclients. +// +// The two implementations are singleClient and *dupClientSet. +// +// In the common case, client should only have one connection to the +// DERP server for a given key. When they're connected multiple times, +// we record their set of connections in dupClientSet and keep their +// connections open to make them happy (to keep them from spinning, +// etc) and keep track of which is the latest connection. If only the last +// is sending traffic, that last one is the active connection and it +// gets traffic. Otherwise, in the case of a cloned node key, the +// whole set of dups doesn't receive data frames. +// +// All methods should only be called while holding Server.mu. +// +// TODO(bradfitz): Issue 2746: in the future we'll send some sort of +// "health_error" frame to them that'll communicate to the end users +// that they cloned a device key, and we'll also surface it in the +// admin panel, etc. +type clientSet interface { + // ActiveClient returns the most recently added client to + // the set, as long as it hasn't been disabled, in which + // case it returns nil. + ActiveClient() *sclient + + // Len returns the number of clients in the set. + Len() int + + // ForeachClient calls f for each client in the set. + ForeachClient(f func(*sclient)) +} + +// singleClient is a clientSet of a single connection. +// This is the common case. +type singleClient struct{ c *sclient } + +func (s singleClient) ActiveClient() *sclient { return s.c } +func (s singleClient) Len() int { return 1 } +func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) } + +// A dupClientSet is a clientSet of more than 1 connection. +// +// This can occur in some reasonable cases (temporarily while users +// are changing networks) or in the case of a cloned key. In the +// cloned key case, both peers are speaking and the clients get +// disabled. +// +// All fields are guarded by Server.mu. +type dupClientSet struct { + // set is the set of connected clients for sclient.key. + // The values are all true. + set map[*sclient]bool + + // last is the most recent addition to set, or nil if the most + // recent one has since disconnected and nobody else has send + // data since. + last *sclient + + // sendHistory is a log of which members of set have sent + // frames to the derp server, with adjacent duplicates + // removed. When a member of set is removed, the same + // element(s) are removed from sendHistory. + sendHistory []*sclient +} + +func (s *dupClientSet) ActiveClient() *sclient { + if s.last != nil && !s.last.isDisabled.Get() { + return s.last + } + return nil +} +func (s *dupClientSet) Len() int { return len(s.set) } +func (s *dupClientSet) ForeachClient(f func(*sclient)) { + for c := range s.set { + f(c) + } +} + +// removeClient removes c from s and reports whether it was in s +// to begin with. +func (s *dupClientSet) removeClient(c *sclient) bool { + n := len(s.set) + delete(s.set, c) + if s.last == c { + s.last = nil + } + if len(s.set) == n { + return false + } + + trim := s.sendHistory[:0] + for _, v := range s.sendHistory { + if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) { + trim = append(trim, v) + } + } + for i := len(trim); i < len(s.sendHistory); i++ { + s.sendHistory[i] = nil + } + s.sendHistory = trim + if s.last == nil && len(s.sendHistory) > 0 { + s.last = s.sendHistory[len(s.sendHistory)-1] + } + return true +} + // PacketForwarder is something that can forward packets. // // It's mostly an interface for circular dependency reasons; the @@ -184,7 +306,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { packetsRecvByKind: metrics.LabelMap{Label: "kind"}, packetsDroppedReason: metrics.LabelMap{Label: "reason"}, packetsDroppedType: metrics.LabelMap{Label: "type"}, - clients: map[key.Public]*sclient{}, + clients: map[key.Public]clientSet{}, clientsMesh: map[key.Public]PacketForwarder{}, netConns: map[Conn]chan struct{}{}, memSys0: ms.Sys, @@ -344,39 +466,48 @@ func (s *Server) MetaCert() []byte { return s.metaCert } // registerClient notes that client c is now authenticated and ready for packets. // -// If c's public key was already connected with a different -// connection, the prior one is closed, unless it's fighting rapidly -// with another client with the same key, in which case the returned -// ok is false, and the caller should wait the provided duration -// before trying again. -func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) { +// If c.key is connected more than once, the earlier connection(s) are +// placed in a non-active state where we read from them (primarily to +// observe EOFs/timeouts) but won't send them frames on the assumption +// that they're dead. +func (s *Server) registerClient(c *sclient) { s.mu.Lock() defer s.mu.Unlock() - old := s.clients[c.key] - if old == nil { - c.logf("adding connection") - } else { - // Take over the old rate limiter, discarding the one - // our caller just made. - c.replaceLimiter = old.replaceLimiter - if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() { - if d := rr.DelayFrom(timeNow()); d > 0 { - s.clientsReplaceLimited.Add(1) - return false, d - } + + set := s.clients[c.key] + switch set := set.(type) { + case nil: + s.clients[c.key] = singleClient{c} + case singleClient: + s.dupClientKeys.Add(1) + s.dupClientConns.Add(2) // both old and new count + s.dupClientConnTotal.Add(1) + old := set.ActiveClient() + old.isDup.Set(true) + c.isDup.Set(true) + s.clients[c.key] = &dupClientSet{ + last: c, + set: map[*sclient]bool{ + old: true, + c: true, + }, + sendHistory: []*sclient{old}, } - s.clientsReplaced.Add(1) - c.logf("adding connection, replacing %s", old.remoteAddr) - go old.nc.Close() + case *dupClientSet: + s.dupClientConns.Add(1) // the gauge + s.dupClientConnTotal.Add(1) // the counter + c.isDup.Set(true) + set.set[c] = true + set.last = c + set.sendHistory = append(set.sendHistory, c) } - s.clients[c.key] = c + if _, ok := s.clientsMesh[c.key]; !ok { s.clientsMesh[c.key] = nil // just for varz of total users in cluster } s.keyOfAddr[c.remoteIPPort] = c.key s.curClients.Add(1) s.broadcastPeerStateChangeLocked(c.key, true) - return true, 0 } // broadcastPeerStateChangeLocked enqueues a message to all watchers @@ -395,8 +526,12 @@ func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) { func (s *Server) unregisterClient(c *sclient) { s.mu.Lock() defer s.mu.Unlock() - cur := s.clients[c.key] - if cur == c { + + set := s.clients[c.key] + switch set := set.(type) { + case nil: + c.logf("[unexpected]; clients map is empty") + case singleClient: c.logf("removing connection") delete(s.clients, c.key) if v, ok := s.clientsMesh[c.key]; ok && v == nil { @@ -404,7 +539,28 @@ func (s *Server) unregisterClient(c *sclient) { s.notePeerGoneFromRegionLocked(c.key) } s.broadcastPeerStateChangeLocked(c.key, false) + case *dupClientSet: + if set.removeClient(c) { + s.dupClientConns.Add(-1) + } else { + c.logf("[unexpected]; dup client set didn't shrink") + } + if set.Len() == 1 { + s.dupClientConns.Add(-1) // again; for the original one's + s.dupClientKeys.Add(-1) + var remain *sclient + for remain = range set.set { + break + } + if remain == nil { + panic("unexpected nil remain from single element dup set") + } + remain.isDisabled.Set(false) + remain.isDup.Set(false) + s.clients[c.key] = singleClient{remain} + } } + if c.canMesh { delete(s.watchers, c) } @@ -431,9 +587,15 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.Public) { // or move them over to the active client (in case a replaced client // connection is being unregistered). for pubKey, connNum := range s.sentTo[key] { - if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum { - go peer.requestPeerGoneWrite(key) + set, ok := s.clients[pubKey] + if !ok { + continue } + set.ForeachClient(func(peer *sclient) { + if peer.connNum == connNum { + go peer.requestPeerGoneWrite(key) + } + }) } delete(s.sentTo, key) } @@ -503,12 +665,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN discoSendQueue: make(chan pkt, perClientSendQueueDepth), peerGone: make(chan key.Public), canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, - - // Allow kicking out previous connections once a - // minute, with a very high burst of 100. Once a - // minute is less than the client's 2 minute - // inactivity timeout. - replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100), } if c.canMesh { @@ -518,15 +674,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN c.info = *clientInfo } - for { - ok, d := s.registerClient(c) - if ok { - break - } - s.clientsReplaceSleeping.Add(1) - timeSleep(d) - s.clientsReplaceSleeping.Add(-1) - } + s.registerClient(c) defer s.unregisterClient(c) err = s.sendServerInfo(c.bw, clientKey) @@ -570,6 +718,7 @@ func (c *sclient) run(ctx context.Context) error { } return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err) } + c.s.noteClientActivity(c) switch ft { case frameNotePreferred: err = c.handleFrameNotePreferred(ft, fl) @@ -634,9 +783,15 @@ func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error { s.mu.Lock() defer s.mu.Unlock() - if target, ok := s.clients[targetKey]; ok { - c.logf("frameClosePeer closing peer %x", targetKey) - go target.nc.Close() + if set, ok := s.clients[targetKey]; ok { + if set.Len() == 1 { + c.logf("frameClosePeer closing peer %x", targetKey) + } else { + c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len()) + } + set.ForeachClient(func(target *sclient) { + go target.nc.Close() + }) } else { c.logf("frameClosePeer failed to find peer %x", targetKey) } @@ -658,15 +813,25 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { } s.packetsForwardedIn.Add(1) + var dstLen int + var dst *sclient + s.mu.Lock() - dst := s.clients[dstKey] + if set, ok := s.clients[dstKey]; ok { + dstLen = set.Len() + dst = set.ActiveClient() + } if dst != nil { s.notePeerSendLocked(srcKey, dst) } s.mu.Unlock() if dst == nil { - s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd) + reason := dropReasonUnknownDestOnFwd + if dstLen > 1 { + reason = dropReasonDupClient + } + s.recordDrop(contents, srcKey, dstKey, reason) return nil } @@ -699,12 +864,18 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } var fwd PacketForwarder + var dstLen int + var dst *sclient + s.mu.Lock() - dst := s.clients[dstKey] - if dst == nil { - fwd = s.clientsMesh[dstKey] - } else { + if set, ok := s.clients[dstKey]; ok { + dstLen = set.Len() + dst = set.ActiveClient() + } + if dst != nil { s.notePeerSendLocked(c.key, dst) + } else if dstLen < 1 { + fwd = s.clientsMesh[dstKey] } s.mu.Unlock() @@ -717,7 +888,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } return nil } - s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest) + reason := dropReasonUnknownDest + if dstLen > 1 { + reason = dropReasonDupClient + } + s.recordDrop(contents, c.key, dstKey, reason) return nil } @@ -741,6 +916,7 @@ const ( dropReasonQueueHead // destination queue is full, dropped packet at queue head dropReasonQueueTail // destination queue is full, dropped packet at queue tail dropReasonWriteError // OS write() failed + dropReasonDupClient // the public key is connected 2+ times (active/active, fighting) ) func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) { @@ -850,6 +1026,57 @@ func (s *Server) sendServerKey(lw *lazyBufioWriter) error { return err } +func (s *Server) noteClientActivity(c *sclient) { + if !c.isDup.Get() { + // Fast path for clients that aren't in a dup set. + return + } + if c.isDisabled.Get() { + // If they're already disabled, no point checking more. + return + } + s.mu.Lock() + defer s.mu.Unlock() + + ds, ok := s.clients[c.key].(*dupClientSet) + if !ok { + // It became unduped in between the isDup fast path check above + // and the mutex check. Nothing to do. + return + } + + if s.dupPolicy == lastWriterIsActive { + ds.last = c + } else if ds.last == nil { + // If we didn't have a primary, let the current + // speaker be the primary. + ds.last = c + } + + if sh := ds.sendHistory; len(sh) != 0 && sh[len(sh)-1] == c { + // The client c was the last client to make activity + // in this set and it was already recorded. Nothing to + // do. + return + } + + // If we saw this connection send previously, then consider + // the group fighting and disable them all. + if s.dupPolicy == disableFighters { + for _, prior := range ds.sendHistory { + if prior == c { + ds.ForeachClient(func(c *sclient) { + c.isDisabled.Set(true) + }) + break + } + } + } + + // Append this client to the list of clients who spoke last. + ds.sendHistory = append(ds.sendHistory, c) +} + type serverInfo struct { Version int `json:"version,omitempty"` } @@ -979,14 +1206,16 @@ type sclient struct { key key.Public info clientInfo logf logger.Logf - done <-chan struct{} // closed when connection closes - remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() - remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. - sendQueue chan pkt // packets queued to this client; never closed - discoSendQueue chan pkt // important packets queued to this client; never closed - peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) - meshUpdate chan struct{} // write request to write peerStateChange - canMesh bool // clientInfo had correct mesh token for inter-region routing + done <-chan struct{} // closed when connection closes + remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() + remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port. + sendQueue chan pkt // packets queued to this client; never closed + discoSendQueue chan pkt // important packets queued to this client; never closed + peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers) + meshUpdate chan struct{} // write request to write peerStateChange + canMesh bool // clientInfo had correct mesh token for inter-region routing + isDup syncs.AtomicBool // whether more than 1 sclient for key is connected + isDisabled syncs.AtomicBool // whether sends to this peer are disabled due to active/active dups // replaceLimiter controls how quickly two connections with // the same client key can kick each other off the server by @@ -1399,10 +1628,10 @@ func (s *Server) ExpVar() expvar.Var { m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) })) m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) })) m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) })) + m.Set("gauge_current_dup_client_keys", &s.dupClientKeys) + m.Set("gauge_current_dup_client_conns", &s.dupClientConns) + m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal) m.Set("accepts", &s.accepts) - m.Set("clients_replaced", &s.clientsReplaced) - m.Set("clients_replace_limited", &s.clientsReplaceLimited) - m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping) m.Set("bytes_received", &s.bytesRecv) m.Set("bytes_sent", &s.bytesSent) m.Set("packets_dropped", &s.packetsDropped) diff --git a/derp/derp_test.go b/derp/derp_test.go index 1aa45363b..5521fdc09 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -662,7 +662,7 @@ func pubAll(b byte) (ret key.Public) { func TestForwarderRegistration(t *testing.T) { s := &Server{ - clients: make(map[key.Public]*sclient), + clients: make(map[key.Public]clientSet), clientsMesh: map[key.Public]PacketForwarder{}, } want := func(want map[key.Public]PacketForwarder) { @@ -745,7 +745,7 @@ func TestForwarderRegistration(t *testing.T) { key: u1, logf: logger.Discard, } - s.clients[u1] = u1c + s.clients[u1] = singleClient{u1c} s.RemovePacketForwarder(u1, testFwd(100)) want(map[key.Public]PacketForwarder{ u1: nil, @@ -765,7 +765,7 @@ func TestForwarderRegistration(t *testing.T) { // Now pretend u1 was already connected locally (so clientsMesh[u1] is nil), and then we heard // that they're also connected to a peer of ours. That sholdn't transition the forwarder // from nil to the new one, not a multiForwarder. - s.clients[u1] = u1c + s.clients[u1] = singleClient{u1c} s.clientsMesh[u1] = nil want(map[key.Public]PacketForwarder{ u1: nil, @@ -851,125 +851,248 @@ func TestClientSendPong(t *testing.T) { } -func TestServerReplaceClients(t *testing.T) { - defer func() { - timeSleep = time.Sleep - timeNow = time.Now - }() - - var ( - mu sync.Mutex - now = time.Unix(123, 0) - sleeps int - slept time.Duration - ) - timeSleep = func(d time.Duration) { - mu.Lock() - defer mu.Unlock() - sleeps++ - slept += d - now = now.Add(d) +func TestServerDupClients(t *testing.T) { + serverPriv := newPrivateKey(t) + var s *Server + + clientPriv := newPrivateKey(t) + clientPub := clientPriv.Public() + + var c1, c2, c3 *sclient + var clientName map[*sclient]string + + // run starts a new test case and resets clients back to their zero values. + run := func(name string, dupPolicy dupPolicy, f func(t *testing.T)) { + s = NewServer(serverPriv, t.Logf) + s.dupPolicy = dupPolicy + c1 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c1: ")} + c2 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c2: ")} + c3 = &sclient{key: clientPub, logf: logger.WithPrefix(t.Logf, "c3: ")} + clientName = map[*sclient]string{ + c1: "c1", + c2: "c2", + c3: "c3", + } + t.Run(name, f) } - timeNow = func() time.Time { - mu.Lock() - defer mu.Unlock() - return now + runBothWays := func(name string, f func(t *testing.T)) { + run(name+"_disablefighters", disableFighters, f) + run(name+"_lastwriteractive", lastWriterIsActive, f) } - - serverPrivateKey := newPrivateKey(t) - var logger logger.Logf = logger.Discard - const debug = false - if debug { - logger = t.Logf + wantSingleClient := func(t *testing.T, want *sclient) { + t.Helper() + switch s := s.clients[want.key].(type) { + case singleClient: + if s.c != want { + t.Error("wrong single client") + return + } + if want.isDup.Get() { + t.Errorf("unexpected isDup on singleClient") + } + if want.isDisabled.Get() { + t.Errorf("unexpected isDisabled on singleClient") + } + case nil: + t.Error("no clients for key") + case *dupClientSet: + t.Error("unexpected multiple clients for key") + } } - - s := NewServer(serverPrivateKey, logger) - defer s.Close() - - priv := newPrivateKey(t) - - ln, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - t.Fatal(err) + wantNoClient := func(t *testing.T) { + t.Helper() + switch s := s.clients[clientPub].(type) { + case nil: + // Good. + return + default: + t.Errorf("got %T; want empty", s) + } } - defer ln.Close() - - connNum := 0 - connect := func() *Client { - connNum++ - cout, err := net.Dial("tcp", ln.Addr().String()) - if err != nil { - t.Fatal(err) + wantDupSet := func(t *testing.T) *dupClientSet { + t.Helper() + switch s := s.clients[clientPub].(type) { + case *dupClientSet: + return s + default: + t.Fatalf("wanted dup set; got %T", s) + return nil } - - cin, err := ln.Accept() - if err != nil { - t.Fatal(err) + } + wantActive := func(t *testing.T, want *sclient) { + t.Helper() + set, ok := s.clients[clientPub] + if !ok { + t.Error("no set for key") + return } - - brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin)) - go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", connNum)) - - brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout)) - c, err := NewClient(priv, cout, brw, logger) - if err != nil { - t.Fatalf("client %d: %v", connNum, err) + got := set.ActiveClient() + if got != want { + t.Errorf("active client = %q; want %q", clientName[got], clientName[want]) } - return c } - - wantVar := func(v *expvar.Int, want int64) { + checkDup := func(t *testing.T, c *sclient, want bool) { t.Helper() - if got := v.Value(); got != want { - t.Errorf("got %d; want %d", got, want) + if got := c.isDup.Get(); got != want { + t.Errorf("client %q isDup = %v; want %v", clientName[c], got, want) } } - - wantClosed := func(c *Client) { + checkDisabled := func(t *testing.T, c *sclient, want bool) { t.Helper() - for { - m, err := c.Recv() - if err != nil { - t.Logf("got expected error: %v", err) - return - } - switch m.(type) { - case ServerInfoMessage: - continue - default: - t.Fatalf("client got %T; wanted an error", m) - } + if got := c.isDisabled.Get(); got != want { + t.Errorf("client %q isDisabled = %v; want %v", clientName[c], got, want) } } - - c1 := connect() - waitConnect(t, c1) - c2 := connect() - waitConnect(t, c2) - wantVar(&s.clientsReplaced, 1) - wantClosed(c1) - - for i := 0; i < 100+5; i++ { - c := connect() - defer c.nc.Close() - if s.clientsReplaceLimited.Value() == 0 && i < 90 { - continue + wantDupConns := func(t *testing.T, want int) { + t.Helper() + if got := s.dupClientConns.Value(); got != int64(want) { + t.Errorf("dupClientConns = %v; want %v", got, want) } - t.Logf("for %d: replaced=%d, limited=%d, sleeping=%d", i, - s.clientsReplaced.Value(), - s.clientsReplaceLimited.Value(), - s.clientsReplaceSleeping.Value(), - ) - } - - mu.Lock() - defer mu.Unlock() - if sleeps == 0 { - t.Errorf("no sleeps") } - if slept == 0 { - t.Errorf("total sleep duration was 0") + wantDupKeys := func(t *testing.T, want int) { + t.Helper() + if got := s.dupClientKeys.Value(); got != int64(want) { + t.Errorf("dupClientKeys = %v; want %v", got, want) + } } + + // Common case: a single client comes and goes, with no dups. + runBothWays("one_comes_and_goes", func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + wantSingleClient(t, c1) + s.unregisterClient(c1) + wantNoClient(t) + }) + + // A still somewhat common case: a single client was + // connected and then their wifi dies or laptop closes + // or they switch networks and connect from a + // different network. They have two connections but + // it's not very bad. Only their new one is + // active. The last one, being dead, doesn't send and + // thus the new one doesn't get disabled. + runBothWays("small_overlap_replacement", func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + wantSingleClient(t, c1) + wantActive(t, c1) + wantDupKeys(t, 0) + wantDupKeys(t, 0) + + s.registerClient(c2) // wifi dies; c2 replacement connects + wantDupSet(t) + wantDupConns(t, 2) + wantDupKeys(t, 1) + checkDup(t, c1, true) + checkDup(t, c2, true) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + wantActive(t, c2) // sends go to the replacement + + s.unregisterClient(c1) // c1 finally times out + wantSingleClient(t, c2) + checkDup(t, c2, false) // c2 is longer a dup + wantActive(t, c2) + wantDupConns(t, 0) + wantDupKeys(t, 0) + }) + + // Key cloning situation with concurrent clients, both trying + // to write. + run("concurrent_dups_get_disabled", disableFighters, func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + wantSingleClient(t, c1) + wantActive(t, c1) + s.registerClient(c2) + wantDupSet(t) + wantDupKeys(t, 1) + wantDupConns(t, 2) + wantActive(t, c2) + checkDup(t, c1, true) + checkDup(t, c2, true) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + + s.noteClientActivity(c2) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + s.noteClientActivity(c1) + checkDisabled(t, c1, true) + checkDisabled(t, c2, true) + wantActive(t, nil) + + s.registerClient(c3) + wantActive(t, c3) + checkDisabled(t, c3, false) + wantDupKeys(t, 1) + wantDupConns(t, 3) + + s.unregisterClient(c3) + wantActive(t, nil) + wantDupKeys(t, 1) + wantDupConns(t, 2) + + s.unregisterClient(c2) + wantSingleClient(t, c1) + wantDupKeys(t, 0) + wantDupConns(t, 0) + }) + + // Key cloning with an A->B->C->A series instead. + run("concurrent_dups_three_parties", disableFighters, func(t *testing.T) { + wantNoClient(t) + s.registerClient(c1) + s.registerClient(c2) + s.registerClient(c3) + s.noteClientActivity(c1) + checkDisabled(t, c1, true) + checkDisabled(t, c2, true) + checkDisabled(t, c3, true) + wantActive(t, nil) + }) + + run("activity_promotes_primary_when_nil", disableFighters, func(t *testing.T) { + wantNoClient(t) + + // Last registered client is the active one... + s.registerClient(c1) + wantActive(t, c1) + s.registerClient(c2) + wantActive(t, c2) + s.registerClient(c3) + s.noteClientActivity(c2) + wantActive(t, c3) + + // But if the last one goes away, the one with the + // most recent activity wins. + s.unregisterClient(c3) + wantActive(t, c2) + }) + + run("concurrent_dups_three_parties_last_writer", lastWriterIsActive, func(t *testing.T) { + wantNoClient(t) + + s.registerClient(c1) + wantActive(t, c1) + s.registerClient(c2) + wantActive(t, c2) + + s.noteClientActivity(c1) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + wantActive(t, c1) + + s.noteClientActivity(c2) + checkDisabled(t, c1, false) + checkDisabled(t, c2, false) + wantActive(t, c2) + + s.unregisterClient(c2) + checkDisabled(t, c1, false) + wantActive(t, c1) + }) } func TestLimiter(t *testing.T) { diff --git a/derp/dropreason_string.go b/derp/dropreason_string.go index 46c1383e4..5ed41a26b 100644 --- a/derp/dropreason_string.go +++ b/derp/dropreason_string.go @@ -18,11 +18,12 @@ func _() { _ = x[dropReasonQueueHead-3] _ = x[dropReasonQueueTail-4] _ = x[dropReasonWriteError-5] + _ = x[dropReasonDupClient-6] } -const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteError" +const _dropReason_name = "UnknownDestUnknownDestOnFwdGoneQueueHeadQueueTailWriteErrorDupClient" -var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59} +var _dropReason_index = [...]uint8{0, 11, 27, 31, 40, 49, 59, 68} func (i dropReason) String() string { if i < 0 || i >= dropReason(len(_dropReason_index)-1) {