diff --git a/ipn/ipnserver/server.go b/ipn/ipnserver/server.go index 6e70b9dfa..d736c4d10 100644 --- a/ipn/ipnserver/server.go +++ b/ipn/ipnserver/server.go @@ -76,16 +76,21 @@ func pump(logf logger.Logf, ctx context.Context, bs *ipn.BackendServer, s net.Co } } -func Run(rctx context.Context, logf logger.Logf, logid string, opts Options, e wgengine.Engine) error { - bo := backoff.Backoff{Name: "ipnserver"} +func Run(rctx context.Context, logf logger.Logf, logid string, opts Options, e wgengine.Engine) (err error) { + runDone := make(chan error, 1) + defer func() { runDone <- err }() listen, _, err := safesocket.Listen(opts.SocketPath, uint16(opts.Port)) if err != nil { return fmt.Errorf("safesocket.Listen: %v", err) } + // Go listeners can't take a context, close it instead. go func() { - <-rctx.Done() + select { + case <-rctx.Done(): + case <-runDone: + } listen.Close() }() logf("Listening on %v\n", listen.Addr()) @@ -130,13 +135,11 @@ func Run(rctx context.Context, logf logger.Logf, logid string, opts Options, e w }) } - var oldS net.Conn - //lint:ignore SA4006 ctx is never used, but has to be defined so - // that it can be assigned to in the following for loop. It's a - // bit of necessary code convolution to work around Go's variable - // shadowing rules. - ctx, cancel := context.WithCancel(rctx) - + var ( + oldS net.Conn + ctx context.Context + cancel context.CancelFunc + ) stopAll := func() { // Currently we only support one client connection at a time. // Theoretically we could allow multiple clients, by passing @@ -150,6 +153,8 @@ func Run(rctx context.Context, logf logger.Logf, logid string, opts Options, e w } } + bo := backoff.Backoff{Name: "ipnserver"} + for i := 1; rctx.Err() == nil; i++ { s, err = listen.Accept() if err != nil { @@ -160,10 +165,11 @@ func Run(rctx context.Context, logf logger.Logf, logid string, opts Options, e w logf("%d: Incoming control connection.\n", i) stopAll() - ctx, cancel = context.WithCancel(context.Background()) + ctx, cancel = context.WithCancel(rctx) oldS = s go func(ctx context.Context, bs *ipn.BackendServer, s net.Conn, i int) { + // TODO: move this prefixing-Logf code into a new helper in types/logger? si := fmt.Sprintf("%d: ", i) pump(func(fmt string, args ...interface{}) { logf(si+fmt, args...) diff --git a/ipn/local.go b/ipn/local.go index 8f65f2908..8b79e1759 100644 --- a/ipn/local.go +++ b/ipn/local.go @@ -71,7 +71,7 @@ func NewLocalBackend(logf logger.Logf, logid string, store StateStore, e wgengin logf("skipping portlist: %s\n", err) } - b := LocalBackend{ + b := &LocalBackend{ logf: logf, e: e, store: store, @@ -86,7 +86,7 @@ func NewLocalBackend(logf logger.Logf, logid string, store StateStore, e wgengin go b.runPoller() } - return &b, nil + return b, nil } func (b *LocalBackend) Shutdown() { diff --git a/ipn/message.go b/ipn/message.go index dee6ec6fb..bb263cf87 100644 --- a/ipn/message.go +++ b/ipn/message.go @@ -48,9 +48,9 @@ type Command struct { type BackendServer struct { logf logger.Logf - b Backend // the Backend we are serving up - sendNotifyMsg func(b []byte) // send a notification message - GotQuit bool // a Quit command was received + b Backend // the Backend we are serving up + sendNotifyMsg func(jsonMsg []byte) // send a notification message + GotQuit bool // a Quit command was received } func NewBackendServer(logf logger.Logf, b Backend, sendNotifyMsg func(b []byte)) *BackendServer { @@ -70,13 +70,14 @@ func (bs *BackendServer) send(n Notify) { bs.sendNotifyMsg(b) } -// Inform the BackendServer of an incoming message. +// GotCommandMsg parses the incoming message b as a JSON Command and +// calls GotCommand with it. func (bs *BackendServer) GotCommandMsg(b []byte) error { - cmd := Command{} - if err := json.Unmarshal(b, &cmd); err != nil { + cmd := &Command{} + if err := json.Unmarshal(b, cmd); err != nil { return err } - return bs.GotCommand(&cmd) + return bs.GotCommand(cmd) } func (bs *BackendServer) GotCommand(cmd *Command) error { @@ -130,11 +131,11 @@ func (bs *BackendServer) Reset() error { type BackendClient struct { logf logger.Logf - sendCommandMsg func(b []byte) - notify func(n Notify) + sendCommandMsg func(jsonb []byte) + notify func(Notify) } -func NewBackendClient(logf logger.Logf, sendCommandMsg func(b []byte)) *BackendClient { +func NewBackendClient(logf logger.Logf, sendCommandMsg func(jsonb []byte)) *BackendClient { return &BackendClient{ logf: logf, sendCommandMsg: sendCommandMsg, @@ -203,7 +204,8 @@ func (bc *BackendClient) FakeExpireAfter(x time.Duration) { bc.send(Command{FakeExpireAfter: &FakeExpireAfterArgs{Duration: x}}) } -const MSG_MAX = 1024 * 1024 +// MaxMessageSize is the maximum message size, in bytes. +const MaxMessageSize = 1 << 20 // TODO(apenwarr): incremental json decode? // That would let us avoid storing the whole byte array uselessly in RAM. @@ -214,7 +216,7 @@ func ReadMsg(r io.Reader) ([]byte, error) { return nil, err } n := binary.LittleEndian.Uint32(cb) - if n > 1024*1024 { + if n > MaxMessageSize { return nil, fmt.Errorf("ipn.Read: message too large: %v bytes", n) } b := make([]byte, n) @@ -229,8 +231,12 @@ func ReadMsg(r io.Reader) ([]byte, error) { // That would save RAM, at the expense of having to encode once so that // we can produce the initial byte count. func WriteMsg(w io.Writer, b []byte) error { + // TODO(bradfitz): this does two writes to w, which likely + // does two writes on the wire, two frame generations, etc. We + // should take a concrete buffered type, or use a sync.Pool to + // allocate a buf and do one write. cb := make([]byte, 4) - if len(b) > MSG_MAX { + if len(b) > MaxMessageSize { return fmt.Errorf("ipn.Write: message too large: %v bytes", len(b)) } binary.LittleEndian.PutUint32(cb, uint32(len(b)))