From 9cbec4519bb517e6d35958ddcbe0a09a61146628 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 1 Sep 2023 09:56:31 -0700 Subject: [PATCH] control/controlclient: serialize Observer calls Don't just start goroutines and hope for them to be ordered. Fixes potential regression from earlier 7074a40c0. Updates #cleanup Change-Id: I501a6f3e4e8e6306b958bccdc1e47869991c31f7 Signed-off-by: Brad Fitzpatrick --- control/controlclient/auto.go | 94 ++++++++++++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 8 deletions(-) diff --git a/control/controlclient/auto.go b/control/controlclient/auto.go index 8fbc3c904..b0f705c31 100644 --- a/control/controlclient/auto.go +++ b/control/controlclient/auto.go @@ -136,13 +136,14 @@ type updateGen int64 // Auto connects to a tailcontrol server for a node. // It's a concrete implementation of the Client interface. type Auto struct { - direct *Direct // our interface to the server APIs - clock tstime.Clock - logf logger.Logf - closed bool - updateCh chan struct{} // readable when we should inform the server of a change - newMapCh chan struct{} // readable when we must restart a map request - observer Observer // called to update Client status; always non-nil + direct *Direct // our interface to the server APIs + clock tstime.Clock + logf logger.Logf + closed bool + updateCh chan struct{} // readable when we should inform the server of a change + newMapCh chan struct{} // readable when we must restart a map request + observer Observer // called to update Client status; always non-nil + observerQueue execQueue unregisterHealthWatch func() @@ -667,7 +668,9 @@ func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkM // Launch a new goroutine to avoid blocking the caller while the observer // does its thing, which may result in a call back into the client. - go c.observer.SetControlClientStatus(new) + c.observerQueue.Add(func() { + c.observer.SetControlClientStatus(new) + }) } func (c *Auto) Login(t *tailcfg.Oauth2Token, flags LoginFlags) { @@ -735,6 +738,7 @@ func (c *Auto) Shutdown() { c.closed = true c.cancelAuthCtxLocked() c.cancelMapCtxLocked() + go c.observerQueue.shutdown() } c.mu.Unlock() @@ -748,6 +752,9 @@ func (c *Auto) Shutdown() { if direct != nil { direct.Close() } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + c.observerQueue.wait(ctx) c.logf("Client.Shutdown done.") } } @@ -788,3 +795,74 @@ func (c *Auto) DoNoiseRequest(req *http.Request) (*http.Response, error) { func (c *Auto) GetSingleUseNoiseRoundTripper(ctx context.Context) (http.RoundTripper, *tailcfg.EarlyNoise, error) { return c.direct.GetSingleUseNoiseRoundTripper(ctx) } + +type execQueue struct { + mu sync.Mutex + closed bool + inFlight bool // whether a goroutine is running q.run + doneWaiter chan struct{} // non-nil if waiter is waiting, then closed + queue []func() +} + +func (q *execQueue) Add(f func()) { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + return + } + if q.inFlight { + q.queue = append(q.queue, f) + } else { + q.inFlight = true + go q.run(f) + } +} + +func (q *execQueue) run(f func()) { + f() + + q.mu.Lock() + for len(q.queue) > 0 && !q.closed { + f := q.queue[0] + q.queue[0] = nil + q.queue = q.queue[1:] + q.mu.Unlock() + f() + q.mu.Lock() + } + q.inFlight = false + q.queue = nil + if q.doneWaiter != nil { + close(q.doneWaiter) + q.doneWaiter = nil + } + q.mu.Unlock() +} + +func (q *execQueue) shutdown() { + q.mu.Lock() + defer q.mu.Unlock() + q.closed = true +} + +// wait waits for the queue to be empty. +func (q *execQueue) wait(ctx context.Context) error { + q.mu.Lock() + waitCh := q.doneWaiter + if q.inFlight && waitCh == nil { + waitCh = make(chan struct{}) + q.doneWaiter = waitCh + } + q.mu.Unlock() + + if waitCh == nil { + return nil + } + + select { + case <-waitCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +}