diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go
index 2854ca586..7e0561f84 100644
--- a/cmd/derper/derper.go
+++ b/cmd/derper/derper.go
@@ -40,7 +40,6 @@ var (
configPath = flag.String("c", "", "config file path")
certDir = flag.String("certdir", tsweb.DefaultCertDir("derper-certs"), "directory to store LetsEncrypt certs, if addr's port is :443")
hostname = flag.String("hostname", "derp.tailscale.com", "LetsEncrypt host name, if addr's port is :443")
- mbps = flag.Int("mbps", 5, "Mbps (mebibit/s) per-client rate limit; 0 means unlimited")
logCollection = flag.String("logcollection", "", "If non-empty, logtail collection to log to")
runSTUN = flag.Bool("stun", false, "also run a STUN server")
)
@@ -120,9 +119,6 @@ func main() {
s := derp.NewServer(key.Private(cfg.PrivateKey), log.Printf)
s.WriteTimeout = 2 * time.Second
- if *mbps != 0 {
- s.BytesPerSecond = (*mbps << 20) / 8
- }
expvar.Publish("derp", s.ExpVar())
// Create our own mux so we don't expose /debug/ stuff to the world.
@@ -196,7 +192,6 @@ func debugHandler(s *derp.Server) http.Handler {
`)
f("- Hostname: %v
\n", *hostname)
- f("- Rate Limit: %v Mbps
\n", *mbps)
f("- Uptime: %v
\n", tsweb.Uptime())
f(`- /debug/vars (Go)
diff --git a/derp/derp_server.go b/derp/derp_server.go
index ebbb6db56..8d5505885 100644
--- a/derp/derp_server.go
+++ b/derp/derp_server.go
@@ -24,7 +24,6 @@ import (
"time"
"golang.org/x/crypto/nacl/box"
- "golang.org/x/time/rate"
"tailscale.com/metrics"
"tailscale.com/types/key"
"tailscale.com/types/logger"
@@ -32,12 +31,10 @@ import (
var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS"))
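+
+// perClientSendQueueDepth balances burst absorption against per-client
+// memory use and queueing delay: a deeper queue drops fewer packets
+// under load, a shallower one bounds latency.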
+const perClientSendQueueDepth = 32 // packets buffered for sending
+
// Server is a DERP server.
type Server struct {
- // BytesPerSecond, if non-zero, specifies how many bytes per
- // second to cap per-client reads at.
- BytesPerSecond int
-
// WriteTimeout, if non-zero, specifies how long to wait
// before failing when writing to a client.
WriteTimeout time.Duration
@@ -201,13 +198,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
return fmt.Errorf("client %x rejected: %v", clientKey, err)
}
- lim := rate.Inf
- if s.BytesPerSecond != 0 {
- lim = rate.Limit(s.BytesPerSecond)
- }
- const burstBytes = 1 << 20 // generous bandwidth delay product? must be over 64k max packet size.
- limiter := rate.NewLimiter(lim, burstBytes)
-
// At this point we trust the client so we don't time out.
nc.SetDeadline(time.Time{})
@@ -217,24 +207,17 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
nc: nc,
br: br,
bw: bw,
- limiter: limiter,
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
remoteAddr: remoteAddr,
connectedAt: time.Now(),
+ sendQueue: make(chan pkt, perClientSendQueueDepth),
}
if clientInfo != nil {
c.info = *clientInfo
}
- // Once the client is registered, it can start receiving
- // traffic, but we want to make sure the first thing it
- // receives after its frameClientInfo is our frameServerInfo,
- // so acquire the c.mu lock (which guards writing to c.bw)
- // while we register.
- c.mu.Lock()
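+ // Once registered, other clients can start enqueueing packets for
+ // this one, but nothing writes to bw until run starts the sender
+ // goroutine, so frameServerInfo below is still the first frame the
+ // client receives.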
s.registerClient(c)
err = s.sendServerInfo(bw, clientKey)
- c.mu.Unlock()
if err != nil {
return fmt.Errorf("send server info: %v", err)
}
@@ -244,11 +227,18 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
}
func (c *sclient) run() error {
- s := c.s
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ defer func() {
+ // Atomically close+remove send queue, so racing writers don't
+ // send to closed channel.
+ c.mu.Lock()
+ close(c.sendQueue)
+ c.sendQueue = nil
+ c.mu.Unlock()
+ }()
- go s.sendClientKeepAlives(ctx, c)
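+ // The sender goroutine owns bw from here on, writing queued packets
+ // and keepalives until ctx is canceled or the send queue is closed.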
+ go c.sender(ctx)
for {
ft, fl, err := readFrameHeader(c.br)
@@ -270,9 +260,6 @@ func (c *sclient) run() error {
}
func (c *sclient) handleUnknownFrame(ctx context.Context, ft frameType, fl uint32) error {
- if err := c.limiter.WaitN(ctx, int(fl)); err != nil {
- return fmt.Errorf("rate limit: %v", err)
- }
_, err := io.CopyN(ioutil.Discard, c.br, int64(fl))
return err
}
@@ -292,7 +279,7 @@ func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
func (c *sclient) handleFrameSendPacket(ctx context.Context, ft frameType, fl uint32) error {
s := c.s
- dstKey, contents, err := s.recvPacket(ctx, c.br, fl, c.limiter)
+ dstKey, contents, err := s.recvPacket(ctx, c.br, fl)
if err != nil {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
}
@@ -309,35 +296,51 @@ func (c *sclient) handleFrameSendPacket(ctx context.Context, ft frameType, fl ui
return nil
}
- dst.mu.Lock()
- if s.WriteTimeout != 0 {
- dst.nc.SetWriteDeadline(time.Now().Add(s.WriteTimeout))
+ dst.mu.RLock()
+ defer dst.mu.RUnlock()
+ if dst.sendQueue == nil {
+ s.packetsDropped.Add(1)
+ if debug {
+ c.logf("dropping packet for shutdown client %x", dstKey)
+ }
+ return nil
}
- err = s.sendPacket(dst.bw, &dst.info, c.key, contents)
- dst.mu.Unlock()
- if err != nil {
- c.logf("write error sending packet to %x: %v", dstKey, err)
+ p := pkt{
+ bs: contents,
+ }
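+ // Only peers that speak protocolSrcAddrs or newer understand a source
+ // key in recv-packet frames; older clients get just the payload.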
+ if dst.info.Version >= protocolSrcAddrs {
+ p.src = c.key
+ }
+ // Attempt to queue for sending up to 3 times. On each attempt, if
+ // the queue is full, try to drop from queue head to prioritize
+ // fresher packets.
+ for attempt := 0; attempt < 3; attempt++ {
+ select {
+ case dst.sendQueue <- p:
+ return nil
+ default:
+ }
- // If we cannot send to a destination, shut it down.
- // Let its receive loop do the cleanup.
- s.mu.Lock()
- if s.clients[dstKey] == dst {
- s.clients[dstKey].nc.Close()
+ select {
+ case <-dst.sendQueue:
+ s.packetsDropped.Add(1)
+ if debug {
+ c.logf("dropping packet from client %x queue head", dstKey)
+ }
+ default:
}
- s.mu.Unlock()
+ }
+ // Failed to make room for packet. This can happen in a heavily
+ // contended queue with racing writers. Give up and tail-drop in
+ // this case to keep reader unblocked.
+ s.packetsDropped.Add(1)
+ if debug {
+ c.logf("dropping packet from client %x queue tail", dstKey)
}
- // Do not treat a send error as an error with this transmitting client.
return nil
}
-func (s *Server) sendClientKeepAlives(ctx context.Context, c *sclient) {
- if err := c.keepAliveLoop(ctx); err != nil {
- c.logf("keep alive failed: %v", err)
- }
-}
-
func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error {
// TODO(crawshaw): implement policy constraints on who can use the DERP server
// TODO(bradfitz): ... and at what rate.
@@ -418,32 +421,7 @@ func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.Public, info *cl
return clientKey, info, nil
}
-func (s *Server) sendPacket(bw *bufio.Writer, dstInfo *clientInfo, srcKey key.Public, contents []byte) error {
- s.packetsSent.Add(1)
- s.bytesSent.Add(int64(len(contents)))
-
- sendSrc := dstInfo.Version >= protocolSrcAddrs
-
- pktLen := len(contents)
- if sendSrc {
- pktLen += len(srcKey)
- }
-
- if err := writeFrameHeader(bw, frameRecvPacket, uint32(pktLen)); err != nil {
- return err
- }
- if sendSrc {
- if _, err := bw.Write(srcKey[:]); err != nil {
- return err
- }
- }
- if _, err := bw.Write(contents); err != nil {
- return err
- }
- return bw.Flush()
-}
-
-func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint32, limiter *rate.Limiter) (dstKey key.Public, contents []byte, err error) {
+func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint32) (dstKey key.Public, contents []byte, err error) {
if frameLen < keyLen {
return key.Public{}, nil, errors.New("short send packet frame")
}
@@ -454,9 +432,6 @@ func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint
if packetLen > MaxPacketSize {
return key.Public{}, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
}
- if err := limiter.WaitN(ctx, int(packetLen)); err != nil {
- return key.Public{}, nil, fmt.Errorf("rate limit: %v", err)
- }
contents = make([]byte, packetLen)
if _, err := io.ReadFull(br, contents); err != nil {
return key.Public{}, nil, err
@@ -470,23 +445,30 @@ func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
- s *Server
- nc Conn
- key key.Public
- info clientInfo
- logf logger.Logf
- limiter *rate.Limiter
- remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
+ // Static after construction.
+ s *Server
+ nc Conn
+ key key.Public
+ info clientInfo
+ logf logger.Logf
+ remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
+
+ // Owned by run, not thread-safe.
+ br *bufio.Reader
connectedAt time.Time
+ preferred bool
- keepAliveTimer *time.Timer
- keepAliveReset chan struct{}
+ // Owned by sender, not thread-safe.
+ bw *bufio.Writer
- preferred bool
+ mu sync.RWMutex // guards access to sendQueue for shutdown.
+ sendQueue chan pkt // packets queued to this client
+}
- mu sync.Mutex // mu guards writing to bw
- br *bufio.Reader
- bw *bufio.Writer
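+// pkt is a request to write a data frame to an sclient.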
+type pkt struct {
+ src key.Public
+ bs []byte
+ // TODO(danderson): enqueue time, to measure queue latency?
}
func (c *sclient) setPreferred(v bool) {
@@ -514,43 +496,116 @@ func (c *sclient) setPreferred(v bool) {
}
}
-func (c *sclient) keepAliveLoop(ctx context.Context) error {
+func (c *sclient) sender(ctx context.Context) {
+ // If the sender shuts down unilaterally due to an error, close so
+ // that the receive loop unblocks and cleans up the rest.
+ defer c.nc.Close()
+ if err := c.sendLoop(ctx); err != nil {
+ c.logf("sender failed: %v", err)
+ }
+}
+
+func (c *sclient) sendLoop(ctx context.Context) error {
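+ // Snapshot the queue under the lock: run's shutdown path closes and
+ // nils out c.sendQueue, so only the local reference is used below.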
+ c.mu.RLock()
+ queue := c.sendQueue
+ c.mu.RUnlock()
+
+ if queue == nil {
+ // Uncommon race, sclient shut down before this loop ever
+ // started. Nothing to do here, move along.
+ return nil
+ }
+
+ defer func() {
+ // Drain the send queue to count dropped packets
+ for {
+ _, ok := <-queue
+ if !ok {
+ break
+ }
+ c.s.packetsDropped.Add(1)
+ if debug {
+ c.logf("dropping packet for shutdown %x", c.key)
+ }
+ }
+ }()
+
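+ // Jitter the keepalive interval so that keepalives from many clients
+ // don't synchronize and fire all at once.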
jitterMs, err := crand.Int(crand.Reader, big.NewInt(5000))
if err != nil {
panic(err)
}
jitter := time.Duration(jitterMs.Int64()) * time.Millisecond
- c.keepAliveTimer = time.NewTimer(keepAlive + jitter)
- defer c.keepAliveTimer.Stop()
+ keepAliveTick := time.NewTicker(keepAlive + jitter)
+ defer keepAliveTick.Stop()
for {
select {
case <-ctx.Done():
return nil
- case <-c.keepAliveReset:
- if c.keepAliveTimer.Stop() {
- <-c.keepAliveTimer.C
- }
- c.keepAliveTimer.Reset(keepAlive + jitter)
- case <-c.keepAliveTimer.C:
- c.mu.Lock()
- if c.s.WriteTimeout != 0 {
- c.nc.SetWriteDeadline(time.Now().Add(c.s.WriteTimeout))
+
+ case pkt, ok := <-queue:
+ if !ok {
+ return nil
}
- err := writeFrame(c.bw, frameKeepAlive, nil)
- if err == nil {
- err = c.bw.Flush()
+ if err := c.sendPacket(pkt.src, pkt.bs); err != nil {
+ return err
}
- c.mu.Unlock()
- if err != nil {
- c.nc.Close()
+ case <-keepAliveTick.C:
+ if err := c.sendKeepalive(); err != nil {
return err
}
}
}
}
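+
+// sendKeepalive writes a keepalive frame to the client and flushes it.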
+func (c *sclient) sendKeepalive() error {
+ if c.s.WriteTimeout != 0 {
+ c.nc.SetWriteDeadline(time.Now().Add(c.s.WriteTimeout))
+ }
+ if err := writeFrame(c.bw, frameKeepAlive, nil); err != nil {
+ return err
+ }
+ return c.bw.Flush()
+}
+
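+// sendPacket writes contents to the client in a frameRecvPacket frame.
+// If srcKey is non-zero, it is prepended to the payload so the
+// recipient knows who sent the packet.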
+func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) {
+ defer func() {
+ // Stats update.
+ if err != nil {
+ c.s.packetsDropped.Add(1)
+ if debug {
+ c.logf("dropping packet to %x: %v", c.key, err)
+ }
+ } else {
+ c.s.packetsSent.Add(1)
+ c.s.bytesSent.Add(int64(len(contents)))
+ }
+ }()
+
+ if c.s.WriteTimeout != 0 {
+ c.nc.SetWriteDeadline(time.Now().Add(c.s.WriteTimeout))
+ }
+
+ withKey := !srcKey.IsZero()
+ pktLen := len(contents)
+ if withKey {
+ pktLen += len(srcKey)
+ }
+ if err = writeFrameHeader(c.bw, frameRecvPacket, uint32(pktLen)); err != nil {
+ return err
+ }
+ if withKey {
+ if _, err = c.bw.Write(srcKey[:]); err != nil {
+ return err
+ }
+ }
+ if _, err = c.bw.Write(contents); err != nil {
+ return err
+ }
+ return c.bw.Flush()
+}
+
func (s *Server) expVarFunc(f func() interface{}) expvar.Func {
return expvar.Func(func() interface{} {
s.mu.Lock()