derp: add short queues between reader and writer, drop on overload.

This avoids the server blocking on misbehaving or heavily contended
clients. We attempt to drop from the head of the queue to keep
overall queueing time lower.
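The enqueue path amounts to the small pattern below. This is an illustrative,
self-contained sketch (the packet type and enqueue function are placeholder
names, not the server's actual API):

package main

import "fmt"

type packet struct{ payload []byte }

// enqueue tries to hand p to a client's buffered send queue. When the
// queue is full it discards the oldest queued packet to make room, and
// only drops p itself if racing writers keep the queue full.
func enqueue(queue chan packet, p packet) bool {
        for attempt := 0; attempt < 3; attempt++ {
                select {
                case queue <- p:
                        return true // queued for the sender goroutine
                default:
                }
                // Queue full: drop from the head so fresher packets win.
                select {
                case <-queue:
                default:
                }
        }
        // Still full after retries; tail-drop the new packet so the
        // reader is never blocked by a slow receiver.
        return false
}

func main() {
        q := make(chan packet, 2) // short per-client queue
        for i := 0; i < 5; i++ {
                ok := enqueue(q, packet{payload: []byte{byte(i)}})
                fmt.Printf("packet %d queued: %v\n", i, ok)
        }
}

Dropping oldest-first bounds how stale anything left in the queue can get,
which is what keeps overall queueing time low.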

Also:
 - fixes server->client keepalives, which weren't happening (see the
   sketch after this list).
 - removes read rate-limiter, deferring instead to kernel-level
   global limiter/fair queuer.
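For the keepalive fix above, the new shape is one sender goroutine per client
that both drains the send queue and owns a jittered keepalive ticker, so
keepalives go out even when no data is flowing. A rough sketch under assumed
names (the 15s interval, packet type, and send callback are placeholders, not
the server's API):

package main

import (
        "context"
        "fmt"
        "math/rand"
        "time"
)

type packet struct{ payload []byte }

// sendLoop multiplexes queued packets and periodic keepalives in a
// single goroutine; a nil payload stands in for a keepalive frame.
func sendLoop(ctx context.Context, queue <-chan packet, send func([]byte) error) error {
        // Jitter the period so clients don't all tick in lockstep.
        jitter := time.Duration(rand.Intn(5000)) * time.Millisecond
        tick := time.NewTicker(15*time.Second + jitter)
        defer tick.Stop()
        for {
                select {
                case <-ctx.Done():
                        return nil
                case p, ok := <-queue:
                        if !ok {
                                return nil // queue closed: client shutting down
                        }
                        if err := send(p.payload); err != nil {
                                return err
                        }
                case <-tick.C:
                        if err := send(nil); err != nil {
                                return err
                        }
                }
        }
}

func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
        defer cancel()
        q := make(chan packet, 4)
        q <- packet{payload: []byte("hello")}
        err := sendLoop(ctx, q, func(b []byte) error {
                fmt.Printf("sent %q\n", b)
                return nil
        })
        fmt.Println("sendLoop done:", err)
}

Because the sender goroutine becomes the only writer to the connection, the
buffered writer no longer needs a mutex around it.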

Signed-off-by: David Anderson <dave@natulte.net>
parent dd31285ad4
commit dbfc916273

@@ -40,7 +40,6 @@ var (
configPath = flag.String("c", "", "config file path")
certDir = flag.String("certdir", tsweb.DefaultCertDir("derper-certs"), "directory to store LetsEncrypt certs, if addr's port is :443")
hostname = flag.String("hostname", "derp.tailscale.com", "LetsEncrypt host name, if addr's port is :443")
mbps = flag.Int("mbps", 5, "Mbps (mebibit/s) per-client rate limit; 0 means unlimited")
logCollection = flag.String("logcollection", "", "If non-empty, logtail collection to log to")
runSTUN = flag.Bool("stun", false, "also run a STUN server")
)
@@ -120,9 +119,6 @@ func main() {
s := derp.NewServer(key.Private(cfg.PrivateKey), log.Printf)
s.WriteTimeout = 2 * time.Second
if *mbps != 0 {
s.BytesPerSecond = (*mbps << 20) / 8
}
expvar.Publish("derp", s.ExpVar())
// Create our own mux so we don't expose /debug/ stuff to the world.
@@ -196,7 +192,6 @@ func debugHandler(s *derp.Server) http.Handler {
<ul>
`)
f("<li><b>Hostname:</b> %v</li>\n", *hostname)
f("<li><b>Rate Limit:</b> %v Mbps</li>\n", *mbps)
f("<li><b>Uptime:</b> %v</li>\n", tsweb.Uptime())
f(`<li><a href="/debug/vars">/debug/vars</a> (Go)</li>

@@ -24,7 +24,6 @@ import (
"time"
"golang.org/x/crypto/nacl/box"
"golang.org/x/time/rate"
"tailscale.com/metrics"
"tailscale.com/types/key"
"tailscale.com/types/logger"
@@ -32,12 +31,10 @@ import (
var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS"))
const perClientSendQueueDepth = 32 // packets buffered for sending
// Server is a DERP server.
type Server struct {
// BytesPerSecond, if non-zero, specifies how many bytes per
// second to cap per-client reads at.
BytesPerSecond int
// WriteTimeout, if non-zero, specifies how long to wait
// before failing when writing to a client.
WriteTimeout time.Duration
@@ -201,13 +198,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
return fmt.Errorf("client %x rejected: %v", clientKey, err)
}
lim := rate.Inf
if s.BytesPerSecond != 0 {
lim = rate.Limit(s.BytesPerSecond)
}
const burstBytes = 1 << 20 // generous bandwidth delay product? must be over 64k max packet size.
limiter := rate.NewLimiter(lim, burstBytes)
// At this point we trust the client so we don't time out.
nc.SetDeadline(time.Time{})
@@ -217,24 +207,17 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
nc: nc,
br: br,
bw: bw,
limiter: limiter,
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
remoteAddr: remoteAddr,
connectedAt: time.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth),
}
if clientInfo != nil {
c.info = *clientInfo
}
// Once the client is registered, it can start receiving
// traffic, but we want to make sure the first thing it
// receives after its frameClientInfo is our frameServerInfo,
// so acquire the c.mu lock (which guards writing to c.bw)
// while we register.
c.mu.Lock()
s.registerClient(c)
err = s.sendServerInfo(bw, clientKey)
c.mu.Unlock()
if err != nil {
return fmt.Errorf("send server info: %v", err)
}
@@ -244,11 +227,18 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
}
func (c *sclient) run() error {
s := c.s
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer func() {
// Atomically close+remove send queue, so racing writers don't
// send to closed channel.
c.mu.Lock()
close(c.sendQueue)
c.sendQueue = nil
c.mu.Unlock()
}()
go s.sendClientKeepAlives(ctx, c)
go c.sender(ctx)
for {
ft, fl, err := readFrameHeader(c.br)
@@ -270,9 +260,6 @@ func (c *sclient) run() error {
}
func (c *sclient) handleUnknownFrame(ctx context.Context, ft frameType, fl uint32) error {
if err := c.limiter.WaitN(ctx, int(fl)); err != nil {
return fmt.Errorf("rate limit: %v", err)
}
_, err := io.CopyN(ioutil.Discard, c.br, int64(fl))
return err
}
@@ -292,7 +279,7 @@ func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
func (c *sclient) handleFrameSendPacket(ctx context.Context, ft frameType, fl uint32) error {
s := c.s
dstKey, contents, err := s.recvPacket(ctx, c.br, fl, c.limiter)
dstKey, contents, err := s.recvPacket(ctx, c.br, fl)
if err != nil {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
}
@@ -309,35 +296,51 @@ func (c *sclient) handleFrameSendPacket(ctx context.Context, ft frameType, fl ui
return nil
}
dst.mu.Lock()
if s.WriteTimeout != 0 {
dst.nc.SetWriteDeadline(time.Now().Add(s.WriteTimeout))
dst.mu.RLock()
defer dst.mu.RUnlock()
if dst.sendQueue == nil {
s.packetsDropped.Add(1)
if debug {
c.logf("dropping packet for shutdown client %x", dstKey)
}
}
err = s.sendPacket(dst.bw, &dst.info, c.key, contents)
dst.mu.Unlock()
if err != nil {
c.logf("write error sending packet to %x: %v", dstKey, err)
p := pkt{
bs: contents,
}
if dst.info.Version >= protocolSrcAddrs {
p.src = c.key
}
// Attempt to queue for sending up to 3 times. On each attempt, if
// the queue is full, try to drop from queue head to prioritize
// fresher packets.
for attempt := 0; attempt < 3; attempt++ {
select {
case dst.sendQueue <- p:
return nil
default:
}
// If we cannot send to a destination, shut it down.
// Let its receive loop do the cleanup.
s.mu.Lock()
if s.clients[dstKey] == dst {
s.clients[dstKey].nc.Close()
select {
case <-dst.sendQueue:
s.packetsDropped.Add(1)
if debug {
c.logf("dropping packet from client %x queue head", dstKey)
}
default:
}
s.mu.Unlock()
}
// Failed to make room for packet. This can happen in a heavily
// contended queue with racing writers. Give up and tail-drop in
// this case to keep reader unblocked.
s.packetsDropped.Add(1)
if debug {
c.logf("dropping packet from client %x queue tail", dstKey)
}
// Do not treat a send error as an error with this transmitting client.
return nil
}
func (s *Server) sendClientKeepAlives(ctx context.Context, c *sclient) {
if err := c.keepAliveLoop(ctx); err != nil {
c.logf("keep alive failed: %v", err)
}
}
func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error {
// TODO(crawshaw): implement policy constraints on who can use the DERP server
// TODO(bradfitz): ... and at what rate.
@@ -418,32 +421,7 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.Public, info *cl
return clientKey, info, nil
}
func (s *Server) sendPacket(bw *bufio.Writer, dstInfo *clientInfo, srcKey key.Public, contents []byte) error {
s.packetsSent.Add(1)
s.bytesSent.Add(int64(len(contents)))
sendSrc := dstInfo.Version >= protocolSrcAddrs
pktLen := len(contents)
if sendSrc {
pktLen += len(srcKey)
}
if err := writeFrameHeader(bw, frameRecvPacket, uint32(pktLen)); err != nil {
return err
}
if sendSrc {
if _, err := bw.Write(srcKey[:]); err != nil {
return err
}
}
if _, err := bw.Write(contents); err != nil {
return err
}
return bw.Flush()
}
func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint32, limiter *rate.Limiter) (dstKey key.Public, contents []byte, err error) {
func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint32) (dstKey key.Public, contents []byte, err error) {
if frameLen < keyLen {
return key.Public{}, nil, errors.New("short send packet frame")
}
@@ -454,9 +432,6 @@ func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint
if packetLen > MaxPacketSize {
return key.Public{}, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
}
if err := limiter.WaitN(ctx, int(packetLen)); err != nil {
return key.Public{}, nil, fmt.Errorf("rate limit: %v", err)
}
contents = make([]byte, packetLen)
if _, err := io.ReadFull(br, contents); err != nil {
return key.Public{}, nil, err
@@ -470,23 +445,30 @@ func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
s *Server
nc Conn
key key.Public
info clientInfo
logf logger.Logf
limiter *rate.Limiter
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
// Static after construction.
s *Server
nc Conn
key key.Public
info clientInfo
logf logger.Logf
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
// Owned by run, not thread-safe.
br *bufio.Reader
connectedAt time.Time
preferred bool
keepAliveTimer *time.Timer
keepAliveReset chan struct{}
// Owned by sender, not thread-safe.
bw *bufio.Writer
preferred bool
mu sync.RWMutex // guards access to sendQueue for shutdown.
sendQueue chan pkt // packets queued to this client
}
mu sync.Mutex // mu guards writing to bw
br *bufio.Reader
bw *bufio.Writer
type pkt struct {
src key.Public
bs []byte
// TODO(danderson): enqueue time, to measure queue latency?
}
func (c *sclient) setPreferred(v bool) {
@@ -514,43 +496,116 @@ func (c *sclient) setPreferred(v bool) {
}
}
func (c *sclient) keepAliveLoop(ctx context.Context) error {
func (c *sclient) sender(ctx context.Context) {
// If the sender shuts down unilaterally due to an error, close so
// that the receive loop unblocks and cleans up the rest.
defer c.nc.Close()
if err := c.sendLoop(ctx); err != nil {
c.logf("sender failed: %v", err)
}
}
func (c *sclient) sendLoop(ctx context.Context) error {
c.mu.RLock()
queue := c.sendQueue
c.mu.RUnlock()
if queue == nil {
// Uncommon race, sclient shut down before this loop ever
// started. Nothing to do here, move along.
return nil
}
defer func() {
// Drain the send queue to count dropped packets
for {
_, ok := <-queue
if !ok {
break
}
c.s.packetsDropped.Add(1)
if debug {
c.logf("dropping packet for shutdown %x", c.key)
}
}
}()
jitterMs, err := crand.Int(crand.Reader, big.NewInt(5000))
if err != nil {
panic(err)
}
jitter := time.Duration(jitterMs.Int64()) * time.Millisecond
c.keepAliveTimer = time.NewTimer(keepAlive + jitter)
defer c.keepAliveTimer.Stop()
keepAliveTick := time.NewTicker(keepAlive + jitter)
defer keepAliveTick.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-c.keepAliveReset:
if c.keepAliveTimer.Stop() {
<-c.keepAliveTimer.C
}
c.keepAliveTimer.Reset(keepAlive + jitter)
case <-c.keepAliveTimer.C:
c.mu.Lock()
if c.s.WriteTimeout != 0 {
c.nc.SetWriteDeadline(time.Now().Add(c.s.WriteTimeout))
case pkt, ok := <-queue:
if !ok {
return nil
}
err := writeFrame(c.bw, frameKeepAlive, nil)
if err == nil {
err = c.bw.Flush()
if err := c.sendPacket(pkt.src, pkt.bs); err != nil {
return err
}
c.mu.Unlock()
if err != nil {
c.nc.Close()
case <-keepAliveTick.C:
if err := c.sendKeepalive(); err != nil {
return err
}
}
}
}
func (c *sclient) sendKeepalive() error {
if c.s.WriteTimeout != 0 {
c.nc.SetWriteDeadline(time.Now().Add(c.s.WriteTimeout))
}
if err := writeFrame(c.bw, frameKeepAlive, nil); err != nil {
return err
}
return c.bw.Flush()
}
func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) {
defer func() {
// Stats update.
if err != nil {
c.s.packetsDropped.Add(1)
if debug {
c.logf("dropping packet to %x: %v", c.key, err)
}
} else {
c.s.packetsSent.Add(1)
c.s.bytesSent.Add(int64(len(contents)))
}
}()
if c.s.WriteTimeout != 0 {
c.nc.SetWriteDeadline(time.Now().Add(c.s.WriteTimeout))
}
withKey := !srcKey.IsZero()
pktLen := len(contents)
if withKey {
pktLen += len(srcKey)
}
if err = writeFrameHeader(c.bw, frameRecvPacket, uint32(pktLen)); err != nil {
return err
}
if withKey {
if _, err = c.bw.Write(srcKey[:]); err != nil {
return err
}
}
if _, err = c.bw.Write(contents); err != nil {
return err
}
return c.bw.Flush()
}
func (s *Server) expVarFunc(f func() interface{}) expvar.Func {
return expvar.Func(func() interface{} {
s.mu.Lock()
