derp: plumb '/derp' request context through (#5083)

This change is required to implement tracing for derp.

Signed-off-by: Charlotte Brandhorst-Satzkorn <charlotte@tailscale.com>
pull/5094/head
Charlotte Brandhorst-Satzkorn 2 years ago committed by GitHub
parent 3c892d106c
commit 4c0feba38e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,7 +6,6 @@ package main
import ( import (
"bufio" "bufio"
"context"
"expvar" "expvar"
"log" "log"
"net/http" "net/http"
@ -45,8 +44,8 @@ func addWebSocketSupport(s *derp.Server, base http.Handler) http.Handler {
return return
} }
counterWebSocketAccepts.Add(1) counterWebSocketAccepts.Add(1)
wc := websocket.NetConn(context.Background(), c, websocket.MessageBinary) wc := websocket.NetConn(r.Context(), c, websocket.MessageBinary)
brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc)) brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc))
s.Accept(wc, brw, r.RemoteAddr) s.Accept(r.Context(), wc, brw, r.RemoteAddr)
}) })
} }

@ -410,7 +410,7 @@ func (s *Server) IsClientConnectedForTest(k key.NodePublic) bool {
// on its own. // on its own.
// //
// Accept closes nc. // Accept closes nc.
func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) { func (s *Server) Accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
closed := make(chan struct{}) closed := make(chan struct{})
s.mu.Lock() s.mu.Lock()
@ -428,7 +428,7 @@ func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) {
s.mu.Unlock() s.mu.Unlock()
}() }()
if err := s.accept(nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() { if err := s.accept(ctx, nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() {
s.logf("derp: %s: %v", remoteAddr, err) s.logf("derp: %s: %v", remoteAddr, err)
} }
} }
@ -641,7 +641,7 @@ func (s *Server) addWatcher(c *sclient) {
go c.requestMeshUpdate() go c.requestMeshUpdate()
} }
func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error { func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error {
br := brw.Reader br := brw.Reader
nc.SetDeadline(time.Now().Add(10 * time.Second)) nc.SetDeadline(time.Now().Add(10 * time.Second))
bw := &lazyBufioWriter{w: nc, lbw: brw.Writer} bw := &lazyBufioWriter{w: nc, lbw: brw.Writer}
@ -660,7 +660,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN
// At this point we trust the client so we don't time out. // At this point we trust the client so we don't time out.
nc.SetDeadline(time.Time{}) nc.SetDeadline(time.Time{})
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr) remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr)

@ -85,8 +85,12 @@ func TestSendRecv(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
defer cin.Close() defer cin.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin)) brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin))
go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", i)) go s.Accept(ctx, cin, brwServer, fmt.Sprintf("test-client-%d", i))
key := clientPrivateKeys[i] key := clientPrivateKeys[i]
brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout)) brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout))
@ -231,10 +235,10 @@ func TestSendFreeze(t *testing.T) {
// Then cathy stops processing messsages. // Then cathy stops processing messsages.
// That should not interfere with alice talking to bob. // That should not interfere with alice talking to bob.
newClient := func(name string, k key.NodePrivate) (c *Client, clientConn nettest.Conn) { newClient := func(ctx context.Context, name string, k key.NodePrivate) (c *Client, clientConn nettest.Conn) {
t.Helper() t.Helper()
c1, c2 := nettest.NewConn(name, 1024) c1, c2 := nettest.NewConn(name, 1024)
go s.Accept(c1, bufio.NewReadWriter(bufio.NewReader(c1), bufio.NewWriter(c1)), name) go s.Accept(ctx, c1, bufio.NewReadWriter(bufio.NewReader(c1), bufio.NewWriter(c1)), name)
brw := bufio.NewReadWriter(bufio.NewReader(c2), bufio.NewWriter(c2)) brw := bufio.NewReadWriter(bufio.NewReader(c2), bufio.NewWriter(c2))
c, err := NewClient(k, c2, brw, t.Logf) c, err := NewClient(k, c2, brw, t.Logf)
@ -245,14 +249,17 @@ func TestSendFreeze(t *testing.T) {
return c, c2 return c, c2
} }
ctx, clientCtxCancel := context.WithCancel(context.Background())
defer clientCtxCancel()
aliceKey := key.NewNode() aliceKey := key.NewNode()
aliceClient, aliceConn := newClient("alice", aliceKey) aliceClient, aliceConn := newClient(ctx, "alice", aliceKey)
bobKey := key.NewNode() bobKey := key.NewNode()
bobClient, bobConn := newClient("bob", bobKey) bobClient, bobConn := newClient(ctx, "bob", bobKey)
cathyKey := key.NewNode() cathyKey := key.NewNode()
cathyClient, cathyConn := newClient("cathy", cathyKey) cathyClient, cathyConn := newClient(ctx, "cathy", cathyKey)
var ( var (
aliceCh = make(chan struct{}, 32) aliceCh = make(chan struct{}, 32)
@ -455,7 +462,7 @@ func (ts *testServer) close(t *testing.T) error {
return nil return nil
} }
func newTestServer(t *testing.T) *testServer { func newTestServer(t *testing.T, ctx context.Context) *testServer {
t.Helper() t.Helper()
logf := logger.WithPrefix(t.Logf, "derp-server: ") logf := logger.WithPrefix(t.Logf, "derp-server: ")
s := NewServer(key.NewNode(), logf) s := NewServer(key.NewNode(), logf)
@ -475,7 +482,7 @@ func newTestServer(t *testing.T) *testServer {
// TODO: register c in ts so Close also closes it? // TODO: register c in ts so Close also closes it?
go func(i int) { go func(i int) {
brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c)) brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c))
go s.Accept(c, brwServer, fmt.Sprintf("test-client-%d", i)) go s.Accept(ctx, c, brwServer, fmt.Sprintf("test-client-%d", i))
}(i) }(i)
} }
}() }()
@ -610,7 +617,10 @@ func (c *testClient) close(t *testing.T) {
// TestWatch tests the connection watcher mechanism used by regional // TestWatch tests the connection watcher mechanism used by regional
// DERP nodes to mesh up with each other. // DERP nodes to mesh up with each other.
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
ts := newTestServer(t) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := newTestServer(t, ctx)
defer ts.close(t) defer ts.close(t)
w1 := newTestWatcher(t, ts, "w1") w1 := newTestWatcher(t, ts, "w1")
@ -1198,7 +1208,10 @@ func benchmarkSendRecvSize(b *testing.B, packetSize int) {
defer connIn.Close() defer connIn.Close()
brwServer := bufio.NewReadWriter(bufio.NewReader(connIn), bufio.NewWriter(connIn)) brwServer := bufio.NewReadWriter(bufio.NewReader(connIn), bufio.NewWriter(connIn))
go s.Accept(connIn, brwServer, "test-client")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Accept(ctx, connIn, brwServer, "test-client")
brw := bufio.NewReadWriter(bufio.NewReader(connOut), bufio.NewWriter(connOut)) brw := bufio.NewReadWriter(bufio.NewReader(connOut), bufio.NewWriter(connOut))
client, err := NewClient(k, connOut, brw, logger.Discard) client, err := NewClient(k, connOut, brw, logger.Discard)
@ -1354,7 +1367,10 @@ func TestClientSendRateLimiting(t *testing.T) {
} }
func TestServerRepliesToPing(t *testing.T) { func TestServerRepliesToPing(t *testing.T) {
ts := newTestServer(t) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ts := newTestServer(t, ctx)
defer ts.close(t) defer ts.close(t)
tc := newRegularClient(t, ts, "alice") tc := newRegularClient(t, ts, "alice")

@ -56,6 +56,6 @@ func Handler(s *derp.Server) http.Handler {
pubKey.UntypedHexString()) pubKey.UntypedHexString())
} }
s.Accept(netConn, conn, netConn.RemoteAddr().String()) s.Accept(r.Context(), netConn, conn, netConn.RemoteAddr().String())
}) })
} }

Loading…
Cancel
Save