derp: use tstime (#8634)

Updates #8587

Signed-off-by: Claire Wang <claire@tailscale.com>
parent 2315bf246a
commit 90a7d3066c

@@ -115,6 +115,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depaware)
 tailscale.com/tailcfg from tailscale.com/client/tailscale+
 tailscale.com/tka from tailscale.com/client/tailscale+
 W tailscale.com/tsconst from tailscale.com/net/interfaces
+tailscale.com/tstime from tailscale.com/derp+
 💣 tailscale.com/tstime/mono from tailscale.com/tstime/rate
 tailscale.com/tstime/rate from tailscale.com/wgengine/filter+
 tailscale.com/tsweb from tailscale.com/cmd/derper

@@ -108,6 +108,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/depaware)
 tailscale.com/tailcfg from tailscale.com/cmd/tailscale/cli+
 tailscale.com/tka from tailscale.com/client/tailscale+
 W tailscale.com/tsconst from tailscale.com/net/interfaces
+tailscale.com/tstime from tailscale.com/derp+
 💣 tailscale.com/tstime/mono from tailscale.com/tstime/rate
 tailscale.com/tstime/rate from tailscale.com/wgengine/filter+
 tailscale.com/types/dnstype from tailscale.com/tailcfg
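Both depaware files pick up tailscale.com/tstime because the derp package now takes its time source from an injected clock instead of calling the time package directly. The interface subset this commit relies on can be inferred from the call sites in the hunks below (Now, Since, AfterFunc, NewTimer, NewTicker); the sketch that follows is that inference, not the authoritative definition, so check tailscale.com/tstime for the real thing.

package tstimesketch // hypothetical name; the real package is tailscale.com/tstime

import "time"

// Clock is the subset of tstime.Clock this commit exercises.
type Clock interface {
	Now() time.Time
	Since(t time.Time) time.Duration
	// AfterFunc returns a controller whose Stop cancels the pending call.
	AfterFunc(d time.Duration, f func()) TimerController
	// NewTimer and NewTicker return the receive channel separately from
	// the controller: an interface method cannot expose a struct field
	// like (*time.Timer).C, and a fake clock needs to own that channel.
	NewTimer(d time.Duration) (TimerController, <-chan time.Time)
	NewTicker(d time.Duration) (TickerController, <-chan time.Time)
}

// TimerController and TickerController are the control halves,
// mirroring *time.Timer and *time.Ticker minus their channels.
type TimerController interface {
	Reset(d time.Duration) bool
	Stop() bool
}

type TickerController interface {
	Reset(d time.Duration)
	Stop()
}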

@@ -17,6 +17,7 @@ import (
 	"go4.org/mem"
 	"golang.org/x/time/rate"
 	"tailscale.com/syncs"
+	"tailscale.com/tstime"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 )
@@ -40,6 +41,8 @@ type Client struct {
 	// Owned by Recv:
 	peeked  int                      // bytes to discard on next Recv
 	readErr syncs.AtomicValue[error] // sticky (set by Recv)
+
+	clock tstime.Clock
 }

 // ClientOpt is an option passed to NewClient.
@@ -103,6 +106,7 @@ func newClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf
 		meshKey:     opt.MeshKey,
 		canAckPings: opt.CanAckPings,
 		isProber:    opt.IsProber,
+		clock:       tstime.StdClock{},
 	}
 	if opt.ServerPub.IsZero() {
 		if err := c.recvServerKey(); err != nil {
@@ -214,7 +218,7 @@ func (c *Client) send(dstKey key.NodePublic, pkt []byte) (ret error) {
 	defer c.wmu.Unlock()
 	if c.rate != nil {
 		pktLen := frameHeaderLen + key.NodePublicRawLen + len(pkt)
-		if !c.rate.AllowN(time.Now(), pktLen) {
+		if !c.rate.AllowN(c.clock.Now(), pktLen) {
 			return nil // drop
 		}
 	}
@@ -244,7 +248,7 @@ func (c *Client) ForwardPacket(srcKey, dstKey key.NodePublic, pkt []byte) (err error) {
 	c.wmu.Lock()
 	defer c.wmu.Unlock()

-	timer := time.AfterFunc(5*time.Second, c.writeTimeoutFired)
+	timer := c.clock.AfterFunc(5*time.Second, c.writeTimeoutFired)
 	defer timer.Stop()

 	if err := writeFrameHeader(c.bw, frameForwardPacket, uint32(keyLen*2+len(pkt))); err != nil {
@@ -457,7 +461,6 @@ func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err error) {
 			c.readErr.Store(err)
 		}
 	}()
 	for {
 		c.nc.SetReadDeadline(time.Now().Add(timeout))
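The client changes establish the pattern repeated throughout this commit: the struct grows a clock tstime.Clock field, the production constructor fills it with tstime.StdClock{} (a thin wrapper that delegates to the time package), and call sites read time through the field. (The read deadline above intentionally stays on time.Now: net.Conn deadlines are anchored to real wall-clock time.) A minimal standalone illustration of the pattern, with hypothetical names:

package main

import (
	"fmt"
	"time"

	"tailscale.com/tstime"
)

// widget is a hypothetical stand-in for derp.Client: it consults an
// injected tstime.Clock rather than the time package.
type widget struct {
	clock     tstime.Clock
	createdAt time.Time
}

// newWidget mirrors newClient above: production callers get the real
// clock by default, while tests can pass a controllable one.
func newWidget(clock tstime.Clock) *widget {
	if clock == nil {
		clock = tstime.StdClock{} // default to real time
	}
	return &widget{clock: clock, createdAt: clock.Now()}
}

// age reads time through the injected clock, so it is testable
// without sleeping.
func (w *widget) age() time.Duration { return w.clock.Since(w.createdAt) }

func main() {
	w := newWidget(nil)
	time.Sleep(10 * time.Millisecond)
	fmt.Println(w.age()) // roughly 10ms with the real clock
}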

@@ -39,6 +39,7 @@ import (
 	"tailscale.com/envknob"
 	"tailscale.com/metrics"
 	"tailscale.com/syncs"
+	"tailscale.com/tstime"
 	"tailscale.com/tstime/rate"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
@@ -164,6 +165,8 @@ type Server struct {
 	// maps from netip.AddrPort to a client's public key
 	keyOfAddr map[netip.AddrPort]key.NodePublic
+
+	clock tstime.Clock
 }

 // clientSet represents 1 or more *sclients.
@@ -318,6 +321,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
 		avgQueueDuration: new(uint64),
 		tcpRtt:           metrics.LabelMap{Label: "le"},
 		keyOfAddr:        map[netip.AddrPort]key.NodePublic{},
+		clock:            tstime.StdClock{},
 	}
 	s.initMetacert()
 	s.packetsRecvDisco = s.packetsRecvByKind.Get("disco")
@@ -467,8 +471,8 @@ func (s *Server) initMetacert() {
 			CommonName: fmt.Sprintf("derpkey%s", s.publicKey.UntypedHexString()),
 		},
 		// Windows requires NotAfter and NotBefore set:
-		NotAfter:  time.Now().Add(30 * 24 * time.Hour),
-		NotBefore: time.Now().Add(-30 * 24 * time.Hour),
+		NotAfter:  s.clock.Now().Add(30 * 24 * time.Hour),
+		NotBefore: s.clock.Now().Add(-30 * 24 * time.Hour),
 		// Per https://github.com/golang/go/issues/51759#issuecomment-1071147836,
 		// macOS requires BasicConstraints when subject == issuer:
 		BasicConstraintsValid: true,
@@ -697,7 +701,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
 		done:           ctx.Done(),
 		remoteAddr:     remoteAddr,
 		remoteIPPort:   remoteIPPort,
-		connectedAt:    time.Now(),
+		connectedAt:    s.clock.Now(),
 		sendQueue:      make(chan pkt, perClientSendQueueDepth),
 		discoSendQueue: make(chan pkt, perClientSendQueueDepth),
 		sendPongCh:     make(chan [8]byte, 1),
@@ -927,7 +931,7 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
 	return c.sendPkt(dst, pkt{
 		bs:         contents,
-		enqueuedAt: time.Now(),
+		enqueuedAt: c.s.clock.Now(),
 		src:        srcKey,
 	})
 }
@@ -994,7 +998,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
 	p := pkt{
 		bs:         contents,
-		enqueuedAt: time.Now(),
+		enqueuedAt: c.s.clock.Now(),
 		src:        c.key,
 	}
 	return c.sendPkt(dst, p)
@@ -1387,7 +1391,7 @@ func (c *sclient) setPreferred(v bool) {
 		// graphs, so not important to miss a move. But it shouldn't:
 		// the netcheck/re-STUNs in magicsock only happen about every
 		// 30 seconds.
-		if time.Since(c.connectedAt) > 5*time.Second {
+		if c.s.clock.Since(c.connectedAt) > 5*time.Second {
 			homeMove.Add(1)
 		}
 	}
@@ -1401,7 +1405,7 @@ func expMovingAverage(prev, newValue, alpha float64) float64 {
 // recordQueueTime updates the average queue duration metric after a packet has been sent.
 func (c *sclient) recordQueueTime(enqueuedAt time.Time) {
-	elapsed := float64(time.Since(enqueuedAt).Milliseconds())
+	elapsed := float64(c.s.clock.Since(enqueuedAt).Milliseconds())
 	for {
 		old := atomic.LoadUint64(c.s.avgQueueDuration)
 		newAvg := expMovingAverage(math.Float64frombits(old), elapsed, 0.1)
@@ -1431,7 +1435,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 	}()

 	jitter := time.Duration(rand.Intn(5000)) * time.Millisecond
-	keepAliveTick := time.NewTicker(keepAlive + jitter)
+	keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(keepAlive + jitter)
 	defer keepAliveTick.Stop()

 	var werr error // last write error
@@ -1461,7 +1465,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 		case msg := <-c.sendPongCh:
 			werr = c.sendPong(msg)
 			continue
-		case <-keepAliveTick.C:
+		case <-keepAliveTickChannel:
 			werr = c.sendKeepAlive()
 			continue
 		default:
@@ -1490,7 +1494,7 @@ func (c *sclient) sendLoop(ctx context.Context) error {
 		case msg := <-c.sendPongCh:
 			werr = c.sendPong(msg)
 			continue
-		case <-keepAliveTick.C:
+		case <-keepAliveTickChannel:
 			werr = c.sendKeepAlive()
 		}
 	}
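Note the shape change in sendLoop: keepAliveTick, keepAliveTickChannel := c.s.clock.NewTicker(...) returns the channel as a second value because an interface method cannot expose (*time.Ticker).C. The payoff is that a fake clock can own the channel and deliver ticks on demand. A toy fake (hypothetical; not the repo's actual test clock) makes that concrete:

package main

import (
	"fmt"
	"time"
)

// fakeTicker is a hypothetical minimal fake: the test owns the
// channel and decides when a tick is delivered, with no real time
// passing.
type fakeTicker struct{ ch chan time.Time }

func (t *fakeTicker) Stop()                 {}
func (t *fakeTicker) Reset(d time.Duration) {}

// tick simulates one interval elapsing.
func (t *fakeTicker) tick(now time.Time) { t.ch <- now }

func main() {
	ft := &fakeTicker{ch: make(chan time.Time, 1)}
	var tickerChannel <-chan time.Time = ft.ch // what a fake NewTicker would return

	ft.tick(time.Unix(0, 0)) // force a "keep-alive" instant
	fmt.Println("tick at", <-tickerChannel)
}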

@@ -27,13 +27,13 @@ func (c *sclient) statsLoop(ctx context.Context) error {
 	const statsInterval = 10 * time.Second

-	ticker := time.NewTicker(statsInterval)
+	ticker, tickerChannel := c.s.clock.NewTicker(statsInterval)
 	defer ticker.Stop()

 statsLoop:
 	for {
 		select {
-		case <-ticker.C:
+		case <-tickerChannel:
 			rtt, err := tcpinfo.RTT(conn)
 			if err != nil {
 				continue statsLoop

@@ -27,6 +27,7 @@ import (
 	"golang.org/x/time/rate"
 	"tailscale.com/disco"
 	"tailscale.com/net/memnet"
+	"tailscale.com/tstest"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 )
@@ -993,6 +994,7 @@ func TestClientRecv(t *testing.T) {
 				nc:    dummyNetConn{},
 				br:    bufio.NewReader(bytes.NewReader(tt.input)),
 				logf:  t.Logf,
+				clock: &tstest.Clock{},
 			}
 			got, err := c.Recv()
 			if err != nil {
@@ -1436,6 +1438,7 @@ func TestClientSendRateLimiting(t *testing.T) {
 	cw := new(countWriter)
 	c := &Client{
 		bw:    bufio.NewWriter(cw),
+		clock: &tstest.Clock{},
 	}
 	c.setSendRateLimiter(ServerInfoMessage{})
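Since the tests construct Client literals directly, they have to populate the new clock field too; tstest.Clock is the test-side implementation. As I understand tstest.Clock (worth verifying against the package), its virtual time starts at a fixed point and advances by Step on each Now call, which is what makes elapsed-time logic deterministic:

package main

import (
	"fmt"
	"time"

	"tailscale.com/tstest"
)

func main() {
	// Assumption: tstest.Clock advances its virtual time by Step on
	// every Now call; the zero value (as used in the tests above)
	// picks an arbitrary start time.
	clock := &tstest.Clock{Step: time.Second}

	t0 := clock.Now()
	t1 := clock.Now()
	fmt.Println(t1.Sub(t0)) // 1s of virtual time, no real waiting
}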

@@ -38,6 +38,7 @@ import (
 	"tailscale.com/net/tshttpproxy"
 	"tailscale.com/syncs"
 	"tailscale.com/tailcfg"
+	"tailscale.com/tstime"
 	"tailscale.com/types/key"
 	"tailscale.com/types/logger"
 	"tailscale.com/util/cmpx"
@@ -83,6 +84,7 @@ type Client struct {
 	serverPubKey key.NodePublic
 	tlsState     *tls.ConnectionState
 	pingOut      map[derp.PingMessage]chan<- bool // chan to send to on pong
+	clock        tstime.Clock
 }

 func (c *Client) String() string {
@@ -101,6 +103,7 @@ func NewRegionClient(privateKey key.NodePrivate, logf logger.Logf, netMon *netmo
 		getRegion: getRegion,
 		ctx:       ctx,
 		cancelCtx: cancel,
+		clock:     tstime.StdClock{},
 	}
 	return c
 }
@@ -108,7 +111,7 @@ func NewRegionClient(privateKey key.NodePrivate, logf logger.Logf, netMon *netmo
 // NewNetcheckClient returns a Client that's only able to have its DialRegionTLS method called.
 // It's used by the netcheck package.
 func NewNetcheckClient(logf logger.Logf) *Client {
-	return &Client{logf: logf}
+	return &Client{logf: logf, clock: tstime.StdClock{}}
 }

 // NewClient returns a new DERP-over-HTTP client. It connects lazily.
@@ -129,6 +132,7 @@ func NewClient(privateKey key.NodePrivate, serverURL string, logf logger.Logf) (
 		url:       u,
 		ctx:       ctx,
 		cancelCtx: cancel,
+		clock:     tstime.StdClock{},
 	}
 	return c, nil
 }
@@ -644,14 +648,14 @@ func (c *Client) dialNode(ctx context.Context, n *tailcfg.DERPNode) (net.Conn, error) {
 		nwait++
 		go func() {
 			if proto == "tcp4" && c.preferIPv6() {
-				t := time.NewTimer(200 * time.Millisecond)
+				t, tChannel := c.clock.NewTimer(200 * time.Millisecond)
 				select {
 				case <-ctx.Done():
 					// Either user canceled original context,
 					// it timed out, or the v6 dial succeeded.
 					t.Stop()
 					return
-				case <-t.C:
+				case <-tChannel:
 					// Start v4 dial
 				}
 			}
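dialNode is where an injectable timer pays off most: the 200ms head start that IPv6 gets over IPv4 would otherwise force real sleeps in tests. The select is the standard cancellable-timer idiom; here it is as a sketch with a hypothetical helper name:

package main

import (
	"context"
	"fmt"
	"time"

	"tailscale.com/tstime"
)

// waitOrCancel sketches the idiom used in dialNode: wait out a head
// start, but wake early if the context is done. The helper name is
// ours, not the repo's.
func waitOrCancel(ctx context.Context, clock tstime.Clock, d time.Duration) bool {
	t, tChannel := clock.NewTimer(d)
	select {
	case <-ctx.Done():
		t.Stop() // not reading the channel; release the timer
		return false
	case <-tChannel:
		return true // head start elapsed; proceed (e.g. start the v4 dial)
	}
}

func main() {
	fmt.Println(waitOrCancel(context.Background(), tstime.StdClock{}, 200*time.Millisecond))
}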

@@ -51,7 +51,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
 		present = map[key.NodePublic]bool{}
 	}
 	lastConnGen := 0
-	lastStatus := time.Now()
+	lastStatus := c.clock.Now()
 	logConnectedLocked := func() {
 		if loggedConnected {
 			return
@@ -61,7 +61,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
 	}

 	const logConnectedDelay = 200 * time.Millisecond
-	timer := time.AfterFunc(2*time.Second, func() {
+	timer := c.clock.AfterFunc(2*time.Second, func() {
 		mu.Lock()
 		defer mu.Unlock()
 		logConnectedLocked()
@@ -91,11 +91,11 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
 	}

 	sleep := func(d time.Duration) {
-		t := time.NewTimer(d)
+		t, tChannel := c.clock.NewTimer(d)
 		select {
 		case <-ctx.Done():
 			t.Stop()
-		case <-t.C:
+		case <-tChannel:
 		}
 	}
@@ -142,7 +142,7 @@ func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key
 		default:
 			continue
 		}
-		if now := time.Now(); now.Sub(lastStatus) > statusInterval {
+		if now := c.clock.Now(); now.Sub(lastStatus) > statusInterval {
 			lastStatus = now
 			infoLogf("%d peers", len(present))
 		}
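The final hunk applies the same substitution to throttled status logging: measuring the interval on the injected clock means a test can step virtual time past statusInterval rather than sleeping through it. The same logic as a standalone sketch (hypothetical names):

package main

import (
	"fmt"
	"time"

	"tailscale.com/tstime"
)

// throttle is a hypothetical helper mirroring the hunk above: allow
// reports true at most once per interval, measured on the injected
// clock.
type throttle struct {
	clock tstime.Clock
	last  time.Time
}

func (th *throttle) allow(interval time.Duration) bool {
	if now := th.clock.Now(); now.Sub(th.last) > interval {
		th.last = now
		return true
	}
	return false
}

func main() {
	th := &throttle{clock: tstime.StdClock{}}
	fmt.Println(th.allow(time.Second)) // true: zero last time is long past
	fmt.Println(th.allow(time.Second)) // false: still within the interval
}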
