ipn/ipnlocal: provide IPN bus updates as files arrive

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/1673/head
Brad Fitzpatrick 3 years ago committed by Brad Fitzpatrick
parent 2f398106e2
commit 6d01d3bece

@ -67,7 +67,8 @@ type Notify struct {
BackendLogID *string // public logtail id used by backend BackendLogID *string // public logtail id used by backend
PingResult *ipnstate.PingResult PingResult *ipnstate.PingResult
FilesWaiting *empty.Message `json:",omitempty"` FilesWaiting *empty.Message `json:",omitempty"`
IncomingFiles []PartialFile `json:",omitempty"`
// LocalTCPPort, if non-nil, informs the UI frontend which // LocalTCPPort, if non-nil, informs the UI frontend which
// (non-zero) localhost TCP port it's listening on. // (non-zero) localhost TCP port it's listening on.
@ -78,6 +79,14 @@ type Notify struct {
// type is mirrored in xcode/Shared/IPN.swift // type is mirrored in xcode/Shared/IPN.swift
} }
// PartialFile represents an in-progress file transfer.
type PartialFile struct {
Name string // e.g. "foo.jpg"
Started time.Time // time transfer started
DeclaredSize int64 // or -1 if unknown
Received int64 // bytes copied thus far
}
// StateKey is an opaque identifier for a set of LocalBackend state // StateKey is an opaque identifier for a set of LocalBackend state
// (preferences, private keys, etc.). // (preferences, private keys, etc.).
// //

@ -16,6 +16,7 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -115,6 +116,7 @@ type LocalBackend struct {
prevIfState *interfaces.State prevIfState *interfaces.State
peerAPIServer *peerAPIServer // or nil peerAPIServer *peerAPIServer // or nil
peerAPIListeners []*peerAPIListener peerAPIListeners []*peerAPIListener
incomingFiles map[*incomingFile]bool
// statusLock must be held before calling statusChanged.Wait() or // statusLock must be held before calling statusChanged.Wait() or
// statusChanged.Broadcast(). // statusChanged.Broadcast().
@ -925,7 +927,6 @@ func (b *LocalBackend) readPoller() {
func (b *LocalBackend) send(n ipn.Notify) { func (b *LocalBackend) send(n ipn.Notify) {
b.mu.Lock() b.mu.Lock()
notifyFunc := b.notify notifyFunc := b.notify
apiSrv := b.peerAPIServer
b.mu.Unlock() b.mu.Unlock()
if notifyFunc == nil { if notifyFunc == nil {
@ -934,10 +935,33 @@ func (b *LocalBackend) send(n ipn.Notify) {
} }
n.Version = version.Long n.Version = version.Long
if apiSrv != nil && apiSrv.hasFilesWaiting() { notifyFunc(n)
}
func (b *LocalBackend) sendFileNotify() {
var n ipn.Notify
b.mu.Lock()
notifyFunc := b.notify
apiSrv := b.peerAPIServer
if notifyFunc == nil || apiSrv == nil {
b.mu.Unlock()
return
}
if apiSrv.hasFilesWaiting() {
n.FilesWaiting = &empty.Message{} n.FilesWaiting = &empty.Message{}
} }
notifyFunc(n) for f := range b.incomingFiles {
n.IncomingFiles = append(n.IncomingFiles, f.PartialFile())
}
b.mu.Unlock()
sort.Slice(n.IncomingFiles, func(i, j int) bool {
return n.IncomingFiles[i].Started.Before(n.IncomingFiles[j].Started)
})
b.send(n)
} }
// popBrowserAuthNow shuts down the data plane and sends an auth URL // popBrowserAuthNow shuts down the data plane and sends an auth URL
@ -2090,6 +2114,7 @@ func (b *LocalBackend) FileTargets() ([]*FileTarget, error) {
peerAPI := peerAPIBase(b.netMap, p) peerAPI := peerAPIBase(b.netMap, p)
if peerAPI == "" { if peerAPI == "" {
continue continue
} }
ret = append(ret, &FileTarget{ ret = append(ret, &FileTarget{
Node: p, Node: p,
@ -2100,6 +2125,19 @@ func (b *LocalBackend) FileTargets() ([]*FileTarget, error) {
return ret, nil return ret, nil
} }
func (b *LocalBackend) registerIncomingFile(inf *incomingFile, active bool) {
b.mu.Lock()
defer b.mu.Unlock()
if b.incomingFiles == nil {
b.incomingFiles = make(map[*incomingFile]bool)
}
if active {
b.incomingFiles[inf] = true
} else {
delete(b.incomingFiles, inf)
}
}
// peerAPIBase returns the "http://ip:port" URL base to reach peer's peerAPI. // peerAPIBase returns the "http://ip:port" URL base to reach peer's peerAPI.
// It returns the empty string if the peer doesn't support the peerapi // It returns the empty string if the peer doesn't support the peerapi
// or there's no matching address family based on the netmap's own addresses. // or there's no matching address family based on the netmap's own addresses.

@ -20,6 +20,8 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"sync"
"time"
"inet.af/netaddr" "inet.af/netaddr"
"tailscale.com/ipn" "tailscale.com/ipn"
@ -338,6 +340,52 @@ This is my Tailscale device. Your device is %v.
} }
} }
type incomingFile struct {
name string // "foo.jpg"
started time.Time
size int64 // or -1 if unknown; never 0
w io.Writer // underlying writer
ph *peerAPIHandler
mu sync.Mutex
copied int64
lastNotify time.Time
}
func (f *incomingFile) Write(p []byte) (n int, err error) {
n, err = f.w.Write(p)
b := f.ph.ps.b
var needNotify bool
defer func() {
if needNotify {
b.sendFileNotify()
}
}()
if n > 0 {
f.mu.Lock()
defer f.mu.Unlock()
f.copied += int64(n)
now := time.Now()
if f.lastNotify.IsZero() || now.Sub(f.lastNotify) > time.Second {
f.lastNotify = now
needNotify = true
}
}
return n, err
}
func (f *incomingFile) PartialFile() ipn.PartialFile {
f.mu.Lock()
defer f.mu.Unlock()
return ipn.PartialFile{
Name: f.name,
Started: f.started,
DeclaredSize: f.size,
Received: f.copied,
}
}
func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) { func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) {
if !h.isSelf { if !h.isSelf {
http.Error(w, "not owner", http.StatusForbidden) http.Error(w, "not owner", http.StatusForbidden)
@ -369,12 +417,25 @@ func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) {
os.Remove(dstFile) os.Remove(dstFile)
} }
}() }()
n, err := io.Copy(f, r.Body) var finalSize int64
if err != nil { if r.ContentLength != 0 {
f.Close() inFile := &incomingFile{
h.logf("put Copy error: %v", err) name: baseName,
http.Error(w, err.Error(), http.StatusInternalServerError) started: time.Now(),
return size: r.ContentLength,
w: f,
ph: h,
}
h.ps.b.registerIncomingFile(inFile, true)
defer h.ps.b.registerIncomingFile(inFile, false)
n, err := io.Copy(inFile, r.Body)
if err != nil {
f.Close()
h.logf("put Copy error: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
finalSize = n
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
h.logf("put Close error: %v", err) h.logf("put Close error: %v", err)
@ -382,14 +443,14 @@ func (h *peerAPIHandler) put(w http.ResponseWriter, r *http.Request) {
return return
} }
h.logf("put of %s from %v/%v", baseName, approxSize(n), h.remoteAddr.IP, h.peerNode.ComputedName) h.logf("put of %s from %v/%v", baseName, approxSize(finalSize), h.remoteAddr.IP, h.peerNode.ComputedName)
// TODO: set modtime // TODO: set modtime
// TODO: some real response // TODO: some real response
success = true success = true
io.WriteString(w, "{}\n") io.WriteString(w, "{}\n")
h.ps.knownEmpty.Set(false) h.ps.knownEmpty.Set(false)
h.ps.b.send(ipn.Notify{}) // it will set FilesWaiting h.ps.b.sendFileNotify()
} }
func approxSize(n int64) string { func approxSize(n int64) string {

Loading…
Cancel
Save