From 300aba61a6bdd51256f2a3d9453e5e459ed4cfc6 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 22 Nov 2022 11:41:03 -0800 Subject: [PATCH] ipn, cmd/tailscale/cli: add LocalAPI IPN bus watch, Start, convert CLI Updates #6417 Updates tailscale/corp#8051 Change-Id: I1ca360730c45ffaa0261d8422877304277fc5625 Signed-off-by: Brad Fitzpatrick --- client/tailscale/localclient.go | 89 ++++++++++++++++ cmd/tailscale/cli/cli.go | 57 ---------- cmd/tailscale/cli/debug.go | 20 ++-- cmd/tailscale/cli/up.go | 178 ++++++++++++++++---------------- cmd/tailscale/cli/web.go | 158 ++++++++++++---------------- ipn/ipnlocal/local.go | 67 ++++++++++-- ipn/localapi/localapi.go | 53 ++++++++++ 7 files changed, 367 insertions(+), 255 deletions(-) diff --git a/client/tailscale/localclient.go b/client/tailscale/localclient.go index 71f97db01..8de7a7164 100644 --- a/client/tailscale/localclient.go +++ b/client/tailscale/localclient.go @@ -555,6 +555,20 @@ func (lc *LocalClient) EditPrefs(ctx context.Context, mp *ipn.MaskedPrefs) (*ipn return decodeJSON[*ipn.Prefs](body) } +// StartLoginInteractive starts an interactive login. +func (lc *LocalClient) StartLoginInteractive(ctx context.Context) error { + _, err := lc.send(ctx, "POST", "/localapi/v0/login-interactive", http.StatusNoContent, nil) + return err +} + +// Start applies the configuration specified in opts, and starts the +// state machine. +func (lc *LocalClient) Start(ctx context.Context, opts ipn.Options) error { + _, err := lc.send(ctx, "POST", "/localapi/v0/start", http.StatusNoContent, jsonBody(opts)) + return err +} + +// Logout logs out the current node. func (lc *LocalClient) Logout(ctx context.Context) error { _, err := lc.send(ctx, "POST", "/localapi/v0/logout", http.StatusNoContent, nil) return err @@ -966,3 +980,78 @@ func (lc *LocalClient) DeleteProfile(ctx context.Context, profile ipn.ProfileID) _, err := lc.send(ctx, "DELETE", "/localapi/v0/profiles"+url.PathEscape(string(profile)), http.StatusNoContent, nil) return err } + +// WatchIPNMask are filtering options for LocalClient.WatchIPNBus. +// +// The zero value is a valid WatchOpt that means to watch everything. +// +// TODO(bradfitz): flesh out. +type WatchIPNMask uint64 + +// WatchIPNBus subscribes to the IPN notification bus. It returns a watcher +// once the bus is connected successfully. +// +// The context is used for the life of the watch, not just the call to +// WatchIPNBus. +// +// The returned IPNBusWatcher's Close method must be called when done to release +// resources. +func (lc *LocalClient) WatchIPNBus(ctx context.Context, mask WatchIPNMask) (*IPNBusWatcher, error) { + req, err := http.NewRequestWithContext(ctx, "GET", + "http://"+apitype.LocalAPIHost+"/localapi/v0/watch-ipn-bus?mask="+fmt.Sprint(mask), + nil) + if err != nil { + return nil, err + } + res, err := lc.doLocalRequestNiceError(req) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + res.Body.Close() + return nil, errors.New(res.Status) + } + dec := json.NewDecoder(res.Body) + return &IPNBusWatcher{ + ctx: ctx, + httpRes: res, + dec: dec, + }, nil +} + +// IPNBusWatcher is an active subscription (watch) of the local tailscaled IPN bus. +// It's returned by LocalClient.WatchIPNBus. +// +// It must be closed when done. +type IPNBusWatcher struct { + ctx context.Context // from original WatchIPNBus call + httpRes *http.Response + dec *json.Decoder + + mu sync.Mutex + closed bool +} + +// Close stops the watcher and releases its resources. +func (w *IPNBusWatcher) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.closed { + return nil + } + w.closed = true + return w.httpRes.Body.Close() +} + +// Next returns the next ipn.Notify from the stream. +// If the context from LocalClient.WatchIPNBus is done, that error is returned. +func (w *IPNBusWatcher) Next() (ipn.Notify, error) { + var n ipn.Notify + if err := w.dec.Decode(&n); err != nil { + if cerr := w.ctx.Err(); cerr != nil { + err = cerr + } + return ipn.Notify{}, err + } + return n, nil +} diff --git a/cmd/tailscale/cli/cli.go b/cmd/tailscale/cli/cli.go index 6845cbd93..73f216833 100644 --- a/cmd/tailscale/cli/cli.go +++ b/cmd/tailscale/cli/cli.go @@ -13,23 +13,18 @@ import ( "fmt" "io" "log" - "net" "os" - "os/signal" "runtime" "strconv" "strings" "sync" - "syscall" "text/tabwriter" "github.com/peterbourgon/ff/v3/ffcli" "golang.org/x/exp/slices" "tailscale.com/client/tailscale" "tailscale.com/envknob" - "tailscale.com/ipn" "tailscale.com/paths" - "tailscale.com/safesocket" "tailscale.com/version/distro" ) @@ -248,58 +243,6 @@ var rootArgs struct { socket string } -func connect(ctx context.Context) (net.Conn, *ipn.BackendClient, context.Context, context.CancelFunc) { - s := safesocket.DefaultConnectionStrategy(rootArgs.socket) - c, err := safesocket.Connect(s) - if err != nil { - if runtime.GOOS != "windows" && rootArgs.socket == "" { - fatalf("--socket cannot be empty") - } - fatalf("Failed to connect to tailscaled. (safesocket.Connect: %v)\n", err) - } - clientToServer := func(b []byte) { - ipn.WriteMsg(c, b) - } - - ctx, cancel := context.WithCancel(ctx) - - go func() { - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) - select { - case <-interrupt: - case <-ctx.Done(): - // Context canceled elsewhere. - signal.Reset(syscall.SIGINT, syscall.SIGTERM) - return - } - c.Close() - cancel() - }() - - bc := ipn.NewBackendClient(log.Printf, clientToServer) - return c, bc, ctx, cancel -} - -// pump receives backend messages on conn and pushes them into bc. -func pump(ctx context.Context, bc *ipn.BackendClient, conn net.Conn) error { - defer conn.Close() - for ctx.Err() == nil { - msg, err := ipn.ReadMsg(conn) - if err != nil { - if ctx.Err() != nil { - return ctx.Err() - } - if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { - return fmt.Errorf("%w (tailscaled stopped running?)", err) - } - return err - } - bc.GotNotifyMsg(msg) - } - return ctx.Err() -} - // usageFuncNoDefaultValues is like usageFunc but doesn't print default values. func usageFuncNoDefaultValues(c *ffcli.Command) string { return usageFuncOpt(c, false) diff --git a/cmd/tailscale/cli/debug.go b/cmd/tailscale/cli/debug.go index bbd4508c5..711fbbcdd 100644 --- a/cmd/tailscale/cli/debug.go +++ b/cmd/tailscale/cli/debug.go @@ -285,19 +285,23 @@ var watchIPNArgs struct { } func runWatchIPN(ctx context.Context, args []string) error { - c, bc, ctx, cancel := connect(ctx) - defer cancel() - - bc.SetNotifyCallback(func(n ipn.Notify) { + watcher, err := localClient.WatchIPNBus(ctx, 0) + if err != nil { + return err + } + defer watcher.Close() + printf("Connected.\n") + for { + n, err := watcher.Next() + if err != nil { + return err + } if !watchIPNArgs.netmap { n.NetMap = nil } j, _ := json.MarshalIndent(n, "", "\t") printf("%s\n", j) - }) - bc.RequestEngineStatus() - pump(ctx, bc, c) - return errors.New("exit") + } } func runDERPMap(ctx context.Context, args []string) error { diff --git a/cmd/tailscale/cli/up.go b/cmd/tailscale/cli/up.go index 2c4147034..ed487c622 100644 --- a/cmd/tailscale/cli/up.go +++ b/cmd/tailscale/cli/up.go @@ -15,11 +15,13 @@ import ( "log" "net/netip" "os" + "os/signal" "reflect" "runtime" "sort" "strings" "sync" + "syscall" "time" shellquote "github.com/kballard/go-shellquote" @@ -535,109 +537,109 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE return err } - // At this point we need to subscribe to the IPN bus to watch - // for state transitions and possible need to authenticate. - c, bc, pumpCtx, cancel := connect(ctx) - defer cancel() + watchCtx, cancelWatch := context.WithCancel(ctx) + defer cancelWatch() + watcher, err := localClient.WatchIPNBus(watchCtx, 0) + if err != nil { + return err + } + defer watcher.Close() + + go func() { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) + select { + case <-interrupt: + cancelWatch() + case <-watchCtx.Done(): + } + }() - running := make(chan bool, 1) // gets value once in state ipn.Running - gotEngineUpdate := make(chan bool, 1) // gets value upon an engine update + running := make(chan bool, 1) // gets value once in state ipn.Running pumpErr := make(chan error, 1) - go func() { pumpErr <- pump(pumpCtx, bc, c) }() var printed bool // whether we've yet printed anything to stdout or stderr var loginOnce sync.Once - startLoginInteractive := func() { loginOnce.Do(func() { bc.StartLoginInteractive() }) } + startLoginInteractive := func() { loginOnce.Do(func() { localClient.StartLoginInteractive(ctx) }) } - bc.SetNotifyCallback(func(n ipn.Notify) { - if n.Engine != nil { - select { - case gotEngineUpdate <- true: - default: + go func() { + for { + n, err := watcher.Next() + if err != nil { + pumpErr <- err + return } - } - if n.ErrMessage != nil { - msg := *n.ErrMessage - if msg == ipn.ErrMsgPermissionDenied { - switch effectiveGOOS() { - case "windows": - msg += " (Tailscale service in use by other user?)" - default: - msg += " (try 'sudo tailscale up [...]')" + if n.ErrMessage != nil { + msg := *n.ErrMessage + if msg == ipn.ErrMsgPermissionDenied { + switch effectiveGOOS() { + case "windows": + msg += " (Tailscale service in use by other user?)" + default: + msg += " (try 'sudo tailscale up [...]')" + } } + fatalf("backend error: %v\n", msg) } - fatalf("backend error: %v\n", msg) - } - if s := n.State; s != nil { - switch *s { - case ipn.NeedsLogin: - startLoginInteractive() - case ipn.NeedsMachineAuth: - printed = true - if env.upArgs.json { - printUpDoneJSON(ipn.NeedsMachineAuth, "") - } else { - fmt.Fprintf(Stderr, "\nTo authorize your machine, visit (as admin):\n\n\t%s\n\n", prefs.AdminPageURL()) - } - case ipn.Running: - // Done full authentication process - if env.upArgs.json { - printUpDoneJSON(ipn.Running, "") - } else if printed { - // Only need to print an update if we printed the "please click" message earlier. - fmt.Fprintf(Stderr, "Success.\n") - } - select { - case running <- true: - default: + if s := n.State; s != nil { + switch *s { + case ipn.NeedsLogin: + startLoginInteractive() + case ipn.NeedsMachineAuth: + printed = true + if env.upArgs.json { + printUpDoneJSON(ipn.NeedsMachineAuth, "") + } else { + fmt.Fprintf(Stderr, "\nTo authorize your machine, visit (as admin):\n\n\t%s\n\n", prefs.AdminPageURL()) + } + case ipn.Running: + // Done full authentication process + if env.upArgs.json { + printUpDoneJSON(ipn.Running, "") + } else if printed { + // Only need to print an update if we printed the "please click" message earlier. + fmt.Fprintf(Stderr, "Success.\n") + } + select { + case running <- true: + default: + } + cancelWatch() } - cancel() } - } - if url := n.BrowseToURL; url != nil && printAuthURL(*url) { - printed = true - if upArgs.json { - js := &upOutputJSON{AuthURL: *url, BackendState: st.BackendState} - - q, err := qrcode.New(*url, qrcode.Medium) - if err == nil { - png, err := q.PNG(128) + if url := n.BrowseToURL; url != nil && printAuthURL(*url) { + printed = true + if upArgs.json { + js := &upOutputJSON{AuthURL: *url, BackendState: st.BackendState} + + q, err := qrcode.New(*url, qrcode.Medium) if err == nil { - js.QR = "data:image/png;base64," + base64.StdEncoding.EncodeToString(png) + png, err := q.PNG(128) + if err == nil { + js.QR = "data:image/png;base64," + base64.StdEncoding.EncodeToString(png) + } } - } - data, err := json.MarshalIndent(js, "", "\t") - if err != nil { - printf("upOutputJSON marshalling error: %v", err) - } else { - outln(string(data)) - } - } else { - fmt.Fprintf(Stderr, "\nTo authenticate, visit:\n\n\t%s\n\n", *url) - if upArgs.qr { - q, err := qrcode.New(*url, qrcode.Medium) + data, err := json.MarshalIndent(js, "", "\t") if err != nil { - log.Printf("QR code error: %v", err) + printf("upOutputJSON marshalling error: %v", err) } else { - fmt.Fprintf(Stderr, "%s\n", q.ToString(false)) + outln(string(data)) + } + } else { + fmt.Fprintf(Stderr, "\nTo authenticate, visit:\n\n\t%s\n\n", *url) + if upArgs.qr { + q, err := qrcode.New(*url, qrcode.Medium) + if err != nil { + log.Printf("QR code error: %v", err) + } else { + fmt.Fprintf(Stderr, "%s\n", q.ToString(false)) + } } } } } - }) - // Wait for backend client to be connected so we know - // we're subscribed to updates. Otherwise we can miss - // an update upon its transition to running. Do so by causing some traffic - // back to the bus that we then wait on. - bc.RequestEngineStatus() - select { - case <-gotEngineUpdate: - case <-pumpCtx.Done(): - return pumpCtx.Err() - case err := <-pumpErr: - return err - } + }() // Special case: bare "tailscale up" means to just start // running, if there's ever been a login. @@ -660,10 +662,12 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE if err != nil { return err } - bc.Start(ipn.Options{ + if err := localClient.Start(ctx, ipn.Options{ AuthKey: authKey, UpdatePrefs: prefs, - }) + }); err != nil { + return err + } if upArgs.forceReauth { startLoginInteractive() } @@ -685,13 +689,13 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE select { case <-running: return nil - case <-pumpCtx.Done(): + case <-watchCtx.Done(): select { case <-running: return nil default: } - return pumpCtx.Err() + return watchCtx.Err() case err := <-pumpErr: select { case <-running: diff --git a/cmd/tailscale/cli/web.go b/cmd/tailscale/cli/web.go index b21cc3e0f..784ca0508 100644 --- a/cmd/tailscale/cli/web.go +++ b/cmd/tailscale/cli/web.go @@ -28,8 +28,8 @@ import ( "github.com/peterbourgon/ff/v3/ffcli" "tailscale.com/ipn" + "tailscale.com/ipn/ipnstate" "tailscale.com/tailcfg" - "tailscale.com/types/preftype" "tailscale.com/util/groupmember" "tailscale.com/version/distro" ) @@ -317,6 +317,7 @@ req.send(null); ` func webHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() if authRedirect(w, r) { return } @@ -327,7 +328,18 @@ func webHandler(w http.ResponseWriter, r *http.Request) { } if r.URL.Path == "/redirect" || r.URL.Path == "/redirect/" { - w.Write([]byte(authenticationRedirectHTML)) + io.WriteString(w, authenticationRedirectHTML) + return + } + + st, err := localClient.Status(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + prefs, err := localClient.GetPrefs(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -344,23 +356,31 @@ func webHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(mi{"error": err.Error()}) return } - prefs, err := localClient.GetPrefs(r.Context()) - if err != nil && !postData.Reauthenticate { + + routes, err := calcAdvertiseRoutes(postData.AdvertiseRoutes, postData.AdvertiseExitNode) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(mi{"error": err.Error()}) + return + } + mp := &ipn.MaskedPrefs{ + AdvertiseRoutesSet: true, + WantRunningSet: true, + } + mp.Prefs.WantRunning = true + mp.Prefs.AdvertiseRoutes = routes + log.Printf("Doing edit: %v", mp.Pretty()) + + if _, err := localClient.EditPrefs(ctx, mp); err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(mi{"error": err.Error()}) return - } else { - routes, err := calcAdvertiseRoutes(postData.AdvertiseRoutes, postData.AdvertiseExitNode) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - json.NewEncoder(w).Encode(mi{"error": err.Error()}) - return - } - prefs.AdvertiseRoutes = routes } w.Header().Set("Content-Type", "application/json") - url, err := tailscaleUp(r.Context(), prefs, postData.Reauthenticate) + log.Printf("tailscaleUp(reauth=%v) ...", postData.Reauthenticate) + url, err := tailscaleUp(r.Context(), st, postData.Reauthenticate) + log.Printf("tailscaleUp = (URL %v, %v)", url != "", err) if err != nil { w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(mi{"error": err.Error()}) @@ -374,17 +394,6 @@ func webHandler(w http.ResponseWriter, r *http.Request) { return } - st, err := localClient.Status(r.Context()) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - prefs, err := localClient.GetPrefs(r.Context()) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - profile := st.User[st.Self.UserID] deviceName := strings.Split(st.Self.DNSName, ".")[0] data := tmplData{ @@ -418,26 +427,18 @@ func webHandler(w http.ResponseWriter, r *http.Request) { w.Write(buf.Bytes()) } -// TODO(crawshaw): some of this is very similar to the code in 'tailscale up', can we share anything? -func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authURL string, retErr error) { - if prefs == nil { - prefs = ipn.NewPrefs() - prefs.ControlURL = ipn.DefaultControlURL - prefs.WantRunning = true - prefs.CorpDNS = true - prefs.AllowSingleHosts = true - prefs.ForceDaemon = (runtime.GOOS == "windows") - } - - if distro.Get() == distro.Synology { - prefs.NetfilterMode = preftype.NetfilterOff - } +func tailscaleUp(ctx context.Context, st *ipnstate.Status, forceReauth bool) (authURL string, retErr error) { + origAuthURL := st.AuthURL + isRunning := st.BackendState == ipn.Running.String() - st, err := localClient.Status(ctx) - if err != nil { - return "", fmt.Errorf("can't fetch status: %v", err) + if !forceReauth { + if origAuthURL != "" { + return origAuthURL, nil + } + if isRunning { + return "", nil + } } - origAuthURL := st.AuthURL // printAuthURL reports whether we should print out the // provided auth URL from an IPN notify. @@ -445,18 +446,27 @@ func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authU return url != origAuthURL } - c, bc, pumpCtx, cancel := connect(ctx) - defer cancel() + watchCtx, cancelWatch := context.WithCancel(ctx) + defer cancelWatch() + watcher, err := localClient.WatchIPNBus(watchCtx, 0) + if err != nil { + return "", err + } + defer watcher.Close() - gotEngineUpdate := make(chan bool, 1) // gets value upon an engine update - go pump(pumpCtx, bc, c) + go func() { + if !isRunning { + localClient.Start(ctx, ipn.Options{}) + } + if forceReauth { + localClient.StartLoginInteractive(ctx) + } + }() - bc.SetNotifyCallback(func(n ipn.Notify) { - if n.Engine != nil { - select { - case gotEngineUpdate <- true: - default: - } + for { + n, err := watcher.Next() + if err != nil { + return "", err } if n.ErrMessage != nil { msg := *n.ErrMessage @@ -468,48 +478,10 @@ func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authU msg += " (try 'sudo tailscale up [...]')" } } - retErr = fmt.Errorf("backend error: %v", msg) - cancel() - } else if url := n.BrowseToURL; url != nil && printAuthURL(*url) { - authURL = *url - cancel() + return "", fmt.Errorf("backend error: %v", msg) } - if !forceReauth && n.Prefs != nil && n.Prefs.Valid() { - p1, p2 := n.Prefs.AsStruct(), *prefs - p1.Persist = nil - p2.Persist = nil - if p1.Equals(&p2) { - cancel() - } + if url := n.BrowseToURL; url != nil && printAuthURL(*url) { + return *url, nil } - }) - // Wait for backend client to be connected so we know - // we're subscribed to updates. Otherwise we can miss - // an update upon its transition to running. Do so by causing some traffic - // back to the bus that we then wait on. - bc.RequestEngineStatus() - select { - case <-gotEngineUpdate: - case <-pumpCtx.Done(): - return authURL, pumpCtx.Err() - } - - bc.SetPrefs(prefs) - - bc.Start(ipn.Options{}) - if forceReauth { - bc.StartLoginInteractive() - } - - <-pumpCtx.Done() // wait for authURL or complete failure - if authURL == "" && retErr == nil { - if !forceReauth { - return "", nil // no auth URL is fine - } - retErr = pumpCtx.Err() - } - if authURL == "" && retErr == nil { - return "", fmt.Errorf("login failed with no backend error message") } - return authURL, retErr } diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 1012f44ff..5dc5f098d 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -181,7 +181,8 @@ type LocalBackend struct { loginFlags controlclient.LoginFlags incomingFiles map[*incomingFile]bool fileWaiters map[*mapSetHandle]context.CancelFunc // handle => func to call on file received - lastStatusTime time.Time // status.AsOf value of the last processed status update + notifyWatchers map[*mapSetHandle]chan *ipn.Notify + lastStatusTime time.Time // status.AsOf value of the last processed status update // directFileRoot, if non-empty, means to write received files // directly to this directory, without staging them in an // intermediate buffered directory for "pick-up" later. If @@ -1690,24 +1691,70 @@ func (b *LocalBackend) readPoller() { } } -// send delivers n to the connected frontend. If no frontend is -// connected, the notification is dropped without being delivered. -func (b *LocalBackend) send(n ipn.Notify) { +// WatchNotifications subscribes to the ipn.Notify message bus notification +// messages. +// +// WatchNotifications blocks until ctx is done. +// +// The provided fn will only be called with non-nil pointers. The caller must +// not modify roNotify. If fn returns false, the watch also stops. +// +// Failure to consume many notifications in a row will result in dropped +// notifications. There is currently (2022-11-22) no mechanism provided to +// detect when a message has been dropped. +func (b *LocalBackend) WatchNotifications(ctx context.Context, fn func(roNotify *ipn.Notify) (keepGoing bool)) { + handle := new(mapSetHandle) + ch := make(chan *ipn.Notify, 128) + b.mu.Lock() - notifyFunc := b.notify - apiSrv := b.peerAPIServer + mak.Set(&b.notifyWatchers, handle, ch) b.mu.Unlock() + defer func() { + b.mu.Lock() + delete(b.notifyWatchers, handle) + b.mu.Unlock() + }() - if notifyFunc == nil { - return + for { + select { + case <-ctx.Done(): + return + case n := <-ch: + if !fn(n) { + return + } + } } +} + +// send delivers n to the connected frontend and any API watchers from +// LocalBackend.WatchNotifications (via the LocalAPI). +// +// If no frontend is connected or API watchers are backed up, the notification +// is dropped without being delivered. +func (b *LocalBackend) send(n ipn.Notify) { + n.Version = version.Long + b.mu.Lock() + notifyFunc := b.notify + apiSrv := b.peerAPIServer if apiSrv.hasFilesWaiting() { n.FilesWaiting = &empty.Message{} } - n.Version = version.Long - notifyFunc(n) + for _, ch := range b.notifyWatchers { + select { + case ch <- &n: + default: + // Drop the notification if the channel is full. + } + } + + b.mu.Unlock() + + if notifyFunc != nil { + notifyFunc(n) + } } func (b *LocalBackend) sendFileNotify() { diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index 3ef8b0e2d..ba4a5f5b4 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -80,6 +80,7 @@ var handler = map[string]localAPIHandler{ "serve-config": (*Handler).serveServeConfig, "set-dns": (*Handler).serveSetDNS, "set-expiry-sooner": (*Handler).serveSetExpirySooner, + "start": (*Handler).serveStart, "status": (*Handler).serveStatus, "tka/init": (*Handler).serveTKAInit, "tka/log": (*Handler).serveTKALog, @@ -88,6 +89,7 @@ var handler = map[string]localAPIHandler{ "tka/status": (*Handler).serveTKAStatus, "tka/disable": (*Handler).serveTKADisable, "upload-client-metrics": (*Handler).serveUploadClientMetrics, + "watch-ipn-bus": (*Handler).serveWatchIPNBus, "whois": (*Handler).serveWhoIs, } @@ -572,6 +574,34 @@ func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) { e.Encode(st) } +func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) { + if !h.PermitWrite { + http.Error(w, "denied", http.StatusForbidden) + return + } + f, ok := w.(http.Flusher) + if !ok { + http.Error(w, "not a flusher", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + f.Flush() + + ctx := r.Context() + h.b.WatchNotifications(ctx, func(roNotify *ipn.Notify) (keepGoing bool) { + js, err := json.Marshal(roNotify) + if err != nil { + h.logf("json.Marshal: %v", err) + return false + } + if _, err := fmt.Fprintf(w, "%s\n", js); err != nil { + return false + } + f.Flush() + return true + }) +} + func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request) { if !h.PermitWrite { http.Error(w, "login access denied", http.StatusForbidden) @@ -586,6 +616,29 @@ func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request) return } +func (h *Handler) serveStart(w http.ResponseWriter, r *http.Request) { + if !h.PermitWrite { + http.Error(w, "access denied", http.StatusForbidden) + return + } + if r.Method != "POST" { + http.Error(w, "want POST", 400) + return + } + var o ipn.Options + if err := json.NewDecoder(r.Body).Decode(&o); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err := h.b.Start(o) + if err != nil { + // TODO(bradfitz): map error to a good HTTP error + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} + func (h *Handler) serveLogout(w http.ResponseWriter, r *http.Request) { if !h.PermitWrite { http.Error(w, "logout access denied", http.StatusForbidden)