From 3222bce02d02251b1e6b62c7df411d94b0baf39e Mon Sep 17 00:00:00 2001 From: Mihai Parparita Date: Tue, 17 May 2022 22:28:57 -0700 Subject: [PATCH] logtail: add instance metadata to the entry logtail Allows instances that are running with the same machine ID (due to cloning) to be distinguished. Also adds sequence numbers to detect duplicates. For tailscale/corp#5244 Signed-off-by: Mihai Parparita --- logpolicy/logpolicy.go | 2 + logtail/logtail.go | 100 ++++++++++++++++++++++++++++++++++------ logtail/logtail_test.go | 26 ++++++----- 3 files changed, 103 insertions(+), 25 deletions(-) diff --git a/logpolicy/logpolicy.go b/logpolicy/logpolicy.go index 696be98f2..c0a29a396 100644 --- a/logpolicy/logpolicy.go +++ b/logpolicy/logpolicy.go @@ -519,6 +519,8 @@ func New(collection string) *Policy { } if collection == logtail.CollectionNode { c.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta + c.IncludeProcID = true + c.IncludeProcSequence = true } if val := getLogTarget(); val != "" { diff --git a/logtail/logtail.go b/logtail/logtail.go index 9e7a26805..19dd6145a 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -8,6 +8,8 @@ package logtail import ( "bytes" "context" + "crypto/rand" + "encoding/binary" "encoding/json" "fmt" "io" @@ -15,6 +17,7 @@ import ( "net/http" "os" "strconv" + "sync" "sync/atomic" "time" @@ -62,6 +65,17 @@ type Config struct { // DrainLogs, if non-nil, disables automatic uploading of new logs, // so that logs are only uploaded when a token is sent to DrainLogs. DrainLogs <-chan struct{} + + // IncludeProcID, if true, results in an ephemeral process identifier being + // included in logs. The ID is random and not guaranteed to be globally + // unique, but it can be used to distinguish between different instances + // running with same PrivateID. + IncludeProcID bool + + // IncludeProcSequence, if true, results in an ephemeral sequence number + // being included in the logs. The sequence number is incremented for each + // log message sent, but is not peristed across process restarts. + IncludeProcSequence bool } func NewLogger(cfg Config, logf tslogger.Logf) *Logger { @@ -84,6 +98,17 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { } cfg.Buffer = NewMemoryBuffer(pendingSize) } + var procID uint32 + if cfg.IncludeProcID { + keyBytes := make([]byte, 4) + rand.Read(keyBytes) + procID = binary.LittleEndian.Uint32(keyBytes) + if procID == 0 { + // 0 is the empty/off value, assign a different (non-zero) value to + // make sure we still include an ID (actual value does not matter). + procID = 7 + } + } l := &Logger{ privateID: cfg.PrivateID, stderr: cfg.Stderr, @@ -100,6 +125,9 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { bo: backoff.NewBackoff("logtail", logf, 30*time.Second), metricsDelta: cfg.MetricsDelta, + procID: procID, + includeProcSequence: cfg.IncludeProcSequence, + shutdownStart: make(chan struct{}), shutdownDone: make(chan struct{}), } @@ -137,6 +165,11 @@ type Logger struct { metricsDelta func() string // or nil privateID PrivateID + procID uint32 + includeProcSequence bool + writeLock sync.Mutex // guards increments of procSequence + procSequence uint64 + shutdownStart chan struct{} // closed when shutdown begins shutdownDone chan struct{} // closed when shutdown complete } @@ -253,8 +286,6 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) { if b[0] != '{' || !json.Valid(b) { // This is probably a log added to stderr by filch // outside of the logtail logger. Encode it. - // Do not add a client time, as it could have been - // been written a long time ago. if !l.explainedRaw { fmt.Fprintf(l.stderr, "RAW-STDERR: ***\n") fmt.Fprintf(l.stderr, "RAW-STDERR: *** Lines prefixed with RAW-STDERR below bypassed logtail and probably come from a previous run of the program\n") @@ -263,7 +294,10 @@ func (l *Logger) drainPending(scratch []byte) (res []byte) { l.explainedRaw = true } fmt.Fprintf(l.stderr, "RAW-STDERR: %s", b) - b = l.encodeText(b, true, 0) + // Do not add a client time, as it could have been + // been written a long time ago. Don't include instance key or ID + // either, since this came from a different instance. + b = l.encodeText(b, true, 0, 0, 0) } if entries > 0 { @@ -437,14 +471,24 @@ func (l *Logger) send(jsonBlob []byte) (int, error) { // TODO: instead of allocating, this should probably just append // directly into the output log buffer. -func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte { +func (l *Logger) encodeText(buf []byte, skipClientTime bool, procID uint32, procSequence uint64, level int) []byte { now := l.timeNow() // Factor in JSON encoding overhead to try to only do one alloc // in the make below (so appends don't resize the buffer). - overhead := 13 + overhead := len(`{"text": ""}\n`) + includeLogtail := !skipClientTime || procID != 0 || procSequence != 0 + if includeLogtail { + overhead += len(`"logtail": {},`) + } if !skipClientTime { - overhead += 67 + overhead += len(`"client_time": "2006-01-02T15:04:05.999999999Z07:00",`) + } + if procID != 0 { + overhead += len(`"proc_id": 4294967296,`) + } + if procSequence != 0 { + overhead += len(`"proc_seq": 9007199254740992,`) } // TODO: do a pass over buf and count how many backslashes will be needed? // For now just factor in a dozen. @@ -468,10 +512,25 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte { b := make([]byte, 0, len(buf)+overhead) b = append(b, '{') - if !skipClientTime { - b = append(b, `"logtail": {"client_time": "`...) - b = now.AppendFormat(b, time.RFC3339Nano) - b = append(b, "\"}, "...) + if includeLogtail { + b = append(b, `"logtail": {`...) + if !skipClientTime { + b = append(b, `"client_time": "`...) + b = now.AppendFormat(b, time.RFC3339Nano) + b = append(b, `",`...) + } + if procID != 0 { + b = append(b, `"proc_id": `...) + b = strconv.AppendUint(b, uint64(procID), 10) + b = append(b, ',') + } + if procSequence != 0 { + b = append(b, `"proc_seq": `...) + b = strconv.AppendUint(b, procSequence, 10) + b = append(b, ',') + } + b = bytes.TrimRight(b, ",") + b = append(b, "}, "...) } if l.metricsDelta != nil { @@ -521,8 +580,11 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool, level int) []byte { } func (l *Logger) encode(buf []byte, level int) []byte { + if l.includeProcSequence { + l.procSequence++ + } if buf[0] != '{' { - return l.encodeText(buf, l.skipClientTime, level) // text fast-path + return l.encodeText(buf, l.skipClientTime, l.procID, l.procSequence, level) // text fast-path } now := l.timeNow() @@ -544,10 +606,18 @@ func (l *Logger) encode(buf []byte, level int) []byte { obj["error_has_logtail"] = obj["logtail"] obj["logtail"] = nil } - if !l.skipClientTime { - obj["logtail"] = map[string]string{ - "client_time": now.Format(time.RFC3339Nano), + if !l.skipClientTime || l.procID != 0 || l.procSequence != 0 { + logtail := map[string]any{} + if !l.skipClientTime { + logtail["client_time"] = now.Format(time.RFC3339Nano) + } + if l.procID != 0 { + logtail["proc_id"] = l.procID + } + if l.procSequence != 0 { + logtail["proc_seq"] = l.procSequence } + obj["logtail"] = logtail } if level > 0 { obj["v"] = level @@ -590,8 +660,10 @@ func (l *Logger) Write(buf []byte) (int, error) { l.stderr.Write(withNL) } } + l.writeLock.Lock() b := l.encode(buf, level) _, err := l.send(b) + l.writeLock.Unlock() return len(buf), err } diff --git a/logtail/logtail_test.go b/logtail/logtail_test.go index f5be19eae..7cd306496 100644 --- a/logtail/logtail_test.go +++ b/logtail/logtail_test.go @@ -216,8 +216,10 @@ var sink []byte func TestLoggerEncodeTextAllocs(t *testing.T) { lg := &Logger{timeNow: time.Now} inBuf := []byte("some text to encode") + procID := uint32(0x24d32ee9) + procSequence := uint64(0x12346) err := tstest.MinAllocsPerRun(t, 1, func() { - sink = lg.encodeText(inBuf, false, 0) + sink = lg.encodeText(inBuf, false, procID, procSequence, 0) }) if err != nil { t.Fatal(err) @@ -333,7 +335,7 @@ func unmarshalOne(t *testing.T, body []byte) map[string]any { func TestEncodeTextTruncation(t *testing.T) { lg := &Logger{timeNow: time.Now, lowMem: true} in := bytes.Repeat([]byte("a"), 300) - b := lg.encodeText(in, true, 0) + b := lg.encodeText(in, true, 0, 0, 0) got := string(b) want := `{"text": "` + strings.Repeat("a", 255) + `…+45"}` + "\n" if got != want { @@ -355,38 +357,40 @@ func TestEncode(t *testing.T) { }{ { "normal", - `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "text": "normal"}` + "\n", + `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "text": "normal"}` + "\n", }, { "and a [v1] level one", - `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "v":1,"text": "and a level one"}` + "\n", + `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "v":1,"text": "and a level one"}` + "\n", }, { "[v2] some verbose two", - `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z"}, "v":2,"text": "some verbose two"}` + "\n", + `{"logtail": {"client_time": "1970-01-01T00:02:03.000000456Z","proc_id": 7,"proc_seq": 1}, "v":2,"text": "some verbose two"}` + "\n", }, { "{}", - `{"logtail":{"client_time":"1970-01-01T00:02:03.000000456Z"}}` + "\n", + `{"logtail":{"client_time":"1970-01-01T00:02:03.000000456Z","proc_id":7,"proc_seq":1}}` + "\n", }, { `{"foo":"bar"}`, - `{"foo":"bar","logtail":{"client_time":"1970-01-01T00:02:03.000000456Z"}}` + "\n", + `{"foo":"bar","logtail":{"client_time":"1970-01-01T00:02:03.000000456Z","proc_id":7,"proc_seq":1}}` + "\n", }, { "foo: [v\x00JSON]0{\"foo\":1}", - "{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\"}}\n", + "{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\",\"proc_id\":7,\"proc_seq\":1}}\n", }, { "foo: [v\x00JSON]2{\"foo\":1}", - "{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\"},\"v\":2}\n", + "{\"foo\":1,\"logtail\":{\"client_time\":\"1970-01-01T00:02:03.000000456Z\",\"proc_id\":7,\"proc_seq\":1},\"v\":2}\n", }, } for _, tt := range tests { buf := new(simpleMemBuf) lg := &Logger{ - timeNow: func() time.Time { return time.Unix(123, 456).UTC() }, - buffer: buf, + timeNow: func() time.Time { return time.Unix(123, 456).UTC() }, + buffer: buf, + procID: 7, + procSequence: 1, } io.WriteString(lg, tt.in) got := buf.buf.String()