@ -123,9 +123,7 @@ type Auto struct {
observerQueue execqueue . ExecQueue
shutdownFn func ( ) // to be called prior to shutdown or nil
eventClient * eventbus . Client
healthChangeSub * eventbus . Subscriber [ health . Change ]
subsDoneCh chan struct { } // close-only channel when eventClient has closed
eventSubs eventbus . Monitor
mu sync . Mutex // mutex guards the following fields
@ -195,11 +193,11 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
updateDone : make ( chan struct { } ) ,
observer : opts . Observer ,
shutdownFn : opts . Shutdown ,
subsDoneCh : make ( chan struct { } ) ,
}
c . eventClient = opts . Bus . Client ( "controlClient.Auto" )
c . healthChangeSub = eventbus . Subscribe [ health . Change ] ( c . eventClient )
// Set up eventbus client and subscriber
ec := opts . Bus . Client ( "controlClient.Auto" )
c . eventSubs = ec . Monitor ( c . consumeEventbusTopics ( ec ) )
c . authCtx , c . authCancel = context . WithCancel ( context . Background ( ) )
c . authCtx = sockstats . WithSockStats ( c . authCtx , sockstats . LabelControlClientAuto , opts . Logf )
@ -207,7 +205,6 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
c . mapCtx , c . mapCancel = context . WithCancel ( context . Background ( ) )
c . mapCtx = sockstats . WithSockStats ( c . mapCtx , sockstats . LabelControlClientAuto , opts . Logf )
go c . consumeEventbusTopics ( )
return c , nil
}
@ -216,16 +213,17 @@ func NewNoStart(opts Options) (_ *Auto, err error) {
// always handled in the order they are received, i.e. the next event is not
// read until the previous event's handler has returned. It returns when the
// [eventbus.Client] is closed.
func ( c * Auto ) consumeEventbusTopics ( ) {
defer close ( c . subsDoneCh )
for {
select {
case <- c . eventClient . Done ( ) :
return
case change := <- c . healthChangeSub . Events ( ) :
if change . WarnableChanged {
c . direct . ReportWarnableChange ( change . Warnable , change . UnhealthyState )
func ( c * Auto ) consumeEventbusTopics ( ec * eventbus . Client ) func ( * eventbus . Client ) {
healthChangeSub := eventbus . Subscribe [ health . Change ] ( ec )
return func ( cli * eventbus . Client ) {
for {
select {
case <- cli . Done ( ) :
return
case change := <- healthChangeSub . Events ( ) :
if change . WarnableChanged {
c . direct . ReportWarnableChange ( change . Warnable , change . UnhealthyState )
}
}
}
}
@ -784,8 +782,7 @@ func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
}
func ( c * Auto ) Shutdown ( ) {
c . eventClient . Close ( )
<- c . subsDoneCh
c . eventSubs . Close ( )
c . mu . Lock ( )
if c . closed {