logtail: add instance metadata to the entry logtail

Allows instances that are running with the same machine ID (due to
cloning) to be distinguished.

Also adds sequence numbers to detect duplicates.

For tailscale/corp#5244

Signed-off-by: Mihai Parparita <mihai@tailscale.com>
pull/4709/head
Mihai Parparita 3 years ago committed by Mihai Parparita
parent acfe5bd33b
commit 3222bce02d

@ -519,6 +519,8 @@ func New(collection string) *Policy {
} }
if collection == logtail.CollectionNode { if collection == logtail.CollectionNode {
c.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta c.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta
c.IncludeProcID = true
c.IncludeProcSequence = true
} }
if val := getLogTarget(); val != "" { if val := getLogTarget(); val != "" {

@ -8,6 +8,8 @@ package logtail
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/rand"
"encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -15,6 +17,7 @@ import (
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -62,6 +65,17 @@ type Config struct {
// DrainLogs, if non-nil, disables automatic uploading of new logs, // DrainLogs, if non-nil, disables automatic uploading of new logs,
// so that logs are only uploaded when a token is sent to DrainLogs. // so that logs are only uploaded when a token is sent to DrainLogs.
DrainLogs <-chan struct{} DrainLogs <-chan struct{}
// IncludeProcID, if true, results in an ephemeral process identifier being
// included in logs. The ID is random and not guaranteed to be globally
// unique, but it can be used to distinguish between different instances
// running with same PrivateID.
IncludeProcID bool
// IncludeProcSequence, if true, results in an ephemeral sequence number
// being included in the logs. The sequence number is incremented for each
// log message sent, but is not peristed across process restarts.
IncludeProcSequence bool
} }
func NewLogger(cfg Config, logf tslogger.Logf) *Logger { func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
@ -84,6 +98,17 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
} }
cfg.Buffer = NewMemoryBuffer(pendingSize) cfg.Buffer = NewMemoryBuffer(pendingSize)
} }
var procID uint32
if cfg.IncludeProcID {
keyBytes := make([]byte, 4)
rand.Read(keyBytes)
procID = binary.LittleEndian.Uint32(keyBytes)
if procID == 0 {
// 0 is the empty/off value, assign a different (non-zero) value to
// make sure we still include an ID (actual value does not matter).
procID = 7
}
}
l := &Logger{ l := &Logger{
privateID: cfg.PrivateID, privateID: cfg.PrivateID,
stderr: cfg.Stderr, stderr: cfg.Stderr,
@ -100,6 +125,9 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
bo: backoff.NewBackoff("logtail", logf, 30*time.Second), bo: backoff.NewBackoff("logtail", logf, 30*time.Second),
metricsDelta: cfg.MetricsDelta, metricsDelta: cfg.MetricsDelta,
procID: procID,
includeProcSequence: cfg.IncludeProcSequence,
shutdownStart: make(chan struct{}), shutdownStart: make(chan struct{}),
shutdownDone: make(chan struct{}), shutdownDone: make(chan struct{}),
} }
@ -137,6 +165,11 @@ type Logger struct {
metricsDelta func() string // or nil metricsDelta func() string // or nil
privateID PrivateID privateID PrivateID
procID uint32
includeProcSequence bool
writeLock sync.Mutex // guards increments of procSequence
procSequence uint64
shutdownStart chan struct{} // closed when shutdown begins shutdownStart chan struct{} // closed when shutdown begins
shutdownDone chan struct{} // closed when shutdown complete shutdownDone chan struct{} // closed when shutdown complete
} }
@ -253,8 +286,6 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) {
if b[0] != '{' || !json.Valid(b) { if b[0] != '{' || !json.Valid(b) {
// This is probably a log added to stderr by filch // This is probably a log added to stderr by filch
// outside of the logtail logger. Encode it. // outside of the logtail logger. Encode it.
// Do not add a client time, as it could have been
// been written a long time ago.
if !l.explainedRaw { if !l.explainedRaw {
fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n") fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n")
fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n") fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n")
@ -263,7 +294,10 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) {
l.explainedRaw = true l.explainedRaw = true
} }
fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b) fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b)
b = l.encodeText(b, true, 0) // Do not add a client time, as it could have been
// been written a long time ago. Don't include instance key or ID
// either, since this came from a different instance.
b = l.encodeText(b, true, 0, 0, 0)
} }
if entries > 0 { if entries > 0 {
@ -437,14 +471,24 @@ func (l *Logger) send(jsonBlob []byte) (int, error) {
// TODO: instead of allocating, this should probably just append // TODO: instead of allocating, this should probably just append
// directly into the output log buffer. // directly into the output log buffer.
func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte { func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte {
now := l.timeNow() now := l.timeNow()
// Factor in JSON encoding overhead to try to only do one alloc // Factor in JSON encoding overhead to try to only do one alloc
// in the make below (so appends don't resize the buffer). // in the make below (so appends don't resize the buffer).
overhead := 13 overhead := len(`{"text": ""}\n`)
includeLogtail := !skipClientTime || procID != 0 || procSequence != 0
if includeLogtail {
overhead += len(`"logtail": {},`)
}
if !skipClientTime { if !skipClientTime {
overhead += 67 overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`)
}
if procID != 0 {
overhead += len(`"proc_id": 4294967296,`)
}
if procSequence != 0 {
overhead += len(`"proc_seq": 9007199254740992,`)
} }
// TODO: do a pass over buf and count how many backslashes will be needed? // TODO: do a pass over buf and count how many backslashes will be needed?
// For now just factor in a dozen. // For now just factor in a dozen.
@ -468,10 +512,25 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte {
b := make([]byte, 0, len(buf)+overhead) b := make([]byte, 0, len(buf)+overhead)
b = append(b, '{') b = append(b, '{')
if !skipClientTime { if includeLogtail {
b = append(b, `"logtail": {"client_time": "`...) b = append(b, `"logtail": {`...)
b = now.AppendFormat(b, time.RFC3339Nano) if !skipClientTime {
b = append(b, "\"}, "...) b = append(b, `"client_time": "`...)
b = now.AppendFormat(b, time.RFC3339Nano)
b = append(b, `",`...)
}
if procID != 0 {
b = append(b, `"proc_id": `...)
b = strconv.AppendUint(b, uint64(procID), 10)
b = append(b, ',')
}
if procSequence != 0 {
b = append(b, `"proc_seq": `...)
b = strconv.AppendUint(b, procSequence, 10)
b = append(b, ',')
}
b = bytes.TrimRight(b, ",")
b = append(b, "}, "...)
} }
if l.metricsDelta != nil { if l.metricsDelta != nil {
@ -521,8 +580,11 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte {
} }
func (l *Logger) encode(buf []byte, level int) []byte { func (l *Logger) encode(buf []byte, level int) []byte {
if l.includeProcSequence {
l.procSequence++
}
if buf[0] != '{' { if buf[0] != '{' {
return l.encodeText(buf, l.skipClientTime, level) // text fast-path return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path
} }
now := l.timeNow() now := l.timeNow()
@ -544,10 +606,18 @@ func (l *Logger) encode(buf []byte, level int) []byte {
obj["error_has_logtail"] = obj["logtail"] obj["error_has_logtail"] = obj["logtail"]
obj["logtail"] = nil obj["logtail"] = nil
} }
if !l.skipClientTime { if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 {
obj["logtail"] = map[string]string{ logtail := map[string]any{}
"client_time": now.Format(time.RFC3339Nano), if !l.skipClientTime {
logtail["client_time"] = now.Format(time.RFC3339Nano)
}
if l.procID != 0 {
logtail["proc_id"] = l.procID
}
if l.procSequence != 0 {
logtail["proc_seq"] = l.procSequence
} }
obj["logtail"] = logtail
} }
if level > 0 { if level > 0 {
obj["v"] = level obj["v"] = level
@ -590,8 +660,10 @@ func (l *Logger) Write(buf []byte) (int, error) {
l.stderr.Write(withNL) l.stderr.Write(withNL)
} }
} }
l.writeLock.Lock()
b := l.encode(buf, level) b := l.encode(buf, level)
_, err := l.send(b) _, err := l.send(b)
l.writeLock.Unlock()
return len(buf), err return len(buf), err
} }

@ -216,8 +216,10 @@ var sink []byte
func TestLoggerEncodeTextAllocs(t *testing.T) { func TestLoggerEncodeTextAllocs(t *testing.T) {
lg := &Logger{timeNow: time.Now} lg := &Logger{timeNow: time.Now}
inBuf := []byte("some text to encode") inBuf := []byte("some text to encode")
procID := uint32(0x24d32ee9)
procSequence := uint64(0x12346)
err := tstest.MinAllocsPerRun(t, 1, func() { err := tstest.MinAllocsPerRun(t, 1, func() {
sink = lg.encodeText(inBuf, false, 0) sink = lg.encodeText(inBuf, false, procID, procSequence, 0)
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -333,7 +335,7 @@ func unmarshalOne(t *testing.T, body []byte) map[string]any {
func TestEncodeTextTruncation(t *testing.T) { func TestEncodeTextTruncation(t *testing.T) {
lg := &Logger{timeNow: time.Now, lowMem: true} lg := &Logger{timeNow: time.Now, lowMem: true}
in := bytes.Repeat([]byte("a"), 300) in := bytes.Repeat([]byte("a"), 300)
b := lg.encodeText(in, true, 0) b := lg.encodeText(in, true, 0, 0, 0)
got := string(b) got := string(b)
want := `{"text": "` + strings.Repeat("a", 255) + `…+45"}` + "\n" want := `{"text": "` + strings.Repeat("a", 255) + `…+45"}` + "\n"
if got != want { if got != want {
@ -355,38 +357,40 @@ func TestEncode(t *testing.T) {
}{ }{
{ {
"normal", "normal",
`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "text": "normal"}` + "\n", `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "text": "normal"}` + "\n",
}, },
{ {
"and a [v1] level one", "and a [v1] level one",
`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "v":1,"text": "and a level one"}` + "\n", `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "v":1,"text": "and a level one"}` + "\n",
}, },
{ {
"[v2] some verbose two", "[v2] some verbose two",
`{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "v":2,"text": "some verbose two"}` + "\n", `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "v":2,"text": "some verbose two"}` + "\n",
}, },
{ {
"{}", "{}",
`{"logtail":{"client_time":"1970-01-01T00:02:03.000000456Z"}}` + "\n", `{"logtail":{"client_time":"1970-01-01T00:02:03.000000456Z","proc_id":7,"proc_seq":1}}` + "\n",
}, },
{ {
`{"foo":"bar"}`, `{"foo":"bar"}`,
`{"foo":"bar","logtail":{"client_time":"1970-01-01T00:02:03.000000456Z"}}` + "\n", `{"foo":"bar","logtail":{"client_time":"1970-01-01T00:02:03.000000456Z","proc_id":7,"proc_seq":1}}` + "\n",
}, },
{ {
"foo: [v\x00JSON]0{\"foo\":1}", "foo: [v\x00JSON]0{\"foo\":1}",
"{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\"}}\n", "{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\",\"proc_id\":7,\"proc_seq\":1}}\n",
}, },
{ {
"foo: [v\x00JSON]2{\"foo\":1}", "foo: [v\x00JSON]2{\"foo\":1}",
"{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\"},\"v\":2}\n", "{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\",\"proc_id\":7,\"proc_seq\":1},\"v\":2}\n",
}, },
} }
for _, tt := range tests { for _, tt := range tests {
buf := new(simpleMemBuf) buf := new(simpleMemBuf)
lg := &Logger{ lg := &Logger{
timeNow: func() time.Time { return time.Unix(123, 456).UTC() }, timeNow: func() time.Time { return time.Unix(123, 456).UTC() },
buffer: buf, buffer: buf,
procID: 7,
procSequence: 1,
} }
io.WriteString(lg, tt.in) io.WriteString(lg, tt.in)
got := buf.buf.String() got := buf.buf.String()

Loading…
Cancel
Save