diff --git a/derp/derp_server.go b/derp/derp_server.go index 104dc11bf..3885bbb93 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -20,6 +20,7 @@ import ( "io" "io/ioutil" "log" + "math" "math/big" "math/rand" "os" @@ -27,6 +28,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "go4.org/mem" @@ -120,6 +122,7 @@ type Server struct { multiForwarderCreated expvar.Int multiForwarderDeleted expvar.Int removePktForwardOther expvar.Int + avgQueueDuration *uint64 // In milliseconds; accessed atomically mu sync.Mutex closed bool @@ -182,6 +185,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server { memSys0: ms.Sys, watchers: map[*sclient]bool{}, sentTo: map[key.Public]map[key.Public]int64{}, + avgQueueDuration: new(uint64), } s.initMetacert() s.packetsRecvDisco = s.packetsRecvByKind.Get("disco") @@ -611,8 +615,9 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { } return c.sendPkt(dst, pkt{ - bs: contents, - src: srcKey, + bs: contents, + enqueuedAt: time.Now(), + src: srcKey, }) } @@ -665,8 +670,9 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { } p := pkt{ - bs: contents, - src: c.key, + bs: contents, + enqueuedAt: time.Now(), + src: c.key, } return c.sendPkt(dst, p) } @@ -696,7 +702,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { } select { - case <-dst.sendQueue: + case pkt := <-dst.sendQueue: s.packetsDropped.Add(1) s.packetsDroppedQueueHead.Add(1) if verboseDropKeys[dstKey] { @@ -705,6 +711,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error { msg := fmt.Sprintf("tail drop %s -> %s", p.src.ShortString(), dstKey.ShortString()) c.s.limitedLogf(msg) } + c.recordQueueTime(pkt.enqueuedAt) if debug { c.logf("dropping packet from client %x queue head", dstKey) } @@ -927,11 +934,13 @@ type pkt struct { // src is the who's the sender of the packet. src key.Public + // enqueuedAt is when a packet was put onto a queue before it was sent, + // and is used for reporting metrics on the duration of packets in the queue. + enqueuedAt time.Time + // bs is the data packet bytes. // The memory is owned by pkt. bs []byte - - // TODO(danderson): enqueue time, to measure queue latency? } func (c *sclient) setPreferred(v bool) { @@ -959,6 +968,25 @@ func (c *sclient) setPreferred(v bool) { } } +// expMovingAverage returns the new moving average given the previous average, +// a new value, and an alpha decay factor. +// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average +func expMovingAverage(prev, newValue, alpha float64) float64 { + return alpha*newValue + (1-alpha)*prev +} + +// recordQueueTime updates the average queue duration metric after a packet has been sent. +func (c *sclient) recordQueueTime(enqueuedAt time.Time) { + elapsed := float64(time.Since(enqueuedAt).Milliseconds()) + for { + old := atomic.LoadUint64(c.s.avgQueueDuration) + newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1) + if atomic.CompareAndSwapUint64(c.s.avgQueueDuration, old, math.Float64bits(newAvg)) { + break + } + } +} + func (c *sclient) sendLoop(ctx context.Context) error { defer func() { // If the sender shuts down unilaterally due to an error, close so @@ -1002,6 +1030,7 @@ func (c *sclient) sendLoop(ctx context.Context) error { continue case msg := <-c.sendQueue: werr = c.sendPacket(msg.src, msg.bs) + c.recordQueueTime(msg.enqueuedAt) continue case <-keepAliveTick.C: werr = c.sendKeepAlive() @@ -1025,6 +1054,7 @@ func (c *sclient) sendLoop(ctx context.Context) error { continue case msg := <-c.sendQueue: werr = c.sendPacket(msg.src, msg.bs) + c.recordQueueTime(msg.enqueuedAt) case <-keepAliveTick.C: werr = c.sendKeepAlive() } @@ -1290,6 +1320,9 @@ func (s *Server) ExpVar() expvar.Var { m.Set("multiforwarder_created", &s.multiForwarderCreated) m.Set("multiforwarder_deleted", &s.multiForwarderDeleted) m.Set("packet_forwarder_delete_other_value", &s.removePktForwardOther) + m.Set("average_queue_duration_ms", expvar.Func(func() interface{} { + return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) + })) var expvarVersion expvar.String expvarVersion.Set(version.Long) m.Set("version", &expvarVersion)