wgengine/magicsock: use mono.Time

magicsock makes multiple calls to Now per packet.
Move to mono.Now. Changing some of the calls to
use package mono has a cascading effect,
causing non-per-packet call sites to also switch.

Signed-off-by: Josh Bleecher Snyder <josh@tailscale.com>
josh/debug-flake
Josh Bleecher Snyder 3 years ago committed by Josh Bleecher Snyder
parent c2202cc27c
commit 8a3d52e882

@ -23,6 +23,7 @@ import (
"tailscale.com/ipn"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/interfaces"
"tailscale.com/tstime/mono"
"tailscale.com/util/dnsname"
)
@ -193,7 +194,7 @@ func runStatus(ctx context.Context, args []string) error {
//
// TODO: have the server report this bool instead.
func peerActive(ps *ipnstate.PeerStatus) bool {
return !ps.LastWrite.IsZero() && time.Since(ps.LastWrite) < 2*time.Minute
return !ps.LastWrite.IsZero() && mono.Since(ps.LastWrite) < 2*time.Minute
}
func dnsOrQuoteHostname(st *ipnstate.Status, ps *ipnstate.PeerStatus) string {

@ -49,6 +49,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep
tailscale.com/syncs from tailscale.com/net/interfaces+
tailscale.com/tailcfg from tailscale.com/cmd/tailscale/cli+
W tailscale.com/tsconst from tailscale.com/net/interfaces
💣 tailscale.com/tstime/mono from tailscale.com/cmd/tailscale/cli+
tailscale.com/types/empty from tailscale.com/ipn
tailscale.com/types/ipproto from tailscale.com/net/flowtrack+
tailscale.com/types/key from tailscale.com/derp+

@ -130,7 +130,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
tailscale.com/tailcfg from tailscale.com/control/controlclient+
W tailscale.com/tsconst from tailscale.com/net/interfaces
tailscale.com/tstime from tailscale.com/wgengine/magicsock
💣 tailscale.com/tstime/mono from tailscale.com/net/tstun
💣 tailscale.com/tstime/mono from tailscale.com/net/tstun+
tailscale.com/types/empty from tailscale.com/control/controlclient+
tailscale.com/types/flagtype from tailscale.com/cmd/tailscaled
tailscale.com/types/ipproto from tailscale.com/net/flowtrack+

@ -20,6 +20,7 @@ import (
"inet.af/netaddr"
"tailscale.com/tailcfg"
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/util/dnsname"
)
@ -90,7 +91,7 @@ type PeerStatus struct {
RxBytes int64
TxBytes int64
Created time.Time // time registered with tailcontrol
LastWrite time.Time // time last packet sent
LastWrite mono.Time // time last packet sent
LastSeen time.Time // last seen to tailcontrol
LastHandshake time.Time // with local wireguard
KeepAlive bool
@ -320,7 +321,7 @@ table tbody tr:nth-child(even) td { background-color: #f5f5f5; }
f("<tr><th>Peer</th><th>OS</th><th>Node</th><th>Owner</th><th>Rx</th><th>Tx</th><th>Activity</th><th>Connection</th></tr>\n")
f("</thead>\n<tbody>\n")
now := time.Now()
now := mono.Now()
var peers []*PeerStatus
for _, peer := range st.Peers() {
@ -378,7 +379,7 @@ table tbody tr:nth-child(even) td { background-color: #f5f5f5; }
f("<td>")
// TODO: let server report this active bool instead
active := !ps.LastWrite.IsZero() && time.Since(ps.LastWrite) < 2*time.Minute
active := !ps.LastWrite.IsZero() && mono.Since(ps.LastWrite) < 2*time.Minute
if active {
if ps.Relay != "" && ps.CurAddr == "" {
f("relay <b>%s</b>", html.EscapeString(ps.Relay))

@ -23,6 +23,7 @@ import (
"golang.zx2c4.com/wireguard/tai64n"
"inet.af/netaddr"
"tailscale.com/ipn/ipnstate"
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/wgkey"
@ -348,12 +349,12 @@ type addrSet struct {
ipPorts []netaddr.IPPort
// clock, if non-nil, is used in tests instead of time.Now.
clock func() time.Time
clock func() mono.Time
Logf logger.Logf // must not be nil
mu sync.Mutex // guards following fields
lastSend time.Time
lastSend mono.Time
// roamAddr is non-nil if/when we receive a correctly signed
// WireGuard packet from an unexpected address. If so, we
@ -369,10 +370,10 @@ type addrSet struct {
curAddr int
// stopSpray is the time after which we stop spraying packets.
stopSpray time.Time
stopSpray mono.Time
// lastSpray is the last time we sprayed a packet.
lastSpray time.Time
lastSpray mono.Time
// loggedLogPriMask is a bit field of that tracks whether
// we've already logged about receiving a packet from a low
@ -395,11 +396,11 @@ func (as *addrSet) derpID() int {
return 0
}
func (as *addrSet) timeNow() time.Time {
func (as *addrSet) timeNow() mono.Time {
if as.clock != nil {
return as.clock()
}
return time.Now()
return mono.Now()
}
var noAddr, _ = netaddr.FromStdAddr(net.ParseIP("127.127.127.127"), 127, "")

@ -47,6 +47,7 @@ import (
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
@ -814,20 +815,20 @@ func (c *Conn) SetNetInfoCallback(fn func(*tailcfg.NetInfo)) {
}
}
// LastRecvActivityOfDisco returns the time we last got traffic from
// LastRecvActivityOfDisco describes the time we last got traffic from
// this endpoint (updated every ~10 seconds).
func (c *Conn) LastRecvActivityOfDisco(dk tailcfg.DiscoKey) time.Time {
func (c *Conn) LastRecvActivityOfDisco(dk tailcfg.DiscoKey) string {
c.mu.Lock()
defer c.mu.Unlock()
de, ok := c.endpointOfDisco[dk]
if !ok {
return time.Time{}
return "never"
}
unix := atomic.LoadInt64(&de.lastRecvUnixAtomic)
if unix == 0 {
return time.Time{}
saw := de.lastRecv.LoadAtomic()
if saw == 0 {
return "never"
}
return time.Unix(unix, 0)
return mono.Since(saw).Round(time.Second).String()
}
// Ping handles a "tailscale ping" CLI query.
@ -3126,7 +3127,7 @@ func ippDebugString(ua netaddr.IPPort) string {
// advertise a DiscoKey and participate in active discovery.
type discoEndpoint struct {
// atomically accessed; declared first for alignment reasons
lastRecvUnixAtomic int64
lastRecv mono.Time
numStopAndResetAtomic int64
// These fields are initialized once and never modified.
@ -3145,13 +3146,13 @@ type discoEndpoint struct {
mu sync.Mutex // Lock ordering: Conn.mu, then discoEndpoint.mu
heartBeatTimer *time.Timer // nil when idle
lastSend time.Time // last time there was outgoing packets sent to this peer (from wireguard-go)
lastFullPing time.Time // last time we pinged all endpoints
lastSend mono.Time // last time there was outgoing packets sent to this peer (from wireguard-go)
lastFullPing mono.Time // last time we pinged all endpoints
derpAddr netaddr.IPPort // fallback/bootstrap path, if non-zero (non-zero for well-behaved clients)
bestAddr addrLatency // best non-DERP path; zero if none
bestAddrAt time.Time // time best address re-confirmed
trustBestAddrUntil time.Time // time when bestAddr expires
bestAddrAt mono.Time // time best address re-confirmed
trustBestAddrUntil mono.Time // time when bestAddr expires
sentPing map[stun.TxID]sentPing
endpointState map[netaddr.IPPort]*endpointState
isCallMeMaybeEP map[netaddr.IPPort]bool
@ -3218,7 +3219,7 @@ type endpointState struct {
// all fields guarded by discoEndpoint.mu
// lastPing is the last (outgoing) ping time.
lastPing time.Time
lastPing mono.Time
// lastGotPing, if non-zero, means that this was an endpoint
// that we learned about at runtime (from an incoming ping)
@ -3266,14 +3267,14 @@ const pongHistoryCount = 64
type pongReply struct {
latency time.Duration
pongAt time.Time // when we received the pong
pongAt mono.Time // when we received the pong
from netaddr.IPPort // the pong's src (usually same as endpoint map key)
pongSrc netaddr.IPPort // what they reported they heard
}
type sentPing struct {
to netaddr.IPPort
at time.Time
at mono.Time
timer *time.Timer // timeout timer
purpose discoPingPurpose
}
@ -3293,10 +3294,10 @@ func (de *discoEndpoint) initFakeUDPAddr() {
// endpoint and reports whether it's been at least 10 seconds since the last
// receive activity (including having never received from this peer before).
func (de *discoEndpoint) isFirstRecvActivityInAwhile() bool {
now := time.Now().Unix()
old := atomic.LoadInt64(&de.lastRecvUnixAtomic)
if old <= now-10 {
atomic.StoreInt64(&de.lastRecvUnixAtomic, now)
now := mono.Now()
elapsed := now.Sub(de.lastRecv.LoadAtomic())
if elapsed > 10*time.Second {
de.lastRecv.StoreAtomic(now)
return true
}
return false
@ -3321,7 +3322,7 @@ func (de *discoEndpoint) DstToBytes() []byte { return packIPPort(de.fakeWGAddr)
// addr may be non-zero.
//
// de.mu must be held.
func (de *discoEndpoint) addrForSendLocked(now time.Time) (udpAddr, derpAddr netaddr.IPPort) {
func (de *discoEndpoint) addrForSendLocked(now mono.Time) (udpAddr, derpAddr netaddr.IPPort) {
udpAddr = de.bestAddr.IPPort
if udpAddr.IsZero() || now.After(de.trustBestAddrUntil) {
// We had a bestAddr but it expired so send both to it
@ -3344,13 +3345,13 @@ func (de *discoEndpoint) heartbeat() {
return
}
if time.Since(de.lastSend) > sessionActiveTimeout {
if mono.Since(de.lastSend) > sessionActiveTimeout {
// Session's idle. Stop heartbeating.
de.c.logf("[v1] magicsock: disco: ending heartbeats for idle session to %v (%v)", de.publicKey.ShortString(), de.discoShort)
return
}
now := time.Now()
now := mono.Now()
udpAddr, _ := de.addrForSendLocked(now)
if !udpAddr.IsZero() {
// We have a preferred path. Ping that every 2 seconds.
@ -3368,7 +3369,7 @@ func (de *discoEndpoint) heartbeat() {
// a better path.
//
// de.mu must be held.
func (de *discoEndpoint) wantFullPingLocked(now time.Time) bool {
func (de *discoEndpoint) wantFullPingLocked(now mono.Time) bool {
if de.bestAddr.IsZero() || de.lastFullPing.IsZero() {
return true
}
@ -3385,7 +3386,7 @@ func (de *discoEndpoint) wantFullPingLocked(now time.Time) bool {
}
func (de *discoEndpoint) noteActiveLocked() {
de.lastSend = time.Now()
de.lastSend = mono.Now()
if de.heartBeatTimer == nil {
de.heartBeatTimer = time.AfterFunc(heartbeatInterval, de.heartbeat)
}
@ -3399,7 +3400,7 @@ func (de *discoEndpoint) cliPing(res *ipnstate.PingResult, cb func(*ipnstate.Pin
de.pendingCLIPings = append(de.pendingCLIPings, pendingCLIPing{res, cb})
now := time.Now()
now := mono.Now()
udpAddr, derpAddr := de.addrForSendLocked(now)
if !derpAddr.IsZero() {
de.startPingLocked(derpAddr, now, pingCLI)
@ -3419,7 +3420,7 @@ func (de *discoEndpoint) cliPing(res *ipnstate.PingResult, cb func(*ipnstate.Pin
}
func (de *discoEndpoint) send(b []byte) error {
now := time.Now()
now := mono.Now()
de.mu.Lock()
udpAddr, derpAddr := de.addrForSendLocked(now)
@ -3452,7 +3453,7 @@ func (de *discoEndpoint) pingTimeout(txid stun.TxID) {
if !ok {
return
}
if debugDisco || de.bestAddr.IsZero() || time.Now().After(de.trustBestAddrUntil) {
if debugDisco || de.bestAddr.IsZero() || mono.Now().After(de.trustBestAddrUntil) {
de.c.logf("[v1] magicsock: disco: timeout waiting for pong %x from %v (%v, %v)", txid[:6], sp.to, de.publicKey.ShortString(), de.discoShort)
}
de.removeSentPingLocked(txid, sp)
@ -3504,7 +3505,7 @@ const (
pingCLI
)
func (de *discoEndpoint) startPingLocked(ep netaddr.IPPort, now time.Time, purpose discoPingPurpose) {
func (de *discoEndpoint) startPingLocked(ep netaddr.IPPort, now mono.Time, purpose discoPingPurpose) {
if purpose != pingCLI {
st, ok := de.endpointState[ep]
if !ok {
@ -3530,7 +3531,7 @@ func (de *discoEndpoint) startPingLocked(ep netaddr.IPPort, now time.Time, purpo
go de.sendDiscoPing(ep, txid, logLevel)
}
func (de *discoEndpoint) sendPingsLocked(now time.Time, sendCallMeMaybe bool) {
func (de *discoEndpoint) sendPingsLocked(now mono.Time, sendCallMeMaybe bool) {
de.lastFullPing = now
var sentAny bool
for ep, st := range de.endpointState {
@ -3652,7 +3653,7 @@ func (de *discoEndpoint) noteConnectivityChange() {
de.mu.Lock()
defer de.mu.Unlock()
de.trustBestAddrUntil = time.Time{}
de.trustBestAddrUntil = 0
}
// handlePongConnLocked handles a Pong message (a reply to an earlier ping).
@ -3670,7 +3671,7 @@ func (de *discoEndpoint) handlePongConnLocked(m *disco.Pong, src netaddr.IPPort)
}
de.removeSentPingLocked(m.TxID, sp)
now := time.Now()
now := mono.Now()
latency := now.Sub(sp.at)
if !isDerp {
@ -3822,9 +3823,9 @@ func (de *discoEndpoint) handleCallMeMaybe(m *disco.CallMeMaybe) {
// Zero out all the lastPing times to force sendPingsLocked to send new ones,
// even if it's been less than 5 seconds ago.
for _, st := range de.endpointState {
st.lastPing = time.Time{}
st.lastPing = 0
}
de.sendPingsLocked(time.Now(), false)
de.sendPingsLocked(mono.Now(), false)
}
func (de *discoEndpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
@ -3837,7 +3838,7 @@ func (de *discoEndpoint) populatePeerStatus(ps *ipnstate.PeerStatus) {
ps.LastWrite = de.lastSend
now := time.Now()
now := mono.Now()
if udpAddr, derpAddr := de.addrForSendLocked(now); !udpAddr.IsZero() && derpAddr.IsZero() {
ps.CurAddr = udpAddr.String()
}
@ -3855,13 +3856,13 @@ func (de *discoEndpoint) stopAndReset() {
// Zero these fields so if the user re-starts the network, the discovery
// state isn't a mix of before & after two sessions.
de.lastSend = time.Time{}
de.lastFullPing = time.Time{}
de.lastSend = 0
de.lastFullPing = 0
de.bestAddr = addrLatency{}
de.bestAddrAt = time.Time{}
de.trustBestAddrUntil = time.Time{}
de.bestAddrAt = 0
de.trustBestAddrUntil = 0
for _, es := range de.endpointState {
es.lastPing = time.Time{}
es.lastPing = 0
}
for txid, sp := range de.sentPing {

@ -38,6 +38,7 @@ import (
"tailscale.com/tailcfg"
"tailscale.com/tstest"
"tailscale.com/tstest/natlab"
"tailscale.com/tstime/mono"
"tailscale.com/types/key"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
@ -1146,13 +1147,13 @@ func TestAddrSet(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
faket := time.Unix(0, 0)
faket := mono.Time(0)
var logBuf bytes.Buffer
tt.as.Logf = func(format string, args ...interface{}) {
fmt.Fprintf(&logBuf, format, args...)
t.Logf(format, args...)
}
tt.as.clock = func() time.Time { return faket }
tt.as.clock = func() mono.Time { return faket }
for i, st := range tt.steps {
faket = faket.Add(st.advance)
@ -1229,8 +1230,8 @@ func TestDiscoStringLogRace(t *testing.T) {
func Test32bitAlignment(t *testing.T) {
var de discoEndpoint
if off := unsafe.Offsetof(de.lastRecvUnixAtomic); off%8 != 0 {
t.Fatalf("discoEndpoint.lastRecvUnixAtomic is not 8-byte aligned")
if off := unsafe.Offsetof(de.lastRecv); off%8 != 0 {
t.Fatalf("discoEndpoint.lastRecv is not 8-byte aligned")
}
if !de.isFirstRecvActivityInAwhile() { // verify this doesn't panic on 32-bit

@ -231,7 +231,7 @@ func (e *userspaceEngine) onOpenTimeout(flow flowtrack.Tuple) {
e.logf("open-conn-track: timeout opening %v to node %v; online=%v, lastRecv=%v",
flow, n.Key.ShortString(),
online,
durFmt(e.magicConn.LastRecvActivityOfDisco(n.DiscoKey)))
e.magicConn.LastRecvActivityOfDisco(n.DiscoKey))
}
func durFmt(t time.Time) string {

Loading…
Cancel
Save