control/controlclient: move watchdog out of mapSession

In prep for making mapSession's lifetime no longer be 1:1 with a single HTTP
response's lifetime, this moves the inactivity watchdog timer out of
mapSession and into the caller that owns the streaming HTTP response.

(This is admittedly closer to how it was prior to the mapSession type
existing, but that was before we connected some dots which were
impossible to even see before the mapSession type broke the code up.)

Updates #7175

Change-Id: Ia108dac84a4953db41cbd30e73b1de4a2a676c11
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
Brad Fitzpatrick 1 year ago committed by Brad Fitzpatrick
parent c87d58063a
commit 44c6909c92
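
Before the diff: a minimal sketch of the caller-owned watchdog pattern this commit adopts, for orientation only. It is not the code from this commit: it uses the standard library's time.Timer rather than the tstime.Clock plumbing used below, and readNext/handle are hypothetical stand-ins for the MapResponse size/body reads and the message handling.

// Package watchdogsketch shows a long-poll read loop guarded by an inactivity
// watchdog owned by the caller of the request, not by the session object.
package watchdogsketch

import (
    "context"
    "log"
    "time"
)

const watchdogTimeout = 120 * time.Second

// runPoll arms the watchdog only while blocked on the network read and stops
// it while a message is being processed, so a long server-requested operation
// (like Debug.Sleep) can't trip it. If the watchdog does fire, a per-request
// goroutine cancels the poll's context, which unblocks the read.
func runPoll(ctx context.Context, cancel context.CancelFunc,
    readNext func(context.Context) ([]byte, error), // hypothetical network read
    handle func(context.Context, []byte) error, // hypothetical message handler
) error {
    watchdog := time.NewTimer(watchdogTimeout)
    defer watchdog.Stop()
    go func() {
        select {
        case <-ctx.Done():
            return
        case <-watchdog.C:
            log.Printf("long-poll timed out!")
            cancel()
        }
    }()

    for {
        watchdog.Reset(watchdogTimeout) // about to block on the network
        msg, err := readNext(ctx)
        watchdog.Stop() // not running while we process the message
        if err != nil {
            return err
        }
        if err := handle(ctx, msg); err != nil {
            return err
        }
    }
}

The diff below implements this same shape directly in (*Direct).sendMapRequest: Reset before each blocking read, Stop once a MapResponse has been decoded, and a per-request goroutine that cancels the long-poll context on timeout.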

@@ -828,7 +828,7 @@ const watchdogTimeout = 120 * time.Second
 // if the context expires or the server returns an error/closes the connection
 // and as such always returns a non-nil error.
 //
-// If cb is nil, OmitPeers will be set to true.
+// If nu is nil, OmitPeers will be set to true.
 func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
     if isStreaming && nu == nil {
         panic("cb must be non-nil if isStreaming is true")
@@ -992,7 +992,27 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
         }
         c.expiry = nm.Expiry
     }
-    sess.StartWatchdog()
+    // Create a watchdog timer that breaks the connection if we don't receive a
+    // MapResponse from the network at least once every two minutes. The
+    // watchdog timer is stopped every time we receive a MapResponse (so it
+    // doesn't run when we're processing a MapResponse message, including any
+    // long-running requested operations like Debug.Sleep) and is reset whenever
+    // we go back to blocking on network reads.
+    watchdogTimer, watchdogTimedOut := c.clock.NewTimer(watchdogTimeout)
+    defer watchdogTimer.Stop()
+    go func() {
+        select {
+        case <-ctx.Done():
+            vlogf("netmap: ending timeout goroutine")
+            return
+        case <-watchdogTimedOut:
+            c.logf("map response long-poll timed out!")
+            cancel()
+            return
+        }
+    }()

     // gotNonKeepAliveMessage is whether we've yet received a MapResponse message without
     // KeepAlive set.
@@ -1006,6 +1026,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
     // We can use this same read loop either way.
     var msg []byte
     for mapResIdx := 0; mapResIdx == 0 || isStreaming; mapResIdx++ {
+        watchdogTimer.Reset(watchdogTimeout)
         vlogf("netmap: starting size read after %v (poll %v)", time.Since(t0).Round(time.Millisecond), mapResIdx)
         var siz [4]byte
         if _, err := io.ReadFull(res.Body, siz[:]); err != nil {
@@ -1026,6 +1047,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
             vlogf("netmap: decode error: %v")
             return err
         }
+        watchdogTimer.Stop()

         metricMapResponseMessages.Add(1)
@@ -1068,14 +1090,6 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
                 c.logf("netmap: [unexpected] new dial plan; nowhere to store it")
             }
         }
-        select {
-        case sess.watchdogReset <- struct{}{}:
-            vlogf("netmap: sent timer reset")
-        case <-ctx.Done():
-            c.logf("[v1] netmap: not resetting timer; context done: %v", ctx.Err())
-            return ctx.Err()
-        }
         if resp.KeepAlive {
             metricMapResponseKeepAlives.Add(1)
             continue
@@ -1102,7 +1116,7 @@ func (c *Direct) sendMapRequest(ctx context.Context, isStreaming bool, nu NetmapUpdater) error {
     return nil
 }

-func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, watchdogReset chan<- struct{}) error {
+func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug) error {
     if code := debug.Exit; code != nil {
         c.logf("exiting process with status %v per controlplane", *code)
         os.Exit(*code)
@@ -1112,7 +1126,7 @@ func (c *Direct) handleDebugMessage(ctx context.Context, debug *tailcfg.Debug, watchdogReset chan<- struct{}) error {
         envknob.SetNoLogsNoSupport()
     }
     if sleep := time.Duration(debug.SleepSeconds * float64(time.Second)); sleep > 0 {
-        if err := sleepAsRequested(ctx, c.logf, watchdogReset, sleep, c.clock); err != nil {
+        if err := sleepAsRequested(ctx, c.logf, sleep, c.clock); err != nil {
             return err
         }
     }
@@ -1444,7 +1458,7 @@ func answerC2NPing(logf logger.Logf, c2nHandler http.Handler, c *http.Client, pr
 // that the client sleep. The complication is that while we're sleeping (if for
 // a long time), we need to periodically reset the watchdog timer before it
 // expires.
-func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan<- struct{}, d time.Duration, clock tstime.Clock) error {
+func sleepAsRequested(ctx context.Context, logf logger.Logf, d time.Duration, clock tstime.Clock) error {
     const maxSleep = 5 * time.Minute
     if d > maxSleep {
         logf("sleeping for %v, capped from server-requested %v ...", maxSleep, d)
@@ -1453,25 +1467,13 @@ func sleepAsRequested(ctx context.Context, logf logger.Logf, watchdogReset chan<- struct{}, d time.Duration, clock tstime.Clock) error {
         logf("sleeping for server-requested %v ...", d)
     }

-    ticker, tickerChannel := clock.NewTicker(watchdogTimeout / 2)
-    defer ticker.Stop()
     timer, timerChannel := clock.NewTimer(d)
     defer timer.Stop()
-    for {
     select {
     case <-ctx.Done():
         return ctx.Err()
     case <-timerChannel:
         return nil
-        case <-tickerChannel:
-            select {
-            case watchdogReset <- struct{}{}:
-            case <-timerChannel:
-                return nil
-            case <-ctx.Done():
-                return ctx.Err()
-            }
-        }
     }
 }

@@ -49,7 +49,6 @@ type mapSession struct {
     machinePubKey key.MachinePublic
     altClock      tstime.Clock       // if nil, regular time is used
     cancel        context.CancelFunc // always non-nil, shuts down caller's base long poll context
-    watchdogReset chan struct{}      // send to request that the long poll activity watchdog timeout be reset

     // sessionAliveCtx is a Background-based context that's alive for the
     // duration of the mapSession that we own the lifetime of. It's closed by
@@ -57,12 +56,12 @@ type mapSession struct {
     sessionAliveCtx      context.Context
     sessionAliveCtxClose context.CancelFunc // closes sessionAliveCtx

-    // Optional hooks, set once before use.
+    // Optional hooks, guaranteed non-nil (set to no-op funcs) by the
+    // newMapSession constructor. They must be overridden if desired
+    // before the mapSession is used.

     // onDebug specifies what to do with a *tailcfg.Debug message.
-    // If the watchdogReset chan is nil, it's not used. Otherwise it can be sent to
-    // to request that the long poll activity watchdog timeout be reset.
-    onDebug func(_ context.Context, _ *tailcfg.Debug, watchdogReset chan<- struct{}) error
+    onDebug func(context.Context, *tailcfg.Debug) error

     // onSelfNodeChanged is called before the NetmapUpdater if the self node was
     // changed.
@@ -102,13 +101,12 @@ func newMapSession(privateNodeKey key.NodePrivate, nu NetmapUpdater, controlKnob
         publicNodeKey:   privateNodeKey.Public(),
         lastDNSConfig:   new(tailcfg.DNSConfig),
         lastUserProfile: map[tailcfg.UserID]tailcfg.UserProfile{},
-        watchdogReset:   make(chan struct{}),

         // Non-nil no-op defaults, to be optionally overridden by the caller.
         logf:              logger.Discard,
         vlogf:             logger.Discard,
         cancel:            func() {},
-        onDebug:           func(context.Context, *tailcfg.Debug, chan<- struct{}) error { return nil },
+        onDebug:           func(context.Context, *tailcfg.Debug) error { return nil },
         onSelfNodeChanged: func(*netmap.NetworkMap) {},
     }
     ms.sessionAliveCtx, ms.sessionAliveCtxClose = context.WithCancel(context.Background())
@@ -133,38 +131,6 @@ func (ms *mapSession) clock() tstime.Clock {
     return cmpx.Or[tstime.Clock](ms.altClock, tstime.StdClock{})
 }

-// StartWatchdog starts the session's watchdog timer.
-// If there's no activity in too long, it tears down the connection.
-// Call Close to release these resources.
-func (ms *mapSession) StartWatchdog() {
-    timer, timedOutChan := ms.clock().NewTimer(watchdogTimeout)
-    go func() {
-        defer timer.Stop()
-        for {
-            select {
-            case <-ms.sessionAliveCtx.Done():
-                ms.vlogf("netmap: ending timeout goroutine")
-                return
-            case <-timedOutChan:
-                ms.logf("map response long-poll timed out!")
-                ms.cancel()
-                return
-            case <-ms.watchdogReset:
-                if !timer.Stop() {
-                    select {
-                    case <-timedOutChan:
-                    case <-ms.sessionAliveCtx.Done():
-                        ms.vlogf("netmap: ending timeout goroutine")
-                        return
-                    }
-                }
-                ms.vlogf("netmap: reset timeout timer")
-                timer.Reset(watchdogTimeout)
-            }
-        }
-    }()
-}
-
 func (ms *mapSession) Close() {
     ms.sessionAliveCtxClose()
 }
@@ -179,7 +145,7 @@ func (ms *mapSession) Close() {
 // is [re]factoring progress enough.
 func (ms *mapSession) HandleNonKeepAliveMapResponse(ctx context.Context, resp *tailcfg.MapResponse) error {
     if debug := resp.Debug; debug != nil {
-        if err := ms.onDebug(ctx, debug, ms.watchdogReset); err != nil {
+        if err := ms.onDebug(ctx, debug); err != nil {
             return err
         }
     }
