derp, wgengine/magicsock: support more than just packets from Client.Recv

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
reviewable/pr90/r1
Brad Fitzpatrick 5 years ago committed by Brad Fitzpatrick
parent 88f1cc0c98
commit 379a3125fd

@ -22,16 +22,20 @@ import (
"time" "time"
) )
// MaxPacketSize is the maximum size of a packet sent over DERP.
// (This only includes the data bytes visible to magicsock, not
// including its on-wire framing overhead)
const MaxPacketSize = 64 << 10
// magic is the DERP magic number, sent in the frameServerKey frame // magic is the DERP magic number, sent in the frameServerKey frame
// upon initial connection. // upon initial connection.
const magic = "DERP🔑" // 8 bytes: 0x44 45 52 50 f0 9f 94 91 const magic = "DERP🔑" // 8 bytes: 0x44 45 52 50 f0 9f 94 91
const ( const (
nonceLen = 24 nonceLen = 24
keyLen = 32 keyLen = 32
maxInfoLen = 1 << 20 maxInfoLen = 1 << 20
keepAlive = 60 * time.Second keepAlive = 60 * time.Second
maxPacketData = 64 << 10
) )
// frameType is the one byte frame type at the beginning of the frame // frameType is the one byte frame type at the beginning of the frame

@ -131,7 +131,7 @@ func (c *Client) send(dstKey key.Public, pkt []byte) (ret error) {
} }
}() }()
if len(pkt) > maxPacketData { if len(pkt) > MaxPacketSize {
return fmt.Errorf("packet too big: %d", len(pkt)) return fmt.Errorf("packet too big: %d", len(pkt))
} }
@ -147,12 +147,26 @@ func (c *Client) send(dstKey key.Public, pkt []byte) (ret error) {
return c.bw.Flush() return c.bw.Flush()
} }
// Recv reads a data packet from the DERP server. // ReceivedMessage represents a type returned by Client.Recv. Unless
// The provided buffer must be larger enough to receive a complete packet. // otherwise documented, the returned message aliases the byte slice
// provided to Recv and thus the message is only as good as that
// buffer, which is up to the caller.
type ReceivedMessage interface {
msg()
}
// ReceivedPacket is a ReceivedMessage representing an incoming packet.
type ReceivedPacket []byte
func (ReceivedPacket) msg() {}
// Recv reads a message from the DERP server.
// The provided buffer must be large enough to receive a complete packet,
// which in practice are are 1.5-4 KB, but can be up to 64 KB.
// Once Recv returns an error, the Client is dead forever. // Once Recv returns an error, the Client is dead forever.
func (c *Client) Recv(b []byte) (n int, err error) { func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) {
if c.readErr != nil { if c.readErr != nil {
return 0, c.readErr return nil, c.readErr
} }
defer func() { defer func() {
if err != nil { if err != nil {
@ -165,7 +179,7 @@ func (c *Client) Recv(b []byte) (n int, err error) {
c.nc.SetReadDeadline(time.Now().Add(120 * time.Second)) c.nc.SetReadDeadline(time.Now().Add(120 * time.Second))
t, n, err := readFrame(c.br, 1<<20, b) t, n, err := readFrame(c.br, 1<<20, b)
if err != nil { if err != nil {
return 0, err return nil, err
} }
switch t { switch t {
default: default:
@ -175,7 +189,7 @@ func (c *Client) Recv(b []byte) (n int, err error) {
// require ack pongs. // require ack pongs.
continue continue
case frameRecvPacket: case frameRecvPacket:
return int(n), nil return ReceivedPacket(b[:n]), nil
} }
} }
} }

@ -328,8 +328,8 @@ func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint
return key.Public{}, nil, err return key.Public{}, nil, err
} }
packetLen := frameLen - keyLen packetLen := frameLen - keyLen
if packetLen > maxPacketData { if packetLen > MaxPacketSize {
return key.Public{}, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, maxPacketData) return key.Public{}, nil, fmt.Errorf("data packet longer (%d) than max of %v", packetLen, MaxPacketSize)
} }
if err := limiter.WaitN(ctx, int(packetLen)); err != nil { if err := limiter.WaitN(ctx, int(packetLen)); err != nil {
return key.Public{}, nil, fmt.Errorf("rate limit: %v", err) return key.Public{}, nil, fmt.Errorf("rate limit: %v", err)

@ -76,13 +76,18 @@ func TestSendRecv(t *testing.T) {
go func(i int) { go func(i int) {
for { for {
b := make([]byte, 1<<16) b := make([]byte, 1<<16)
n, err := clients[i].Recv(b) m, err := clients[i].Recv(b)
if err != nil { if err != nil {
errCh <- err errCh <- err
return return
} }
b = b[:n] switch m := m.(type) {
recvChs[i] <- b default:
t.Errorf("unexpected message type %T", m)
continue
case ReceivedPacket:
recvChs[i] <- []byte(m)
}
} }
}(i) }(i)
} }

@ -167,16 +167,16 @@ func (c *Client) Send(dstKey key.Public, b []byte) error {
return err return err
} }
func (c *Client) Recv(b []byte) (int, error) { func (c *Client) Recv(b []byte) (derp.ReceivedMessage, error) {
client, err := c.connect(context.TODO(), "derphttp.Client.Recv") client, err := c.connect(context.TODO(), "derphttp.Client.Recv")
if err != nil { if err != nil {
return 0, err return nil, err
} }
n, err := client.Recv(b) m, err := client.Recv(b)
if err != nil { if err != nil {
c.close() c.close()
} }
return n, err return m, err
} }
// Close closes the client. It will not automatically reconnect after // Close closes the client. It will not automatically reconnect after

@ -94,13 +94,18 @@ func TestSendRecv(t *testing.T) {
default: default:
} }
b := make([]byte, 1<<16) b := make([]byte, 1<<16)
n, err := c.Recv(b) m, err := c.Recv(b)
if err != nil { if err != nil {
t.Logf("client%d: %v", i, err) t.Logf("client%d: %v", i, err)
break break
} }
b = b[:n] switch m := m.(type) {
recvChs[i] <- b default:
t.Errorf("unexpected message type %T", m)
continue
case derp.ReceivedPacket:
recvChs[i] <- []byte(m)
}
} }
}(i) }(i)
} }

@ -21,6 +21,7 @@ import (
"github.com/tailscale/wireguard-go/device" "github.com/tailscale/wireguard-go/device"
"github.com/tailscale/wireguard-go/wgcfg" "github.com/tailscale/wireguard-go/wgcfg"
"tailscale.com/derp"
"tailscale.com/derp/derphttp" "tailscale.com/derp/derphttp"
"tailscale.com/stun" "tailscale.com/stun"
"tailscale.com/stunner" "tailscale.com/stunner"
@ -519,7 +520,7 @@ type derpReadResult struct {
// connection, handling received packets. // connection, handling received packets.
func (c *Conn) runDerpReader(derpFakeAddr *net.UDPAddr, dc *derphttp.Client) { func (c *Conn) runDerpReader(derpFakeAddr *net.UDPAddr, dc *derphttp.Client) {
didCopy := make(chan struct{}, 1) didCopy := make(chan struct{}, 1)
var buf [64 << 10]byte var buf [derp.MaxPacketSize]byte
var bufValid int // bytes in buf that are valid var bufValid int // bytes in buf that are valid
copyFn := func(dst []byte) int { copyFn := func(dst []byte) int {
n := copy(dst, buf[:bufValid]) n := copy(dst, buf[:bufValid])
@ -528,8 +529,7 @@ func (c *Conn) runDerpReader(derpFakeAddr *net.UDPAddr, dc *derphttp.Client) {
} }
for { for {
var err error // no := on next line to not shadow bufValid msg, err := dc.Recv(buf[:])
bufValid, err = dc.Recv(buf[:])
if err != nil { if err != nil {
if err == derphttp.ErrClientClosed { if err == derphttp.ErrClientClosed {
return return
@ -543,6 +543,14 @@ func (c *Conn) runDerpReader(derpFakeAddr *net.UDPAddr, dc *derphttp.Client) {
time.Sleep(250 * time.Millisecond) time.Sleep(250 * time.Millisecond)
continue continue
} }
switch m := msg.(type) {
case derp.ReceivedPacket:
bufValid = len(m)
default:
// Ignore.
// TODO: handle endpoint notification messages.
continue
}
log.Printf("got derp %v packet: %q", derpFakeAddr, buf[:bufValid]) log.Printf("got derp %v packet: %q", derpFakeAddr, buf[:bufValid])
select { select {
case <-c.donec: case <-c.donec:

Loading…
Cancel
Save