{net/connstats,wgengine/magicsock}: fix packet counting in connstats

connstats currently increments the packet counter whenever it is called
to store a length of data, however when udp batch sending was introduced
we pass the length for a series of packages, and it is only incremented
ones, making it count wrongly if we are on a platform supporting udp
batches.

Updates tailscale/corp#22075

Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>
pull/13815/head
Kristoffer Dalby 1 month ago committed by Kristoffer Dalby
parent 40c991f6b8
commit e0d711c478

@ -131,23 +131,23 @@ func (s *Statistics) updateVirtual(b []byte, receive bool) {
s.virtual[conn] = cnts s.virtual[conn] = cnts
} }
// UpdateTxPhysical updates the counters for a transmitted wireguard packet // UpdateTxPhysical updates the counters for zero or more transmitted wireguard packets.
// The src is always a Tailscale IP address, representing some remote peer. // The src is always a Tailscale IP address, representing some remote peer.
// The dst is a remote IP address and port that corresponds // The dst is a remote IP address and port that corresponds
// with some physical peer backing the Tailscale IP address. // with some physical peer backing the Tailscale IP address.
func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, n int) { func (s *Statistics) UpdateTxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) {
s.updatePhysical(src, dst, n, false) s.updatePhysical(src, dst, packets, bytes, false)
} }
// UpdateRxPhysical updates the counters for a received wireguard packet. // UpdateRxPhysical updates the counters for zero or more received wireguard packets.
// The src is always a Tailscale IP address, representing some remote peer. // The src is always a Tailscale IP address, representing some remote peer.
// The dst is a remote IP address and port that corresponds // The dst is a remote IP address and port that corresponds
// with some physical peer backing the Tailscale IP address. // with some physical peer backing the Tailscale IP address.
func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, n int) { func (s *Statistics) UpdateRxPhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int) {
s.updatePhysical(src, dst, n, true) s.updatePhysical(src, dst, packets, bytes, true)
} }
func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, n int, receive bool) { func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, packets, bytes int, receive bool) {
conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst} conn := netlogtype.Connection{Src: netip.AddrPortFrom(src, 0), Dst: dst}
s.mu.Lock() s.mu.Lock()
@ -157,11 +157,11 @@ func (s *Statistics) updatePhysical(src netip.Addr, dst netip.AddrPort, n int, r
return return
} }
if receive { if receive {
cnts.RxPackets++ cnts.RxPackets += uint64(packets)
cnts.RxBytes += uint64(n) cnts.RxBytes += uint64(bytes)
} else { } else {
cnts.TxPackets++ cnts.TxPackets += uint64(packets)
cnts.TxBytes += uint64(n) cnts.TxBytes += uint64(bytes)
} }
s.physical[conn] = cnts s.physical[conn] = cnts
} }

@ -730,7 +730,7 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep *en
ep.noteRecvActivity(ipp, mono.Now()) ep.noteRecvActivity(ipp, mono.Now())
if stats := c.stats.Load(); stats != nil { if stats := c.stats.Load(); stats != nil {
stats.UpdateRxPhysical(ep.nodeAddr, ipp, dm.n) stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, dm.n)
} }
return n, ep return n, ep
} }

@ -976,7 +976,7 @@ func (de *endpoint) send(buffs [][]byte) error {
// TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends. // TODO(raggi): needs updating for accuracy, as in error conditions we may have partial sends.
if stats := de.c.stats.Load(); err == nil && stats != nil { if stats := de.c.stats.Load(); err == nil && stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, udpAddr, txBytes) stats.UpdateTxPhysical(de.nodeAddr, udpAddr, len(buffs), txBytes)
} }
} }
if derpAddr.IsValid() { if derpAddr.IsValid() {
@ -991,7 +991,7 @@ func (de *endpoint) send(buffs [][]byte) error {
} }
if stats := de.c.stats.Load(); stats != nil { if stats := de.c.stats.Load(); stats != nil {
stats.UpdateTxPhysical(de.nodeAddr, derpAddr, txBytes) stats.UpdateTxPhysical(de.nodeAddr, derpAddr, 1, txBytes)
} }
if allOk { if allOk {
return nil return nil

@ -1523,7 +1523,7 @@ func (c *Conn) receiveIP(b []byte, ipp netip.AddrPort, cache *ippEndpointCache)
ep.lastRecvUDPAny.StoreAtomic(now) ep.lastRecvUDPAny.StoreAtomic(now)
ep.noteRecvActivity(ipp, now) ep.noteRecvActivity(ipp, now)
if stats := c.stats.Load(); stats != nil { if stats := c.stats.Load(); stats != nil {
stats.UpdateRxPhysical(ep.nodeAddr, ipp, len(b)) stats.UpdateRxPhysical(ep.nodeAddr, ipp, 1, len(b))
} }
return ep, true return ep, true
} }

Loading…
Cancel
Save