diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index 6eb566076..39a7bb2e6 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -156,7 +156,7 @@ type Conn struct { // struct. Initialized once at construction, then constant. eventBus *eventbus.Bus - eventClient *eventbus.Client + eventSubs eventbus.Monitor logf logger.Logf epFunc func([]tailcfg.Endpoint) derpActiveFunc func() @@ -176,17 +176,10 @@ type Conn struct { connCtxCancel func() // closes connCtx donec <-chan struct{} // connCtx.Done()'s to avoid context.cancelCtx.Done()'s mutex per call - // These [eventbus.Subscriber] fields are solely accessed by - // consumeEventbusTopics once initialized. - pmSub *eventbus.Subscriber[portmappertype.Mapping] - filterSub *eventbus.Subscriber[FilterUpdate] - nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate] - nodeMutsSub *eventbus.Subscriber[NodeMutationsUpdate] - syncSub *eventbus.Subscriber[syncPoint] + // A publisher for synchronization points to ensure correct ordering of + // config changes between magicsock and wireguard. syncPub *eventbus.Publisher[syncPoint] allocRelayEndpointPub *eventbus.Publisher[UDPRelayAllocReq] - allocRelayEndpointSub *eventbus.Subscriber[UDPRelayAllocResp] - subsDoneCh chan struct{} // closed when consumeEventbusTopics returns // pconn4 and pconn6 are the underlying UDP sockets used to // send/receive packets for wireguard and other magicsock @@ -643,26 +636,34 @@ func newConn(logf logger.Logf) *Conn { // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the // [eventbus.Client] is closed. -func (c *Conn) consumeEventbusTopics() { - defer close(c.subsDoneCh) - - for { - select { - case <-c.eventClient.Done(): - return - case <-c.pmSub.Events(): - c.onPortMapChanged() - case filterUpdate := <-c.filterSub.Events(): - c.onFilterUpdate(filterUpdate) - case nodeViews := <-c.nodeViewsSub.Events(): - c.onNodeViewsUpdate(nodeViews) - case nodeMuts := <-c.nodeMutsSub.Events(): - c.onNodeMutationsUpdate(nodeMuts) - case syncPoint := <-c.syncSub.Events(): - c.dlogf("magicsock: received sync point after reconfig") - syncPoint.Signal() - case allocResp := <-c.allocRelayEndpointSub.Events(): - c.onUDPRelayAllocResp(allocResp) +func (c *Conn) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { + // Subscribe calls must return before NewConn otherwise published + // events can be missed. + pmSub := eventbus.Subscribe[portmappertype.Mapping](cli) + filterSub := eventbus.Subscribe[FilterUpdate](cli) + nodeViewsSub := eventbus.Subscribe[NodeViewsUpdate](cli) + nodeMutsSub := eventbus.Subscribe[NodeMutationsUpdate](cli) + syncSub := eventbus.Subscribe[syncPoint](cli) + allocRelayEndpointSub := eventbus.Subscribe[UDPRelayAllocResp](cli) + return func(cli *eventbus.Client) { + for { + select { + case <-cli.Done(): + return + case <-pmSub.Events(): + c.onPortMapChanged() + case filterUpdate := <-filterSub.Events(): + c.onFilterUpdate(filterUpdate) + case nodeViews := <-nodeViewsSub.Events(): + c.onNodeViewsUpdate(nodeViews) + case nodeMuts := <-nodeMutsSub.Events(): + c.onNodeMutationsUpdate(nodeMuts) + case syncPoint := <-syncSub.Events(): + c.dlogf("magicsock: received sync point after reconfig") + syncPoint.Signal() + case allocResp := <-allocRelayEndpointSub.Events(): + c.onUDPRelayAllocResp(allocResp) + } } } } @@ -729,20 +730,12 @@ func NewConn(opts Options) (*Conn, error) { c.testOnlyPacketListener = opts.TestOnlyPacketListener c.noteRecvActivity = opts.NoteRecvActivity - c.eventClient = c.eventBus.Client("magicsock.Conn") - - // Subscribe calls must return before NewConn otherwise published - // events can be missed. - c.pmSub = eventbus.Subscribe[portmappertype.Mapping](c.eventClient) - c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient) - c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient) - c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient) - c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient) - c.syncPub = eventbus.Publish[syncPoint](c.eventClient) - c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](c.eventClient) - c.allocRelayEndpointSub = eventbus.Subscribe[UDPRelayAllocResp](c.eventClient) - c.subsDoneCh = make(chan struct{}) - go c.consumeEventbusTopics() + // Set up publishers and subscribers. Subscribe calls must return before + // NewConn otherwise published events can be missed. + cli := c.eventBus.Client("magicsock.Conn") + c.syncPub = eventbus.Publish[syncPoint](cli) + c.allocRelayEndpointPub = eventbus.Publish[UDPRelayAllocReq](cli) + c.eventSubs = cli.Monitor(c.consumeEventbusTopics(cli)) c.connCtx, c.connCtxCancel = context.WithCancel(context.Background()) c.donec = c.connCtx.Done() @@ -3313,14 +3306,13 @@ func (c *connBind) isClosed() bool { // // Only the first close does anything. Any later closes return nil. func (c *Conn) Close() error { - // Close the [eventbus.Client] and wait for Conn.consumeEventbusTopics to - // return. Do this before acquiring c.mu: + // Close the [eventbus.Client] and wait for c.consumeEventbusTopics to + // return before acquiring c.mu: // 1. Conn.consumeEventbusTopics event handlers also acquire c.mu, they can // deadlock with c.Close(). // 2. Conn.consumeEventbusTopics event handlers may not guard against // undesirable post/in-progress Conn.Close() behaviors. - c.eventClient.Close() - <-c.subsDoneCh + c.eventSubs.Close() c.mu.Lock() defer c.mu.Unlock() diff --git a/wgengine/userspace.go b/wgengine/userspace.go index 42c12c008..86136d977 100644 --- a/wgengine/userspace.go +++ b/wgengine/userspace.go @@ -93,10 +93,8 @@ const networkLoggerUploadTimeout = 5 * time.Second type userspaceEngine struct { // eventBus will eventually become required, but for now may be nil. // TODO(creachadair): Enforce that this is non-nil at construction. - eventBus *eventbus.Bus - eventClient *eventbus.Client - changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] - subsDoneCh chan struct{} // closed when consumeEventbusTopics returns + eventBus *eventbus.Bus + eventSubs eventbus.Monitor logf logger.Logf wgLogger *wglog.Logger // a wireguard-go logging wrapper @@ -354,11 +352,7 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) controlKnobs: conf.ControlKnobs, reconfigureVPN: conf.ReconfigureVPN, health: conf.HealthTracker, - subsDoneCh: make(chan struct{}), } - e.eventClient = e.eventBus.Client("userspaceEngine") - e.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](e.eventClient) - closePool.addFunc(e.eventClient.Close) if e.birdClient != nil { // Disable the protocol at start time. @@ -545,8 +539,8 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) } } - go e.consumeEventbusTopics() - + cli := e.eventBus.Client("userspaceEngine") + e.eventSubs = cli.Monitor(e.consumeEventbusTopics(cli)) e.logf("Engine created.") return e, nil } @@ -556,16 +550,17 @@ func NewUserspaceEngine(logf logger.Logf, conf Config) (_ Engine, reterr error) // always handled in the order they are received, i.e. the next event is not // read until the previous event's handler has returned. It returns when the // [eventbus.Client] is closed. -func (e *userspaceEngine) consumeEventbusTopics() { - defer close(e.subsDoneCh) - - for { - select { - case <-e.eventClient.Done(): - return - case changeDelta := <-e.changeDeltaSub.Events(): - tshttpproxy.InvalidateCache() - e.linkChange(&changeDelta) +func (e *userspaceEngine) consumeEventbusTopics(cli *eventbus.Client) func(*eventbus.Client) { + changeDeltaSub := eventbus.Subscribe[netmon.ChangeDelta](cli) + return func(cli *eventbus.Client) { + for { + select { + case <-cli.Done(): + return + case changeDelta := <-changeDeltaSub.Events(): + tshttpproxy.InvalidateCache() + e.linkChange(&changeDelta) + } } } } @@ -1228,9 +1223,7 @@ func (e *userspaceEngine) RequestStatus() { } func (e *userspaceEngine) Close() { - e.eventClient.Close() - <-e.subsDoneCh - + e.eventSubs.Close() e.mu.Lock() if e.closing { e.mu.Unlock()