Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
aaebc03
feat: add query.live remote function support
Rich-Harris Mar 17, 2026
af10f8b
fix: make query.live await blocks update with stream values
Rich-Harris Mar 17, 2026
9fd9c25
fix: cancel live query streams on disconnect and reload
Rich-Harris Mar 17, 2026
92afb7d
regenerate types
Rich-Harris Mar 18, 2026
29d67e9
tweak
Rich-Harris Mar 19, 2026
704eec4
remove plan doc
Rich-Harris Mar 19, 2026
c69d5ee
tidy up
Rich-Harris Mar 19, 2026
1a6ba2f
fix test
Rich-Harris Mar 19, 2026
1b8991b
simplify
Rich-Harris Mar 19, 2026
872710b
changeset
Rich-Harris Mar 19, 2026
1d0fff3
revert
Rich-Harris Mar 19, 2026
1545da9
fix types
Rich-Harris Mar 19, 2026
54ad84e
tighten up types a bit
Rich-Harris Mar 19, 2026
dd9db79
merge
Rich-Harris Mar 19, 2026
da1e41c
fix typechecking error
Rich-Harris Mar 19, 2026
f0dadad
better naming
Rich-Harris Mar 19, 2026
19f5e96
tweak docs
Rich-Harris Mar 19, 2026
84d4d78
tweak docs
Rich-Harris Mar 19, 2026
fa1fb0b
merge main
Rich-Harris Mar 19, 2026
6cc8478
get rid of second argument, we dont need it
Rich-Harris Mar 19, 2026
2fb9c67
unnecessary
Rich-Harris Mar 19, 2026
af19c4b
unnecessary
Rich-Harris Mar 19, 2026
165e514
revert
Rich-Harris Mar 19, 2026
cf5c566
revert
Rich-Harris Mar 19, 2026
31e80f2
tidy
Rich-Harris Mar 19, 2026
a06f3cf
tweak
Rich-Harris Mar 19, 2026
e271658
not sure what this is for
Rich-Harris Mar 19, 2026
7fb2d8e
tidy up
Rich-Harris Mar 20, 2026
b579163
goddammit codex was right
Rich-Harris Mar 20, 2026
c228920
don't attempt to reconnect to a finished live query
Rich-Harris Mar 20, 2026
6fe92bf
fix
Rich-Harris Mar 20, 2026
c036ff7
lint
Rich-Harris Mar 22, 2026
dd63c1c
allow server to trigger a reconnect
Rich-Harris Mar 23, 2026
c380c4b
reconnect to finished live query
Rich-Harris Mar 23, 2026
3372198
hopefully fix preview deploy
Rich-Harris Mar 23, 2026
53815f5
dedupe
Rich-Harris Mar 23, 2026
3a63d78
tweak
Rich-Harris Mar 23, 2026
ff17ca9
slightly better error handling
Rich-Harris Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wet-rings-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@sveltejs/kit': minor
---

feat: experimental `query.live` function
39 changes: 39 additions & 0 deletions documentation/docs/20-core-concepts/60-remote-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,45 @@ export const getWeather = query.batch(v.string(), async (cityIds) => {
{/if}
```

## query.live

`query.live` is for accessing real-time data from the server. It behaves similarly to `query`, but the callback — typically an async [generator function](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/function*) — returns an `AsyncIterable`:

```js
import { query } from '$app/server';

export const getTime = query.live(async function* () {
while (true) {
yield new Date();
await new Promise((f) => setTimeout(f, 1000));
}
});
```

During server-side rendering, `await getTime()` returns the first yielded value then closes the iterator. This initial value is serialized and reused during hydration.

On the client, the query stays connected while it's actively used in a component. Multiple instances share a connection. When there are no active uses left, the stream disconnects and server-side iteration is stopped.

Live queries expose a `connected` property and `reconnect()` method:

```svelte
<script>
import { getTime } from './time.remote.js';

const time = getTime();
</script>

<p>{await time}</p>
<p>connected: {time.connected}</p>
<button onclick={() => time.reconnect()}>Reconnect</button>
```

If the connection drops, `connected` becomes `false`. SvelteKit will attempt to reconnect passively, with exponential backoff, and actively if `navigator.onLine` goes from `false` to `true`.

Unlike `query`, live queries do not have a `refresh()` method, as they are self-updating.

As with `query` and `query.batch`, call `.run()` outside render when you need imperative access. For live queries, `run()` returns a `Promise<AsyncIterator<T>>`.

## form

The `form` function makes it easy to write data to the server. It takes a callback that receives `data` constructed from the submitted [`FormData`](https://developer.mozilla.org/en-US/docs/Web/API/FormData)...
Expand Down
2 changes: 1 addition & 1 deletion packages/kit/src/exports/internal/remote-functions.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/** @import { RemoteInternals } from 'types' */

/** @type {RemoteInternals['type'][]} */
const types = ['command', 'form', 'prerender', 'query', 'query_batch'];
const types = ['command', 'form', 'prerender', 'query', 'query_batch', 'query_live'];

/**
* @param {Record<string, any>} module
Expand Down
22 changes: 22 additions & 0 deletions packages/kit/src/exports/public.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,21 @@ export type RemoteQuery<T> = RemoteResource<T> & {
withOverride(update: (current: T) => T): RemoteQueryOverride;
};

export type RemoteLiveQuery<T> = RemoteResource<T> & {
/**
* Returns an async iterator with live updates.
* Unlike awaiting the resource directly, this can only be used _outside_ render
* (i.e. in load functions, event handlers and so on)
*/
run(): Promise<AsyncIterator<T>>;
/** `true` if the live stream is currently connected. */
readonly connected: boolean;
/** `true` once the live stream iterator has completed. */
readonly finished: boolean;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of confusing... Can finish go back to false if you call refresh?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We could go with completed instead?

/** Reconnects the live stream immediately. */
reconnect(): void;
};

export interface RemoteQueryOverride {
_key: string;
release(): void;
Expand All @@ -2189,4 +2204,11 @@ export type RemoteQueryFunction<Input, Output> = (
arg: undefined extends Input ? Input | void : Input
) => RemoteQuery<Output>;

/**
* The return value of a remote `query.live` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This JSDoc is a little confusing. It's true that this is literally what query.live returns but it's probably more helpful to say it's just "The type of a live query function" or something similar.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex was just following the convention set by the other remote functions. Open to changing it but we should probably change them all

*/
export type RemoteLiveQueryFunction<Input, Output> = (
arg: undefined extends Input ? Input | void : Input
) => RemoteLiveQuery<Output>;

export * from './index.js';
1 change: 1 addition & 0 deletions packages/kit/src/runtime/app/server/remote/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export function command(validate_or_fn, maybe_fn) {
}

state.remote.refreshes ??= {};
state.remote.reconnects ??= new Set();

const promise = Promise.resolve(
run_remote_function(event, state, true, () => validate(arg), fn)
Expand Down
1 change: 1 addition & 0 deletions packages/kit/src/runtime/app/server/remote/form.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export function form(validate_or_fn, maybe_fn) {
}

state.remote.refreshes ??= {};
state.remote.reconnects ??= new Set();

const issue = create_issues();

Expand Down
183 changes: 181 additions & 2 deletions packages/kit/src/runtime/app/server/remote/query.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/** @import { RemoteQuery, RemoteQueryFunction } from '@sveltejs/kit' */
/** @import { RemoteInternals, MaybePromise, RequestState, RemoteQueryBatchInternals, RemoteQueryInternals } from 'types' */
/** @import { RemoteLiveQuery, RemoteLiveQueryFunction, RemoteQuery, RemoteQueryFunction } from '@sveltejs/kit' */
/** @import { RemoteInternals, MaybePromise, RequestState, RemoteQueryLiveInternals, RemoteQueryBatchInternals, RemoteQueryInternals } from 'types' */
/** @import { StandardSchemaV1 } from '@standard-schema/spec' */
import { get_request_store } from '@sveltejs/kit/internal/server';
import { create_remote_key, stringify, stringify_remote_arg } from '../../../shared.js';
Expand Down Expand Up @@ -84,6 +84,103 @@ export function query(validate_or_fn, maybe_fn) {
return wrapper;
}

/**
* Creates a live remote query. When called from the browser, the function will be invoked on the server via a streaming `fetch` call.
*
* See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
*
* @template Output
* @overload
* @param {(arg: void) => MaybePromise<Generator<Output> | AsyncIterator<Output> | AsyncIterable<Output>>} fn
* @returns {RemoteLiveQueryFunction<void, Output>}
*/
/**
* @template Input
* @template Output
* @overload
* @param {'unchecked'} validate
* @param {(arg: Input) => MaybePromise<Generator<Output> | AsyncIterator<Output> | AsyncIterable<Output>>} fn
* @returns {RemoteLiveQueryFunction<Input, Output>}
*/
/**
* @template {StandardSchemaV1} Schema
* @template Output
* @overload
* @param {Schema} schema
* @param {(arg: StandardSchemaV1.InferOutput<Schema>) => MaybePromise<Generator<Output> | AsyncIterator<Output> | AsyncIterable<Output>>} fn
* @returns {RemoteLiveQueryFunction<StandardSchemaV1.InferInput<Schema>, Output>}
*/
/**
* @template Input
* @template Output
* @param {any} validate_or_fn
* @param {(args: Input) => MaybePromise<Generator<Output> | AsyncIterator<Output> | AsyncIterable<Output>>} [maybe_fn]
* @returns {RemoteLiveQueryFunction<Input, Output>}
*/
/*@__NO_SIDE_EFFECTS__*/
function live(validate_or_fn, maybe_fn) {
/** @type {(arg: Input) => MaybePromise<Generator<Output> | AsyncIterator<Output> | AsyncIterable<Output>>} */
const fn = maybe_fn ?? validate_or_fn;

/** @type {(arg?: any) => MaybePromise<Input>} */
const validate = create_validator(validate_or_fn, maybe_fn);

/**
* @param {any} event
* @param {any} state
* @param {any} arg
*/
const run = async (event, state, arg) => {
return await run_remote_function(
event,
state,
false,
() => validate(arg),
async (input) => to_async_iterator(await fn(input), __.name)
);
};

/** @type {RemoteQueryLiveInternals} */
const __ = { type: 'query_live', id: '', name: '', run };

/** @type {RemoteLiveQueryFunction<Input, Output> & { __: RemoteQueryLiveInternals }} */
const wrapper = (arg) => {
if (prerendering) {
throw new Error(
`Cannot call query.live '${__.name}' while prerendering, as prerendered pages need static data. Use 'prerender' from $app/server instead`
);
}

const { event, state } = get_request_store();

return create_live_query_resource(
__,
arg,
state,
async () => {
const iterator = await run(event, state, arg);

try {
const { value, done } = await iterator.next();

if (done) {
throw new Error(`query.live '${__.name}' did not yield a value`);
}

return value;
} finally {
await iterator.return?.();
}
},
async () => run(event, state, arg)
);
};

Object.defineProperty(wrapper, '__', { value: __ });

return wrapper;
}

/**
* Creates a batch query function that collects multiple calls and executes them in a single request
*
Expand Down Expand Up @@ -305,8 +402,90 @@ function create_query_resource(__, arg, state, fn) {
};
}

/**
* @param {RemoteQueryLiveInternals} __
* @param {any} arg
* @param {RequestState} state
* @param {() => Promise<any>} get_first_value
* @param {() => MaybePromise<AsyncIterator<any>>} get_iterator
* @returns {RemoteLiveQuery<any>}
*/
function create_live_query_resource(__, arg, state, get_first_value, get_iterator) {
/** @type {Promise<any> | null} */
let promise = null;

const get_promise = () => {
return (promise ??= get_response(__, arg, state, get_first_value));
};

return {
/** @type {Promise<any>['catch']} */
catch(onrejected) {
return get_promise().catch(onrejected);
},
current: undefined,
error: undefined,
/** @type {Promise<any>['finally']} */
finally(onfinally) {
return get_promise().finally(onfinally);
},
finished: false,
loading: true,
ready: false,
connected: false,
reconnect() {
const reconnects = state.remote.reconnects;

if (!reconnects) {
throw new Error(
`Cannot call reconnect on query.live '${__.name}' because it is not executed in the context of a command/form remote function`
);
}

reconnects.add(create_remote_key(__.id, stringify_remote_arg(arg, state.transport)));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add docs for reconnecting (...single-flight reconnects?)

},
async run() {
if (!state.is_in_universal_load) {
throw new Error(
'On the server, .run() can only be called in universal `load` functions. Anywhere else, just await the query directly'
);
}

return get_iterator();
},
/** @type {Promise<any>['then']} */
then(onfulfilled, onrejected) {
return get_promise().then(onfulfilled, onrejected);
},
get [Symbol.toStringTag]() {
return 'LiveQueryResource';
}
};
}

// Add batch as a property to the query function
Object.defineProperty(query, 'batch', { value: batch, enumerable: true });
Object.defineProperty(query, 'live', { value: live, enumerable: true });

/**
* @template T
* @param {Generator<T> | AsyncIterator<T> | AsyncIterable<T>} source
* @param {string} name
* @returns {AsyncIterator<T>}
*/
function to_async_iterator(source, name) {
const maybe = /** @type {any} */ (source);

if (maybe && typeof maybe[Symbol.asyncIterator] === 'function') {
return maybe[Symbol.asyncIterator]();
}

if (maybe && typeof maybe.next === 'function') {
return maybe;
}

throw new Error(`query.live '${name}' must return an AsyncIterator or AsyncIterable`);
}

/**
* @param {RemoteInternals} __
Expand Down
12 changes: 9 additions & 3 deletions packages/kit/src/runtime/client/client.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/** @import { RemoteQueryCacheEntry } from './remote-functions/query.svelte.js' */
/** @import { RemoteLiveQueryCacheEntry, RemoteQueryCacheEntry } from './remote-functions/query.svelte.js' */
import { BROWSER, DEV } from 'esm-env';
import * as svelte from 'svelte';
import { HttpError, Redirect, SvelteKitError } from '@sveltejs/kit/internal';
Expand Down Expand Up @@ -306,6 +306,12 @@ export let pending_invalidate;
*/
export const query_map = new Map();

/**
* @type {Map<string, RemoteLiveQueryCacheEntry<any>>}
* A map of id -> live query info with all live queries that currently exist in the app.
*/
export const live_query_map = new Map();

/**
* @param {import('./types.js').SvelteKitApp} _app
* @param {HTMLElement} _target
Expand Down Expand Up @@ -412,7 +418,7 @@ async function _invalidate(include_load_functions = true, reset_page_state = tru
// Rerun queries
if (force_invalidation) {
query_map.forEach(({ resource }) => {
void resource.refresh?.();
void resource.refresh();
});
}

Expand Down Expand Up @@ -520,7 +526,7 @@ export async function _goto(url, options, redirect_count, nav_token) {
query_map.forEach(({ resource }, key) => {
// Only refresh those that already existed on the old page
if (query_keys?.includes(key)) {
void resource.refresh?.();
void resource.refresh();
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import * as devalue from 'devalue';
import { HttpError } from '@sveltejs/kit/internal';
import { app } from '../client.js';
import { stringify_remote_arg } from '../../shared.js';
import { get_remote_request_headers, refresh_queries, release_overrides } from './shared.svelte.js';
import {
get_remote_request_headers,
refresh_queries,
reconnect_live_queries,
release_overrides
} from './shared.svelte.js';

/**
* Client-version of the `command` function from `$app/server`.
Expand Down Expand Up @@ -70,6 +75,10 @@ export function command(id) {
refresh_queries(result.refreshes, updates);
}

if (result.reconnects) {
reconnect_live_queries(result.reconnects);
}

return devalue.parse(result.result, app.decoders);
}
} finally {
Expand Down
Loading
Loading