You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tailscale/derp/derp_server.go

1621 lines
47 KiB
Go

// Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package derp
// TODO(crawshaw): with predefined serverKey in clients and HMAC on packets we could skip TLS
import (
"bufio"
"context"
"crypto/ed25519"
crand "crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math"
"math/big"
"math/rand"
"net/http"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go4.org/mem"
"golang.org/x/crypto/nacl/box"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"inet.af/netaddr"
"tailscale.com/client/tailscale"
"tailscale.com/disco"
"tailscale.com/metrics"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/version"
)
var debug, _ = strconv.ParseBool(os.Getenv("DERP_DEBUG_LOGS"))
// verboseDropKeys is the set of destination public keys that should
// verbosely log whenever DERP drops a packet.
var verboseDropKeys = map[key.Public]bool{}
func init() {
keys := os.Getenv("TS_DEBUG_VERBOSE_DROPS")
if keys == "" {
return
}
for _, keyStr := range strings.Split(keys, ",") {
k, err := key.NewPublicFromHexMem(mem.S(keyStr))
if err != nil {
log.Printf("ignoring invalid debug key %q: %v", keyStr, err)
} else {
verboseDropKeys[k] = true
}
}
}
func init() {
rand.Seed(time.Now().UnixNano())
}
const (
perClientSendQueueDepth = 32 // packets buffered for sending
writeTimeout = 2 * time.Second
)
const host64bit = (^uint(0) >> 32) & 1 // 1 on 64-bit, 0 on 32-bit
// pad32bit is 4 on 32-bit machines and 0 on 64-bit.
// It exists so the Server struct's atomic fields can be aligned to 8
// byte boundaries. (As tested by GOARCH=386 go test, etc)
const pad32bit = 4 - host64bit*4 // 0 on 64-bit, 4 on 32-bit
// Server is a DERP server.
type Server struct {
// WriteTimeout, if non-zero, specifies how long to wait
// before failing when writing to a client.
WriteTimeout time.Duration
privateKey key.Private
publicKey key.Public
logf logger.Logf
memSys0 uint64 // runtime.MemStats.Sys at start (or early-ish)
meshKey string
limitedLogf logger.Logf
metaCert []byte // the encoded x509 cert to send after LetsEncrypt cert+intermediate
// Counters:
_ [pad32bit]byte
packetsSent, bytesSent expvar.Int
packetsRecv, bytesRecv expvar.Int
packetsRecvByKind metrics.LabelMap
packetsRecvDisco *expvar.Int
packetsRecvOther *expvar.Int
_ [pad32bit]byte
packetsDropped expvar.Int
packetsDroppedReason metrics.LabelMap
packetsDroppedReasonCounters []*expvar.Int // indexed by dropReason
packetsDroppedType metrics.LabelMap
packetsDroppedTypeDisco *expvar.Int
packetsDroppedTypeOther *expvar.Int
_ [pad32bit]byte
packetsForwardedOut expvar.Int
packetsForwardedIn expvar.Int
peerGoneFrames expvar.Int // number of peer gone frames sent
accepts expvar.Int
curClients expvar.Int
curHomeClients expvar.Int // ones with preferred
clientsReplaced expvar.Int
clientsReplaceLimited expvar.Int
clientsReplaceSleeping expvar.Int
unknownFrames expvar.Int
homeMovesIn expvar.Int // established clients announce home server moves in
homeMovesOut expvar.Int // established clients announce home server moves out
multiForwarderCreated expvar.Int
multiForwarderDeleted expvar.Int
removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically
// verifyClients only accepts client connections to the DERP server if the clientKey is a
// known peer in the network, as specified by a running tailscaled's client's local api.
verifyClients bool
mu sync.Mutex
closed bool
netConns map[Conn]chan struct{} // chan is closed when conn closes
clients map[key.Public]*sclient
watchers map[*sclient]bool // mesh peer -> true
// clientsMesh tracks all clients in the cluster, both locally
// and to mesh peers. If the value is nil, that means the
// peer is only local (and thus in the clients Map, but not
// remote). If the value is non-nil, it's remote (+ maybe also
// local).
clientsMesh map[key.Public]PacketForwarder
// sentTo tracks which peers have sent to which other peers,
// and at which connection number. This isn't on sclient
// because it includes intra-region forwarded packets as the
// src.
sentTo map[key.Public]map[key.Public]int64 // src => dst => dst's latest sclient.connNum
// maps from netaddr.IPPort to a client's public key
keyOfAddr map[netaddr.IPPort]key.Public
}
// PacketForwarder is something that can forward packets.
//
// It's mostly an inteface for circular dependency reasons; the
// typical implementation is derphttp.Client. The other implementation
// is a multiForwarder, which this package creates as needed if a
// public key gets more than one PacketForwarder registered for it.
type PacketForwarder interface {
ForwardPacket(src, dst key.Public, payload []byte) error
}
// Conn is the subset of the underlying net.Conn the DERP Server needs.
// It is a defined type so that non-net connections can be used.
type Conn interface {
io.WriteCloser
// The *Deadline methods follow the semantics of net.Conn.
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}
// NewServer returns a new DERP server. It doesn't listen on its own.
// Connections are given to it via Server.Accept.
func NewServer(privateKey key.Private, logf logger.Logf) *Server {
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
s := &Server{
privateKey: privateKey,
publicKey: privateKey.Public(),
logf: logf,
limitedLogf: logger.RateLimitedFn(logf, 30*time.Second, 5, 100),
packetsRecvByKind: metrics.LabelMap{Label: "kind"},
packetsDroppedReason: metrics.LabelMap{Label: "reason"},
packetsDroppedType: metrics.LabelMap{Label: "type"},
clients: map[key.Public]*sclient{},
clientsMesh: map[key.Public]PacketForwarder{},
netConns: map[Conn]chan struct{}{},
memSys0: ms.Sys,
watchers: map[*sclient]bool{},
sentTo: map[key.Public]map[key.Public]int64{},
avgQueueDuration: new(uint64),
keyOfAddr: map[netaddr.IPPort]key.Public{},
}
s.initMetacert()
s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
s.packetsRecvOther = s.packetsRecvByKind.Get("other")
s.packetsDroppedReasonCounters = []*expvar.Int{
s.packetsDroppedReason.Get("unknown_dest"),
s.packetsDroppedReason.Get("unknown_dest_on_fwd"),
s.packetsDroppedReason.Get("gone"),
s.packetsDroppedReason.Get("queue_head"),
s.packetsDroppedReason.Get("queue_tail"),
s.packetsDroppedReason.Get("write_error"),
}
s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco")
s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other")
return s
}
// SetMesh sets the pre-shared key that regional DERP servers used to mesh
// amongst themselves.
//
// It must be called before serving begins.
func (s *Server) SetMeshKey(v string) {
s.meshKey = v
}
// SetVerifyClients sets whether this DERP server verifies clients through tailscaled.
//
// It must be called before serving begins.
func (s *Server) SetVerifyClient(v bool) {
s.verifyClients = v
}
// HasMeshKey reports whether the server is configured with a mesh key.
func (s *Server) HasMeshKey() bool { return s.meshKey != "" }
// MeshKey returns the configured mesh key, if any.
func (s *Server) MeshKey() string { return s.meshKey }
// PrivateKey returns the server's private key.
func (s *Server) PrivateKey() key.Private { return s.privateKey }
// PublicKey returns the server's public key.
func (s *Server) PublicKey() key.Public { return s.publicKey }
// Close closes the server and waits for the connections to disconnect.
func (s *Server) Close() error {
s.mu.Lock()
wasClosed := s.closed
s.closed = true
s.mu.Unlock()
if wasClosed {
return nil
}
var closedChs []chan struct{}
s.mu.Lock()
for nc, closed := range s.netConns {
nc.Close()
closedChs = append(closedChs, closed)
}
s.mu.Unlock()
for _, closed := range closedChs {
<-closed
}
return nil
}
func (s *Server) isClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed
}
// Accept adds a new connection to the server and serves it.
//
// The provided bufio ReadWriter must be already connected to nc.
// Accept blocks until the Server is closed or the connection closes
// on its own.
//
// Accept closes nc.
func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
closed := make(chan struct{})
s.mu.Lock()
s.accepts.Add(1) // while holding s.mu for connNum read on next line
connNum := s.accepts.Value() // expvar sadly doesn't return new value on Add(1)
s.netConns[nc] = closed
s.mu.Unlock()
defer func() {
nc.Close()
close(closed)
s.mu.Lock()
delete(s.netConns, nc)
s.mu.Unlock()
}()
if err := s.accept(nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() {
s.logf("derp: %s: %v", remoteAddr, err)
}
}
// initMetacert initialized s.metaCert with a self-signed x509 cert
// encoding this server's public key and protocol version. cmd/derper
// then sends this after the Let's Encrypt leaf + intermediate certs
// after the ServerHello (encrypted in TLS 1.3, not that it matters
// much).
//
// Then the client can save a round trip getting that and can start
// speaking DERP right away. (We don't use ALPN because that's sent in
// the clear and we're being paranoid to not look too weird to any
// middleboxes, given that DERP is an ultimate fallback path). But
// since the post-ServerHello certs are encrypted we can have the
// client also use them as a signal to be able to start speaking DERP
// right away, starting with its identity proof, encrypted to the
// server's public key.
//
// This RTT optimization fails where there's a corp-mandated
// TLS proxy with corp-mandated root certs on employee machines and
// and TLS proxy cleans up unnecessary certs. In that case we just fall
// back to the extra RTT.
func (s *Server) initMetacert() {
pub, priv, err := ed25519.GenerateKey(crand.Reader)
if err != nil {
log.Fatal(err)
}
tmpl := &x509.Certificate{
SerialNumber: big.NewInt(ProtocolVersion),
Subject: pkix.Name{
CommonName: fmt.Sprintf("derpkey%x", s.publicKey[:]),
},
// Windows requires NotAfter and NotBefore set:
NotAfter: time.Now().Add(30 * 24 * time.Hour),
NotBefore: time.Now().Add(-30 * 24 * time.Hour),
}
cert, err := x509.CreateCertificate(crand.Reader, tmpl, tmpl, pub, priv)
if err != nil {
log.Fatalf("CreateCertificate: %v", err)
}
s.metaCert = cert
}
// MetaCert returns the server metadata cert that can be sent by the
// TLS server to let the client skip a round trip during start-up.
func (s *Server) MetaCert() []byte { return s.metaCert }
// registerClient notes that client c is now authenticated and ready for packets.
//
// If c's public key was already connected with a different
// connection, the prior one is closed, unless it's fighting rapidly
// with another client with the same key, in which case the returned
// ok is false, and the caller should wait the provided duration
// before trying again.
func (s *Server) registerClient(c *sclient) (ok bool, d time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
old := s.clients[c.key]
if old == nil {
c.logf("adding connection")
} else {
// Take over the old rate limiter, discarding the one
// our caller just made.
c.replaceLimiter = old.replaceLimiter
if rr := c.replaceLimiter.ReserveN(timeNow(), 1); rr.OK() {
if d := rr.DelayFrom(timeNow()); d > 0 {
s.clientsReplaceLimited.Add(1)
return false, d
}
}
s.clientsReplaced.Add(1)
c.logf("adding connection, replacing %s", old.remoteAddr)
go old.nc.Close()
}
s.clients[c.key] = c
if _, ok := s.clientsMesh[c.key]; !ok {
s.clientsMesh[c.key] = nil // just for varz of total users in cluster
}
s.keyOfAddr[c.remoteIPPort] = c.key
s.curClients.Add(1)
s.broadcastPeerStateChangeLocked(c.key, true)
return true, 0
}
// broadcastPeerStateChangeLocked enqueues a message to all watchers
// (other DERP nodes in the region, or trusted clients) that peer's
// presence changed.
//
// s.mu must be held.
func (s *Server) broadcastPeerStateChangeLocked(peer key.Public, present bool) {
for w := range s.watchers {
w.peerStateChange = append(w.peerStateChange, peerConnState{peer: peer, present: present})
go w.requestMeshUpdate()
}
}
// unregisterClient removes a client from the server.
func (s *Server) unregisterClient(c *sclient) {
s.mu.Lock()
defer s.mu.Unlock()
cur := s.clients[c.key]
if cur == c {
c.logf("removing connection")
delete(s.clients, c.key)
if v, ok := s.clientsMesh[c.key]; ok && v == nil {
delete(s.clientsMesh, c.key)
s.notePeerGoneFromRegionLocked(c.key)
}
s.broadcastPeerStateChangeLocked(c.key, false)
}
if c.canMesh {
delete(s.watchers, c)
}
delete(s.keyOfAddr, c.remoteIPPort)
s.curClients.Add(-1)
if c.preferred {
s.curHomeClients.Add(-1)
}
}
// notePeerGoneFromRegionLocked sends peerGone frames to parties that
// key has sent to previously (whether those sends were from a local
// client or forwarded). It must only be called after the key has
// been removed from clientsMesh.
func (s *Server) notePeerGoneFromRegionLocked(key key.Public) {
if _, ok := s.clientsMesh[key]; ok {
panic("usage")
}
// Find still-connected peers and either notify that we've gone away
// so they can drop their route entries to us (issue 150)
// or move them over to the active client (in case a replaced client
// connection is being unregistered).
for pubKey, connNum := range s.sentTo[key] {
if peer, ok := s.clients[pubKey]; ok && peer.connNum == connNum {
go peer.requestPeerGoneWrite(key)
}
}
delete(s.sentTo, key)
}
func (s *Server) addWatcher(c *sclient) {
if !c.canMesh {
panic("invariant: addWatcher called without permissions")
}
if c.key == s.publicKey {
// We're connecting to ourself. Do nothing.
return
}
s.mu.Lock()
defer s.mu.Unlock()
// Queue messages for each already-connected client.
for peer := range s.clients {
c.peerStateChange = append(c.peerStateChange, peerConnState{peer: peer, present: true})
}
// And enroll the watcher in future updates (of both
// connections & disconnections).
s.watchers[c] = true
go c.requestMeshUpdate()
}
func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
br := brw.Reader
nc.SetDeadline(time.Now().Add(10 * time.Second))
bw := &lazyBufioWriter{w: nc, lbw: brw.Writer}
if err := s.sendServerKey(bw); err != nil {
return fmt.Errorf("send server key: %v", err)
}
nc.SetDeadline(time.Now().Add(10 * time.Second))
clientKey, clientInfo, err := s.recvClientKey(br)
if err != nil {
return fmt.Errorf("receive client key: %v", err)
}
if err := s.verifyClient(clientKey, clientInfo); err != nil {
return fmt.Errorf("client %x rejected: %v", clientKey, err)
}
// At this point we trust the client so we don't time out.
nc.SetDeadline(time.Time{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr)
c := &sclient{
connNum: connNum,
s: s,
key: clientKey,
nc: nc,
br: br,
bw: bw,
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
done: ctx.Done(),
remoteAddr: remoteAddr,
remoteIPPort: remoteIPPort,
connectedAt: time.Now(),
sendQueue: make(chan pkt, perClientSendQueueDepth),
discoSendQueue: make(chan pkt, perClientSendQueueDepth),
peerGone: make(chan key.Public),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
// Allow kicking out previous connections once a
// minute, with a very high burst of 100. Once a
// minute is less than the client's 2 minute
// inactivity timeout.
replaceLimiter: rate.NewLimiter(rate.Every(time.Minute), 100),
}
if c.canMesh {
c.meshUpdate = make(chan struct{})
}
if clientInfo != nil {
c.info = *clientInfo
}
for {
ok, d := s.registerClient(c)
if ok {
break
}
s.clientsReplaceSleeping.Add(1)
timeSleep(d)
s.clientsReplaceSleeping.Add(-1)
}
defer s.unregisterClient(c)
err = s.sendServerInfo(c.bw, clientKey)
if err != nil {
return fmt.Errorf("send server info: %v", err)
}
return c.run(ctx)
}
// for testing
var (
timeSleep = time.Sleep
timeNow = time.Now
)
// run serves the client until there's an error.
// If the client hangs up or the server is closed, run returns nil, otherwise run returns an error.
func (c *sclient) run(ctx context.Context) error {
// Launch sender, but don't return from run until sender goroutine is done.
var grp errgroup.Group
sendCtx, cancelSender := context.WithCancel(ctx)
grp.Go(func() error { return c.sendLoop(sendCtx) })
defer func() {
cancelSender()
if err := grp.Wait(); err != nil && !c.s.isClosed() {
c.logf("sender failed: %v", err)
}
}()
for {
ft, fl, err := readFrameHeader(c.br)
if err != nil {
if errors.Is(err, io.EOF) {
c.logf("read EOF")
return nil
}
if c.s.isClosed() {
c.logf("closing; server closed")
return nil
}
return fmt.Errorf("client %x: readFrameHeader: %w", c.key, err)
}
switch ft {
case frameNotePreferred:
err = c.handleFrameNotePreferred(ft, fl)
case frameSendPacket:
err = c.handleFrameSendPacket(ft, fl)
case frameForwardPacket:
err = c.handleFrameForwardPacket(ft, fl)
case frameWatchConns:
err = c.handleFrameWatchConns(ft, fl)
case frameClosePeer:
err = c.handleFrameClosePeer(ft, fl)
default:
err = c.handleUnknownFrame(ft, fl)
}
if err != nil {
return err
}
}
}
func (c *sclient) handleUnknownFrame(ft frameType, fl uint32) error {
_, err := io.CopyN(ioutil.Discard, c.br, int64(fl))
return err
}
func (c *sclient) handleFrameNotePreferred(ft frameType, fl uint32) error {
if fl != 1 {
return fmt.Errorf("frameNotePreferred wrong size")
}
v, err := c.br.ReadByte()
if err != nil {
return fmt.Errorf("frameNotePreferred ReadByte: %v", err)
}
c.setPreferred(v != 0)
return nil
}
func (c *sclient) handleFrameWatchConns(ft frameType, fl uint32) error {
if fl != 0 {
return fmt.Errorf("handleFrameWatchConns wrong size")
}
if !c.canMesh {
return fmt.Errorf("insufficient permissions")
}
c.s.addWatcher(c)
return nil
}
func (c *sclient) handleFrameClosePeer(ft frameType, fl uint32) error {
if fl != keyLen {
return fmt.Errorf("handleFrameClosePeer wrong size")
}
if !c.canMesh {
return fmt.Errorf("insufficient permissions")
}
var targetKey key.Public
if _, err := io.ReadFull(c.br, targetKey[:]); err != nil {
return err
}
s := c.s
s.mu.Lock()
defer s.mu.Unlock()
if target, ok := s.clients[targetKey]; ok {
c.logf("frameClosePeer closing peer %x", targetKey)
go target.nc.Close()
} else {
c.logf("frameClosePeer failed to find peer %x", targetKey)
}
return nil
}
// handleFrameForwardPacket reads a "forward packet" frame from the client
// (which must be a trusted client, a peer in our mesh).
func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
if !c.canMesh {
return fmt.Errorf("insufficient permissions")
}
s := c.s
srcKey, dstKey, contents, err := s.recvForwardPacket(c.br, fl)
if err != nil {
return fmt.Errorf("client %x: recvForwardPacket: %v", c.key, err)
}
s.packetsForwardedIn.Add(1)
s.mu.Lock()
dst := s.clients[dstKey]
if dst != nil {
s.notePeerSendLocked(srcKey, dst)
}
s.mu.Unlock()
if dst == nil {
s.recordDrop(contents, srcKey, dstKey, dropReasonUnknownDestOnFwd)
return nil
}
return c.sendPkt(dst, pkt{
bs: contents,
enqueuedAt: time.Now(),
src: srcKey,
})
}
// notePeerSendLocked records that src sent to dst. We keep track of
// that so when src disconnects, we can tell dst (if it's still
// around) that src is gone (a peerGone frame).
func (s *Server) notePeerSendLocked(src key.Public, dst *sclient) {
m, ok := s.sentTo[src]
if !ok {
m = map[key.Public]int64{}
s.sentTo[src] = m
}
m[dst.key] = dst.connNum
}
// handleFrameSendPacket reads a "send packet" frame from the client.
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
s := c.s
dstKey, contents, err := s.recvPacket(c.br, fl)
if err != nil {
return fmt.Errorf("client %x: recvPacket: %v", c.key, err)
}
var fwd PacketForwarder
s.mu.Lock()
dst := s.clients[dstKey]
if dst == nil {
fwd = s.clientsMesh[dstKey]
} else {
s.notePeerSendLocked(c.key, dst)
}
s.mu.Unlock()
if dst == nil {
if fwd != nil {
s.packetsForwardedOut.Add(1)
if err := fwd.ForwardPacket(c.key, dstKey, contents); err != nil {
// TODO:
return nil
}
return nil
}
s.recordDrop(contents, c.key, dstKey, dropReasonUnknownDest)
return nil
}
p := pkt{
bs: contents,
enqueuedAt: time.Now(),
src: c.key,
}
return c.sendPkt(dst, p)
}
// dropReason is why we dropped a DERP frame.
type dropReason int
//go:generate go run tailscale.com/cmd/addlicense -year 2021 -file dropreason_string.go go run golang.org/x/tools/cmd/stringer -type=dropReason -trimprefix=dropReason
const (
dropReasonUnknownDest dropReason = iota // unknown destination pubkey
dropReasonUnknownDestOnFwd // unknown destination pubkey on a derp-forwarded packet
dropReasonGone // destination tailscaled disconnected before we could send
dropReasonQueueHead // destination queue is full, dropped packet at queue head
dropReasonQueueTail // destination queue is full, dropped packet at queue tail
dropReasonWriteError // OS write() failed
)
func (s *Server) recordDrop(packetBytes []byte, srcKey, dstKey key.Public, reason dropReason) {
s.packetsDropped.Add(1)
s.packetsDroppedReasonCounters[reason].Add(1)
if disco.LooksLikeDiscoWrapper(packetBytes) {
s.packetsDroppedTypeDisco.Add(1)
} else {
s.packetsDroppedTypeOther.Add(1)
}
if verboseDropKeys[dstKey] {
// Preformat the log string prior to calling limitedLogf. The
// limiter acts based on the format string, and we want to
// rate-limit per src/dst keys, not on the generic "dropped
// stuff" message.
msg := fmt.Sprintf("drop (%s) %s -> %s", srcKey.ShortString(), reason, dstKey.ShortString())
s.limitedLogf(msg)
}
if debug {
s.logf("dropping packet reason=%s dst=%s disco=%v", reason, dstKey, disco.LooksLikeDiscoWrapper(packetBytes))
}
}
func (c *sclient) sendPkt(dst *sclient, p pkt) error {
s := c.s
dstKey := dst.key
// Attempt to queue for sending up to 3 times. On each attempt, if
// the queue is full, try to drop from queue head to prioritize
// fresher packets.
sendQueue := dst.sendQueue
if disco.LooksLikeDiscoWrapper(p.bs) {
sendQueue = dst.discoSendQueue
}
for attempt := 0; attempt < 3; attempt++ {
select {
case <-dst.done:
s.recordDrop(p.bs, c.key, dstKey, dropReasonGone)
return nil
default:
}
select {
case sendQueue <- p:
return nil
default:
}
select {
case pkt := <-sendQueue:
s.recordDrop(pkt.bs, c.key, dstKey, dropReasonQueueHead)
c.recordQueueTime(pkt.enqueuedAt)
default:
}
}
// Failed to make room for packet. This can happen in a heavily
// contended queue with racing writers. Give up and tail-drop in
// this case to keep reader unblocked.
s.recordDrop(p.bs, c.key, dstKey, dropReasonQueueTail)
return nil
}
// requestPeerGoneWrite sends a request to write a "peer gone" frame
// that the provided peer has disconnected. It blocks until either the
// write request is scheduled, or the client has closed.
func (c *sclient) requestPeerGoneWrite(peer key.Public) {
select {
case c.peerGone <- peer:
case <-c.done:
}
}
func (c *sclient) requestMeshUpdate() {
if !c.canMesh {
panic("unexpected requestMeshUpdate")
}
select {
case c.meshUpdate <- struct{}{}:
case <-c.done:
}
}
func (s *Server) verifyClient(clientKey key.Public, info *clientInfo) error {
if !s.verifyClients {
return nil
}
status, err := tailscale.Status(context.TODO())
if err != nil {
return fmt.Errorf("failed to query local tailscaled status: %w", err)
}
if clientKey == status.Self.PublicKey {
return nil
}
if _, exists := status.Peer[clientKey]; !exists {
return fmt.Errorf("client %v not in set of peers", clientKey)
}
// TODO(bradfitz): add policy for configurable bandwidth rate per client?
return nil
}
func (s *Server) sendServerKey(lw *lazyBufioWriter) error {
buf := make([]byte, 0, len(magic)+len(s.publicKey))
buf = append(buf, magic...)
buf = append(buf, s.publicKey[:]...)
err := writeFrame(lw.bw(), frameServerKey, buf)
lw.Flush() // redundant (no-op) flush to release bufio.Writer
return err
}
type serverInfo struct {
Version int `json:"version,omitempty"`
}
func (s *Server) sendServerInfo(bw *lazyBufioWriter, clientKey key.Public) error {
var nonce [24]byte
if _, err := crand.Read(nonce[:]); err != nil {
return err
}
msg, err := json.Marshal(serverInfo{Version: ProtocolVersion})
if err != nil {
return err
}
msgbox := box.Seal(nil, msg, &nonce, clientKey.B32(), s.privateKey.B32())
if err := writeFrameHeader(bw.bw(), frameServerInfo, nonceLen+uint32(len(msgbox))); err != nil {
return err
}
if _, err := bw.Write(nonce[:]); err != nil {
return err
}
if _, err := bw.Write(msgbox); err != nil {
return err
}
return bw.Flush()
}
// recvClientKey reads the frameClientInfo frame from the client (its
// proof of identity) upon its initial connection. It should be
// considered especially untrusted at this point.
func (s *Server) recvClientKey(br *bufio.Reader) (clientKey key.Public, info *clientInfo, err error) {
fl, err := readFrameTypeHeader(br, frameClientInfo)
if err != nil {
return zpub, nil, err
}
const minLen = keyLen + nonceLen
if fl < minLen {
return zpub, nil, errors.New("short client info")
}
// We don't trust the client at all yet, so limit its input size to limit
// things like JSON resource exhausting (http://github.com/golang/go/issues/31789).
if fl > 256<<10 {
return zpub, nil, errors.New("long client info")
}
if _, err := io.ReadFull(br, clientKey[:]); err != nil {
return zpub, nil, err
}
var nonce [24]byte
if _, err := io.ReadFull(br, nonce[:]); err != nil {
return zpub, nil, fmt.Errorf("nonce: %v", err)
}
msgLen := int(fl - minLen)
msgbox := make([]byte, msgLen)
if _, err := io.ReadFull(br, msgbox); err != nil {
return zpub, nil, fmt.Errorf("msgbox: %v", err)
}
msg, ok := box.Open(nil, msgbox, &nonce, (*[32]byte)(&clientKey), s.privateKey.B32())
if !ok {
return zpub, nil, fmt.Errorf("msgbox: cannot open len=%d with client key %x", msgLen, clientKey[:])
}
info = new(clientInfo)
if err := json.Unmarshal(msg, info); err != nil {
return zpub, nil, fmt.Errorf("msg: %v", err)
}
return clientKey, info, nil
}
func (s *Server) recvPacket(br *bufio.Reader, frameLen uint32) (dstKey key.Public, contents []byte, err error) {
if frameLen < keyLen {
return zpub, nil, errors.New("short send packet frame")
}
if err := readPublicKey(br, &dstKey); err != nil {
return zpub, nil, err
}
packetLen := frameLen - keyLen
if packetLen > MaxPacketSize {
return zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
}
contents = make([]byte, packetLen)
if _, err := io.ReadFull(br, contents); err != nil {
return zpub, nil, err
}
s.packetsRecv.Add(1)
s.bytesRecv.Add(int64(len(contents)))
if disco.LooksLikeDiscoWrapper(contents) {
s.packetsRecvDisco.Add(1)
} else {
s.packetsRecvOther.Add(1)
}
return dstKey, contents, nil
}
// zpub is the key.Public zero value.
var zpub key.Public
func (s *Server) recvForwardPacket(br *bufio.Reader, frameLen uint32) (srcKey, dstKey key.Public, contents []byte, err error) {
if frameLen < keyLen*2 {
return zpub, zpub, nil, errors.New("short send packet frame")
}
if _, err := io.ReadFull(br, srcKey[:]); err != nil {
return zpub, zpub, nil, err
}
if _, err := io.ReadFull(br, dstKey[:]); err != nil {
return zpub, zpub, nil, err
}
packetLen := frameLen - keyLen*2
if packetLen > MaxPacketSize {
return zpub, zpub, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
}
contents = make([]byte, packetLen)
if _, err := io.ReadFull(br, contents); err != nil {
return zpub, zpub, nil, err
}
// TODO: was s.packetsRecv.Add(1)
// TODO: was s.bytesRecv.Add(int64(len(contents)))
return srcKey, dstKey, contents, nil
}
// sclient is a client connection to the server.
//
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct {
// Static after construction.
connNum int64 // process-wide unique counter, incremented each Accept
s *Server
nc Conn
key key.Public
info clientInfo
logf logger.Logf
done <-chan struct{} // closed when connection closes
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
remoteIPPort netaddr.IPPort // zero if remoteAddr is not ip:port.
sendQueue chan pkt // packets queued to this client; never closed
discoSendQueue chan pkt // important packets queued to this client; never closed
peerGone chan key.Public // write request that a previous sender has disconnected (not used by mesh peers)
meshUpdate chan struct{} // write request to write peerStateChange
canMesh bool // clientInfo had correct mesh token for inter-region routing
// replaceLimiter controls how quickly two connections with
// the same client key can kick each other off the server by
// taking over ownership of a key.
replaceLimiter *rate.Limiter
// Owned by run, not thread-safe.
br *bufio.Reader
connectedAt time.Time
preferred bool
// Owned by sender, not thread-safe.
bw *lazyBufioWriter
// Guarded by s.mu
//
// peerStateChange is used by mesh peers (a set of regional
// DERP servers) and contains records that need to be sent to
// the client for them to update their map of who's connected
// to this node.
peerStateChange []peerConnState
}
// peerConnState represents whether a peer is connected to the server
// or not.
type peerConnState struct {
peer key.Public
present bool
}
// pkt is a request to write a data frame to an sclient.
type pkt struct {
// src is the who's the sender of the packet.
src key.Public
// enqueuedAt is when a packet was put onto a queue before it was sent,
// and is used for reporting metrics on the duration of packets in the queue.
enqueuedAt time.Time
// bs is the data packet bytes.
// The memory is owned by pkt.
bs []byte
}
func (c *sclient) setPreferred(v bool) {
if c.preferred == v {
return
}
c.preferred = v
var homeMove *expvar.Int
if v {
c.s.curHomeClients.Add(1)
homeMove = &c.s.homeMovesIn
} else {
c.s.curHomeClients.Add(-1)
homeMove = &c.s.homeMovesOut
}
// Keep track of varz for home serve moves in/out. But ignore
// the initial packet set when a client connects, which we
// assume happens within 5 seconds. In any case, just for
// graphs, so not important to miss a move. But it shouldn't:
// the netcheck/re-STUNs in magicsock only happen about every
// 30 seconds.
if time.Since(c.connectedAt) > 5*time.Second {
homeMove.Add(1)
}
}
// expMovingAverage returns the new moving average given the previous average,
// a new value, and an alpha decay factor.
// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func expMovingAverage(prev, newValue, alpha float64) float64 {
return alpha*newValue + (1-alpha)*prev
}
// recordQueueTime updates the average queue duration metric after a packet has been sent.
func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
elapsed := float64(time.Since(enqueuedAt).Milliseconds())
for {
old := atomic.LoadUint64(c.s.avgQueueDuration)
newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1)
if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, old, math.Float64bits(newAvg)) {
break
}
}
}
func (c *sclient) sendLoop(ctx context.Context) error {
defer func() {
// If the sender shuts down unilaterally due to an error, close so
// that the receive loop unblocks and cleans up the rest.
c.nc.Close()
// Drain the send queue to count dropped packets
for {
select {
case pkt := <-c.sendQueue:
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone)
case pkt := <-c.discoSendQueue:
c.s.recordDrop(pkt.bs, pkt.src, c.key, dropReasonGone)
default:
return
}
}
}()
jitter := time.Duration(rand.Intn(5000)) * time.Millisecond
keepAliveTick := time.NewTicker(keepAlive + jitter)
defer keepAliveTick.Stop()
var werr error // last write error
for {
if werr != nil {
return werr
}
// First, a non-blocking select (with a default) that
// does as many non-flushing writes as possible.
select {
case <-ctx.Done():
return nil
case peer := <-c.peerGone:
werr = c.sendPeerGone(peer)
continue
case <-c.meshUpdate:
werr = c.sendMeshUpdates()
continue
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case msg := <-c.discoSendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
continue
case <-keepAliveTick.C:
werr = c.sendKeepAlive()
continue
default:
// Flush any writes from the 3 sends above, or from
// the blocking loop below.
if werr = c.bw.Flush(); werr != nil {
return werr
}
}
// Then a blocking select with same:
select {
case <-ctx.Done():
return nil
case peer := <-c.peerGone:
werr = c.sendPeerGone(peer)
case <-c.meshUpdate:
werr = c.sendMeshUpdates()
continue
case msg := <-c.sendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case msg := <-c.discoSendQueue:
werr = c.sendPacket(msg.src, msg.bs)
c.recordQueueTime(msg.enqueuedAt)
case <-keepAliveTick.C:
werr = c.sendKeepAlive()
}
}
}
func (c *sclient) setWriteDeadline() {
c.nc.SetWriteDeadline(time.Now().Add(writeTimeout))
}
// sendKeepAlive sends a keep-alive frame, without flushing.
func (c *sclient) sendKeepAlive() error {
c.setWriteDeadline()
return writeFrameHeader(c.bw.bw(), frameKeepAlive, 0)
}
// sendPeerGone sends a peerGone frame, without flushing.
func (c *sclient) sendPeerGone(peer key.Public) error {
c.s.peerGoneFrames.Add(1)
c.setWriteDeadline()
if err := writeFrameHeader(c.bw.bw(), framePeerGone, keyLen); err != nil {
return err
}
_, err := c.bw.Write(peer[:])
return err
}
// sendPeerPresent sends a peerPresent frame, without flushing.
func (c *sclient) sendPeerPresent(peer key.Public) error {
c.setWriteDeadline()
if err := writeFrameHeader(c.bw.bw(), framePeerPresent, keyLen); err != nil {
return err
}
_, err := c.bw.Write(peer[:])
return err
}
// sendMeshUpdates drains as many mesh peerStateChange entries as
// possible into the write buffer WITHOUT flushing or otherwise
// blocking (as it holds c.s.mu while working). If it can't drain them
// all, it schedules itself to be called again in the future.
func (c *sclient) sendMeshUpdates() error {
c.s.mu.Lock()
defer c.s.mu.Unlock()
writes := 0
for _, pcs := range c.peerStateChange {
if c.bw.Available() <= frameHeaderLen+keyLen {
break
}
var err error
if pcs.present {
err = c.sendPeerPresent(pcs.peer)
} else {
err = c.sendPeerGone(pcs.peer)
}
if err != nil {
// Shouldn't happen, though, as we're writing
// into available buffer space, not the
// network.
return err
}
writes++
}
remain := copy(c.peerStateChange, c.peerStateChange[writes:])
c.peerStateChange = c.peerStateChange[:remain]
// Did we manage to write them all into the bufio buffer without flushing?
if len(c.peerStateChange) == 0 {
if cap(c.peerStateChange) > 16 {
c.peerStateChange = nil
}
} else {
// Didn't finish in the buffer space provided; schedule a future run.
go c.requestMeshUpdate()
}
return nil
}
// sendPacket writes contents to the client in a RecvPacket frame. If
// srcKey.IsZero, uses the old DERPv1 framing format, otherwise uses
// DERPv2. The bytes of contents are only valid until this function
// returns, do not retain slices.
// It does not flush its bufio.Writer.
func (c *sclient) sendPacket(srcKey key.Public, contents []byte) (err error) {
defer func() {
// Stats update.
if err != nil {
c.s.recordDrop(contents, srcKey, c.key, dropReasonWriteError)
} else {
c.s.packetsSent.Add(1)
c.s.bytesSent.Add(int64(len(contents)))
}
}()
c.setWriteDeadline()
withKey := !srcKey.IsZero()
pktLen := len(contents)
if withKey {
pktLen += len(srcKey)
}
if err = writeFrameHeader(c.bw.bw(), frameRecvPacket, uint32(pktLen)); err != nil {
return err
}
if withKey {
err := writePublicKey(c.bw.bw(), &srcKey)
if err != nil {
return err
}
}
_, err = c.bw.Write(contents)
return err
}
// AddPacketForwarder registers fwd as a packet forwarder for dst.
// fwd must be comparable.
func (s *Server) AddPacketForwarder(dst key.Public, fwd PacketForwarder) {
s.mu.Lock()
defer s.mu.Unlock()
if prev, ok := s.clientsMesh[dst]; ok {
if prev == fwd {
// Duplicate registration of same forwarder. Ignore.
return
}
if m, ok := prev.(multiForwarder); ok {
if _, ok := m[fwd]; !ok {
// Duplicate registration of same forwarder in set; ignore.
return
}
m[fwd] = m.maxVal() + 1
return
}
if prev != nil {
// Otherwise, the existing value is not a set,
// not a dup, and not local-only (nil) so make
// it a set.
fwd = multiForwarder{
prev: 1, // existed 1st, higher priority
fwd: 2, // the passed in fwd is in 2nd place
}
s.multiForwarderCreated.Add(1)
}
}
s.clientsMesh[dst] = fwd
}
// RemovePacketForwarder removes fwd as a packet forwarder for dst.
// fwd must be comparable.
func (s *Server) RemovePacketForwarder(dst key.Public, fwd PacketForwarder) {
s.mu.Lock()
defer s.mu.Unlock()
v, ok := s.clientsMesh[dst]
if !ok {
return
}
if m, ok := v.(multiForwarder); ok {
if len(m) < 2 {
panic("unexpected")
}
delete(m, fwd)
// If fwd was in m and we no longer need to be a
// multiForwarder, replace the entry with the
// remaining PacketForwarder.
if len(m) == 1 {
var remain PacketForwarder
for k := range m {
remain = k
}
s.clientsMesh[dst] = remain
s.multiForwarderDeleted.Add(1)
}
return
}
if v != fwd {
s.removePktForwardOther.Add(1)
// Delete of an entry that wasn't in the
// map. Harmless, so ignore.
// (This might happen if a user is moving around
// between nodes and/or the server sent duplicate
// connection change broadcasts.)
return
}
if _, isLocal := s.clients[dst]; isLocal {
s.clientsMesh[dst] = nil
} else {
delete(s.clientsMesh, dst)
s.notePeerGoneFromRegionLocked(dst)
}
}
// multiForwarder is a PacketForwarder that represents a set of
// forwarding options. It's used in the rare cases that a client is
// connected to multiple DERP nodes in a region. That shouldn't really
// happen except for perhaps during brief moments while the client is
// reconfiguring, in which case we don't want to forget where the
// client is. The map value is unique connection number; the lowest
// one has been seen the longest. It's used to make sure we forward
// packets consistently to the same node and don't pick randomly.
type multiForwarder map[PacketForwarder]uint8
func (m multiForwarder) maxVal() (max uint8) {
for _, v := range m {
if v > max {
max = v
}
}
return
}
func (m multiForwarder) ForwardPacket(src, dst key.Public, payload []byte) error {
var fwd PacketForwarder
var lowest uint8
for k, v := range m {
if fwd == nil || v < lowest {
fwd = k
lowest = v
}
}
return fwd.ForwardPacket(src, dst, payload)
}
func (s *Server) expVarFunc(f func() interface{}) expvar.Func {
return expvar.Func(func() interface{} {
s.mu.Lock()
defer s.mu.Unlock()
return f()
})
}
// ExpVar returns an expvar variable suitable for registering with expvar.Publish.
func (s *Server) ExpVar() expvar.Var {
m := new(metrics.Set)
m.Set("gauge_memstats_sys0", expvar.Func(func() interface{} { return int64(s.memSys0) }))
m.Set("gauge_watchers", s.expVarFunc(func() interface{} { return len(s.watchers) }))
m.Set("gauge_current_connections", &s.curClients)
m.Set("gauge_current_home_connections", &s.curHomeClients)
m.Set("gauge_clients_total", expvar.Func(func() interface{} { return len(s.clientsMesh) }))
m.Set("gauge_clients_local", expvar.Func(func() interface{} { return len(s.clients) }))
m.Set("gauge_clients_remote", expvar.Func(func() interface{} { return len(s.clientsMesh) - len(s.clients) }))
m.Set("accepts", &s.accepts)
m.Set("clients_replaced", &s.clientsReplaced)
m.Set("clients_replace_limited", &s.clientsReplaceLimited)
m.Set("gauge_clients_replace_sleeping", &s.clientsReplaceSleeping)
m.Set("bytes_received", &s.bytesRecv)
m.Set("bytes_sent", &s.bytesSent)
m.Set("packets_dropped", &s.packetsDropped)
m.Set("counter_packets_dropped_reason", &s.packetsDroppedReason)
m.Set("counter_packets_dropped_type", &s.packetsDroppedType)
m.Set("counter_packets_received_kind", &s.packetsRecvByKind)
m.Set("packets_sent", &s.packetsSent)
m.Set("packets_received", &s.packetsRecv)
m.Set("unknown_frames", &s.unknownFrames)
m.Set("home_moves_in", &s.homeMovesIn)
m.Set("home_moves_out", &s.homeMovesOut)
m.Set("peer_gone_frames", &s.peerGoneFrames)
m.Set("packets_forwarded_out", &s.packetsForwardedOut)
m.Set("packets_forwarded_in", &s.packetsForwardedIn)
m.Set("multiforwarder_created", &s.multiForwarderCreated)
m.Set("multiforwarder_deleted", &s.multiForwarderDeleted)
m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther)
m.Set("average_queue_duration_ms", expvar.Func(func() interface{} {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
}))
var expvarVersion expvar.String
expvarVersion.Set(version.Long)
m.Set("version", &expvarVersion)
return m
}
func (s *Server) ConsistencyCheck() error {
s.mu.Lock()
defer s.mu.Unlock()
var errs []string
var nilMeshNotInClient int
for k, f := range s.clientsMesh {
if f == nil {
if _, ok := s.clients[k]; !ok {
nilMeshNotInClient++
}
}
}
if nilMeshNotInClient != 0 {
errs = append(errs, fmt.Sprintf("%d s.clientsMesh keys not in s.clients", nilMeshNotInClient))
}
var clientNotInMesh int
for k := range s.clients {
if _, ok := s.clientsMesh[k]; !ok {
clientNotInMesh++
}
}
if clientNotInMesh != 0 {
errs = append(errs, fmt.Sprintf("%d s.clients keys not in s.clientsMesh", clientNotInMesh))
}
if s.curClients.Value() != int64(len(s.clients)) {
errs = append(errs, fmt.Sprintf("expvar connections = %d != clients map says of %d",
s.curClients.Value(),
len(s.clients)))
}
if len(errs) == 0 {
return nil
}
return errors.New(strings.Join(errs, ", "))
}
// readPublicKey reads key from br.
// It is ~4x slower than io.ReadFull(br, key),
// but it prevents key from escaping and thus being allocated.
// If io.ReadFull(br, key) does not cause key to escape, use that instead.
func readPublicKey(br *bufio.Reader, key *key.Public) error {
// Do io.ReadFull(br, key), but one byte at a time, to avoid allocation.
for i := range key {
b, err := br.ReadByte()
if err != nil {
return err
}
key[i] = b
}
return nil
}
// writePublicKey writes key to bw.
// It is ~3x slower than bw.Write(key[:]),
// but it prevents key from escaping and thus being allocated.
// If bw.Write(key[:]) does not cause key to escape, use that instead.
func writePublicKey(bw *bufio.Writer, key *key.Public) error {
// Do bw.Write(key[:]), but one byte at a time to avoid allocation.
for _, b := range key {
err := bw.WriteByte(b)
if err != nil {
return err
}
}
return nil
}
const minTimeBetweenLogs = 2 * time.Second
// BytesSentRecv records the number of bytes that have been sent since the last traffic check
// for a given process, as well as the public key of the process sending those bytes.
type BytesSentRecv struct {
Sent uint64
Recv uint64
// Key is the public key of the client which sent/received these bytes.
Key key.Public
}
// parseSSOutput parses the output from the specific call to ss in ServeDebugTraffic.
// Separated out for ease of testing.
func parseSSOutput(raw string) map[netaddr.IPPort]BytesSentRecv {
newState := map[netaddr.IPPort]BytesSentRecv{}
// parse every 2 lines and get src and dst ips, and kv pairs
lines := strings.Split(raw, "\n")
for i := 0; i < len(lines); i += 2 {
ipInfo := strings.Fields(strings.TrimSpace(lines[i]))
if len(ipInfo) < 5 {
continue
}
src, err := netaddr.ParseIPPort(ipInfo[4])
if err != nil {
continue
}
stats := strings.Fields(strings.TrimSpace(lines[i+1]))
stat := BytesSentRecv{}
for _, s := range stats {
if strings.Contains(s, "bytes_sent") {
sent, err := strconv.Atoi(s[strings.Index(s, ":")+1:])
if err == nil {
stat.Sent = uint64(sent)
}
} else if strings.Contains(s, "bytes_received") {
recv, err := strconv.Atoi(s[strings.Index(s, ":")+1:])
if err == nil {
stat.Recv = uint64(recv)
}
}
}
newState[src] = stat
}
return newState
}
func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
prevState := map[netaddr.IPPort]BytesSentRecv{}
enc := json.NewEncoder(w)
for r.Context().Err() == nil {
output, err := exec.Command("ss", "-i", "-H", "-t").Output()
if err != nil {
fmt.Fprintf(w, "ss failed: %v", err)
return
}
newState := parseSSOutput(string(output))
s.mu.Lock()
for k, next := range newState {
prev := prevState[k]
if prev.Sent < next.Sent || prev.Recv < next.Recv {
if pkey, ok := s.keyOfAddr[k]; ok {
next.Key = pkey
if err := enc.Encode(next); err != nil {
s.mu.Unlock()
return
}
}
}
}
s.mu.Unlock()
prevState = newState
if _, err := fmt.Fprintln(w); err != nil {
return
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
time.Sleep(minTimeBetweenLogs)
}
}
var bufioWriterPool = &sync.Pool{
New: func() interface{} {
return bufio.NewWriterSize(ioutil.Discard, 2<<10)
},
}
// lazyBufioWriter is a bufio.Writer-like wrapping writer that lazily
// allocates its actual bufio.Writer from a sync.Pool, releasing it to
// the pool upon flush.
//
// We do this to reduce memory overhead; most DERP connections are
// idle and the idle bufio.Writers were 30% of overall memory usage.
type lazyBufioWriter struct {
w io.Writer // underlying
lbw *bufio.Writer // lazy; nil means it needs an associated buffer
}
func (w *lazyBufioWriter) bw() *bufio.Writer {
if w.lbw == nil {
w.lbw = bufioWriterPool.Get().(*bufio.Writer)
w.lbw.Reset(w.w)
}
return w.lbw
}
func (w *lazyBufioWriter) Available() int { return w.bw().Available() }
func (w *lazyBufioWriter) Write(p []byte) (int, error) { return w.bw().Write(p) }
func (w *lazyBufioWriter) Flush() error {
if w.lbw == nil {
return nil
}
err := w.lbw.Flush()
w.lbw.Reset(ioutil.Discard)
bufioWriterPool.Put(w.lbw)
w.lbw = nil
return err
}