@@ -41,6 +41,7 @@ import (
 	"tailscale.com/client/tailscale"
 	"tailscale.com/disco"
 	"tailscale.com/metrics"
+	"tailscale.com/syncs"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 	"tailscale.com/types/pad32"
@@ -77,6 +78,21 @@ const (
 	writeTimeout = 2 * time.Second
 )
 
+// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
+// of how duplicate connections for the same key are handled.
+type dupPolicy int8
+
+const (
+	// lastWriterIsActive is a dupPolicy where the last connection
+	// to send traffic for a peer is the active one.
+	lastWriterIsActive dupPolicy = iota
+
+	// disableFighters is a dupPolicy that detects if peers
+	// are trying to send interleaved with each other and
+	// then disables all of them.
+	disableFighters
+)
+
 // Server is a DERP server.
 type Server struct {
 	// WriteTimeout, if non-zero, specifies how long to wait
@@ -90,9 +106,9 @@ type Server struct {
 	meshKey     string
 	limitedLogf logger.Logf
 	metaCert    []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
+	dupPolicy   dupPolicy
 
 	// Counters:
-	_ pad32.Four
 	packetsSent, bytesSent expvar.Int
 	packetsRecv, bytesRecv expvar.Int
 	packetsRecvByKind      metrics.LabelMap
@@ -112,9 +128,9 @@ type Server struct {
 	accepts            expvar.Int
 	curClients         expvar.Int
 	curHomeClients     expvar.Int // ones with preferred
-	clientsReplaced        expvar.Int
-	clientsReplaceLimited  expvar.Int
-	clientsReplaceSleeping expvar.Int
+	dupClientKeys      expvar.Int // current number of public keys we have 2+ connections for
+	dupClientConns     expvar.Int // current number of connections sharing a public key
+	dupClientConnTotal expvar.Int // total number of accepted connections when a dup key existed
 	unknownFrames      expvar.Int
 	homeMovesIn        expvar.Int // established clients announce home server moves in
 	homeMovesOut       expvar.Int // established clients announce home server moves out
@@ -130,7 +146,7 @@ type Server struct {
 	mu       sync.Mutex
 	closed   bool
 	netConns map[Conn]chan struct{} // chan is closed when conn closes
-	clients  map[key.Public]*sclient
+	clients  map[key.Public]clientSet
 	watchers map[*sclient]bool // mesh peer -> true
 	// clientsMesh tracks all clients in the cluster, both locally
 	// and to mesh peers. If the value is nil, that means the
@@ -148,6 +164,112 @@ type Server struct {
 	keyOfAddr map[netaddr.IPPort]key.Public
 }
 
+// clientSet represents 1 or more *sclients.
+//
+// The two implementations are singleClient and *dupClientSet.
+//
+// In the common case, a client should only have one connection to the
+// DERP server for a given key. When they're connected multiple times,
+// we record their set of connections in dupClientSet and keep their
+// connections open to make them happy (to keep them from spinning,
+// etc) and keep track of which is the latest connection. If only the last
+// is sending traffic, that last one is the active connection and it
+// gets traffic. Otherwise, in the case of a cloned node key, the
+// whole set of dups doesn't receive data frames.
+//
+// All methods should only be called while holding Server.mu.
+//
+// TODO(bradfitz): Issue 2746: in the future we'll send some sort of
+// "health_error" frame to them that'll communicate to the end users
+// that they cloned a device key, and we'll also surface it in the
+// admin panel, etc.
+type clientSet interface {
+	// ActiveClient returns the most recently added client to
+	// the set, as long as it hasn't been disabled, in which
+	// case it returns nil.
+	ActiveClient() *sclient
+
+	// Len returns the number of clients in the set.
+	Len() int
+
+	// ForeachClient calls f for each client in the set.
+	ForeachClient(f func(*sclient))
+}
+// singleClient is a clientSet of a single connection.
+// This is the common case.
+type singleClient struct{ c *sclient }
+
+func (s singleClient) ActiveClient() *sclient         { return s.c }
+func (s singleClient) Len() int                       { return 1 }
+func (s singleClient) ForeachClient(f func(*sclient)) { f(s.c) }
+
+// A dupClientSet is a clientSet of more than 1 connection.
+//
+// This can occur in some reasonable cases (temporarily while users
+// are changing networks) or in the case of a cloned key. In the
+// cloned key case, both peers are speaking and the clients get
+// disabled.
+//
+// All fields are guarded by Server.mu.
+type dupClientSet struct {
+	// set is the set of connected clients for sclient.key.
+	// The values are all true.
+	set map[*sclient]bool
+
+	// last is the most recent addition to set, or nil if the most
+	// recent one has since disconnected and nobody else has sent
+	// data since.
+	last *sclient
+
+	// sendHistory is a log of which members of set have sent
+	// frames to the derp server, with adjacent duplicates
+	// removed. When a member of set is removed, the same
+	// element(s) are removed from sendHistory.
+	sendHistory []*sclient
+}
+
+func (s *dupClientSet) ActiveClient() *sclient {
+	if s.last != nil && !s.last.isDisabled.Get() {
+		return s.last
+	}
+	return nil
+}
+func (s *dupClientSet) Len() int { return len(s.set) }
+func (s *dupClientSet) ForeachClient(f func(*sclient)) {
+	for c := range s.set {
+		f(c)
+	}
+}
+
+// removeClient removes c from s and reports whether it was in s
+// to begin with.
+func (s *dupClientSet) removeClient(c *sclient) bool {
+	n := len(s.set)
+	delete(s.set, c)
+	if s.last == c {
+		s.last = nil
+	}
+	if len(s.set) == n {
+		return false
+	}
+
+	trim := s.sendHistory[:0]
+	for _, v := range s.sendHistory {
+		if s.set[v] && (len(trim) == 0 || trim[len(trim)-1] != v) {
+			trim = append(trim, v)
+		}
+	}
+	for i := len(trim); i < len(s.sendHistory); i++ {
+		s.sendHistory[i] = nil
+	}
+	s.sendHistory = trim
+	if s.last == nil && len(s.sendHistory) > 0 {
+		s.last = s.sendHistory[len(s.sendHistory)-1]
+	}
+	return true
+}
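The history compaction in removeClient can be read in isolation: entries whose client has left the set are dropped, and duplicates that become adjacent as a result are collapsed. A self-contained sketch of the same loop over strings (illustrative only, not part of the patch; trimHistory is a made-up name):

// trimHistory mirrors the compaction loop in removeClient: keep only
// entries still present, collapse adjacent duplicates, reuse the
// backing array, and zero the abandoned tail.
func trimHistory(history []string, present map[string]bool) []string {
	trim := history[:0]
	for _, v := range history {
		if present[v] && (len(trim) == 0 || trim[len(trim)-1] != v) {
			trim = append(trim, v)
		}
	}
	for i := len(trim); i < len(history); i++ {
		history[i] = "" // drop references in the now-unused tail
	}
	return trim
}

For example, trimHistory([]string{"a", "b", "a", "a", "c"}, map[string]bool{"a": true, "c": true}) yields ["a", "c"]: "b" is gone, and the two "a" entries that became adjacent collapse into one.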
+
 // PacketForwarder is something that can forward packets.
 //
 // It's mostly an interface for circular dependency reasons; the
@@ -184,7 +306,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
 		packetsRecvByKind:    metrics.LabelMap{Label: "kind"},
 		packetsDroppedReason: metrics.LabelMap{Label: "reason"},
 		packetsDroppedType:   metrics.LabelMap{Label: "type"},
-		clients:              map[key.Public]*sclient{},
+		clients:              map[key.Public]clientSet{},
 		clientsMesh:          map[key.Public]PacketForwarder{},
 		netConns:             map[Conn]chan struct{}{},
 		memSys0:              ms.Sys,
@@ -344,39 +466,48 @@ func (s *Server) MetaCert() []byte { return s.metaCert }
 // registerClient notes that client c is now authenticated and ready for packets.
 //
-// If c's public key was already connected with a different
-// connection, the prior one is closed, unless it's fighting rapidly
-// with another client with the same key, in which case the returned
-// ok is false, and the caller should wait the provided duration
-// before trying again.
-func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) {
+// If c.key is connected more than once, the earlier connection(s) are
+// placed in a non-active state where we read from them (primarily to
+// observe EOFs/timeouts) but won't send them frames on the assumption
+// that they're dead.
+func (s *Server) registerClient(c *sclient) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	old := s.clients[c.key]
-	if old == nil {
-		c.logf("adding connection")
-	} else {
-		// Take over the old rate limiter, discarding the one
-		// our caller just made.
-		c.replaceLimiter = old.replaceLimiter
-		if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() {
-			if d := rr.DelayFrom(timeNow()); d > 0 {
-				s.clientsReplaceLimited.Add(1)
-				return false, d
-			}
-		}
-		s.clientsReplaced.Add(1)
-		c.logf("adding connection, replacing %s", old.remoteAddr)
-		go old.nc.Close()
-	}
-	s.clients[c.key] = c
+
+	set := s.clients[c.key]
+	switch set := set.(type) {
+	case nil:
+		s.clients[c.key] = singleClient{c}
+	case singleClient:
+		s.dupClientKeys.Add(1)
+		s.dupClientConns.Add(2) // both old and new count
+		s.dupClientConnTotal.Add(1)
+		old := set.ActiveClient()
+		old.isDup.Set(true)
+		c.isDup.Set(true)
+		s.clients[c.key] = &dupClientSet{
+			last: c,
+			set: map[*sclient]bool{
+				old: true,
+				c:   true,
+			},
+			sendHistory: []*sclient{old},
+		}
+	case *dupClientSet:
+		s.dupClientConns.Add(1)     // the gauge
+		s.dupClientConnTotal.Add(1) // the counter
+		c.isDup.Set(true)
+		set.set[c] = true
+		set.last = c
+		set.sendHistory = append(set.sendHistory, c)
+	}
 	if _, ok := s.clientsMesh[c.key]; !ok {
 		s.clientsMesh[c.key] = nil // just for varz of total users in cluster
 	}
 	s.keyOfAddr[c.remoteIPPort] = c.key
 	s.curClients.Add(1)
 	s.broadcastPeerStateChangeLocked(c.key, true)
-	return true, 0
 }
 
 // broadcastPeerStateChangeLocked enqueues a message to all watchers
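As a worked example of the accounting above (illustrative only, not part of the patch): for one key, s.clients moves from nil to singleClient{c} to *dupClientSet as connections register, and the gauges follow the arithmetic below; dupGaugesAfter is a made-up helper that just restates registerClient's per-key math.

// dupGaugesAfter returns what registerClient's bookkeeping contributes to
// the dup gauges for one key once connsForKey connections have registered
// (and none have left).
func dupGaugesAfter(connsForKey int) (dupKeys, dupConns int) {
	if connsForKey <= 1 {
		return 0, 0 // singleClient: no dup accounting at all
	}
	// The 1->2 transition adds 2 to the conn gauge (the old and the new
	// connection both start counting); each later connection adds 1 more.
	return 1, connsForKey
}

unregisterClient (next hunk) reverses this: each departure subtracts 1 from the conn gauge, and when the set collapses back to one connection it subtracts the extra 1, decrements dupClientKeys, clears isDup/isDisabled on the survivor, and stores a singleClient again.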
@@ -395,8 +526,12 @@ func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) {
 func (s *Server) unregisterClient(c *sclient) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	cur := s.clients[c.key]
-	if cur == c {
+
+	set := s.clients[c.key]
+	switch set := set.(type) {
+	case nil:
+		c.logf("[unexpected]; clients map is empty")
+	case singleClient:
 		c.logf("removing connection")
 		delete(s.clients, c.key)
 		if v, ok := s.clientsMesh[c.key]; ok && v == nil {
@@ -404,7 +539,28 @@ func (s *Server) unregisterClient(c *sclient) {
 			s.notePeerGoneFromRegionLocked(c.key)
 		}
 		s.broadcastPeerStateChangeLocked(c.key, false)
-	}
+	case *dupClientSet:
+		if set.removeClient(c) {
+			s.dupClientConns.Add(-1)
+		} else {
+			c.logf("[unexpected]; dup client set didn't shrink")
+		}
+		if set.Len() == 1 {
+			s.dupClientConns.Add(-1) // again; for the original one's
+			s.dupClientKeys.Add(-1)
+			var remain *sclient
+			for remain = range set.set {
+				break
+			}
+			if remain == nil {
+				panic("unexpected nil remain from single element dup set")
+			}
+			remain.isDisabled.Set(false)
+			remain.isDup.Set(false)
+			s.clients[c.key] = singleClient{remain}
+		}
+	}
 
 	if c.canMesh {
 		delete(s.watchers, c)
 	}
@@ -431,9 +587,15 @@ func (s *Server) notePeerGoneFromRegionLocked(key key.Public) {
 	// or move them over to the active client (in case a replaced client
 	// connection is being unregistered).
 	for pubKey, connNum := range s.sentTo[key] {
-		if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
+		set, ok := s.clients[pubKey]
+		if !ok {
+			continue
+		}
+		set.ForeachClient(func(peer *sclient) {
+			if peer.connNum == connNum {
 				go peer.requestPeerGoneWrite(key)
 			}
+		})
 	}
 	delete(s.sentTo, key)
 }
@@ -503,12 +665,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 		discoSendQueue: make(chan pkt, perClientSendQueueDepth),
 		peerGone:       make(chan key.Public),
 		canMesh:        clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
-
-		// Allow kicking out previous connections once a
-		// minute, with a very high burst of 100. Once a
-		// minute is less than the client's 2 minute
-		// inactivity timeout.
-		replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
 	}
 	if c.canMesh {
@@ -518,15 +674,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 		c.info = *clientInfo
 	}
 
-	for {
-		ok, d := s.registerClient(c)
-		if ok {
-			break
-		}
-		s.clientsReplaceSleeping.Add(1)
-		timeSleep(d)
-		s.clientsReplaceSleeping.Add(-1)
-	}
+	s.registerClient(c)
 	defer s.unregisterClient(c)
 
 	err = s.sendServerInfo(c.bw, clientKey)
@@ -570,6 +718,7 @@ func (c *sclient) run(ctx context.Context) error {
 			}
 			return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
 		}
+		c.s.noteClientActivity(c)
 		switch ft {
 		case frameNotePreferred:
 			err = c.handleFrameNotePreferred(ft, fl)
@@ -634,9 +783,15 @@ func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	if target, ok := s.clients[targetKey]; ok {
+	if set, ok := s.clients[targetKey]; ok {
+		if set.Len() == 1 {
 			c.logf("frameClosePeer closing peer %x", targetKey)
+		} else {
+			c.logf("frameClosePeer closing peer %x (%d connections)", targetKey, set.Len())
+		}
+		set.ForeachClient(func(target *sclient) {
 			go target.nc.Close()
+		})
 	} else {
 		c.logf("frameClosePeer failed to find peer %x", targetKey)
 	}
@@ -658,15 +813,25 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	}
 	s.packetsForwardedIn.Add(1)
 
+	var dstLen int
+	var dst *sclient
+
 	s.mu.Lock()
-	dst := s.clients[dstKey]
+	if set, ok := s.clients[dstKey]; ok {
+		dstLen = set.Len()
+		dst = set.ActiveClient()
+	}
 	if dst != nil {
 		s.notePeerSendLocked(srcKey, dst)
 	}
 	s.mu.Unlock()
 
 	if dst == nil {
-		s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd)
+		reason := dropReasonUnknownDestOnFwd
+		if dstLen > 1 {
+			reason = dropReasonDupClient
+		}
+		s.recordDrop(contents, srcKey, dstKey, reason)
 		return nil
 	}
@@ -699,12 +864,18 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	}
 	var fwd PacketForwarder
+	var dstLen int
+	var dst *sclient
+
 	s.mu.Lock()
-	dst := s.clients[dstKey]
-	if dst == nil {
-		fwd = s.clientsMesh[dstKey]
-	} else {
+	if set, ok := s.clients[dstKey]; ok {
+		dstLen = set.Len()
+		dst = set.ActiveClient()
+	}
+	if dst != nil {
 		s.notePeerSendLocked(c.key, dst)
+	} else if dstLen < 1 {
+		fwd = s.clientsMesh[dstKey]
 	}
 	s.mu.Unlock()
@@ -717,7 +888,11 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 		}
 		return nil
 	}
 
-	s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest)
+	reason := dropReasonUnknownDest
+	if dstLen > 1 {
+		reason = dropReasonDupClient
+	}
+	s.recordDrop(contents, c.key, dstKey, reason)
 	return nil
 }
@@ -741,6 +916,7 @@ const (
 	dropReasonQueueHead  // destination queue is full, dropped packet at queue head
 	dropReasonQueueTail  // destination queue is full, dropped packet at queue tail
 	dropReasonWriteError // OS write() failed
+	dropReasonDupClient  // the public key is connected 2+ times (active/active, fighting)
 )
 
 func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) {
@@ -850,6 +1026,57 @@ func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
 	return err
 }
 
+func (s *Server) noteClientActivity(c *sclient) {
+	if !c.isDup.Get() {
+		// Fast path for clients that aren't in a dup set.
+		return
+	}
+	if c.isDisabled.Get() {
+		// If they're already disabled, no point checking more.
+		return
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	ds, ok := s.clients[c.key].(*dupClientSet)
+	if !ok {
+		// It became unduped in between the isDup fast path check above
+		// and the mutex check. Nothing to do.
+		return
+	}
+
+	if s.dupPolicy == lastWriterIsActive {
+		ds.last = c
+	} else if ds.last == nil {
+		// If we didn't have a primary, let the current
+		// speaker be the primary.
+		ds.last = c
+	}
+
+	if sh := ds.sendHistory; len(sh) != 0 && sh[len(sh)-1] == c {
+		// The client c was the last client to send activity
+		// in this set and it was already recorded. Nothing to do.
+		return
+	}
+
+	// If we saw this connection send previously, then consider
+	// the group fighting and disable them all.
+	if s.dupPolicy == disableFighters {
+		for _, prior := range ds.sendHistory {
+			if prior == c {
+				ds.ForeachClient(func(c *sclient) {
+					c.isDisabled.Set(true)
+				})
+				break
+			}
+		}
+	}
+
+	// Append this client to the list of clients who spoke last.
+	ds.sendHistory = append(ds.sendHistory, c)
+}
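Under disableFighters, the check above amounts to: a connection that reappears in sendHistory (which collapses adjacent duplicates) must have interleaved with another sender, so the whole set gets disabled. A self-contained sketch of that rule (illustrative only, not part of the patch; recordSend is a made-up name):

// recordSend applies noteClientActivity's history rule to plain connection
// IDs: ignore a repeat by the same most-recent sender, otherwise append,
// and report "fighting" if the sender already appears earlier in the
// (adjacent-duplicate-free) history.
func recordSend(history []string, sender string) (updated []string, fighting bool) {
	if n := len(history); n > 0 && history[n-1] == sender {
		return history, false // same sender as last time; nothing new to record
	}
	for _, prior := range history {
		if prior == sender {
			fighting = true // sent before, with someone else in between
			break
		}
	}
	return append(history, sender), fighting
}

So a history of ["a", "b"] followed by another send from "a" reports fighting, while ["a"] followed by "b" does not. Under lastWriterIsActive the history is still maintained but only ds.last is updated and nobody is disabled.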
+
 type serverInfo struct {
 	Version int `json:"version,omitempty"`
 }
@@ -987,6 +1214,8 @@ type sclient struct {
 	peerGone   chan key.Public  // write request that a previous sender has disconnected (not used by mesh peers)
 	meshUpdate chan struct{}    // write request to write peerStateChange
 	canMesh    bool             // clientInfo had correct mesh token for inter-region routing
+	isDup      syncs.AtomicBool // whether more than 1 sclient for key is connected
+	isDisabled syncs.AtomicBool // whether sends to this peer are disabled due to active/active dups
 
 	// replaceLimiter controls how quickly two connections with
 	// the same client key can kick each other off the server by
@@ -1399,10 +1628,10 @@ func (s *Server) ExpVar() expvar.Var {
 	m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) }))
 	m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) }))
 	m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) }))
+	m.Set("gauge_current_dup_client_keys", &s.dupClientKeys)
+	m.Set("gauge_current_dup_client_conns", &s.dupClientConns)
+	m.Set("counter_total_dup_client_conns", &s.dupClientConnTotal)
 	m.Set("accepts", &s.accepts)
-	m.Set("clients_replaced", &s.clientsReplaced)
-	m.Set("clients_replace_limited", &s.clientsReplaceLimited)
-	m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping)
 	m.Set("bytes_received", &s.bytesRecv)
 	m.Set("bytes_sent", &s.bytesSent)
 	m.Set("packets_dropped", &s.packetsDropped)