Conversation
…protocol/tri/observer/CallStreamObserver.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/transport/TripleHttp2LocalFlowController.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…leConfig.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
Add two APIs, namely |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## 3.3 #15967 +/- ##
============================================
- Coverage 60.70% 60.70% -0.01%
- Complexity 11708 11727 +19
============================================
Files 1946 1948 +2
Lines 88701 88732 +31
Branches 13374 13379 +5
============================================
+ Hits 53849 53863 +14
- Misses 29342 29351 +9
- Partials 5510 5518 +8
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
test on apache/dubbo-samples#1278 |
There was a problem hiding this comment.
Pull request overview
This pull request implements backpressure handling for the Triple protocol's streaming functionality, aligning Dubbo's streaming API with gRPC's flow control patterns. The changes introduce a new mechanism to trigger initial onReady() callbacks and refactor stream observer interfaces for better gRPC compatibility.
- Adds
InitOnReadyQueueCommandto trigger initialonReady()callbacks when channels are writable from creation - Refactors stream observer interfaces to align with gRPC's pattern (ClientResponseObserver, ClientCallStreamObserver, ServerCallStreamObserver)
- Changes
ClientCall.start()from returningStreamObserver<Object>tovoid, moving adapter creation to the invoker layer
Reviewed changes
Copilot reviewed 42 out of 42 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
InitOnReadyQueueCommand.java |
New command to trigger initial onReady callback after stream creation |
AbstractTripleClientStream.java |
Enqueues InitOnReadyQueueCommand after stream channel initialization |
TripleClientCall.java |
Changes start() to void, adds null check for stream in isReady() |
ClientCall.java |
Updates start() signature to return void |
TripleInvoker.java |
Refactors streaming call setup with beforeStart() callbacks and adapter creation |
CallStreamObserver.java |
Moves to common package, adds default implementations for optional methods |
ClientCallStreamObserver.java |
New interface for client-side flow control, mirrors gRPC API |
ServerCallStreamObserver.java |
New interface for server-side flow control, mirrors gRPC API |
ClientResponseObserver.java |
New gRPC-compatible interface with beforeStart() callback |
ClientStreamObserver.java |
Deprecated in favor of ClientCallStreamObserver |
ServerStreamObserver.java |
Deprecated in favor of ServerCallStreamObserver |
ClientCallToObserverAdapter.java |
Implements new interfaces for compatibility |
Http2ServerStreamObserver.java |
Implements ServerCallStreamObserver |
Http2ServerChannelObserver.java |
Implements ServerCallStreamObserver with documentation |
ClientTripleReactorPublisher.java |
Implements ClientResponseObserver, refactors beforeStart() |
AbstractTripleReactorPublisher.java |
Adds onReadyHandler setup in onSubscribe() |
TripleClientStreamTest.java |
Updates test expectations for new InitOnReadyQueueCommand |
BackpressureTest.java |
Updates mock to match void start() signature |
TripleInvokerTest.java |
Updates mock to use doNothing() for void start() |
| Reactive/Mutiny plugin files | Updates imports to use common.stream interfaces |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // is created before any data is sent | ||
| // is triggered by onReady, not by onStart (which requires server headers). |
There was a problem hiding this comment.
The comment on line 78 appears to be incomplete or has a grammatical error. The sentence starts with "is triggered by onReady" but doesn't have a subject. The comment should be revised to clearly state what is triggered by onReady versus onStart.
| // is created before any data is sent | |
| // is triggered by onReady, not by onStart (which requires server headers). | |
| // is created before any data is sent. | |
| // The callback is triggered by onReady, not by onStart (which requires server headers). |
| // After sendHeader and sendMessage, enqueue should have been called twice: | ||
| // once for CreateStreamQueueCommand and once for InitOnReadyQueueCommand |
There was a problem hiding this comment.
The comment states that enqueue should have been called twice for CreateStreamQueueCommand and InitOnReadyQueueCommand, but this comment appears after sendHeader and sendMessage have been called. At this point, enqueue has still only been called twice (both during stream initialization), so the comment is accurate but potentially misleading. The comment seems to suggest these calls happened after sendHeader and sendMessage, but they actually happened during stream construction.
| // Only execute the callback once (on first onReady) | ||
| Consumer<CallStreamObserver<?>> callback = onSubscribe; | ||
| if (callback != null && subscription.isReady()) { | ||
| onSubscribe = null; // Clear to prevent re-execution | ||
| callback.accept(subscription); |
There was a problem hiding this comment.
There's a potential race condition in the onReadyHandler. If the handler is invoked concurrently by multiple threads, line 82 could capture a non-null value while another thread is executing line 84 (setting onSubscribe to null). This could result in the callback being executed multiple times. Consider using a compareAndSet pattern or ensuring the handler is only invoked once at the framework level.
| // Only execute the callback once (on first onReady) | |
| Consumer<CallStreamObserver<?>> callback = onSubscribe; | |
| if (callback != null && subscription.isReady()) { | |
| onSubscribe = null; // Clear to prevent re-execution | |
| callback.accept(subscription); | |
| // Only execute the callback once (on first onReady), safely under synchronization | |
| synchronized (AbstractTripleReactorPublisher.this) { | |
| Consumer<CallStreamObserver<?>> callback = onSubscribe; | |
| if (callback != null && subscription.isReady()) { | |
| onSubscribe = null; // Clear to prevent re-execution | |
| callback.accept(subscription); | |
| } |
dubbo-common/src/main/java/org/apache/dubbo/common/stream/CallStreamObserver.java
Outdated
Show resolved
Hide resolved
RainYuY
left a comment
There was a problem hiding this comment.
LGTM, but interface default method should be fixed.
2331ffc
| * @param compression {@link Compressor} | ||
| */ | ||
| void setCompression(String compression); | ||
| default void setCompression(String compression) {} |
* support disableAutoInboundFlowControl api * support back press * fix * fix * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix * fix * fix test * add isReady and setOnReadyHandler api * fix * fix * fix Backpressure it * fix * fix * ClientCallStreamObserver & ServerCallStreamObserver * add license * mvn spotless:apply * fix dubbo plugin * fix dubbo plugin spotless * fix * fix ut * Add gRPC-compatible APIs and fix the integration tests * fix * fix * remove default * remove default --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Rain Yu <rainyu@apache.org> Co-authored-by: Wang Chengming <wangchengming@apache.org>
What is the purpose of the change?
pre pr #15957
Checklist