Skip to content

Commit 3852b1b

Browse files
authored
2.x: fix replay() cancel/dispose NPE (#5064)
1 parent a636b87 commit 3852b1b

File tree

6 files changed

+97
-15
lines changed

6 files changed

+97
-15
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,10 @@ public void connect(Consumer<? super Disposable> connection) {
336336
}
337337

338338
@SuppressWarnings("rawtypes")
339-
static final class ReplaySubscriber<T> implements Subscriber<T>, Disposable {
339+
static final class ReplaySubscriber<T>
340+
extends AtomicReference<Subscription>
341+
implements Subscriber<T>, Disposable {
342+
private static final long serialVersionUID = 7224554242710036740L;
340343
/** Holds notifications from upstream. */
341344
final ReplayBuffer<T> buffer;
342345
/** Indicates this Subscriber received a terminal event. */
@@ -361,8 +364,6 @@ static final class ReplaySubscriber<T> implements Subscriber<T>, Disposable {
361364
long maxChildRequested;
362365
/** Counts the outstanding upstream requests until the producer arrives. */
363366
long maxUpstreamRequested;
364-
/** The upstream producer. */
365-
volatile Subscription subscription;
366367

367368
@SuppressWarnings("unchecked")
368369
ReplaySubscriber(ReplayBuffer<T> buffer) {
@@ -386,7 +387,7 @@ public void dispose() {
386387
// current.compareAndSet(ReplaySubscriber.this, null);
387388
// we don't care if it fails because it means the current has
388389
// been replaced in the meantime
389-
subscription.cancel();
390+
SubscriptionHelper.cancel(this);
390391
}
391392

392393
/**
@@ -476,8 +477,7 @@ void remove(InnerSubscription<T> p) {
476477

477478
@Override
478479
public void onSubscribe(Subscription p) {
479-
if (SubscriptionHelper.validate(subscription, p)) {
480-
subscription = p;
480+
if (SubscriptionHelper.setOnce(this, p)) {
481481
manageRequests();
482482
for (InnerSubscription<T> rp : subscribers.get()) {
483483
buffer.replay(rp);
@@ -548,7 +548,7 @@ void manageRequests() {
548548
}
549549

550550
long ur = maxUpstreamRequested;
551-
Subscription p = subscription;
551+
Subscription p = get();
552552

553553
long diff = maxTotalRequests - ri;
554554
if (diff != 0L) {

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,10 @@ public void connect(Consumer<? super Disposable> connection) {
316316
}
317317

318318
@SuppressWarnings("rawtypes")
319-
static final class ReplayObserver<T> implements Observer<T>, Disposable {
319+
static final class ReplayObserver<T>
320+
extends AtomicReference<Disposable>
321+
implements Observer<T>, Disposable {
322+
private static final long serialVersionUID = -533785617179540163L;
320323
/** Holds notifications from upstream. */
321324
final ReplayBuffer<T> buffer;
322325
/** Indicates this Observer received a terminal event. */
@@ -335,9 +338,6 @@ static final class ReplayObserver<T> implements Observer<T>, Disposable {
335338
*/
336339
final AtomicBoolean shouldConnect;
337340

338-
/** The upstream producer. */
339-
volatile Disposable subscription;
340-
341341
ReplayObserver(ReplayBuffer<T> buffer) {
342342
this.buffer = buffer;
343343

@@ -358,7 +358,7 @@ public void dispose() {
358358
// current.compareAndSet(ReplayObserver.this, null);
359359
// we don't care if it fails because it means the current has
360360
// been replaced in the meantime
361-
subscription.dispose();
361+
DisposableHelper.dispose(this);
362362
}
363363

364364
/**
@@ -444,8 +444,7 @@ void remove(InnerDisposable<T> producer) {
444444

445445
@Override
446446
public void onSubscribe(Disposable p) {
447-
if (DisposableHelper.validate(this.subscription, p)) {
448-
subscription = p;
447+
if (DisposableHelper.setOnce(this, p)) {
449448
replay();
450449
}
451450
}

src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -873,4 +873,25 @@ public void subscribe(FlowableEmitter<Object> s) throws Exception {
873873
.test(0L)
874874
.assertFailure(MissingBackpressureException.class);
875875
}
876+
877+
@Test
878+
public void delayedUpstreamOnSubscribe() {
879+
final Subscriber<?>[] sub = { null };
880+
881+
new Flowable<Integer>() {
882+
@Override
883+
protected void subscribeActual(Subscriber<? super Integer> s) {
884+
sub[0] = s;
885+
}
886+
}
887+
.publish()
888+
.connect()
889+
.dispose();
890+
891+
BooleanSubscription bs = new BooleanSubscription();
892+
893+
sub[0].onSubscribe(bs);
894+
895+
assertTrue(bs.isCancelled());
896+
}
876897
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -1711,4 +1711,24 @@ public void testSizedTruncation() {
17111711
Assert.assertFalse(buf.hasError());
17121712
}
17131713

1714+
@Test
1715+
public void delayedUpstreamOnSubscribe() {
1716+
final Subscriber<?>[] sub = { null };
1717+
1718+
new Flowable<Integer>() {
1719+
@Override
1720+
protected void subscribeActual(Subscriber<? super Integer> s) {
1721+
sub[0] = s;
1722+
}
1723+
}
1724+
.replay()
1725+
.connect()
1726+
.dispose();
1727+
1728+
BooleanSubscription bs = new BooleanSubscription();
1729+
1730+
sub[0].onSubscribe(bs);
1731+
1732+
assertTrue(bs.isCancelled());
1733+
}
17141734
}

src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java

+21
Original file line numberDiff line numberDiff line change
@@ -699,4 +699,25 @@ public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
699699

700700
assertFalse(ps.hasObservers());
701701
}
702+
703+
@Test
704+
public void delayedUpstreamOnSubscribe() {
705+
final Observer<?>[] sub = { null };
706+
707+
new Observable<Integer>() {
708+
@Override
709+
protected void subscribeActual(Observer<? super Integer> s) {
710+
sub[0] = s;
711+
}
712+
}
713+
.publish()
714+
.connect()
715+
.dispose();
716+
717+
Disposable bs = Disposables.empty();
718+
719+
sub[0].onSubscribe(bs);
720+
721+
assertTrue(bs.isDisposed());
722+
}
702723
}

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.AtomicInteger;
2323

24-
import io.reactivex.annotations.NonNull;
2524
import org.junit.*;
2625
import org.mockito.InOrder;
2726

2827
import io.reactivex.*;
2928
import io.reactivex.Observable;
3029
import io.reactivex.Observer;
3130
import io.reactivex.Scheduler.Worker;
31+
import io.reactivex.annotations.NonNull;
3232
import io.reactivex.disposables.*;
3333
import io.reactivex.exceptions.TestException;
3434
import io.reactivex.functions.*;
@@ -1490,4 +1490,25 @@ public void onNext(Integer t) {
14901490

14911491
to.assertValues(1);
14921492
}
1493+
1494+
@Test
1495+
public void delayedUpstreamOnSubscribe() {
1496+
final Observer<?>[] sub = { null };
1497+
1498+
new Observable<Integer>() {
1499+
@Override
1500+
protected void subscribeActual(Observer<? super Integer> s) {
1501+
sub[0] = s;
1502+
}
1503+
}
1504+
.replay()
1505+
.connect()
1506+
.dispose();
1507+
1508+
Disposable bs = Disposables.empty();
1509+
1510+
sub[0].onSubscribe(bs);
1511+
1512+
assertTrue(bs.isDisposed());
1513+
}
14931514
}

0 commit comments

Comments
 (0)