derp: introduce Conn interface

This lets us test with something other than a net.Conn.

Signed-off-by: David Crawshaw <crawshaw@tailscale.com>
pull/181/head
David Crawshaw 5 years ago committed by David Crawshaw
parent 41ac4a79d6
commit 43aa8595dd

@ -11,7 +11,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"sync" "sync"
"time" "time"
@ -26,7 +25,7 @@ type Client struct {
publicKey key.Public // of privateKey publicKey key.Public // of privateKey
protoVersion int // min of server+client protoVersion int // min of server+client
logf logger.Logf logf logger.Logf
nc net.Conn nc Conn
br *bufio.Reader br *bufio.Reader
wmu sync.Mutex // hold while writing to bw wmu sync.Mutex // hold while writing to bw
@ -34,7 +33,7 @@ type Client struct {
readErr error // sticky read error readErr error // sticky read error
} }
func NewClient(privateKey key.Private, nc net.Conn, brw *bufio.ReadWriter, logf logger.Logf) (*Client, error) { func NewClient(privateKey key.Private, nc Conn, brw *bufio.ReadWriter, logf logger.Logf) (*Client, error) {
c := &Client{ c := &Client{
privateKey: privateKey, privateKey: privateKey,
publicKey: privateKey.Public(), publicKey: privateKey.Public(),
@ -138,7 +137,7 @@ func (c *Client) Send(dstKey key.Public, pkt []byte) error { return c.send(dstKe
func (c *Client) send(dstKey key.Public, pkt []byte) (ret error) { func (c *Client) send(dstKey key.Public, pkt []byte) (ret error) {
defer func() { defer func() {
if ret != nil { if ret != nil {
ret = fmt.Errorf("derp.Send: %v", ret) ret = fmt.Errorf("derp.Send: %w", ret)
} }
}() }()
@ -215,7 +214,7 @@ func (c *Client) Recv(b []byte) (m ReceivedMessage, err error) {
} }
defer func() { defer func() {
if err != nil { if err != nil {
err = fmt.Errorf("derp.Recv: %v", err) err = fmt.Errorf("derp.Recv: %w", err)
c.readErr = err c.readErr = err
} }
}() }()

@ -18,7 +18,6 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"net"
"os" "os"
"strconv" "strconv"
"sync" "sync"
@ -57,11 +56,23 @@ type Server struct {
mu sync.Mutex mu sync.Mutex
closed bool closed bool
netConns map[net.Conn]chan struct{} // chan is closed when conn closes netConns map[Conn]chan struct{} // chan is closed when conn closes
clients map[key.Public]*sclient clients map[key.Public]*sclient
clientsEver map[key.Public]bool // never deleted from, for stats; fine for now clientsEver map[key.Public]bool // never deleted from, for stats; fine for now
} }
// Conn is the subset of the underlying net.Conn the DERP Server needs.
// It is a defined type so that non-net connections can be used.
type Conn interface {
io.Closer
// The *Deadline methods follow the semantics of net.Conn.
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}
// NewServer returns a new DERP server. It doesn't listen on its own. // NewServer returns a new DERP server. It doesn't listen on its own.
// Connections are given to it via Server.Accept. // Connections are given to it via Server.Accept.
func NewServer(privateKey key.Private, logf logger.Logf) *Server { func NewServer(privateKey key.Private, logf logger.Logf) *Server {
@ -71,7 +82,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
logf: logf, logf: logf,
clients: make(map[key.Public]*sclient), clients: make(map[key.Public]*sclient),
clientsEver: make(map[key.Public]bool), clientsEver: make(map[key.Public]bool),
netConns: make(map[net.Conn]chan struct{}), netConns: make(map[Conn]chan struct{}),
} }
return s return s
} }
@ -115,7 +126,7 @@ func (s *Server) isClosed() bool {
// on its own. // on its own.
// //
// Accept closes nc. // Accept closes nc.
func (s *Server) Accept(nc net.Conn, brw *bufio.ReadWriter) { func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
closed := make(chan struct{}) closed := make(chan struct{})
s.accepts.Add(1) s.accepts.Add(1)
@ -132,8 +143,8 @@ func (s *Server) Accept(nc net.Conn, brw *bufio.ReadWriter) {
s.mu.Unlock() s.mu.Unlock()
}() }()
if err := s.accept(nc, brw); err != nil && !s.isClosed() { if err := s.accept(nc, brw, remoteAddr); err != nil && !s.isClosed() {
s.logf("derp: %s: %v", nc.RemoteAddr(), err) s.logf("derp: %s: %v", remoteAddr, err)
} }
} }
@ -147,8 +158,8 @@ func (s *Server) registerClient(c *sclient) {
c.logf("adding connection") c.logf("adding connection")
} else { } else {
s.clientsReplaced.Add(1) s.clientsReplaced.Add(1)
old.nc.Close() c.logf("adding connection, replacing %s", old.remoteAddr)
c.logf("adding connection, replacing %s", old.nc.RemoteAddr()) go old.nc.Close()
} }
s.clients[c.key] = c s.clients[c.key] = c
s.clientsEver[c.key] = true s.clientsEver[c.key] = true
@ -171,7 +182,7 @@ func (s *Server) unregisterClient(c *sclient) {
} }
} }
func (s *Server) accept(nc net.Conn, brw *bufio.ReadWriter) error { func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error {
br, bw := brw.Reader, brw.Writer br, bw := brw.Reader, brw.Writer
nc.SetDeadline(time.Now().Add(10 * time.Second)) nc.SetDeadline(time.Now().Add(10 * time.Second))
if err := s.sendServerKey(bw); err != nil { if err := s.sendServerKey(bw); err != nil {
@ -203,7 +214,8 @@ func (s *Server) accept(nc net.Conn, brw *bufio.ReadWriter) error {
br: br, br: br,
bw: bw, bw: bw,
limiter: limiter, limiter: limiter,
logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", nc.RemoteAddr(), clientKey)), logf: logger.WithPrefix(s.logf, fmt.Sprintf("derp client %v/%x: ", remoteAddr, clientKey)),
remoteAddr: remoteAddr,
connectedAt: time.Now(), connectedAt: time.Now(),
} }
if clientInfo != nil { if clientInfo != nil {
@ -450,11 +462,12 @@ func (s *Server) recvPacket(ctx context.Context, br *bufio.Reader, frameLen uint
// (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go) // (The "s" prefix is to more explicitly distinguish it from Client in derp_client.go)
type sclient struct { type sclient struct {
s *Server s *Server
nc net.Conn nc Conn
key key.Public key key.Public
info clientInfo info clientInfo
logf logger.Logf logf logger.Logf
limiter *rate.Limiter limiter *rate.Limiter
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
connectedAt time.Time connectedAt time.Time
keepAliveTimer *time.Timer keepAliveTimer *time.Timer

@ -60,7 +60,8 @@ func TestSendRecv(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
defer cin.Close() defer cin.Close()
go s.Accept(cin, bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))) brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", i))
key := clientPrivateKeys[i] key := clientPrivateKeys[i]
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout)) brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))

@ -32,6 +32,6 @@ func Handler(s *derp.Server) http.Handler {
http.Error(w, "HTTP does not support general TCP support", 500) http.Error(w, "HTTP does not support general TCP support", 500)
return return
} }
s.Accept(netConn, conn) s.Accept(netConn, conn, netConn.RemoteAddr().String())
}) })
} }

Loading…
Cancel
Save