control/controlclient: serialize Observer calls

Don't just start goroutines and hope for them to be ordered.

Fixes potential regression from earlier 7074a40c0.

Updates #cleanup

Change-Id: I501a6f3e4e8e6306b958bccdc1e47869991c31f7
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/9206/head
Brad Fitzpatrick 1 year ago committed by Brad Fitzpatrick
parent 8b3ea13af0
commit 9cbec4519b

@ -136,13 +136,14 @@ type updateGen int64
// Auto connects to a tailcontrol server for a node.
// It's a concrete implementation of the Client interface.
type Auto struct {
direct *Direct // our interface to the server APIs
clock tstime.Clock
logf logger.Logf
closed bool
updateCh chan struct{} // readable when we should inform the server of a change
newMapCh chan struct{} // readable when we must restart a map request
observer Observer // called to update Client status; always non-nil
direct *Direct // our interface to the server APIs
clock tstime.Clock
logf logger.Logf
closed bool
updateCh chan struct{} // readable when we should inform the server of a change
newMapCh chan struct{} // readable when we must restart a map request
observer Observer // called to update Client status; always non-nil
observerQueue execQueue
unregisterHealthWatch func()
@ -667,7 +668,9 @@ func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkM
// Launch a new goroutine to avoid blocking the caller while the observer
// does its thing, which may result in a call back into the client.
go c.observer.SetControlClientStatus(new)
c.observerQueue.Add(func() {
c.observer.SetControlClientStatus(new)
})
}
func (c *Auto) Login(t *tailcfg.Oauth2Token, flags LoginFlags) {
@ -735,6 +738,7 @@ func (c *Auto) Shutdown() {
c.closed = true
c.cancelAuthCtxLocked()
c.cancelMapCtxLocked()
go c.observerQueue.shutdown()
}
c.mu.Unlock()
@ -748,6 +752,9 @@ func (c *Auto) Shutdown() {
if direct != nil {
direct.Close()
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.observerQueue.wait(ctx)
c.logf("Client.Shutdown done.")
}
}
@ -788,3 +795,74 @@ func (c *Auto) DoNoiseRequest(req *http.Request) (*http.Response, error) {
func (c *Auto) GetSingleUseNoiseRoundTripper(ctx context.Context) (http.RoundTripper, *tailcfg.EarlyNoise, error) {
return c.direct.GetSingleUseNoiseRoundTripper(ctx)
}
type execQueue struct {
mu sync.Mutex
closed bool
inFlight bool // whether a goroutine is running q.run
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
queue []func()
}
func (q *execQueue) Add(f func()) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return
}
if q.inFlight {
q.queue = append(q.queue, f)
} else {
q.inFlight = true
go q.run(f)
}
}
func (q *execQueue) run(f func()) {
f()
q.mu.Lock()
for len(q.queue) > 0 && !q.closed {
f := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
q.mu.Unlock()
f()
q.mu.Lock()
}
q.inFlight = false
q.queue = nil
if q.doneWaiter != nil {
close(q.doneWaiter)
q.doneWaiter = nil
}
q.mu.Unlock()
}
func (q *execQueue) shutdown() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
}
// wait waits for the queue to be empty.
func (q *execQueue) wait(ctx context.Context) error {
q.mu.Lock()
waitCh := q.doneWaiter
if q.inFlight && waitCh == nil {
waitCh = make(chan struct{})
q.doneWaiter = waitCh
}
q.mu.Unlock()
if waitCh == nil {
return nil
}
select {
case <-waitCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

Loading…
Cancel
Save