@ -4,11 +4,8 @@
package taildrop
import (
"crypto/sha256"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
@ -73,9 +70,10 @@ func (f *incomingFile) Write(p []byte) (n int, err error) {
// specific partial file. This allows the client to determine whether to resume
// a partial file. While resuming, PutFile may be called again with a non-zero
// offset to specify where to resume receiving data at.
func ( m * manager ) PutFile ( id clientID , baseName string , r io . Reader , offset , length int64 ) ( int64 , error ) {
func ( m * manager ) PutFile ( id clientID , baseName string , r io . Reader , offset , length int64 ) ( fileLength int64 , err error ) {
switch {
case m == nil || m . opts . Dir == "" :
case m == nil || m . opts . fileOps == nil :
return 0 , ErrNoTaildrop
case ! envknob . CanTaildrop ( ) :
return 0 , ErrNoTaildrop
@ -83,47 +81,47 @@ func (m *manager) PutFile(id clientID, baseName string, r io.Reader, offset, len
return 0 , ErrNotAccessible
}
//Compute dstPath & avoid mid‑ upload deletion
var dstPath string
if m . opts . Mode == PutModeDirect {
var err error
dstPath , err = joinDir ( m . opts . Dir , baseName )
if err := validateBaseName ( baseName ) ; err != nil {
return 0 , err
}
// and make sure we don't delete it while uploading:
m . deleter . Remove ( baseName )
// Create (if not already) the partial file with read-write permissions.
partialName := baseName + id . partialSuffix ( )
wc , partialPath , err := m . opts . fileOps . OpenWriter ( partialName , offset , 0 o666 )
if err != nil {
return 0 , m . redactAndLogError ( "Create" , err )
}
defer func ( ) {
wc . Close ( )
if err != nil {
return 0 , err
m . deleter . Insert ( partialName ) // mark partial file for eventual deletion
}
} else {
// In SAF mode, we simply use the baseName as the destination "path"
// (the actual directory is managed by SAF).
dstPath = baseName
}
m . deleter . Remove ( filepath . Base ( dstPath ) ) // avoid deleting the partial file while receiving
} ( )
// Check whether there is an in-progress transfer for the file.
partial FileKey := incomingFileKey { id , baseName }
inFile , loaded := m . incomingFiles . LoadOrInit ( partial FileKey, func ( ) * incomingFile {
return & incomingFile {
inFileKey := incomingFileKey { id , baseName }
inFile , loaded := m . incomingFiles . LoadOrInit ( inFileKey , func ( ) * incomingFile {
inFile := & incomingFile {
clock : m . opts . Clock ,
started : m . opts . Clock . Now ( ) ,
size : length ,
sendFileNotify : m . opts . SendFileNotify ,
}
if m . opts . DirectFileMode {
inFile . partialPath = partialPath
}
return inFile
} )
inFile . w = wc
if loaded {
return 0 , ErrFileExists
}
defer m . incomingFiles . Delete ( partialFileKey )
// Open writer & populate inFile paths
wc , partialPath , err := m . openWriterAndPaths ( id , m . opts . Mode , inFile , baseName , dstPath , offset )
if err != nil {
return 0 , m . redactAndLogError ( "Create" , err )
}
defer func ( ) {
wc . Close ( )
if err != nil {
m . deleter . Insert ( filepath . Base ( partialPath ) ) // mark partial file for eventual deletion
}
} ( )
defer m . incomingFiles . Delete ( inFileKey )
// Record that we have started to receive at least one file.
// This is used by the deleter upon a cold-start to scan the directory
@ -148,220 +146,26 @@ func (m *manager) PutFile(id clientID, baseName string, r io.Reader, offset, len
return 0 , m . redactAndLogError ( "Close" , err )
}
fileLength : = offset + copyLength
fileLength = offset + copyLength
inFile . mu . Lock ( )
inFile . done = true
inFile . mu . Unlock ( )
// Finalize rename
switch m . opts . Mode {
case PutModeDirect :
var finalDst string
finalDst , err = m . finalizeDirect ( inFile , partialPath , dstPath , fileLength )
if err != nil {
return 0 , m . redactAndLogError ( "Rename" , err )
}
inFile . finalPath = finalDst
case PutModeAndroidSAF :
if err = m . finalizeSAF ( partialPath , baseName ) ; err != nil {
return 0 , m . redactAndLogError ( "Rename" , err )
}
// 6) Finalize (rename/move) the partial into place via FileOps.Rename
finalPath , err := m . opts . fileOps . Rename ( partialPath , baseName )
if err != nil {
return 0 , m . redactAndLogError ( "Rename" , err )
}
inFile . finalPath = finalPath
m . totalReceived . Add ( 1 )
m . opts . SendFileNotify ( )
return fileLength , nil
}
// openWriterAndPaths opens the correct writer, seeks/truncates if needed,
// and sets inFile.partialPath & inFile.finalPath for later cleanup/rename.
// The caller is responsible for closing the file on completion.
func ( m * manager ) openWriterAndPaths (
id clientID ,
mode PutMode ,
inFile * incomingFile ,
baseName string ,
dstPath string ,
offset int64 ,
) ( wc io . WriteCloser , partialPath string , err error ) {
switch mode {
case PutModeDirect :
partialPath = dstPath + id . partialSuffix ( )
f , err := os . OpenFile ( partialPath , os . O_CREATE | os . O_RDWR , 0 o666 )
if err != nil {
return nil , "" , m . redactAndLogError ( "Create" , err )
}
if offset != 0 {
curr , err := f . Seek ( 0 , io . SeekEnd )
if err != nil {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , err )
}
if offset < 0 || offset > curr {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , fmt . Errorf ( "offset %d out of range" , offset ) )
}
if _ , err := f . Seek ( offset , io . SeekStart ) ; err != nil {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , err )
}
if err := f . Truncate ( offset ) ; err != nil {
f . Close ( )
return nil , "" , m . redactAndLogError ( "Truncate" , err )
}
}
inFile . w = f
wc = f
inFile . partialPath = partialPath
inFile . finalPath = dstPath
return wc , partialPath , nil
case PutModeAndroidSAF :
if m . opts . FileOps == nil {
return nil , "" , m . redactAndLogError ( "Create (SAF)" , fmt . Errorf ( "missing FileOps" ) )
}
writer , uri , err := m . opts . FileOps . OpenFileWriter ( baseName )
if err != nil {
return nil , "" , m . redactAndLogError ( "Create (SAF)" , fmt . Errorf ( "failed to open file for writing via SAF" ) )
}
if writer == nil || uri == "" {
return nil , "" , fmt . Errorf ( "invalid SAF writer or URI" )
}
// SAF mode does not support resuming, so enforce offset == 0.
if offset != 0 {
writer . Close ( )
return nil , "" , m . redactAndLogError ( "Seek" , fmt . Errorf ( "resuming is not supported in SAF mode" ) )
}
inFile . w = writer
wc = writer
partialPath = uri
inFile . partialPath = uri
inFile . finalPath = baseName
return wc , partialPath , nil
default :
return nil , "" , fmt . Errorf ( "unsupported PutMode: %v" , mode )
}
}
// finalizeDirect atomically renames or dedups the partial file, retrying
// under new names up to 10 times. It returns the final path that succeeded.
func ( m * manager ) finalizeDirect (
inFile * incomingFile ,
partialPath string ,
initialDst string ,
fileLength int64 ,
) ( string , error ) {
var (
once sync . Once
cachedSum [ sha256 . Size ] byte
cacheErr error
computeSum = func ( ) ( [ sha256 . Size ] byte , error ) {
once . Do ( func ( ) { cachedSum , cacheErr = sha256File ( partialPath ) } )
return cachedSum , cacheErr
}
)
dstPath := initialDst
const maxRetries = 10
for i := 0 ; i < maxRetries ; i ++ {
// Atomically rename the partial file as the destination file if it doesn't exist.
// Otherwise, it returns the length of the current destination file.
// The operation is atomic.
lengthOnDisk , err := func ( ) ( int64 , error ) {
m . renameMu . Lock ( )
defer m . renameMu . Unlock ( )
fi , statErr := os . Stat ( dstPath )
if os . IsNotExist ( statErr ) {
// dst missing → rename partial into place
return - 1 , os . Rename ( partialPath , dstPath )
}
if statErr != nil {
return - 1 , statErr
}
return fi . Size ( ) , nil
} ( )
if err != nil {
return "" , err
}
if lengthOnDisk < 0 {
// successfully moved
inFile . finalPath = dstPath
return dstPath , nil
}
// Avoid the final rename if a destination file has the same contents.
//
// Note: this is best effort and copying files from iOS from the Media Library
// results in processing on the iOS side which means the size and shas of the
// same file can be different.
if lengthOnDisk == fileLength {
partSum , err := computeSum ( )
if err != nil {
return "" , err
}
dstSum , err := sha256File ( dstPath )
if err != nil {
return "" , err
}
if partSum == dstSum {
// same content → drop the partial
if err := os . Remove ( partialPath ) ; err != nil {
return "" , err
}
inFile . finalPath = dstPath
return dstPath , nil
}
}
// Choose a new destination filename and try again.
dstPath = nextFilename ( dstPath )
}
return "" , fmt . Errorf ( "too many retries trying to rename a partial file %q" , initialDst )
}
// finalizeSAF retries RenamePartialFile up to 10 times, generating a new
// name on each failure until the SAF URI changes.
func ( m * manager ) finalizeSAF (
partialPath , finalName string ,
) error {
if m . opts . FileOps == nil {
return fmt . Errorf ( "missing FileOps for SAF finalize" )
}
const maxTries = 10
name := finalName
for i := 0 ; i < maxTries ; i ++ {
newURI , err := m . opts . FileOps . RenamePartialFile ( partialPath , m . opts . Dir , name )
if err != nil {
return err
}
if newURI != "" && newURI != name {
return nil
}
name = nextFilename ( name )
}
return fmt . Errorf ( "failed to finalize SAF file after %d retries" , maxTries )
}
func ( m * manager ) redactAndLogError ( stage string , err error ) error {
err = redactError ( err )
m . opts . Logf ( "put %s error: %v" , stage , err )
return err
}
func sha256File ( file string ) ( out [ sha256 . Size ] byte , err error ) {
h := sha256 . New ( )
f , err := os . Open ( file )
if err != nil {
return out , err
}
defer f . Close ( )
if _ , err := io . Copy ( h , f ) ; err != nil {
return out , err
}
return [ sha256 . Size ] byte ( h . Sum ( nil ) ) , nil
}