mirror of https://github.com/tailscale/tailscale/
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
384 lines
11 KiB
Go
384 lines
11 KiB
Go
// Copyright (c) Tailscale Inc & AUTHORS
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package filch
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand/v2"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
jsonv2 "github.com/go-json-experiment/json"
|
|
"tailscale.com/tstest"
|
|
"tailscale.com/util/must"
|
|
)
|
|
|
|
func init() { alwaysStatForTests = true }
|
|
|
|
type filchTest struct {
|
|
*Filch
|
|
|
|
filePrefix string
|
|
}
|
|
|
|
func newForTest(t *testing.T, filePrefix string, opts Options) *filchTest {
|
|
t.Helper()
|
|
if filePrefix == "" {
|
|
filePrefix = filepath.Join(t.TempDir(), "testlog")
|
|
}
|
|
f, err := New(filePrefix, opts)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := f.Close(); err != nil {
|
|
t.Errorf("Close error: %v", err)
|
|
}
|
|
})
|
|
return &filchTest{Filch: f, filePrefix: filePrefix}
|
|
}
|
|
|
|
func (f *filchTest) read(t *testing.T, want []byte) {
|
|
t.Helper()
|
|
if got, err := f.TryReadLine(); err != nil {
|
|
t.Fatalf("TryReadLine error: %v", err)
|
|
} else if string(got) != string(want) {
|
|
t.Errorf("TryReadLine = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestNew(t *testing.T) {
|
|
const want1 = "Lorem\nipsum\ndolor\nsit\namet,\nconsectetur\nadipiscing\nelit,\nsed\n"
|
|
const want2 = "do\neiusmod\ntempor\nincididunt\nut\nlabore\net\ndolore\nmagna\naliqua.\n"
|
|
filePrefix := filepath.Join(t.TempDir(), "testlog")
|
|
checkLinesAndCleanup := func() {
|
|
t.Helper()
|
|
defer os.Remove(filepath.Join(filePrefix + ".log1.txt"))
|
|
defer os.Remove(filepath.Join(filePrefix + ".log2.txt"))
|
|
f := newForTest(t, filePrefix, Options{})
|
|
var got []byte
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if b == nil {
|
|
break
|
|
}
|
|
got = append(got, b...)
|
|
}
|
|
if string(got) != want1+want2 {
|
|
t.Errorf("got %q\nwant %q", got, want1+want2)
|
|
}
|
|
}
|
|
now := time.Now()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
checkLinesAndCleanup()
|
|
}
|
|
|
|
func setupStderr(t *testing.T) {
|
|
t.Helper()
|
|
pipeR, pipeW, err := os.Pipe()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() { pipeR.Close() })
|
|
t.Cleanup(func() {
|
|
switch b, err := io.ReadAll(pipeR); {
|
|
case err != nil:
|
|
t.Fatalf("ReadAll error: %v", err)
|
|
case len(b) > 0:
|
|
t.Errorf("unexpected write to fake stderr: %s", b)
|
|
}
|
|
})
|
|
t.Cleanup(func() { pipeW.Close() })
|
|
tstest.Replace(t, &stderrFD, int(pipeW.Fd()))
|
|
tstest.Replace(t, &os.Stderr, pipeW)
|
|
}
|
|
|
|
func TestConcurrentWriteAndRead(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
setupStderr(t)
|
|
}
|
|
|
|
const numWriters = 10
|
|
const linesPerWriter = 1000
|
|
opts := Options{ReplaceStderr: replaceStderrSupportedForTest, MaxFileSize: math.MaxInt32}
|
|
f := newForTest(t, "", opts)
|
|
|
|
// Concurrently write many lines.
|
|
var draining sync.RWMutex
|
|
var group sync.WaitGroup
|
|
defer group.Wait()
|
|
data := bytes.Repeat([]byte("X"), 1000)
|
|
var runningWriters atomic.Int64
|
|
for i := range numWriters {
|
|
runningWriters.Add(+1)
|
|
group.Go(func() {
|
|
defer runningWriters.Add(-1)
|
|
var b []byte
|
|
for j := range linesPerWriter {
|
|
b = fmt.Appendf(b[:0], `{"Index":%d,"Count":%d,"Data":"%s"}`+"\n", i+1, j+1, data[:rand.IntN(len(data))])
|
|
draining.RLock()
|
|
if i%2 == 0 && opts.ReplaceStderr {
|
|
stderrWriteForTest(b)
|
|
} else {
|
|
must.Get(f.Write(b))
|
|
}
|
|
draining.RUnlock()
|
|
runtime.Gosched()
|
|
}
|
|
})
|
|
}
|
|
|
|
// Verify that we can read back the lines in an ordered manner.
|
|
var lines int
|
|
var entry struct{ Index, Count int }
|
|
state := make(map[int]int)
|
|
checkLine := func() (ok bool) {
|
|
b := must.Get(f.TryReadLine())
|
|
if len(b) == 0 {
|
|
return false
|
|
}
|
|
entry.Index, entry.Count = 0, 0
|
|
if err := jsonv2.Unmarshal(b, &entry); err != nil {
|
|
t.Fatalf("json.Unmarshal error: %v", err)
|
|
}
|
|
if wantCount := state[entry.Index] + 1; entry.Count != wantCount {
|
|
t.Fatalf("Index:%d, Count = %d, want %d", entry.Index, entry.Count, wantCount)
|
|
}
|
|
state[entry.Index] = entry.Count
|
|
lines++
|
|
return true
|
|
}
|
|
for lines < numWriters*linesPerWriter {
|
|
writersDone := runningWriters.Load() == 0
|
|
for range rand.IntN(100) {
|
|
runtime.Gosched() // bias towards more writer operations
|
|
}
|
|
|
|
if rand.IntN(100) == 0 {
|
|
// Asynchronous read of a single line.
|
|
if !checkLine() && writersDone {
|
|
t.Fatal("failed to read all lines after all writers done")
|
|
}
|
|
} else {
|
|
// Synchronous reading of all lines.
|
|
draining.Lock()
|
|
for checkLine() {
|
|
}
|
|
draining.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test that the
|
|
func TestBufferCapacity(t *testing.T) {
|
|
f := newForTest(t, "", Options{})
|
|
b := bytes.Repeat([]byte("X"), 1000)
|
|
for range 1000 {
|
|
must.Get(f.Write(b[:rand.IntN(len(b))]))
|
|
}
|
|
for must.Get(f.TryReadLine()) != nil {
|
|
}
|
|
if !(10*len(b) < cap(f.rdBuf) && cap(f.rdBuf) < 20*len(b)) {
|
|
t.Errorf("cap(rdBuf) = %v, want within [%v:%v]", cap(f.rdBuf), 10*len(b), 20*len(b))
|
|
}
|
|
|
|
must.Get(f.Write(bytes.Repeat([]byte("X"), DefaultMaxLineSize-1)))
|
|
must.Get(f.TryReadLine())
|
|
wrCap, rdCap := cap(f.wrBuf), cap(f.rdBuf)
|
|
|
|
// Force another rotation. Buffers should not be GC'd yet.
|
|
must.Get(f.TryReadLine())
|
|
if cap(f.wrBuf) != wrCap {
|
|
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), wrCap)
|
|
}
|
|
if cap(f.rdBuf) != rdCap {
|
|
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), rdCap)
|
|
}
|
|
|
|
// Force many rotations. Buffers should be GC'd.
|
|
for range 64 {
|
|
t.Logf("cap(f.wrBuf), cap(f.rdBuf) = %d, %d", cap(f.wrBuf), cap(f.rdBuf))
|
|
must.Get(f.TryReadLine())
|
|
}
|
|
if cap(f.wrBuf) != 0 {
|
|
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), 0)
|
|
}
|
|
if cap(f.rdBuf) != 0 {
|
|
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), 0)
|
|
}
|
|
}
|
|
|
|
func TestMaxLineSize(t *testing.T) {
|
|
const maxLineSize = 1000
|
|
f := newForTest(t, "", Options{MaxLineSize: maxLineSize})
|
|
|
|
// Test writing.
|
|
b0 := []byte(strings.Repeat("X", maxLineSize-len("\n")) + "\n")
|
|
must.Get(f.Write(b0))
|
|
b1 := []byte(strings.Repeat("X", maxLineSize))
|
|
if _, err := f.Write(b1); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
b2 := bytes.Repeat(b0, 2)
|
|
must.Get(f.Write(b2))
|
|
if f.storedBytesForTest() != int64(len(b0)+len(b2)) {
|
|
t.Errorf("storedBytes = %v, want %v", f.storedBytesForTest(), int64(len(b0)+len(b2)))
|
|
}
|
|
|
|
// Test reading.
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, nil) // should trigger rotate
|
|
if f.storedBytesForTest() != 0 {
|
|
t.Errorf("storedBytes = %v, want 0", f.storedBytesForTest())
|
|
}
|
|
|
|
// Test writing
|
|
must.Get(f.Write([]byte("hello")))
|
|
must.Get(f.Write(b0))
|
|
must.Get(f.Write([]byte("goodbye")))
|
|
|
|
// Test reading.
|
|
f.Close()
|
|
f = newForTest(t, f.filePrefix, Options{MaxLineSize: 10})
|
|
f.read(t, []byte("hello\n"))
|
|
if _, err := f.TryReadLine(); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
f.read(t, []byte("goodbye\n"))
|
|
|
|
// Check that the read buffer does not need to be as long
|
|
// as the overly long line to skip over it.
|
|
if cap(f.rdBuf) >= maxLineSize/2 {
|
|
t.Errorf("cap(rdBuf) = %v, want <%v", cap(f.rdBuf), maxLineSize/2)
|
|
}
|
|
}
|
|
|
|
func TestMaxFileSize(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
t.Run("ReplaceStderr:true", func(t *testing.T) { testMaxFileSize(t, true) })
|
|
}
|
|
t.Run("ReplaceStderr:false", func(t *testing.T) { testMaxFileSize(t, false) })
|
|
}
|
|
|
|
func testMaxFileSize(t *testing.T, replaceStderr bool) {
|
|
if replaceStderr {
|
|
setupStderr(t)
|
|
}
|
|
|
|
opts := Options{ReplaceStderr: replaceStderr, MaxFileSize: 1000}
|
|
f := newForTest(t, "", opts)
|
|
|
|
// Write lots of data.
|
|
const calls = 1000
|
|
var group sync.WaitGroup
|
|
var filchedBytes, writeBytes int64
|
|
group.Go(func() {
|
|
if !opts.ReplaceStderr {
|
|
return
|
|
}
|
|
var b []byte
|
|
for i := range calls {
|
|
b = fmt.Appendf(b[:0], `{"FilchIndex":%d}`+"\n", i+1)
|
|
filchedBytes += int64(stderrWriteForTest(b))
|
|
}
|
|
})
|
|
group.Go(func() {
|
|
var b []byte
|
|
for i := range calls {
|
|
b = fmt.Appendf(b[:0], `{"WriteIndex":%d}`+"\n", i+1)
|
|
writeBytes += int64(must.Get(f.Write(b)))
|
|
}
|
|
})
|
|
group.Wait()
|
|
f.statAndUpdateBytes()
|
|
droppedBytes := filchedBytes + writeBytes - f.storedBytes.Value()
|
|
|
|
switch {
|
|
case f.writeCalls.Value() != calls:
|
|
t.Errorf("writeCalls = %v, want %d", f.writeCalls.Value(), calls)
|
|
case f.readCalls.Value() != 0:
|
|
t.Errorf("readCalls = %v, want 0", f.readCalls.Value())
|
|
case f.rotateCalls.Value() == 0:
|
|
t.Errorf("rotateCalls = 0, want >0")
|
|
case f.callErrors.Value() != 0:
|
|
t.Errorf("callErrors = %v, want 0", f.callErrors.Value())
|
|
case f.writeBytes.Value() != writeBytes+filchedBytes:
|
|
t.Errorf("writeBytes = %v, want %d", f.writeBytes.Value(), writeBytes+filchedBytes)
|
|
case f.readBytes.Value() != 0:
|
|
t.Errorf("readBytes = %v, want 0", f.readBytes.Value())
|
|
case f.filchedBytes.Value() != filchedBytes:
|
|
t.Errorf("filchedBytes = %v, want %d", f.filchedBytes.Value(), filchedBytes)
|
|
case f.droppedBytes.Value() != droppedBytes:
|
|
t.Errorf("droppedBytes = %v, want %d", f.droppedBytes.Value(), droppedBytes)
|
|
case f.droppedBytes.Value() == 0:
|
|
t.Errorf("droppedBytes = 0, want >0")
|
|
case f.storedBytes.Value() != f.storedBytesForTest():
|
|
t.Errorf("storedBytes = %v, want %d", f.storedBytes.Value(), f.storedBytesForTest())
|
|
case f.storedBytes.Value() > int64(opts.MaxFileSize) && !opts.ReplaceStderr:
|
|
// If ReplaceStderr, it is impossible for MaxFileSize to be
|
|
// strictly adhered to since asynchronous os.Stderr.Write calls
|
|
// do not trigger any checks to enforce maximum file size.
|
|
t.Errorf("storedBytes = %v, want <=%d", f.storedBytes.Value(), opts.MaxFileSize)
|
|
}
|
|
|
|
// Read back the data and verify that the entries are in order.
|
|
var readBytes, lastFilchIndex, lastWriteIndex int64
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if len(b) == 0 {
|
|
break
|
|
}
|
|
var entry struct{ FilchIndex, WriteIndex int64 }
|
|
must.Do(json.Unmarshal(b, &entry))
|
|
if entry.FilchIndex == 0 && entry.WriteIndex == 0 {
|
|
t.Errorf("both indexes are zero")
|
|
}
|
|
if entry.FilchIndex > 0 {
|
|
if entry.FilchIndex <= lastFilchIndex {
|
|
t.Errorf("FilchIndex = %d, want >%d", entry.FilchIndex, lastFilchIndex)
|
|
}
|
|
lastFilchIndex = entry.FilchIndex
|
|
}
|
|
if entry.WriteIndex > 0 {
|
|
if entry.WriteIndex <= lastWriteIndex {
|
|
t.Errorf("WriteIndex = %d, want >%d", entry.WriteIndex, lastWriteIndex)
|
|
}
|
|
lastWriteIndex = entry.WriteIndex
|
|
}
|
|
readBytes += int64(len(b))
|
|
}
|
|
|
|
if f.readBytes.Value() != readBytes {
|
|
t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes)
|
|
}
|
|
}
|