Skip to content

Commit 124ac26

Browse files
ivov and netroy authored
feat(core): Implement task timeouts and heartbeats for runners (no-changelog) (#11690)
Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ <[email protected]>
1 parent 3f91279 commit 124ac26

18 files changed

+511
-35
lines changed

packages/@n8n/config/src/configs/runners.config.ts

+8
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,12 @@ export class TaskRunnersConfig {
5353
/** Should the output of deduplication be asserted for correctness */
5454
@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
5555
assertDeduplicationOutput: boolean = false;
56+
57+
/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
58+
@Env('N8N_RUNNERS_TASK_TIMEOUT')
59+
taskTimeout: number = 60;
60+
61+
/** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */
62+
@Env('N8N_RUNNERS_HEARTBEAT_INTERVAL')
63+
heartbeatInterval: number = 30;
5664
}

packages/@n8n/config/test/config.test.ts

+2
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ describe('GlobalConfig', () => {
234234
maxOldSpaceSize: '',
235235
maxConcurrency: 5,
236236
assertDeduplicationOutput: false,
237+
taskTimeout: 60,
238+
heartbeatInterval: 30,
237239
},
238240
sentry: {
239241
backendDsn: '',

packages/@n8n/task-runner/src/config/base-runner-config.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,16 @@
1-
import { Config, Env } from '@n8n/config';
1+
import { Config, Env, Nested } from '@n8n/config';
2+
3+
/**
 * Settings for the task runner's optional healthcheck HTTP server.
 * Nested into `BaseRunnerConfig` as `healthcheckServer`.
 */
@Config
4+
class HealthcheckServerConfig {
5+
/** Whether the healthcheck server is started at all. Declared with `@Env('N8N_RUNNERS_SERVER_ENABLED')`; defaults to `false`. */
@Env('N8N_RUNNERS_SERVER_ENABLED')
6+
enabled: boolean = false;
7+
8+
/** Host the healthcheck server listens on. Declared with `@Env('N8N_RUNNERS_SERVER_HOST')`; defaults to `127.0.0.1`. */
@Env('N8N_RUNNERS_SERVER_HOST')
9+
host: string = '127.0.0.1';
10+
11+
/** Port the healthcheck server listens on. Declared with `@Env('N8N_RUNNERS_SERVER_PORT')`; defaults to `5680`. */
@Env('N8N_RUNNERS_SERVER_PORT')
12+
port: number = 5680;
13+
}
214

315
@Config
416
export class BaseRunnerConfig {
@@ -13,4 +25,7 @@ export class BaseRunnerConfig {
1325

1426
@Env('N8N_RUNNERS_MAX_CONCURRENCY')
1527
maxConcurrency: number = 5;
28+
29+
@Nested
30+
healthcheckServer!: HealthcheckServerConfig;
1631
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { ApplicationError } from 'n8n-workflow';
2+
import { createServer } from 'node:http';
3+
4+
/**
 * Minimal HTTP server that answers every request with `200 OK`.
 *
 * Lifecycle: `start()` binds the server to a host/port, `stop()` closes it.
 */
export class HealthcheckServer {
	/** Responds `200 OK` to any request, regardless of method or path. */
	private server = createServer((_, res) => {
		res.writeHead(200);
		res.end('OK');
	});

	/**
	 * Start listening on the given host and port.
	 *
	 * @param host - Interface to bind to.
	 * @param port - TCP port to bind to.
	 * @returns A promise that resolves once the server is listening.
	 * @throws ApplicationError if the port is already in use (`EADDRINUSE`);
	 *   any other listen error is rejected as-is.
	 */
	async start(host: string, port: number) {
		return await new Promise<void>((resolve, reject) => {
			// Exactly one of the two handlers runs per attempt; each removes its
			// counterpart so a failed attempt leaves no stale listeners behind.
			const onError = (error: NodeJS.ErrnoException) => {
				this.server.removeListener('listening', onListening);
				reject(
					error.code === 'EADDRINUSE'
						? new ApplicationError(`Port ${port} is already in use`)
						: error,
				);
			};

			const onListening = () => {
				this.server.removeListener('error', onError);
				console.log(`Healthcheck server listening on ${host}, port ${port}`);
				resolve();
			};

			this.server.once('error', onError);
			this.server.once('listening', onListening);

			this.server.listen(port, host);
		});
	}

	/**
	 * Stop the server.
	 *
	 * Resolves immediately when the server is not listening, so it is safe to
	 * call during shutdown even if `start()` never succeeded.
	 *
	 * @returns A promise that resolves once the server has closed.
	 * @throws Rejects with the error reported by `server.close()`, if any.
	 */
	async stop() {
		// `close()` reports ERR_SERVER_NOT_RUNNING for a server that is not
		// listening; treat that case as "already stopped" instead.
		if (!this.server.listening) return;

		return await new Promise<void>((resolve, reject) => {
			this.server.close((error) => {
				if (error) reject(error);
				else resolve();
			});
		});
	}
}

packages/@n8n/task-runner/src/start.ts

+11
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ import Container from 'typedi';
33

44
import { MainConfig } from './config/main-config';
55
import type { ErrorReporter } from './error-reporter';
6+
import type { HealthcheckServer } from './healthcheck-server';
67
import { JsTaskRunner } from './js-task-runner/js-task-runner';
78

9+
let healthcheckServer: HealthcheckServer | undefined;
810
let runner: JsTaskRunner | undefined;
911
let isShuttingDown = false;
1012
let errorReporter: ErrorReporter | undefined;
@@ -22,6 +24,7 @@ function createSignalHandler(signal: string) {
2224
if (runner) {
2325
await runner.stop();
2426
runner = undefined;
27+
void healthcheckServer?.stop();
2528
}
2629

2730
if (errorReporter) {
@@ -49,6 +52,14 @@ void (async function start() {
4952

5053
runner = new JsTaskRunner(config);
5154

55+
const { enabled, host, port } = config.baseRunnerConfig.healthcheckServer;
56+
57+
if (enabled) {
58+
const { HealthcheckServer } = await import('./healthcheck-server');
59+
healthcheckServer = new HealthcheckServer();
60+
await healthcheckServer.start(host, port);
61+
}
62+
5263
process.on('SIGINT', createSignalHandler('SIGINT'));
5364
process.on('SIGTERM', createSignalHandler('SIGTERM'));
5465
})().catch((e) => {

packages/cli/src/runners/__tests__/task-broker.test.ts

+133-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
import type { TaskRunnersConfig } from '@n8n/config';
12
import type { RunnerMessage, TaskResultData } from '@n8n/task-runner';
23
import { mock } from 'jest-mock-extended';
3-
import type { INodeTypeBaseDescription } from 'n8n-workflow';
4+
import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow';
5+
6+
import { Time } from '@/constants';
47

58
import { TaskRejectError } from '../errors';
9+
import type { RunnerLifecycleEvents } from '../runner-lifecycle-events';
610
import { TaskBroker } from '../task-broker.service';
711
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
812

@@ -12,7 +16,7 @@ describe('TaskBroker', () => {
1216
let taskBroker: TaskBroker;
1317

1418
beforeEach(() => {
15-
taskBroker = new TaskBroker(mock());
19+
taskBroker = new TaskBroker(mock(), mock(), mock());
1620
jest.restoreAllMocks();
1721
});
1822

@@ -618,4 +622,131 @@ describe('TaskBroker', () => {
618622
});
619623
});
620624
});
625+
626+
describe('task timeouts', () => {
	// Identifiers shared by every test in this suite.
	const taskId = 'task1';
	const runnerId = 'runner1';
	const requesterId = 'requester1';

	const runnerLifecycleEvents = mock<RunnerLifecycleEvents>();

	let taskBroker: TaskBroker;
	let config: TaskRunnersConfig;

	/** The configured task timeout, converted from seconds to milliseconds. */
	const timeoutInMs = () => config.taskTimeout * Time.seconds.toMilliseconds;

	/**
	 * Register a requester and seed the broker with a task that already
	 * carries a pending timeout timer.
	 */
	const seedTaskWithPendingTimeout = () => {
		const requesterCallback = jest.fn();

		taskBroker.registerRequester(requesterId, requesterCallback);
		taskBroker.setTasks({
			[taskId]: {
				id: taskId,
				runnerId,
				requesterId,
				taskType: 'test',
				timeout: setTimeout(() => {}, timeoutInMs()),
			},
		});
	};

	beforeAll(() => {
		jest.useFakeTimers();
		config = mock<TaskRunnersConfig>({ taskTimeout: 30 });
		taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents);
	});

	afterAll(() => {
		jest.useRealTimers();
	});

	it('on sending task, we should set up task timeout', async () => {
		jest.spyOn(global, 'setTimeout');

		const runner = mock<TaskRunner>({ id: runnerId });
		const runnerMessageCallback = jest.fn();

		taskBroker.registerRunner(runner, runnerMessageCallback);
		taskBroker.setTasks({
			[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
		});

		await taskBroker.sendTaskSettings(taskId, {});

		expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), timeoutInMs());
	});

	it('on task completion, we should clear timeout', async () => {
		jest.spyOn(global, 'clearTimeout');
		seedTaskWithPendingTimeout();

		await taskBroker.taskDoneHandler(taskId, { result: [] });

		expect(clearTimeout).toHaveBeenCalled();
		expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
	});

	it('on task error, we should clear timeout', async () => {
		jest.spyOn(global, 'clearTimeout');
		seedTaskWithPendingTimeout();

		await taskBroker.taskErrorHandler(taskId, new Error('Test error'));

		expect(clearTimeout).toHaveBeenCalled();
		expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
	});

	it('on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => {
		jest.spyOn(global, 'clearTimeout');

		const runner = mock<TaskRunner>({ id: runnerId });
		const runnerCallback = jest.fn();
		const requesterCallback = jest.fn();

		taskBroker.registerRunner(runner, runnerCallback);
		taskBroker.registerRequester(requesterId, requesterCallback);

		taskBroker.setTasks({
			[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
		});

		await taskBroker.sendTaskSettings(taskId, {});

		// Fire the fake timeout timer, then yield to the microtask queue
		// between assertions so the timeout handler can progress.
		jest.runAllTimers();

		await Promise.resolve();

		expect(runnerLifecycleEvents.emit).toHaveBeenCalledWith('runner:timed-out-during-task');

		await Promise.resolve();

		expect(clearTimeout).toHaveBeenCalled();

		const expectedError = new ApplicationError(
			`Task execution timed out after ${config.taskTimeout} seconds`,
		);
		expect(requesterCallback).toHaveBeenCalledWith({
			type: 'broker:taskerror',
			taskId,
			error: expectedError,
		});

		await Promise.resolve();

		expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
	});
});
621752
});

packages/cli/src/runners/__tests__/task-runner-process.test.ts

+25-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import type { TaskRunnerAuthService } from '@/runners/auth/task-runner-auth.serv
77
import { TaskRunnerProcess } from '@/runners/task-runner-process';
88
import { mockInstance } from '@test/mocking';
99

10+
import type { RunnerLifecycleEvents } from '../runner-lifecycle-events';
11+
1012
const spawnMock = jest.fn(() =>
1113
mock<ChildProcess>({
1214
stdout: {
@@ -25,7 +27,7 @@ describe('TaskRunnerProcess', () => {
2527
runnerConfig.enabled = true;
2628
runnerConfig.mode = 'internal_childprocess';
2729
const authService = mock<TaskRunnerAuthService>();
28-
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
30+
let taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
2931

3032
afterEach(async () => {
3133
spawnMock.mockClear();
@@ -35,15 +37,35 @@ describe('TaskRunnerProcess', () => {
3537
it('should throw if runner mode is external', () => {
3638
runnerConfig.mode = 'external';
3739

38-
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();
40+
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService, mock())).toThrow();
3941

4042
runnerConfig.mode = 'internal_childprocess';
4143
});
44+
45+
// The constructor must subscribe to each runner lifecycle event listed here.
it.each(['runner:failed-heartbeat-check', 'runner:timed-out-during-task'])(
	'should register listener for `%s` event',
	(eventName) => {
		const runnerLifecycleEvents = mock<RunnerLifecycleEvents>();

		new TaskRunnerProcess(logger, runnerConfig, authService, runnerLifecycleEvents);

		expect(runnerLifecycleEvents.on).toHaveBeenCalledWith(eventName, expect.any(Function));
	},
);
4264
});
4365

4466
describe('start', () => {
4567
beforeEach(() => {
46-
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService);
68+
taskRunnerProcess = new TaskRunnerProcess(logger, runnerConfig, authService, mock());
4769
});
4870

4971
test.each([
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import type { TaskRunnersConfig } from '@n8n/config';
2+
import { mock } from 'jest-mock-extended';
3+
4+
import { Time } from '@/constants';
5+
import { TaskRunnerWsServer } from '@/runners/runner-ws-server';
6+
7+
describe('TaskRunnerWsServer', () => {
	describe('heartbeat timer', () => {
		/** Build a server whose collaborators are all mocks and whose heartbeat interval is 30 seconds. */
		const createServerUnderTest = () => {
			const runnersConfig = mock<TaskRunnersConfig>({ path: '/runners', heartbeatInterval: 30 });

			return new TaskRunnerWsServer(mock(), mock(), mock(), runnersConfig, mock());
		};

		it('should set up heartbeat timer on server start', async () => {
			const setIntervalSpy = jest.spyOn(global, 'setInterval');

			const server = createServerUnderTest();

			const expectedIntervalMs = 30 * Time.seconds.toMilliseconds;
			expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), expectedIntervalMs);

			await server.shutdown();
		});

		it('should clear heartbeat timer on server stop', async () => {
			jest.spyOn(global, 'setInterval');
			const clearIntervalSpy = jest.spyOn(global, 'clearInterval');

			const server = createServerUnderTest();
			await server.shutdown();

			expect(clearIntervalSpy).toHaveBeenCalled();
		});
	});
});

0 commit comments

Comments
 (0)