diff --git a/feature/relayserver/relayserver.go b/feature/relayserver/relayserver.go index 24304e8ec..d77d7145a 100644 --- a/feature/relayserver/relayserver.go +++ b/feature/relayserver/relayserver.go @@ -157,9 +157,7 @@ func (e *extension) consumeEventbusTopics(port int) { select { case <-e.disconnectFromBusCh: return - case <-reqSub.Done(): - // If reqSub is done, the eventClient has been closed, which is a - // signal to return. + case <-eventClient.Done(): return case req := <-reqSub.Events(): if rs == nil { diff --git a/ipn/ipnlocal/expiry.go b/ipn/ipnlocal/expiry.go index 3d20d57b4..9427f0738 100644 --- a/ipn/ipnlocal/expiry.go +++ b/ipn/ipnlocal/expiry.go @@ -68,15 +68,13 @@ func newExpiryManager(logf logger.Logf, bus *eventbus.Bus) *expiryManager { // [eventbus.Subscriber]'s and passes them to their related handler. Events are // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the -// [controlclient.ControlTime] subscriber is closed, which is interpreted to be the -// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either -// all open or all closed). +// [eventbus.Client] is closed. func (em *expiryManager) consumeEventbusTopics() { defer close(em.subsDoneCh) for { select { - case <-em.controlTimeSub.Done(): + case <-em.eventClient.Done(): return case time := <-em.controlTimeSub.Events(): em.onControlTime(time.Value) diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 4c27bea45..5cdfaf549 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -619,18 +619,13 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo // [eventbus.Subscriber]'s and passes them to their related handler. Events are // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the -// [tailcfg.ClientVersion] subscriber is closed, which is interpreted to be the -// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either -// all open or all closed). +// [eventbus.Client] is closed. func (b *LocalBackend) consumeEventbusTopics() { defer close(b.subsDoneCh) for { select { - // TODO(cmol): Move to using b.eventClient.Done() once implemented. - // In the meantime, we rely on the subs not going away until the client is - // closed, closing all its subscribers. - case <-b.clientVersionSub.Done(): + case <-b.eventClient.Done(): return case clientVersion := <-b.clientVersionSub.Events(): b.onClientVersion(&clientVersion) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 36402122c..719cc68a4 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -640,15 +640,13 @@ func newConn(logf logger.Logf) *Conn { // [eventbus.Subscriber]'s and passes them to their related handler. Events are // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the -// [portmapper.Mapping] subscriber is closed, which is interpreted to be the -// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either -// all open or all closed). +// [eventbus.Client] is closed. func (c *Conn) consumeEventbusTopics() { defer close(c.subsDoneCh) for { select { - case <-c.pmSub.Done(): + case <-c.eventClient.Done(): return case <-c.pmSub.Events(): c.onPortMapChanged() diff --git a/wgengine/router/router_linux.go b/wgengine/router/router_linux.go index 2382e87cd..a9edd7f96 100644 --- a/wgengine/router/router_linux.go +++ b/wgengine/router/router_linux.go @@ -158,13 +158,11 @@ func newUserspaceRouterAdvanced(logf logger.Logf, tunname string, netMon *netmon // [eventbus.Subscriber]'s and passes them to their related handler. Events are // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the -// [portmapper.Mapping] subscriber is closed, which is interpreted to be the -// same as the [eventbus.Client] closing ([eventbus.Subscribers] are either -// all open or all closed). +// [eventbus.Client] is closed. func (r *linuxRouter) consumeEventbusTopics() { for { select { - case <-r.ruleDeletedSub.Done(): + case <-r.eventClient.Done(): return case rulesDeleted := <-r.ruleDeletedSub.Events(): r.onIPRuleDeleted(rulesDeleted.Table, rulesDeleted.Priority)