-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProcessManager.js
111 lines (90 loc) · 2.29 KB
/
ProcessManager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
'use strict';
const { Worker } = require('node:worker_threads');
const Colour = require('./Colour.js');
/**
 * Distributes a list of work items across a fixed pool of worker threads.
 *
 * Message protocol, as used by this class:
 *  - sent to a worker:
 *      `{ ...workItem, type: 'FILE' }` to hand over one work item,
 *      `{ type: 'KILL' }` when there is no work left for that worker.
 *  - received from a worker:
 *      `{ type: 'DONE' }`   -> the worker is handed the next item (or KILL),
 *      `{ type: 'STATUS', isWorking, job, start }` -> a status line is printed,
 *      anything else        -> logged unchanged.
 *
 * Listeners registered for the "done" event are called as soon as the last
 * worker of the pool has been sent KILL.
 */
class ProcessManager {
  /** Pending work items; items are taken from the end of the array. */
  #workToDo = [];
  /** Workers that have not been sent KILL yet. */
  #threads = [];
  /** Options; `count` is the pool size, the whole object is forwarded to `new Worker`. */
  #config = {};
  /** Lower-cased event name -> array of callbacks. A Map keeps names such as "__proto__" harmless. */
  #listeners = new Map();

  /**
   * Stores the inputs and creates the worker pool immediately.
   *
   * @param {object[]} workToDo - Work items. The array is used directly (not
   *   copied) and is drained as items are handed out.
   * @param {object} [config] - `count` (default 3) is the number of workers;
   *   the object is also passed as the options argument of `new Worker`.
   */
  constructor(workToDo, config) {
    console.log(workToDo);
    console.log(config);
    // Fall back to empty defaults so a missing argument does not crash on `.count` / `.length`.
    this.#workToDo = workToDo ?? [];
    this.#config = config ?? {};
    this.#initThreads();
  }

  /**
   * Registers a callback for an event. Event names are case-insensitive.
   * The only event fired by this class is "done".
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Called without arguments when the event fires.
   */
  addEventListener(event, callback) {
    const key = event.toLowerCase();
    const callbacks = this.#listeners.get(key);
    if (callbacks === undefined) {
      this.#listeners.set(key, [callback]);
    } else {
      callbacks.push(callback);
    }
  }

  /**
   * Starts the "processing" console timer and gives every worker its first
   * work item (or KILL when there is nothing for it to do).
   */
  run() {
    console.time("processing");
    // Iterate over a snapshot: #assignWork removes idle workers from
    // #threads, and mutating the array being iterated would skip workers,
    // leaving them without KILL and preventing "done" from ever firing.
    for (const thread of [...this.#threads]) {
      this.#assignWork(thread);
    }
  }

  /**
   * Creates `config.count` workers (3 when unset) and wires their events.
   *
   * NOTE(review): per the Node.js Worker documentation a relative filename is
   * resolved against the current working directory, not this module's
   * directory - confirm that is what the launcher expects.
   */
  #initThreads() {
    const count = this.#config.count ?? 3;
    for (let i = 0; i < count; i++) {
      const worker = new Worker('./Processor.js', this.#config);
      // Rethrow so a worker failure surfaces as an uncaught exception.
      worker.on("error", (e) => { throw e; });
      worker.on("message", (msg) => this.#handleWorkerResponse(worker, msg));
      this.#threads.push(worker);
    }
  }

  /**
   * Hands the next work item to `thread`, or retires the thread when the
   * queue is empty.
   *
   * @param {Worker} thread - Worker that is ready for work.
   */
  #assignWork(thread) {
    if (this.#workToDo.length === 0) {
      console.log("No work left");
      this.#retire(thread);
      return;
    }
    const workItem = this.#workToDo.pop();
    // Send a tagged copy instead of writing `type` onto the caller's object.
    thread.postMessage({ ...workItem, type: 'FILE' });
  }

  /**
   * Sends KILL to `thread`, removes it from the pool and fires "done" when
   * it was the last one. A thread that is no longer in the pool is ignored,
   * so a stray message cannot remove an unrelated worker.
   *
   * @param {Worker} thread - Worker to retire.
   */
  #retire(thread) {
    const index = this.#threads.indexOf(thread);
    if (index === -1) {
      return;
    }
    thread.postMessage({
      type: 'KILL',
    });
    this.#threads.splice(index, 1);
    if (this.#threads.length === 0) {
      this.#finish();
    }
  }

  /**
   * Reacts to a message posted by a worker.
   *
   * This class never sends a STATUS request itself (the periodic poll was
   * disabled); the STATUS branch only prints what a worker reports.
   *
   * @param {Worker} thread - Worker that posted the message.
   * @param {object} msg - Message payload; dispatched on `msg.type`.
   */
  #handleWorkerResponse(thread, msg) {
    switch (msg?.type) {
      case 'DONE':
        this.#assignWork(thread);
        break;
      case 'STATUS':
        if (!msg.isWorking) {
          Colour.writeColouredText("IDLE THREAD!", Colour.OPTIONS.FG_RED);
        } else {
          // `msg.start` is compared with performance.now(), i.e. milliseconds.
          const duration = Math.round((performance.now() - msg.start) / 1000);
          Colour.writeColouredText(`Thread working on ${msg.job} (${duration}s)`, Colour.OPTIONS.FG_GREEN);
        }
        break;
      default:
        console.log(msg);
    }
  }

  /** Stops the "processing" timer and calls every "done" listener in registration order. */
  #finish() {
    console.timeEnd("processing");
    for (const listener of this.#listeners.get("done") ?? []) {
      listener();
    }
  }
}
// CommonJS export: the module's value is the ProcessManager class itself.
module.exports = ProcessManager;