derp: redo, simplify how mesh update writes are queued/written

I couldn't convince myself the old way was safe and couldn't lose
writes.

And it seemed too complicated.

Updates tailscale/corp#21104

Change-Id: I17ba7c7d6fd83458a311ac671146a1f6a458a5c1
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/12598/head
Brad Fitzpatrick 3 months ago committed by Brad Fitzpatrick
parent ded7734c36
commit d91e5c25ce

@ -141,6 +141,8 @@ type Server struct {
removePktForwardOther expvar.Int removePktForwardOther expvar.Int
avgQueueDuration *uint64 // In milliseconds; accessed atomically avgQueueDuration *uint64 // In milliseconds; accessed atomically
tcpRtt metrics.LabelMap // histogram tcpRtt metrics.LabelMap // histogram
meshUpdateBatchSize *metrics.Histogram
meshUpdateLoopCount *metrics.Histogram
// verifyClientsLocalTailscaled only accepts client connections to the DERP // verifyClientsLocalTailscaled only accepts client connections to the DERP
// server if the clientKey is a known peer in the network, as specified by a // server if the clientKey is a known peer in the network, as specified by a
@ -323,6 +325,8 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
avgQueueDuration: new(uint64), avgQueueDuration: new(uint64),
tcpRtt: metrics.LabelMap{Label: "le"}, tcpRtt: metrics.LabelMap{Label: "le"},
meshUpdateBatchSize: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000}),
meshUpdateLoopCount: metrics.NewHistogram([]float64{0, 1, 2, 5, 10, 20, 50, 100}),
keyOfAddr: map[netip.AddrPort]key.NodePublic{}, keyOfAddr: map[netip.AddrPort]key.NodePublic{},
clock: tstime.StdClock{}, clock: tstime.StdClock{},
} }
@ -758,7 +762,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
} }
if c.canMesh { if c.canMesh {
c.meshUpdate = make(chan struct{}) c.meshUpdate = make(chan struct{}, 1) // must be buffered; >1 is fine but wasteful
} }
if clientInfo != nil { if clientInfo != nil {
c.info = *clientInfo c.info = *clientInfo
@ -1143,13 +1147,18 @@ func (c *sclient) requestPeerGoneWrite(peer key.NodePublic, reason PeerGoneReaso
} }
} }
// requestMeshUpdate notes that a c's peerStateChange has been appended to and
// should now be written.
//
// It does not block. If a meshUpdate is already pending for this client, it
// does nothing.
func (c *sclient) requestMeshUpdate() { func (c *sclient) requestMeshUpdate() {
if !c.canMesh { if !c.canMesh {
panic("unexpected requestMeshUpdate") panic("unexpected requestMeshUpdate")
} }
select { select {
case c.meshUpdate <- struct{}{}: case c.meshUpdate <- struct{}{}:
case <-c.done: default:
} }
} }
@ -1671,62 +1680,47 @@ func (c *sclient) sendPeerPresent(peer key.NodePublic, ipPort netip.AddrPort, fl
return err return err
} }
// sendMeshUpdates drains as many mesh peerStateChange entries as // sendMeshUpdates drains all mesh peerStateChange entries into the write buffer
// possible into the write buffer WITHOUT flushing or otherwise // without flushing.
// blocking (as it holds c.s.mu while working). If it can't drain them
// all, it schedules itself to be called again in the future.
func (c *sclient) sendMeshUpdates() error { func (c *sclient) sendMeshUpdates() error {
c.s.mu.Lock() var lastBatch []peerConnState // memory to best effort reuse
defer c.s.mu.Unlock()
// allow all happened-before mesh update request goroutines to complete, if // takeAll returns c.peerStateChange and empties it.
// we don't finish the task we'll queue another below. takeAll := func() []peerConnState {
drainUpdates: c.s.mu.Lock()
for { defer c.s.mu.Unlock()
select { if len(c.peerStateChange) == 0 {
case <-c.meshUpdate: return nil
default:
break drainUpdates
}
}
writes := 0
for _, pcs := range c.peerStateChange {
avail := c.bw.Available()
var err error
if pcs.present {
if avail <= frameHeaderLen+peerPresentFrameLen {
break
}
err = c.sendPeerPresent(pcs.peer, pcs.ipPort, pcs.flags)
} else {
if avail <= frameHeaderLen+peerGoneFrameLen {
break
}
err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected)
} }
if err != nil { batch := c.peerStateChange
// Shouldn't happen, though, as we're writing if cap(lastBatch) > 16 {
// into available buffer space, not the lastBatch = nil
// network.
return err
} }
writes++ c.peerStateChange = lastBatch[:0]
return batch
} }
remain := copy(c.peerStateChange, c.peerStateChange[writes:]) for loops := 0; ; loops++ {
c.peerStateChange = c.peerStateChange[:remain] batch := takeAll()
if len(batch) == 0 {
c.s.meshUpdateLoopCount.Observe(float64(loops))
return nil
}
c.s.meshUpdateBatchSize.Observe(float64(len(batch)))
// Did we manage to write them all into the bufio buffer without flushing? for _, pcs := range batch {
if len(c.peerStateChange) == 0 { var err error
if cap(c.peerStateChange) > 16 { if pcs.present {
c.peerStateChange = nil err = c.sendPeerPresent(pcs.peer, pcs.ipPort, pcs.flags)
} else {
err = c.sendPeerGone(pcs.peer, PeerGoneReasonDisconnected)
}
if err != nil {
return err
}
} }
} else { lastBatch = batch
// Didn't finish in the buffer space provided; schedule a future run.
go c.requestMeshUpdate()
} }
return nil
} }
// sendPacket writes contents to the client in a RecvPacket frame. If // sendPacket writes contents to the client in a RecvPacket frame. If
@ -1955,6 +1949,8 @@ func (s *Server) ExpVar() expvar.Var {
return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration)) return math.Float64frombits(atomic.LoadUint64(s.avgQueueDuration))
})) }))
m.Set("counter_tcp_rtt", &s.tcpRtt) m.Set("counter_tcp_rtt", &s.tcpRtt)
m.Set("counter_mesh_update_batch_size", s.meshUpdateBatchSize)
m.Set("counter_mesh_update_loop_count", s.meshUpdateLoopCount)
var expvarVersion expvar.String var expvarVersion expvar.String
expvarVersion.Set(version.Long()) expvarVersion.Set(version.Long())
m.Set("version", &expvarVersion) m.Set("version", &expvarVersion)

Loading…
Cancel
Save