Skip to content

Commit 82798ea

Browse files
authored
Fix warning when sending already-available streaming body (#11665)
Fixes #11603
1 parent 9a717da commit 82798ea

File tree

3 files changed

+142
-0
lines changed

3 files changed

+142
-0
lines changed

http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java

+6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.micronaut.http.client.netty;
1717

1818
import io.micronaut.core.annotation.Internal;
19+
import io.micronaut.http.body.stream.LazyUpstream;
1920
import io.micronaut.http.netty.EventLoopFlow;
2021
import io.micronaut.http.netty.body.ByteBufConsumer;
2122
import io.micronaut.http.netty.body.StreamingNettyByteBody;
@@ -61,7 +62,12 @@ void startWriting() {
6162
if (ctx == null) {
6263
throw new IllegalStateException("Not added to a channel yet");
6364
}
65+
LazyUpstream lazyUpstream = new LazyUpstream();
66+
// primary() can call other methods here immediately, so we need a replacement upstream
67+
// until it returns
68+
upstream = lazyUpstream;
6469
upstream = body.primary(this);
70+
lazyUpstream.forward(upstream);
6571
try {
6672
upstream.start();
6773
} catch (Exception e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.micronaut.http.client
2+
3+
import io.micronaut.context.ApplicationContext
4+
import io.micronaut.context.annotation.Requires
5+
import io.micronaut.http.ByteBodyHttpResponse
6+
import io.micronaut.http.HttpRequest
7+
import io.micronaut.http.annotation.Body
8+
import io.micronaut.http.annotation.Controller
9+
import io.micronaut.http.annotation.Post
10+
import io.micronaut.http.body.ByteBody
11+
import io.micronaut.http.netty.body.NettyBodyAdapter
12+
import io.micronaut.runtime.server.EmbeddedServer
13+
import io.netty.buffer.Unpooled
14+
import io.netty.channel.embedded.EmbeddedChannel
15+
import jakarta.inject.Singleton
16+
import reactor.core.publisher.Mono
17+
import spock.lang.Issue
18+
import spock.lang.Specification
19+
20+
import java.nio.charset.StandardCharsets
21+
22+
class RawHttpClientSpec extends Specification {
23+
@Issue("https://github.com/micronaut-projects/micronaut-core/issues/11603")
24+
def 'immediate stream'() {
25+
given:
26+
def ctx = ApplicationContext.run(['spec.name': 'RawHttpClientSpec'])
27+
def server = ctx.getBean(EmbeddedServer)
28+
server.start()
29+
def client = ctx.createBean(RawHttpClient, server.URI)
30+
31+
def requestBody = NettyBodyAdapter.adapt(
32+
Mono.just(Unpooled.copiedBuffer("foo", StandardCharsets.UTF_8)),
33+
new EmbeddedChannel().eventLoop())
34+
requestBody.split(ByteBody.SplitBackpressureMode.FASTEST).buffer().get().close()
35+
36+
when:
37+
def resp = Mono.from(client.exchange(
38+
HttpRequest.POST(server.URI.toString() + "/raw/echo", null),
39+
requestBody,
40+
null
41+
)).block()
42+
then:
43+
resp instanceof ByteBodyHttpResponse<?>
44+
ByteBody body = resp.byteBody()
45+
body.buffer().get().toString(StandardCharsets.UTF_8) == "foo"
46+
47+
cleanup:
48+
resp.close()
49+
server.close()
50+
ctx.close()
51+
}
52+
53+
@Singleton
54+
@Controller("/raw")
55+
@Requires(property = "spec.name", value = "RawHttpClientSpec")
56+
static class Ctrl {
57+
@Post("/echo")
58+
String echo(@Body String body) {
59+
return body
60+
}
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2017-2025 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.http.body.stream;
17+
18+
import io.micronaut.core.annotation.Internal;
19+
20+
/**
21+
* {@link io.micronaut.http.body.stream.BufferConsumer.Upstream} implementation that stores any
22+
* inputs and then {@link #forward forwards} them to another upstream later on. This helps with
23+
* reentrant calls to the subscriber during a subscribe call, when the real upstream is not yet
24+
* available.
25+
*
26+
* @since 4.8.0
27+
* @author Jonas Konrad
28+
*/
29+
@Internal
30+
public final class LazyUpstream implements BufferConsumer.Upstream {
31+
private boolean start;
32+
private boolean allowDiscard;
33+
private boolean disregardBackpressure;
34+
private long consumed;
35+
36+
@Override
37+
public void start() {
38+
this.start = true;
39+
}
40+
41+
@Override
42+
public void onBytesConsumed(long bytesConsumed) {
43+
long newConsumed = consumed + bytesConsumed;
44+
if (newConsumed < 0) {
45+
newConsumed = Long.MAX_VALUE;
46+
}
47+
consumed = newConsumed;
48+
}
49+
50+
@Override
51+
public void allowDiscard() {
52+
this.allowDiscard = true;
53+
}
54+
55+
@Override
56+
public void disregardBackpressure() {
57+
this.disregardBackpressure = true;
58+
}
59+
60+
public void forward(BufferConsumer.Upstream actual) {
61+
if (consumed != 0) {
62+
actual.onBytesConsumed(consumed);
63+
}
64+
if (start) {
65+
actual.start();
66+
}
67+
if (allowDiscard) {
68+
actual.allowDiscard();
69+
}
70+
if (disregardBackpressure) {
71+
actual.disregardBackpressure();
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)