cmd/tailscaled, ipn/ipnserver: refactor ipnserver

More work towards removing the massive ipnserver.Run and ipnserver.Options
and making composable pieces.

Work remains. (The getEngine retry loop on Windows complicates things.)
For now some duplicate code exists. Once the Windows side is fixed
to either not need the retry loop or to move the retry loop into a
custom wgengine.Engine wrapper, then we can unify tailscaled_windows.go
too.

Change-Id: If84d16e3cd15b54ead3c3bb301f27ae78d055f80
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/3275/head
Brad Fitzpatrick 3 years ago committed by Brad Fitzpatrick
parent 5f36ab8a90
commit e6fbc0cd54

@ -224,9 +224,6 @@ func ipnServerOpts() (o ipnserver.Options) {
goos = runtime.GOOS
}
o.Port = safesocket.WindowsLocalPort
o.StatePath = statePathOrDefault()
o.SocketPath = args.socketpath // even for goos=="windows", for tests
o.VarRoot = args.statedir
// If an absolute --state is provided but not --statedir, try to derive
@ -359,8 +356,27 @@ func run() error {
}()
opts := ipnServerOpts()
opts.DebugMux = debugMux
err = ipnserver.Run(ctx, logf, pol.PublicID.String(), ipnserver.FixedEngine(e), opts)
store, err := ipnserver.StateStore(statePathOrDefault(), logf)
if err != nil {
return err
}
srv, err := ipnserver.New(logf, pol.PublicID.String(), store, e, nil, opts)
if err != nil {
logf("ipnserver.New: %v", err)
return err
}
if debugMux != nil {
debugMux.HandleFunc("/debug/ipn", srv.ServeHTMLStatus)
}
ln, _, err := safesocket.Listen(args.socketpath, safesocket.WindowsLocalPort)
if err != nil {
return fmt.Errorf("safesocket.Listen: %v", err)
}
err = srv.Run(ctx, ln)
// Cancelation is not an error: it is the only way to stop ipnserver.
if err != nil && err != context.Canceled {
logf("ipnserver.Run: %v", err)

@ -33,6 +33,7 @@ import (
"tailscale.com/logpolicy"
"tailscale.com/net/dns"
"tailscale.com/net/tstun"
"tailscale.com/safesocket"
"tailscale.com/types/logger"
"tailscale.com/util/winutil"
"tailscale.com/version"
@ -271,7 +272,18 @@ func startIPNServer(ctx context.Context, logid string) error {
return nil, fmt.Errorf("%w\n\nlogid: %v", res.Err, logid)
}
}
err := ipnserver.Run(ctx, logf, logid, getEngine, ipnServerOpts())
store, err := ipnserver.StateStore(statePathOrDefault(), logf)
if err != nil {
return err
}
ln, _, err := safesocket.Listen(args.socketpath, safesocket.WindowsLocalPort)
if err != nil {
return fmt.Errorf("safesocket.Listen: %v", err)
}
err = ipnserver.Run(ctx, logf, ln, store, logid, getEngine, ipnServerOpts())
if err != nil {
logf("ipnserver.Run: %v", err)
}

@ -52,26 +52,6 @@ import (
// Options is the configuration of the Tailscale node agent.
type Options struct {
// SocketPath, on unix systems, is the unix socket path to listen
// on for frontend connections.
SocketPath string
// Port, on windows, is the localhost TCP port to listen on for
// frontend connections.
Port int
// StatePath is the path to the stored agent state.
// It should be an absolute path to a file.
//
// Special cases:
//
// * empty string means to use an in-memory store
// * if the string begins with "kube:", the suffix
// is a Kubernetes secret name
// * if the string begins with "arn:", the value is
// an AWS ARN for an SSM.
StatePath string
// VarRoot is the the Tailscale daemon's private writable
// directory (usually "/var/lib/tailscale" on Linux) that
// contains the "tailscaled.state" file, the "certs" directory
@ -100,10 +80,6 @@ type Options struct {
// the actual definition of "disconnect" is when the
// connection count transitions from 1 to 0.
SurviveDisconnects bool
// DebugMux, if non-nil, specifies an HTTP ServeMux in which
// to register a debug handler.
DebugMux *http.ServeMux
}
// Server is an IPN backend and its set of 0 or more active localhost
@ -118,7 +94,7 @@ type Server struct {
// connection (such as on Windows by default). Even if this
// is true, the ForceDaemon pref can override this.
resetOnZero bool
opts Options
autostartStateKey ipn.StateKey
bsMu sync.Mutex // lock order: bsMu, then mu
bs *ipn.BackendServer
@ -623,18 +599,57 @@ func tryWindowsAppDataMigration(logf logger.Logf, path string) string {
return paths.TryConfigFileMigration(logf, oldFile, path)
}
// StateStore returns a StateStore from path.
//
// The path should be an absolute path to a file.
//
// Special cases:
//
// * empty string means to use an in-memory store
// * if the string begins with "kube:", the suffix
// is a Kubernetes secret name
// * if the string begins with "arn:", the value is
// an AWS ARN for an SSM.
func StateStore(path string, logf logger.Logf) (ipn.StateStore, error) {
if path == "" {
return &ipn.MemoryStore{}, nil
}
const kubePrefix = "kube:"
const arnPrefix = "arn:"
switch {
case strings.HasPrefix(path, kubePrefix):
secretName := strings.TrimPrefix(path, kubePrefix)
store, err := ipn.NewKubeStore(secretName)
if err != nil {
return nil, fmt.Errorf("ipn.NewKubeStore(%q): %v", secretName, err)
}
return store, nil
case strings.HasPrefix(path, arnPrefix):
store, err := aws.NewStore(path)
if err != nil {
return nil, fmt.Errorf("aws.NewStore(%q): %v", path, err)
}
return store, nil
}
if runtime.GOOS == "windows" {
path = tryWindowsAppDataMigration(logf, path)
}
store, err := ipn.NewFileStore(path)
if err != nil {
return nil, fmt.Errorf("ipn.NewFileStore(%q): %v", path, err)
}
return store, nil
}
// Run runs a Tailscale backend service.
// The getEngine func is called repeatedly, once per connection, until it returns an engine successfully.
func Run(ctx context.Context, logf logger.Logf, logid string, getEngine func() (wgengine.Engine, error), opts Options) error {
//
// Deprecated: use New and Server.Run instead.
func Run(ctx context.Context, logf logger.Logf, ln net.Listener, store ipn.StateStore, logid string, getEngine func() (wgengine.Engine, error), opts Options) error {
getEngine = getEngineUntilItWorksWrapper(getEngine)
runDone := make(chan struct{})
defer close(runDone)
listen, _, err := safesocket.Listen(opts.SocketPath, uint16(opts.Port))
if err != nil {
return fmt.Errorf("safesocket.Listen: %v", err)
}
var serverMu sync.Mutex
var serverOrNil *Server
@ -650,41 +665,15 @@ func Run(ctx context.Context, logf logger.Logf, logid string, getEngine func() (
s.stopAll()
}
serverMu.Unlock()
listen.Close()
ln.Close()
}()
logf("Listening on %v", listen.Addr())
logf("Listening on %v", ln.Addr())
var serverModeUser *user.User
var store ipn.StateStore
if opts.StatePath != "" {
const kubePrefix = "kube:"
const arnPrefix = "arn:"
path := opts.StatePath
switch {
case strings.HasPrefix(path, kubePrefix):
secretName := strings.TrimPrefix(path, kubePrefix)
store, err = ipn.NewKubeStore(secretName)
if err != nil {
return fmt.Errorf("ipn.NewKubeStore(%q): %v", secretName, err)
}
case strings.HasPrefix(path, arnPrefix):
store, err = aws.NewStore(path)
if err != nil {
return fmt.Errorf("aws.NewStore(%q): %v", path, err)
}
default:
if runtime.GOOS == "windows" {
path = tryWindowsAppDataMigration(logf, path)
}
store, err = ipn.NewFileStore(path)
if err != nil {
return fmt.Errorf("ipn.NewFileStore(%q): %v", path, err)
}
}
if opts.AutostartStateKey == "" {
autoStartKey, err := store.ReadState(ipn.ServerModeStartKey)
if err != nil && err != ipn.ErrStateNotExist {
return fmt.Errorf("calling ReadState on %s: %w", path, err)
return fmt.Errorf("calling ReadState on state store: %w", err)
}
key := string(autoStartKey)
if strings.HasPrefix(key, "user-") {
@ -699,9 +688,6 @@ func Run(ctx context.Context, logf logger.Logf, logid string, getEngine func() (
opts.AutostartStateKey = ipn.StateKey(key)
}
}
} else {
store = &ipn.MemoryStore{}
}
bo := backoff.NewBackoff("ipnserver", logf, 30*time.Second)
var unservedConn net.Conn // if non-nil, accepted, but hasn't served yet
@ -710,7 +696,7 @@ func Run(ctx context.Context, logf logger.Logf, logid string, getEngine func() (
if err != nil {
logf("ipnserver: initial getEngine call: %v", err)
for i := 1; ctx.Err() == nil; i++ {
c, err := listen.Accept()
c, err := ln.Accept()
if err != nil {
logf("%d: Accept: %v", i, err)
bo.BackOff(ctx, err)
@ -737,8 +723,8 @@ func Run(ctx context.Context, logf logger.Logf, logid string, getEngine func() (
}
}
if unservedConn != nil {
listen = &listenerWithReadyConn{
Listener: listen,
ln = &listenerWithReadyConn{
Listener: ln,
c: unservedConn,
}
}
@ -750,12 +736,12 @@ func Run(ctx context.Context, logf logger.Logf, logid string, getEngine func() (
serverMu.Lock()
serverOrNil = server
serverMu.Unlock()
return server.Serve(ctx, listen)
return server.Run(ctx, ln)
}
// New returns a new Server.
//
// The opts.StatePath option is ignored; it's only used by Run.
// To start it, use the Server.Run method.
func New(logf logger.Logf, logid string, store ipn.StateStore, eng wgengine.Engine, serverModeUser *user.User, opts Options) (*Server, error) {
b, err := ipnlocal.NewLocalBackend(logf, logid, store, eng)
if err != nil {
@ -766,10 +752,23 @@ func New(logf logger.Logf, logid string, store ipn.StateStore, eng wgengine.Engi
return smallzstd.NewDecoder(nil)
})
if opts.DebugMux != nil {
opts.DebugMux.HandleFunc("/debug/ipn", func(w http.ResponseWriter, r *http.Request) {
serveHTMLStatus(w, b)
})
if opts.AutostartStateKey == "" {
autoStartKey, err := store.ReadState(ipn.ServerModeStartKey)
if err != nil && err != ipn.ErrStateNotExist {
return nil, fmt.Errorf("calling ReadState on store: %w", err)
}
key := string(autoStartKey)
if strings.HasPrefix(key, "user-") {
uid := strings.TrimPrefix(key, "user-")
u, err := lookupUserFromID(logf, uid)
if err != nil {
logf("ipnserver: found server mode auto-start key %q; failed to load user: %v", key, err)
} else {
logf("ipnserver: found server mode auto-start key %q (user %s)", key, u.Username)
serverModeUser = u
}
opts.AutostartStateKey = ipn.StateKey(key)
}
}
server := &Server{
@ -778,22 +777,37 @@ func New(logf logger.Logf, logid string, store ipn.StateStore, eng wgengine.Engi
logf: logf,
resetOnZero: !opts.SurviveDisconnects,
serverModeUser: serverModeUser,
opts: opts,
autostartStateKey: opts.AutostartStateKey,
}
server.bs = ipn.NewBackendServer(logf, b, server.writeToClients)
return server, nil
}
// Serve accepts connections from ln forever.
// Run runs the server, accepting connections from ln forever.
//
// The context is only used to suppress errors
func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
// If the context is done, the listener is closed.
func (s *Server) Run(ctx context.Context, ln net.Listener) error {
defer s.b.Shutdown()
if s.opts.AutostartStateKey != "" {
runDone := make(chan struct{})
defer close(runDone)
// When the context is closed or when we return, whichever is first, close our listener
// and all open connections.
go func() {
select {
case <-ctx.Done():
case <-runDone:
}
s.stopAll()
ln.Close()
}()
if s.autostartStateKey != "" {
s.bs.GotCommand(ctx, &ipn.Command{
Version: version.Long,
Start: &ipn.StartArgs{
Opts: ipn.Options{StateKey: s.opts.AutostartStateKey},
Opts: ipn.Options{StateKey: s.autostartStateKey},
},
})
}
@ -1031,13 +1045,13 @@ func (s *Server) localhostHandler(ci connIdentity) http.Handler {
io.WriteString(w, "<html><title>Tailscale</title><body><h1>Tailscale</h1>This is the local Tailscale daemon.")
return
}
serveHTMLStatus(w, s.b)
s.ServeHTMLStatus(w, r)
})
}
func serveHTMLStatus(w http.ResponseWriter, b *ipnlocal.LocalBackend) {
func (s *Server) ServeHTMLStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
st := b.Status()
st := s.b.Status()
// TODO(bradfitz): add LogID and opts to st?
st.WriteHTML(w)
}

@ -62,10 +62,16 @@ func TestRunMultipleAccepts(t *testing.T) {
}
t.Cleanup(eng.Close)
opts := ipnserver.Options{
SocketPath: socketPath,
}
opts := ipnserver.Options{}
t.Logf("pre-Run")
err = ipnserver.Run(ctx, logTriggerTestf, "dummy_logid", ipnserver.FixedEngine(eng), opts)
store := new(ipn.MemoryStore)
ln, _, err := safesocket.Listen(socketPath, 0)
if err != nil {
t.Fatal(err)
}
defer ln.Close()
err = ipnserver.Run(ctx, logTriggerTestf, ln, store, "dummy_logid", ipnserver.FixedEngine(eng), opts)
t.Logf("ipnserver.Run = %v", err)
}

Loading…
Cancel
Save