From 44c6909c92ba4b6197cf05d5e3efe7a61bb3a14e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 5 Nov 2023 10:41:13 -0800 Subject: [PATCH] control/controlclient: move watchdog out of mapSession In prep for making mapSession's lifetime not be 1:1 with a single HTTP response's lifetime, this moves the inactivity timer watchdog out of mapSession and into the caller that owns the streaming HTTP response. (This is admittedly closer to how it was prior to the mapSession type existing, but that was before we connected some dots which were impossible to even see before the mapSession type broke the code up.) Updates #7175 Change-Id: Ia108dac84a4953db41cbd30e73b1de4a2a676c11 Signed-off-by: Brad Fitzpatrick --- control/controlclient/direct.go | 62 +++++++++++++++++---------------- control/controlclient/map.go | 46 ++++-------------------- 2 files changed, 38 insertions(+), 70 deletions(-) diff --git a/control/controlclient/direct.go b/control/controlclient/direct.go index 31481d50c..c3db722f5 100644 --- a/control/controlclient/direct.go +++ b/control/controlclient/direct.go @@ -828,7 +828,7 @@ const watchdogTimeout = 120 * time.Second // if the context expires or the server returns an error/closes the connection // and as such always returns a non-nil error. // -// If cb is nil, OmitPeers will be set to true. +// If nu is nil, OmitPeers will be set to true. func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error { if isStreaming && nu == nil { panic("cb must be non-nil if isStreaming is true") @@ -992,7 +992,27 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap } c.expiry = nm.Expiry } - sess.StartWatchdog() + + // Create a watchdog timer that breaks the connection if we don't receive a + // MapResponse from the network at least once every two minutes. The + // watchdog timer is stopped every time we receive a MapResponse (so it + // doesn't run when we're processing a MapResponse message, including any + // long-running requested operations like Debug.Sleep) and is reset whenever + // we go back to blocking on network reads. + watchdogTimer, watchdogTimedOut := c.clock.NewTimer(watchdogTimeout) + defer watchdogTimer.Stop() + + go func() { + select { + case <-ctx.Done(): + vlogf("netmap: ending timeout goroutine") + return + case <-watchdogTimedOut: + c.logf("map response long-poll timed out!") + cancel() + return + } + }() // gotNonKeepAliveMessage is whether we've yet received a MapResponse message without // KeepAlive set. @@ -1006,6 +1026,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap // We can use this same read loop either way. var msg []byte for mapResIdx := 0; mapResIdx == 0 || isStreaming; mapResIdx++ { + watchdogTimer.Reset(watchdogTimeout) vlogf("netmap: starting size read after %v (poll %v)", time.Since(t0).Round(time.Millisecond), mapResIdx) var siz [4]byte if _, err := io.ReadFull(res.Body, siz[:]); err != nil { @@ -1026,6 +1047,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap vlogf("netmap: decode error: %v") return err } + watchdogTimer.Stop() metricMapResponseMessages.Add(1) @@ -1068,14 +1090,6 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap c.logf("netmap: [unexpected] new dial plan; nowhere to store it") } } - - select { - case sess.watchdogReset <- struct{}{}: - vlogf("netmap: sent timer reset") - case <-ctx.Done(): - c.logf("[v1] netmap: not resetting timer; context done: %v", ctx.Err()) - return ctx.Err() - } if resp.KeepAlive { metricMapResponseKeepAlives.Add(1) continue @@ -1102,7 +1116,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu Netmap return nil } -func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, watchdogReset chan<- struct{}) error { +func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug) error { if code := debug.Exit; code != nil { c.logf("exiting process with status %v per controlplane", *code) os.Exit(*code) @@ -1112,7 +1126,7 @@ func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, w envknob.SetNoLogsNoSupport() } if sleep := time.Duration(debug.SleepSeconds * float64(time.Second)); sleep > 0 { - if err := sleepAsRequested(ctx, c.logf, watchdogReset, sleep, c.clock); err != nil { + if err := sleepAsRequested(ctx, c.logf, sleep, c.clock); err != nil { return err } } @@ -1444,7 +1458,7 @@ func answerC2NPing(logf logger.Logf, c2nHandler http.Handler, c *http.Client, pr // that the client sleep. The complication is that while we're sleeping (if for // a long time), we need to periodically reset the watchdog timer before it // expires. -func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan<- struct{}, d time.Duration, clock tstime.Clock) error { +func sleepAsRequested(ctx context.Context, logf logger.Logf, d time.Duration, clock tstime.Clock) error { const maxSleep = 5 * time.Minute if d > maxSleep { logf("sleeping for %v, capped from server-requested %v ...", maxSleep, d) @@ -1453,25 +1467,13 @@ func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan< logf("sleeping for server-requested %v ...", d) } - ticker, tickerChannel := clock.NewTicker(watchdogTimeout / 2) - defer ticker.Stop() timer, timerChannel := clock.NewTimer(d) defer timer.Stop() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-timerChannel: - return nil - case <-tickerChannel: - select { - case watchdogReset <- struct{}{}: - case <-timerChannel: - return nil - case <-ctx.Done(): - return ctx.Err() - } - } + select { + case <-ctx.Done(): + return ctx.Err() + case <-timerChannel: + return nil } } diff --git a/control/controlclient/map.go b/control/controlclient/map.go index 6c0e006a5..8f0d4cac2 100644 --- a/control/controlclient/map.go +++ b/control/controlclient/map.go @@ -49,7 +49,6 @@ type mapSession struct { machinePubKey key.MachinePublic altClock tstime.Clock // if nil, regular time is used cancel context.CancelFunc // always non-nil, shuts down caller's base long poll context - watchdogReset chan struct{} // send to request that the long poll activity watchdog timeout be reset // sessionAliveCtx is a Background-based context that's alive for the // duration of the mapSession that we own the lifetime of. It's closed by @@ -57,12 +56,12 @@ type mapSession struct { sessionAliveCtx context.Context sessionAliveCtxClose context.CancelFunc // closes sessionAliveCtx - // Optional hooks, set once before use. + // Optional hooks, guaranteed non-nil (set to no-op funcs) by the + // newMapSession constructor. They must be overridden if desired + // before the mapSession is used. // onDebug specifies what to do with a *tailcfg.Debug message. - // If the watchdogReset chan is nil, it's not used. Otherwise it can be sent to - // to request that the long poll activity watchdog timeout be reset. - onDebug func(_ context.Context, _ *tailcfg.Debug, watchdogReset chan<- struct{}) error + onDebug func(context.Context, *tailcfg.Debug) error // onSelfNodeChanged is called before the NetmapUpdater if the self node was // changed. @@ -102,13 +101,12 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob publicNodeKey: privateNodeKey.Public(), lastDNSConfig: new(tailcfg.DNSConfig), lastUserProfile: map[tailcfg.UserID]tailcfg.UserProfile{}, - watchdogReset: make(chan struct{}), // Non-nil no-op defaults, to be optionally overridden by the caller. logf: logger.Discard, vlogf: logger.Discard, cancel: func() {}, - onDebug: func(context.Context, *tailcfg.Debug, chan<- struct{}) error { return nil }, + onDebug: func(context.Context, *tailcfg.Debug) error { return nil }, onSelfNodeChanged: func(*netmap.NetworkMap) {}, } ms.sessionAliveCtx, ms.sessionAliveCtxClose = context.WithCancel(context.Background()) @@ -133,38 +131,6 @@ func (ms *mapSession) clock() tstime.Clock { return cmpx.Or[tstime.Clock](ms.altClock, tstime.StdClock{}) } -// StartWatchdog starts the session's watchdog timer. -// If there's no activity in too long, it tears down the connection. -// Call Close to release these resources. -func (ms *mapSession) StartWatchdog() { - timer, timedOutChan := ms.clock().NewTimer(watchdogTimeout) - go func() { - defer timer.Stop() - for { - select { - case <-ms.sessionAliveCtx.Done(): - ms.vlogf("netmap: ending timeout goroutine") - return - case <-timedOutChan: - ms.logf("map response long-poll timed out!") - ms.cancel() - return - case <-ms.watchdogReset: - if !timer.Stop() { - select { - case <-timedOutChan: - case <-ms.sessionAliveCtx.Done(): - ms.vlogf("netmap: ending timeout goroutine") - return - } - } - ms.vlogf("netmap: reset timeout timer") - timer.Reset(watchdogTimeout) - } - } - }() -} - func (ms *mapSession) Close() { ms.sessionAliveCtxClose() } @@ -179,7 +145,7 @@ func (ms *mapSession) Close() { // is [re]factoring progress enough. func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error { if debug := resp.Debug; debug != nil { - if err := ms.onDebug(ctx, debug, ms.watchdogReset); err != nil { + if err := ms.onDebug(ctx, debug); err != nil { return err } }