diff --git a/derp/derp.go b/derp/derp.go index 9616a0814..a6799ce86 100644 --- a/derp/derp.go +++ b/derp/derp.go @@ -95,6 +95,12 @@ const ( // framePeerPresent for all connected nodes, and then a stream of // framePeerPresent & framePeerGone has peers connect and disconnect. frameWatchConns = frameType(0x10) + + // frameClosePeer is a privileged frame type (requires the + // mesh key for now) that closes the provided peer's + // connection. (To be used for cluster load balancing + // purposes, when clients end up on a non-ideal node) + frameClosePeer = frameType(0x11) // 32B pub key of peer to close. ) var bin = binary.BigEndian diff --git a/derp/derp_client.go b/derp/derp_client.go index b5107dcc4..d4cbb583d 100644 --- a/derp/derp_client.go +++ b/derp/derp_client.go @@ -269,7 +269,7 @@ func (c *Client) NotePreferred(preferred bool) (err error) { } // WatchConnectionChanges sends a request to subscribe to the peer's connection list. -// It's a fatal error if the client wasn't created with NewMeshClient. +// It's a fatal error if the client wasn't created using MeshKey. func (c *Client) WatchConnectionChanges() error { c.wmu.Lock() defer c.wmu.Unlock() @@ -279,6 +279,14 @@ func (c *Client) WatchConnectionChanges() error { return c.bw.Flush() } +// ClosePeer asks the server to close target's TCP connection. +// It's a fatal error if the client wasn't created using MeshKey. +func (c *Client) ClosePeer(target key.Public) error { + c.wmu.Lock() + defer c.wmu.Unlock() + return writeFrame(c.bw, frameClosePeer, target[:]) +} + // ReceivedMessage represents a type returned by Client.Recv. Unless // otherwise documented, the returned message aliases the byte slice // provided to Recv and thus the message is only as good as that diff --git a/derp/derp_server.go b/derp/derp_server.go index 0eee50665..ff1cb3e12 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -420,6 +420,8 @@ func (c *sclient) run(ctx context.Context) error { err = c.handleFrameForwardPacket(ft, fl) case frameWatchConns: err = c.handleFrameWatchConns(ft, fl) + case frameClosePeer: + err = c.handleFrameClosePeer(ft, fl) default: err = c.handleUnknownFrame(ft, fl) } @@ -457,6 +459,32 @@ func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error { return nil } +func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error { + if fl != keyLen { + return fmt.Errorf("handleFrameClosePeer wrong size") + } + if !c.canMesh { + return fmt.Errorf("insufficient permissions") + } + var targetKey key.Public + if _, err := io.ReadFull(c.br, targetKey[:]); err != nil { + return err + } + s := c.s + + s.mu.Lock() + defer s.mu.Unlock() + + if target, ok := s.clients[targetKey]; ok { + c.logf("frameClosePeer closing peer %x", targetKey) + go target.nc.Close() + } else { + c.logf("frameClosePeer failed to find peer %x", targetKey) + } + + return nil +} + // handleFrameForwardPacket reads a "forward packet" frame from the client // (which must be a trusted client, a peer in our mesh). func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { diff --git a/derp/derphttp/derphttp_client.go b/derp/derphttp/derphttp_client.go index b2c2e76fb..946c8c937 100644 --- a/derp/derphttp/derphttp_client.go +++ b/derp/derphttp/derphttp_client.go @@ -518,6 +518,10 @@ func (c *Client) NotePreferred(v bool) { } } +// WatchConnectionChanges sends a request to subscribe to +// notifications about clients connecting & disconnecting. +// +// Only trusted connections (using MeshKey) are allowed to use this. func (c *Client) WatchConnectionChanges() error { client, _, err := c.connect(context.TODO(), "derphttp.Client.WatchConnectionChanges") if err != nil { @@ -530,6 +534,21 @@ func (c *Client) WatchConnectionChanges() error { return err } +// ClosePeer asks the server to close target's TCP connection. +// +// Only trusted connections (using MeshKey) are allowed to use this. +func (c *Client) ClosePeer(target key.Public) error { + client, _, err := c.connect(context.TODO(), "derphttp.Client.ClosePeer") + if err != nil { + return err + } + err = client.ClosePeer(target) + if err != nil { + c.closeForReconnect(client) + } + return err +} + // Recv reads a message from c. The returned message may alias memory from Client. // The message should only be used until the next Client call. func (c *Client) Recv() (derp.ReceivedMessage, error) {