@@ -47,6 +47,7 @@ type Server struct {
 	publicKey  key.Public
 	logf       logger.Logf
 	memSys0    uint64 // runtime.MemStats.Sys at start (or early-ish)
+	meshKey    string
 
 	// Counters:
 	packetsSent, bytesSent expvar.Int
@@ -72,6 +73,7 @@ type Server struct {
 	netConns    map[Conn]chan struct{} // chan is closed when conn closes
 	clients     map[key.Public]*sclient
 	clientsEver map[key.Public]bool // never deleted from, for stats; fine for now
+	watchers    map[*sclient]bool   // mesh peer -> true
 }
 
 // Conn is the subset of the underlying net.Conn the DERP Server needs.
@@ -101,6 +103,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
 		clientsEver: make(map[key.Public]bool),
 		netConns:    make(map[Conn]chan struct{}),
 		memSys0:     ms.Sys,
+		watchers:    map[*sclient]bool{},
 	}
 	s.packetsDroppedUnknown = s.packetsDroppedReason.Get("unknown_dest")
 	s.packetsDroppedGone = s.packetsDroppedReason.Get("gone")
@@ -110,6 +113,16 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
 	return s
 }
 
+// SetMeshKey sets the pre-shared key that regional DERP servers use
+// to mesh amongst themselves.
+//
+// It must be called before serving begins.
+func (s *Server) SetMeshKey(v string) {
+	s.meshKey = v
+}
+
+func (s *Server) HasMeshKey() bool { return s.meshKey != "" }
+
 // Close closes the server and waits for the connections to disconnect.
 func (s *Server) Close() error {
 	s.mu.Lock()
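
Editor's note on the "must be called before serving begins" requirement: s.meshKey is later read during connection handshakes without holding s.mu, so it has to be immutable once connections are being accepted. A toy sketch of the intended call order (the stand-in Server type here is illustrative, not the real one):

package main

import "fmt"

// Toy stand-in for derp.Server, showing only the intended call order;
// the real fields are as in the diff above.
type Server struct{ meshKey string }

func (s *Server) SetMeshKey(v string) { s.meshKey = v }
func (s *Server) HasMeshKey() bool    { return s.meshKey != "" }

func main() {
	s := &Server{}
	s.SetMeshKey("example-pre-shared-key") // configure first, while single-threaded
	fmt.Println(s.HasMeshKey())            // then begin serving: true
}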
@@ -188,6 +201,19 @@ func (s *Server) registerClient(c *sclient) {
 	s.clients[c.key] = c
 	s.clientsEver[c.key] = true
 	s.curClients.Add(1)
+	s.broadcastPeerStateChangeLocked(c.key, true)
 }
 
+// broadcastPeerStateChangeLocked enqueues a message to all watchers
+// (other DERP nodes in the region, or trusted clients) that peer's
+// presence changed.
+//
+// s.mu must be held.
+func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) {
+	for w := range s.watchers {
+		w.peerStateChange = append(w.peerStateChange, peerConnState{peer: peer, present: present})
+		go w.requestMeshUpdate()
+	}
+}
+
 // unregisterClient removes a client from the server.
@@ -199,6 +225,10 @@ func (s *Server) unregisterClient(c *sclient) {
 		c.logf("removing connection")
 		delete(s.clients, c.key)
 	}
+	if c.canMesh {
+		delete(s.watchers, c)
+	}
+	s.broadcastPeerStateChangeLocked(c.key, false)
 
 	s.curClients.Add(-1)
 	if c.preferred {
@@ -224,6 +254,26 @@ func (s *Server) unregisterClient(c *sclient) {
 	}
 }
 
+func (s *Server) addWatcher(c *sclient) {
+	if !c.canMesh {
+		panic("invariant: addWatcher called without permissions")
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Queue messages for each already-connected client.
+	for peer := range s.clients {
+		c.peerStateChange = append(c.peerStateChange, peerConnState{peer: peer, present: true})
+	}
+
+	// And enroll the watcher in future updates (of both
+	// connections & disconnections).
+	s.watchers[c] = true
+
+	go c.requestMeshUpdate()
+}
+
 func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
 	br, bw := brw.Reader, brw.Writer
 	nc.SetDeadline(time.Now().Add(10 * time.Second))
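
Worth noting: addWatcher takes the snapshot of s.clients and enrolls the watcher under the same s.mu critical section, so a concurrently connecting client is reported exactly once, either via the snapshot or via registerClient's broadcast, never both or neither. A self-contained sketch of that snapshot-then-subscribe pattern, with toy names standing in for the real types:

package main

import (
	"fmt"
	"sync"
)

// registry is a toy analogue of Server.clients plus Server.watchers.
type registry struct {
	mu       sync.Mutex
	present  map[string]bool
	watchers []chan string
}

// watch snapshots current members and subscribes to future joins in
// one critical section, so no join is missed or reported twice.
func (r *registry) watch() <-chan string {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan string, 16)
	for name := range r.present {
		ch <- name // queue the already-present peers first
	}
	r.watchers = append(r.watchers, ch)
	return ch
}

// join registers a member and notifies every watcher, mirroring
// registerClient + broadcastPeerStateChangeLocked.
func (r *registry) join(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.present[name] = true
	for _, ch := range r.watchers {
		ch <- name // toy policy: the real code queues and wakes a writer instead
	}
}

func main() {
	r := &registry{present: map[string]bool{"alpha": true}}
	ch := r.watch()
	r.join("beta")
	fmt.Println(<-ch, <-ch) // alpha beta
}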
@@ -259,6 +309,10 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
 		sendQueue:  make(chan pkt, perClientSendQueueDepth),
 		peerGone:   make(chan key.Public),
 		sentTo:     make(map[key.Public]int64),
+		canMesh:    clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
 	}
+	if c.canMesh {
+		c.meshUpdate = make(chan struct{})
+	}
 	if clientInfo != nil {
 		c.info = *clientInfo
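
One observation on the canMesh check: it compares the client-supplied token against s.meshKey with ==, which is reasonable for a high-entropy pre-shared token but is not a constant-time comparison. If that ever mattered, a timing-safe variant would look like the following sketch (this is an alternative, not what the diff does):

package main

import (
	"crypto/subtle"
	"fmt"
)

// meshKeysEqual keeps the diff's semantics (an empty key never grants
// mesh permission) but compares equal-length keys in constant time
// instead of with ==.
func meshKeysEqual(client, server string) bool {
	if client == "" || server == "" {
		return false
	}
	return subtle.ConstantTimeCompare([]byte(client), []byte(server)) == 1
}

func main() {
	fmt.Println(meshKeysEqual("s3cret", "s3cret")) // true
	fmt.Println(meshKeysEqual("s3cret", "other1")) // false
}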
@@ -307,6 +361,8 @@ func (c *sclient) run(ctx context.Context) error {
 		case frameNotePreferred:
 			err = c.handleFrameNotePreferred(ft, fl)
 		case frameSendPacket:
 			err = c.handleFrameSendPacket(ft, fl)
+		case frameWatchConns:
+			err = c.handleFrameWatchConns(ft, fl)
 		default:
 			err = c.handleUnknownFrame(ft, fl)
 		}
@@ -333,6 +389,17 @@ func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
 	return nil
 }
 
+func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error {
+	if fl != 0 {
+		return fmt.Errorf("handleFrameWatchConns wrong size")
+	}
+	if !c.canMesh {
+		return fmt.Errorf("insufficient permissions")
+	}
+	c.s.addWatcher(c)
+	return nil
+}
+
 func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	s := c.s
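
For orientation, here is what a meshing client sends to trigger this handler. A DERP frame is a one-byte frame type followed by a 4-byte big-endian payload length; frameWatchConns carries no payload, which is why the handler insists on fl == 0. The frame-type value and header writer are re-declared below purely for illustration:

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
)

const frameWatchConns = 0x10 // value assumed here for illustration

// writeFrameHeader mirrors the package's framing: type byte, then a
// big-endian uint32 payload length.
func writeFrameHeader(bw *bufio.Writer, t byte, payloadLen uint32) error {
	if err := bw.WriteByte(t); err != nil {
		return err
	}
	var hdr [4]byte
	binary.BigEndian.PutUint32(hdr[:], payloadLen)
	_, err := bw.Write(hdr[:])
	return err
}

func main() {
	var buf bytes.Buffer
	bw := bufio.NewWriter(&buf)
	if err := writeFrameHeader(bw, frameWatchConns, 0); err != nil {
		panic(err)
	}
	bw.Flush()
	fmt.Printf("% x\n", buf.Bytes()) // 10 00 00 00 00
}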
@@ -418,6 +485,16 @@ func (c *sclient) requestPeerGoneWrite(peer key.Public) {
 	}
 }
 
+func (c *sclient) requestMeshUpdate() {
+	if !c.canMesh {
+		panic("unexpected requestMeshUpdate")
+	}
+	select {
+	case c.meshUpdate <- struct{}{}:
+	case <-c.done:
+	}
+}
+
 func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error {
 	// TODO(crawshaw): implement policy constraints on who can use the DERP server
 	// TODO(bradfitz): ... and at what rate.
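
The select on c.done is the important half of requestMeshUpdate: it is what keeps the `go w.requestMeshUpdate()` goroutines spawned by broadcastPeerStateChangeLocked from leaking when a watcher disconnects before its send loop drains the wakeup. The shape of that send-or-give-up pattern, self-contained:

package main

import (
	"fmt"
	"sync"
)

// notify wakes a worker, or returns once the worker is gone.
// Selecting on done prevents a goroutine leak when the receiver
// has already exited.
func notify(wake, done chan struct{}) {
	select {
	case wake <- struct{}{}:
	case <-done:
	}
}

func main() {
	wake := make(chan struct{})
	done := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // stands in for the sclient send loop
		defer wg.Done()
		<-wake
		fmt.Println("drained queued state changes")
	}()

	go notify(wake, done) // stands in for go c.requestMeshUpdate()
	wg.Wait()

	close(done)        // the connection closes...
	notify(wake, done) // ...so later wakeups return instead of blocking forever
	fmt.Println("no leak")
}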
@@ -532,7 +609,9 @@ type sclient struct {
 	done       <-chan struct{} // closed when connection closes
 	remoteAddr string          // usually ip:port from net.Conn.RemoteAddr().String()
 	sendQueue  chan pkt        // packets queued to this client; never closed
-	peerGone   chan key.Public // write request that a previous sender has disconnected
+	peerGone   chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
+	meshUpdate chan struct{}   // write request to write peerStateChange
+	canMesh    bool            // clientInfo had correct mesh token for inter-region routing
 
 	// Owned by run, not thread-safe.
 	br *bufio.Reader
@@ -547,6 +626,21 @@ type sclient struct {
 	// sentTo tracks all the peers this client has ever sent a packet to, and at which
 	// connection number.
 	sentTo map[key.Public]int64 // recipient => rcpt's latest sclient.connNum
+
+	// Guarded by s.mu
+	//
+	// peerStateChange is used by mesh peers (a set of regional
+	// DERP servers) and contains records that need to be sent to
+	// the client for them to update their map of who's connected
+	// to this node.
+	peerStateChange []peerConnState
 }
 
+// peerConnState represents whether a peer is connected to the server
+// or not.
+type peerConnState struct {
+	peer    key.Public
+	present bool
+}
+
 // pkt is a request to write a data frame to an sclient.
@@ -628,6 +722,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 		case peer := <-c.peerGone:
 			werr = c.sendPeerGone(peer)
 			continue
+		case <-c.meshUpdate:
+			werr = c.sendMeshUpdates()
+			continue
 		case msg := <-c.sendQueue:
 			werr = c.sendPacket(msg.src, msg.bs)
 			continue
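
Both selects in sendLoop follow the same batching idea: keep writing into the bufio.Writer while work is immediately available (the `continue` cases), and only flush once the queues go quiet. A reduced, runnable version of that two-phase shape:

package main

import (
	"bufio"
	"fmt"
	"os"
)

func main() {
	msgs := make(chan string, 3)
	msgs <- "one"
	msgs <- "two"
	msgs <- "three"

	bw := bufio.NewWriter(os.Stdout)
	for i := 0; i < 5; i++ { // bounded loop just for the demo
		select {
		case m := <-msgs:
			fmt.Fprintln(bw, m) // buffered write; no syscall yet
			continue            // more may be queued; don't flush yet
		default:
		}
		bw.Flush() // queue went quiet: flush the whole batch at once
	}
}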
@@ -648,6 +745,9 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 			return nil
 		case peer := <-c.peerGone:
 			werr = c.sendPeerGone(peer)
+		case <-c.meshUpdate:
+			werr = c.sendMeshUpdates()
+			continue
 		case msg := <-c.sendQueue:
 			werr = c.sendPacket(msg.src, msg.bs)
 		case <-keepAliveTick.C:
@@ -677,6 +777,59 @@ func (c *sclient) sendPeerGone(peer key.Public) error {
 	return err
 }
 
+// sendPeerPresent sends a peerPresent frame, without flushing.
+func (c *sclient) sendPeerPresent(peer key.Public) error {
+	c.setWriteDeadline()
+	if err := writeFrameHeader(c.bw, framePeerPresent, keyLen); err != nil {
+		return err
+	}
+	_, err := c.bw.Write(peer[:])
+	return err
+}
+
+// sendMeshUpdates drains as many mesh peerStateChange entries as
+// possible into the write buffer WITHOUT flushing or otherwise
+// blocking (as it holds c.s.mu while working). If it can't drain them
+// all, it schedules itself to be called again in the future.
+func (c *sclient) sendMeshUpdates() error {
+	c.s.mu.Lock()
+	defer c.s.mu.Unlock()
+
+	writes := 0
+	for _, pcs := range c.peerStateChange {
+		if c.bw.Available() <= frameHeaderLen+keyLen {
+			break
+		}
+		var err error
+		if pcs.present {
+			err = c.sendPeerPresent(pcs.peer)
+		} else {
+			err = c.sendPeerGone(pcs.peer)
+		}
+		if err != nil {
+			// Shouldn't happen, though, as we're writing
+			// into available buffer space, not the
+			// network.
+			return err
+		}
+		writes++
+	}
+
+	remain := copy(c.peerStateChange, c.peerStateChange[writes:])
+	c.peerStateChange = c.peerStateChange[:remain]
+
+	// Did we manage to write them all into the bufio buffer without flushing?
+	if len(c.peerStateChange) == 0 {
+		if cap(c.peerStateChange) > 16 {
+			c.peerStateChange = nil
+		}
+	} else {
+		// Didn't finish in the buffer space provided; schedule a future run.
+		go c.requestMeshUpdate()
+	}
+	return nil
+}
+
 // sendPacket writes contents to the client in a RecvPacket frame. If
 // srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
 // DERPv2. The bytes of contents are only valid until this function
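
The queue drain at the end of sendMeshUpdates uses the standard copy-and-reslice idiom, with a cap check so a briefly huge backlog doesn't pin its backing array forever. Isolated into a runnable snippet (names and sizes here are illustrative):

package main

import "fmt"

func main() {
	q := []string{"a", "b", "c", "d", "e"}
	writes := 3 // entries successfully written this round

	// Shift the unsent tail to the front and truncate in place,
	// reusing the backing array, as the peerStateChange drain does.
	remain := copy(q, q[writes:])
	q = q[:remain]
	fmt.Println(q, len(q), cap(q)) // [d e] 2 5

	// Once fully drained, the diff also nils out a queue whose
	// backing array grew past 16 entries, so it can be collected.
	q = q[:0]
	if cap(q) > 16 {
		q = nil
	}
	fmt.Println(q == nil) // false here: cap is only 5
}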
@@ -729,6 +882,7 @@ func (s *Server) ExpVar() expvar.Var {
 	m := new(metrics.Set)
 	m.Set("counter_unique_clients_ever", s.expVarFunc(func() interface{} { return len(s.clientsEver) }))
 	m.Set("gauge_memstats_sys0", expvar.Func(func() interface{} { return int64(s.memSys0) }))
+	m.Set("gauge_watchers", s.expVarFunc(func() interface{} { return len(s.watchers) }))
 	m.Set("gauge_current_connnections", &s.curClients)
 	m.Set("gauge_current_home_connnections", &s.curHomeClients)
 	m.Set("accepts", &s.accepts)
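
s.expVarFunc is a helper elsewhere in this file that, as I read it, wraps the callback with s.mu so len(s.watchers) is read safely at scrape time. With only the standard library, the new gauge is equivalent to this standalone sketch (names here are illustrative):

package main

import (
	"expvar"
	"fmt"
	"sync"
)

var (
	mu       sync.Mutex
	watchers = map[string]bool{} // toy stand-in for Server.watchers
)

func main() {
	g := expvar.Func(func() interface{} {
		mu.Lock()
		defer mu.Unlock()
		return len(watchers) // sampled at scrape time, not cached
	})
	expvar.Publish("gauge_watchers", g)

	mu.Lock()
	watchers["peer1"] = true
	mu.Unlock()

	fmt.Println(expvar.Get("gauge_watchers").String()) // 1
}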