1
1
/*
2
- * Copyright (c) 2016, 2024 , Oracle and/or its affiliates. All rights reserved.
2
+ * Copyright (c) 2016, 2025 , Oracle and/or its affiliates. All rights reserved.
3
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
4
*
5
5
* This code is free software; you can redistribute it and/or modify it
@@ -537,12 +537,20 @@ private static final class AggregateSubscription
537
537
538
538
@ Override
539
539
public void request (long n ) {
540
- if (cancelled || publisher == null && bodies .isEmpty ()) {
541
- return ;
540
+ synchronized (this ) {
541
+ // We are finished when publisher is null and bodies
542
+ // is empty. This means that the data from the last
543
+ // publisher in the list has been consumed.
544
+ // If we are finished or cancelled, do nothing.
545
+ if (cancelled || (publisher == null && bodies .isEmpty ())) {
546
+ return ;
547
+ }
542
548
}
543
549
try {
544
550
demand .increase (n );
545
551
} catch (IllegalArgumentException x ) {
552
+ // request() should not throw - the scheduler will
553
+ // invoke onError on the subscriber.
546
554
illegalRequest = x ;
547
555
}
548
556
scheduler .runOrSchedule ();
@@ -554,46 +562,68 @@ public void cancel() {
554
562
scheduler .runOrSchedule ();
555
563
}
556
564
557
- private boolean cancelSubscription () {
558
- Flow .Subscription subscription = this .subscription ;
565
+ private boolean cancelSubscription (Flow .Subscription subscription ) {
559
566
if (subscription != null ) {
560
- this .subscription = null ;
561
- this .publisher = null ;
567
+ synchronized (this ) {
568
+ if (this .subscription == subscription ) {
569
+ this .subscription = null ;
570
+ this .publisher = null ;
571
+ }
572
+ }
562
573
subscription .cancel ();
563
574
}
575
+ // This method is called when cancel is true, so
576
+ // we should always stop the scheduler here
564
577
scheduler .stop ();
565
578
return subscription != null ;
566
579
}
567
580
568
581
public void run () {
569
582
try {
583
+ BodyPublisher publisher ;
584
+ Flow .Subscription subscription = null ;
570
585
while (error .get () == null
571
586
&& (!demand .isFulfilled ()
572
- || (publisher == null && !bodies .isEmpty ()))) {
587
+ || (this . publisher == null && !bodies .isEmpty ()))) {
573
588
boolean cancelled = this .cancelled ;
574
- BodyPublisher publisher = this .publisher ;
575
- Flow .Subscription subscription = this .subscription ;
589
+ // make sure we see a consistent state.
590
+ synchronized (this ) {
591
+ publisher = this .publisher ;
592
+ subscription = this .subscription ;
593
+ }
576
594
Throwable illegalRequest = this .illegalRequest ;
577
595
if (cancelled ) {
578
596
bodies .clear ();
579
- cancelSubscription ();
597
+ cancelSubscription (subscription );
580
598
return ;
581
599
}
582
600
if (publisher == null && !bodies .isEmpty ()) {
583
- this .publisher = publisher = bodies .poll ();
601
+ // synchronize here to avoid race condition with
602
+ // request(long) which could otherwise observe a
603
+ // null publisher and an empty bodies list when
604
+ // polling the last publisher.
605
+ synchronized (this ) {
606
+ this .publisher = publisher = bodies .poll ();
607
+ }
584
608
publisher .subscribe (this );
585
- subscription = this .subscription ;
586
609
} else if (publisher == null ) {
587
610
return ;
588
611
}
589
612
if (illegalRequest != null ) {
590
613
onError (illegalRequest );
591
614
return ;
592
615
}
593
- if (subscription == null ) return ;
594
- if (!demand .isFulfilled ()) {
595
- long n = demand .decreaseAndGet (demand .get ());
596
- demanded .increase (n );
616
+ long n = 0 ;
617
+ // synchronize to avoid race condition with
618
+ // publisherDone()
619
+ synchronized (this ) {
620
+ if ((subscription = this .subscription ) == null ) return ;
621
+ if (!demand .isFulfilled ()) {
622
+ n = demand .decreaseAndGet (demand .get ());
623
+ demanded .increase (n );
624
+ }
625
+ }
626
+ if (n > 0 && !cancelled ) {
597
627
subscription .request (n );
598
628
}
599
629
}
@@ -602,20 +632,35 @@ public void run() {
602
632
}
603
633
}
604
634
635
+ // It is important to synchronize when setting
636
+ // publisher to null to avoid race conditions
637
+ // with request(long)
638
+ private synchronized void publisherDone () {
639
+ publisher = null ;
640
+ subscription = null ;
641
+ }
642
+
605
643
606
644
@ Override
607
645
public void onSubscribe (Flow .Subscription subscription ) {
608
- this .subscription = subscription ;
646
+ // synchronize for asserting in a consistent state.
647
+ synchronized (this ) {
648
+ // we shouldn't be able to observe a null publisher
649
+ // when onSubscribe is called, unless - possibly - if
650
+ // there was some error...
651
+ assert publisher != null || error .get () != null ;
652
+ this .subscription = subscription ;
653
+ }
609
654
scheduler .runOrSchedule ();
610
655
}
611
656
612
657
@ Override
613
658
public void onNext (ByteBuffer item ) {
614
- // make sure to cancel the subscription if we receive
615
- // an item after the subscription was cancelled or
659
+ // make sure to cancel the downstream subscription if we receive
660
+ // an item after the aggregate subscription was cancelled or
616
661
// an error was reported.
617
662
if (cancelled || error .get () != null ) {
618
- cancelSubscription ();
663
+ cancelSubscription (this . subscription );
619
664
return ;
620
665
}
621
666
demanded .tryDecrement ();
@@ -625,30 +670,36 @@ public void onNext(ByteBuffer item) {
625
670
@ Override
626
671
public void onError (Throwable throwable ) {
627
672
if (error .compareAndSet (null , throwable )) {
628
- publisher = null ;
629
- subscription = null ;
673
+ publisherDone ();
630
674
subscriber .onError (throwable );
631
675
scheduler .stop ();
632
676
}
633
677
}
634
678
635
- @ Override
636
- public void onComplete () {
679
+ private synchronized boolean completeAndContinue () {
637
680
if (publisher != null && !bodies .isEmpty ()) {
638
681
while (!demanded .isFulfilled ()) {
639
682
demand .increase (demanded .decreaseAndGet (demanded .get ()));
640
683
}
641
- publisher = null ;
642
- subscription = null ;
684
+ publisherDone ();
685
+ return true ; // continue
686
+ } else {
687
+ publisherDone ();
688
+ return false ; // stop
689
+ }
690
+ }
691
+
692
+ @ Override
693
+ public void onComplete () {
694
+ if (completeAndContinue ()) {
643
695
scheduler .runOrSchedule ();
644
696
} else {
645
- publisher = null ;
646
- subscription = null ;
647
697
if (!cancelled ) {
648
698
subscriber .onComplete ();
649
699
}
650
700
scheduler .stop ();
651
701
}
652
702
}
653
703
}
704
+
654
705
}
0 commit comments