control/controlclient,util/execqueue: extract execqueue into a package

This is a useful primitive for asynchronous execution of ordered work I
want to use in another change.

Updates tailscale/corp#16833
Signed-off-by: James Tucker <james@tailscale.com>
pull/10882/head
James Tucker 10 months ago committed by James Tucker
parent 32f01acc79
commit 38a1cf748a

@ -348,6 +348,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
💣 tailscale.com/util/deephash from tailscale.com/ipn/ipnlocal+
L 💣 tailscale.com/util/dirwalk from tailscale.com/metrics+
tailscale.com/util/dnsname from tailscale.com/hostinfo+
tailscale.com/util/execqueue from tailscale.com/control/controlclient
tailscale.com/util/goroutines from tailscale.com/ipn/ipnlocal
tailscale.com/util/groupmember from tailscale.com/ipn/ipnauth+
💣 tailscale.com/util/hashx from tailscale.com/util/deephash

@ -22,6 +22,7 @@ import (
"tailscale.com/types/netmap"
"tailscale.com/types/persist"
"tailscale.com/types/structs"
"tailscale.com/util/execqueue"
)
type LoginGoal struct {
@ -118,7 +119,7 @@ type Auto struct {
closed bool
updateCh chan struct{} // readable when we should inform the server of a change
observer Observer // called to update Client status; always non-nil
observerQueue execQueue
observerQueue execqueue.ExecQueue
unregisterHealthWatch func()
@ -675,7 +676,7 @@ func (c *Auto) Shutdown() {
direct := c.direct
if !closed {
c.closed = true
c.observerQueue.shutdown()
c.observerQueue.Shutdown()
c.cancelAuthCtxLocked()
c.cancelMapCtxLocked()
for _, w := range c.unpauseWaiters {
@ -696,7 +697,7 @@ func (c *Auto) Shutdown() {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
c.observerQueue.wait(ctx)
c.observerQueue.Wait(ctx)
c.logf("Client.Shutdown done.")
}
}
@ -737,95 +738,3 @@ func (c *Auto) DoNoiseRequest(req *http.Request) (*http.Response, error) {
func (c *Auto) GetSingleUseNoiseRoundTripper(ctx context.Context) (http.RoundTripper, *tailcfg.EarlyNoise, error) {
return c.direct.GetSingleUseNoiseRoundTripper(ctx)
}
type execQueue struct {
mu sync.Mutex
closed bool
inFlight bool // whether a goroutine is running q.run
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
queue []func()
}
func (q *execQueue) Add(f func()) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return
}
if q.inFlight {
q.queue = append(q.queue, f)
} else {
q.inFlight = true
go q.run(f)
}
}
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *execQueue) RunSync(ctx context.Context, f func()) error {
for {
if err := q.wait(ctx); err != nil {
return err
}
q.mu.Lock()
if q.inFlight {
q.mu.Unlock()
continue
}
defer q.mu.Unlock()
if q.closed {
return errors.New("closed")
}
f()
return nil
}
}
func (q *execQueue) run(f func()) {
f()
q.mu.Lock()
for len(q.queue) > 0 && !q.closed {
f := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
q.mu.Unlock()
f()
q.mu.Lock()
}
q.inFlight = false
q.queue = nil
if q.doneWaiter != nil {
close(q.doneWaiter)
q.doneWaiter = nil
}
q.mu.Unlock()
}
func (q *execQueue) shutdown() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
}
// wait waits for the queue to be empty.
func (q *execQueue) wait(ctx context.Context) error {
q.mu.Lock()
waitCh := q.doneWaiter
if q.inFlight && waitCh == nil {
waitCh = make(chan struct{})
q.doneWaiter = waitCh
}
q.mu.Unlock()
if waitCh == nil {
return nil
}
select {
case <-waitCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

@ -0,0 +1,104 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
// Package execqueue implements an ordered asynchronous queue for executing functions.
package execqueue
import (
"context"
"errors"
"sync"
)
type ExecQueue struct {
mu sync.Mutex
closed bool
inFlight bool // whether a goroutine is running q.run
doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
queue []func()
}
func (q *ExecQueue) Add(f func()) {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return
}
if q.inFlight {
q.queue = append(q.queue, f)
} else {
q.inFlight = true
go q.run(f)
}
}
// RunSync waits for the queue to be drained and then synchronously runs f.
// It returns an error if the queue is closed before f is run or ctx expires.
func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
for {
if err := q.Wait(ctx); err != nil {
return err
}
q.mu.Lock()
if q.inFlight {
q.mu.Unlock()
continue
}
defer q.mu.Unlock()
if q.closed {
return errors.New("closed")
}
f()
return nil
}
}
func (q *ExecQueue) run(f func()) {
f()
q.mu.Lock()
for len(q.queue) > 0 && !q.closed {
f := q.queue[0]
q.queue[0] = nil
q.queue = q.queue[1:]
q.mu.Unlock()
f()
q.mu.Lock()
}
q.inFlight = false
q.queue = nil
if q.doneWaiter != nil {
close(q.doneWaiter)
q.doneWaiter = nil
}
q.mu.Unlock()
}
// Shutdown asynchronously signals the queue to stop.
func (q *ExecQueue) Shutdown() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
}
// Wait waits for the queue to be empty.
func (q *ExecQueue) Wait(ctx context.Context) error {
q.mu.Lock()
waitCh := q.doneWaiter
if q.inFlight && waitCh == nil {
waitCh = make(chan struct{})
q.doneWaiter = waitCh
}
q.mu.Unlock()
if waitCh == nil {
return nil
}
select {
case <-waitCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

@ -0,0 +1,22 @@
// Copyright (c) Tailscale Inc & AUTHORS
// SPDX-License-Identifier: BSD-3-Clause
package execqueue
import (
"context"
"sync/atomic"
"testing"
)
func TestExecQueue(t *testing.T) {
ctx := context.Background()
var n atomic.Int32
q := &ExecQueue{}
defer q.Shutdown()
q.Add(func() { n.Add(1) })
q.Wait(ctx)
if got := n.Load(); got != 1 {
t.Errorf("n=%d; want 1", got)
}
}
Loading…
Cancel
Save