-
Notifications
You must be signed in to change notification settings - Fork 180
Open
Labels
t:streamPekko StreamsPekko Streams
Milestone
Description
Motivation:
With the current type Decider = Function[Throwable, Directive]
, we can not know the current element that causes the exception, I think it would be better to be type Decider = Function2[Throwable,Any, Directive]
eg, The Map
operator then will be:
override def onPush(): Unit = {
val current = grab(in)
try {
push(out, f(current))
} catch {
case NonFatal(ex) =>
decider(ex, current) match {
case Supervision.Stop => failStage(ex)
case _ => pull(in)
}
}
}
@mdedetrich @raboof @pjfanning wdyt?
We can see in reactor-core's Flux:
public final Flux<T> onErrorComplete(Predicate<? super Throwable> predicate) {
Metadata
Metadata
Assignees
Labels
t:streamPekko StreamsPekko Streams