@ -498,7 +498,7 @@ func (s *Server) registerClient(c *sclient) {
switch set := set . ( type ) {
switch set := set . ( type ) {
case nil :
case nil :
s . clients [ c . key ] = singleClient { c }
s . clients [ c . key ] = singleClient { c }
c . debug ( "register single client" )
c . debug Logf ( "register single client" )
case singleClient :
case singleClient :
s . dupClientKeys . Add ( 1 )
s . dupClientKeys . Add ( 1 )
s . dupClientConns . Add ( 2 ) // both old and new count
s . dupClientConns . Add ( 2 ) // both old and new count
@ -514,7 +514,7 @@ func (s *Server) registerClient(c *sclient) {
} ,
} ,
sendHistory : [ ] * sclient { old } ,
sendHistory : [ ] * sclient { old } ,
}
}
c . debug ( "register duplicate client" )
c . debug Logf ( "register duplicate client" )
case * dupClientSet :
case * dupClientSet :
s . dupClientConns . Add ( 1 ) // the gauge
s . dupClientConns . Add ( 1 ) // the gauge
s . dupClientConnTotal . Add ( 1 ) // the counter
s . dupClientConnTotal . Add ( 1 ) // the counter
@ -522,7 +522,7 @@ func (s *Server) registerClient(c *sclient) {
set . set [ c ] = true
set . set [ c ] = true
set . last = c
set . last = c
set . sendHistory = append ( set . sendHistory , c )
set . sendHistory = append ( set . sendHistory , c )
c . debug ( "register another duplicate client" )
c . debug Logf ( "register another duplicate client" )
}
}
if _ , ok := s . clientsMesh [ c . key ] ; ! ok {
if _ , ok := s . clientsMesh [ c . key ] ; ! ok {
@ -555,7 +555,7 @@ func (s *Server) unregisterClient(c *sclient) {
case nil :
case nil :
c . logf ( "[unexpected]; clients map is empty" )
c . logf ( "[unexpected]; clients map is empty" )
case singleClient :
case singleClient :
c . l ogf( "removed connection" )
c . debugL ogf( "removed connection" )
delete ( s . clients , c . key )
delete ( s . clients , c . key )
if v , ok := s . clientsMesh [ c . key ] ; ok && v == nil {
if v , ok := s . clientsMesh [ c . key ] ; ok && v == nil {
delete ( s . clientsMesh , c . key )
delete ( s . clientsMesh , c . key )
@ -563,7 +563,7 @@ func (s *Server) unregisterClient(c *sclient) {
}
}
s . broadcastPeerStateChangeLocked ( c . key , false )
s . broadcastPeerStateChangeLocked ( c . key , false )
case * dupClientSet :
case * dupClientSet :
c . debug ( "removed duplicate client" )
c . debug Logf ( "removed duplicate client" )
if set . removeClient ( c ) {
if set . removeClient ( c ) {
s . dupClientConns . Add ( - 1 )
s . dupClientConns . Add ( - 1 )
} else {
} else {
@ -712,9 +712,12 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
if clientInfo != nil {
if clientInfo != nil {
c . info = * clientInfo
c . info = * clientInfo
if envknob . Bool ( "DERP_PROBER_DEBUG_LOGS" ) && clientInfo . IsProber {
if envknob . Bool ( "DERP_PROBER_DEBUG_LOGS" ) && clientInfo . IsProber {
c . debug Logging = true
c . debug = true
}
}
}
}
if s . debug {
c . debug = true
}
s . registerClient ( c )
s . registerClient ( c )
defer s . unregisterClient ( c )
defer s . unregisterClient ( c )
@ -727,6 +730,12 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
return c . run ( ctx )
return c . run ( ctx )
}
}
func ( s * Server ) debugLogf ( format string , v ... any ) {
if s . debug {
s . logf ( format , v ... )
}
}
// for testing
// for testing
var (
var (
timeSleep = time . Sleep
timeSleep = time . Sleep
@ -744,16 +753,20 @@ func (c *sclient) run(ctx context.Context) error {
defer func ( ) {
defer func ( ) {
cancelSender ( )
cancelSender ( )
if err := grp . Wait ( ) ; err != nil && ! c . s . isClosed ( ) {
if err := grp . Wait ( ) ; err != nil && ! c . s . isClosed ( ) {
c . logf ( "sender failed: %v" , err )
if errors . Is ( err , context . Canceled ) {
c . debugLogf ( "sender canceled by reader exiting" )
} else {
c . logf ( "sender failed: %v" , err )
}
}
}
} ( )
} ( )
for {
for {
ft , fl , err := readFrameHeader ( c . br )
ft , fl , err := readFrameHeader ( c . br )
c . debug ( "read frame type %d len %d err %v" , ft , fl , err )
c . debug Logf ( "read frame type %d len %d err %v" , ft , fl , err )
if err != nil {
if err != nil {
if errors . Is ( err , io . EOF ) {
if errors . Is ( err , io . EOF ) {
c . l ogf( "read EOF" )
c . debugL ogf( "read EOF" )
return nil
return nil
}
}
if c . s . isClosed ( ) {
if c . s . isClosed ( ) {
@ -910,7 +923,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
return nil
return nil
}
}
dst . debug ( "received forwarded packet from %s via %s" , srcKey . ShortString ( ) , c . key . ShortString ( ) )
dst . debug Logf ( "received forwarded packet from %s via %s" , srcKey . ShortString ( ) , c . key . ShortString ( ) )
return c . sendPkt ( dst , pkt {
return c . sendPkt ( dst , pkt {
bs : contents ,
bs : contents ,
@ -960,7 +973,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
if fwd != nil {
if fwd != nil {
s . packetsForwardedOut . Add ( 1 )
s . packetsForwardedOut . Add ( 1 )
err := fwd . ForwardPacket ( c . key , dstKey , contents )
err := fwd . ForwardPacket ( c . key , dstKey , contents )
c . debug ( "SendPacket for %s, forwarding via %s: %v" , dstKey . ShortString ( ) , fwd , err )
c . debug Logf ( "SendPacket for %s, forwarding via %s: %v" , dstKey . ShortString ( ) , fwd , err )
if err != nil {
if err != nil {
// TODO:
// TODO:
return nil
return nil
@ -974,10 +987,10 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
c . requestPeerGoneWriteLimited ( dstKey , contents , PeerGoneReasonNotHere )
c . requestPeerGoneWriteLimited ( dstKey , contents , PeerGoneReasonNotHere )
}
}
s . recordDrop ( contents , c . key , dstKey , reason )
s . recordDrop ( contents , c . key , dstKey , reason )
c . debug ( "SendPacket for %s, dropping with reason=%s" , dstKey . ShortString ( ) , reason )
c . debug Logf ( "SendPacket for %s, dropping with reason=%s" , dstKey . ShortString ( ) , reason )
return nil
return nil
}
}
c . debug ( "SendPacket for %s, sending directly" , dstKey . ShortString ( ) )
c . debug Logf ( "SendPacket for %s, sending directly" , dstKey . ShortString ( ) )
p := pkt {
p := pkt {
bs : contents ,
bs : contents ,
@ -987,8 +1000,8 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return c . sendPkt ( dst , p )
return c . sendPkt ( dst , p )
}
}
func ( c * sclient ) debug ( format string , v ... any ) {
func ( c * sclient ) debug Logf ( format string , v ... any ) {
if c . debug Logging {
if c . debug {
c . logf ( format , v ... )
c . logf ( format , v ... )
}
}
}
}
@ -1011,7 +1024,8 @@ const (
func ( s * Server ) recordDrop ( packetBytes [ ] byte , srcKey , dstKey key . NodePublic , reason dropReason ) {
func ( s * Server ) recordDrop ( packetBytes [ ] byte , srcKey , dstKey key . NodePublic , reason dropReason ) {
s . packetsDropped . Add ( 1 )
s . packetsDropped . Add ( 1 )
s . packetsDroppedReasonCounters [ reason ] . Add ( 1 )
s . packetsDroppedReasonCounters [ reason ] . Add ( 1 )
if disco . LooksLikeDiscoWrapper ( packetBytes ) {
looksDisco := disco . LooksLikeDiscoWrapper ( packetBytes )
if looksDisco {
s . packetsDroppedTypeDisco . Add ( 1 )
s . packetsDroppedTypeDisco . Add ( 1 )
} else {
} else {
s . packetsDroppedTypeOther . Add ( 1 )
s . packetsDroppedTypeOther . Add ( 1 )
@ -1024,9 +1038,7 @@ func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.NodePublic, r
msg := fmt . Sprintf ( "drop (%s) %s -> %s" , srcKey . ShortString ( ) , reason , dstKey . ShortString ( ) )
msg := fmt . Sprintf ( "drop (%s) %s -> %s" , srcKey . ShortString ( ) , reason , dstKey . ShortString ( ) )
s . limitedLogf ( msg )
s . limitedLogf ( msg )
}
}
if s . debug {
s . debugLogf ( "dropping packet reason=%s dst=%s disco=%v" , reason , dstKey , looksDisco )
s . logf ( "dropping packet reason=%s dst=%s disco=%v" , reason , dstKey , disco . LooksLikeDiscoWrapper ( packetBytes ) )
}
}
}
func ( c * sclient ) sendPkt ( dst * sclient , p pkt ) error {
func ( c * sclient ) sendPkt ( dst * sclient , p pkt ) error {
@ -1044,13 +1056,13 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
select {
select {
case <- dst . done :
case <- dst . done :
s . recordDrop ( p . bs , c . key , dstKey , dropReasonGoneDisconnected )
s . recordDrop ( p . bs , c . key , dstKey , dropReasonGoneDisconnected )
dst . debug ( "sendPkt attempt %d dropped, dst gone" , attempt )
dst . debug Logf ( "sendPkt attempt %d dropped, dst gone" , attempt )
return nil
return nil
default :
default :
}
}
select {
select {
case sendQueue <- p :
case sendQueue <- p :
dst . debug ( "sendPkt attempt %d enqueued" , attempt )
dst . debug Logf ( "sendPkt attempt %d enqueued" , attempt )
return nil
return nil
default :
default :
}
}
@ -1066,7 +1078,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
// contended queue with racing writers. Give up and tail-drop in
// contended queue with racing writers. Give up and tail-drop in
// this case to keep reader unblocked.
// this case to keep reader unblocked.
s . recordDrop ( p . bs , c . key , dstKey , dropReasonQueueTail )
s . recordDrop ( p . bs , c . key , dstKey , dropReasonQueueTail )
dst . debug ( "sendPkt attempt %d dropped, queue full" )
dst . debug Logf ( "sendPkt attempt %d dropped, queue full" )
return nil
return nil
}
}
@ -1304,8 +1316,7 @@ type sclient struct {
canMesh bool // clientInfo had correct mesh token for inter-region routing
canMesh bool // clientInfo had correct mesh token for inter-region routing
isDup atomic . Bool // whether more than 1 sclient for key is connected
isDup atomic . Bool // whether more than 1 sclient for key is connected
isDisabled atomic . Bool // whether sends to this peer are disabled due to active/active dups
isDisabled atomic . Bool // whether sends to this peer are disabled due to active/active dups
debug bool // turn on for verbose logging
debugLogging bool
// Owned by run, not thread-safe.
// Owned by run, not thread-safe.
br * bufio . Reader
br * bufio . Reader
@ -1593,7 +1604,7 @@ func (c *sclient) sendPacket(srcKey key.NodePublic, contents []byte) (err error)
c . s . packetsSent . Add ( 1 )
c . s . packetsSent . Add ( 1 )
c . s . bytesSent . Add ( int64 ( len ( contents ) ) )
c . s . bytesSent . Add ( int64 ( len ( contents ) ) )
}
}
c . debug ( "sendPacket from %s: %v" , srcKey . ShortString ( ) , err )
c . debug Logf ( "sendPacket from %s: %v" , srcKey . ShortString ( ) , err )
} ( )
} ( )
c . setWriteDeadline ( )
c . setWriteDeadline ( )