Skip to content

Commit d26824f

Browse files
committed
Solve issue with function invocation.
1 parent f4faf86 commit d26824f

File tree

4 files changed

+82
-43
lines changed

4 files changed

+82
-43
lines changed

src/controller/call.ts

+20-34
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { NextFunction, Request, Response } from 'express';
22
import { Applications } from '../app';
3+
34
import AppError from '../utils/appError';
4-
import { WorkerMessageType, WorkerMessageUnknown } from '../worker/protocol';
5+
import { invokeQueue } from '../utils/invoke';
6+
import { WorkerMessageType } from '../worker/protocol';
57

68
export const callFunction = (
79
req: Request,
@@ -36,37 +38,21 @@ export const callFunction = (
3638
);
3739
}
3840

39-
new Promise((resolve, reject) => {
40-
application.proc?.send({
41-
type: WorkerMessageType.Invoke,
42-
data: {
43-
name: func,
44-
args
45-
}
46-
});
47-
48-
application.proc?.on('message', (message: WorkerMessageUnknown) => {
49-
if (message.type === WorkerMessageType.InvokeResult) {
50-
resolve(JSON.stringify(message.data));
51-
}
52-
});
53-
54-
application.proc?.on('exit', code => {
55-
// The application may have been ended unexpectedly,
56-
// probably segmentation fault (exit code 139 in Linux)
57-
reject(
58-
JSON.stringify({
59-
error: `Deployment '${suffix}' process exited with code: ${
60-
code || 'unknown'
61-
}`
62-
})
63-
);
64-
});
65-
})
66-
.then(data => {
67-
res.send(data);
68-
})
69-
.catch(error => {
70-
res.status(500).send(error);
71-
});
41+
// Enqueue the call with a specific id, in order to be able to resolve the
42+
// promise later on when the message is received in the process message handler
43+
application.proc?.send({
44+
type: WorkerMessageType.Invoke,
45+
data: {
46+
id: invokeQueue.push({
47+
resolve: (data: string) => {
48+
res.send(data);
49+
},
50+
reject: (error: string) => {
51+
res.status(500).send(error);
52+
}
53+
}),
54+
name: func,
55+
args
56+
}
57+
});
7258
};

src/utils/deploy.ts

+33-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { spawn } from 'child_process';
33
import path from 'path';
44
import { Applications, Resource } from '../app';
55
import { WorkerMessageType, WorkerMessageUnknown } from '../worker/protocol';
6+
import { invokeQueue } from './invoke';
67
import { logProcessOutput } from './logger';
78

89
export const deployProcess = async (resource: Resource): Promise<void> => {
@@ -36,14 +37,33 @@ export const deployProcess = async (resource: Resource): Promise<void> => {
3637
});
3738

3839
proc.on('message', (payload: WorkerMessageUnknown) => {
39-
// Get the deploy data and store the process and app into our tables
40-
if (payload.type === WorkerMessageType.MetaData) {
41-
const application = Applications[resource.id];
42-
const deployment = payload.data as Deployment;
43-
44-
application.proc = proc;
45-
application.deployment = deployment;
46-
deployResolve();
40+
switch (payload.type) {
41+
case WorkerMessageType.MetaData: {
42+
// Get the deploy data and store the process and app into our tables
43+
const application = Applications[resource.id];
44+
const deployment = payload.data as Deployment;
45+
46+
application.proc = proc;
47+
application.deployment = deployment;
48+
deployResolve();
49+
break;
50+
}
51+
52+
case WorkerMessageType.InvokeResult: {
53+
const invokeResult = payload.data as {
54+
id: string;
55+
result: unknown;
56+
};
57+
58+
// Get the invocation id in order to retrieve the callbacks
59+
// for resolving the call, this deletes the invocation object
60+
const invoke = invokeQueue.get(invokeResult.id);
61+
invoke.resolve(JSON.stringify(invokeResult.result));
62+
break;
63+
}
64+
65+
default:
66+
break;
4767
}
4868
});
4969

@@ -57,6 +77,11 @@ export const deployProcess = async (resource: Resource): Promise<void> => {
5777
}`
5878
)
5979
);
80+
81+
// TODO: How to implement the exit properly? We cannot reject easily
82+
// the promise from the call if the process exits during the call.
83+
// Also if exits during the call it will try to call deployReject
84+
// which is completely out of scope and the promise was fullfilled already
6085
});
6186

6287
return promise;

src/utils/invoke.ts

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import crypto from 'crypto';
2+
3+
interface Invocation {
4+
resolve: (value: string) => void;
5+
reject: (reason: string) => void;
6+
}
7+
8+
class InvokeQueue {
9+
private queue: Record<string, Invocation> = {};
10+
11+
public push(invoke: Invocation): string {
12+
const id = crypto.randomBytes(16).toString('hex');
13+
this.queue[id] = invoke;
14+
return id;
15+
}
16+
17+
public get(id: string): Invocation {
18+
const invoke = this.queue[id];
19+
delete this.queue[id];
20+
return invoke;
21+
}
22+
}
23+
24+
export const invokeQueue = new InvokeQueue();

src/worker/index.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,18 @@ process.on('message', (payload: WorkerMessageUnknown) => {
106106
case WorkerMessageType.Invoke: {
107107
const fn = (
108108
payload as WorkerMessage<{
109+
id: string;
109110
name: string;
110111
args: unknown[];
111112
}>
112113
).data;
113114
if (process.send) {
114115
process.send({
115116
type: WorkerMessageType.InvokeResult,
116-
data: functions[fn.name](...fn.args)
117+
data: {
118+
id: fn.id,
119+
result: functions[fn.name](...fn.args)
120+
}
117121
});
118122
}
119123
break;

0 commit comments

Comments
 (0)