|
|
|
@ -245,6 +245,9 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent
|
|
|
|
case <-s.slow.C:
|
|
|
|
case <-s.slow.C:
|
|
|
|
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
|
|
|
|
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
|
|
|
|
s.slow.Reset(slowSubscriberTimeout)
|
|
|
|
s.slow.Reset(slowSubscriberTimeout)
|
|
|
|
|
|
|
|
all := make([]byte, 2<<20)
|
|
|
|
|
|
|
|
n := runtime.Stack(all, true)
|
|
|
|
|
|
|
|
panic("TOO SLOW: " + string(all[:n]))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -344,6 +347,9 @@ func (s *SubscriberFunc[T]) dispatch(ctx context.Context, vals *queue[DeliveredE
|
|
|
|
case <-s.slow.C:
|
|
|
|
case <-s.slow.C:
|
|
|
|
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
|
|
|
|
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
|
|
|
|
s.slow.Reset(slowSubscriberTimeout)
|
|
|
|
s.slow.Reset(slowSubscriberTimeout)
|
|
|
|
|
|
|
|
all := make([]byte, 2<<20)
|
|
|
|
|
|
|
|
n := runtime.Stack(all, true)
|
|
|
|
|
|
|
|
panic("TOO SLOW: " + string(all[:n]))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|