@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"testing"
"testing/synctest"
"time"
"github.com/creachadair/taskgroup"
@ -64,6 +65,55 @@ func TestBus(t *testing.T) {
}
}
func TestSubscriberFunc ( t * testing . T ) {
synctest . Test ( t , func ( t * testing . T ) {
b := eventbus . New ( )
defer b . Close ( )
c := b . Client ( "TestClient" )
exp := expectEvents ( t , EventA { 12345 } )
eventbus . SubscribeFunc [ EventA ] ( c , func ( e EventA ) { exp . Got ( e ) } )
p := eventbus . Publish [ EventA ] ( c )
p . Publish ( EventA { 12345 } )
synctest . Wait ( )
c . Close ( )
if ! exp . Empty ( ) {
t . Errorf ( "unexpected extra events: %+v" , exp . want )
}
} )
t . Run ( "SubscriberPublishes" , func ( t * testing . T ) {
synctest . Test ( t , func ( t * testing . T ) {
b := eventbus . New ( )
defer b . Close ( )
c := b . Client ( "TestClient" )
pa := eventbus . Publish [ EventA ] ( c )
pb := eventbus . Publish [ EventB ] ( c )
exp := expectEvents ( t , EventA { 127 } , EventB { 128 } )
eventbus . SubscribeFunc [ EventA ] ( c , func ( e EventA ) {
exp . Got ( e )
pb . Publish ( EventB { Counter : e . Counter + 1 } )
} )
eventbus . SubscribeFunc [ EventB ] ( c , func ( e EventB ) {
exp . Got ( e )
} )
pa . Publish ( EventA { 127 } )
synctest . Wait ( )
c . Close ( )
if ! exp . Empty ( ) {
t . Errorf ( "unepxected extra events: %+v" , exp . want )
}
} )
} )
}
func TestBusMultipleConsumers ( t * testing . T ) {
b := eventbus . New ( )
defer b . Close ( )
@ -111,80 +161,149 @@ func TestBusMultipleConsumers(t *testing.T) {
}
}
func TestSpam ( t * testing . T ) {
b := eventbus . New ( )
defer b . Close ( )
func TestClientMixedSubscribers ( t * testing . T ) {
synctest . Test ( t , func ( t * testing . T ) {
b := eventbus . New ( )
defer b . Close ( )
c := b . Client ( "TestClient" )
var gotA EventA
s1 := eventbus . Subscribe [ EventA ] ( c )
const (
publishers = 100
eventsPerPublisher = 20
wantEvents = publishers * eventsPerPublisher
subscribers = 100
)
var g taskgroup . Group
received := make ( [ ] [ ] EventA , subscribers )
for i := range subscribers {
c := b . Client ( fmt . Sprintf ( "Subscriber%d" , i ) )
defer c . Close ( )
s := eventbus . Subscribe [ EventA ] ( c )
g . Go ( func ( ) error {
for range wantEvents {
var gotB EventB
eventbus . SubscribeFunc [ EventB ] ( c , func ( e EventB ) {
t . Logf ( "func sub received %[1]T %+[1]v" , e )
gotB = e
} )
go func ( ) {
for {
select {
case evt := <- s . Events ( ) :
received [ i ] = append ( received [ i ] , evt )
case <- s . Done ( ) :
t . Errorf ( "queue done before expected number of events received" )
return errors . New ( "queue prematurely closed" )
case <- time . After ( 5 * time . Second ) :
t . Errorf ( "timed out waiting for expected bus event after %d events" , len ( received [ i ] ) )
return errors . New ( "timeout" )
case <- s1 . Done ( ) :
return
case e := <- s1 . Events ( ) :
t . Logf ( "chan sub received %[1]T %+[1]v" , e )
gotA = e
}
}
return nil
} )
}
} ( )
p1 := eventbus . Publish [ EventA ] ( c )
p2 := eventbus . Publish [ EventB ] ( c )
go p1 . Publish ( EventA { 12345 } )
go p2 . Publish ( EventB { 67890 } )
published := make ( [ ] [ ] EventA , publishers )
for i := range publishers {
g . Run ( func ( ) {
synctest . Wait ( )
c . Close ( )
synctest . Wait ( )
if diff := cmp . Diff ( gotB , EventB { 67890 } ) ; diff != "" {
t . Errorf ( "Chan sub (-got, +want):\n%s" , diff )
}
if diff := cmp . Diff ( gotA , EventA { 12345 } ) ; diff != "" {
t . Errorf ( "Func sub (-got, +want):\n%s" , diff )
}
} )
}
func TestSpam ( t * testing . T ) {
synctest . Test ( t , func ( t * testing . T ) {
b := eventbus . New ( )
defer b . Close ( )
const (
publishers = 100
eventsPerPublisher = 20
wantEvents = publishers * eventsPerPublisher
subscribers = 100
)
var g taskgroup . Group
// A bunch of subscribers receiving on channels.
chanReceived := make ( [ ] [ ] EventA , subscribers )
for i := range subscribers {
c := b . Client ( fmt . Sprintf ( "Subscriber%d" , i ) )
defer c . Close ( )
s := eventbus . Subscribe [ EventA ] ( c )
g . Go ( func ( ) error {
for range wantEvents {
select {
case evt := <- s . Events ( ) :
chanReceived [ i ] = append ( chanReceived [ i ] , evt )
case <- s . Done ( ) :
t . Errorf ( "queue done before expected number of events received" )
return errors . New ( "queue prematurely closed" )
case <- time . After ( 5 * time . Second ) :
t . Logf ( "timed out waiting for expected bus event after %d events" , len ( chanReceived [ i ] ) )
return errors . New ( "timeout" )
}
}
return nil
} )
}
// A bunch of subscribers receiving via a func.
funcReceived := make ( [ ] [ ] EventA , subscribers )
for i := range subscribers {
c := b . Client ( fmt . Sprintf ( "SubscriberFunc%d" , i ) )
defer c . Close ( )
eventbus . SubscribeFunc ( c , func ( e EventA ) {
funcReceived [ i ] = append ( funcReceived [ i ] , e )
} )
}
published := make ( [ ] [ ] EventA , publishers )
for i := range publishers {
c := b . Client ( fmt . Sprintf ( "Publisher%d" , i ) )
p := eventbus . Publish [ EventA ] ( c )
for j := range eventsPerPublisher {
evt := EventA { i * eventsPerPublisher + j }
p . Publish ( evt )
published [ i ] = append ( published [ i ] , evt )
}
} )
}
g . Run ( func ( ) {
defer c . Close ( )
for j := range eventsPerPublisher {
evt := EventA { i * eventsPerPublisher + j }
p . Publish ( evt )
published [ i ] = append ( published [ i ] , evt )
}
} )
}
if err := g . Wait ( ) ; err != nil {
t . Fatal ( err )
}
var last [ ] EventA
for i , got := range received {
if len ( got ) != wantEvents {
// Receiving goroutine already reported an error, we just need
// to fail early within the main test goroutine.
t . FailNow ( )
if err := g . Wait ( ) ; err != nil {
t . Fatal ( err )
}
if last == nil {
continue
synctest . Wait ( )
tests := [ ] struct {
name string
recv [ ] [ ] EventA
} {
{ "Subscriber" , chanReceived } ,
{ "SubscriberFunc" , funcReceived } ,
}
if diff := cmp . Diff ( got , last ) ; diff != "" {
t . Errorf ( "Subscriber %d did not see the same events as %d (-got+want):\n%s" , i , i - 1 , diff )
for _ , tc := range tests {
for i , got := range tc . recv {
if len ( got ) != wantEvents {
t . Errorf ( "%s %d: got %d events, want %d" , tc . name , i , len ( got ) , wantEvents )
}
if i == 0 {
continue
}
if diff := cmp . Diff ( got , tc . recv [ i - 1 ] ) ; diff != "" {
t . Errorf ( "%s %d did not see the same events as %d (-got+want):\n%s" , tc . name , i , i - 1 , diff )
}
}
}
last = got
}
for i , sent := range published {
if got := len ( sent ) ; got != eventsPerPublisher {
t . Fatalf ( "Publisher %d sent %d events, want %d" , i , got , eventsPerPublisher )
for i , sent := range published {
if got := len ( sent ) ; got != eventsPerPublisher {
t . Fatalf ( "Publisher %d sent %d events, want %d" , i , got , eventsPerPublisher )
}
}
}
// TODO: check that the published sequences are proper
// subsequences of the received slices.
// TODO: check that the published sequences are proper
// subsequences of the received slices.
} )
}
func TestClient_Done ( t * testing . T ) {
@ -366,10 +485,12 @@ func expectEvents(t *testing.T, want ...any) *queueChecker {
func ( q * queueChecker ) Got ( v any ) {
q . t . Helper ( )
if q . Empty ( ) {
q . t . Fatalf ( "queue got unexpected %v" , v )
q . t . Errorf ( "queue got unexpected %v" , v )
return
}
if v != q . want [ 0 ] {
q . t . Fatalf ( "queue got %#v, want %#v" , v , q . want [ 0 ] )
q . t . Errorf ( "queue got %#v, want %#v" , v , q . want [ 0 ] )
return
}
q . want = q . want [ 1 : ]
}