From 4c0feba38ead02c621f475c0311df913b0605dd7 Mon Sep 17 00:00:00 2001 From: Charlotte Brandhorst-Satzkorn <46385858+catzkorn@users.noreply.github.com> Date: Mon, 18 Jul 2022 15:43:03 -0700 Subject: [PATCH] derp: plumb '/derp' request context through (#5083) This change is required to implement tracing for derp. Signed-off-by: Charlotte Brandhorst-Satzkorn --- cmd/derper/websocket.go | 5 ++--- derp/derp_server.go | 8 +++---- derp/derp_test.go | 38 +++++++++++++++++++++++--------- derp/derphttp/derphttp_server.go | 2 +- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/cmd/derper/websocket.go b/cmd/derper/websocket.go index b3c7eb7c9..23e1376d6 100644 --- a/cmd/derper/websocket.go +++ b/cmd/derper/websocket.go @@ -6,7 +6,6 @@ package main import ( "bufio" - "context" "expvar" "log" "net/http" @@ -45,8 +44,8 @@ func addWebSocketSupport(s *derp.Server, base http.Handler) http.Handler { return } counterWebSocketAccepts.Add(1) - wc := websocket.NetConn(context.Background(), c, websocket.MessageBinary) + wc := websocket.NetConn(r.Context(), c, websocket.MessageBinary) brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc)) - s.Accept(wc, brw, r.RemoteAddr) + s.Accept(r.Context(), wc, brw, r.RemoteAddr) }) } diff --git a/derp/derp_server.go b/derp/derp_server.go index ec50e53f4..16df37948 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -410,7 +410,7 @@ func (s *Server) IsClientConnectedForTest(k key.NodePublic) bool { // on its own. // // Accept closes nc. -func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) { +func (s *Server) Accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string) { closed := make(chan struct{}) s.mu.Lock() @@ -428,7 +428,7 @@ func (s *Server) Accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) { s.mu.Unlock() }() - if err := s.accept(nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() { + if err := s.accept(ctx, nc, brw, remoteAddr, connNum); err != nil && !s.isClosed() { s.logf("derp: %s: %v", remoteAddr, err) } } @@ -641,7 +641,7 @@ func (s *Server) addWatcher(c *sclient) { go c.requestMeshUpdate() } -func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error { +func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, remoteAddr string, connNum int64) error { br := brw.Reader nc.SetDeadline(time.Now().Add(10 * time.Second)) bw := &lazyBufioWriter{w: nc, lbw: brw.Writer} @@ -660,7 +660,7 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string, connN // At this point we trust the client so we don't time out. nc.SetDeadline(time.Time{}) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() remoteIPPort, _ := netaddr.ParseIPPort(remoteAddr) diff --git a/derp/derp_test.go b/derp/derp_test.go index 72bfcae85..255a949c8 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -85,8 +85,12 @@ func TestSendRecv(t *testing.T) { t.Fatal(err) } defer cin.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + brwServer := bufio.NewReadWriter(bufio.NewReader(cin), bufio.NewWriter(cin)) - go s.Accept(cin, brwServer, fmt.Sprintf("test-client-%d", i)) + go s.Accept(ctx, cin, brwServer, fmt.Sprintf("test-client-%d", i)) key := clientPrivateKeys[i] brw := bufio.NewReadWriter(bufio.NewReader(cout), bufio.NewWriter(cout)) @@ -231,10 +235,10 @@ func TestSendFreeze(t *testing.T) { // Then cathy stops processing messsages. // That should not interfere with alice talking to bob. - newClient := func(name string, k key.NodePrivate) (c *Client, clientConn nettest.Conn) { + newClient := func(ctx context.Context, name string, k key.NodePrivate) (c *Client, clientConn nettest.Conn) { t.Helper() c1, c2 := nettest.NewConn(name, 1024) - go s.Accept(c1, bufio.NewReadWriter(bufio.NewReader(c1), bufio.NewWriter(c1)), name) + go s.Accept(ctx, c1, bufio.NewReadWriter(bufio.NewReader(c1), bufio.NewWriter(c1)), name) brw := bufio.NewReadWriter(bufio.NewReader(c2), bufio.NewWriter(c2)) c, err := NewClient(k, c2, brw, t.Logf) @@ -245,14 +249,17 @@ func TestSendFreeze(t *testing.T) { return c, c2 } + ctx, clientCtxCancel := context.WithCancel(context.Background()) + defer clientCtxCancel() + aliceKey := key.NewNode() - aliceClient, aliceConn := newClient("alice", aliceKey) + aliceClient, aliceConn := newClient(ctx, "alice", aliceKey) bobKey := key.NewNode() - bobClient, bobConn := newClient("bob", bobKey) + bobClient, bobConn := newClient(ctx, "bob", bobKey) cathyKey := key.NewNode() - cathyClient, cathyConn := newClient("cathy", cathyKey) + cathyClient, cathyConn := newClient(ctx, "cathy", cathyKey) var ( aliceCh = make(chan struct{}, 32) @@ -455,7 +462,7 @@ func (ts *testServer) close(t *testing.T) error { return nil } -func newTestServer(t *testing.T) *testServer { +func newTestServer(t *testing.T, ctx context.Context) *testServer { t.Helper() logf := logger.WithPrefix(t.Logf, "derp-server: ") s := NewServer(key.NewNode(), logf) @@ -475,7 +482,7 @@ func newTestServer(t *testing.T) *testServer { // TODO: register c in ts so Close also closes it? go func(i int) { brwServer := bufio.NewReadWriter(bufio.NewReader(c), bufio.NewWriter(c)) - go s.Accept(c, brwServer, fmt.Sprintf("test-client-%d", i)) + go s.Accept(ctx, c, brwServer, fmt.Sprintf("test-client-%d", i)) }(i) } }() @@ -610,7 +617,10 @@ func (c *testClient) close(t *testing.T) { // TestWatch tests the connection watcher mechanism used by regional // DERP nodes to mesh up with each other. func TestWatch(t *testing.T) { - ts := newTestServer(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts := newTestServer(t, ctx) defer ts.close(t) w1 := newTestWatcher(t, ts, "w1") @@ -1198,7 +1208,10 @@ func benchmarkSendRecvSize(b *testing.B, packetSize int) { defer connIn.Close() brwServer := bufio.NewReadWriter(bufio.NewReader(connIn), bufio.NewWriter(connIn)) - go s.Accept(connIn, brwServer, "test-client") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Accept(ctx, connIn, brwServer, "test-client") brw := bufio.NewReadWriter(bufio.NewReader(connOut), bufio.NewWriter(connOut)) client, err := NewClient(k, connOut, brw, logger.Discard) @@ -1354,7 +1367,10 @@ func TestClientSendRateLimiting(t *testing.T) { } func TestServerRepliesToPing(t *testing.T) { - ts := newTestServer(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts := newTestServer(t, ctx) defer ts.close(t) tc := newRegularClient(t, ts, "alice") diff --git a/derp/derphttp/derphttp_server.go b/derp/derphttp/derphttp_server.go index 3d2b72e88..17edb8c21 100644 --- a/derp/derphttp/derphttp_server.go +++ b/derp/derphttp/derphttp_server.go @@ -56,6 +56,6 @@ func Handler(s *derp.Server) http.Handler { pubKey.UntypedHexString()) } - s.Accept(netConn, conn, netConn.RemoteAddr().String()) + s.Accept(r.Context(), netConn, conn, netConn.RemoteAddr().String()) }) }