Skip to content

Commit b720517

Browse files
committed
8348108: Race condition in AggregatePublisher.AggregateSubscription
Reviewed-by: jpai
1 parent 17a408c commit b720517

File tree

2 files changed

+112
-44
lines changed

2 files changed

+112
-44
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java

+80-29
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -537,12 +537,20 @@ private static final class AggregateSubscription
537537

538538
@Override
539539
public void request(long n) {
540-
if (cancelled || publisher == null && bodies.isEmpty()) {
541-
return;
540+
synchronized (this) {
541+
// We are finished when publisher is null and bodies
542+
// is empty. This means that the data from the last
543+
// publisher in the list has been consumed.
544+
// If we are finished or cancelled, do nothing.
545+
if (cancelled || (publisher == null && bodies.isEmpty())) {
546+
return;
547+
}
542548
}
543549
try {
544550
demand.increase(n);
545551
} catch (IllegalArgumentException x) {
552+
// request() should not throw - the scheduler will
553+
// invoke onError on the subscriber.
546554
illegalRequest = x;
547555
}
548556
scheduler.runOrSchedule();
@@ -554,46 +562,68 @@ public void cancel() {
554562
scheduler.runOrSchedule();
555563
}
556564

557-
private boolean cancelSubscription() {
558-
Flow.Subscription subscription = this.subscription;
565+
private boolean cancelSubscription(Flow.Subscription subscription) {
559566
if (subscription != null) {
560-
this.subscription = null;
561-
this.publisher = null;
567+
synchronized (this) {
568+
if (this.subscription == subscription) {
569+
this.subscription = null;
570+
this.publisher = null;
571+
}
572+
}
562573
subscription.cancel();
563574
}
575+
// This method is called when cancel is true, so
576+
// we should always stop the scheduler here
564577
scheduler.stop();
565578
return subscription != null;
566579
}
567580

568581
public void run() {
569582
try {
583+
BodyPublisher publisher;
584+
Flow.Subscription subscription = null;
570585
while (error.get() == null
571586
&& (!demand.isFulfilled()
572-
|| (publisher == null && !bodies.isEmpty()))) {
587+
|| (this.publisher == null && !bodies.isEmpty()))) {
573588
boolean cancelled = this.cancelled;
574-
BodyPublisher publisher = this.publisher;
575-
Flow.Subscription subscription = this.subscription;
589+
// make sure we see a consistent state.
590+
synchronized (this) {
591+
publisher = this.publisher;
592+
subscription = this.subscription;
593+
}
576594
Throwable illegalRequest = this.illegalRequest;
577595
if (cancelled) {
578596
bodies.clear();
579-
cancelSubscription();
597+
cancelSubscription(subscription);
580598
return;
581599
}
582600
if (publisher == null && !bodies.isEmpty()) {
583-
this.publisher = publisher = bodies.poll();
601+
// synchronize here to avoid race condition with
602+
// request(long) which could otherwise observe a
603+
// null publisher and an empty bodies list when
604+
// polling the last publisher.
605+
synchronized (this) {
606+
this.publisher = publisher = bodies.poll();
607+
}
584608
publisher.subscribe(this);
585-
subscription = this.subscription;
586609
} else if (publisher == null) {
587610
return;
588611
}
589612
if (illegalRequest != null) {
590613
onError(illegalRequest);
591614
return;
592615
}
593-
if (subscription == null) return;
594-
if (!demand.isFulfilled()) {
595-
long n = demand.decreaseAndGet(demand.get());
596-
demanded.increase(n);
616+
long n = 0;
617+
// synchronize to avoid race condition with
618+
// publisherDone()
619+
synchronized (this) {
620+
if ((subscription = this.subscription) == null) return;
621+
if (!demand.isFulfilled()) {
622+
n = demand.decreaseAndGet(demand.get());
623+
demanded.increase(n);
624+
}
625+
}
626+
if (n > 0 && !cancelled) {
597627
subscription.request(n);
598628
}
599629
}
@@ -602,20 +632,35 @@ public void run() {
602632
}
603633
}
604634

635+
// It is important to synchronize when setting
636+
// publisher to null to avoid race conditions
637+
// with request(long)
638+
private synchronized void publisherDone() {
639+
publisher = null;
640+
subscription = null;
641+
}
642+
605643

606644
@Override
607645
public void onSubscribe(Flow.Subscription subscription) {
608-
this.subscription = subscription;
646+
// synchronize for asserting in a consistent state.
647+
synchronized (this) {
648+
// we shouldn't be able to observe a null publisher
649+
// when onSubscribe is called, unless - possibly - if
650+
// there was some error...
651+
assert publisher != null || error.get() != null;
652+
this.subscription = subscription;
653+
}
609654
scheduler.runOrSchedule();
610655
}
611656

612657
@Override
613658
public void onNext(ByteBuffer item) {
614-
// make sure to cancel the subscription if we receive
615-
// an item after the subscription was cancelled or
659+
// make sure to cancel the downstream subscription if we receive
660+
// an item after the aggregate subscription was cancelled or
616661
// an error was reported.
617662
if (cancelled || error.get() != null) {
618-
cancelSubscription();
663+
cancelSubscription(this.subscription);
619664
return;
620665
}
621666
demanded.tryDecrement();
@@ -625,30 +670,36 @@ public void onNext(ByteBuffer item) {
625670
@Override
626671
public void onError(Throwable throwable) {
627672
if (error.compareAndSet(null, throwable)) {
628-
publisher = null;
629-
subscription = null;
673+
publisherDone();
630674
subscriber.onError(throwable);
631675
scheduler.stop();
632676
}
633677
}
634678

635-
@Override
636-
public void onComplete() {
679+
private synchronized boolean completeAndContinue() {
637680
if (publisher != null && !bodies.isEmpty()) {
638681
while (!demanded.isFulfilled()) {
639682
demand.increase(demanded.decreaseAndGet(demanded.get()));
640683
}
641-
publisher = null;
642-
subscription = null;
684+
publisherDone();
685+
return true; // continue
686+
} else {
687+
publisherDone();
688+
return false; // stop
689+
}
690+
}
691+
692+
@Override
693+
public void onComplete() {
694+
if (completeAndContinue()) {
643695
scheduler.runOrSchedule();
644696
} else {
645-
publisher = null;
646-
subscription = null;
647697
if (!cancelled) {
648698
subscriber.onComplete();
649699
}
650700
scheduler.stop();
651701
}
652702
}
653703
}
704+
654705
}

test/jdk/java/net/httpclient/AggregateRequestBodyTest.java

+32-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020, 2023, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2020, 2025, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -33,8 +33,6 @@
3333
* @summary Tests HttpRequest.BodyPublishers::concat
3434
*/
3535

36-
import java.net.InetAddress;
37-
import java.net.InetSocketAddress;
3836
import java.net.URI;
3937
import java.net.http.HttpClient;
4038
import java.net.http.HttpRequest;
@@ -57,6 +55,7 @@
5755
import java.util.concurrent.Flow;
5856
import java.util.concurrent.Flow.Subscriber;
5957
import java.util.concurrent.Flow.Subscription;
58+
import java.util.concurrent.Semaphore;
6059
import java.util.concurrent.TimeUnit;
6160
import java.util.concurrent.TimeoutException;
6261
import java.util.concurrent.atomic.AtomicLong;
@@ -67,12 +66,8 @@
6766
import java.util.stream.LongStream;
6867
import java.util.stream.Stream;
6968
import jdk.httpclient.test.lib.common.HttpServerAdapters;
70-
import jdk.httpclient.test.lib.http2.Http2TestServer;
7169
import javax.net.ssl.SSLContext;
7270

73-
import com.sun.net.httpserver.HttpServer;
74-
import com.sun.net.httpserver.HttpsConfigurator;
75-
import com.sun.net.httpserver.HttpsServer;
7671
import jdk.test.lib.net.SimpleSSLContext;
7772
import org.testng.Assert;
7873
import org.testng.ITestContext;
@@ -419,9 +414,11 @@ public void cancel() {
419414
}
420415

421416
static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
422-
CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
423-
ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();
424-
CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();
417+
final CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
418+
final ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();
419+
final CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();
420+
421+
final Semaphore semaphore = new Semaphore(0);
425422

426423
@Override
427424
public void onSubscribe(Subscription subscription) {
@@ -431,6 +428,11 @@ public void onSubscribe(Subscription subscription) {
431428
@Override
432429
public void onNext(ByteBuffer item) {
433430
items.addLast(item);
431+
int available = semaphore.availablePermits();
432+
if (available > Integer.MAX_VALUE - 8) {
433+
onError(new IllegalStateException("too many buffers in queue: " + available));
434+
}
435+
semaphore.release();
434436
}
435437

436438
@Override
@@ -443,6 +445,18 @@ public void onComplete() {
443445
resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));
444446
}
445447

448+
public ByteBuffer take() {
449+
// it is not guaranteed that the buffer will be added to
450+
// the queue in the same thread that calls request(1).
451+
try {
452+
semaphore.acquire();
453+
} catch (InterruptedException x) {
454+
Thread.currentThread().interrupt();
455+
throw new CompletionException(x);
456+
}
457+
return items.pop();
458+
}
459+
446460
CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }
447461
}
448462

@@ -628,8 +642,9 @@ public void testPositiveRequests() {
628642
publisher.subscribe(requestSubscriber1);
629643
Subscription subscription1 = requestSubscriber1.subscriptionCF.join();
630644
subscription1.request(16);
631-
assertTrue(requestSubscriber1.resultCF().isDone());
645+
// onNext() may not be called in the same thread than request()
632646
List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();
647+
assertTrue(requestSubscriber1.resultCF().isDone());
633648
String result1 = stringFromBytes(list1.stream());
634649
assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
635650
System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));
@@ -646,8 +661,8 @@ public void testPositiveRequests() {
646661
subscription2.request(4);
647662
assertFalse(requestSubscriber2.resultCF().isDone());
648663
subscription2.request(1);
649-
assertTrue(requestSubscriber2.resultCF().isDone());
650664
List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();
665+
assertTrue(requestSubscriber2.resultCF().isDone());
651666
String result2 = stringFromBytes(list2.stream());
652667
assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
653668
System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));
@@ -689,7 +704,7 @@ public void testCancel() {
689704
// receive half the data
690705
for (int i = 0; i < n; i++) {
691706
subscription.request(1);
692-
ByteBuffer buffer = subscriber.items.pop();
707+
ByteBuffer buffer = subscriber.take();
693708
}
694709

695710
// cancel subscription
@@ -789,7 +804,8 @@ public void testCancelSubscription() {
789804
@Test(dataProvider = "variants")
790805
public void test(String uri, boolean sameClient) throws Exception {
791806
checkSkip();
792-
System.out.println("Request to " + uri);
807+
System.out.printf("Request to %s (sameClient: %s)%n", uri, sameClient);
808+
System.err.printf("Request to %s (sameClient: %s)%n", uri, sameClient);
793809

794810
HttpClient client = newHttpClient(sameClient);
795811

@@ -802,7 +818,8 @@ public void test(String uri, boolean sameClient) throws Exception {
802818
.POST(publisher)
803819
.build();
804820
for (int i = 0; i < ITERATION_COUNT; i++) {
805-
System.out.println("Iteration: " + i);
821+
System.out.println(uri + ": Iteration: " + i);
822+
System.err.println(uri + ": Iteration: " + i);
806823
HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
807824
int expectedResponse = RESPONSE_CODE;
808825
if (response.statusCode() != expectedResponse)

0 commit comments

Comments
 (0)