@ -20,6 +20,7 @@ import (
"io"
"io/ioutil"
"log"
"math"
"math/big"
"math/rand"
"os"
@ -27,6 +28,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go4.org/mem"
@ -120,6 +122,7 @@ type Server struct {
multiForwarderCreated expvar . Int
multiForwarderDeleted expvar . Int
removePktForwardOther expvar . Int
avgQueueDuration * uint64 // In milliseconds; accessed atomically
mu sync . Mutex
closed bool
@ -182,6 +185,7 @@ func NewServer(privateKey key.Private, logf logger.Logf) *Server {
memSys0 : ms . Sys ,
watchers : map [ * sclient ] bool { } ,
sentTo : map [ key . Public ] map [ key . Public ] int64 { } ,
avgQueueDuration : new ( uint64 ) ,
}
s . initMetacert ( )
s . packetsRecvDisco = s . packetsRecvByKind . Get ( "disco" )
@ -612,6 +616,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
return c . sendPkt ( dst , pkt {
bs : contents ,
enqueuedAt : time . Now ( ) ,
src : srcKey ,
} )
}
@ -666,6 +671,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
p := pkt {
bs : contents ,
enqueuedAt : time . Now ( ) ,
src : c . key ,
}
return c . sendPkt ( dst , p )
@ -696,7 +702,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
}
select {
case <- dst . sendQueue :
case pkt := <- dst . sendQueue :
s . packetsDropped . Add ( 1 )
s . packetsDroppedQueueHead . Add ( 1 )
if verboseDropKeys [ dstKey ] {
@ -705,6 +711,7 @@ func (c *sclient) sendPkt(dst *sclient, p pkt) error {
msg := fmt . Sprintf ( "tail drop %s -> %s" , p . src . ShortString ( ) , dstKey . ShortString ( ) )
c . s . limitedLogf ( msg )
}
c . recordQueueTime ( pkt . enqueuedAt )
if debug {
c . logf ( "dropping packet from client %x queue head" , dstKey )
}
@ -927,11 +934,13 @@ type pkt struct {
// src is the who's the sender of the packet.
src key . Public
// enqueuedAt is when a packet was put onto a queue before it was sent,
// and is used for reporting metrics on the duration of packets in the queue.
enqueuedAt time . Time
// bs is the data packet bytes.
// The memory is owned by pkt.
bs [ ] byte
// TODO(danderson): enqueue time, to measure queue latency?
}
func ( c * sclient ) setPreferred ( v bool ) {
@ -959,6 +968,25 @@ func (c *sclient) setPreferred(v bool) {
}
}
// expMovingAverage returns the new moving average given the previous average,
// a new value, and an alpha decay factor.
// https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func expMovingAverage ( prev , newValue , alpha float64 ) float64 {
return alpha * newValue + ( 1 - alpha ) * prev
}
// recordQueueTime updates the average queue duration metric after a packet has been sent.
func ( c * sclient ) recordQueueTime ( enqueuedAt time . Time ) {
elapsed := float64 ( time . Since ( enqueuedAt ) . Milliseconds ( ) )
for {
old := atomic . LoadUint64 ( c . s . avgQueueDuration )
newAvg := expMovingAverage ( math . Float64frombits ( old ) , elapsed , 0.1 )
if atomic . CompareAndSwapUint64 ( c . s . avgQueueDuration , old , math . Float64bits ( newAvg ) ) {
break
}
}
}
func ( c * sclient ) sendLoop ( ctx context . Context ) error {
defer func ( ) {
// If the sender shuts down unilaterally due to an error, close so
@ -1002,6 +1030,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
continue
case msg := <- c . sendQueue :
werr = c . sendPacket ( msg . src , msg . bs )
c . recordQueueTime ( msg . enqueuedAt )
continue
case <- keepAliveTick . C :
werr = c . sendKeepAlive ( )
@ -1025,6 +1054,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
continue
case msg := <- c . sendQueue :
werr = c . sendPacket ( msg . src , msg . bs )
c . recordQueueTime ( msg . enqueuedAt )
case <- keepAliveTick . C :
werr = c . sendKeepAlive ( )
}
@ -1290,6 +1320,9 @@ func (s *Server) ExpVar() expvar.Var {
m . Set ( "multiforwarder_created" , & s . multiForwarderCreated )
m . Set ( "multiforwarder_deleted" , & s . multiForwarderDeleted )
m . Set ( "packet_forwarder_delete_other_value" , & s . removePktForwardOther )
m . Set ( "average_queue_duration_ms" , expvar . Func ( func ( ) interface { } {
return math . Float64frombits ( atomic . LoadUint64 ( s . avgQueueDuration ) )
} ) )
var expvarVersion expvar . String
expvarVersion . Set ( version . Long )
m . Set ( "version" , & expvarVersion )