From c2a551469c62ec6315cc4f0091b039db37ebf2e5 Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Tue, 17 Oct 2023 13:46:05 -0700 Subject: [PATCH] taildrop: implement asynchronous file deletion (#9844) File resumption requires keeping partial files around for some time, but we must still eventually delete them if never resumed. Thus, we implement asynchronous file deletion, which could spawn a background goroutine to delete the files. We also use the same mechanism for deleting files on Windows, where a file can't be deleted if there is still an open file handle. We can enqueue those with the asynchronous file deleter as well. Updates tailscale/corp#14772 Signed-off-by: Joe Tsai --- ipn/ipnlocal/local.go | 9 +- ipn/ipnlocal/peerapi_test.go | 14 +-- taildrop/delete.go | 182 +++++++++++++++++++++++++++++++++ taildrop/delete_test.go | 132 ++++++++++++++++++++++++ taildrop/resume.go | 42 +++----- taildrop/resume_test.go | 7 +- taildrop/retrieve.go | 192 ++++++++++++----------------------- taildrop/send.go | 38 ++++--- taildrop/taildrop.go | 140 ++++++++++++++++++------- taildrop/taildrop_test.go | 164 +++++------------------------- 10 files changed, 557 insertions(+), 363 deletions(-) create mode 100644 taildrop/delete.go create mode 100644 taildrop/delete_test.go diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 5f4b75364..682add38e 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -646,6 +646,9 @@ func (b *LocalBackend) Shutdown() { if b.sockstatLogger != nil { b.sockstatLogger.Shutdown() } + if b.peerAPIServer != nil { + b.peerAPIServer.taildrop.Shutdown() + } b.unregisterNetMon() b.unregisterHealthWatch() @@ -3614,14 +3617,14 @@ func (b *LocalBackend) initPeerAPIListener() { ps := &peerAPIServer{ b: b, - taildrop: &taildrop.Manager{ + taildrop: taildrop.ManagerOptions{ Logf: b.logf, - Clock: tstime.DefaultClock{b.clock}, + Clock: tstime.DefaultClock{Clock: b.clock}, Dir: fileRoot, DirectFileMode: b.directFileRoot != "", AvoidFinalRename: 
!b.directFileDoFinalRename, SendFileNotify: b.sendFileNotify, - }, + }.New(), } if dm, ok := b.sys.DNSManager.GetOK(); ok { ps.resolver = dm.Resolver() diff --git a/ipn/ipnlocal/peerapi_test.go b/ipn/ipnlocal/peerapi_test.go index f1f6b2265..76069eeba 100644 --- a/ipn/ipnlocal/peerapi_test.go +++ b/ipn/ipnlocal/peerapi_test.go @@ -68,7 +68,7 @@ func bodyNotContains(sub string) check { func fileHasSize(name string, size int) check { return func(t *testing.T, e *peerAPITestEnv) { - root := e.ph.ps.taildrop.Dir + root := e.ph.ps.taildrop.Dir() if root == "" { t.Errorf("no rootdir; can't check whether %q has size %v", name, size) return @@ -84,7 +84,7 @@ func fileHasSize(name string, size int) check { func fileHasContents(name string, want string) check { return func(t *testing.T, e *peerAPITestEnv) { - root := e.ph.ps.taildrop.Dir + root := e.ph.ps.taildrop.Dir() if root == "" { t.Errorf("no rootdir; can't check contents of %q", name) return @@ -540,11 +540,11 @@ func TestHandlePeerAPI(t *testing.T) { if !tt.omitRoot { rootDir = t.TempDir() if e.ph.ps.taildrop == nil { - e.ph.ps.taildrop = &taildrop.Manager{ + e.ph.ps.taildrop = taildrop.ManagerOptions{ Logf: e.logBuf.Logf, - } + Dir: rootDir, + }.New() } - e.ph.ps.taildrop.Dir = rootDir } for _, req := range tt.reqs { e.rr = httptest.NewRecorder() @@ -583,10 +583,10 @@ func TestFileDeleteRace(t *testing.T) { capFileSharing: true, clock: &tstest.Clock{}, }, - taildrop: &taildrop.Manager{ + taildrop: taildrop.ManagerOptions{ Logf: t.Logf, Dir: dir, - }, + }.New(), } ph := &peerAPIHandler{ isSelf: true, diff --git a/taildrop/delete.go b/taildrop/delete.go new file mode 100644 index 000000000..4f311d68b --- /dev/null +++ b/taildrop/delete.go @@ -0,0 +1,182 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package taildrop + +import ( + "container/list" + "context" + "io/fs" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "tailscale.com/syncs" + "tailscale.com/tstime" + 
"tailscale.com/types/logger" +) + +// deleteDelay is the amount of time to wait before we delete a file. +// A shorter value ensures timely deletion of deleted and partial files, while +// a longer value provides more opportunity for partial files to be resumed. +const deleteDelay = time.Hour + +// fileDeleter manages asynchronous deletion of files after deleteDelay. +type fileDeleter struct { + logf logger.Logf + clock tstime.DefaultClock + event func(string) // called for certain events; for testing only + dir string + + mu sync.Mutex + queue list.List + byName map[string]*list.Element + + emptySignal chan struct{} // signal that the queue is empty + group syncs.WaitGroup + shutdownCtx context.Context + shutdown context.CancelFunc +} + +// deleteFile is a specific file to delete after deleteDelay. +type deleteFile struct { + name string + inserted time.Time +} + +func (d *fileDeleter) Init(logf logger.Logf, clock tstime.DefaultClock, event func(string), dir string) { + d.logf = logf + d.clock = clock + d.dir = dir + d.event = event + + // From a cold-start, load the list of partial and deleted files. + d.byName = make(map[string]*list.Element) + d.emptySignal = make(chan struct{}) + d.shutdownCtx, d.shutdown = context.WithCancel(context.Background()) + d.group.Go(func() { + d.event("start init") + defer d.event("end init") + rangeDir(dir, func(de fs.DirEntry) bool { + switch { + case d.shutdownCtx.Err() != nil: + return false // terminate early + case !de.Type().IsRegular(): + return true + case strings.Contains(de.Name(), partialSuffix): + d.Insert(de.Name()) + case strings.Contains(de.Name(), deletedSuffix): + // Best-effort immediate deletion of deleted files. + name := strings.TrimSuffix(de.Name(), deletedSuffix) + if os.Remove(filepath.Join(dir, name)) == nil { + if os.Remove(filepath.Join(dir, de.Name())) == nil { + break + } + } + // Otherwise, enqueue the file for later deletion. 
+ d.Insert(de.Name()) + } + return true + }) + }) +} + +// Insert enqueues baseName for eventual deletion. +func (d *fileDeleter) Insert(baseName string) { + d.mu.Lock() + defer d.mu.Unlock() + if d.shutdownCtx.Err() != nil { + return + } + if _, ok := d.byName[baseName]; ok { + return // already queued for deletion + } + d.byName[baseName] = d.queue.PushBack(&deleteFile{baseName, d.clock.Now()}) + if d.queue.Len() == 1 { + d.group.Go(func() { d.waitAndDelete(deleteDelay) }) + } +} + +// waitAndDelete is an asynchronous deletion goroutine. +// At most one waitAndDelete routine is ever running at a time. +// It is not started unless there is at least one file in the queue. +func (d *fileDeleter) waitAndDelete(wait time.Duration) { + tc, ch := d.clock.NewTimer(wait) + defer tc.Stop() // cleanup the timer resource if we stop early + d.event("start waitAndDelete") + defer d.event("end waitAndDelete") + select { + case <-d.shutdownCtx.Done(): + case <-d.emptySignal: + case now := <-ch: + d.mu.Lock() + defer d.mu.Unlock() + + // Iterate over all files to delete, and delete anything old enough. + var next *list.Element + var failed []*list.Element + for elem := d.queue.Front(); elem != nil; elem = next { + next = elem.Next() + file := elem.Value.(*deleteFile) + if now.Sub(file.inserted) < deleteDelay { + break // everything after this is recently inserted + } + + // Delete the expired file. 
+ if name, ok := strings.CutSuffix(file.name, deletedSuffix); ok { + if err := os.Remove(filepath.Join(d.dir, name)); err != nil && !os.IsNotExist(err) { + d.logf("could not delete: %v", redactError(err)) + failed = append(failed, elem) + continue + } + } + if err := os.Remove(filepath.Join(d.dir, file.name)); err != nil && !os.IsNotExist(err) { + d.logf("could not delete: %v", redactError(err)) + failed = append(failed, elem) + continue + } + d.queue.Remove(elem) + delete(d.byName, file.name) + d.event("deleted " + file.name) + } + for _, elem := range failed { + elem.Value.(*deleteFile).inserted = now // retry after deleteDelay + d.queue.MoveToBack(elem) + } + + // If there are still some files to delete, retry again later. + if d.queue.Len() > 0 { + file := d.queue.Front().Value.(*deleteFile) + retryAfter := deleteDelay - now.Sub(file.inserted) + d.group.Go(func() { d.waitAndDelete(retryAfter) }) + } + } +} + +// Remove dequeues baseName from eventual deletion. +func (d *fileDeleter) Remove(baseName string) { + d.mu.Lock() + defer d.mu.Unlock() + if elem := d.byName[baseName]; elem != nil { + d.queue.Remove(elem) + delete(d.byName, baseName) + // Signal to terminate any waitAndDelete goroutines. + if d.queue.Len() == 0 { + select { + case <-d.shutdownCtx.Done(): + case d.emptySignal <- struct{}{}: + } + } + } +} + +// Shutdown shuts down the deleter. +// It blocks until all goroutines are stopped. 
+func (d *fileDeleter) Shutdown() { + d.mu.Lock() // acquire lock to ensure no new goroutines start after shutdown + d.shutdown() + d.mu.Unlock() + d.group.Wait() +} diff --git a/taildrop/delete_test.go b/taildrop/delete_test.go new file mode 100644 index 000000000..0f87c46c5 --- /dev/null +++ b/taildrop/delete_test.go @@ -0,0 +1,132 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +package taildrop + +import ( + "os" + "path/filepath" + "slices" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "tailscale.com/tstest" + "tailscale.com/tstime" + "tailscale.com/util/must" +) + +func TestDeleter(t *testing.T) { + dir := t.TempDir() + must.Do(touchFile(filepath.Join(dir, "foo.partial"))) + must.Do(touchFile(filepath.Join(dir, "bar.partial"))) + must.Do(touchFile(filepath.Join(dir, "fizz"))) + must.Do(touchFile(filepath.Join(dir, "fizz.deleted"))) + must.Do(touchFile(filepath.Join(dir, "buzz.deleted"))) // lacks a matching "buzz" file + + checkDirectory := func(want ...string) { + t.Helper() + var got []string + for _, de := range must.Get(os.ReadDir(dir)) { + got = append(got, de.Name()) + } + slices.Sort(got) + slices.Sort(want) + if diff := cmp.Diff(got, want); diff != "" { + t.Fatalf("directory mismatch (-got +want):\n%s", diff) + } + } + + clock := tstest.NewClock(tstest.ClockOpts{Start: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)}) + advance := func(d time.Duration) { + t.Helper() + t.Logf("advance: %v", d) + clock.Advance(d) + } + + eventsChan := make(chan string, 1000) + checkEvents := func(want ...string) { + t.Helper() + tm := time.NewTimer(10 * time.Second) + defer tm.Stop() + var got []string + for range want { + select { + case event := <-eventsChan: + t.Logf("event: %s", event) + got = append(got, event) + case <-tm.C: + t.Fatalf("timed out waiting for event: got %v, want %v", got, want) + } + } + slices.Sort(got) + slices.Sort(want) + if diff := cmp.Diff(got, want); diff != "" { + t.Fatalf("events mismatch 
(-got +want):\n%s", diff) + } + } + eventHook := func(event string) { eventsChan <- event } + + var fd fileDeleter + fd.Init(t.Logf, tstime.DefaultClock{Clock: clock}, eventHook, dir) + defer fd.Shutdown() + insert := func(name string) { + t.Helper() + t.Logf("insert: %v", name) + fd.Insert(name) + } + remove := func(name string) { + t.Helper() + t.Logf("remove: %v", name) + fd.Remove(name) + } + + checkEvents("start init") + checkEvents("end init", "start waitAndDelete") + checkDirectory("foo.partial", "bar.partial", "buzz.deleted") + + advance(deleteDelay / 2) + checkDirectory("foo.partial", "bar.partial", "buzz.deleted") + advance(deleteDelay / 2) + checkEvents("deleted foo.partial", "deleted bar.partial", "deleted buzz.deleted") + checkEvents("end waitAndDelete") + checkDirectory() + + must.Do(touchFile(filepath.Join(dir, "one.partial"))) + insert("one.partial") + checkEvents("start waitAndDelete") + advance(deleteDelay / 4) + must.Do(touchFile(filepath.Join(dir, "two.partial"))) + insert("two.partial") + advance(deleteDelay / 4) + must.Do(touchFile(filepath.Join(dir, "three.partial"))) + insert("three.partial") + advance(deleteDelay / 4) + must.Do(touchFile(filepath.Join(dir, "four.partial"))) + insert("four.partial") + + advance(deleteDelay / 4) + checkEvents("deleted one.partial") + checkDirectory("two.partial", "three.partial", "four.partial") + checkEvents("end waitAndDelete", "start waitAndDelete") + + advance(deleteDelay / 4) + checkEvents("deleted two.partial") + checkDirectory("three.partial", "four.partial") + checkEvents("end waitAndDelete", "start waitAndDelete") + + advance(deleteDelay / 4) + checkEvents("deleted three.partial") + checkDirectory("four.partial") + checkEvents("end waitAndDelete", "start waitAndDelete") + + advance(deleteDelay / 4) + checkEvents("deleted four.partial") + checkDirectory() + checkEvents("end waitAndDelete") + + insert("wuzz.partial") + checkEvents("start waitAndDelete") + remove("wuzz.partial") + checkEvents("end 
waitAndDelete") +} diff --git a/taildrop/resume.go b/taildrop/resume.go index e32e4ddb7..1388ac793 100644 --- a/taildrop/resume.go +++ b/taildrop/resume.go @@ -9,6 +9,7 @@ import ( "encoding/hex" "fmt" "io" + "io/fs" "os" "slices" "strings" @@ -72,34 +73,23 @@ func hexAppendEncode(dst, src []byte) []byte { // PartialFiles returns a list of partial files in [Handler.Dir] // that were sent (or is actively being sent) by the provided id. func (m *Manager) PartialFiles(id ClientID) (ret []string, err error) { - if m == nil || m.Dir == "" { + if m == nil || m.opts.Dir == "" { return nil, ErrNoTaildrop } - if m.DirectFileMode && m.AvoidFinalRename { + if m.opts.DirectFileMode && m.opts.AvoidFinalRename { return nil, nil // resuming is not supported for users that peek at our file structure } - f, err := os.Open(m.Dir) - if err != nil { - return ret, err - } - defer f.Close() - suffix := id.partialSuffix() - for { - des, err := f.ReadDir(10) - if err != nil { - return ret, err - } - for _, de := range des { - if name := de.Name(); strings.HasSuffix(name, suffix) { - ret = append(ret, name) - } - } - if err == io.EOF { - return ret, nil + if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool { + if name := de.Name(); strings.HasSuffix(name, suffix) { + ret = append(ret, name) } + return true + }); err != nil { + return ret, redactError(err) } + return ret, nil } // HashPartialFile hashes the contents of a partial file sent by id, @@ -109,14 +99,14 @@ func (m *Manager) PartialFiles(id ClientID) (ret []string, err error) { // If [FileHashes.Length] is less than length and no error occurred, // then it implies that all remaining content in the file has been hashed. 
func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length int64) (FileChecksums, error) { - if m == nil || m.Dir == "" { + if m == nil || m.opts.Dir == "" { return FileChecksums{}, ErrNoTaildrop } - if m.DirectFileMode && m.AvoidFinalRename { + if m.opts.DirectFileMode && m.opts.AvoidFinalRename { return FileChecksums{}, nil // resuming is not supported for users that peek at our file structure } - dstFile, err := m.joinDir(baseName) + dstFile, err := joinDir(m.opts.Dir, baseName) if err != nil { return FileChecksums{}, err } @@ -125,12 +115,12 @@ func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length i if os.IsNotExist(err) { return FileChecksums{}, nil } - return FileChecksums{}, err + return FileChecksums{}, redactError(err) } defer f.Close() if _, err := f.Seek(offset, io.SeekStart); err != nil { - return FileChecksums{}, err + return FileChecksums{}, redactError(err) } checksums := FileChecksums{ Offset: offset, @@ -145,7 +135,7 @@ func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length i for { switch n, err := io.ReadFull(r, b); { case err != nil && err != io.EOF && err != io.ErrUnexpectedEOF: - return checksums, err + return checksums, redactError(err) case n == 0: return checksums, nil default: diff --git a/taildrop/resume_test.go b/taildrop/resume_test.go index d79fb80dd..0deaf6869 100644 --- a/taildrop/resume_test.go +++ b/taildrop/resume_test.go @@ -19,7 +19,8 @@ func TestResume(t *testing.T) { defer func() { blockSize = oldBlockSize }() blockSize = 256 - m := Manager{Logf: t.Logf, Dir: t.TempDir()} + m := ManagerOptions{Logf: t.Logf, Dir: t.TempDir()}.New() + defer m.Shutdown() rn := rand.New(rand.NewSource(0)) want := make([]byte, 12345) @@ -32,7 +33,7 @@ func TestResume(t *testing.T) { }) must.Do(err) must.Get(m.PutFile("", "foo", r, offset, -1)) - got := must.Get(os.ReadFile(must.Get(m.joinDir("foo")))) + got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo")))) if 
!bytes.Equal(got, want) { t.Errorf("content mismatches") } @@ -54,7 +55,7 @@ func TestResume(t *testing.T) { break } } - got := must.Get(os.ReadFile(must.Get(m.joinDir("foo")))) + got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo")))) if !bytes.Equal(got, want) { t.Errorf("content mismatches") } diff --git a/taildrop/retrieve.go b/taildrop/retrieve.go index 76dd918ed..ec40c32d7 100644 --- a/taildrop/retrieve.go +++ b/taildrop/retrieve.go @@ -12,7 +12,6 @@ import ( "path/filepath" "runtime" "sort" - "strings" "time" "tailscale.com/client/tailscale/apitype" @@ -21,163 +20,98 @@ import ( // HasFilesWaiting reports whether any files are buffered in [Handler.Dir]. // This always returns false when [Handler.DirectFileMode] is false. -func (m *Manager) HasFilesWaiting() bool { - if m == nil || m.Dir == "" || m.DirectFileMode { +func (m *Manager) HasFilesWaiting() (has bool) { + if m == nil || m.opts.Dir == "" || m.opts.DirectFileMode { return false } - if m.knownEmpty.Load() { - // Optimization: this is usually empty, so avoid opening - // the directory and checking. We can't cache the actual - // has-files-or-not values as the macOS/iOS client might - // in the future use+delete the files directly. So only - // keep this negative cache. - return false - } - f, err := os.Open(m.Dir) - if err != nil { + + // Optimization: this is usually empty, so avoid opening + // the directory and checking. We can't cache the actual + // has-files-or-not values as the macOS/iOS client might + // in the future use+delete the files directly. So only + // keep this negative cache. 
+ totalReceived := m.totalReceived.Load() + if totalReceived == m.emptySince.Load() { return false } - defer f.Close() - for { - des, err := f.ReadDir(10) - for _, de := range des { - name := de.Name() - if strings.HasSuffix(name, partialSuffix) { - continue - } - if name, ok := strings.CutSuffix(name, deletedSuffix); ok { // for Windows + tests - // After we're done looping over files, then try - // to delete this file. Don't do it proactively, - // as the OS may return "foo.jpg.deleted" before "foo.jpg" - // and we don't want to delete the ".deleted" file before - // enumerating to the "foo.jpg" file. - defer tryDeleteAgain(filepath.Join(m.Dir, name)) - continue - } - if de.Type().IsRegular() { - _, err := os.Stat(filepath.Join(m.Dir, name+deletedSuffix)) - if os.IsNotExist(err) { - return true - } - if err == nil { - tryDeleteAgain(filepath.Join(m.Dir, name)) - continue - } - } - } - if err == io.EOF { - m.knownEmpty.Store(true) + + // Check whether there is at least one waiting file. + err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool { + name := de.Name() + if isPartialOrDeleted(name) || !de.Type().IsRegular() { + return true } - if err != nil { - break + _, err := os.Stat(filepath.Join(m.opts.Dir, name+deletedSuffix)) + if os.IsNotExist(err) { + has = true + return false } + return true + }) + + // If there are no more waiting files, record totalReceived as emptySince + // so that we can short-circuit the expensive directory traversal + // if no files have been received after the start of this call. + if err == nil && !has { + m.emptySince.Store(totalReceived) } - return false + return has } // WaitingFiles returns the list of files that have been sent by a // peer that are waiting in [Handler.Dir]. // This always returns nil when [Handler.DirectFileMode] is false. 
func (m *Manager) WaitingFiles() (ret []apitype.WaitingFile, err error) { - if m == nil || m.Dir == "" { + if m == nil || m.opts.Dir == "" { return nil, ErrNoTaildrop } - if m.DirectFileMode { + if m.opts.DirectFileMode { return nil, nil } - f, err := os.Open(m.Dir) - if err != nil { - return nil, err - } - defer f.Close() - var deleted map[string]bool // "foo.jpg" => true (if "foo.jpg.deleted" exists) - for { - des, err := f.ReadDir(10) - for _, de := range des { - name := de.Name() - if strings.HasSuffix(name, partialSuffix) { - continue - } - if name, ok := strings.CutSuffix(name, deletedSuffix); ok { // for Windows + tests - if deleted == nil { - deleted = map[string]bool{} - } - deleted[name] = true - continue - } - if de.Type().IsRegular() { - fi, err := de.Info() - if err != nil { - continue - } - ret = append(ret, apitype.WaitingFile{ - Name: filepath.Base(name), - Size: fi.Size(), - }) - } + if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool { + name := de.Name() + if isPartialOrDeleted(name) || !de.Type().IsRegular() { + return true } - if err == io.EOF { - break - } - if err != nil { - return nil, err - } - } - if len(deleted) > 0 { - // Filter out any return values "foo.jpg" where a - // "foo.jpg.deleted" marker file exists on disk. - all := ret - ret = ret[:0] - for _, wf := range all { - if !deleted[wf.Name] { - ret = append(ret, wf) + _, err := os.Stat(filepath.Join(m.opts.Dir, name+deletedSuffix)) + if os.IsNotExist(err) { + fi, err := de.Info() + if err != nil { + return true } + ret = append(ret, apitype.WaitingFile{ + Name: filepath.Base(name), + Size: fi.Size(), + }) } - // And do some opportunistic deleting while we're here. - // Maybe Windows is done virus scanning the file we tried - // to delete a long time ago and will let us delete it now. 
- for name := range deleted { - tryDeleteAgain(filepath.Join(m.Dir, name)) - } + return true + }); err != nil { + return nil, redactError(err) } sort.Slice(ret, func(i, j int) bool { return ret[i].Name < ret[j].Name }) return ret, nil } -// tryDeleteAgain tries to delete path (and path+deletedSuffix) after -// it failed earlier. This happens on Windows when various anti-virus -// tools hook into filesystem operations and have the file open still -// while we're trying to delete it. In that case we instead mark it as -// deleted (writing a "foo.jpg.deleted" marker file), but then we -// later try to clean them up. -// -// fullPath is the full path to the file without the deleted suffix. -func tryDeleteAgain(fullPath string) { - if err := os.Remove(fullPath); err == nil || os.IsNotExist(err) { - os.Remove(fullPath + deletedSuffix) - } -} - // DeleteFile deletes a file of the given baseName from [Handler.Dir]. // This method is only allowed when [Handler.DirectFileMode] is false. func (m *Manager) DeleteFile(baseName string) error { - if m == nil || m.Dir == "" { + if m == nil || m.opts.Dir == "" { return ErrNoTaildrop } - if m.DirectFileMode { + if m.opts.DirectFileMode { return errors.New("deletes not allowed in direct mode") } - path, err := m.joinDir(baseName) + path, err := joinDir(m.opts.Dir, baseName) if err != nil { return err } var bo *backoff.Backoff - logf := m.Logf - t0 := m.Clock.Now() + logf := m.opts.Logf + t0 := m.opts.Clock.Now() for { err := os.Remove(path) if err != nil && !os.IsNotExist(err) { - err = redactErr(err) + err = redactError(err) // Put a retry loop around deletes on Windows. 
Windows // file descriptor closes are effectively asynchronous, // as a bunch of hooks run on/after close, and we can't @@ -192,13 +126,14 @@ func (m *Manager) DeleteFile(baseName string) error { if bo == nil { bo = backoff.NewBackoff("delete-retry", logf, 1*time.Second) } - if m.Clock.Since(t0) < 5*time.Second { + if m.opts.Clock.Since(t0) < 5*time.Second { bo.BackOff(context.Background(), err) continue } if err := touchFile(path + deletedSuffix); err != nil { logf("peerapi: failed to leave deleted marker: %v", err) } + m.deleter.Insert(baseName + deletedSuffix) } logf("peerapi: failed to DeleteFile: %v", err) return err @@ -210,7 +145,7 @@ func (m *Manager) DeleteFile(baseName string) error { func touchFile(path string) error { f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666) if err != nil { - return redactErr(err) + return redactError(err) } return f.Close() } @@ -218,28 +153,27 @@ func touchFile(path string) error { // OpenFile opens a file of the given baseName from [Handler.Dir]. // This method is only allowed when [Handler.DirectFileMode] is false. 
func (m *Manager) OpenFile(baseName string) (rc io.ReadCloser, size int64, err error) { - if m == nil || m.Dir == "" { + if m == nil || m.opts.Dir == "" { return nil, 0, ErrNoTaildrop } - if m.DirectFileMode { + if m.opts.DirectFileMode { return nil, 0, errors.New("opens not allowed in direct mode") } - path, err := m.joinDir(baseName) + path, err := joinDir(m.opts.Dir, baseName) if err != nil { return nil, 0, err } - if fi, err := os.Stat(path + deletedSuffix); err == nil && fi.Mode().IsRegular() { - tryDeleteAgain(path) - return nil, 0, &fs.PathError{Op: "open", Path: redacted, Err: fs.ErrNotExist} + if _, err := os.Stat(path + deletedSuffix); err == nil { + return nil, 0, redactError(&fs.PathError{Op: "open", Path: path, Err: fs.ErrNotExist}) } f, err := os.Open(path) if err != nil { - return nil, 0, redactErr(err) + return nil, 0, redactError(err) } fi, err := f.Stat() if err != nil { f.Close() - return nil, 0, redactErr(err) + return nil, 0, redactError(err) } return f, fi.Size(), nil } diff --git a/taildrop/send.go b/taildrop/send.go index f97e2bfbe..42c223737 100644 --- a/taildrop/send.go +++ b/taildrop/send.go @@ -8,6 +8,7 @@ import ( "errors" "io" "os" + "path/filepath" "sync" "time" @@ -72,25 +73,25 @@ func (f *incomingFile) Write(p []byte) (n int, err error) { // offset to specify where to resume receiving data at. 
func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, length int64) (int64, error) { switch { - case m == nil || m.Dir == "": + case m == nil || m.opts.Dir == "": return 0, ErrNoTaildrop case !envknob.CanTaildrop(): return 0, ErrNoTaildrop - case distro.Get() == distro.Unraid && !m.DirectFileMode: + case distro.Get() == distro.Unraid && !m.opts.DirectFileMode: return 0, ErrNotAccessible } - dstPath, err := m.joinDir(baseName) + dstPath, err := joinDir(m.opts.Dir, baseName) if err != nil { return 0, err } redactAndLogError := func(action string, err error) error { - err = redactErr(err) - m.Logf("put %v error: %v", action, err) + err = redactError(err) + m.opts.Logf("put %v error: %v", action, err) return err } - avoidPartialRename := m.DirectFileMode && m.AvoidFinalRename + avoidPartialRename := m.opts.DirectFileMode && m.opts.AvoidFinalRename if avoidPartialRename { // Users using AvoidFinalRename are depending on the exact filename // of the partial files. So avoid injecting the id into it. @@ -98,20 +99,16 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len } // Check whether there is an in-progress transfer for the file. 
- sendFileNotify := m.SendFileNotify - if sendFileNotify == nil { - sendFileNotify = func() {} // avoid nil panics below - } partialPath := dstPath + id.partialSuffix() inFileKey := incomingFileKey{id, baseName} inFile, loaded := m.incomingFiles.LoadOrInit(inFileKey, func() *incomingFile { inFile := &incomingFile{ - clock: m.Clock, - started: m.Clock.Now(), + clock: m.opts.Clock, + started: m.opts.Clock.Now(), size: length, - sendFileNotify: sendFileNotify, + sendFileNotify: m.opts.SendFileNotify, } - if m.DirectFileMode { + if m.opts.DirectFileMode { inFile.partialPath = partialPath } return inFile @@ -120,6 +117,7 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len return 0, ErrFileExists } defer m.incomingFiles.Delete(inFileKey) + m.deleter.Remove(filepath.Base(partialPath)) // avoid deleting the partial file while receiving // Create (if not already) the partial file with read-write permissions. f, err := os.OpenFile(partialPath, os.O_CREATE|os.O_RDWR, 0666) @@ -133,9 +131,7 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len os.Remove(partialPath) // best-effort return } - - // TODO: We need to delete partialPath eventually. - // However, this must be done after some period of time. 
+ m.deleter.Insert(filepath.Base(partialPath)) // mark partial file for eventual deletion } }() inFile.w = f @@ -177,8 +173,8 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len inFile.mu.Lock() inFile.done = true inFile.mu.Unlock() - m.knownEmpty.Store(false) - sendFileNotify() + m.totalReceived.Add(1) + m.opts.SendFileNotify() return fileLength, nil } @@ -236,8 +232,8 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len if maxRetries <= 0 { return 0, errors.New("too many retries trying to rename partial file") } - m.knownEmpty.Store(false) - sendFileNotify() + m.totalReceived.Add(1) + m.opts.SendFileNotify() return fileLength, nil } diff --git a/taildrop/taildrop.go b/taildrop/taildrop.go index 5b0bda6c2..e3e1f5e78 100644 --- a/taildrop/taildrop.go +++ b/taildrop/taildrop.go @@ -12,6 +12,8 @@ package taildrop import ( "errors" "hash/adler32" + "io" + "io/fs" "os" "path" "path/filepath" @@ -30,6 +32,27 @@ import ( "tailscale.com/util/multierr" ) +var ( + ErrNoTaildrop = errors.New("Taildrop disabled; no storage directory") + ErrInvalidFileName = errors.New("invalid filename") + ErrFileExists = errors.New("file already exists") + ErrNotAccessible = errors.New("Taildrop folder not configured or accessible") +) + +const ( + // partialSuffix is the suffix appended to files while they're + // still in the process of being transferred. + partialSuffix = ".partial" + + // deletedSuffix is the suffix for a deleted marker file + // that's placed next to a file (without the suffix) that we + // tried to delete, but Windows wouldn't let us. These are + // only written on Windows (and in tests), but they're not + // permitted to be uploaded directly on any platform, like + // partial files. + deletedSuffix = ".deleted" +) + // ClientID is an opaque identifier for file resumption. // A client can only list and resume partial files for its own ID. 
// It must contain any filesystem specific characters (e.g., slashes). @@ -42,8 +65,8 @@ func (id ClientID) partialSuffix() string { return "." + string(id) + partialSuffix // e.g., ".n12345CNTRL.partial" } -// Manager manages the state for receiving and managing taildropped files. -type Manager struct { +// ManagerOptions are options to configure the [Manager]. +type ManagerOptions struct { Logf logger.Logf Clock tstime.DefaultClock @@ -80,39 +103,56 @@ type Manager struct { // to the function when reception completes. // It is not called if nil. SendFileNotify func() +} - knownEmpty atomic.Bool +// Manager manages the state for receiving and managing taildropped files. +type Manager struct { + opts ManagerOptions + // incomingFiles is a map of files actively being received. incomingFiles syncs.Map[incomingFileKey, *incomingFile] + // deleter manages asynchronous deletion of files. + deleter fileDeleter // renameMu is used to protect os.Rename calls so that they are atomic. renameMu sync.Mutex -} -var ( - ErrNoTaildrop = errors.New("Taildrop disabled; no storage directory") - ErrInvalidFileName = errors.New("invalid filename") - ErrFileExists = errors.New("file already exists") - ErrNotAccessible = errors.New("Taildrop folder not configured or accessible") -) + // totalReceived counts the cumulative total of received files. + totalReceived atomic.Int64 + // emptySince specifies that there were no waiting files + // since this value of totalReceived. + emptySince atomic.Int64 +} -const ( - // partialSuffix is the suffix appended to files while they're - // still in the process of being transferred. - partialSuffix = ".partial" +// New initializes a new taildrop manager. +// It may spawn asynchronous goroutines to delete files, +// so the Shutdown method must be called for resource cleanup. 
+func (opts ManagerOptions) New() *Manager { + if opts.Logf == nil { + opts.Logf = logger.Discard + } + if opts.SendFileNotify == nil { + opts.SendFileNotify = func() {} + } + m := &Manager{opts: opts} + m.deleter.Init(opts.Logf, opts.Clock, func(string) {}, opts.Dir) + m.emptySince.Store(-1) // invalidate this cache + return m +} - // deletedSuffix is the suffix for a deleted marker file - // that's placed next to a file (without the suffix) that we - // tried to delete, but Windows wouldn't let us. These are - // only written on Windows (and in tests), but they're not - // permitted to be uploaded directly on any platform, like - // partial files. - deletedSuffix = ".deleted" -) +// Dir returns the directory. +func (m *Manager) Dir() string { + return m.opts.Dir +} -// redacted is a fake path name we use in errors, to avoid -// accidentally logging actual filenames anywhere. -const redacted = "redacted" +// Shutdown shuts down the Manager. +// It blocks until all spawned goroutines have stopped running. +func (m *Manager) Shutdown() { + if m != nil { + m.deleter.shutdown() + m.deleter.group.Wait() + } +} func validFilenameRune(r rune) bool { switch r { @@ -131,7 +171,11 @@ func validFilenameRune(r rune) bool { return unicode.IsPrint(r) } -func (m *Manager) joinDir(baseName string) (fullPath string, err error) { +func isPartialOrDeleted(s string) bool { + return strings.HasSuffix(s, deletedSuffix) || strings.HasSuffix(s, partialSuffix) +} + +func joinDir(dir, baseName string) (fullPath string, err error) { if !utf8.ValidString(baseName) { return "", ErrInvalidFileName } @@ -145,8 +189,7 @@ func (m *Manager) joinDir(baseName string) (fullPath string, err error) { clean := path.Clean(baseName) if clean != baseName || clean == "." || clean == ".." 
|| - strings.HasSuffix(clean, deletedSuffix) || - strings.HasSuffix(clean, partialSuffix) { + isPartialOrDeleted(clean) { return "", ErrInvalidFileName } for _, r := range baseName { @@ -157,7 +200,32 @@ func (m *Manager) joinDir(baseName string) (fullPath string, err error) { if !filepath.IsLocal(baseName) { return "", ErrInvalidFileName } - return filepath.Join(m.Dir, baseName), nil + return filepath.Join(dir, baseName), nil +} + +// rangeDir iterates over the contents of a directory, calling fn for each entry. +// It continues iterating while fn returns true. +// It reports any error encountered while opening or reading the directory. +func rangeDir(dir string, fn func(fs.DirEntry) bool) error { + f, err := os.Open(dir) + if err != nil { + return err + } + defer f.Close() + for { + des, err := f.ReadDir(10) + for _, de := range des { + if !fn(de) { + return nil + } + } + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } } // IncomingFiles returns a list of active incoming files. @@ -182,16 +250,20 @@ func (m *Manager) IncomingFiles() []ipn.PartialFile { return files } -type redactedErr struct { +// redacted is a fake path name we use in errors, to avoid +// accidentally logging actual filenames anywhere. +const redacted = "redacted" + +type redactedError struct { msg string inner error } -func (re *redactedErr) Error() string { +func (re *redactedError) Error() string { return re.msg } -func (re *redactedErr) Unwrap() error { +func (re *redactedError) Unwrap() error { return re.inner } @@ -205,7 +277,7 @@ func redactString(s string) string { return string(b) } -func redactErr(root error) error { +func redactError(root error) error { // redactStrings is a list of sensitive strings that were redacted. 
// It is not sufficient to just snub out sensitive fields in Go errors // since some wrapper errors like fmt.Errorf pre-cache the error string, @@ -243,7 +315,7 @@ func redactErr(root error) error { for _, toRedact := range redactStrings { s = strings.ReplaceAll(s, toRedact, redactString(toRedact)) } - return &redactedErr{msg: s, inner: root} + return &redactedError{msg: s, inner: root} } var ( diff --git a/taildrop/taildrop_test.go b/taildrop/taildrop_test.go index 969ce3fe5..967fc9b04 100644 --- a/taildrop/taildrop_test.go +++ b/taildrop/taildrop_test.go @@ -4,153 +4,37 @@ package taildrop import ( - "errors" - "fmt" - "io/fs" - "os" "path/filepath" - "runtime" + "strings" "testing" ) -// Tests "foo.jpg.deleted" marks (for Windows). -func TestDeletedMarkers(t *testing.T) { +func TestJoinDir(t *testing.T) { dir := t.TempDir() - h := &Manager{Dir: dir} - - nothingWaiting := func() { - t.Helper() - h.knownEmpty.Store(false) - if h.HasFilesWaiting() { - t.Fatal("unexpected files waiting") - } - } - touch := func(base string) { - t.Helper() - if err := touchFile(filepath.Join(dir, base)); err != nil { - t.Fatal(err) - } - } - wantEmptyTempDir := func() { - t.Helper() - if fis, err := os.ReadDir(dir); err != nil { - t.Fatal(err) - } else if len(fis) > 0 && runtime.GOOS != "windows" { - for _, fi := range fis { - t.Errorf("unexpected file in tempdir: %q", fi.Name()) - } - } - } - - nothingWaiting() - wantEmptyTempDir() - - touch("foo.jpg.deleted") - nothingWaiting() - wantEmptyTempDir() - - touch("foo.jpg.deleted") - touch("foo.jpg") - nothingWaiting() - wantEmptyTempDir() - - touch("foo.jpg.deleted") - touch("foo.jpg") - wf, err := h.WaitingFiles() - if err != nil { - t.Fatal(err) - } - if len(wf) != 0 { - t.Fatalf("WaitingFiles = %d; want 0", len(wf)) - } - wantEmptyTempDir() - - touch("foo.jpg.deleted") - touch("foo.jpg") - if rc, _, err := h.OpenFile("foo.jpg"); err == nil { - rc.Close() - t.Fatal("unexpected foo.jpg open") - } - wantEmptyTempDir() - - // And verify 
basics still work in non-deleted cases. - touch("foo.jpg") - touch("bar.jpg.deleted") - if wf, err := h.WaitingFiles(); err != nil { - t.Error(err) - } else if len(wf) != 1 { - t.Errorf("WaitingFiles = %d; want 1", len(wf)) - } else if wf[0].Name != "foo.jpg" { - t.Errorf("unexpected waiting file %+v", wf[0]) - } - if rc, _, err := h.OpenFile("foo.jpg"); err != nil { - t.Fatal(err) - } else { - rc.Close() - } -} - -func TestRedactErr(t *testing.T) { - testCases := []struct { - name string - err func() error - want string + tests := []struct { + in string + want string // just relative to m.Dir + wantOk bool }{ - { - name: "PathError", - err: func() error { - return &os.PathError{ - Op: "open", - Path: "/tmp/sensitive.txt", - Err: fs.ErrNotExist, - } - }, - want: `open redacted.41360718: file does not exist`, - }, - { - name: "LinkError", - err: func() error { - return &os.LinkError{ - Op: "symlink", - Old: "/tmp/sensitive.txt", - New: "/tmp/othersensitive.txt", - Err: fs.ErrNotExist, - } - }, - want: `symlink redacted.41360718 redacted.6bcf093a: file does not exist`, - }, - { - name: "something else", - err: func() error { return errors.New("i am another error type") }, - want: `i am another error type`, - }, + {"", "", false}, + {"foo", "foo", true}, + {"./foo", "", false}, + {"../foo", "", false}, + {"foo/bar", "", false}, + {"😋", "😋", true}, + {"\xde\xad\xbe\xef", "", false}, + {"foo.partial", "", false}, + {"foo.deleted", "", false}, + {strings.Repeat("a", 1024), "", false}, + {"foo:bar", "", false}, } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // For debugging - var i int - for err := tc.err(); err != nil; err = errors.Unwrap(err) { - t.Logf("%d: %T @ %p", i, err, err) - i++ - } - - t.Run("Root", func(t *testing.T) { - got := redactErr(tc.err()).Error() - if got != tc.want { - t.Errorf("err = %q; want %q", got, tc.want) - } - }) - t.Run("Wrapped", func(t *testing.T) { - wrapped := fmt.Errorf("wrapped error: %w", tc.err()) - want 
:= "wrapped error: " + tc.want - - got := redactErr(wrapped).Error() - if got != want { - t.Errorf("err = %q; want %q", got, want) - } - }) - }) + for _, tt := range tests { + got, gotErr := joinDir(dir, tt.in) + got, _ = filepath.Rel(dir, got) + gotOk := gotErr == nil + if got != tt.want || gotOk != tt.wantOk { + t.Errorf("joinDir(%q) = (%v, %v), want (%v, %v)", tt.in, got, gotOk, tt.want, tt.wantOk) + } } }