|
|
|
@ -158,7 +158,7 @@ func (q *subscribeState) subscriberFor(val any) subscriber {
|
|
|
|
return q.outputs[reflect.TypeOf(val)]
|
|
|
|
return q.outputs[reflect.TypeOf(val)]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close closes the subscribeState. Implicitly closes all Subscribers
|
|
|
|
// Close closes the subscribeState. It implicitly closes all Subscribers
|
|
|
|
// linked to this state, and any pending events are discarded.
|
|
|
|
// linked to this state, and any pending events are discarded.
|
|
|
|
func (s *subscribeState) close() {
|
|
|
|
func (s *subscribeState) close() {
|
|
|
|
s.dispatcher.StopAndWait()
|
|
|
|
s.dispatcher.StopAndWait()
|
|
|
|
@ -244,6 +244,10 @@ func (s *Subscriber[T]) Done() <-chan struct{} {
|
|
|
|
// Close closes the Subscriber, indicating the caller no longer wishes
|
|
|
|
// Close closes the Subscriber, indicating the caller no longer wishes
|
|
|
|
// to receive this event type. After Close, receives on
|
|
|
|
// to receive this event type. After Close, receives on
|
|
|
|
// [Subscriber.Events] block for ever.
|
|
|
|
// [Subscriber.Events] block for ever.
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// If the Bus from which the Subscriber was created is closed,
|
|
|
|
|
|
|
|
// the Subscriber is implicitly closed and does not need to be closed
|
|
|
|
|
|
|
|
// separately.
|
|
|
|
func (s *Subscriber[T]) Close() {
|
|
|
|
func (s *Subscriber[T]) Close() {
|
|
|
|
s.stop.Stop() // unblock receivers
|
|
|
|
s.stop.Stop() // unblock receivers
|
|
|
|
s.unregister()
|
|
|
|
s.unregister()
|
|
|
|
|