Skip to content

Commit ea38666

Browse files
authored
feat: support events and std streams from an abort handler
1 parent 6bf949e commit ea38666

File tree

3 files changed

+78
-11
lines changed

3 files changed

+78
-11
lines changed

src/WorkerHandler.js

+17-8
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,11 @@ function objectToError (obj) {
220220

221221
function handleEmittedStdPayload(handler, payload) {
222222
// TODO: refactor if parallel task execution gets added
223-
if (Object.keys(handler.processing).length !== 1) {
224-
return;
225-
}
226-
var task = Object.values(handler.processing)[0]
227-
if (task.options && typeof task.options.on === 'function') {
228-
task.options.on(payload);
229-
}
223+
Object.values(handler.processing)
224+
.forEach(task => task?.options?.on(payload));
225+
226+
Object.values(handler.tracking)
227+
.forEach(task => task?.options?.on(payload));
230228
}
231229

232230
/**
@@ -299,6 +297,16 @@ function WorkerHandler(script, _options) {
299297
task.resolver.resolve(response.result);
300298
}
301299
}
300+
} else {
301+
// if the task is not the current, it might be tracked for cleanup
302+
var task = me.tracking[id];
303+
if (task !== undefined) {
304+
if (response.isEvent) {
305+
if (task.options && typeof task.options.on === 'function') {
306+
task.options.on(response.payload);
307+
}
308+
}
309+
}
302310
}
303311

304312
if (response.method === CLEANUP_METHOD_ID) {
@@ -422,7 +430,8 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
422430
if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) {
423431
me.tracking[id] = {
424432
id,
425-
resolver: Promise.defer()
433+
resolver: Promise.defer(),
434+
options: options,
426435
};
427436

428437
// remove this task from the queue. It is already rejected (hence this

test/Pool.test.js

+35-1
Original file line numberDiff line numberDiff line change
@@ -1356,7 +1356,7 @@ describe('Pool', function () {
13561356
maxWorkers: 1,
13571357
onCreateWorker: () => {
13581358
workerCount += 1;
1359-
}
1359+
},
13601360
});
13611361

13621362
let task = pool.exec('asyncTimeout', [], {});
@@ -1561,6 +1561,40 @@ describe('Pool', function () {
15611561
});
15621562
});
15631563
});
1564+
1565+
it('should trigger event stdout in abort handler', function (done) {
1566+
var pool = createPool(__dirname + '/workers/cleanup-abort.js', {
1567+
maxWorkers: 1,
1568+
workerType: 'process',
1569+
emitStdStreams: true,
1570+
workerTerminateTimeout: 1000,
1571+
});
1572+
1573+
pool.exec('stdoutStreamOnAbort', [], {
1574+
on: function (payload) {
1575+
assert.strictEqual(payload.stdout.trim(), "Hello, world!");
1576+
pool.terminate();
1577+
done();
1578+
}
1579+
}).timeout(50);
1580+
});
1581+
1582+
it('should trigger event in abort handler', function (done) {
1583+
var pool = createPool(__dirname + '/workers/cleanup-abort.js', {
1584+
maxWorkers: 1,
1585+
workerType: 'process',
1586+
emitStdStreams: true,
1587+
workerTerminateTimeout: 1000,
1588+
});
1589+
1590+
pool.exec('eventEmitOnAbort', [], {
1591+
on: function (payload) {
1592+
assert.strictEqual(payload.status, 'cleanup_success');
1593+
pool.terminate();
1594+
done();
1595+
}
1596+
}).timeout(50);
1597+
});
15641598
});
15651599

15661600
describe('validate', () => {

test/workers/cleanup-abort.js

+26-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ function asyncTimeout() {
55
return new Promise(function (resolve) {
66
let timeout = setTimeout(() => {
77
resolve();
8-
}, 5000);
9-
8+
}, 5000);
109
me.worker.addAbortListener(async function () {
1110
clearTimeout(timeout);
1211
resolve();
@@ -34,11 +33,36 @@ function asyncAbortHandlerNeverResolves() {
3433
});
3534
}
3635

36+
function stdoutStreamOnAbort() {
37+
var me = this;
38+
return new Promise(function (resolve) {
39+
me.worker.addAbortListener(async function () {
40+
console.log("Hello, world!");
41+
resolve();
42+
});
43+
});
44+
}
45+
46+
function eventEmitOnAbort() {
47+
var me = this;
48+
return new Promise(function (resolve) {
49+
me.worker.addAbortListener(async function () {
50+
workerpool.workerEmit({
51+
status: 'cleanup_success',
52+
});
53+
resolve();
54+
});
55+
});
56+
}
57+
58+
3759
// create a worker and register public functions
3860
workerpool.worker(
3961
{
4062
asyncTimeout: asyncTimeout,
4163
asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves,
64+
stdoutStreamOnAbort: stdoutStreamOnAbort,
65+
eventEmitOnAbort: eventEmitOnAbort,
4266
},
4367
{
4468
abortListenerTimeout: 1000

0 commit comments

Comments
 (0)