-
-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Fix/streamed query consume aware signal #9963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix/streamed query consume aware signal #9963
Conversation
🦋 Changeset detectedLatest commit: 7adee54 The changes in this PR will be included in the next version bump. This PR includes changesets to release 19 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
WalkthroughAdds a lazily-evaluated, consume-aware AbortSignal helper and wires it into streamed and infinite query paths so cancellation callbacks run only when Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant StreamedQuery
participant SignalHelper as addConsumeAwareSignal
participant streamFn
participant Cache
Caller->>StreamedQuery: start streamed query
StreamedQuery->>SignalHelper: addConsumeAwareSignal(contextCopy, getSignal, onCancel)
SignalHelper-->>StreamedQuery: contextCopy with lazy `.signal`
StreamedQuery->>streamFn: call streamFn(streamFnContext)
alt streamFn accesses `context.signal`
streamFn->>SignalHelper: access `.signal`
SignalHelper->>SignalHelper: if aborted -> call onCancel; else subscribe to abort -> onCancel later
SignalHelper-->>StreamedQuery: onCancel -> set cancelled=true
end
StreamedQuery->>streamFn: iterate async generator until cancelled
StreamedQuery->>Cache: write final chunk if !cancelled
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
.changeset/dry-streets-exist.md(1 hunks)packages/query-core/src/__tests__/streamedQuery.test.tsx(2 hunks)packages/query-core/src/infiniteQueryBehavior.ts(2 hunks)packages/query-core/src/streamedQuery.ts(3 hunks)packages/query-core/src/utils.ts(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-11-22T09:06:05.219Z
Learnt from: sukvvon
Repo: TanStack/query PR: 9892
File: packages/solid-query-persist-client/src/__tests__/PersistQueryClientProvider.test.tsx:331-335
Timestamp: 2025-11-22T09:06:05.219Z
Learning: In TanStack/query test files, when a queryFn contains side effects (e.g., setting flags for test verification), prefer async/await syntax for clarity; when there are no side effects, prefer the .then() pattern for conciseness.
Applied to files:
packages/query-core/src/streamedQuery.tspackages/query-core/src/infiniteQueryBehavior.tspackages/query-core/src/__tests__/streamedQuery.test.tsx
📚 Learning: 2025-08-19T03:18:18.303Z
Learnt from: oscartbeaumont
Repo: TanStack/query PR: 9564
File: packages/solid-query-devtools/src/production.tsx:2-3
Timestamp: 2025-08-19T03:18:18.303Z
Learning: In the solid-query-devtools package, the codebase uses a pattern of type-only default imports combined with typeof for component type annotations (e.g., `import type SolidQueryDevtoolsComp from './devtools'` followed by `typeof SolidQueryDevtoolsComp`). This pattern is consistently used across index.tsx and production.tsx files, and the maintainers prefer consistency over changing this approach.
Applied to files:
packages/query-core/src/infiniteQueryBehavior.ts
📚 Learning: 2025-11-02T22:52:33.071Z
Learnt from: DogPawHat
Repo: TanStack/query PR: 9835
File: packages/query-core/src/__tests__/queryClient.test-d.tsx:242-256
Timestamp: 2025-11-02T22:52:33.071Z
Learning: In the TanStack Query codebase, the new `query` and `infiniteQuery` methods support the `select` option for data transformation, while the legacy `fetchQuery` and `fetchInfiniteQuery` methods do not support `select` and should reject it at the type level.
Applied to files:
packages/query-core/src/infiniteQueryBehavior.ts
🧬 Code graph analysis (3)
packages/query-core/src/streamedQuery.ts (1)
packages/query-core/src/utils.ts (1)
addConsumeAwareSignal(469-487)
packages/query-core/src/infiniteQueryBehavior.ts (1)
packages/query-core/src/utils.ts (1)
addConsumeAwareSignal(469-487)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
streamedQuery(46-107)
🔇 Additional comments (7)
.changeset/dry-streets-exist.md (1)
1-5: LGTM!The changeset correctly documents this as a patch release with a clear description of the behavioral change.
packages/query-core/src/infiniteQueryBehavior.ts (1)
30-36: LGTM!The refactor correctly delegates signal attachment to the new
addConsumeAwareSignalutility. This aligns with the consume-aware pattern and is consistent with the implementation instreamedQuery.ts.Note: This usage inherits the potential multiple-listener issue flagged in
utils.ts. If that is addressed, this code will automatically benefit.packages/query-core/src/__tests__/streamedQuery.test.tsx (2)
332-337: LGTM - Signal consumption pattern is clear.The updated test correctly consumes
context.signalto trigger the consume-aware abort behavior. The eslint-disable comment appropriately documents the intentional signal access.
428-462: Verify the timing expectation—the stream should still be fetching at 110ms.The test timing analysis reveals a potential issue:
createAsyncNumberGenerator(3)yields at 50ms, 100ms, and 150ms (with 50ms sleep between each iteration)- The observer unsubscribes at 60ms after the first yield
- When unsubscribed with no signal consumption,
retryer.cancelRetry()is called (notcancel), which only prevents retries but allows the stream to continue running in the background- At 110ms, only two items should have been yielded (at 50ms and 100ms), with the third yield pending at 150ms
- Therefore, the stream is still running and
fetchStatusshould remain'fetching', not transition to'idle'The
fetchStatus: 'idle'transition only occurs when the query function returns (after the for-await loop completes). This happens at 150ms when the stream ends, not at 110ms. The current assertion appears to expect the final state before the stream completes, which contradicts the intended behavior of streaming queries to remain in a fetching state until the stream ends.packages/query-core/src/streamedQuery.ts (3)
76-84: LGTM - Clean consume-aware signal integration.The implementation correctly:
- Creates a shallow copy of context to avoid mutating the original
- Uses
addConsumeAwareSignalto wire the cancellation flag- Passes the modified context to
streamFnThis ensures that streams continue processing when
streamFndoesn't consume the signal, addressing the PR objective.
87-88: Correct use of the localcancelledflag.Switching from
context.signal.abortedto the localcancelledflag ensures the loop only breaks when the signal has been both consumed and aborted, which is the intended behavior.
101-103: Consistent cancellation check for cache finalization.The final cache write for replace-refetch mode correctly uses the local
cancelledflag, maintaining consistency with the loop's cancellation logic.
…at only streamFn can consume it)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
packages/query-core/src/utils.ts (1)
469-489: Guard against multiple listener registrations and repeatedonCancelledcalls.Right now every access to
object.signal:
- Registers a new
'abort'listener if the signal is not yet aborted.- Calls
onCancelled()again if the signal is already aborted.This can accumulate listeners and invoke
onCancelledmultiple times, which is wasteful and may be incorrect for non‑idempotent callbacks. This was already noted in a prior review; the helper still has that behavior.You can fix it by tracking consumption and subscribing only once:
export function addConsumeAwareSignal<T>( object: T, getSignal: () => AbortSignal, onCancelled: VoidFunction, ): T & { signal: AbortSignal } { - Object.defineProperty(object, 'signal', { + let consumed = false + + Object.defineProperty(object, 'signal', { enumerable: true, get: () => { const signal = getSignal() - if (signal.aborted) { - onCancelled() - } else { - signal.addEventListener('abort', () => { - onCancelled() - }) - } + if (!consumed) { + consumed = true + if (signal.aborted) { + onCancelled() + } else { + signal.addEventListener('abort', onCancelled, { once: true }) + } + } return signal }, }) return object as T & { signal: AbortSignal } }This keeps the “only when accessed” semantics while avoiding listener accumulation and ensuring
onCancelledruns at most once.
🧹 Nitpick comments (2)
packages/query-core/src/streamedQuery.ts (1)
81-96: Cancellation wiring viaaddConsumeAwareSignallooks correct; consider future‑proofingstreamFnContext.The
cancelledflag andstreamFnContextbuilt withaddConsumeAwareSignalgive you the desired “only cancel whensignalis consumed” behavior, and the!cancelledguard for replace‑refetch finalization is consistent with that model.The only maintenance concern is that
streamFnContextis a hand‑picked subset ofcontext. IfQueryFunctionContextever gains additional fields that astreamFnmight reasonably depend on, they won’t be present here and there’s no type‑level enforcement to update this list.You might consider either:
- Building
streamFnContextfrom a shallow copy, e.g.addConsumeAwareSignal({ ...context }, () => context.signal, ...), or- Defining a dedicated
StreamFnContexttype and constructing that explicitly in one place, so any future additions are harder to miss.Also applies to: 112-114
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
428-462: New test nicely captures “no abort when signal is unused”; optional extra assertion only if you want more coverage.This test cleanly encodes the desired behavior: when
streamFnnever readscontext.signal, unsubscribing doesn’t abort the underlying stream, and chunks keep flowing into the cache. The use of async/await with fake timers matches existing patterns for side‑effectful tests. Based on learnings, this is aligned with the preferred style.If you want to tighten it further, you could optionally advance timers enough for the stream to finish and assert
fetchStatus: 'idle'to prove completion as well as non‑abort, but that’s not strictly necessary for the regression you’re targeting.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
packages/query-core/src/__tests__/streamedQuery.test.tsx(2 hunks)packages/query-core/src/infiniteQueryBehavior.ts(2 hunks)packages/query-core/src/streamedQuery.ts(3 hunks)packages/query-core/src/utils.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/query-core/src/infiniteQueryBehavior.ts
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-22T09:06:05.219Z
Learnt from: sukvvon
Repo: TanStack/query PR: 9892
File: packages/solid-query-persist-client/src/__tests__/PersistQueryClientProvider.test.tsx:331-335
Timestamp: 2025-11-22T09:06:05.219Z
Learning: In TanStack/query test files, when a queryFn contains side effects (e.g., setting flags for test verification), prefer async/await syntax for clarity; when there are no side effects, prefer the .then() pattern for conciseness.
Applied to files:
packages/query-core/src/streamedQuery.tspackages/query-core/src/__tests__/streamedQuery.test.tsx
🧬 Code graph analysis (2)
packages/query-core/src/streamedQuery.ts (2)
packages/query-core/src/utils.ts (1)
addConsumeAwareSignal(469-490)packages/query-core/src/types.ts (1)
OmitKeyof(19-29)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
streamedQuery(51-119)
🔇 Additional comments (1)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
332-337: Updated test correctly exercises signal consumption semantics.Passing
contextintostreamFnand touchingcontext.signalensures this test now truly depends on the consume‑aware signal behavior, which is what it’s validating. No changes needed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
packages/query-core/src/utils.ts (1)
476-493: Consider whethersignalshould beconfigurable/ guard against overwriting
Object.definePropertydefaults toconfigurable: false. If callers might (a) already have asignalproperty, or (b) need to redefine/cleanup later, this can be a sharp edge.If that’s a realistic scenario for these augmented contexts, consider either setting
configurable: trueor asserting/guarding when'signal' in objectbefore defining.
| export function addConsumeAwareSignal<T>( | ||
| object: T, | ||
| getSignal: () => AbortSignal, | ||
| onCancelled: VoidFunction, | ||
| ): T & { signal: AbortSignal } { | ||
| let consumed = false | ||
|
|
||
| Object.defineProperty(object, 'signal', { | ||
| enumerable: true, | ||
| get: () => { | ||
| const signal = getSignal() | ||
| if (consumed) { | ||
| return signal | ||
| } | ||
|
|
||
| consumed = true | ||
| if (signal.aborted) { | ||
| onCancelled() | ||
| } else { | ||
| signal.addEventListener('abort', onCancelled, { once: true }) | ||
| } | ||
|
|
||
| return signal | ||
| }, | ||
| }) | ||
|
|
||
| return object as T & { signal: AbortSignal } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Memoize the AbortSignal so .signal is stable across accesses
Right now the getter calls getSignal() every time (Line 479), but only wires cancellation once (Line 484-489). If getSignal() can ever return a new/different signal, later consumers get an unwired signal and cancellation semantics diverge.
Suggested patch (also constrains T to a non-null object to match defineProperty expectations):
-export function addConsumeAwareSignal<T>(
- object: T,
+export function addConsumeAwareSignal<T extends object>(
+ object: T,
getSignal: () => AbortSignal,
onCancelled: VoidFunction,
): T & { signal: AbortSignal } {
let consumed = false
+ let cachedSignal: AbortSignal | undefined
Object.defineProperty(object, 'signal', {
enumerable: true,
get: () => {
- const signal = getSignal()
+ const signal = (cachedSignal ??= getSignal())
if (consumed) {
return signal
}
consumed = true
if (signal.aborted) {
onCancelled()
} else {
signal.addEventListener('abort', onCancelled, { once: true })
}
return signal
},
})
return object as T & { signal: AbortSignal }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| export function addConsumeAwareSignal<T>( | |
| object: T, | |
| getSignal: () => AbortSignal, | |
| onCancelled: VoidFunction, | |
| ): T & { signal: AbortSignal } { | |
| let consumed = false | |
| Object.defineProperty(object, 'signal', { | |
| enumerable: true, | |
| get: () => { | |
| const signal = getSignal() | |
| if (consumed) { | |
| return signal | |
| } | |
| consumed = true | |
| if (signal.aborted) { | |
| onCancelled() | |
| } else { | |
| signal.addEventListener('abort', onCancelled, { once: true }) | |
| } | |
| return signal | |
| }, | |
| }) | |
| return object as T & { signal: AbortSignal } | |
| } | |
| export function addConsumeAwareSignal<T extends object>( | |
| object: T, | |
| getSignal: () => AbortSignal, | |
| onCancelled: VoidFunction, | |
| ): T & { signal: AbortSignal } { | |
| let consumed = false | |
| let cachedSignal: AbortSignal | undefined | |
| Object.defineProperty(object, 'signal', { | |
| enumerable: true, | |
| get: () => { | |
| const signal = (cachedSignal ??= getSignal()) | |
| if (consumed) { | |
| return signal | |
| } | |
| consumed = true | |
| if (signal.aborted) { | |
| onCancelled() | |
| } else { | |
| signal.addEventListener('abort', onCancelled, { once: true }) | |
| } | |
| return signal | |
| }, | |
| }) | |
| return object as T & { signal: AbortSignal } | |
| } |
🎯 Changes
Made
streamedQueryaware of consumption ofcontext.signal.If the signal is not consumed, the chunks will keep updating the data in the background.
Before, a
streamFnthat doesn't usecontext.signalwould ignore the chunks that come after the signal is aborted, even though the signal wasn't used. This causes thequeryto have partial data when the it's mounted again. And because there is cached data, thestreamFnisn't executed again, so the data may be partial forever.This PR makes it so chunks will not get ignored after the signal is aborted if the signal wasn't consumed.
addConsumeAwareSignalutil and used it ininfiniteQueryBehaviorandstreamedQuerystreamFnisn't aborted if the signal is not consumed.✅ Checklist
pnpm run test:pr.🚀 Release Impact
Summary by CodeRabbit
Bug Fixes
New Features
Chores
✏️ Tip: You can customize this high-level summary in your review settings.