ipn, cmd/tailscale/cli: add LocalAPI IPN bus watch, Start, convert CLI

Updates #6417
Updates tailscale/corp#8051

Change-Id: I1ca360730c45ffaa0261d8422877304277fc5625
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/6460/head
Brad Fitzpatrick 2 years ago committed by Brad Fitzpatrick
parent d4f6efa1df
commit 300aba61a6

@ -555,6 +555,20 @@ func (lc *LocalClient) EditPrefs(ctx context.Context, mp *ipn.MaskedPrefs) (*ipn
return decodeJSON[*ipn.Prefs](body)
}
// StartLoginInteractive starts an interactive login.
func (lc *LocalClient) StartLoginInteractive(ctx context.Context) error {
_, err := lc.send(ctx, "POST", "/localapi/v0/login-interactive", http.StatusNoContent, nil)
return err
}
// Start applies the configuration specified in opts, and starts the
// state machine.
func (lc *LocalClient) Start(ctx context.Context, opts ipn.Options) error {
_, err := lc.send(ctx, "POST", "/localapi/v0/start", http.StatusNoContent, jsonBody(opts))
return err
}
// Logout logs out the current node.
func (lc *LocalClient) Logout(ctx context.Context) error {
_, err := lc.send(ctx, "POST", "/localapi/v0/logout", http.StatusNoContent, nil)
return err
@ -966,3 +980,78 @@ func (lc *LocalClient) DeleteProfile(ctx context.Context, profile ipn.ProfileID)
_, err := lc.send(ctx, "DELETE", "/localapi/v0/profiles"+url.PathEscape(string(profile)), http.StatusNoContent, nil)
return err
}
// WatchIPNMask are filtering options for LocalClient.WatchIPNBus.
//
// The zero value is a valid WatchOpt that means to watch everything.
//
// TODO(bradfitz): flesh out.
type WatchIPNMask uint64
// WatchIPNBus subscribes to the IPN notification bus. It returns a watcher
// once the bus is connected successfully.
//
// The context is used for the life of the watch, not just the call to
// WatchIPNBus.
//
// The returned IPNBusWatcher's Close method must be called when done to release
// resources.
func (lc *LocalClient) WatchIPNBus(ctx context.Context, mask WatchIPNMask) (*IPNBusWatcher, error) {
req, err := http.NewRequestWithContext(ctx, "GET",
"http://"+apitype.LocalAPIHost+"/localapi/v0/watch-ipn-bus?mask="+fmt.Sprint(mask),
nil)
if err != nil {
return nil, err
}
res, err := lc.doLocalRequestNiceError(req)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
res.Body.Close()
return nil, errors.New(res.Status)
}
dec := json.NewDecoder(res.Body)
return &IPNBusWatcher{
ctx: ctx,
httpRes: res,
dec: dec,
}, nil
}
// IPNBusWatcher is an active subscription (watch) of the local tailscaled IPN bus.
// It's returned by LocalClient.WatchIPNBus.
//
// It must be closed when done.
type IPNBusWatcher struct {
ctx context.Context // from original WatchIPNBus call
httpRes *http.Response
dec *json.Decoder
mu sync.Mutex
closed bool
}
// Close stops the watcher and releases its resources.
func (w *IPNBusWatcher) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.closed {
return nil
}
w.closed = true
return w.httpRes.Body.Close()
}
// Next returns the next ipn.Notify from the stream.
// If the context from LocalClient.WatchIPNBus is done, that error is returned.
func (w *IPNBusWatcher) Next() (ipn.Notify, error) {
var n ipn.Notify
if err := w.dec.Decode(&n); err != nil {
if cerr := w.ctx.Err(); cerr != nil {
err = cerr
}
return ipn.Notify{}, err
}
return n, nil
}

@ -13,23 +13,18 @@ import (
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"text/tabwriter"
"github.com/peterbourgon/ff/v3/ffcli"
"golang.org/x/exp/slices"
"tailscale.com/client/tailscale"
"tailscale.com/envknob"
"tailscale.com/ipn"
"tailscale.com/paths"
"tailscale.com/safesocket"
"tailscale.com/version/distro"
)
@ -248,58 +243,6 @@ var rootArgs struct {
socket string
}
func connect(ctx context.Context) (net.Conn, *ipn.BackendClient, context.Context, context.CancelFunc) {
s := safesocket.DefaultConnectionStrategy(rootArgs.socket)
c, err := safesocket.Connect(s)
if err != nil {
if runtime.GOOS != "windows" && rootArgs.socket == "" {
fatalf("--socket cannot be empty")
}
fatalf("Failed to connect to tailscaled. (safesocket.Connect: %v)\n", err)
}
clientToServer := func(b []byte) {
ipn.WriteMsg(c, b)
}
ctx, cancel := context.WithCancel(ctx)
go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
select {
case <-interrupt:
case <-ctx.Done():
// Context canceled elsewhere.
signal.Reset(syscall.SIGINT, syscall.SIGTERM)
return
}
c.Close()
cancel()
}()
bc := ipn.NewBackendClient(log.Printf, clientToServer)
return c, bc, ctx, cancel
}
// pump receives backend messages on conn and pushes them into bc.
func pump(ctx context.Context, bc *ipn.BackendClient, conn net.Conn) error {
defer conn.Close()
for ctx.Err() == nil {
msg, err := ipn.ReadMsg(conn)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
return fmt.Errorf("%w (tailscaled stopped running?)", err)
}
return err
}
bc.GotNotifyMsg(msg)
}
return ctx.Err()
}
// usageFuncNoDefaultValues is like usageFunc but doesn't print default values.
func usageFuncNoDefaultValues(c *ffcli.Command) string {
return usageFuncOpt(c, false)

@ -285,19 +285,23 @@ var watchIPNArgs struct {
}
func runWatchIPN(ctx context.Context, args []string) error {
c, bc, ctx, cancel := connect(ctx)
defer cancel()
bc.SetNotifyCallback(func(n ipn.Notify) {
watcher, err := localClient.WatchIPNBus(ctx, 0)
if err != nil {
return err
}
defer watcher.Close()
printf("Connected.\n")
for {
n, err := watcher.Next()
if err != nil {
return err
}
if !watchIPNArgs.netmap {
n.NetMap = nil
}
j, _ := json.MarshalIndent(n, "", "\t")
printf("%s\n", j)
})
bc.RequestEngineStatus()
pump(ctx, bc, c)
return errors.New("exit")
}
}
func runDERPMap(ctx context.Context, args []string) error {

@ -15,11 +15,13 @@ import (
"log"
"net/netip"
"os"
"os/signal"
"reflect"
"runtime"
"sort"
"strings"
"sync"
"syscall"
"time"
shellquote "github.com/kballard/go-shellquote"
@ -535,26 +537,37 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
return err
}
// At this point we need to subscribe to the IPN bus to watch
// for state transitions and possible need to authenticate.
c, bc, pumpCtx, cancel := connect(ctx)
defer cancel()
watchCtx, cancelWatch := context.WithCancel(ctx)
defer cancelWatch()
watcher, err := localClient.WatchIPNBus(watchCtx, 0)
if err != nil {
return err
}
defer watcher.Close()
go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
select {
case <-interrupt:
cancelWatch()
case <-watchCtx.Done():
}
}()
running := make(chan bool, 1) // gets value once in state ipn.Running
gotEngineUpdate := make(chan bool, 1) // gets value upon an engine update
pumpErr := make(chan error, 1)
go func() { pumpErr <- pump(pumpCtx, bc, c) }()
var printed bool // whether we've yet printed anything to stdout or stderr
var loginOnce sync.Once
startLoginInteractive := func() { loginOnce.Do(func() { bc.StartLoginInteractive() }) }
startLoginInteractive := func() { loginOnce.Do(func() { localClient.StartLoginInteractive(ctx) }) }
bc.SetNotifyCallback(func(n ipn.Notify) {
if n.Engine != nil {
select {
case gotEngineUpdate <- true:
default:
}
go func() {
for {
n, err := watcher.Next()
if err != nil {
pumpErr <- err
return
}
if n.ErrMessage != nil {
msg := *n.ErrMessage
@ -591,7 +604,7 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
case running <- true:
default:
}
cancel()
cancelWatch()
}
}
if url := n.BrowseToURL; url != nil && printAuthURL(*url) {
@ -625,19 +638,8 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
}
}
}
})
// Wait for backend client to be connected so we know
// we're subscribed to updates. Otherwise we can miss
// an update upon its transition to running. Do so by causing some traffic
// back to the bus that we then wait on.
bc.RequestEngineStatus()
select {
case <-gotEngineUpdate:
case <-pumpCtx.Done():
return pumpCtx.Err()
case err := <-pumpErr:
return err
}
}()
// Special case: bare "tailscale up" means to just start
// running, if there's ever been a login.
@ -660,10 +662,12 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
if err != nil {
return err
}
bc.Start(ipn.Options{
if err := localClient.Start(ctx, ipn.Options{
AuthKey: authKey,
UpdatePrefs: prefs,
})
}); err != nil {
return err
}
if upArgs.forceReauth {
startLoginInteractive()
}
@ -685,13 +689,13 @@ func runUp(ctx context.Context, cmd string, args []string, upArgs upArgsT) (retE
select {
case <-running:
return nil
case <-pumpCtx.Done():
case <-watchCtx.Done():
select {
case <-running:
return nil
default:
}
return pumpCtx.Err()
return watchCtx.Err()
case err := <-pumpErr:
select {
case <-running:

@ -28,8 +28,8 @@ import (
"github.com/peterbourgon/ff/v3/ffcli"
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tailcfg"
"tailscale.com/types/preftype"
"tailscale.com/util/groupmember"
"tailscale.com/version/distro"
)
@ -317,6 +317,7 @@ req.send(null);
`
func webHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if authRedirect(w, r) {
return
}
@ -327,7 +328,18 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
}
if r.URL.Path == "/redirect" || r.URL.Path == "/redirect/" {
w.Write([]byte(authenticationRedirectHTML))
io.WriteString(w, authenticationRedirectHTML)
return
}
st, err := localClient.Status(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
prefs, err := localClient.GetPrefs(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@ -344,23 +356,31 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(mi{"error": err.Error()})
return
}
prefs, err := localClient.GetPrefs(r.Context())
if err != nil && !postData.Reauthenticate {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()})
return
} else {
routes, err := calcAdvertiseRoutes(postData.AdvertiseRoutes, postData.AdvertiseExitNode)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()})
return
}
prefs.AdvertiseRoutes = routes
mp := &ipn.MaskedPrefs{
AdvertiseRoutesSet: true,
WantRunningSet: true,
}
mp.Prefs.WantRunning = true
mp.Prefs.AdvertiseRoutes = routes
log.Printf("Doing edit: %v", mp.Pretty())
if _, err := localClient.EditPrefs(ctx, mp); err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()})
return
}
w.Header().Set("Content-Type", "application/json")
url, err := tailscaleUp(r.Context(), prefs, postData.Reauthenticate)
log.Printf("tailscaleUp(reauth=%v) ...", postData.Reauthenticate)
url, err := tailscaleUp(r.Context(), st, postData.Reauthenticate)
log.Printf("tailscaleUp = (URL %v, %v)", url != "", err)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(mi{"error": err.Error()})
@ -374,17 +394,6 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
return
}
st, err := localClient.Status(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
prefs, err := localClient.GetPrefs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
profile := st.User[st.Self.UserID]
deviceName := strings.Split(st.Self.DNSName, ".")[0]
data := tmplData{
@ -418,26 +427,18 @@ func webHandler(w http.ResponseWriter, r *http.Request) {
w.Write(buf.Bytes())
}
// TODO(crawshaw): some of this is very similar to the code in 'tailscale up', can we share anything?
func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authURL string, retErr error) {
if prefs == nil {
prefs = ipn.NewPrefs()
prefs.ControlURL = ipn.DefaultControlURL
prefs.WantRunning = true
prefs.CorpDNS = true
prefs.AllowSingleHosts = true
prefs.ForceDaemon = (runtime.GOOS == "windows")
}
func tailscaleUp(ctx context.Context, st *ipnstate.Status, forceReauth bool) (authURL string, retErr error) {
origAuthURL := st.AuthURL
isRunning := st.BackendState == ipn.Running.String()
if distro.Get() == distro.Synology {
prefs.NetfilterMode = preftype.NetfilterOff
if !forceReauth {
if origAuthURL != "" {
return origAuthURL, nil
}
if isRunning {
return "", nil
}
st, err := localClient.Status(ctx)
if err != nil {
return "", fmt.Errorf("can't fetch status: %v", err)
}
origAuthURL := st.AuthURL
// printAuthURL reports whether we should print out the
// provided auth URL from an IPN notify.
@ -445,18 +446,27 @@ func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authU
return url != origAuthURL
}
c, bc, pumpCtx, cancel := connect(ctx)
defer cancel()
gotEngineUpdate := make(chan bool, 1) // gets value upon an engine update
go pump(pumpCtx, bc, c)
watchCtx, cancelWatch := context.WithCancel(ctx)
defer cancelWatch()
watcher, err := localClient.WatchIPNBus(watchCtx, 0)
if err != nil {
return "", err
}
defer watcher.Close()
bc.SetNotifyCallback(func(n ipn.Notify) {
if n.Engine != nil {
select {
case gotEngineUpdate <- true:
default:
go func() {
if !isRunning {
localClient.Start(ctx, ipn.Options{})
}
if forceReauth {
localClient.StartLoginInteractive(ctx)
}
}()
for {
n, err := watcher.Next()
if err != nil {
return "", err
}
if n.ErrMessage != nil {
msg := *n.ErrMessage
@ -468,48 +478,10 @@ func tailscaleUp(ctx context.Context, prefs *ipn.Prefs, forceReauth bool) (authU
msg += " (try 'sudo tailscale up [...]')"
}
}
retErr = fmt.Errorf("backend error: %v", msg)
cancel()
} else if url := n.BrowseToURL; url != nil && printAuthURL(*url) {
authURL = *url
cancel()
}
if !forceReauth && n.Prefs != nil && n.Prefs.Valid() {
p1, p2 := n.Prefs.AsStruct(), *prefs
p1.Persist = nil
p2.Persist = nil
if p1.Equals(&p2) {
cancel()
}
}
})
// Wait for backend client to be connected so we know
// we're subscribed to updates. Otherwise we can miss
// an update upon its transition to running. Do so by causing some traffic
// back to the bus that we then wait on.
bc.RequestEngineStatus()
select {
case <-gotEngineUpdate:
case <-pumpCtx.Done():
return authURL, pumpCtx.Err()
}
bc.SetPrefs(prefs)
bc.Start(ipn.Options{})
if forceReauth {
bc.StartLoginInteractive()
}
<-pumpCtx.Done() // wait for authURL or complete failure
if authURL == "" && retErr == nil {
if !forceReauth {
return "", nil // no auth URL is fine
return "", fmt.Errorf("backend error: %v", msg)
}
retErr = pumpCtx.Err()
if url := n.BrowseToURL; url != nil && printAuthURL(*url) {
return *url, nil
}
if authURL == "" && retErr == nil {
return "", fmt.Errorf("login failed with no backend error message")
}
return authURL, retErr
}

@ -181,6 +181,7 @@ type LocalBackend struct {
loginFlags controlclient.LoginFlags
incomingFiles map[*incomingFile]bool
fileWaiters map[*mapSetHandle]context.CancelFunc // handle => func to call on file received
notifyWatchers map[*mapSetHandle]chan *ipn.Notify
lastStatusTime time.Time // status.AsOf value of the last processed status update
// directFileRoot, if non-empty, means to write received files
// directly to this directory, without staging them in an
@ -1690,24 +1691,70 @@ func (b *LocalBackend) readPoller() {
}
}
// send delivers n to the connected frontend. If no frontend is
// connected, the notification is dropped without being delivered.
func (b *LocalBackend) send(n ipn.Notify) {
// WatchNotifications subscribes to the ipn.Notify message bus notification
// messages.
//
// WatchNotifications blocks until ctx is done.
//
// The provided fn will only be called with non-nil pointers. The caller must
// not modify roNotify. If fn returns false, the watch also stops.
//
// Failure to consume many notifications in a row will result in dropped
// notifications. There is currently (2022-11-22) no mechanism provided to
// detect when a message has been dropped.
func (b *LocalBackend) WatchNotifications(ctx context.Context, fn func(roNotify *ipn.Notify) (keepGoing bool)) {
handle := new(mapSetHandle)
ch := make(chan *ipn.Notify, 128)
b.mu.Lock()
notifyFunc := b.notify
apiSrv := b.peerAPIServer
mak.Set(&b.notifyWatchers, handle, ch)
b.mu.Unlock()
defer func() {
b.mu.Lock()
delete(b.notifyWatchers, handle)
b.mu.Unlock()
}()
if notifyFunc == nil {
for {
select {
case <-ctx.Done():
return
case n := <-ch:
if !fn(n) {
return
}
}
}
}
// send delivers n to the connected frontend and any API watchers from
// LocalBackend.WatchNotifications (via the LocalAPI).
//
// If no frontend is connected or API watchers are backed up, the notification
// is dropped without being delivered.
func (b *LocalBackend) send(n ipn.Notify) {
n.Version = version.Long
b.mu.Lock()
notifyFunc := b.notify
apiSrv := b.peerAPIServer
if apiSrv.hasFilesWaiting() {
n.FilesWaiting = &empty.Message{}
}
n.Version = version.Long
for _, ch := range b.notifyWatchers {
select {
case ch <- &n:
default:
// Drop the notification if the channel is full.
}
}
b.mu.Unlock()
if notifyFunc != nil {
notifyFunc(n)
}
}
func (b *LocalBackend) sendFileNotify() {

@ -80,6 +80,7 @@ var handler = map[string]localAPIHandler{
"serve-config": (*Handler).serveServeConfig,
"set-dns": (*Handler).serveSetDNS,
"set-expiry-sooner": (*Handler).serveSetExpirySooner,
"start": (*Handler).serveStart,
"status": (*Handler).serveStatus,
"tka/init": (*Handler).serveTKAInit,
"tka/log": (*Handler).serveTKALog,
@ -88,6 +89,7 @@ var handler = map[string]localAPIHandler{
"tka/status": (*Handler).serveTKAStatus,
"tka/disable": (*Handler).serveTKADisable,
"upload-client-metrics": (*Handler).serveUploadClientMetrics,
"watch-ipn-bus": (*Handler).serveWatchIPNBus,
"whois": (*Handler).serveWhoIs,
}
@ -572,6 +574,34 @@ func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
e.Encode(st)
}
func (h *Handler) serveWatchIPNBus(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "denied", http.StatusForbidden)
return
}
f, ok := w.(http.Flusher)
if !ok {
http.Error(w, "not a flusher", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
f.Flush()
ctx := r.Context()
h.b.WatchNotifications(ctx, func(roNotify *ipn.Notify) (keepGoing bool) {
js, err := json.Marshal(roNotify)
if err != nil {
h.logf("json.Marshal: %v", err)
return false
}
if _, err := fmt.Fprintf(w, "%s\n", js); err != nil {
return false
}
f.Flush()
return true
})
}
func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "login access denied", http.StatusForbidden)
@ -586,6 +616,29 @@ func (h *Handler) serveLoginInteractive(w http.ResponseWriter, r *http.Request)
return
}
func (h *Handler) serveStart(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "access denied", http.StatusForbidden)
return
}
if r.Method != "POST" {
http.Error(w, "want POST", 400)
return
}
var o ipn.Options
if err := json.NewDecoder(r.Body).Decode(&o); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err := h.b.Start(o)
if err != nil {
// TODO(bradfitz): map error to a good HTTP error
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
func (h *Handler) serveLogout(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "logout access denied", http.StatusForbidden)

Loading…
Cancel
Save