KTOR-9483 Curl: Implement response body backpressure#5585
Conversation
|
ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (8)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (6)
📝 WalkthroughWalkthroughAdds a new ByteChannel.hasFreeSpace API, refactors cURL response body streaming to use writeBuffer/flushWriteBuffer with coroutine pause/resume and normalized cancellation, updates tests for slow consumers and platform exclusions, and bumps a libcurl version comment. ChangesBackpressure Handling and Cancellation in cURL Response Streaming
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| } | ||
|
|
||
| @Test | ||
| fun testDownloadWithSlowConsumer() = clientTests(timeout = 10.seconds) { |
There was a problem hiding this comment.
This test doesn't check if back pressure is properly implemented, but at least checks that client doesn't fail in slow consumer scenario.
| } | ||
| } | ||
|
|
||
| return WRITEFUNC_PAUSE |
There was a problem hiding this comment.
It turned out that libcurl replays last chunk for which we returned WRITEFUNC_PAUSE, so we should return WRITEFUNC_PAUSE before reading data, not after it.
| private var flushBufferSize = 0 | ||
|
|
||
| @InternalAPI | ||
| public val flushNeeded: Boolean |
There was a problem hiding this comment.
This is the main subject to discuss here. Is it okay to add this field? Are there any other ways to check that flush is needed, I'm not aware of?
Another possible name is hasFreeSpace (in line with awaitFreeSpace).
There was a problem hiding this comment.
I think flushNeeded is a little confusing and exposes some implementation details. I'd go with hasFreeSpace or isFull.
b77a093 to
dd6b99e
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: dd6b99e8d2
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| paused = false | ||
| onUnpause() |
There was a problem hiding this comment.
Skip unpausing after response body has been closed
pauseUntilFreeSpaceAvailable() always calls onUnpause() in finally, even when the response is being closed/cancelled. In that path, close() can run curl_easy_cleanup for the same easy handle in CurlMultiApiHandler.cleanupEasyHandle, but the queued unpause is still processed later in perform() via curl_easy_pause(handle, CURLPAUSE_CONT). This creates a use-after-cleanup risk for slow-consumer or cancelled downloads where the waiter coroutine is cancelled during shutdown, and can crash or corrupt transfers intermittently.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlHttpResponseBody.kt (1)
72-76: 💤 Low value
close()withnullcause creates a redundantCancellationExceptionwrapper.When
cause == null:null as? CancellationExceptionyieldsnull, thenCancellationException(null)creates a wrapper with a null message. This is functionally equivalent to callingcancel()with no arguments, but adds an unnecessary allocation.✏️ Proposed simplification
- cancel(cause as? CancellationException ?: CancellationException(cause)) + cancel(cause?.let { it as? CancellationException ?: CancellationException("${it.message}", it) })Or even simpler — let Kotlin's default null handle the no-cause case:
override fun close(cause: Throwable?) { if (bodyChannel.isClosedForWrite) return bodyChannel.close(cause) - cancel(cause as? CancellationException ?: CancellationException(cause)) + val cancellationCause = cause as? CancellationException + ?: cause?.let { CancellationException(it.message, it) } + cancel(cancellationCause) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlHttpResponseBody.kt` around lines 72 - 76, The close(cause: Throwable?) implementation creates an unnecessary CancellationException wrapper when cause is null; change the cancel call to pass the original cause directly (i.e., call cancel(cause as? CancellationException ?: cause)) or simply cancel(cause) so that a null cause is preserved and no redundant CancellationException object is allocated; update the method using the existing symbols close, bodyChannel, and cancel to pass the original cause without wrapping.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/DownloadTest.kt`:
- Around line 30-31: The test function name testDownloadWithSlowConsumer should
follow the project's convention of using backtick-quoted descriptive strings for
test names. Rename the function by replacing the camelCase function name with a
backtick-quoted descriptive string that clearly describes what the test does,
such as converting testDownloadWithSlowConsumer to use backticks with a more
natural language description of the test behavior.
---
Nitpick comments:
In
`@ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlHttpResponseBody.kt`:
- Around line 72-76: The close(cause: Throwable?) implementation creates an
unnecessary CancellationException wrapper when cause is null; change the cancel
call to pass the original cause directly (i.e., call cancel(cause as?
CancellationException ?: cause)) or simply cancel(cause) so that a null cause is
preserved and no redundant CancellationException object is allocated; update the
method using the existing symbols close, bodyChannel, and cancel to pass the
original cause without wrapping.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 96f0890f-7573-41dd-b98c-9e005a25ab76
📒 Files selected for processing (8)
ktor-client/ktor-client-curl/desktop/interop/libcurl.defktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.ktktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlHttpResponseBody.ktktor-client/ktor-client-tests/common/test/io/ktor/client/tests/DownloadTest.ktktor-client/ktor-client-tests/common/test/io/ktor/client/tests/ExceptionsTest.ktktor-io/api/ktor-io.apiktor-io/api/ktor-io.klib.apiktor-io/common/src/io/ktor/utils/io/ByteChannel.kt
dd6b99e to
d590118
Compare
Replace runBlocking with non-blocking writes and implement proper backpressure: when the channel's flush buffer reaches CHANNEL_MAX_SIZE (1 MB), return WRITEFUNC_PAUSE to pause the specific easy handle. Once the consumer drains enough data, resume via curl_easy_pause(CURLPAUSE_CONT), reusing the unpause infrastructure already present for request bodies. Add a slow-consumer download test to verify large responses complete correctly. Also fixes KTOR-9527
d590118 to
65b0479
Compare
Subsystem
ktor-client-curl
Motivation
KTOR-9483 Curl: backpressure implementation is never used
KTOR-9527 Curl: Freeze when receiving large responses
Solution
Replace
runBlockingwith non-blocking writes viawriteBuffer/flushWriteBuffer. When the channel's flush buffer reachesCHANNEL_MAX_SIZE, returnWRITEFUNC_PAUSEto pause the specific easy handle. A background coroutine then callsflush(), suspending until the consumer drains the buffer, after whichcurl_easy_pause(CURLPAUSE_CONT)resumes the transfer.Also adds
ByteChannel.flushNeeded(@InternalAPI) and a slow-consumer download test.