|
|
|
@ -22,12 +22,28 @@ import (
|
|
|
|
|
var (
|
|
|
|
|
mu sync.Mutex // guards vars in this block
|
|
|
|
|
metrics = map[string]*Metric{}
|
|
|
|
|
numWireID int // how many wireIDs have been allocated
|
|
|
|
|
lastDelta time.Time // time of last call to EncodeLogTailMetricsDelta
|
|
|
|
|
sortedDirty bool // whether sorted needs to be rebuilt
|
|
|
|
|
sorted []*Metric // by name
|
|
|
|
|
numWireID int // how many wireIDs have been allocated
|
|
|
|
|
lastDelta time.Time // time of last call to EncodeLogTailMetricsDelta
|
|
|
|
|
sortedDirty bool // whether sorted needs to be rebuilt
|
|
|
|
|
sorted []*Metric // by name
|
|
|
|
|
lastLogVal []scanEntry // by Metric.regIdx
|
|
|
|
|
unsorted []*Metric // by Metric.regIdx
|
|
|
|
|
|
|
|
|
|
// valFreeList is a set of free contiguous int64s whose
|
|
|
|
|
// element addresses get assigned to Metric.v.
|
|
|
|
|
// Any memory address in len(valFreeList) is free for use.
|
|
|
|
|
// They're contiguous to reduce cache churn during diff scans.
|
|
|
|
|
// When out of length, a new backing array is made.
|
|
|
|
|
valFreeList []int64
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// scanEntry contains the minimal data needed for quickly scanning
|
|
|
|
|
// memory for changed values. It's small to reduce memory pressure.
|
|
|
|
|
type scanEntry struct {
|
|
|
|
|
v *int64 // Metric.v
|
|
|
|
|
lastLogged int64 // last logged value
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Type is a metric type: counter or gauge.
|
|
|
|
|
type Type uint8
|
|
|
|
|
|
|
|
|
@ -40,32 +56,40 @@ const (
|
|
|
|
|
//
|
|
|
|
|
// It's safe for concurrent use.
|
|
|
|
|
type Metric struct {
|
|
|
|
|
v int64 // atomic; the metric value
|
|
|
|
|
name string
|
|
|
|
|
typ Type
|
|
|
|
|
|
|
|
|
|
// Owned by package-level 'mu'.
|
|
|
|
|
wireID int // zero until named
|
|
|
|
|
lastNamed time.Time
|
|
|
|
|
lastLogVal int64
|
|
|
|
|
v *int64 // atomic; the metric value
|
|
|
|
|
regIdx int // index into lastLogVal and unsorted
|
|
|
|
|
name string
|
|
|
|
|
typ Type
|
|
|
|
|
|
|
|
|
|
// The following fields are owned by the package-level 'mu':
|
|
|
|
|
|
|
|
|
|
// wireID is the lazily-allocated "wire ID". Until a metric is encoded
|
|
|
|
|
// in the logs (by EncodeLogTailMetricsDelta), it has no wireID. This
|
|
|
|
|
// ensures that unused metrics don't waste valuable low numbers, which
|
|
|
|
|
// encode with varints with fewer bytes.
|
|
|
|
|
wireID int
|
|
|
|
|
|
|
|
|
|
// lastNamed is the last time the name of this metric was
|
|
|
|
|
// written on the wire.
|
|
|
|
|
lastNamed time.Time
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *Metric) Name() string { return m.name }
|
|
|
|
|
func (m *Metric) Value() int64 { return atomic.LoadInt64(&m.v) }
|
|
|
|
|
func (m *Metric) Value() int64 { return atomic.LoadInt64(m.v) }
|
|
|
|
|
func (m *Metric) Type() Type { return m.typ }
|
|
|
|
|
|
|
|
|
|
// Add increments m's value by n.
|
|
|
|
|
//
|
|
|
|
|
// If m is of type counter, n should not be negative.
|
|
|
|
|
func (m *Metric) Add(n int64) {
|
|
|
|
|
atomic.AddInt64(&m.v, n)
|
|
|
|
|
atomic.AddInt64(m.v, n)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Set sets m's value to v.
|
|
|
|
|
//
|
|
|
|
|
// If m is of type counter, Set should not be used.
|
|
|
|
|
func (m *Metric) Set(v int64) {
|
|
|
|
|
atomic.StoreInt64(&m.v, v)
|
|
|
|
|
atomic.StoreInt64(m.v, v)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Publish registers a metric in the global map.
|
|
|
|
@ -81,6 +105,16 @@ func (m *Metric) Publish() {
|
|
|
|
|
}
|
|
|
|
|
metrics[m.name] = m
|
|
|
|
|
sortedDirty = true
|
|
|
|
|
|
|
|
|
|
if len(valFreeList) == 0 {
|
|
|
|
|
valFreeList = make([]int64, 256)
|
|
|
|
|
}
|
|
|
|
|
m.v = &valFreeList[0]
|
|
|
|
|
valFreeList = valFreeList[1:]
|
|
|
|
|
|
|
|
|
|
m.regIdx = len(unsorted)
|
|
|
|
|
unsorted = append(unsorted, m)
|
|
|
|
|
lastLogVal = append(lastLogVal, scanEntry{v: m.v})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Metrics returns the sorted list of metrics.
|
|
|
|
@ -190,17 +224,18 @@ func EncodeLogTailMetricsDelta() string {
|
|
|
|
|
lastDelta = now
|
|
|
|
|
|
|
|
|
|
var enc *deltaEncBuf // lazy
|
|
|
|
|
for _, m := range metrics {
|
|
|
|
|
val := m.Value()
|
|
|
|
|
delta := val - m.lastLogVal
|
|
|
|
|
for i, ent := range lastLogVal {
|
|
|
|
|
val := atomic.LoadInt64(ent.v)
|
|
|
|
|
delta := val - ent.lastLogged
|
|
|
|
|
if delta == 0 {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
lastLogVal[i].lastLogged = val
|
|
|
|
|
m := unsorted[i]
|
|
|
|
|
if enc == nil {
|
|
|
|
|
enc = deltaPool.Get().(*deltaEncBuf)
|
|
|
|
|
enc.buf.Reset()
|
|
|
|
|
}
|
|
|
|
|
m.lastLogVal = val
|
|
|
|
|
if m.wireID == 0 {
|
|
|
|
|
numWireID++
|
|
|
|
|
m.wireID = numWireID
|
|
|
|
|