@ -138,6 +138,7 @@ type Conn struct {
// struct. Initialized once at construction, then constant.
eventBus * eventbus . Bus
eventClient * eventbus . Client
logf logger . Logf
epFunc func ( [ ] tailcfg . Endpoint )
derpActiveFunc func ( )
@ -547,6 +548,31 @@ func NewConn(opts Options) (*Conn, error) {
c . testOnlyPacketListener = opts . TestOnlyPacketListener
c . noteRecvActivity = opts . NoteRecvActivity
// If an event bus is enabled, subscribe to portmapping changes; otherwise
// use the callback mechanism of portmapper.Client.
//
// TODO(creachadair): Remove the switch once the event bus is mandatory.
onPortMapChanged := c . onPortMapChanged
if c . eventBus != nil {
c . eventClient = c . eventBus . Client ( "magicsock.Conn" )
pmSub := eventbus . Subscribe [ portmapper . Mapping ] ( c . eventClient )
go func ( ) {
defer pmSub . Close ( )
for {
select {
case <- pmSub . Events ( ) :
c . onPortMapChanged ( )
case <- pmSub . Done ( ) :
return
}
}
} ( )
// Disable the explicit callback from the portmapper, the subscriber handles it.
onPortMapChanged = nil
}
// Don't log the same log messages possibly every few seconds in our
// portmapper.
portmapperLogf := logger . WithPrefix ( c . logf , "portmapper: " )
@ -560,7 +586,7 @@ func NewConn(opts Options) (*Conn, error) {
NetMon : opts . NetMon ,
DebugKnobs : portMapOpts ,
ControlKnobs : opts . ControlKnobs ,
OnChange : c. onPortMapChanged,
OnChange : onPortMapChanged,
} )
c . portMapper . SetGatewayLookupFunc ( opts . NetMon . GatewayAndSelfIP )
c . netMon = opts . NetMon
@ -2478,6 +2504,9 @@ func (c *connBind) Close() error {
if c . closeDisco6 != nil {
c . closeDisco6 . Close ( )
}
if c . eventClient != nil {
c . eventClient . Close ( )
}
// Send an empty read result to unblock receiveDERP,
// which will then check connBind.Closed.
// connBind.Closed takes c.mu, but c.derpRecvCh is buffered.