wgengine/magicsock: return early, outdent in derpWriteChanOfAddr

reviewable/pr209/r2
Brad Fitzpatrick 5 years ago
parent 8c4cef60f8
commit b6f77cc48d

@ -688,6 +688,8 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
if !addr.IP.Equal(derpMagicIP) { if !addr.IP.Equal(derpMagicIP) {
return nil return nil
} }
nodeID := addr.Port
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if !c.wantDerp || c.closed { if !c.wantDerp || c.closed {
@ -697,52 +699,57 @@ func (c *Conn) derpWriteChanOfAddr(addr *net.UDPAddr) chan<- derpWriteRequest {
c.logf("DERP lookup of %v with no private key; ignoring", addr) c.logf("DERP lookup of %v with no private key; ignoring", addr)
return nil return nil
} }
ad, ok := c.activeDerp[addr.Port]
if !ok {
if c.activeDerp == nil {
c.activeDerp = make(map[int]activeDerp)
c.prevDerp = make(map[int]*syncs.WaitGroupChan)
}
derpSrv := c.derps.ServerByID(addr.Port)
if derpSrv == nil || derpSrv.HostHTTPS == "" {
return nil
}
// Note that derphttp.NewClient does not dial the server ad, ok := c.activeDerp[nodeID]
// so it is safe to do under the mu lock. if ok {
dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf) *ad.lastWrite = time.Now()
if err != nil { return ad.writeCh
c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", addr.Port, derpSrv.HostHTTPS, err) }
return nil
}
dc.NotePreferred(c.myDerp == addr.Port)
dc.DNSCache = dnscache.Get()
dc.TLSConfig = c.derpTLSConfig
ctx, cancel := context.WithCancel(c.connCtx)
ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop)
ad.c = dc
ad.writeCh = ch
ad.cancel = cancel
ad.lastWrite = new(time.Time)
c.activeDerp[addr.Port] = ad
// Build a startGate for the derp reader+writer
// goroutines, so they don't start running until any
// previous generation is closed.
startGate := syncs.ClosedChan()
if prev := c.prevDerp[addr.Port]; prev != nil {
startGate = prev.DoneChan()
}
// And register a WaitGroup(Chan) for this generation.
wg := syncs.NewWaitGroupChan()
wg.Add(2)
c.prevDerp[addr.Port] = wg
go c.runDerpReader(ctx, addr, dc, wg, startGate) if c.activeDerp == nil {
go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate) c.activeDerp = make(map[int]activeDerp)
c.prevDerp = make(map[int]*syncs.WaitGroupChan)
}
derpSrv := c.derps.ServerByID(nodeID)
if derpSrv == nil || derpSrv.HostHTTPS == "" {
return nil
} }
// Note that derphttp.NewClient does not dial the server
// so it is safe to do under the mu lock.
dc, err := derphttp.NewClient(c.privateKey, "https://"+derpSrv.HostHTTPS+"/derp", c.logf)
if err != nil {
c.logf("derphttp.NewClient: port %d, host %q invalid? err: %v", nodeID, derpSrv.HostHTTPS, err)
return nil
}
dc.NotePreferred(c.myDerp == nodeID)
dc.DNSCache = dnscache.Get()
dc.TLSConfig = c.derpTLSConfig
ctx, cancel := context.WithCancel(c.connCtx)
ch := make(chan derpWriteRequest, bufferedDerpWritesBeforeDrop)
ad.c = dc
ad.writeCh = ch
ad.cancel = cancel
ad.lastWrite = new(time.Time)
c.activeDerp[nodeID] = ad
// Build a startGate for the derp reader+writer
// goroutines, so they don't start running until any
// previous generation is closed.
startGate := syncs.ClosedChan()
if prev := c.prevDerp[nodeID]; prev != nil {
startGate = prev.DoneChan()
}
// And register a WaitGroup(Chan) for this generation.
wg := syncs.NewWaitGroupChan()
wg.Add(2)
c.prevDerp[nodeID] = wg
go c.runDerpReader(ctx, addr, dc, wg, startGate)
go c.runDerpWriter(ctx, addr, dc, ch, wg, startGate)
*ad.lastWrite = time.Now() *ad.lastWrite = time.Now()
return ad.writeCh return ad.writeCh
} }

Loading…
Cancel
Save