Skip to content

Commit 6e1173b

Browse files
authored
Merge pull request #229 from codesandbox/CSB-668-integrate-with-pint-execs-apis
chore: add some of the execs operations integration
2 parents bab9005 + 4d5a47c commit 6e1173b

File tree

5 files changed

+763
-15
lines changed

5 files changed

+763
-15
lines changed

src/PintClient/execs.ts

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
import { Client } from "../api-clients/pint/client";
2+
import { Emitter, EmitterSubscription } from "../utils/event";
3+
import { Disposable } from "../utils/disposable";
4+
import { parseStreamEvent } from "./utils";
5+
import {
6+
IAgentClientShells,
7+
} from "../agent-client-interface";
8+
import {
9+
createExec,
10+
ExecItem,
11+
ExecListResponse,
12+
getExec,
13+
getExecOutput,
14+
listExecs,
15+
deleteExec,
16+
updateExec,
17+
execExecStdin,
18+
streamExecsList,
19+
} from "../api-clients/pint";
20+
import {
21+
ShellSize,
22+
ShellProcessType,
23+
OpenShellDTO,
24+
CommandShellDTO,
25+
ShellId,
26+
TerminalShellDTO,
27+
ShellDTO,
28+
ShellProcessStatus,
29+
} from "../pitcher-protocol/messages/shell";
30+
31+
export class PintShellsClient implements IAgentClientShells {
32+
private openShells: Record<string, AbortController> = {};
33+
private subscribeAndEvaluateExecsUpdates(
34+
compare: (
35+
nextExec: ExecItem,
36+
prevExec: ExecItem | undefined,
37+
prevExecs: ExecItem[]
38+
) => void
39+
) {
40+
let prevExecs: ExecItem[] = [];
41+
const abortController = new AbortController();
42+
43+
streamExecsList({
44+
client: this.apiClient,
45+
signal: abortController.signal,
46+
headers: {
47+
headers: { Accept: "text/event-stream" },
48+
},
49+
}).then(async ({ stream }) => {
50+
for await (const evt of stream) {
51+
const execListResponse = parseStreamEvent<ExecListResponse>(evt);
52+
const execs = execListResponse.execs;
53+
54+
if (prevExecs && execs) {
55+
execs.forEach((exec) => {
56+
const prevExec = prevExecs?.find(
57+
(execItem) => execItem.id === exec.id
58+
);
59+
60+
compare(exec, prevExec, prevExecs);
61+
});
62+
}
63+
64+
prevExecs = execs || [];
65+
}
66+
});
67+
68+
return Disposable.create(() => {
69+
abortController.abort();
70+
});
71+
}
72+
private onShellExitedEmitter = new EmitterSubscription<{
73+
shellId: string;
74+
exitCode: number;
75+
}>((fire) =>
76+
this.subscribeAndEvaluateExecsUpdates((exec, prevExec) => {
77+
if (!prevExec) {
78+
return;
79+
}
80+
81+
if (prevExec.status === "RUNNING" && exec.status === "EXITED") {
82+
fire({
83+
shellId: exec.id,
84+
exitCode: exec.exitCode,
85+
});
86+
}
87+
})
88+
);
89+
onShellExited = this.onShellExitedEmitter.event;
90+
91+
private onShellOutEmitter = new Emitter<{
92+
shellId: ShellId;
93+
out: string;
94+
}>();
95+
onShellOut = this.onShellOutEmitter.event;
96+
private onShellTerminatedEmitter = new EmitterSubscription<{
97+
shellId: string;
98+
author: string;
99+
}>((fire) =>
100+
this.subscribeAndEvaluateExecsUpdates((exec, prevExec) => {
101+
if (!prevExec) {
102+
return;
103+
}
104+
105+
if (prevExec.status === "RUNNING" && exec.status === "STOPPED") {
106+
fire({
107+
shellId: exec.id,
108+
author: "",
109+
});
110+
}
111+
})
112+
);
113+
onShellTerminated = this.onShellTerminatedEmitter.event;
114+
constructor(private apiClient: Client, private sandboxId: string) {}
115+
private convertExecToShellDTO(exec: ExecItem) {
116+
return {
117+
isSystemShell: true,
118+
name: JSON.stringify({
119+
type: "command",
120+
command: exec.command,
121+
name: "",
122+
}),
123+
ownerUsername: "root",
124+
shellId: exec.id,
125+
shellType: "TERMINAL" as const,
126+
startCommand: exec.command,
127+
status: exec.status as ShellProcessStatus,
128+
};
129+
}
130+
async create(
131+
projectPath: string,
132+
size: ShellSize,
133+
command?: string,
134+
type?: ShellProcessType,
135+
isSystemShell?: boolean
136+
): Promise<OpenShellDTO> {
137+
// For Pint, we need to construct args from command
138+
const args = command ? command.split(' ').slice(1) : [];
139+
const baseCommand = command ? command.split(' ')[0] : 'bash';
140+
const exec = await createExec({
141+
client: this.apiClient,
142+
body: {
143+
args,
144+
command: baseCommand,
145+
interactive: type === "COMMAND" ? false : true,
146+
},
147+
});
148+
149+
if (!exec.data) {
150+
throw new Error(exec.error.message);
151+
}
152+
153+
await this.open(exec.data.id, { cols: 200, rows: 80 });
154+
155+
return {
156+
...this.convertExecToShellDTO(exec.data),
157+
buffer: [],
158+
};
159+
}
160+
async delete(shellId: ShellId): Promise<CommandShellDTO | TerminalShellDTO | null> {
161+
try {
162+
// First get the exec details before deleting it
163+
const exec = await getExec({
164+
client: this.apiClient,
165+
path: {
166+
id: shellId,
167+
},
168+
});
169+
170+
if (!exec.data) {
171+
return null; // Exec doesn't exist
172+
}
173+
174+
// Convert to shell DTO before deletion
175+
const shellDTO = this.convertExecToShellDTO(exec.data);
176+
177+
// Delete the exec
178+
const deleteResponse = await deleteExec({
179+
client: this.apiClient,
180+
path: {
181+
id: shellId,
182+
},
183+
});
184+
185+
if (deleteResponse.data) {
186+
// Clean up any open shells reference
187+
if (this.openShells[shellId]) {
188+
this.openShells[shellId].abort();
189+
delete this.openShells[shellId];
190+
}
191+
192+
return shellDTO as CommandShellDTO | TerminalShellDTO;
193+
} else {
194+
return null;
195+
}
196+
} catch (error) {
197+
return null;
198+
}
199+
}
200+
async getShells(): Promise<ShellDTO[]> {
201+
const execs = await listExecs({
202+
client: this.apiClient,
203+
});
204+
205+
return (
206+
execs.data?.execs.map((exec) => this.convertExecToShellDTO(exec)) ?? []
207+
);
208+
}
209+
async open(shellId: ShellId, size: ShellSize): Promise<OpenShellDTO> {
210+
const abortController = new AbortController();
211+
212+
this.openShells[shellId] = abortController;
213+
214+
const exec = await getExec({
215+
client: this.apiClient,
216+
path: {
217+
id: shellId,
218+
},
219+
});
220+
221+
if (!exec.data) {
222+
throw new Error(exec.error.message);
223+
}
224+
225+
const { stream } = await getExecOutput({
226+
client: this.apiClient,
227+
path: { id: shellId },
228+
query: { lastSequence: 0 },
229+
signal: abortController.signal,
230+
headers: {
231+
Accept: "text/event-stream",
232+
},
233+
});
234+
235+
const buffer: string[] = [];
236+
237+
for await (const evt of stream) {
238+
const data = parseStreamEvent<{
239+
type: "stdout" | "stderr";
240+
output: "";
241+
sequence: number;
242+
timestamp: string;
243+
}>(evt);
244+
245+
if (!buffer.length) {
246+
buffer.push(data.output);
247+
break;
248+
}
249+
}
250+
251+
return {
252+
buffer,
253+
...this.convertExecToShellDTO(exec.data),
254+
};
255+
}
256+
async rename(shellId: ShellId, name: string): Promise<null> {
257+
return null;
258+
}
259+
async restart(shellId: ShellId): Promise<null> {
260+
try {
261+
await updateExec({
262+
client: this.apiClient,
263+
path: {
264+
id: shellId,
265+
},
266+
body: {
267+
status: 'running',
268+
},
269+
});
270+
271+
return null;
272+
} catch (error) {
273+
return null;
274+
}
275+
}
276+
async send(shellId: ShellId, input: string, size: ShellSize): Promise<null> {
277+
try {
278+
await execExecStdin({
279+
client: this.apiClient,
280+
path: {
281+
id: shellId,
282+
},
283+
body: {
284+
type: 'stdin',
285+
input: input,
286+
},
287+
});
288+
289+
return null;
290+
} catch (error) {
291+
return null;
292+
}
293+
}
294+
}

src/PintClient/fs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Client, createClient, createConfig } from "../api-clients/pint/client";
1+
import { Client } from "../api-clients/pint/client";
22
import {
33
IAgentClientFS,
44
PickRawFsResult,

src/PintClient/index.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import { Port } from "../pitcher-protocol/messages/port";
2-
import { Emitter, EmitterSubscription, Event } from "../utils/event";
2+
import { Emitter, EmitterSubscription } from "../utils/event";
33
import { SandboxSession } from "../types";
44
import { Disposable } from "../utils/disposable";
55
import { Client, createClient, createConfig } from "../api-clients/pint/client";
6+
import { PintClientTasks, PintClientSetup, PintClientSystem} from "./tasks";
67
import {PintFsClient} from "./fs";
7-
import { PintClientTasks, PintClientSetup, PintClientSystem } from "./tasks";
8+
import {PintShellsClient} from "./execs";
9+
import { parseStreamEvent } from "./utils";
810
import {
911
IAgentClient,
1012
IAgentClientPorts,
@@ -14,24 +16,13 @@ import {
1416
IAgentClientSetup,
1517
IAgentClientTasks,
1618
IAgentClientSystem,
17-
PickRawFsResult,
1819
} from "../agent-client-interface";
1920
import {
2021
listPorts,
2122
PortInfo,
2223
streamPortsList,
2324
} from "../api-clients/pint";
2425

25-
function parseStreamEvent<T>(evt: unknown): T {
26-
if (typeof evt !== "string") {
27-
return evt as T;
28-
}
29-
30-
const evtWithoutDataPrefix = evt.substring(5);
31-
32-
return JSON.parse(evtWithoutDataPrefix);
33-
}
34-
3526
class PintPortsClient implements IAgentClientPorts {
3627
private onPortsUpdatedEmitter = new EmitterSubscription<Port[]>((fire) => {
3728
const abortController = new AbortController();
@@ -116,7 +107,7 @@ export class PintClient implements IAgentClient {
116107
);
117108

118109
this.ports = new PintPortsClient(apiClient, this.sandboxId);
119-
this.shells = {} as IAgentClientShells; // Not implemented for Pint
110+
this.shells = new PintShellsClient(apiClient, this.sandboxId);
120111
this.fs = new PintFsClient(apiClient);
121112
this.tasks = new PintClientTasks(apiClient);
122113
this.setup = new PintClientSetup(apiClient);

src/PintClient/utils.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
export function parseStreamEvent<T>(evt: unknown): T {
2+
if (typeof evt !== "string") {
3+
return evt as T;
4+
}
5+
6+
const evtWithoutDataPrefix = evt.substring(5);
7+
8+
return JSON.parse(evtWithoutDataPrefix);
9+
}

0 commit comments

Comments
 (0)