logtail/filch: fix panic in concurrent file access (#18555)

In the event of multiple Filch intances being backed by the same file,
it is possible that concurrent rotateLocked calls occur.
One operation might clear the file,
resulting in another skipping the call to resetReadBuffer,
resulting in a later panic because the read index is invalid.
To at least avoid the panic, always call resetReadBuffer.

Note that the behavior of Filch is undefined when using the same file.
While this avoids the panic, we may still experience data corruption or less.

Fixes #18552

Signed-off-by: Joe Tsai <joetsai@digital-static.net>
pull/14575/merge
Joe Tsai 1 day ago committed by GitHub
parent 9e7f536a7c
commit 6f55309f34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -316,7 +316,7 @@ func waitIdleStderrForTest() {
// No data should be lost under this condition.
//
// - The writer exceeded a limit for f.newer.
// Data may be lost under this cxondition.
// Data may be lost under this condition.
func (f *Filch) rotateLocked() error {
f.rotateCalls.Add(1)
@ -329,7 +329,6 @@ func (f *Filch) rotateLocked() error {
rdPos := pos - int64(len(f.unreadReadBuffer())) // adjust for data already read into the read buffer
f.droppedBytes.Add(max(0, fi.Size()-rdPos))
}
f.resetReadBuffer()
// Truncate the older file and write relative to the start.
if err := f.older.Truncate(0); err != nil {
@ -339,6 +338,7 @@ func (f *Filch) rotateLocked() error {
return err
}
}
f.resetReadBuffer()
// Swap newer and older.
f.newer, f.older = f.older, f.newer

@ -381,3 +381,26 @@ func testMaxFileSize(t *testing.T, replaceStderr bool) {
t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes)
}
}
// TestConcurrentSameFile tests that concurrent Filch operations on the same
// set of log files does not result in a panic.
// The exact behavior is undefined, but we should at least avoid a panic.
func TestConcurrentSameFile(t *testing.T) {
filePrefix := filepath.Join(t.TempDir(), "testlog")
f1 := must.Get(New(filePrefix, Options{MaxFileSize: 1000}))
f2 := must.Get(New(filePrefix, Options{MaxFileSize: 1000}))
var group sync.WaitGroup
for _, f := range []*Filch{f1, f2} {
group.Go(func() {
for range 1000 {
for range rand.IntN(10) {
f.Write([]byte("hello, world"))
}
for range rand.IntN(10) {
f.TryReadLine()
}
}
})
}
group.Wait()
}

Loading…
Cancel
Save