Skip to content

Commit 939b5ce

Browse files
hqzxzwbzhuwenbo
and
zhuwenbo
authored
Align hasCustomOnError behavior of CallbackCompletableObserver with LambdaObserver, ConsumerSingleObserver and so on (#7326)
Co-authored-by: zhuwenbo <[email protected]>
1 parent f5ff589 commit 939b5ce

File tree

3 files changed

+5
-18
lines changed

3 files changed

+5
-18
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -2996,11 +2996,7 @@ public final Disposable subscribe(
29962996
@NonNull
29972997
@SchedulerSupport(SchedulerSupport.NONE)
29982998
public final Disposable subscribe(@NonNull Action onComplete) {
2999-
Objects.requireNonNull(onComplete, "onComplete is null");
3000-
3001-
CallbackCompletableObserver observer = new CallbackCompletableObserver(onComplete);
3002-
subscribe(observer);
3003-
return observer;
2999+
return subscribe(onComplete, Functions.ON_ERROR_MISSING);
30043000
}
30053001

30063002
/**

src/main/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserver.java

+3-12
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,24 @@
2020
import io.reactivex.rxjava3.exceptions.*;
2121
import io.reactivex.rxjava3.functions.*;
2222
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
23+
import io.reactivex.rxjava3.internal.functions.Functions;
2324
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
2425
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2526

2627
public final class CallbackCompletableObserver
2728
extends AtomicReference<Disposable>
28-
implements CompletableObserver, Disposable, Consumer<Throwable>, LambdaConsumerIntrospection {
29+
implements CompletableObserver, Disposable, LambdaConsumerIntrospection {
2930

3031
private static final long serialVersionUID = -4361286194466301354L;
3132

3233
final Consumer<? super Throwable> onError;
3334
final Action onComplete;
3435

35-
public CallbackCompletableObserver(Action onComplete) {
36-
this.onError = this;
37-
this.onComplete = onComplete;
38-
}
39-
4036
public CallbackCompletableObserver(Consumer<? super Throwable> onError, Action onComplete) {
4137
this.onError = onError;
4238
this.onComplete = onComplete;
4339
}
4440

45-
@Override
46-
public void accept(Throwable e) {
47-
RxJavaPlugins.onError(new OnErrorNotImplementedException(e));
48-
}
49-
5041
@Override
5142
public void onComplete() {
5243
try {
@@ -86,6 +77,6 @@ public boolean isDisposed() {
8677

8778
@Override
8879
public boolean hasCustomOnError() {
89-
return onError != this;
80+
return onError != Functions.ON_ERROR_MISSING;
9081
}
9182
}

src/test/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserverTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public final class CallbackCompletableObserverTest extends RxJavaTest {
2424

2525
@Test
2626
public void emptyActionShouldReportNoCustomOnError() {
27-
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION);
27+
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
2828

2929
assertFalse(o.hasCustomOnError());
3030
}

0 commit comments

Comments
 (0)