@@ -5,6 +5,7 @@ const fs = require('fs');
5
5
const { Logger } = require ( './Logger' ) ;
6
6
const { Helper } = require ( './Helper' ) ;
7
7
const Crypt = require ( './Crypt' ) ;
8
+ const Queue = require ( './Queue' ) ;
8
9
9
10
const defaultExecAssets = [
10
11
{
@@ -26,7 +27,7 @@ class Master {
26
27
transferEncryptToken = null ,
27
28
} = { } ) {
28
29
this . availableWorkers = [ ] ;
29
- this . jobs = [ ] ;
30
+ this . jobs = new Queue ( ) ;
30
31
31
32
this . event = new events . EventEmitter ( ) ;
32
33
this . log = new Logger ( { level : loglevel || process . env . LOG_LEVEL } ) ;
@@ -53,6 +54,7 @@ class Master {
53
54
// this.peer.on('left', (address)=> );
54
55
55
56
this . event . addListener ( 'init' , this . init ) ;
57
+ this . event . addListener ( 'resultsShared' , this . onResults ) ;
56
58
this . event . emit ( 'init' , transferEncryptToken ) ;
57
59
}
58
60
@@ -102,33 +104,47 @@ class Master {
102
104
} ) ;
103
105
this . peer . register ( 'requestWork' , ( pk , args , cb ) => {
104
106
switch ( true ) {
105
- case this . jobs . length > 1 : {
106
- if ( args . getBatch && this . jobs . length > args . batchSize ) {
107
- args . batchTasks = this . jobs . splice ( 0 , args . batchSize ) ; // splice mutates original array which slices it
107
+ case this . jobs . size > 1 : {
108
+ if ( args . getBatch ) {
109
+ args . batchTasks = [ ] ;
110
+ let totalJobsSend = args . batchSize ;
111
+ if ( this . jobs . size <= args . batchSize ) totalJobsSend = 1 ; // get only available
112
+ for ( let i = 0 ; i < totalJobsSend ; i += 1 ) {
113
+ const queuedJob = this . jobs . dequeue ( ) ;
114
+ if ( ! queuedJob ) break ;
115
+ args . batchTasks . push ( queuedJob . value ) ;
116
+ }
117
+ // args.batchTasks = this.jobs.splice(0, args.batchSize); // splice mutates original array which slices it
108
118
this . log . debug (
109
- `task queue reduced:${ this . jobs . length } - ${ args . batchSize } `
119
+ `task queue reduced:${ this . jobs . size } - ${ args . batchSize } `
110
120
) ;
111
121
break ;
112
122
}
113
- args . task = this . jobs . shift ( ) ;
114
- this . log . debug ( `task queue reduced:${ this . jobs . length } ` ) ;
123
+ const queuedJob = this . jobs . dequeue ( ) ;
124
+ args . task = null ;
125
+ if ( queuedJob ) {
126
+ args . task = queuedJob . value ;
127
+ this . log . debug ( `task queue reduced:${ this . jobs . size } ` ) ;
128
+ }
115
129
break ;
116
130
}
117
- case this . jobs . length === 1 : {
118
- // shift leaves array undefined on last element
119
- [ args . task ] = this . jobs ; // this case is to avoid that
120
- this . jobs = [ ] ;
121
- this . log . debug ( `task queue finished:${ this . jobs . length } ` ) ;
131
+ case this . jobs . size === 1 : {
132
+ const queuedJob = this . jobs . dequeue ( ) ;
133
+ args . task = null ;
134
+ if ( queuedJob ) {
135
+ args . task = queuedJob . value ;
136
+ this . log . debug ( `task queue reduced:${ this . jobs . size } ` ) ;
137
+ }
122
138
break ;
123
139
}
124
- case this . jobs . length === 0 : {
140
+ case this . jobs . size === 0 : {
125
141
args . task = null ;
126
- this . log . debug ( `task queue is empty:${ this . jobs . length } ` ) ;
142
+ this . log . debug ( `task queue is empty:${ this . jobs . size } ` ) ;
127
143
break ;
128
144
}
129
145
default : {
130
146
args . task = null ;
131
- this . log . warning ( `task queue is empty:${ this . jobs . length } ` ) ;
147
+ this . log . warning ( `task queue is empty:${ this . jobs . size } ` ) ;
132
148
133
149
break ;
134
150
}
@@ -137,7 +153,7 @@ class Master {
137
153
} ) ;
138
154
this . peer . register ( 'shareResults' , ( pk , args ) => {
139
155
const results = JSON . parse ( this . crypt . decrypt ( JSON . parse ( args ) ) ) ;
140
- this . onResults ( results ) ;
156
+ this . event . emit ( 'resultsShared' , results ) ;
141
157
} ) ;
142
158
this . peer . register ( 'requestExecAssets' , ( pk , args , cb ) => {
143
159
const currentHash = args ?. currentHash ; // hash of current assets array
@@ -179,14 +195,13 @@ class Master {
179
195
180
196
const encryptedPayload = this . crypt . encrypt ( payloadJson ) ;
181
197
this . log . debug ( 'pushNewJob payload: ' , payload ) ;
182
- if ( this . jobs . length > 20 ) {
183
- // resolve();
184
- setTimeout ( ( ) => {
185
- this . jobs . push ( JSON . stringify ( encryptedPayload ) ) ;
198
+ if ( this . jobs . size >= 1000 ) {
199
+ Helper . sleep ( this . jobs . size * 0.3 ) . then ( ( ) => {
200
+ this . jobs . enqueue ( JSON . stringify ( encryptedPayload ) ) ;
186
201
resolve ( ) ;
187
- } , this . jobs . length * 3 ) ;
202
+ } ) ;
188
203
} else {
189
- this . jobs . push ( JSON . stringify ( encryptedPayload ) ) ;
204
+ this . jobs . enqueue ( JSON . stringify ( encryptedPayload ) ) ;
190
205
resolve ( ) ;
191
206
}
192
207
} ) ;
0 commit comments