@ -5,7 +5,7 @@ package taildrop
import (
"crypto/sha256"
" errors "
" fmt "
"io"
"os"
"path/filepath"
@ -82,126 +82,215 @@ func (m *manager) PutFile(id clientID, baseName string, r io.Reader, offset, len
case distro . Get ( ) == distro . Unraid && ! m . opts . DirectFileMode :
return 0 , ErrNotAccessible
}
dstPath , err := joinDir ( m . opts . Dir , baseName )
if err != nil {
return 0 , err
}
redactAndLogError := func ( action string , err error ) error {
err = redactError ( err )
m . opts . Logf ( "put %v error: %v" , action , err )
return err
//Compute dstPath & avoid mid‑ upload deletion
var dstPath string
if m . opts . Mode == PutModeDirect {
var err error
dstPath , err = joinDir ( m . opts . Dir , baseName )
if err != nil {
return 0 , err
}
} else {
// In SAF mode, we simply use the baseName as the destination "path"
// (the actual directory is managed by SAF).
dstPath = baseName
}
m . deleter . Remove ( filepath . Base ( dstPath ) ) // avoid deleting the partial file while receiving
// Check whether there is an in-progress transfer for the file.
partialPath := dstPath + id . partialSuffix ( )
inFileKey := incomingFileKey { id , baseName }
inFile , loaded := m . incomingFiles . LoadOrInit ( inFileKey , func ( ) * incomingFile {
inFile := & incomingFile {
partialFileKey := incomingFileKey { id , baseName }
inFile , loaded := m . incomingFiles . LoadOrInit ( partialFileKey , func ( ) * incomingFile {
return & incomingFile {
clock : m . opts . Clock ,
started : m . opts . Clock . Now ( ) ,
size : length ,
sendFileNotify : m . opts . SendFileNotify ,
}
if m . opts . DirectFileMode {
inFile . partialPath = partialPath
inFile . finalPath = dstPath
}
return inFile
} )
if loaded {
return 0 , ErrFileExists
}
defer m . incomingFiles . Delete ( inFileKey )
m . deleter . Remove ( filepath . Base ( partialPath ) ) // avoid deleting the partial file while receiving
defer m . incomingFiles . Delete ( partialFileKey )
// Create (if not already) the partial file with read-write permissions.
f, err := os . OpenFile ( partialPath , os . O_CREATE | os . O_RDWR , 0666 )
// Open writer & populate inFile paths
wc, partialPath , err := m . openWriterAndPaths ( id , m . opts . Mode , inFile , baseName , dstPath , offset )
if err != nil {
return 0 , redactAndLogError ( "Create" , err )
return 0 , m . redactAndLogError ( "Create" , err )
}
defer func ( ) {
f. Close ( ) // best-effort to cleanup dangling file handles
wc. Close ( )
if err != nil {
m . deleter . Insert ( filepath . Base ( partialPath ) ) // mark partial file for eventual deletion
}
} ( )
inFile . w = f
// Record that we have started to receive at least one file.
// This is used by the deleter upon a cold-start to scan the directory
// for any files that need to be deleted.
if m. opts . State != nil {
if b , _ := m. opt s. S tate . ReadState ( ipn . TaildropReceivedKey ) ; len ( b ) == 0 {
if err := m. opt s. S tate . WriteState ( ipn . TaildropReceivedKey , [ ] byte { 1 } ) ; err != nil {
m . opts . Logf ( "WriteState error: %v" , err) // non-fatal error
if st := m. opts . State ; st != nil {
if b , _ := st. ReadState ( ipn . TaildropReceivedKey ) ; len ( b ) == 0 {
if w err := st. WriteState ( ipn . TaildropReceivedKey , [ ] byte { 1 } ) ; w err != nil {
m . opts . Logf ( "WriteState error: %v" , w err) // non-fatal error
}
}
}
// A positive offset implies that we are resuming an existing file.
// Seek to the appropriate offset and truncate the file.
if offset != 0 {
currLength , err := f . Seek ( 0 , io . SeekEnd )
if err != nil {
return 0 , redactAndLogError ( "Seek" , err )
}
if offset < 0 || offset > currLength {
return 0 , redactAndLogError ( "Seek" , err )
}
if _ , err := f . Seek ( offset , io . SeekStart ) ; err != nil {
return 0 , redactAndLogError ( "Seek" , err )
}
if err := f . Truncate ( offset ) ; err != nil {
return 0 , redactAndLogError ( "Truncate" , err )
}
}
// Copy the contents of the file.
copyLength , err := io . Copy ( inFile , r )
// Copy the contents of the file to the writer.
copyLength , err := io . Copy ( wc , r )
if err != nil {
return 0 , redactAndLogError ( "Copy" , err )
return 0 , m . redactAndLogError ( "Copy" , err )
}
if length >= 0 && copyLength != length {
return 0 , redactAndLogError ( "Copy" , errors. New ( "copied an unexpected number of bytes" ) )
return 0 , m . redactAndLogError ( "Copy" , fmt . Errorf ( "copied %d bytes; expected %d" , copyLength , length ) )
}
if err := f . Close ( ) ; err != nil {
return 0 , redactAndLogError ( "Close" , err )
if err := wc . Close ( ) ; err != nil {
return 0 , m . redactAndLogError ( "Close" , err )
}
fileLength := offset + copyLength
inFile . mu . Lock ( )
inFile . done = true
inFile . mu . Unlock ( )
// File has been successfully received, rename the partial file
// to the final destination filename. If a file of that name already exists,
// then try multiple times with variations of the filename.
computePartialSum := sync . OnceValues ( func ( ) ( [ sha256 . Size ] byte , error ) {
return sha256File ( partialPath )
} )
maxRetries := 10
for ; maxRetries > 0 ; maxRetries -- {
// Finalize rename
switch m . opts . Mode {
case PutModeDirect :
var finalDst string
finalDst , err = m . finalizeDirect ( inFile , partialPath , dstPath , fileLength )
if err != nil {
return 0 , m . redactAndLogError ( "Rename" , err )
}
inFile . finalPath = finalDst
case PutModeAndroidSAF :
if err = m . finalizeSAF ( partialPath , baseName ) ; err != nil {
return 0 , m . redactAndLogError ( "Rename" , err )
}
}
m . totalReceived . Add ( 1 )
m . opts . SendFileNotify ( )
return fileLength , nil
}
// openWriterAndPaths opens the correct writer, seeks/truncates if needed,
// and sets inFile.partialPath & inFile.finalPath for later cleanup/rename.
// The caller is responsible for closing the file on completion.
func ( m * manager ) openWriterAndPaths (
id clientID ,
mode PutMode ,
inFile * incomingFile ,
baseName string ,
dstPath string ,
offset int64 ,
) ( wc io . WriteCloser , partialPath string , err error ) {
switch mode {
case PutModeDirect :
partialPath = dstPath + id . partialSuffix ( )
f , err := os . OpenFile ( partialPath , os . O_CREATE | os . O_RDWR , 0 o666 )
if err != nil {
return nil , "" , m . redactAndLogError ( "Create" , err )
}
if offset != 0 {
curr , err := f . Seek ( 0 , io . SeekEnd )
if err != nil {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , err )
}
if offset < 0 || offset > curr {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , fmt . Errorf ( "offset %d out of range" , offset ) )
}
if _ , err := f . Seek ( offset , io . SeekStart ) ; err != nil {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , err )
}
if err := f . Truncate ( offset ) ; err != nil {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Truncate" , err )
}
}
inFile . w = f
wc = f
inFile . partialPath = partialPath
inFile . finalPath = dstPath
return wc , partialPath , nil
case PutModeAndroidSAF :
if m . opts . FileOps == nil {
return nil , "" , m . redactAndLogError ( "Create (SAF)" , fmt . Errorf ( "missing FileOps" ) )
}
writer , uri , err := m . opts . FileOps . OpenFileWriter ( baseName )
if err != nil {
return nil , "" , m . redactAndLogError ( "Create (SAF)" , fmt . Errorf ( "failed to open file for writing via SAF" ) )
}
if writer == nil || uri == "" {
return nil , "" , fmt . Errorf ( "invalid SAF writer or URI" )
}
// SAF mode does not support resuming, so enforce offset == 0.
if offset != 0 {
writer . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , fmt . Errorf ( "resuming is not supported in SAF mode" ) )
}
inFile . w = writer
wc = writer
partialPath = uri
inFile . partialPath = uri
inFile . finalPath = baseName
return wc , partialPath , nil
default :
return nil , "" , fmt . Errorf ( "unsupported PutMode: %v" , mode )
}
}
// finalizeDirect atomically renames or dedups the partial file, retrying
// under new names up to 10 times. It returns the final path that succeeded.
func ( m * manager ) finalizeDirect (
inFile * incomingFile ,
partialPath string ,
initialDst string ,
fileLength int64 ,
) ( string , error ) {
var (
once sync . Once
cachedSum [ sha256 . Size ] byte
cacheErr error
computeSum = func ( ) ( [ sha256 . Size ] byte , error ) {
once . Do ( func ( ) { cachedSum , cacheErr = sha256File ( partialPath ) } )
return cachedSum , cacheErr
}
)
dstPath := initialDst
const maxRetries = 10
for i := 0 ; i < maxRetries ; i ++ {
// Atomically rename the partial file as the destination file if it doesn't exist.
// Otherwise, it returns the length of the current destination file.
// The operation is atomic.
dstLength , err := func ( ) ( int64 , error ) {
lengthOnDisk , err := func ( ) ( int64 , error ) {
m . renameMu . Lock ( )
defer m . renameMu . Unlock ( )
switch fi , err := os . Stat ( dstPath ) ; {
case os . IsNotExist ( err ) :
fi , statErr := os . Stat ( dstPath )
if os . IsNotExist ( statErr ) {
// dst missing → rename partial into place
return - 1 , os . Rename ( partialPath , dstPath )
case err != nil :
return - 1 , err
default :
return fi . Size ( ) , nil
}
if statErr != nil {
return - 1 , statErr
}
return fi . Size ( ) , nil
} ( )
if err != nil {
return 0 , redactAndLogError ( "Rename" , err )
return " ", err
}
if dstLength < 0 {
break // we successfully renamed; so stop
if lengthOnDisk < 0 {
// successfully moved
inFile . finalPath = dstPath
return dstPath , nil
}
// Avoid the final rename if a destination file has the same contents.
@ -209,33 +298,59 @@ func (m *manager) PutFile(id clientID, baseName string, r io.Reader, offset, len
// Note: this is best effort and copying files from iOS from the Media Library
// results in processing on the iOS side which means the size and shas of the
// same file can be different.
if dstLength == fileLength {
part ial Sum, err := compute Partial Sum( )
if lengthOnDisk == fileLength {
part Sum, err := compute Sum( )
if err != nil {
return 0 , redactAndLogError ( " Rename ", err )
return " ", err
}
dstSum , err := sha256File ( dstPath )
if err != nil {
return 0 , redactAndLogError ( " Rename ", err )
return " ", err
}
if dstSum == partialSum {
if partSum == dstSum {
// same content → drop the partial
if err := os . Remove ( partialPath ) ; err != nil {
return 0 , redactAndLogError ( " Remove ", err )
return " ", err
}
break // we successfully found a content match; so stop
inFile . finalPath = dstPath
return dstPath , nil
}
}
// Choose a new destination filename and try again.
dstPath = nextFilename ( dstPath )
inFile . finalPath = dstPath
}
if maxRetries <= 0 {
return 0 , errors . New ( "too many retries trying to rename partial file" )
return "" , fmt . Errorf ( "too many retries trying to rename a partial file %q" , initialDst )
}
// finalizeSAF retries RenamePartialFile up to 10 times, generating a new
// name on each failure until the SAF URI changes.
func ( m * manager ) finalizeSAF (
partialPath , finalName string ,
) error {
if m . opts . FileOps == nil {
return fmt . Errorf ( "missing FileOps for SAF finalize" )
}
m . totalReceived . Add ( 1 )
m . opts . SendFileNotify ( )
return fileLength , nil
const maxTries = 10
name := finalName
for i := 0 ; i < maxTries ; i ++ {
newURI , err := m . opts . FileOps . RenamePartialFile ( partialPath , m . opts . Dir , name )
if err != nil {
return err
}
if newURI != "" && newURI != name {
return nil
}
name = nextFilename ( name )
}
return fmt . Errorf ( "failed to finalize SAF file after %d retries" , maxTries )
}
func ( m * manager ) redactAndLogError ( stage string , err error ) error {
err = redactError ( err )
m . opts . Logf ( "put %s error: %v" , stage , err )
return err
}
func sha256File ( file string ) ( out [ sha256 . Size ] byte , err error ) {