From 6d01d3bece279a051cb3a7091ba9a6f8354836b6 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 8 Apr 2021 14:54:25 -0700 Subject: [PATCH] ipn/ipnlocal: provide IPN bus updates as files arrive Signed-off-by: Brad Fitzpatrick --- ipn/backend.go | 11 +++++- ipn/ipnlocal/local.go | 44 +++++++++++++++++++++-- ipn/ipnlocal/peerapi.go | 77 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 120 insertions(+), 12 deletions(-) diff --git a/ipn/backend.go b/ipn/backend.go index ae5f9dc12..9e143e1c7 100644 --- a/ipn/backend.go +++ b/ipn/backend.go @@ -67,7 +67,8 @@ type Notify struct { BackendLogID *string // public logtail id used by backend PingResult *ipnstate.PingResult - FilesWaiting *empty.Message `json:",omitempty"` + FilesWaiting *empty.Message `json:",omitempty"` + IncomingFiles []PartialFile `json:",omitempty"` // LocalTCPPort, if non-nil, informs the UI frontend which // (non-zero) localhost TCP port it's listening on. @@ -78,6 +79,14 @@ type Notify struct { // type is mirrored in xcode/Shared/IPN.swift } +// PartialFile represents an in-progress file transfer. +type PartialFile struct { + Name string // e.g. "foo.jpg" + Started time.Time // time transfer started + DeclaredSize int64 // or -1 if unknown + Received int64 // bytes copied thus far +} + // StateKey is an opaque identifier for a set of LocalBackend state // (preferences, private keys, etc.). // diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index bbb92343d..a973fa68b 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -16,6 +16,7 @@ import ( "os/exec" "path/filepath" "runtime" + "sort" "strconv" "strings" "sync" @@ -115,6 +116,7 @@ type LocalBackend struct { prevIfState *interfaces.State peerAPIServer *peerAPIServer // or nil peerAPIListeners []*peerAPIListener + incomingFiles map[*incomingFile]bool // statusLock must be held before calling statusChanged.Wait() or // statusChanged.Broadcast(). @@ -925,7 +927,6 @@ func (b *LocalBackend) readPoller() { func (b *LocalBackend) send(n ipn.Notify) { b.mu.Lock() notifyFunc := b.notify - apiSrv := b.peerAPIServer b.mu.Unlock() if notifyFunc == nil { @@ -934,10 +935,33 @@ func (b *LocalBackend) send(n ipn.Notify) { } n.Version = version.Long - if apiSrv != nil && apiSrv.hasFilesWaiting() { + notifyFunc(n) +} + +func (b *LocalBackend) sendFileNotify() { + var n ipn.Notify + + b.mu.Lock() + notifyFunc := b.notify + apiSrv := b.peerAPIServer + if notifyFunc == nil || apiSrv == nil { + b.mu.Unlock() + return + } + + if apiSrv.hasFilesWaiting() { n.FilesWaiting = &empty.Message{} } - notifyFunc(n) + for f := range b.incomingFiles { + n.IncomingFiles = append(n.IncomingFiles, f.PartialFile()) + } + b.mu.Unlock() + + sort.Slice(n.IncomingFiles, func(i, j int) bool { + return n.IncomingFiles[i].Started.Before(n.IncomingFiles[j].Started) + }) + + b.send(n) } // popBrowserAuthNow shuts down the data plane and sends an auth URL @@ -2090,6 +2114,7 @@ func (b *LocalBackend) FileTargets() ([]*FileTarget, error) { peerAPI := peerAPIBase(b.netMap, p) if peerAPI == "" { continue + } ret = append(ret, &FileTarget{ Node: p, @@ -2100,6 +2125,19 @@ func (b *LocalBackend) FileTargets() ([]*FileTarget, error) { return ret, nil } +func (b *LocalBackend) registerIncomingFile(inf *incomingFile, active bool) { + b.mu.Lock() + defer b.mu.Unlock() + if b.incomingFiles == nil { + b.incomingFiles = make(map[*incomingFile]bool) + } + if active { + b.incomingFiles[inf] = true + } else { + delete(b.incomingFiles, inf) + } +} + // peerAPIBase returns the "http://ip:port" URL base to reach peer's peerAPI. // It returns the empty string if the peer doesn't support the peerapi // or there's no matching address family based on the netmap's own addresses. diff --git a/ipn/ipnlocal/peerapi.go b/ipn/ipnlocal/peerapi.go index bfbf327a4..0d7319968 100644 --- a/ipn/ipnlocal/peerapi.go +++ b/ipn/ipnlocal/peerapi.go @@ -20,6 +20,8 @@ import ( "runtime" "strconv" "strings" + "sync" + "time" "inet.af/netaddr" "tailscale.com/ipn" @@ -338,6 +340,52 @@ This is my Tailscale device. Your device is %v. } } +type incomingFile struct { + name string // "foo.jpg" + started time.Time + size int64 // or -1 if unknown; never 0 + w io.Writer // underlying writer + ph *peerAPIHandler + + mu sync.Mutex + copied int64 + lastNotify time.Time +} + +func (f *incomingFile) Write(p []byte) (n int, err error) { + n, err = f.w.Write(p) + + b := f.ph.ps.b + var needNotify bool + defer func() { + if needNotify { + b.sendFileNotify() + } + }() + if n > 0 { + f.mu.Lock() + defer f.mu.Unlock() + f.copied += int64(n) + now := time.Now() + if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second { + f.lastNotify = now + needNotify = true + } + } + return n, err +} + +func (f *incomingFile) PartialFile() ipn.PartialFile { + f.mu.Lock() + defer f.mu.Unlock() + return ipn.PartialFile{ + Name: f.name, + Started: f.started, + DeclaredSize: f.size, + Received: f.copied, + } +} + func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) { if !h.isSelf { http.Error(w, "not owner", http.StatusForbidden) @@ -369,12 +417,25 @@ func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) { os.Remove(dstFile) } }() - n, err := io.Copy(f, r.Body) - if err != nil { - f.Close() - h.logf("put Copy error: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return + var finalSize int64 + if r.ContentLength != 0 { + inFile := &incomingFile{ + name: baseName, + started: time.Now(), + size: r.ContentLength, + w: f, + ph: h, + } + h.ps.b.registerIncomingFile(inFile, true) + defer h.ps.b.registerIncomingFile(inFile, false) + n, err := io.Copy(inFile, r.Body) + if err != nil { + f.Close() + h.logf("put Copy error: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + finalSize = n } if err := f.Close(); err != nil { h.logf("put Close error: %v", err) @@ -382,14 +443,14 @@ func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) { return } - h.logf("put of %s from %v/%v", baseName, approxSize(n), h.remoteAddr.IP, h.peerNode.ComputedName) + h.logf("put of %s from %v/%v", baseName, approxSize(finalSize), h.remoteAddr.IP, h.peerNode.ComputedName) // TODO: set modtime // TODO: some real response success = true io.WriteString(w, "{}\n") h.ps.knownEmpty.Set(false) - h.ps.b.send(ipn.Notify{}) // it will set FilesWaiting + h.ps.b.sendFileNotify() } func approxSize(n int64) string {