ipn, cmd/tailscale/cli: add LocalAPI IPN bus watch, Start, convert CLI

Updates #6417
Updates tailscale/corp#8051

Change-Id: I1ca360730c45ffaa0261d8422877304277fc5625
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/6460/head
Brad Fitzpatrick 2 years ago committed by Brad Fitzpatrick
parent d4f6efa1df
commit 300aba61a6

@ -555,6 +555,20 @@ func (lc *LocalClient) EditPrefs(ctx context.Context, mp *ipn.MaskedPrefs) (*ipn
return decodeJSON[*ipn.Prefs](body) return decodeJSON[*ipn.Prefs](body)
} }
// StartLoginInteractive starts an interactive login.
func (lc *LocalClient) StartLoginInteractive(ctx context.Context) error {
_, err := lc.send(ctx, "POST", "/localapi/v0/login-interactive", http.StatusNoContent, nil)
return err
}
// Start applies the configuration specified in opts, and starts the
// state machine.
func (lc *LocalClient) Start(ctx context.Context, opts ipn.Options) error {
_, err := lc.send(ctx, "POST", "/localapi/v0/start", http.StatusNoContent, jsonBody(opts))
return err
}
// Logout logs out the current node.
func (lc *LocalClient) Logout(ctx context.Context) error { func (lc *LocalClient) Logout(ctx context.Context) error {
_, err := lc.send(ctx, "POST", "/localapi/v0/logout", http.StatusNoContent, nil) _, err := lc.send(ctx, "POST", "/localapi/v0/logout", http.StatusNoContent, nil)
return err return err
@ -966,3 +980,78 @@ func (lc *LocalClient) DeleteProfile(ctx context.Context, profile ipn.ProfileID)
_, err := lc.send(ctx, "DELETE", "/localapi/v0/profiles"+url.PathEscape(string(profile)), http.StatusNoContent, nil) _, err := lc.send(ctx, "DELETE", "/localapi/v0/profiles"+url.PathEscape(string(profile)), http.StatusNoContent, nil)
return err return err
} }
// WatchIPNMask are filtering options for LocalClient.WatchIPNBus.
//
// The zero value is a valid WatchOpt that means to watch everything.
//
// TODO(bradfitz): flesh out.
type WatchIPNMask uint64
// WatchIPNBus subscribes to the IPN notification bus. It returns a watcher
// once the bus is connected successfully.
//
// The context is used for the life of the watch, not just the call to
// WatchIPNBus.
//
// The returned IPNBusWatcher's Close method must be called when done to release
// resources.
func (lc *LocalClient) WatchIPNBus(ctx context.Context, mask WatchIPNMask) (*IPNBusWatcher, error) {
req, err := http.NewRequestWithContext(ctx, "GET",
"http://"+apitype.LocalAPIHost+"/localapi/v0/watch-ipn-bus?mask="+fmt.Sprint(mask),
nil)
if err != nil {
return nil, err
}
res, err := lc.doLocalRequestNiceError(req)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
res.Body.Close()
return nil, errors.New(res.Status)
}
dec := json.NewDecoder(res.Body)
return &IPNBusWatcher{
ctx: ctx,
httpRes: res,
dec: dec,
}, nil
}
// IPNBusWatcher is an active subscription (watch) of the local tailscaled IPN bus.
// It's returned by LocalClient.WatchIPNBus.
//
// It must be closed when done.
type IPNBusWatcher struct {
ctx context.Context // from original WatchIPNBus call
httpRes *http.Response
dec *json.Decoder
mu sync.Mutex
closed bool
}
// Close stops the watcher and releases its resources.
func (w *IPNBusWatcher) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return nil
}
w.closed = true
return w.httpRes.Body.Close()
}
// Next returns the next ipn.Notify from the stream.
// If the context from LocalClient.WatchIPNBus is done, that error is returned.
func (w *IPNBusWatcher) Next() (ipn.Notify, error) {
var n ipn.Notify
if err := w.dec.Decode(&n); err != nil {
if cerr := w.ctx.Err(); cerr != nil {
err = cerr
}
return ipn.Notify{}, err
}
return n, nil
}

@ -13,23 +13,18 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"net"
"os" "os"
"os/signal"
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"syscall"
"text/tabwriter" "text/tabwriter"
"github.com/peterbourgon/ff/v3/ffcli" "github.com/peterbourgon/ff/v3/ffcli"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"tailscale.com/client/tailscale" "tailscale.com/client/tailscale"
"tailscale.com/envknob" "tailscale.com/envknob"
"tailscale.com/ipn"
"tailscale.com/paths" "tailscale.com/paths"
"tailscale.com/safesocket"
"tailscale.com/version/distro" "tailscale.com/version/distro"
) )
@ -248,58 +243,6 @@ var rootArgs struct {
socket string socket string
} }
func connect(ctx context.Context) (net.Conn, *ipn.BackendClient, context.Context, context.CancelFunc) {
s := safesocket.DefaultConnectionStrategy(rootArgs.socket)
c, err := safesocket.Connect(s)
if err != nil {
if runtime.GOOS != "windows" && rootArgs.socket == "" {
fatalf("--socket cannot be empty")
}
fatalf("Failed to connect to tailscaled. (safesocket.Connect: %v)\n", err)
}
clientToServer := func(b []byte) {
ipn.WriteMsg(c, b)
}
ctx, cancel := context.WithCancel(ctx)
go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
select {
case <-interrupt:
case <-ctx.Done():
// Context canceled elsewhere.
signal.Reset(syscall.SIGINT, syscall.SIGTERM)
return
}
c.Close()
cancel()
}()
bc := ipn.NewBackendClient(log.Printf, clientToServer)
return c, bc, ctx, cancel
}
// pump receives backend messages on conn and pushes them into bc.
func pump(ctx context.Context, bc *ipn.BackendClient, conn net.Conn) error {
defer conn.Close()
for ctx.Err() == nil {
msg, err := ipn.ReadMsg(conn)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
return fmt.Errorf("%w (tailscaled stopped running?)", err)
}
return err
}
bc.GotNotifyMsg(msg)
}
return ctx.Err()
}
// usageFuncNoDefaultValues is like usageFunc but doesn't print default values. // usageFuncNoDefaultValues is like usageFunc but doesn't print default values.
func usageFuncNoDefaultValues(c *ffcli.Command) string { func usageFuncNoDefaultValues(c *ffcli.Command) string {
return usageFuncOpt(c, false) return usageFuncOpt(c, false)

@ -285,19 +285,23 @@ var watchIPNArgs struct {
} }
func runWatchIPN(ctx context.Context, args []string) error { func runWatchIPN(ctx context.Context, args []string) error {
c, bc, ctx, cancel := connect(ctx) watcher, err := localClient.WatchIPNBus(ctx, 0)
defer cancel() if err != nil {
return err
bc.SetNotifyCallback(func(n ipn.Notify) { }
defer watcher.Close()
printf("Connected.\n")
for {
n, err := watcher.Next()
if err != nil {
return err
}
if !watchIPNArgs.netmap { if !watchIPNArgs.netmap {
n.NetMap = nil n.NetMap = nil
} }
j, _ := json.MarshalIndent(n, "", "\t") j, _ := json.MarshalIndent(n, "", "\t")
printf("%s\n", j) printf("%s\n", j)
}) }
bc.RequestEngineStatus()
pump(ctx, bc, c)
return errors.New("exit")
} }
func runDERPMap(ctx context.Context, args []string) error { func runDERPMap(ctx context.Context, args []string) error {

@ -15,11 +15,13 @@ import (
"log" "log"
"net/netip" "net/netip"
"os" "os"
"os/signal"
"reflect" "reflect"
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
shellquote "github.com/kballard/go-shellquote" shellquote "github.com/kballard/go-shellquote"
@ -535,109 +537,109 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
return err return err
} }
// At this point we need to subscribe to the IPN bus to watch watchCtx, cancelWatch := context.WithCancel(ctx)
// for state transitions and possible need to authenticate. defer cancelWatch()
c, bc, pumpCtx, cancel := connect(ctx) watcher, err := localClient.WatchIPNBus(watchCtx, 0)
defer cancel() if err != nil {
return err
}
defer watcher.Close()
go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
select {
case <-interrupt:
cancelWatch()
case <-watchCtx.Done():
}
}()
running := make(chan bool, 1) // gets value once in state ipn.Running running := make(chan bool, 1) // gets value once in state ipn.Running
gotEngineUpdate := make(chan bool, 1) // gets value upon an engine update
pumpErr := make(chan error, 1) pumpErr := make(chan error, 1)
go func() { pumpErr <- pump(pumpCtx, bc, c) }()
var printed bool // whether we've yet printed anything to stdout or stderr var printed bool // whether we've yet printed anything to stdout or stderr
var loginOnce sync.Once var loginOnce sync.Once
startLoginInteractive := func() { loginOnce.Do(func() { bc.StartLoginInteractive() }) } startLoginInteractive := func() { loginOnce.Do(func() { localClient.StartLoginInteractive(ctx) }) }
bc.SetNotifyCallback(func(n ipn.Notify) { go func() {
if n.Engine != nil { for {
select { n, err := watcher.Next()
case gotEngineUpdate <- true: if err != nil {
default: pumpErr <- err
return
} }
} if n.ErrMessage != nil {
if n.ErrMessage != nil { msg := *n.ErrMessage
msg := *n.ErrMessage if msg == ipn.ErrMsgPermissionDenied {
if msg == ipn.ErrMsgPermissionDenied { switch effectiveGOOS() {
switch effectiveGOOS() { case "windows":
case "windows": msg += " (Tailscale service in use by other user?)"
msg += " (Tailscale service in use by other user?)" default:
default: msg += " (try 'sudo tailscale up [...]')"
msg += " (try 'sudo tailscale up [...]')" }
} }
fatalf("backend error: %v\n", msg)
} }
fatalf("backend error: %v\n", msg) if s := n.State; s != nil {
} switch *s {
if s := n.State; s != nil { case ipn.NeedsLogin:
switch *s { startLoginInteractive()
case ipn.NeedsLogin: case ipn.NeedsMachineAuth:
startLoginInteractive() printed = true
case ipn.NeedsMachineAuth: if env.upArgs.json {
printed = true printUpDoneJSON(ipn.NeedsMachineAuth, "")
if env.upArgs.json { } else {
printUpDoneJSON(ipn.NeedsMachineAuth, "") fmt.Fprintf(Stderr, "\nTo authorize your machine, visit (as admin):\n\n\t%s\n\n", prefs.AdminPageURL())
} else { }
fmt.Fprintf(Stderr, "\nTo authorize your machine, visit (as admin):\n\n\t%s\n\n", prefs.AdminPageURL()) case ipn.Running:
} // Done full authentication process
case ipn.Running: if env.upArgs.json {
// Done full authentication process printUpDoneJSON(ipn.Running, "")
if env.upArgs.json { } else if printed {
printUpDoneJSON(ipn.Running, "") // Only need to print an update if we printed the "please click" message earlier.
} else if printed { fmt.Fprintf(Stderr, "Success.\n")
// Only need to print an update if we printed the "please click" message earlier. }
fmt.Fprintf(Stderr, "Success.\n") select {
} case running <- true:
select { default:
case running <- true: }
default: cancelWatch()
} }
cancel()
} }
} if url := n.BrowseToURL; url != nil && printAuthURL(*url) {
if url := n.BrowseToURL; url != nil && printAuthURL(*url) { printed = true
printed = true if upArgs.json {
if upArgs.json { js := &upOutputJSON{AuthURL: *url, BackendState: st.BackendState}
js := &upOutputJSON{AuthURL: *url, BackendState: st.BackendState}
q, err := qrcode.New(*url, qrcode.Medium)
q, err := qrcode.New(*url, qrcode.Medium)
if err == nil {
png, err := q.PNG(128)
if err == nil { if err == nil {
js.QR = "data:image/png;base64," + base64.StdEncoding.EncodeToString(png) png, err := q.PNG(128)
if err == nil {
js.QR = "data:image/png;base64," + base64.StdEncoding.EncodeToString(png)
}
} }
}
data, err := json.MarshalIndent(js, "", "\t") data, err := json.MarshalIndent(js, "", "\t")
if err != nil {
printf("upOutputJSON marshalling error: %v", err)
} else {
outln(string(data))
}
} else {
fmt.Fprintf(Stderr, "\nTo authenticate, visit:\n\n\t%s\n\n", *url)
if upArgs.qr {
q, err := qrcode.New(*url, qrcode.Medium)
if err != nil { if err != nil {
log.Printf("QR code error: %v", err) printf("upOutputJSON marshalling error: %v", err)
} else { } else {
fmt.Fprintf(Stderr, "%s\n", q.ToString(false)) outln(string(data))
}
} else {
fmt.Fprintf(Stderr, "\nTo authenticate, visit:\n\n\t%s\n\n", *url)
if upArgs.qr {
q, err := qrcode.New(*url, qrcode.Medium)
if err != nil {
log.Printf("QR code error: %v", err)
} else {
fmt.Fprintf(Stderr, "%s\n", q.ToString(false))
}
} }
} }
} }
} }
}) }()
// Wait for backend client to be connected so we know
// we're subscribed to updates. Otherwise we can miss
// an update upon its transition to running. Do so by causing some traffic
// back to the bus that we then wait on.
bc.RequestEngineStatus()
select {
case <-gotEngineUpdate:
case <-pumpCtx.Done():
return pumpCtx.Err()
case err := <-pumpErr:
return err
}
// Special case: bare "tailscale up" means to just start // Special case: bare "tailscale up" means to just start
// running, if there's ever been a login. // running, if there's ever been a login.
@ -660,10 +662,12 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
if err != nil { if err != nil {
return err return err
} }
bc.Start(ipn.Options{ if err := localClient.Start(ctx, ipn.Options{
AuthKey: authKey, AuthKey: authKey,
UpdatePrefs: prefs, UpdatePrefs: prefs,
}) }); err != nil {
return err
}
if upArgs.forceReauth { if upArgs.forceReauth {
startLoginInteractive() startLoginInteractive()
} }
@ -685,13 +689,13 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
select { select {
case <-running: case <-running:
return nil return nil
case <-pumpCtx.Done(): case <-watchCtx.Done():
select { select {
case <-running: case <-running:
return nil return nil
default: default:
} }
return pumpCtx.Err() return watchCtx.Err()
case err := <-pumpErr: case err := <-pumpErr:
select { select {
case <-running: case <-running:

@ -28,8 +28,8 @@ import (
"github.com/peterbourgon/ff/v3/ffcli" "github.com/peterbourgon/ff/v3/ffcli"
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/preftype"
"tailscale.com/util/groupmember" "tailscale.com/util/groupmember"
"tailscale.com/version/distro" "tailscale.com/version/distro"
) )
@ -317,6 +317,7 @@ req.send(null);
` `
func webHandler(w http.ResponseWriter, r *http.Request) { func webHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if authRedirect(w, r) { if authRedirect(w, r) {
return return
} }
@ -327,7 +328,18 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
} }
if r.URL.Path == "/redirect" || r.URL.Path == "/redirect/" { if r.URL.Path == "/redirect" || r.URL.Path == "/redirect/" {
w.Write([]byte(authenticationRedirectHTML)) io.WriteString(w, authenticationRedirectHTML)
return
}
st, err := localClient.Status(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
prefs, err := localClient.GetPrefs(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
@ -344,23 +356,31 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(mi{"error": err.Error()}) json.NewEncoder(w).Encode(mi{"error": err.Error()})
return return
} }
prefs, err := localClient.GetPrefs(r.Context())
if err != nil && !postData.Reauthenticate { routes, err := calcAdvertiseRoutes(postData.AdvertiseRoutes, postData.AdvertiseExitNode)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()})
return
}
mp := &ipn.MaskedPrefs{
AdvertiseRoutesSet: true,
WantRunningSet: true,
}
mp.Prefs.WantRunning = true
mp.Prefs.AdvertiseRoutes = routes
log.Printf("Doing edit: %v", mp.Pretty())
if _, err := localClient.EditPrefs(ctx, mp); err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()}) json.NewEncoder(w).Encode(mi{"error": err.Error()})
return return
} else {
routes, err := calcAdvertiseRoutes(postData.AdvertiseRoutes, postData.AdvertiseExitNode)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()})
return
}
prefs.AdvertiseRoutes = routes
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
url, err := tailscaleUp(r.Context(), prefs, postData.Reauthenticate) log.Printf("tailscaleUp(reauth=%v) ...", postData.Reauthenticate)
url, err := tailscaleUp(r.Context(), st, postData.Reauthenticate)
log.Printf("tailscaleUp = (URL %v, %v)", url != "", err)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()}) json.NewEncoder(w).Encode(mi{"error": err.Error()})
@ -374,17 +394,6 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
st, err := localClient.Status(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
prefs, err := localClient.GetPrefs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
profile := st.User[st.Self.UserID] profile := st.User[st.Self.UserID]
deviceName := strings.Split(st.Self.DNSName, ".")[0] deviceName := strings.Split(st.Self.DNSName, ".")[0]
data := tmplData{ data := tmplData{
@ -418,26 +427,18 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
w.Write(buf.Bytes()) w.Write(buf.Bytes())
} }
// TODO(crawshaw): some of this is very similar to the code in 'tailscale up', can we share anything? func tailscaleUp(ctx context.Context, st *ipnstate.Status, forceReauth bool) (authURL string, retErr error) {
func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authURL string, retErr error) { origAuthURL := st.AuthURL
if prefs == nil { isRunning := st.BackendState == ipn.Running.String()
prefs = ipn.NewPrefs()
prefs.ControlURL = ipn.DefaultControlURL
prefs.WantRunning = true
prefs.CorpDNS = true
prefs.AllowSingleHosts = true
prefs.ForceDaemon = (runtime.GOOS == "windows")
}
if distro.Get() == distro.Synology {
prefs.NetfilterMode = preftype.NetfilterOff
}
st, err := localClient.Status(ctx) if !forceReauth {
if err != nil { if origAuthURL != "" {
return "", fmt.Errorf("can't fetch status: %v", err) return origAuthURL, nil
}
if isRunning {
return "", nil
}
} }
origAuthURL := st.AuthURL
// printAuthURL reports whether we should print out the // printAuthURL reports whether we should print out the
// provided auth URL from an IPN notify. // provided auth URL from an IPN notify.
@ -445,18 +446,27 @@ func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authU
return url != origAuthURL return url != origAuthURL
} }
c, bc, pumpCtx, cancel := connect(ctx) watchCtx, cancelWatch := context.WithCancel(ctx)
defer cancel() defer cancelWatch()
watcher, err := localClient.WatchIPNBus(watchCtx, 0)
if err != nil {
return "", err
}
defer watcher.Close()
gotEngineUpdate := make(chan bool, 1) // gets value upon an engine update go func() {
go pump(pumpCtx, bc, c) if !isRunning {
localClient.Start(ctx, ipn.Options{})
}
if forceReauth {
localClient.StartLoginInteractive(ctx)
}
}()
bc.SetNotifyCallback(func(n ipn.Notify) { for {
if n.Engine != nil { n, err := watcher.Next()
select { if err != nil {
case gotEngineUpdate <- true: return "", err
default:
}
} }
if n.ErrMessage != nil { if n.ErrMessage != nil {
msg := *n.ErrMessage msg := *n.ErrMessage
@ -468,48 +478,10 @@ func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authU
msg += " (try 'sudo tailscale up [...]')" msg += " (try 'sudo tailscale up [...]')"
} }
} }
retErr = fmt.Errorf("backend error: %v", msg) return "", fmt.Errorf("backend error: %v", msg)
cancel()
} else if url := n.BrowseToURL; url != nil && printAuthURL(*url) {
authURL = *url
cancel()
} }
if !forceReauth && n.Prefs != nil && n.Prefs.Valid() { if url := n.BrowseToURL; url != nil && printAuthURL(*url) {
p1, p2 := n.Prefs.AsStruct(), *prefs return *url, nil
p1.Persist = nil
p2.Persist = nil
if p1.Equals(&p2) {
cancel()
}
} }
})
// Wait for backend client to be connected so we know
// we're subscribed to updates. Otherwise we can miss
// an update upon its transition to running. Do so by causing some traffic
// back to the bus that we then wait on.
bc.RequestEngineStatus()
select {
case <-gotEngineUpdate:
case <-pumpCtx.Done():
return authURL, pumpCtx.Err()
}
bc.SetPrefs(prefs)
bc.Start(ipn.Options{})
if forceReauth {
bc.StartLoginInteractive()
}
<-pumpCtx.Done() // wait for authURL or complete failure
if authURL == "" && retErr == nil {
if !forceReauth {
return "", nil // no auth URL is fine
}
retErr = pumpCtx.Err()
}
if authURL == "" && retErr == nil {
return "", fmt.Errorf("login failed with no backend error message")
} }
return authURL, retErr
} }

@ -181,7 +181,8 @@ type LocalBackend struct {
loginFlags controlclient.LoginFlags loginFlags controlclient.LoginFlags
incomingFiles map[*incomingFile]bool incomingFiles map[*incomingFile]bool
fileWaiters map[*mapSetHandle]context.CancelFunc // handle => func to call on file received fileWaiters map[*mapSetHandle]context.CancelFunc // handle => func to call on file received
lastStatusTime time.Time // status.AsOf value of the last processed status update notifyWatchers map[*mapSetHandle]chan *ipn.Notify
lastStatusTime time.Time // status.AsOf value of the last processed status update
// directFileRoot, if non-empty, means to write received files // directFileRoot, if non-empty, means to write received files
// directly to this directory, without staging them in an // directly to this directory, without staging them in an
// intermediate buffered directory for "pick-up" later. If // intermediate buffered directory for "pick-up" later. If
@ -1690,24 +1691,70 @@ func (b *LocalBackend) readPoller() {
} }
} }
// send delivers n to the connected frontend. If no frontend is // WatchNotifications subscribes to the ipn.Notify message bus notification
// connected, the notification is dropped without being delivered. // messages.
func (b *LocalBackend) send(n ipn.Notify) { //
// WatchNotifications blocks until ctx is done.
//
// The provided fn will only be called with non-nil pointers. The caller must
// not modify roNotify. If fn returns false, the watch also stops.
//
// Failure to consume many notifications in a row will result in dropped
// notifications. There is currently (2022-11-22) no mechanism provided to
// detect when a message has been dropped.
func (b *LocalBackend) WatchNotifications(ctx context.Context, fn func(roNotify *ipn.Notify) (keepGoing bool)) {
handle := new(mapSetHandle)
ch := make(chan *ipn.Notify, 128)
b.mu.Lock() b.mu.Lock()
notifyFunc := b.notify mak.Set(&b.notifyWatchers, handle, ch)
apiSrv := b.peerAPIServer
b.mu.Unlock() b.mu.Unlock()
defer func() {
b.mu.Lock()
delete(b.notifyWatchers, handle)
b.mu.Unlock()
}()
if notifyFunc == nil { for {
return select {
case <-ctx.Done():
return
case n := <-ch:
if !fn(n) {
return
}
}
} }
}
// send delivers n to the connected frontend and any API watchers from
// LocalBackend.WatchNotifications (via the LocalAPI).
//
// If no frontend is connected or API watchers are backed up, the notification
// is dropped without being delivered.
func (b *LocalBackend) send(n ipn.Notify) {
n.Version = version.Long
b.mu.Lock()
notifyFunc := b.notify
apiSrv := b.peerAPIServer
if apiSrv.hasFilesWaiting() { if apiSrv.hasFilesWaiting() {
n.FilesWaiting = &empty.Message{} n.FilesWaiting = &empty.Message{}
} }
n.Version = version.Long for _, ch := range b.notifyWatchers {
notifyFunc(n) select {
case ch <- &n:
default:
// Drop the notification if the channel is full.
}
}
b.mu.Unlock()
if notifyFunc != nil {
notifyFunc(n)
}
} }
func (b *LocalBackend) sendFileNotify() { func (b *LocalBackend) sendFileNotify() {

@ -80,6 +80,7 @@ var handler = map[string]localAPIHandler{
"serve-config": (*Handler).serveServeConfig, "serve-config": (*Handler).serveServeConfig,
"set-dns": (*Handler).serveSetDNS, "set-dns": (*Handler).serveSetDNS,
"set-expiry-sooner": (*Handler).serveSetExpirySooner, "set-expiry-sooner": (*Handler).serveSetExpirySooner,
"start": (*Handler).serveStart,
"status": (*Handler).serveStatus, "status": (*Handler).serveStatus,
"tka/init": (*Handler).serveTKAInit, "tka/init": (*Handler).serveTKAInit,
"tka/log": (*Handler).serveTKALog, "tka/log": (*Handler).serveTKALog,
@ -88,6 +89,7 @@ var handler = map[string]localAPIHandler{
"tka/status": (*Handler).serveTKAStatus, "tka/status": (*Handler).serveTKAStatus,
"tka/disable": (*Handler).serveTKADisable, "tka/disable": (*Handler).serveTKADisable,
"upload-client-metrics": (*Handler).serveUploadClientMetrics, "upload-client-metrics": (*Handler).serveUploadClientMetrics,
"watch-ipn-bus": (*Handler).serveWatchIPNBus,
"whois": (*Handler).serveWhoIs, "whois": (*Handler).serveWhoIs,
} }
@ -572,6 +574,34 @@ func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
e.Encode(st) e.Encode(st)
} }
func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "denied", http.StatusForbidden)
return
}
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "not a flusher", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
f.Flush()
ctx := r.Context()
h.b.WatchNotifications(ctx, func(roNotify *ipn.Notify) (keepGoing bool) {
js, err := json.Marshal(roNotify)
if err != nil {
h.logf("json.Marshal: %v", err)
return false
}
if _, err := fmt.Fprintf(w, "%s\n", js); err != nil {
return false
}
f.Flush()
return true
})
}
func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite { if !h.PermitWrite {
http.Error(w, "login access denied", http.StatusForbidden) http.Error(w, "login access denied", http.StatusForbidden)
@ -586,6 +616,29 @@ func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request)
return return
} }
func (h *Handler) serveStart(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "access denied", http.StatusForbidden)
return
}
if r.Method != "POST" {
http.Error(w, "want POST", 400)
return
}
var o ipn.Options
if err := json.NewDecoder(r.Body).Decode(&o); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err := h.b.Start(o)
if err != nil {
// TODO(bradfitz): map error to a good HTTP error
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
func (h *Handler) serveLogout(w http.ResponseWriter, r *http.Request) { func (h *Handler) serveLogout(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite { if !h.PermitWrite {
http.Error(w, "logout access denied", http.StatusForbidden) http.Error(w, "logout access denied", http.StatusForbidden)

Loading…
Cancel
Save