@ -279,6 +279,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool {
// public key gets more than one PacketForwarder registered for it.
// public key gets more than one PacketForwarder registered for it.
type PacketForwarder interface {
type PacketForwarder interface {
ForwardPacket ( src , dst key . NodePublic , payload [ ] byte ) error
ForwardPacket ( src , dst key . NodePublic , payload [ ] byte ) error
String ( ) string
}
}
// Conn is the subset of the underlying net.Conn the DERP Server needs.
// Conn is the subset of the underlying net.Conn the DERP Server needs.
@ -495,6 +496,7 @@ func (s *Server) registerClient(c *sclient) {
switch set := set . ( type ) {
switch set := set . ( type ) {
case nil :
case nil :
s . clients [ c . key ] = singleClient { c }
s . clients [ c . key ] = singleClient { c }
c . debug ( "register single client" )
case singleClient :
case singleClient :
s . dupClientKeys . Add ( 1 )
s . dupClientKeys . Add ( 1 )
s . dupClientConns . Add ( 2 ) // both old and new count
s . dupClientConns . Add ( 2 ) // both old and new count
@ -510,6 +512,7 @@ func (s *Server) registerClient(c *sclient) {
} ,
} ,
sendHistory : [ ] * sclient { old } ,
sendHistory : [ ] * sclient { old } ,
}
}
c . debug ( "register duplicate client" )
case * dupClientSet :
case * dupClientSet :
s . dupClientConns . Add ( 1 ) // the gauge
s . dupClientConns . Add ( 1 ) // the gauge
s . dupClientConnTotal . Add ( 1 ) // the counter
s . dupClientConnTotal . Add ( 1 ) // the counter
@ -517,6 +520,7 @@ func (s *Server) registerClient(c *sclient) {
set . set [ c ] = true
set . set [ c ] = true
set . last = c
set . last = c
set . sendHistory = append ( set . sendHistory , c )
set . sendHistory = append ( set . sendHistory , c )
c . debug ( "register another duplicate client" )
}
}
if _ , ok := s . clientsMesh [ c . key ] ; ! ok {
if _ , ok := s . clientsMesh [ c . key ] ; ! ok {
@ -549,7 +553,7 @@ func (s *Server) unregisterClient(c *sclient) {
case nil :
case nil :
c . logf ( "[unexpected]; clients map is empty" )
c . logf ( "[unexpected]; clients map is empty" )
case singleClient :
case singleClient :
c . logf ( "remov ing connection")
c . logf ( "remov ed connection")
delete ( s . clients , c . key )
delete ( s . clients , c . key )
if v , ok := s . clientsMesh [ c . key ] ; ok && v == nil {
if v , ok := s . clientsMesh [ c . key ] ; ok && v == nil {
delete ( s . clientsMesh , c . key )
delete ( s . clientsMesh , c . key )
@ -557,6 +561,7 @@ func (s *Server) unregisterClient(c *sclient) {
}
}
s . broadcastPeerStateChangeLocked ( c . key , false )
s . broadcastPeerStateChangeLocked ( c . key , false )
case * dupClientSet :
case * dupClientSet :
c . debug ( "removed duplicate client" )
if set . removeClient ( c ) {
if set . removeClient ( c ) {
s . dupClientConns . Add ( - 1 )
s . dupClientConns . Add ( - 1 )
} else {
} else {
@ -673,7 +678,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
nc : nc ,
nc : nc ,
br : br ,
br : br ,
bw : bw ,
bw : bw ,
logf : logger . WithPrefix ( s . logf , fmt . Sprintf ( "derp client %v /%x : ", remoteAddr , clientKey ) ) ,
logf : logger . WithPrefix ( s . logf , fmt . Sprintf ( "derp client %v %s : ", remoteAddr , clientKey . ShortString ( ) ) ) ,
done : ctx . Done ( ) ,
done : ctx . Done ( ) ,
remoteAddr : remoteAddr ,
remoteAddr : remoteAddr ,
remoteIPPort : remoteIPPort ,
remoteIPPort : remoteIPPort ,
@ -690,6 +695,9 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
}
}
if clientInfo != nil {
if clientInfo != nil {
c . info = * clientInfo
c . info = * clientInfo
if envknob . Bool ( "DERP_PROBER_DEBUG_LOGS" ) && clientInfo . IsProber {
c . debugLogging = true
}
}
}
s . registerClient ( c )
s . registerClient ( c )
@ -726,6 +734,7 @@ func (c *sclient) run(ctx context.Context) error {
for {
for {
ft , fl , err := readFrameHeader ( c . br )
ft , fl , err := readFrameHeader ( c . br )
c . debug ( "read frame type %d len %d err %v" , ft , fl , err )
if err != nil {
if err != nil {
if errors . Is ( err , io . EOF ) {
if errors . Is ( err , io . EOF ) {
c . logf ( "read EOF" )
c . logf ( "read EOF" )
@ -735,7 +744,7 @@ func (c *sclient) run(ctx context.Context) error {
c . logf ( "closing; server closed" )
c . logf ( "closing; server closed" )
return nil
return nil
}
}
return fmt . Errorf ( "client % x : readFrameHeader: %w", c . key , err )
return fmt . Errorf ( "client % s : readFrameHeader: %w", c . key . ShortString ( ) , err )
}
}
c . s . noteClientActivity ( c )
c . s . noteClientActivity ( c )
switch ft {
switch ft {
@ -883,6 +892,8 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
return nil
return nil
}
}
dst . debug ( "received forwarded packet from %s via %s" , srcKey . ShortString ( ) , c . key . ShortString ( ) )
return c . sendPkt ( dst , pkt {
return c . sendPkt ( dst , pkt {
bs : contents ,
bs : contents ,
enqueuedAt : time . Now ( ) ,
enqueuedAt : time . Now ( ) ,
@ -930,7 +941,9 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
if dst == nil {
if dst == nil {
if fwd != nil {
if fwd != nil {
s . packetsForwardedOut . Add ( 1 )
s . packetsForwardedOut . Add ( 1 )
if err := fwd . ForwardPacket ( c . key , dstKey , contents ) ; err != nil {
err := fwd . ForwardPacket ( c . key , dstKey , contents )
c . debug ( "SendPacket for %s, forwarding via %s: %v" , dstKey . ShortString ( ) , fwd , err )
if err != nil {
// TODO:
// TODO:
return nil
return nil
}
}
@ -941,8 +954,10 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
reason = dropReasonDupClient
reason = dropReasonDupClient
}
}
s . recordDrop ( contents , c . key , dstKey , reason )
s . recordDrop ( contents , c . key , dstKey , reason )
c . debug ( "SendPacket for %s, dropping with reason=%s" , dstKey . ShortString ( ) , reason )
return nil
return nil
}
}
c . debug ( "SendPacket for %s, sending directly" , dstKey . ShortString ( ) )
p := pkt {
p := pkt {
bs : contents ,
bs : contents ,
@ -952,6 +967,12 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return c . sendPkt ( dst , p )
return c . sendPkt ( dst , p )
}
}
func ( c * sclient ) debug ( format string , v ... any ) {
if c . debugLogging {
c . logf ( format , v ... )
}
}
// dropReason is why we dropped a DERP frame.
// dropReason is why we dropped a DERP frame.
type dropReason int
type dropReason int
@ -1003,11 +1024,13 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
select {
select {
case <- dst . done :
case <- dst . done :
s . recordDrop ( p . bs , c . key , dstKey , dropReasonGone )
s . recordDrop ( p . bs , c . key , dstKey , dropReasonGone )
dst . debug ( "sendPkt attempt %d dropped, dst gone" , attempt )
return nil
return nil
default :
default :
}
}
select {
select {
case sendQueue <- p :
case sendQueue <- p :
dst . debug ( "sendPkt attempt %d enqueued" , attempt )
return nil
return nil
default :
default :
}
}
@ -1023,6 +1046,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
// contended queue with racing writers. Give up and tail-drop in
// contended queue with racing writers. Give up and tail-drop in
// this case to keep reader unblocked.
// this case to keep reader unblocked.
s . recordDrop ( p . bs , c . key , dstKey , dropReasonQueueTail )
s . recordDrop ( p . bs , c . key , dstKey , dropReasonQueueTail )
dst . debug ( "sendPkt attempt %d dropped, queue full" )
return nil
return nil
}
}
@ -1258,6 +1282,8 @@ type sclient struct {
isDup atomic . Bool // whether more than 1 sclient for key is connected
isDup atomic . Bool // whether more than 1 sclient for key is connected
isDisabled atomic . Bool // whether sends to this peer are disabled due to active/active dups
isDisabled atomic . Bool // whether sends to this peer are disabled due to active/active dups
debugLogging bool
// replaceLimiter controls how quickly two connections with
// replaceLimiter controls how quickly two connections with
// the same client key can kick each other off the server by
// the same client key can kick each other off the server by
// taking over ownership of a key.
// taking over ownership of a key.
@ -1529,6 +1555,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
c . s . packetsSent . Add ( 1 )
c . s . packetsSent . Add ( 1 )
c . s . bytesSent . Add ( int64 ( len ( contents ) ) )
c . s . bytesSent . Add ( int64 ( len ( contents ) ) )
}
}
c . debug ( "sendPacket from %s: %v" , srcKey . ShortString ( ) , err )
} ( )
} ( )
c . setWriteDeadline ( )
c . setWriteDeadline ( )
@ -1689,6 +1716,10 @@ func (f *multiForwarder) ForwardPacket(src, dst key.NodePublic, payload []byte)
return f . fwd . Load ( ) . ForwardPacket ( src , dst , payload )
return f . fwd . Load ( ) . ForwardPacket ( src , dst , payload )
}
}
func ( f * multiForwarder ) String ( ) string {
return fmt . Sprintf ( "<MultiForwarder fwd=%s total=%d>" , f . fwd . Load ( ) , len ( f . all ) )
}
func ( s * Server ) expVarFunc ( f func ( ) any ) expvar . Func {
func ( s * Server ) expVarFunc ( f func ( ) any ) expvar . Func {
return expvar . Func ( func ( ) any {
return expvar . Func ( func ( ) any {
s . mu . Lock ( )
s . mu . Lock ( )