From 3114a1c88df3f725d245ce137ab2c917049c8b63 Mon Sep 17 00:00:00 2001
From: Anton Tolchanov
Date: Wed, 25 Oct 2023 21:41:24 +0100
Subject: [PATCH] derp/derphttp: add watch reconnection tests from #9719

Co-authored-by: Val
Signed-off-by: Anton Tolchanov
---
 derp/derphttp/derphttp_test.go | 191 +++++++++++++++++++++++++++++++++
 derp/derphttp/mesh_client.go   |   3 +-
 2 files changed, 193 insertions(+), 1 deletion(-)

diff --git a/derp/derphttp/derphttp_test.go b/derp/derphttp/derphttp_test.go
index b121c097c..0ec14e0ee 100644
--- a/derp/derphttp/derphttp_test.go
+++ b/derp/derphttp/derphttp_test.go
@@ -9,6 +9,7 @@ import (
 	"crypto/tls"
 	"net"
 	"net/http"
+	"net/netip"
 	"sync"
 	"testing"
 	"time"
@@ -206,3 +207,193 @@ func TestPing(t *testing.T) {
 		t.Fatalf("Ping: %v", err)
 	}
 }
+
+func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derp.Server) {
+	s = derp.NewServer(k, t.Logf)
+	httpsrv := &http.Server{
+		TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
+		Handler:      Handler(s),
+	}
+
+	ln, err := net.Listen("tcp4", "localhost:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	serverURL = "http://" + ln.Addr().String()
+	s.SetMeshKey("1234")
+
+	go func() {
+		if err := httpsrv.Serve(ln); err != nil {
+			if err == http.ErrServerClosed {
+				t.Logf("server closed")
+				return
+			}
+			panic(err)
+		}
+	}()
+	return
+}
+
+func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string) (c *Client) {
+	c, err := NewClient(watcherPrivateKey, serverToWatchURL, t.Logf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	c.MeshKey = "1234"
+	return
+}
+
+// breakConnection breaks the connection, which should trigger a reconnect.
+func (c *Client) breakConnection(brokenClient *derp.Client) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.client != brokenClient {
+		return
+	}
+	if c.netConn != nil {
+		c.netConn.Close()
+		c.netConn = nil
+	}
+	c.client = nil
+}
+
+// Test that a watcher connection successfully reconnects and processes peer
+// updates after a different thread breaks and reconnects the connection, while
+// the watcher is waiting on recv().
+func TestBreakWatcherConnRecv(t *testing.T) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	// Make the watcher server
+	serverPrivateKey1 := key.NewNode()
+	_, s1 := newTestServer(t, serverPrivateKey1)
+	defer s1.Close()
+
+	// Make the watched server
+	serverPrivateKey2 := key.NewNode()
+	serverURL2, s2 := newTestServer(t, serverPrivateKey2)
+	defer s2.Close()
+
+	// Make the watcher (but it is not connected yet)
+	watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
+	defer watcher1.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	watcherChan := make(chan int, 1)
+
+	// Set the wait time after a connection fails to much lower
+	origRetryInterval := retryInterval
+	retryInterval = 50 * time.Millisecond
+	defer func() { retryInterval = origRetryInterval }()
+
+	// Start the watcher thread (which connects to the watched server)
+	wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
+	go func() {
+		defer wg.Done()
+		var peers int
+		add := func(k key.NodePublic, _ netip.AddrPort) {
+			t.Logf("add: %v", k.ShortString())
+			peers++
+			// Signal that the watcher has run
+			watcherChan <- peers
+		}
+		remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- }
+
+		watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
+	}()
+
+	timer := time.NewTimer(5 * time.Second)
+	defer timer.Stop()
+
+	// Wait for the watcher to run, then break the connection and check if it
+	// reconnected and received peer updates.
+	for i := 0; i < 10; i++ {
+		select {
+		case peers := <-watcherChan:
+			if peers != 1 {
+				t.Fatal("wrong number of peers added during watcher connection")
+			}
+		case <-timer.C:
+			t.Fatalf("watcher did not process the peer update")
+		}
+		watcher1.breakConnection(watcher1.client)
+		// re-establish connection by sending a packet
+		watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
+
+		timer.Reset(5 * time.Second)
+	}
+}
+
+// Test that a watcher connection successfully reconnects and processes peer
+// updates after a different thread breaks and reconnects the connection, while
+// the watcher is not waiting on recv().
+func TestBreakWatcherConn(t *testing.T) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	// Make the watcher server
+	serverPrivateKey1 := key.NewNode()
+	_, s1 := newTestServer(t, serverPrivateKey1)
+	defer s1.Close()
+
+	// Make the watched server
+	serverPrivateKey2 := key.NewNode()
+	serverURL2, s2 := newTestServer(t, serverPrivateKey2)
+	defer s2.Close()
+
+	// Make the watcher (but it is not connected yet)
+	watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
+	defer watcher1.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	watcherChan := make(chan int, 1)
+	breakerChan := make(chan bool, 1)
+
+	// Set the wait time after a connection fails to much lower
+	origRetryInterval := retryInterval
+	retryInterval = 50 * time.Millisecond
+	defer func() { retryInterval = origRetryInterval }()
+
+	// Start the watcher thread (which connects to the watched server)
+	wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
+	go func() {
+		defer wg.Done()
+		var peers int
+		add := func(k key.NodePublic, _ netip.AddrPort) {
+			t.Logf("add: %v", k.ShortString())
+			peers++
+			// Signal that the watcher has run
+			watcherChan <- peers
+			// Wait for breaker to run
+			<-breakerChan
+		}
+		remove := func(k key.NodePublic) { t.Logf("remove: %v", k.ShortString()); peers-- }
+
+		watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove)
+	}()
+
+	timer := time.NewTimer(5 * time.Second)
+	defer timer.Stop()
+
+	// Wait for the watcher to run, then break the connection and check if it
+	// reconnected and received peer updates.
+	for i := 0; i < 10; i++ {
+		select {
+		case peers := <-watcherChan:
+			if peers != 1 {
+				t.Fatal("wrong number of peers added during watcher connection")
+			}
+		case <-timer.C:
+			t.Fatalf("watcher did not process the peer update")
+		}
+		watcher1.breakConnection(watcher1.client)
+		// re-establish connection by sending a packet
+		watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
+		// signal that the breaker is done
+		breakerChan <- true
+
+		timer.Reset(5 * time.Second)
+	}
+}
diff --git a/derp/derphttp/mesh_client.go b/derp/derphttp/mesh_client.go
index 2793fd068..9e9e518e1 100644
--- a/derp/derphttp/mesh_client.go
+++ b/derp/derphttp/mesh_client.go
@@ -14,6 +14,8 @@ import (
 	"tailscale.com/types/logger"
 )
 
+var retryInterval = 5 * time.Second
+
 // RunWatchConnectionLoop loops until ctx is done, sending
 // WatchConnectionChanges and subscribing to connection changes.
 //
@@ -42,7 +44,6 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
 		infoLogf = logger.Discard
 	}
 	logf := c.logf
-	const retryInterval = 5 * time.Second
 	const statusInterval = 10 * time.Second
 	var (
 		mu sync.Mutex