Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dry-streets-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/query-core': patch
---

Made context.signal consume aware with streamedQuery
43 changes: 42 additions & 1 deletion packages/query-core/src/__tests__/streamedQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,12 @@ describe('streamedQuery', () => {
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
streamFn: () => createAsyncNumberGenerator(3),
streamFn: (context) => {
// just consume the signal
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
const numbers = context.signal ? 3 : 0
return createAsyncNumberGenerator(numbers)
},
refetchMode: 'append',
}),
})
Expand Down Expand Up @@ -420,6 +425,42 @@ describe('streamedQuery', () => {
})
})

test('should not abort when signal not consumed', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
streamFn: () => createAsyncNumberGenerator(3),
}),
})

const unsubscribe = observer.subscribe(vi.fn())

expect(queryClient.getQueryState(key)).toMatchObject({
status: 'pending',
fetchStatus: 'fetching',
data: undefined,
})

await vi.advanceTimersByTimeAsync(60)

expect(queryClient.getQueryState(key)).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

unsubscribe()

await vi.advanceTimersByTimeAsync(50)

expect(queryClient.getQueryState(key)).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})
})

test('should support custom reducer', async () => {
const key = queryKey()

Expand Down
25 changes: 11 additions & 14 deletions packages/query-core/src/infiniteQueryBehavior.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { addToEnd, addToStart, ensureQueryFn } from './utils'
import {
addConsumeAwareSignal,
addToEnd,
addToStart,
ensureQueryFn,
} from './utils'
import type { QueryBehavior } from './query'
import type {
InfiniteData,
Expand All @@ -23,19 +28,11 @@ export function infiniteQueryBehavior<TQueryFnData, TError, TData, TPageParam>(
const fetchFn = async () => {
let cancelled = false
const addSignalProperty = (object: unknown) => {
Object.defineProperty(object, 'signal', {
enumerable: true,
get: () => {
if (context.signal.aborted) {
cancelled = true
} else {
context.signal.addEventListener('abort', () => {
cancelled = true
})
}
return context.signal
},
})
addConsumeAwareSignal(
object,
() => context.signal,
() => (cancelled = true),
)
}

const queryFn = ensureQueryFn(context.options, context.fetchOptions)
Expand Down
30 changes: 25 additions & 5 deletions packages/query-core/src/streamedQuery.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { addToEnd } from './utils'
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
import { addConsumeAwareSignal, addToEnd } from './utils'
import type {
OmitKeyof,
QueryFunction,
QueryFunctionContext,
QueryKey,
} from './types'

type BaseStreamedQueryParams<TQueryFnData, TQueryKey extends QueryKey> = {
streamFn: (
Expand Down Expand Up @@ -73,10 +78,25 @@ export function streamedQuery<

let result = initialValue

const stream = await streamFn(context)
let cancelled = false
const streamFnContext = addConsumeAwareSignal<
OmitKeyof<typeof context, 'signal'>
>(
{
client: context.client,
meta: context.meta,
queryKey: context.queryKey,
pageParam: context.pageParam,
direction: context.direction,
},
() => context.signal,
() => (cancelled = true),
)

const stream = await streamFn(streamFnContext)

for await (const chunk of stream) {
if (context.signal.aborted) {
if (cancelled) {
break
}

Expand All @@ -90,7 +110,7 @@ export function streamedQuery<
}

// finalize result: replace-refetching needs to write to the cache
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
if (isRefetch && refetchMode === 'replace' && !cancelled) {
context.client.setQueryData<TData>(context.queryKey, result)
}

Expand Down
30 changes: 30 additions & 0 deletions packages/query-core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,33 @@ export function shouldThrowError<T extends (...args: Array<any>) => boolean>(

return !!throwOnError
}

export function addConsumeAwareSignal<T>(
object: T,
getSignal: () => AbortSignal,
onCancelled: VoidFunction,
): T & { signal: AbortSignal } {
let consumed = false
let signal: AbortSignal | undefined

Object.defineProperty(object, 'signal', {
enumerable: true,
get: () => {
signal ??= getSignal()
if (consumed) {
return signal
}

consumed = true
if (signal.aborted) {
onCancelled()
} else {
signal.addEventListener('abort', onCancelled, { once: true })
}

return signal
},
})

return object as T & { signal: AbortSignal }
}