ipn/{ipnlocal,localapi}: ensure watcher is installed before /watch-ipn-bus/ responds with 200

This change delays the first flush in the /watch-ipn-bus/ handler
until after the watcher has been successfully installed on the IPN
bus. It does this by adding a new onWatchAdded callback to
LocalBackend.WatchNotifications().

Without this, the endpoint returns a 200 almost immediatly, and
only then installs a watcher for IPN events.  This means there's a
small window where events could be missed by clients after calling
WatchIPNBus().

Fixes tailscale/corp#8594.

Signed-off-by: salman <salman@tailscale.com>
pull/6992/head
salman 1 year ago committed by salman
parent 3eb986fe05
commit eec734a578

@ -1802,13 +1802,17 @@ func (b *LocalBackend) readPoller() {
//
// WatchNotifications blocks until ctx is done.
//
// The provided fn will only be called with non-nil pointers. The caller must
// not modify roNotify. If fn returns false, the watch also stops.
// The provided onWatchAdded, if non-nil, will be called once the watcher
// is installed.
//
// The provided fn will be called for each notification. It will only be
// called with non-nil pointers. The caller must not modify roNotify. If
// fn returns false, the watch also stops.
//
// Failure to consume many notifications in a row will result in dropped
// notifications. There is currently (2022-11-22) no mechanism provided to
// detect when a message has been dropped.
func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, fn func(roNotify *ipn.Notify) (keepGoing bool)) {
func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, onWatchAdded func(), fn func(roNotify *ipn.Notify) (keepGoing bool)) {
ch := make(chan *ipn.Notify, 128)
origFn := fn
@ -1858,6 +1862,10 @@ func (b *LocalBackend) WatchNotifications(ctx context.Context, mask ipn.NotifyWa
b.mu.Unlock()
}()
if onWatchAdded != nil {
onWatchAdded()
}
if ini != nil {
if !fn(ini) {
return

@ -5,6 +5,7 @@
package ipnlocal
import (
"context"
"fmt"
"net"
"net/http"
@ -820,3 +821,38 @@ type legacyBackend interface {
// Verify that LocalBackend still implements the legacyBackend interface
// for now, at least until the macOS and iOS clients move off of it.
var _ legacyBackend = (*LocalBackend)(nil)
func TestWatchNotificationsCallbacks(t *testing.T) {
b := new(LocalBackend)
n := new(ipn.Notify)
b.WatchNotifications(context.Background(), 0, func() {
b.mu.Lock()
defer b.mu.Unlock()
// Ensure a watcher has been installed.
if len(b.notifyWatchers) != 1 {
t.Fatalf("unexpected number of watchers in new LocalBackend, want: 1 got: %v", len(b.notifyWatchers))
}
// Send a notification. Range over notifyWatchers to get the channel
// because WatchNotifications doesn't expose the handle for it.
for _, c := range b.notifyWatchers {
select {
case c <- n:
default:
t.Fatalf("could not send notification")
}
}
}, func(roNotify *ipn.Notify) bool {
if roNotify != n {
t.Fatalf("unexpected notification received. want: %v got: %v", n, roNotify)
}
return false
})
// Ensure watchers have been cleaned up.
b.mu.Lock()
defer b.mu.Unlock()
if len(b.notifyWatchers) != 0 {
t.Fatalf("unexpected number of watchers in new LocalBackend, want: 0 got: %v", len(b.notifyWatchers))
}
}

@ -721,7 +721,6 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("Content-Type", "application/json")
f.Flush()
var mask ipn.NotifyWatchOpt
if s := r.FormValue("mask"); s != "" {
@ -733,7 +732,7 @@ func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) {
mask = ipn.NotifyWatchOpt(v)
}
ctx := r.Context()
h.b.WatchNotifications(ctx, mask, func(roNotify *ipn.Notify) (keepGoing bool) {
h.b.WatchNotifications(ctx, mask, f.Flush, func(roNotify *ipn.Notify) (keepGoing bool) {
js, err := json.Marshal(roNotify)
if err != nil {
h.logf("json.Marshal: %v", err)

Loading…
Cancel
Save