From 8807913be914d039d68b764927ca876c9abe1a70 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Thu, 12 Mar 2020 11:16:54 -0700 Subject: [PATCH] wgengine/magicsock: wait for previous DERP goroutines to end before new ones Updates #109 (hopefully fixes, will wait for graphs to be happy) Signed-off-by: Brad Fitzpatrick --- wgengine/magicsock/magicsock.go | 51 +++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index a19c06d34..6f0ede80b 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -35,6 +35,7 @@ import ( "tailscale.com/net/interfaces" "tailscale.com/netcheck" "tailscale.com/stun" + "tailscale.com/syncs" "tailscale.com/tailcfg" "tailscale.com/types/key" "tailscale.com/types/logger" @@ -97,6 +98,7 @@ type Conn struct { privateKey key.Private myDerp int // nearest DERP server; 0 means none/unknown activeDerp map[int]activeDerp + prevDerp map[int]*syncs.WaitGroupChan derpTLSConfig *tls.Config // normally nil; used by tests } @@ -686,7 +688,7 @@ func (c *Conn) sendAddr(addr *net.UDPAddr, pubKey key.Public, b []byte) error { // dropping. // // TODO: this is currently arbitrary. Figure out something better? -const bufferedDerpWritesBeforeDrop = 4 +const bufferedDerpWritesBeforeDrop = 32 // derpWriteChanOfAddr returns a DERP client for fake UDP addresses that // represent DERP servers, creating them as necessary. For real UDP @@ -708,6 +710,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest { if !ok { if c.activeDerp == nil { c.activeDerp = make(map[int]activeDerp) + c.prevDerp = make(map[int]*syncs.WaitGroupChan) } derpSrv := c.derps.ServerByID(addr.Port) if derpSrv == nil || derpSrv.HostHTTPS == "" { @@ -725,7 +728,7 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest { dc.DNSCache = dnscache.Get() dc.TLSConfig = c.derpTLSConfig - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(c.connCtx) ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop) ad.c = dc @@ -734,8 +737,20 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest { ad.lastWrite = new(time.Time) c.activeDerp[addr.Port] = ad - go c.runDerpReader(ctx, addr, dc) - go c.runDerpWriter(ctx, addr, dc, ch) + // Build a startGate for the derp reader+writer + // goroutines, so they don't start running until any + // previous generation is closed. + startGate := syncs.ClosedChan() + if prev := c.prevDerp[addr.Port]; prev != nil { + startGate = prev.DoneChan() + } + // And register a WaitGroup(Chan) for this generation. + wg := syncs.NewWaitGroupChan() + wg.Add(2) + c.prevDerp[addr.Port] = wg + + go c.runDerpReader(ctx, addr, dc, wg, startGate) + go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate) } *ad.lastWrite = time.Now() return ad.writeCh @@ -762,7 +777,16 @@ var logDerpVerbose, _ = strconv.ParseBool(os.Getenv("DEBUG_DERP_VERBOSE")) // runDerpReader runs in a goroutine for the life of a DERP // connection, handling received packets. -func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client) { +func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client, wg *syncs.WaitGroupChan, startGate <-chan struct{}) { + defer wg.Decr() + defer dc.Close() + + select { + case <-startGate: + case <-ctx.Done(): + return + } + didCopy := make(chan struct{}, 1) var buf [derp.MaxPacketSize]byte @@ -781,8 +805,6 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc } if err != nil { select { - case <-c.donec(): - return case <-ctx.Done(): return default: @@ -805,7 +827,7 @@ func (c *Conn) runDerpReader(ctx context.Context, derpFakeAddr *net.UDPAddr, dc continue } select { - case <-c.donec(): + case <-ctx.Done(): return case c.derpRecvCh <- res: <-didCopy @@ -822,13 +844,18 @@ type derpWriteRequest struct { // runDerpWriter runs in a goroutine for the life of a DERP // connection, handling received packets. -func (c *Conn) runDerpWriter(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client, ch <-chan derpWriteRequest) { +func (c *Conn) runDerpWriter(ctx context.Context, derpFakeAddr *net.UDPAddr, dc *derphttp.Client, ch <-chan derpWriteRequest, wg *syncs.WaitGroupChan, startGate <-chan struct{}) { + defer wg.Decr() + select { + case <-startGate: + case <-ctx.Done(): + return + } + for { select { case <-ctx.Done(): return - case <-c.donec(): - return case wr := <-ch: err := dc.Send(wr.pubKey, wr.b) if err != nil { @@ -836,7 +863,7 @@ func (c *Conn) runDerpWriter(ctx context.Context, derpFakeAddr *net.UDPAddr, dc } select { case wr.errc <- err: - case <-c.donec(): + case <-c.donec(): // Note: not ctx.Done: sendAddr (receiver) has different lifetime than ctx. return } }