derp: remove the mutex around and closing of send channel

Makes it less complicated.

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
reviewable/pr209/r1
Brad Fitzpatrick 5 years ago committed by Brad Fitzpatrick
parent 77921a31b1
commit c34b350efa

@ -252,15 +252,6 @@ func (s *Server) accept(nc Conn, brw *bufio.ReadWriter, remoteAddr string) error
} }
func (c *sclient) run() error { func (c *sclient) run() error {
defer func() {
// Atomically close+remove send queue, so racing writers don't
// send to closed channel.
c.mu.Lock()
close(c.sendQueue)
c.sendQueue = nil
c.mu.Unlock()
}()
go c.sender() go c.sender()
for { for {
@ -320,16 +311,6 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
return nil return nil
} }
dst.mu.RLock()
defer dst.mu.RUnlock()
if dst.sendQueue == nil {
s.packetsDropped.Add(1)
s.packetsDroppedGone.Add(1)
if debug {
c.logf("dropping packet for shutdown client %x", dstKey)
}
}
msg := sendMsg{ msg := sendMsg{
bs: contents, bs: contents,
} }
@ -340,6 +321,16 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
// the queue is full, try to drop from queue head to prioritize // the queue is full, try to drop from queue head to prioritize
// fresher packets. // fresher packets.
for attempt := 0; attempt < 3; attempt++ { for attempt := 0; attempt < 3; attempt++ {
select {
case <-dst.done:
s.packetsDropped.Add(1)
s.packetsDroppedGone.Add(1)
if debug {
c.logf("dropping packet for shutdown client %x", dstKey)
}
return nil
default:
}
select { select {
case dst.sendQueue <- msg: case dst.sendQueue <- msg:
return nil return nil
@ -480,6 +471,7 @@ type sclient struct {
logf logger.Logf logf logger.Logf
done <-chan struct{} // closed when connection closes done <-chan struct{} // closed when connection closes
remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String() remoteAddr string // usually ip:port from net.Conn.RemoteAddr().String()
sendQueue chan sendMsg // messages (packets) queued to this client; never closed
// Owned by run, not thread-safe. // Owned by run, not thread-safe.
br *bufio.Reader br *bufio.Reader
@ -488,9 +480,6 @@ type sclient struct {
// Owned by sender, not thread-safe. // Owned by sender, not thread-safe.
bw *bufio.Writer bw *bufio.Writer
mu sync.RWMutex // guards access to sendQueue for shutdown.
sendQueue chan sendMsg // messages (packets) queued to this client
} }
// sendMsg is a request to write a frame to an sclient (usually a data packet). // sendMsg is a request to write a frame to an sclient (usually a data packet).
@ -540,28 +529,19 @@ func (c *sclient) sender() {
} }
func (c *sclient) sendLoop() error { func (c *sclient) sendLoop() error {
c.mu.RLock()
queue := c.sendQueue
c.mu.RUnlock()
if queue == nil {
// Uncommon race, sclient shut down before this loop ever
// started. Nothing to do here, move along.
return nil
}
defer func() { defer func() {
// Drain the send queue to count dropped packets // Drain the send queue to count dropped packets
for { for {
_, ok := <-queue select {
if !ok { case <-c.sendQueue:
break
}
c.s.packetsDropped.Add(1) c.s.packetsDropped.Add(1)
c.s.packetsDroppedGone.Add(1) c.s.packetsDroppedGone.Add(1)
if debug { if debug {
c.logf("dropping packet for shutdown %x", c.key) c.logf("dropping packet for shutdown %x", c.key)
} }
default:
return
}
} }
}() }()
@ -578,7 +558,7 @@ func (c *sclient) sendLoop() error {
case <-c.done: case <-c.done:
return nil return nil
case msg, ok := <-queue: case msg, ok := <-c.sendQueue:
if !ok { if !ok {
return nil return nil
} }

Loading…
Cancel
Save