diff --git a/derp/derphttp/derphttp_test.go b/derp/derphttp/derphttp_test.go index 36c11f4fc..76681d498 100644 --- a/derp/derphttp/derphttp_test.go +++ b/derp/derphttp/derphttp_test.go @@ -8,6 +8,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "flag" "fmt" "maps" @@ -18,11 +19,13 @@ import ( "strings" "sync" "testing" + "testing/synctest" "time" "tailscale.com/derp" "tailscale.com/derp/derphttp" "tailscale.com/derp/derpserver" + "tailscale.com/net/memnet" "tailscale.com/net/netmon" "tailscale.com/net/netx" "tailscale.com/tailcfg" @@ -224,24 +227,21 @@ func TestPing(t *testing.T) { const testMeshKey = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" -func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server) { +func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server, ln *memnet.Listener) { s = derpserver.New(k, t.Logf) httpsrv := &http.Server{ TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), Handler: derpserver.Handler(s), } - ln, err := net.Listen("tcp4", "localhost:0") - if err != nil { - t.Fatal(err) - } + ln = memnet.Listen("localhost:0") + serverURL = "http://" + ln.Addr().String() s.SetMeshKey(testMeshKey) go func() { if err := httpsrv.Serve(ln); err != nil { - if err == http.ErrServerClosed { - t.Logf("server closed") + if errors.Is(err, net.ErrClosed) { return } panic(err) @@ -250,7 +250,7 @@ func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpse return } -func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string) (c *derphttp.Client) { +func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string, ln *memnet.Listener) (c *derphttp.Client) { c, err := derphttp.NewClient(watcherPrivateKey, serverToWatchURL, t.Logf, netmon.NewStatic()) if err != nil { t.Fatal(err) @@ -260,6 +260,7 @@ func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToW t.Fatal(err) } c.MeshKey = k + c.SetURLDialer(ln.Dial) return } @@ -267,170 +268,171 @@ func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToW // updates after a different thread breaks and reconnects the connection, while // the watcher is waiting on recv(). func TestBreakWatcherConnRecv(t *testing.T) { - // TODO(bradfitz): use synctest + memnet instead - - // Set the wait time before a retry after connection failure to be much lower. - // This needs to be early in the test, for defer to run right at the end after - // the DERP client has finished. - tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond) - - var wg sync.WaitGroup - // Make the watcher server - serverPrivateKey1 := key.NewNode() - _, s1 := newTestServer(t, serverPrivateKey1) - defer s1.Close() - - // Make the watched server - serverPrivateKey2 := key.NewNode() - serverURL2, s2 := newTestServer(t, serverPrivateKey2) - defer s2.Close() - - // Make the watcher (but it is not connected yet) - watcher := newWatcherClient(t, serverPrivateKey1, serverURL2) - defer watcher.Close() + synctest.Test(t, func(t *testing.T) { + // Set the wait time before a retry after connection failure to be much lower. + // This needs to be early in the test, for defer to run right at the end after + // the DERP client has finished. + tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond) + + var wg sync.WaitGroup + // Make the watcher server + serverPrivateKey1 := key.NewNode() + _, s1, ln1 := newTestServer(t, serverPrivateKey1) + defer s1.Close() + defer ln1.Close() + + // Make the watched server + serverPrivateKey2 := key.NewNode() + serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2) + defer s2.Close() + defer ln2.Close() + + // Make the watcher (but it is not connected yet) + watcher := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2) + defer watcher.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + watcherChan := make(chan int, 1) + defer close(watcherChan) + errChan := make(chan error, 1) + + // Start the watcher thread (which connects to the watched server) + wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343 + go func() { + defer wg.Done() + var peers int + add := func(m derp.PeerPresentMessage) { + t.Logf("add: %v", m.Key.ShortString()) + peers++ + // Signal that the watcher has run + watcherChan <- peers + } + remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } + notifyErr := func(err error) { + select { + case errChan <- err: + case <-ctx.Done(): + } + } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr) + }() - watcherChan := make(chan int, 1) - defer close(watcherChan) - errChan := make(chan error, 1) + synctest.Wait() - // Start the watcher thread (which connects to the watched server) - wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343 - go func() { - defer wg.Done() - var peers int - add := func(m derp.PeerPresentMessage) { - t.Logf("add: %v", m.Key.ShortString()) - peers++ - // Signal that the watcher has run - watcherChan <- peers - } - remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } - notifyErr := func(err error) { + // Wait for the watcher to run, then break the connection and check if it + // reconnected and received peer updates. + for range 10 { select { - case errChan <- err: - case <-ctx.Done(): + case peers := <-watcherChan: + if peers != 1 { + t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers) + } + case err := <-errChan: + if err.Error() != "derp.Recv: EOF" { + t.Fatalf("expected notifyError connection error to be EOF, got %v", err) + } } - } - - watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr) - }() - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() + synctest.Wait() - // Wait for the watcher to run, then break the connection and check if it - // reconnected and received peer updates. - for range 10 { - select { - case peers := <-watcherChan: - if peers != 1 { - t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers) - } - case err := <-errChan: - if !strings.Contains(err.Error(), "use of closed network connection") { - t.Fatalf("expected notifyError connection error to contain 'use of closed network connection', got %v", err) - } - case <-timer.C: - t.Fatalf("watcher did not process the peer update") + watcher.BreakConnection(watcher) + // re-establish connection by sending a packet + watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus")) } - timer.Reset(5 * time.Second) - watcher.BreakConnection(watcher) - // re-establish connection by sending a packet - watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus")) - } - cancel() // Cancel the context to stop the watcher loop. - wg.Wait() + cancel() // Cancel the context to stop the watcher loop. + wg.Wait() + }) } // Test that a watcher connection successfully reconnects and processes peer // updates after a different thread breaks and reconnects the connection, while // the watcher is not waiting on recv(). func TestBreakWatcherConn(t *testing.T) { - // TODO(bradfitz): use synctest + memnet instead - - // Set the wait time before a retry after connection failure to be much lower. - // This needs to be early in the test, for defer to run right at the end after - // the DERP client has finished. - tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond) - - var wg sync.WaitGroup - // Make the watcher server - serverPrivateKey1 := key.NewNode() - _, s1 := newTestServer(t, serverPrivateKey1) - defer s1.Close() - - // Make the watched server - serverPrivateKey2 := key.NewNode() - serverURL2, s2 := newTestServer(t, serverPrivateKey2) - defer s2.Close() - - // Make the watcher (but it is not connected yet) - watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2) - defer watcher1.Close() + synctest.Test(t, func(t *testing.T) { + // Set the wait time before a retry after connection failure to be much lower. + // This needs to be early in the test, for defer to run right at the end after + // the DERP client has finished. + tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond) + + var wg sync.WaitGroup + // Make the watcher server + serverPrivateKey1 := key.NewNode() + _, s1, ln1 := newTestServer(t, serverPrivateKey1) + defer s1.Close() + defer ln1.Close() + + // Make the watched server + serverPrivateKey2 := key.NewNode() + serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2) + defer s2.Close() + defer ln2.Close() + + // Make the watcher (but it is not connected yet) + watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2) + defer watcher1.Close() + + ctx, cancel := context.WithCancel(context.Background()) + + watcherChan := make(chan int, 1) + breakerChan := make(chan bool, 1) + errorChan := make(chan error, 1) + + // Start the watcher thread (which connects to the watched server) + wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343 + go func() { + defer wg.Done() + var peers int + add := func(m derp.PeerPresentMessage) { + t.Logf("add: %v", m.Key.ShortString()) + peers++ + // Signal that the watcher has run + watcherChan <- peers + select { + case <-ctx.Done(): + return + // Wait for breaker to run + case <-breakerChan: + } + } + remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } + notifyError := func(err error) { + errorChan <- err + } - ctx, cancel := context.WithCancel(context.Background()) + watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError) + }() - watcherChan := make(chan int, 1) - breakerChan := make(chan bool, 1) - errorChan := make(chan error, 1) + synctest.Wait() - // Start the watcher thread (which connects to the watched server) - wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343 - go func() { - defer wg.Done() - var peers int - add := func(m derp.PeerPresentMessage) { - t.Logf("add: %v", m.Key.ShortString()) - peers++ - // Signal that the watcher has run - watcherChan <- peers + // Wait for the watcher to run, then break the connection and check if it + // reconnected and received peer updates. + for range 10 { select { - case <-ctx.Done(): - return - // Wait for breaker to run - case <-breakerChan: + case peers := <-watcherChan: + if peers != 1 { + t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers) + } + case err := <-errorChan: + if !errors.Is(err, net.ErrClosed) { + t.Fatalf("expected notifyError connection error to fail with ErrClosed, got %v", err) + } } - } - remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- } - notifyError := func(err error) { - errorChan <- err - } - - watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError) - }() - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() + synctest.Wait() - // Wait for the watcher to run, then break the connection and check if it - // reconnected and received peer updates. - for range 10 { - select { - case peers := <-watcherChan: - if peers != 1 { - t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers) - } - case err := <-errorChan: - if !strings.Contains(err.Error(), "use of closed network connection") { - t.Fatalf("expected notifyError connection error to contain 'use of closed network connection', got %v", err) - } - case <-timer.C: - t.Fatalf("watcher did not process the peer update") + watcher1.BreakConnection(watcher1) + // re-establish connection by sending a packet + watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus")) + // signal that the breaker is done + breakerChan <- true } - watcher1.BreakConnection(watcher1) - // re-establish connection by sending a packet - watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus")) - // signal that the breaker is done - breakerChan <- true - - timer.Reset(5 * time.Second) - } - watcher1.Close() - cancel() - wg.Wait() + watcher1.Close() + cancel() + wg.Wait() + }) } func noopAdd(derp.PeerPresentMessage) {} @@ -444,12 +446,13 @@ func TestRunWatchConnectionLoopServeConnect(t *testing.T) { defer cancel() priv := key.NewNode() - serverURL, s := newTestServer(t, priv) + serverURL, s, ln := newTestServer(t, priv) defer s.Close() + defer ln.Close() pub := priv.Public() - watcher := newWatcherClient(t, priv, serverURL) + watcher := newWatcherClient(t, priv, serverURL, ln) defer watcher.Close() // Test connecting to ourselves, and that we get hung up on. @@ -518,13 +521,14 @@ func TestNotifyError(t *testing.T) { defer cancel() priv := key.NewNode() - serverURL, s := newTestServer(t, priv) + serverURL, s, ln := newTestServer(t, priv) defer s.Close() + defer ln.Close() pub := priv.Public() // Test early error notification when c.connect fails. - watcher := newWatcherClient(t, priv, serverURL) + watcher := newWatcherClient(t, priv, serverURL, ln) watcher.SetURLDialer(netx.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) { t.Helper() return nil, fmt.Errorf("test error: %s", addr)