From 4da7a50c4bcb0c8f771b6c9e0df59fc9560ed8e0 Mon Sep 17 00:00:00 2001 From: Maisem Ali Date: Sun, 19 Feb 2023 14:02:25 -0800 Subject: [PATCH] syncs: add Waiter as a way to wakeup worker goroutines Signed-off-by: Maisem Ali --- syncs/syncs.go | 36 ++++++++++++++++++++++++++++++++++++ syncs/syncs_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/syncs/syncs.go b/syncs/syncs.go index 4d2891e3a..ca43f4b36 100644 --- a/syncs/syncs.go +++ b/syncs/syncs.go @@ -12,6 +12,42 @@ import ( "tailscale.com/util/mak" ) +// Waiter is used to wake up a goroutine waiting for something to happen. +type Waiter struct { + ch chan struct{} // buffered chan of size 1 +} + +// NewWaiter returns a new Waiter. +func NewWaiter() *Waiter { + return &Waiter{ch: make(chan struct{}, 1)} +} + +// Wake wakes up a goroutine waiting on Wait. It returns true if it managed to +// mark the waiter as woken up. If it returns false, a Wake was already pending. +// If there are multiple goroutines waiting, only one will wake up. +// If there are no goroutines waiting, the next call to Wait will return +// immediately. Multiple calls to Wake without a call to Wait in between will +// only wake up one goroutine. +func (t *Waiter) Wake() (ok bool) { + select { + case t.ch <- struct{}{}: + return true + default: + return false + } +} + +// Wait blocks until Wake is called. If a wake is already pending, it returns +// immediately. If the context is canceled, it returns ctx.Err(). +func (t *Waiter) Wait(ctx context.Context) error { + select { + case <-t.ch: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + // ClosedChan returns a channel that's already closed. func ClosedChan() <-chan struct{} { return closedChan } diff --git a/syncs/syncs_test.go b/syncs/syncs_test.go index 3532d84f6..76985dd56 100644 --- a/syncs/syncs_test.go +++ b/syncs/syncs_test.go @@ -7,10 +7,34 @@ import ( "context" "sync" "testing" + "time" "github.com/google/go-cmp/cmp" ) +func TestWaiter(t *testing.T) { + w := NewWaiter() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if !w.Wake() { + t.Fatal("Wake() = false; want true") + } + if w.Wake() { // second call should return false + t.Fatal("Wake() = true; want false") + } + if err := w.Wait(ctx); err != nil { + t.Fatal(err) + } + go func() { + if !w.Wake() { + t.Errorf("Wake() = false; want true") + } + }() + if err := w.Wait(ctx); err != nil { + t.Fatal(err) + } +} + func TestWaitGroupChan(t *testing.T) { wg := NewWaitGroupChan()