Skip to content

Commit d6d8869

Browse files
authored
Fix cancellation order in ThrottleFirst (#7484)
* Fix cancellation order in ThrottleFirst * Bump vanniktech maven publish * Undo bump, seems to be gradle issue
1 parent 3b0d948 commit d6d8869

File tree

2 files changed

+6
-5
lines changed

2 files changed

+6
-5
lines changed

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,10 @@ public void onNext(T t) {
105105
downstream.onNext(t);
106106
BackpressureHelper.produced(this, 1);
107107
} else {
108+
upstream.cancel();
108109
done = true;
109-
cancel();
110110
downstream.onError(MissingBackpressureException.createDefault());
111+
worker.dispose();
111112
return;
112113
}
113114

@@ -122,10 +123,10 @@ public void onNext(T t) {
122123
onDropped.accept(t);
123124
} catch (Throwable ex) {
124125
Exceptions.throwIfFatal(ex);
125-
downstream.onError(ex);
126-
worker.dispose();
127126
upstream.cancel();
128127
done = true;
128+
downstream.onError(ex);
129+
worker.dispose();
129130
}
130131
}
131132
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleFirstTimed.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public final class ObservableThrottleFirstTimed<T> extends AbstractObservableWit
3232

3333
public ObservableThrottleFirstTimed(
3434
ObservableSource<T> source,
35-
long timeout,
35+
long timeout,
3636
TimeUnit unit,
3737
Scheduler scheduler,
3838
Consumer<? super T> onDropped) {
@@ -102,9 +102,9 @@ public void onNext(T t) {
102102
onDropped.accept(t);
103103
} catch (Throwable ex) {
104104
Exceptions.throwIfFatal(ex);
105+
upstream.dispose();
105106
downstream.onError(ex);
106107
worker.dispose();
107-
upstream.dispose();
108108
}
109109
}
110110
}

0 commit comments

Comments
 (0)