From b6f77cc48d624ad0e0dc6fd385627321773bc637 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 22 Mar 2020 14:08:59 -0700 Subject: [PATCH] wgengine/magicsock: return early, outdent in derpWriteChanOfAddr --- wgengine/magicsock/magicsock.go | 93 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 43 deletions(-) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 77b9b8cfa..7536d7946 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -688,6 +688,8 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest { if !addr.IP.Equal(derpMagicIP) { return nil } + nodeID := addr.Port + c.mu.Lock() defer c.mu.Unlock() if !c.wantDerp || c.closed { @@ -697,52 +699,57 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest { c.logf("DERP lookup of %v with no private key; ignoring", addr) return nil } - ad, ok := c.activeDerp[addr.Port] - if !ok { - if c.activeDerp == nil { - c.activeDerp = make(map[int]activeDerp) - c.prevDerp = make(map[int]*syncs.WaitGroupChan) - } - derpSrv := c.derps.ServerByID(addr.Port) - if derpSrv == nil || derpSrv.HostHTTPS == "" { - return nil - } - // Note that derphttp.NewClient does not dial the server - // so it is safe to do under the mu lock. - dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf) - if err != nil { - c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", addr.Port, derpSrv.HostHTTPS, err) - return nil - } - dc.NotePreferred(c.myDerp == addr.Port) - dc.DNSCache = dnscache.Get() - dc.TLSConfig = c.derpTLSConfig - - ctx, cancel := context.WithCancel(c.connCtx) - ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop) - - ad.c = dc - ad.writeCh = ch - ad.cancel = cancel - ad.lastWrite = new(time.Time) - c.activeDerp[addr.Port] = ad - - // Build a startGate for the derp reader+writer - // goroutines, so they don't start running until any - // previous generation is closed. - startGate := syncs.ClosedChan() - if prev := c.prevDerp[addr.Port]; prev != nil { - startGate = prev.DoneChan() - } - // And register a WaitGroup(Chan) for this generation. - wg := syncs.NewWaitGroupChan() - wg.Add(2) - c.prevDerp[addr.Port] = wg + ad, ok := c.activeDerp[nodeID] + if ok { + *ad.lastWrite = time.Now() + return ad.writeCh + } - go c.runDerpReader(ctx, addr, dc, wg, startGate) - go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate) + if c.activeDerp == nil { + c.activeDerp = make(map[int]activeDerp) + c.prevDerp = make(map[int]*syncs.WaitGroupChan) + } + derpSrv := c.derps.ServerByID(nodeID) + if derpSrv == nil || derpSrv.HostHTTPS == "" { + return nil } + + // Note that derphttp.NewClient does not dial the server + // so it is safe to do under the mu lock. + dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf) + if err != nil { + c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err) + return nil + } + dc.NotePreferred(c.myDerp == nodeID) + dc.DNSCache = dnscache.Get() + dc.TLSConfig = c.derpTLSConfig + + ctx, cancel := context.WithCancel(c.connCtx) + ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop) + + ad.c = dc + ad.writeCh = ch + ad.cancel = cancel + ad.lastWrite = new(time.Time) + c.activeDerp[nodeID] = ad + + // Build a startGate for the derp reader+writer + // goroutines, so they don't start running until any + // previous generation is closed. + startGate := syncs.ClosedChan() + if prev := c.prevDerp[nodeID]; prev != nil { + startGate = prev.DoneChan() + } + // And register a WaitGroup(Chan) for this generation. + wg := syncs.NewWaitGroupChan() + wg.Add(2) + c.prevDerp[nodeID] = wg + + go c.runDerpReader(ctx, addr, dc, wg, startGate) + go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate) + *ad.lastWrite = time.Now() return ad.writeCh }