Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.

Commit b0d246b

Browse files
committed
feat(core): actor sleeping
1 parent fa0d38d commit b0d246b

File tree

11 files changed

+364
-52
lines changed

11 files changed

+364
-52
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { actor } from "@rivetkit/core";
2+
3+
// Short timeout actor
4+
export const shortTimeoutActor = actor({
5+
onAuth: () => {},
6+
state: { value: 0 },
7+
options: {
8+
actionTimeout: 50, // 50ms timeout
9+
},
10+
actions: {
11+
quickAction: async (c) => {
12+
return "quick response";
13+
},
14+
slowAction: async (c) => {
15+
// This action should timeout
16+
await new Promise((resolve) => setTimeout(resolve, 100));
17+
return "slow response";
18+
},
19+
},
20+
});
21+
22+
// Long timeout actor
23+
export const longTimeoutActor = actor({
24+
onAuth: () => {},
25+
state: { value: 0 },
26+
options: {
27+
actionTimeout: 200, // 200ms timeout
28+
},
29+
actions: {
30+
delayedAction: async (c) => {
31+
// This action should complete within timeout
32+
await new Promise((resolve) => setTimeout(resolve, 100));
33+
return "delayed response";
34+
},
35+
},
36+
});
37+
38+
// Default timeout actor
39+
export const defaultTimeoutActor = actor({
40+
onAuth: () => {},
41+
state: { value: 0 },
42+
actions: {
43+
normalAction: async (c) => {
44+
await new Promise((resolve) => setTimeout(resolve, 50));
45+
return "normal response";
46+
},
47+
},
48+
});
49+
50+
// Sync actor (timeout shouldn't apply)
51+
export const syncTimeoutActor = actor({
52+
onAuth: () => {},
53+
state: { value: 0 },
54+
options: {
55+
actionTimeout: 50, // 50ms timeout
56+
},
57+
actions: {
58+
syncAction: (c) => {
59+
return "sync response";
60+
},
61+
},
62+
});
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { actor, CONNECTION_DRIVER_WEBSOCKET } from "@rivetkit/core";
2+
3+
export const connLivenessActor = actor({
4+
onAuth: () => {},
5+
state: {
6+
counter: 0,
7+
acceptingConnections: true,
8+
},
9+
options: {
10+
connectionLivenessInterval: 5_000,
11+
connectionLivenessTimeout: 2_500,
12+
},
13+
onConnect: (c, conn) => {
14+
if (!c.state.acceptingConnections) {
15+
conn.disconnect();
16+
throw new Error("Actor is not accepting connections");
17+
}
18+
},
19+
actions: {
20+
getWsConnectionsLiveness: (c) => {
21+
return Array.from(c.conns.values())
22+
.filter((conn) => conn.driver === CONNECTION_DRIVER_WEBSOCKET)
23+
.map((conn) => ({
24+
id: conn.id,
25+
status: conn.status,
26+
lastSeen: conn.lastSeen,
27+
}));
28+
},
29+
getConnectionId: (c) => {
30+
return c.conn.id;
31+
},
32+
kill: (c, connId: string) => {
33+
c.state.acceptingConnections = false;
34+
// Disconnect the connection with the given ID
35+
// This simulates a network failure or a manual disconnection
36+
// The connection will be cleaned up by the actor manager after the timeout
37+
const conn = c.conns.get(connId);
38+
if (conn) {
39+
conn.disconnect();
40+
}
41+
},
42+
getCounter: (c) => {
43+
return c.state.counter;
44+
},
45+
increment: (c, amount: number) => {
46+
c.state.counter += amount;
47+
return c.state.counter;
48+
},
49+
},
50+
});
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { actor, UserError } from "@rivetkit/core";
2+
3+
export const errorHandlingActor = actor({
4+
onAuth: () => {},
5+
state: {
6+
errorLog: [] as string[],
7+
},
8+
actions: {
9+
// Action that throws a UserError with just a message
10+
throwSimpleError: () => {
11+
throw new UserError("Simple error message");
12+
},
13+
14+
// Action that throws a UserError with code and metadata
15+
throwDetailedError: () => {
16+
throw new UserError("Detailed error message", {
17+
code: "detailed_error",
18+
metadata: {
19+
reason: "test",
20+
timestamp: Date.now(),
21+
},
22+
});
23+
},
24+
25+
// Action that throws an internal error
26+
throwInternalError: () => {
27+
throw new Error("This is an internal error");
28+
},
29+
30+
// Action that returns successfully
31+
successfulAction: () => {
32+
return "success";
33+
},
34+
35+
// Action that times out (simulated with a long delay)
36+
timeoutAction: async (c) => {
37+
// This action should time out if the timeout is configured
38+
return new Promise((resolve) => {
39+
setTimeout(() => {
40+
resolve("This should not be reached if timeout works");
41+
}, 10000); // 10 seconds
42+
});
43+
},
44+
45+
// Action with configurable delay to test timeout edge cases
46+
delayedAction: async (c, delayMs: number) => {
47+
return new Promise((resolve) => {
48+
setTimeout(() => {
49+
resolve(`Completed after ${delayMs}ms`);
50+
}, delayMs);
51+
});
52+
},
53+
54+
// Log an error for inspection
55+
logError: (c, error: string) => {
56+
c.state.errorLog.push(error);
57+
return c.state.errorLog;
58+
},
59+
60+
// Get the error log
61+
getErrorLog: (c) => {
62+
return c.state.errorLog;
63+
},
64+
65+
// Clear the error log
66+
clearErrorLog: (c) => {
67+
c.state.errorLog = [];
68+
return true;
69+
},
70+
},
71+
options: {
72+
// Set a short timeout for this actor's actions
73+
actionTimeout: 500, // 500ms timeout for actions
74+
},
75+
});
76+
77+
// Actor with custom timeout
78+
export const customTimeoutActor = actor({
79+
state: {},
80+
actions: {
81+
quickAction: async () => {
82+
await new Promise((resolve) => setTimeout(resolve, 50));
83+
return "Quick action completed";
84+
},
85+
slowAction: async () => {
86+
await new Promise((resolve) => setTimeout(resolve, 300));
87+
return "Slow action completed";
88+
},
89+
},
90+
options: {
91+
actionTimeout: 200, // 200ms timeout
92+
},
93+
});

packages/core/src/actor/config.ts

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,28 +60,15 @@ export const ActorConfigSchema = z
6060
createVars: z.function().optional(),
6161
options: z
6262
.object({
63-
lifecycle: z
64-
.object({
65-
createVarsTimeout: z.number().positive().default(5000),
66-
createConnStateTimeout: z.number().positive().default(5000),
67-
onConnectTimeout: z.number().positive().default(5000),
68-
connectionLivenessTimeout: z.number().positive().default(2500),
69-
connectionLivenessInterval: z.number().positive().default(5000),
70-
})
71-
.strict()
72-
.default({}),
73-
state: z
74-
.object({
75-
saveInterval: z.number().positive().default(10_000),
76-
})
77-
.strict()
78-
.default({}),
79-
action: z
80-
.object({
81-
timeout: z.number().positive().default(60_000),
82-
})
83-
.strict()
84-
.default({}),
63+
createVarsTimeout: z.number().positive().default(5000),
64+
createConnStateTimeout: z.number().positive().default(5000),
65+
onConnectTimeout: z.number().positive().default(5000),
66+
stateSaveInterval: z.number().positive().default(10_000),
67+
actionTimeout: z.number().positive().default(60_000),
68+
connectionLivenessTimeout: z.number().positive().default(2500),
69+
connectionLivenessInterval: z.number().positive().default(5000),
70+
noSleep: z.boolean().default(false),
71+
sleepTimeout: z.number().positive().default(30_000),
8572
})
8673
.strict()
8774
.default({}),

packages/core/src/actor/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ export class Conn<S, CP, CS, V, I, AD, DB extends AnyDatabaseProvider> {
209209
* @internal
210210
*/
211211
[CONNECTION_CHECK_LIVENESS_SYMBOL]() {
212-
const readyState = this.#driver.getConnectionReadyState?.(
212+
const readyState = this.#driver.getConnectionReadyState(
213213
this.#actor,
214214
this,
215215
);

packages/core/src/actor/driver.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ export interface ActorDriver {
3939
*/
4040
getDatabase(actorId: string): Promise<unknown | undefined>;
4141

42+
sleep?(actorId: string): void;
43+
4244
shutdown?(immediate: boolean): Promise<void>;
4345
}
4446

@@ -72,7 +74,7 @@ export interface ConnDriver<ConnDriverState = unknown> {
7274
* Returns the ready state of the connection.
7375
* This is used to determine if the connection is ready to send messages, or if the connection is stale.
7476
*/
75-
getConnectionReadyState?(
77+
getConnectionReadyState(
7678
actor: AnyActorInstance,
7779
conn: AnyConn,
7880
): ConnectionReadyState | undefined;

packages/core/src/actor/generic-conn-driver.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ export interface GenericSseDriverState {
160160
encoding: Encoding;
161161
}
162162

163-
export function createGenericSseDriver(globalState: GenericConnGlobalState) {
163+
export function createGenericSseDriver(
164+
globalState: GenericConnGlobalState,
165+
): ConnDriver<GenericSseDriverState> {
164166
return {
165167
sendMessage: (
166168
_actor: AnyActorInstance,
@@ -219,8 +221,12 @@ export function createGenericSseDriver(globalState: GenericConnGlobalState) {
219221
// MARK: HTTP
220222
export type GenericHttpDriverState = Record<never, never>;
221223

222-
export function createGenericHttpDriver() {
224+
export function createGenericHttpDriver(): ConnDriver<GenericHttpDriverState> {
223225
return {
226+
getConnectionReadyState(_actor, conn) {
227+
// TODO: This might not be the correct logic
228+
return ConnectionReadyState.OPEN;
229+
},
224230
disconnect: async () => {
225231
// Noop
226232
},

0 commit comments

Comments
 (0)