diff --git a/cmd/derper/mesh.go b/cmd/derper/mesh.go index 670d3f5e8..b442d1b36 100644 --- a/cmd/derper/mesh.go +++ b/cmd/derper/mesh.go @@ -5,6 +5,7 @@ package main import ( + "context" "errors" "fmt" "log" @@ -40,6 +41,6 @@ func startMeshWithHost(s *derp.Server, host string) error { c.MeshKey = s.MeshKey() add := func(k key.Public) { s.AddPacketForwarder(k, c) } remove := func(k key.Public) { s.RemovePacketForwarder(k, c) } - go c.RunWatchConnectionLoop(s.PublicKey(), add, remove) + go c.RunWatchConnectionLoop(context.Background(), s.PublicKey(), logf, add, remove) return nil } diff --git a/derp/derphttp/mesh_client.go b/derp/derphttp/mesh_client.go index 28f54653e..e53e9ec5d 100644 --- a/derp/derphttp/mesh_client.go +++ b/derp/derphttp/mesh_client.go @@ -5,20 +5,32 @@ package derphttp import ( + "context" "sync" "time" "tailscale.com/derp" "tailscale.com/types/key" + "tailscale.com/types/logger" ) -// RunWatchConnectionLoop loops forever, sending WatchConnectionChanges and subscribing to +// RunWatchConnectionLoop loops until ctx is done, sending WatchConnectionChanges and subscribing to // connection changes. // // If the server's public key is ignoreServerKey, RunWatchConnectionLoop returns. // // Otherwise, the add and remove funcs are called as clients come & go. -func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove func(key.Public)) { +// +// infoLogf, if non-nil, is the logger to write periodic status +// updates about how many peers are on the server. Error log output is +// set to the c's logger, regardless of infoLogf's value. +// +// To force RunWatchConnectionLoop to return quickly, its ctx needs to +// be closed, and c itself needs to be closed. +func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.Public, infoLogf logger.Logf, add, remove func(key.Public)) { + if infoLogf == nil { + infoLogf = logger.Discard + } logf := c.logf const retryInterval = 5 * time.Second const statusInterval = 10 * time.Second @@ -45,7 +57,7 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove if loggedConnected { return } - logf("connected; %d peers", len(present)) + infoLogf("connected; %d peers", len(present)) loggedConnected = true } @@ -79,12 +91,21 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove } } - for { + sleep := func(d time.Duration) { + t := time.NewTimer(d) + select { + case <-ctx.Done(): + t.Stop() + case <-t.C: + } + } + + for ctx.Err() == nil { err := c.WatchConnectionChanges() if err != nil { clear() logf("WatchConnectionChanges: %v", err) - time.Sleep(retryInterval) + sleep(retryInterval) continue } @@ -97,7 +118,7 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove if err != nil { clear() logf("Recv: %v", err) - time.Sleep(retryInterval) + sleep(retryInterval) break } if connGen != lastConnGen { @@ -114,9 +135,8 @@ func (c *Client) RunWatchConnectionLoop(ignoreServerKey key.Public, add, remove } if now := time.Now(); now.Sub(lastStatus) > statusInterval { lastStatus = now - logf("%d peers", len(present)) + infoLogf("%d peers", len(present)) } } } - }