feat: experimental query.live remote function #15563

Rich-Harris wants to merge 38 commits into main
🦋 Changeset detected — latest commit: ff17ca9. The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package.
Does this replace #14292?
Ohhhh, I know why that happens — it's because the live query contains this code...

```js
if (!user_id) {
	yield null;
	return;
}
```

...which means the response is finite, which means it disconnects, which means it attempts to reconnect. So it's a bug in both the app code and the framework — app code because the returned async iterator shouldn't be finite, and framework because we allow it to be finite (and/or attempt to reconnect to the endpoint despite that, if we wanted to allow finite iterators for some reason).
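To illustrate the finite-vs-infinite distinction in plain JS (no SvelteKit involved — `finite`, `infinite`, and `wait` are illustrative names, not framework code): a generator that returns after its first yield reports `done: true`, which is the signal that makes the client treat the stream as ended and reconnect.

```javascript
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Mirrors the problematic app code above: yields once, then completes,
// so the consumer sees { done: true } and treats the stream as ended
async function* finite(user_id) {
	if (!user_id) {
		yield null;
		return;
	}
	yield { user_id };
}

// The app-level fix described above: never complete — keep the
// iterator alive even while there is no user
async function* infinite(user_id) {
	while (true) {
		yield user_id ? { user_id } : null;
		await wait(10);
	}
}
```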
Is the websocket piece necessary for that? Or do I just need a reference to the DO class? (I don't really understand how they work and the docs are kinda dense)
You can get a handle to the DO class on the …
So could the DO class implement …?
Publish yes, subscribe no — unless you make an internal WebSocket connection from the query function to the DO. That might work for smaller apps, but it would increase latency, and without the ability to handle multiple live queries over a single WS connection it limits the amount of traffic possible (DOs don't horizontally scale without manually implementing that).
Hi @Rich-Harris and @ottomated. TL;DR: This PR is about … There are some more suggestions — please direct me to a good place to discuss. Let's split this into parts:
Do you have any thoughts on whether and how it would be possible to enable subscriptions within a live query? The API here is deliberately unopinionated, the hope being that it can be made to work with a variety of setups, but by extension it might not be able to compete with a websocket-based approach. For example the demo above uses polling, but I also got it to work with postgres …

One API design question I'm pondering: should yielded values be deduped? In a polling scenario it's mildly annoying to have to do this sort of thing:

```js
export const getThings = query.live(async function* () {
	const signal = getRequestEvent().request.signal;

	let prev;

	while (!signal.aborted) {
		const data = await db.getThings();

		if (prev !== (prev = JSON.stringify(data))) {
			yield data;
		}

		await sleep(1000);
	}
});
```

If deduping was automatic, it could just be this:

```js
export const getThings = query.live(async function* () {
	const signal = getRequestEvent().request.signal;

	while (!signal.aborted) {
		yield await db.getThings();
		await sleep(1000);
	}
});
```

It does mean that you can't use …
I think deduping should be automatic. For streams you can always include something to differentiate between chunks, like a timestamp.
packages/kit/src/runtime/client/remote-functions/query.svelte.js
```js
if (done) {
	throw new Error(`query.live '${__.name}' did not yield a value`);
}
```
`done` and `value` could both be set if the first statement is `return`:

```js
async function* test() {
	return 42;
}

const gen = test();
console.log(await gen.next()); // { value: 42, done: true }
```

it might be an edge case, but maybe it could happen if someone is checking a condition and returning before starting the live query?
The correct way to write that would be this:

```js
async function* test() {
	yield 42;
	return;
}
```

I don't think it makes sense to treat `return` and `yield` interchangeably
But it is technically valid JS, and `return` should be considered "the last yield", no?
Not really — if you run this...

```js
function* foo() {
	yield 1;
	yield 2;
	yield 3;
	return 4;
}

for (const value of foo()) {
	console.log(value);
}
```

...it will log 1 then 2 then 3, but not 4
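For completeness: the only way to observe that final `return` value is to drive the iterator manually, since `for...of` stops at `done: true` and discards the accompanying value. A quick sketch (the `collect` helper is made up for illustration):

```javascript
function* foo() {
	yield 1;
	yield 2;
	yield 3;
	return 4;
}

// for...of discards result.value once done is true;
// manual iteration exposes the returned 4
function collect(gen) {
	const yielded = [];
	let result = gen.next();
	while (!result.done) {
		yielded.push(result.value);
		result = gen.next();
	}
	return { yielded, returned: result.value };
}
```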
```diff
  result: stringify(result, transport),
- refreshes: result.issues ? undefined : await serialize_refreshes(meta.remote_refreshes)
+ refreshes: result.issues ? undefined : await serialize_refreshes(meta.remote_refreshes),
+ reconnects: serialize_reconnects()
```
Just throwing this out there (I haven't spent any time thinking about whether this could be the case)... could this also have the same problem as client refreshes?
no, it doesn't create any additional server-side work
...shouldn't it?
Shouldn't the behavior here be the same as SSR, where we gather the first response from the iterables and return it with the command result? Otherwise you'll have the command complete and some indeterminate amount of time while the live queries reconnect and bring in their first values at different times, rather than all at once.
Just to give an example that would speak against dedupe by default: memory consumption. Cloudflare workers are incredibly limited in terms of RAM (128MB), so having a mechanism that caches an entire payload by default for a long time (…) could be a problem there:

```js
export const getThings = query.live(deduped(
	async function* () {
		const signal = getRequestEvent().request.signal;

		while (!signal.aborted) {
			yield await db.getThings();
			await sleep(1000);
		}
	}
));
```

Additionally, this way the deduping logic can be customized (e.g. trading off CPU time vs RAM by hashing instead of string comparison of the entire payload)
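A `deduped` wrapper along these lines could be a small standalone helper. This is a hypothetical sketch (the `deduped` name and signature are proposed in the comment above, not part of this PR) that compares serialized payloads and only re-yields on change:

```javascript
// Hypothetical opt-in dedupe helper: wraps an async generator function
// and suppresses consecutive values whose serialization is unchanged.
// `hash` is swappable (e.g. a real hash, to trade CPU time for RAM)
function deduped(fn, hash = JSON.stringify) {
	return async function* (...args) {
		let prev;
		for await (const value of fn(...args)) {
			const key = hash(value);
			if (key !== prev) {
				prev = key;
				yield value;
			}
		}
	};
}
```

The wrapped function could then be passed to `query.live` as in the example above.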
A Cloudflare worker only handles a single request at once, so unless a single serialized payload is large enough to burst the memory banks (in which case …)
https://developers.cloudflare.com/workers/platform/limits/#memory
I totally agree — having a single multi-megabyte payload should not be the use case of `query.live`. I ran into CF memory limits more often than I would like to admit, so I just wanted to at least mention it in this thread. (I've had similar concerns with the way normal queries and refreshes attach their entire payload to the async local storage, but that's a bit too off-topic for here)
It's a tough problem to solve. This is the closest I've got:

hooks.server.ts

```ts
// Provide two default transports that the user can switch between as they wish
type Input = {
	// e.g. "k932er/myLiveQuery"
	queryId: string;
	event: RequestEvent;
	transport: {
		stream(queryId: string, event: RequestEvent): Response;
		websocket(queryId: string, event: RequestEvent): Response;
	};
};

// Default hook would just use the stream transport
export const handleLiveQuery = async ({ transport, queryId, event }: Input) => {
	return transport.stream(queryId, event);
};

// Durable Object example
export const handleLiveQuery = async ({ queryId, event }: Input) => {
	const upgrade = event.request.headers.get('Upgrade');
	if (upgrade !== 'websocket') error(426, 'Expected websocket');

	// Pass off to a Durable Object
	// canonical way of passing variables like this I think
	const headers = new Headers(event.request.headers);
	headers.set('X-LiveQueryId', queryId);

	// do extra stuff like load balancing
	const socketName = queryId + '_' + Math.floor(Math.random() * 5);
	const socket = event.platform.env.SOCKET.getByName(socketName);
	return socket.fetch(event.request, {
		headers
	});
};
```

query.remote.ts

```ts
import { query } from '$app/server';

export const myLiveQuery = query.live(async function* () {
	while (true) {
		yield await globalThis.subscribe();
	}
});
```

worker.ts

```ts
export class Socket extends DurableObject {
	#subscriptions = new Set();

	constructor() {
		globalThis.subscribe = () => {
			const { promise, resolve } = Promise.withResolvers();
			this.#subscriptions.add(resolve);
			return promise;
		};
	}

	publish(payload) {
		this.#subscriptions.forEach((resolve) => resolve(payload));
		this.#subscriptions.clear();
	}

	async fetch(req: Request): Promise<Response> {
		const queryId = req.headers.get('X-LiveQueryId');
		const { 0: client, 1: server } = new WebSocketPair();
		this.ctx.acceptWebSocket(server);
		this.ctx.waitUntil(
			(async () => {
				for await (const payload of INTERNAL_SVELTEKIT_MAGIC_GET_QUERY_LIVE_ITERABLE[queryId]()) {
					server.send(payload);
				}
			})()
		);
		return new Response(null, {
			status: 101,
			webSocket: client
		});
	}
}
```

Cons:

I think the current API just isn't flexible enough for this use case, unfortunately.

On deduping, I think it should be opt-in and turned off by default. It's unintuitive that the async iterable you get on the client isn't the exact same as the one on the server — too magic.
Note that deduping can't be delegated to a helper without that helper needing to implement its own serialization, which is straightforward in simple cases but can get more complex when you have custom types and self-references and whatnot. So it would need to at minimum take a second argument that hashes a payload (which could perhaps default to …).

But even then, you're paying the cost of serialization twice, which seems silly. The efficient thing to do is dedupe at the point that you were going to serialize anyway.

I'm not sure doing it automatically is 'too magic' — it's a very straightforward and easy-to-document behaviour that saves effort and bandwidth. If we hashed the payload automatically, would that ease the concerns around memory? The …

(Honestly though, if a worker runs out of memory because it's trying to serve too many users, that seems like a bug in workers rather than something every user needs to tiptoe around!)
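On the hashing idea: keeping only a digest of the last serialized payload bounds the retained state to a few dozen bytes regardless of payload size. A rough sketch (the names here are illustrative, not from this PR):

```javascript
import { createHash } from 'node:crypto';

// Remember only a sha-256 digest of the last serialized payload —
// ~64 hex chars of state no matter how large the payload is
function makeDeduper() {
	let last;
	return (serialized) => {
		const digest = createHash('sha256').update(serialized).digest('hex');
		if (digest === last) return false; // unchanged -> skip this chunk
		last = digest;
		return true; // changed -> emit
	};
}
```

This trades a little CPU per chunk for a fixed memory footprint, which is the trade-off discussed above.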
Ah true... should've taken a closer look at the code.

It also makes perfect sense if the sveltekit team decides not to include workarounds for all users just to alleviate the (very) strict limitations of a single platform, but the hashing does sound like an appropriate fix (that also could still be added in a later release if needed).

Unrelated to the memory issue, I did agree with @ottomated on the unintuitive part as well, but on closer inspection maybe that's just due to me misinterpreting the actual use case of `query.live`. For a "live" view of a single data point it makes a lot more sense to do deduping by default.
I saw that WebTransport is coming in Safari 26.4: https://developer.mozilla.org/en-US/docs/Web/API/WebTransport
Wonder if Cap'n Web would be a nice transport layer for this, or whether a query.stream could also solve the worker friction while working everywhere else: https://github.com/cloudflare/capnweb
elliott-with-the-longest-name-on-github left a comment
Finally got around to a full review of this.
I think the behavior of reconnecting on the server should be a little different...
Other than that, mostly a bunch of nits
```ts
/** `true` if the live stream is currently connected. */
readonly connected: boolean;

/** `true` once the live stream iterator has completed. */
readonly finished: boolean;
```
This is kind of confusing... Can `finished` go back to `false` if you call `refresh`?
```ts
) => RemoteQuery<Output>;

/**
 * The return value of a remote `query.live` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
```
This JSDoc is a little confusing. It's true that this is literally what query.live returns but it's probably more helpful to say it's just "The type of a live query function" or something similar.
```js
	);
}

reconnects.add(create_remote_key(__.id, stringify_remote_arg(arg, state.transport)));
```
We should probably add docs for reconnecting (...single-flight reconnects?)
```js
	void invalidateAll();
}

if (form_result.reconnects) {
```
Should this be tied into `invalidateAll` like `form_result.refreshes` is? Should `invalidateAll` reconnect all live queries? Should calling `reconnect` in a form handler cause us to not `invalidateAll` automatically?
```js
if (DEV) {
	for (const [key, entry] of live_query_map) {
		if (key === id || key.startsWith(id + '/')) {
			void entry.resource.reconnect();
		}
	}
}
```
How should live queries work with single-flight mutations? If we want to be able to do `updates(live_query)` to reconnect all instances of that query, this is going to need the two-tiered caching I added in the requested PR. This logic will also have to change. We'd also need to add some additional handling to support `requested(live_query, ...)`
```js
 * @implements {Promise<T>}
 */
export class LiveQuery {
	_key;
```
We removed _key in favor of a symbol property in my requested PR; might as well use the same thing here?
Especially if we're going to integrate this with requested (I think we should)
```js
 * @template T
 * @implements {Promise<T>}
 */
export class LiveQuery {
```
I ran a couple of different AI code reviews on this PR -- the most useful feedback was probably this:
- I re-checked the output and continued the review; there is still one actionable issue and no additional high-confidence bugs.
- Medium severity bug: query.live can’t recover for await consumers after an initial connection failure.
- In packages/kit/src/runtime/client/remote-functions/query.svelte.js:601, LiveQuery creates a single first-value promise once.
- In packages/kit/src/runtime/client/remote-functions/query.svelte.js:701, that promise is rejected if the first stream attempt fails before readiness.
- In packages/kit/src/runtime/client/remote-functions/query.svelte.js:566, then keeps chaining from that same rejected promise, so {await live} remains permanently failed even if reconnect later succeeds and current updates.
```diff
  cached.count += 1;

- return cached;
+ return /** @type {RemoteQueryCacheEntry<T>} */ (cached);
```
Was this actually necessary? Why?
```diff
  withOverride(fn) {
  	const entry = this.#get_or_create_cache_entry();
- 	const override = entry.resource.withOverride(fn);
+ 	const override = /** @type {Query<T>} */ (entry.resource).withOverride(fn);
```
?
This implements a new `query.live` remote function that provides a live view of some real-time data. The semantics are similar to `query` and `query.batch` — you can `{await myLiveQuery(123)}` in your component (or in a `$derived`) and it will update automatically.

- Rather than a function that returns data (like `query`) or a function that returns data for a specific input (like `query.batch`), the callback to `query.live` should return an `AsyncIterator`. Typically, it will be implemented as an async generator function.
- It has a `connected` property which turns `false` if the connection drops while the query is actively used. It will proactively attempt to reconnect with exponential backoff with jitter, and also if `navigator.onLine` goes from `false` to `true`, or you can force a reconnection attempt with the `.reconnect()` method
- Just as a `query` can (since "feat: `hydratable` and a more consistent remote functions model" #15533) be accessed in e.g. an event handler with the `myQuery(123).run()` method, which returns a `Promise`, you can `.run()` a live query to get the raw `AsyncIterator`
- ~~The live query callback receives a `{ signal }` as well as the (validated) argument~~ correction: there's no need for this, we can just do `getRequestEvent().request.signal`

So the most basic example might look like this:
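(The original example code block did not survive extraction here; the following is a minimal sketch consistent with the description — the `now` name matches the `now().connected` reference below, and everything else is inferred from this PR's description rather than copied from it:)

```js
import { query, getRequestEvent } from '$app/server';

export const now = query.live(async function* () {
	const { signal } = getRequestEvent().request;

	while (!signal.aborted) {
		yield new Date();
		await new Promise((fulfil) => setTimeout(fulfil, 1000));
	}
});
```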
Because it is stateless, it is well suited to e.g. serverless environments — if you hit the duration limit, then `now().connected` will become `false` for a moment, then the client will automatically reconnect. Under the hood, it is implemented as a normal `Response` with `chunked` transfer encoding (not an `EventSource`, since that provides less control over reconnection).

In cases where the callback needs to do some setup and disposal work, you could either use this pattern...

...or, if you're using a sufficiently modern server runtime, `using`:

Another useful construct is `yield*`. My expectation is that by leaning on a language primitive, rather than using e.g. a callback-based API, we will both encourage and benefit from the ecosystem standardising on async iterators and disposables.

Possible follow-ups:

- …over `http` (which includes most people's local dev setups) you can have a very small number (I think it's 6?) which you can very quickly exhaust. It might make sense to use websockets for transport in local dev, rather than HTTP (unfortunately websockets aren't a great default for prod, because not all environments support them)

Please don't delete this checklist! Before submitting the PR, please make sure you do the following:
Tests

- … `pnpm test`, and lint the project with `pnpm lint` and `pnpm check`

Changesets

- … `pnpm changeset` and following the prompts. Changesets that add features should be `minor` and those that fix bugs should be `patch`. Please prefix changeset messages with `feat:`, `fix:`, or `chore:`.

Edits