@ -7,18 +7,25 @@
package clientmetric
package clientmetric
import (
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"fmt"
"io"
"io"
"sort"
"sort"
"strings"
"sync"
"sync"
"sync/atomic"
"sync/atomic"
"time"
)
)
var (
var (
mu sync . Mutex
mu sync . Mutex // guards vars in this block
metrics = map [ string ] * Metric { }
metrics = map [ string ] * Metric { }
sortedDirty bool
numWireID int // how many wireIDs have been allocated
sorted [ ] * Metric
lastDelta time . Time // time of last call to EncodeLogTailMetricsDelta
sortedDirty bool // whether sorted needs to be rebuilt
sorted [ ] * Metric // by name
)
)
// Type is a metric type: counter or gauge.
// Type is a metric type: counter or gauge.
@ -35,11 +42,12 @@ const (
type Metric struct {
type Metric struct {
v int64 // atomic; the metric value
v int64 // atomic; the metric value
name string
name string
typ Type
lastLogv int64 // v atomic, epoch seconds
// Owned by package-level 'mu'.
lastLog int64 // atomic, epoch seconds
wireID int // zero until named
l ogSec int // log every N seconds max
l astNamed time . Time
typ Type
lastLogVal int64
}
}
func ( m * Metric ) Name ( ) string { return m . name }
func ( m * Metric ) Name ( ) string { return m . name }
@ -97,13 +105,22 @@ func Metrics() []*Metric {
// NewUnpublished initializes a new Metric without calling Publish on
// NewUnpublished initializes a new Metric without calling Publish on
// it.
// it.
func NewUnpublished ( name string , typ Type ) * Metric {
func NewUnpublished ( name string , typ Type ) * Metric {
if i := strings . IndexFunc ( name , isIllegalMetricRune ) ; name == "" || i != - 1 {
panic ( fmt . Sprintf ( "illegal metric name %q (index %v)" , name , i ) )
}
return & Metric {
return & Metric {
name : name ,
name : name ,
typ : typ ,
typ : typ ,
logSec : 10 ,
}
}
}
}
func isIllegalMetricRune ( r rune ) bool {
return ! ( r >= 'a' && r <= 'z' ||
r >= 'A' && r <= 'Z' ||
r >= '0' && r <= '9' ||
r == '_' )
}
// NewCounter returns a new metric that can only increment.
// NewCounter returns a new metric that can only increment.
func NewCounter ( name string ) * Metric {
func NewCounter ( name string ) * Metric {
m := NewUnpublished ( name , TypeCounter )
m := NewUnpublished ( name , TypeCounter )
@ -133,3 +150,122 @@ func WritePrometheusExpositionFormat(w io.Writer) {
fmt . Fprintf ( w , "%s %v\n" , m . Name ( ) , m . Value ( ) )
fmt . Fprintf ( w , "%s %v\n" , m . Name ( ) , m . Value ( ) )
}
}
}
}
const (
// metricLogNameFrequency is how often a metric's name=>id
// mapping is redundantly put in the logs. In other words,
// this is how how far in the logs you need to fetch from a
// given point in time to recompute the metrics at that point
// in time.
metricLogNameFrequency = 4 * time . Hour
// minMetricEncodeInterval is the minimum interval that the
// metrics will be scanned for changes before being encoded
// for logtail.
minMetricEncodeInterval = 15 * time . Second
)
// EncodeLogTailMetricsDelta return an encoded string representing the metrics
// differences since the previous call.
//
// It implements the requirements of a logtail.Config.MetricsDelta
// func. Notably, its output is safe to embed in a JSON string literal
// without further escaping.
//
// The current encoding is:
// * name immediately following metric:
// 'N' + hex(varint(len(name))) + name
// * set value of a metric:
// 'S' + hex(varint(wireid)) + hex(varint(value))
// * increment a metric: (decrements if negative)
// 'I' + hex(varint(wireid)) + hex(varint(value))
func EncodeLogTailMetricsDelta ( ) string {
mu . Lock ( )
defer mu . Unlock ( )
now := time . Now ( )
if ! lastDelta . IsZero ( ) && now . Sub ( lastDelta ) < minMetricEncodeInterval {
return ""
}
lastDelta = now
var enc * deltaEncBuf // lazy
for _ , m := range metrics {
val := m . Value ( )
delta := val - m . lastLogVal
if delta == 0 {
continue
}
if enc == nil {
enc = deltaPool . Get ( ) . ( * deltaEncBuf )
enc . buf . Reset ( )
}
m . lastLogVal = val
if m . wireID == 0 {
numWireID ++
m . wireID = numWireID
}
if m . lastNamed . IsZero ( ) || now . Sub ( m . lastNamed ) > metricLogNameFrequency {
enc . writeName ( m . Name ( ) )
m . lastNamed = now
enc . writeValue ( m . wireID , val )
} else {
enc . writeDelta ( m . wireID , delta )
}
}
if enc == nil {
return ""
}
defer deltaPool . Put ( enc )
return enc . buf . String ( )
}
var deltaPool = & sync . Pool {
New : func ( ) interface { } {
return new ( deltaEncBuf )
} ,
}
// deltaEncBuf encodes metrics per the format described
// on EncodeLogTailMetricsDelta above.
type deltaEncBuf struct {
buf bytes . Buffer
scratch [ binary . MaxVarintLen64 ] byte
}
// writeName writes a "name" (N) record to the buffer, which notes
// that the immediately following record's wireID has the provided
// name.
func ( b * deltaEncBuf ) writeName ( name string ) {
b . buf . WriteByte ( 'N' )
b . writeHexVarint ( int64 ( len ( name ) ) )
b . buf . WriteString ( name )
}
// writeDelta writes a "set" (S) record to the buffer, noting that the
// metric with the given wireID now has value v.
func ( b * deltaEncBuf ) writeValue ( wireID int , v int64 ) {
b . buf . WriteByte ( 'S' )
b . writeHexVarint ( int64 ( wireID ) )
b . writeHexVarint ( v )
}
// writeDelta writes an "increment" (I) delta value record to the
// buffer, noting that the metric with the given wireID now has a
// value that's v larger (or smaller if v is negative).
func ( b * deltaEncBuf ) writeDelta ( wireID int , v int64 ) {
b . buf . WriteByte ( 'I' )
b . writeHexVarint ( int64 ( wireID ) )
b . writeHexVarint ( v )
}
// writeHexVarint writes v to the buffer as a hex-encoded varint.
func ( b * deltaEncBuf ) writeHexVarint ( v int64 ) {
n := binary . PutVarint ( b . scratch [ : ] , v )
hexLen := n * 2
oldLen := b . buf . Len ( )
b . buf . Grow ( hexLen )
hexBuf := b . buf . Bytes ( ) [ oldLen : oldLen + hexLen ]
hex . Encode ( hexBuf , b . scratch [ : n ] )
b . buf . Write ( hexBuf )
}