From 3b541c833edb7c45eea1b7b2e5b716d33baf13c3 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 15 Nov 2021 20:52:43 -0800 Subject: [PATCH] util/clientmetric, logtail: log metric changes Updates #3307 Change-Id: I1399ebd786f6ff7defe6e11c0eb651144c071574 Signed-off-by: Brad Fitzpatrick --- cmd/tailscaled/depaware.txt | 2 +- cmd/tailscaled/tailscaled.go | 3 +- logpolicy/logpolicy.go | 4 + logtail/logtail.go | 22 +++ .../tailscaled_deps_test_darwin.go | 1 + .../tailscaled_deps_test_freebsd.go | 1 + .../integration/tailscaled_deps_test_linux.go | 1 + .../tailscaled_deps_test_openbsd.go | 1 + .../tailscaled_deps_test_windows.go | 1 + util/clientmetric/clientmetric.go | 156 ++++++++++++++++-- 10 files changed, 180 insertions(+), 12 deletions(-) diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt index 4f5827f3b..b4dfe9608 100644 --- a/cmd/tailscaled/depaware.txt +++ b/cmd/tailscaled/depaware.txt @@ -175,7 +175,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de tailscale.com/log/filelogger from tailscale.com/ipn/ipnserver tailscale.com/log/logheap from tailscale.com/control/controlclient tailscale.com/logpolicy from tailscale.com/cmd/tailscaled - tailscale.com/logtail from tailscale.com/logpolicy + tailscale.com/logtail from tailscale.com/logpolicy+ tailscale.com/logtail/backoff from tailscale.com/cmd/tailscaled+ tailscale.com/logtail/filch from tailscale.com/logpolicy 💣 tailscale.com/metrics from tailscale.com/derp diff --git a/cmd/tailscaled/tailscaled.go b/cmd/tailscaled/tailscaled.go index f12d5eb08..3fd15b91c 100644 --- a/cmd/tailscaled/tailscaled.go +++ b/cmd/tailscaled/tailscaled.go @@ -31,6 +31,7 @@ import ( "tailscale.com/ipn" "tailscale.com/ipn/ipnserver" "tailscale.com/logpolicy" + "tailscale.com/logtail" "tailscale.com/net/dns" "tailscale.com/net/netns" "tailscale.com/net/socks5/tssocks" @@ -248,7 +249,7 @@ func ipnServerOpts() (o ipnserver.Options) { func run() error { var err error - pol := logpolicy.New("tailnode.log.tailscale.io") + pol := logpolicy.New(logtail.CollectionNode) pol.SetVerbosityLevel(args.verbose) defer func() { // Finish uploading logs after closing everything else. diff --git a/logpolicy/logpolicy.go b/logpolicy/logpolicy.go index 3ec45b7df..ee3bf70dc 100644 --- a/logpolicy/logpolicy.go +++ b/logpolicy/logpolicy.go @@ -38,6 +38,7 @@ import ( "tailscale.com/paths" "tailscale.com/smallzstd" "tailscale.com/types/logger" + "tailscale.com/util/clientmetric" "tailscale.com/util/racebuild" "tailscale.com/util/winutil" "tailscale.com/version" @@ -500,6 +501,9 @@ func New(collection string) *Policy { }, HTTPC: &http.Client{Transport: newLogtailTransport(logtail.DefaultHost)}, } + if collection == logtail.CollectionNode { + c.MetricsDelta = clientmetric.EncodeLogTailMetricsDelta + } if val := getLogTarget(); val != "" { log.Println("You have enabled a non-default log target. Doing without being told to by Tailscale staff or your network administrator will make getting support difficult.") diff --git a/logtail/logtail.go b/logtail/logtail.go index 9763452bb..1506cb948 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -28,6 +28,12 @@ import ( // Config.BaseURL isn't provided. const DefaultHost = "log.tailscale.io" +const ( + // CollectionNode is the name of a logtail Config.Collection + // for tailscaled (or equivalent: IPNExtension, Android app). + CollectionNode = "tailnode.log.tailscale.io" +) + type Encoder interface { EncodeAll(src, dst []byte) []byte Close() error @@ -46,6 +52,12 @@ type Config struct { Buffer Buffer // temp storage, if nil a MemoryBuffer NewZstdEncoder func() Encoder // if set, used to compress logs for transmission + // MetricsDelta, if non-nil, is a func that returns an encoding + // delta in clientmetrics to upload alongside existing logs. + // It can return either an empty string (for nothing) or a string + // that's safe to embed in a JSON string literal without further escaping. + MetricsDelta func() string + // DrainLogs, if non-nil, disables automatic uploading of new logs, // so that logs are only uploaded when a token is sent to DrainLogs. DrainLogs <-chan struct{} @@ -84,6 +96,7 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { drainLogs: cfg.DrainLogs, timeNow: cfg.TimeNow, bo: backoff.NewBackoff("logtail", logf, 30*time.Second), + metricsDelta: cfg.MetricsDelta, shutdownStart: make(chan struct{}), shutdownDone: make(chan struct{}), @@ -119,6 +132,7 @@ type Logger struct { zstdEncoder Encoder uploadCancel func() explainedRaw bool + metricsDelta func() string // or nil shutdownStart chan struct{} // closed when shutdown begins shutdownDone chan struct{} // closed when shutdown complete @@ -426,6 +440,14 @@ func (l *Logger) encodeText(buf []byte, skipClientTime bool) []byte { b = append(b, "\"}, "...) } + if l.metricsDelta != nil { + if d := l.metricsDelta(); d != "" { + b = append(b, `"metrics": "`...) + b = append(b, d...) + b = append(b, `",`...) + } + } + b = append(b, "\"text\": \""...) for i, c := range buf { switch c { diff --git a/tstest/integration/tailscaled_deps_test_darwin.go b/tstest/integration/tailscaled_deps_test_darwin.go index 3ce3b04c2..651e68303 100644 --- a/tstest/integration/tailscaled_deps_test_darwin.go +++ b/tstest/integration/tailscaled_deps_test_darwin.go @@ -17,6 +17,7 @@ import ( _ "tailscale.com/ipn" _ "tailscale.com/ipn/ipnserver" _ "tailscale.com/logpolicy" + _ "tailscale.com/logtail" _ "tailscale.com/net/dns" _ "tailscale.com/net/interfaces" _ "tailscale.com/net/netns" diff --git a/tstest/integration/tailscaled_deps_test_freebsd.go b/tstest/integration/tailscaled_deps_test_freebsd.go index 3ce3b04c2..651e68303 100644 --- a/tstest/integration/tailscaled_deps_test_freebsd.go +++ b/tstest/integration/tailscaled_deps_test_freebsd.go @@ -17,6 +17,7 @@ import ( _ "tailscale.com/ipn" _ "tailscale.com/ipn/ipnserver" _ "tailscale.com/logpolicy" + _ "tailscale.com/logtail" _ "tailscale.com/net/dns" _ "tailscale.com/net/interfaces" _ "tailscale.com/net/netns" diff --git a/tstest/integration/tailscaled_deps_test_linux.go b/tstest/integration/tailscaled_deps_test_linux.go index 3ce3b04c2..651e68303 100644 --- a/tstest/integration/tailscaled_deps_test_linux.go +++ b/tstest/integration/tailscaled_deps_test_linux.go @@ -17,6 +17,7 @@ import ( _ "tailscale.com/ipn" _ "tailscale.com/ipn/ipnserver" _ "tailscale.com/logpolicy" + _ "tailscale.com/logtail" _ "tailscale.com/net/dns" _ "tailscale.com/net/interfaces" _ "tailscale.com/net/netns" diff --git a/tstest/integration/tailscaled_deps_test_openbsd.go b/tstest/integration/tailscaled_deps_test_openbsd.go index 3ce3b04c2..651e68303 100644 --- a/tstest/integration/tailscaled_deps_test_openbsd.go +++ b/tstest/integration/tailscaled_deps_test_openbsd.go @@ -17,6 +17,7 @@ import ( _ "tailscale.com/ipn" _ "tailscale.com/ipn/ipnserver" _ "tailscale.com/logpolicy" + _ "tailscale.com/logtail" _ "tailscale.com/net/dns" _ "tailscale.com/net/interfaces" _ "tailscale.com/net/netns" diff --git a/tstest/integration/tailscaled_deps_test_windows.go b/tstest/integration/tailscaled_deps_test_windows.go index 86624be07..1b9aed8ed 100644 --- a/tstest/integration/tailscaled_deps_test_windows.go +++ b/tstest/integration/tailscaled_deps_test_windows.go @@ -20,6 +20,7 @@ import ( _ "tailscale.com/ipn" _ "tailscale.com/ipn/ipnserver" _ "tailscale.com/logpolicy" + _ "tailscale.com/logtail" _ "tailscale.com/logtail/backoff" _ "tailscale.com/net/dns" _ "tailscale.com/net/interfaces" diff --git a/util/clientmetric/clientmetric.go b/util/clientmetric/clientmetric.go index ff5d4f9e4..69afb53c9 100644 --- a/util/clientmetric/clientmetric.go +++ b/util/clientmetric/clientmetric.go @@ -7,18 +7,25 @@ package clientmetric import ( + "bytes" + "encoding/binary" + "encoding/hex" "fmt" "io" "sort" + "strings" "sync" "sync/atomic" + "time" ) var ( - mu sync.Mutex + mu sync.Mutex // guards vars in this block metrics = map[string]*Metric{} - sortedDirty bool - sorted []*Metric + numWireID int // how many wireIDs have been allocated + lastDelta time.Time // time of last call to EncodeLogTailMetricsDelta + sortedDirty bool // whether sorted needs to be rebuilt + sorted []*Metric // by name ) // Type is a metric type: counter or gauge. @@ -35,11 +42,12 @@ const ( type Metric struct { v int64 // atomic; the metric value name string + typ Type - lastLogv int64 // v atomic, epoch seconds - lastLog int64 // atomic, epoch seconds - logSec int // log every N seconds max - typ Type + // Owned by package-level 'mu'. + wireID int // zero until named + lastNamed time.Time + lastLogVal int64 } func (m *Metric) Name() string { return m.name } @@ -97,13 +105,22 @@ func Metrics() []*Metric { // NewUnpublished initializes a new Metric without calling Publish on // it. func NewUnpublished(name string, typ Type) *Metric { + if i := strings.IndexFunc(name, isIllegalMetricRune); name == "" || i != -1 { + panic(fmt.Sprintf("illegal metric name %q (index %v)", name, i)) + } return &Metric{ - name: name, - typ: typ, - logSec: 10, + name: name, + typ: typ, } } +func isIllegalMetricRune(r rune) bool { + return !(r >= 'a' && r <= 'z' || + r >= 'A' && r <= 'Z' || + r >= '0' && r <= '9' || + r == '_') +} + // NewCounter returns a new metric that can only increment. func NewCounter(name string) *Metric { m := NewUnpublished(name, TypeCounter) @@ -133,3 +150,122 @@ func WritePrometheusExpositionFormat(w io.Writer) { fmt.Fprintf(w, "%s %v\n", m.Name(), m.Value()) } } + +const ( + // metricLogNameFrequency is how often a metric's name=>id + // mapping is redundantly put in the logs. In other words, + // this is how how far in the logs you need to fetch from a + // given point in time to recompute the metrics at that point + // in time. + metricLogNameFrequency = 4 * time.Hour + + // minMetricEncodeInterval is the minimum interval that the + // metrics will be scanned for changes before being encoded + // for logtail. + minMetricEncodeInterval = 15 * time.Second +) + +// EncodeLogTailMetricsDelta return an encoded string representing the metrics +// differences since the previous call. +// +// It implements the requirements of a logtail.Config.MetricsDelta +// func. Notably, its output is safe to embed in a JSON string literal +// without further escaping. +// +// The current encoding is: +// * name immediately following metric: +// 'N' + hex(varint(len(name))) + name +// * set value of a metric: +// 'S' + hex(varint(wireid)) + hex(varint(value)) +// * increment a metric: (decrements if negative) +// 'I' + hex(varint(wireid)) + hex(varint(value)) +func EncodeLogTailMetricsDelta() string { + mu.Lock() + defer mu.Unlock() + + now := time.Now() + if !lastDelta.IsZero() && now.Sub(lastDelta) < minMetricEncodeInterval { + return "" + } + lastDelta = now + + var enc *deltaEncBuf // lazy + for _, m := range metrics { + val := m.Value() + delta := val - m.lastLogVal + if delta == 0 { + continue + } + if enc == nil { + enc = deltaPool.Get().(*deltaEncBuf) + enc.buf.Reset() + } + m.lastLogVal = val + if m.wireID == 0 { + numWireID++ + m.wireID = numWireID + } + if m.lastNamed.IsZero() || now.Sub(m.lastNamed) > metricLogNameFrequency { + enc.writeName(m.Name()) + m.lastNamed = now + enc.writeValue(m.wireID, val) + } else { + enc.writeDelta(m.wireID, delta) + } + } + if enc == nil { + return "" + } + defer deltaPool.Put(enc) + return enc.buf.String() +} + +var deltaPool = &sync.Pool{ + New: func() interface{} { + return new(deltaEncBuf) + }, +} + +// deltaEncBuf encodes metrics per the format described +// on EncodeLogTailMetricsDelta above. +type deltaEncBuf struct { + buf bytes.Buffer + scratch [binary.MaxVarintLen64]byte +} + +// writeName writes a "name" (N) record to the buffer, which notes +// that the immediately following record's wireID has the provided +// name. +func (b *deltaEncBuf) writeName(name string) { + b.buf.WriteByte('N') + b.writeHexVarint(int64(len(name))) + b.buf.WriteString(name) +} + +// writeDelta writes a "set" (S) record to the buffer, noting that the +// metric with the given wireID now has value v. +func (b *deltaEncBuf) writeValue(wireID int, v int64) { + b.buf.WriteByte('S') + b.writeHexVarint(int64(wireID)) + b.writeHexVarint(v) +} + +// writeDelta writes an "increment" (I) delta value record to the +// buffer, noting that the metric with the given wireID now has a +// value that's v larger (or smaller if v is negative). +func (b *deltaEncBuf) writeDelta(wireID int, v int64) { + b.buf.WriteByte('I') + b.writeHexVarint(int64(wireID)) + b.writeHexVarint(v) +} + +// writeHexVarint writes v to the buffer as a hex-encoded varint. +func (b *deltaEncBuf) writeHexVarint(v int64) { + n := binary.PutVarint(b.scratch[:], v) + hexLen := n * 2 + oldLen := b.buf.Len() + b.buf.Grow(hexLen) + hexBuf := b.buf.Bytes()[oldLen : oldLen+hexLen] + hex.Encode(hexBuf, b.scratch[:n]) + b.buf.Write(hexBuf) +}