wgengine/magicsock: wait for previous DERP goroutines to end before new ones

Updates #109 (hopefully fixes, will wait for graphs to be happy)

Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
pull/182/head
Brad Fitzpatrick 5 years ago committed by Brad Fitzpatrick
parent eff6dcdb4e
commit 8807913be9

@ -35,6 +35,7 @@ import (
"tailscale.com/net/interfaces" "tailscale.com/net/interfaces"
"tailscale.com/netcheck" "tailscale.com/netcheck"
"tailscale.com/stun" "tailscale.com/stun"
"tailscale.com/syncs"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/key" "tailscale.com/types/key"
"tailscale.com/types/logger" "tailscale.com/types/logger"
@ -97,6 +98,7 @@ type Conn struct {
privateKey key.Private privateKey key.Private
myDerp int // nearest DERP server; 0 means none/unknown myDerp int // nearest DERP server; 0 means none/unknown
activeDerp map[int]activeDerp activeDerp map[int]activeDerp
prevDerp map[int]*syncs.WaitGroupChan
derpTLSConfig *tls.Config // normally nil; used by tests derpTLSConfig *tls.Config // normally nil; used by tests
} }
@ -686,7 +688,7 @@ func (c *Conn) sendAddr(addr *net.UDPAddr, pubKey key.Public, b []byte) error {
// dropping. // dropping.
// //
// TODO: this is currently arbitrary. Figure out something better? // TODO: this is currently arbitrary. Figure out something better?
const bufferedDerpWritesBeforeDrop = 4 const bufferedDerpWritesBeforeDrop = 32
// derpWriteChanOfAddr returns a DERP client for fake UDP addresses that // derpWriteChanOfAddr returns a DERP client for fake UDP addresses that
// represent DERP servers, creating them as necessary. For real UDP // represent DERP servers, creating them as necessary. For real UDP
@ -708,6 +710,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
if !ok { if !ok {
if c.activeDerp == nil { if c.activeDerp == nil {
c.activeDerp = make(map[int]activeDerp) c.activeDerp = make(map[int]activeDerp)
c.prevDerp = make(map[int]*syncs.WaitGroupChan)
} }
derpSrv := c.derps.ServerByID(addr.Port) derpSrv := c.derps.ServerByID(addr.Port)
if derpSrv == nil || derpSrv.HostHTTPS == "" { if derpSrv == nil || derpSrv.HostHTTPS == "" {
@ -725,7 +728,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
dc.DNSCache = dnscache.Get() dc.DNSCache = dnscache.Get()
dc.TLSConfig = c.derpTLSConfig dc.TLSConfig = c.derpTLSConfig
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(c.connCtx)
ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop) ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop)
ad.c = dc ad.c = dc
@ -734,8 +737,20 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
ad.lastWrite = new(time.Time) ad.lastWrite = new(time.Time)
c.activeDerp[addr.Port] = ad c.activeDerp[addr.Port] = ad
go c.runDerpReader(ctx, addr, dc) // Build a startGate for the derp reader+writer
go c.runDerpWriter(ctx, addr, dc, ch) // goroutines, so they don't start running until any
// previous generation is closed.
startGate := syncs.ClosedChan()
if prev := c.prevDerp[addr.Port]; prev != nil {
startGate = prev.DoneChan()
}
// And register a WaitGroup(Chan) for this generation.
wg := syncs.NewWaitGroupChan()
wg.Add(2)
c.prevDerp[addr.Port] = wg
go c.runDerpReader(ctx, addr, dc, wg, startGate)
go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate)
} }
*ad.lastWrite = time.Now() *ad.lastWrite = time.Now()
return ad.writeCh return ad.writeCh
@ -762,7 +777,16 @@ var logDerpVerbose, _ = strconv.ParseBool(os.Getenv("DEBUG_DERP_VERBOSE"))
// runDerpReader runs in a goroutine for the life of a DERP // runDerpReader runs in a goroutine for the life of a DERP
// connection, handling received packets. // connection, handling received packets.
func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client) { func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
defer wg.Decr()
defer dc.Close()
select {
case <-startGate:
case <-ctx.Done():
return
}
didCopy := make(chan struct{}, 1) didCopy := make(chan struct{}, 1)
var buf [derp.MaxPacketSize]byte var buf [derp.MaxPacketSize]byte
@ -781,8 +805,6 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc
} }
if err != nil { if err != nil {
select { select {
case <-c.donec():
return
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
@ -805,7 +827,7 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc
continue continue
} }
select { select {
case <-c.donec(): case <-ctx.Done():
return return
case c.derpRecvCh <- res: case c.derpRecvCh <- res:
<-didCopy <-didCopy
@ -822,12 +844,17 @@ type derpWriteRequest struct {
// runDerpWriter runs in a goroutine for the life of a DERP // runDerpWriter runs in a goroutine for the life of a DERP
// connection, handling received packets. // connection, handling received packets.
func (c *Conn) runDerpWriter(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client, ch <-chan derpWriteRequest) { func (c *Conn) runDerpWriter(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) {
for { defer wg.Decr()
select { select {
case <-startGate:
case <-ctx.Done(): case <-ctx.Done():
return return
case <-c.donec(): }
for {
select {
case <-ctx.Done():
return return
case wr := <-ch: case wr := <-ch:
err := dc.Send(wr.pubKey, wr.b) err := dc.Send(wr.pubKey, wr.b)
@ -836,7 +863,7 @@ func (c *Conn) runDerpWriter(ctx context.Context, derpFakeAddr *net.UDPAddr, dc
} }
select { select {
case wr.errc <- err: case wr.errc <- err:
case <-c.donec(): case <-c.donec(): // Note: not ctx.Done: sendAddr (receiver) has different lifetime than ctx.
return return
} }
} }

Loading…
Cancel
Save