1
+ // MPI import
2
+ var cluster = require ( 'cluster' ) ;
3
+ var data = require ( './data.js' ) ;
4
+ const perf = require ( 'execution-time' ) ( ) ;
5
+
6
+ const nodes = [ ] ;
7
+ const nodesFriends = [ ] ;
8
+ const edges = [ ] ;
9
+ const power = [ ] ;
10
+ var allData = [ ] ;
11
+
12
+ // Master worker
13
+ if ( cluster . isMaster ) {
14
+ var numWorkers = require ( 'os' ) . cpus ( ) . length ;
15
+
16
+ console . log ( 'Master cluster setting up ' + numWorkers + ' workers...' ) ;
17
+
18
+ for ( var i = 0 ; i < numWorkers ; i ++ ) {
19
+ cluster . fork ( ) ;
20
+ }
21
+ // after forking the workers the master will process the data
22
+ console . log ( "Master started data processing of " , data . length , " edges... @" , new Date ( ) . toTimeString ( ) ) ;
23
+
24
+ perf . start ( ) ;
25
+
26
+ processData ( ) ;
27
+ // split nodes among the available workers
28
+ var start = 0 ;
29
+ var chunk = parseInt ( nodes . length / numWorkers ) ;
30
+ var first = true ;
31
+ var startIndex = 0 ;
32
+ var iterations = 0 ;
33
+
34
+ var chunkS = parseInt ( data . length / numWorkers ) ;
35
+ if ( nodesFriends [ 3 ] && nodesFriends [ 3 ] . length > 0 ) {
36
+ cluster . on ( 'online' , function ( worker ) {
37
+ if ( first == true ) {
38
+ worker . send ( {
39
+ start : start ,
40
+ chunk : chunk ,
41
+ nodes : nodesFriends ,
42
+ all : nodes ,
43
+ power : power ,
44
+ flag : 'first'
45
+ } ) ;
46
+
47
+ // increase start for next worker
48
+ start += chunk ;
49
+
50
+ worker . on ( 'message' , function ( message ) {
51
+ if ( message ) {
52
+ message . map ( a => {
53
+ allData [ a . node ] = a ;
54
+ } )
55
+ }
56
+ } ) ;
57
+
58
+ } else {
59
+
60
+ if ( iterations == numWorkers ) {
61
+ worker . send ( {
62
+ start : startIndex ,
63
+ chunk : chunkS ,
64
+ allData : allData ,
65
+ flag : "second" ,
66
+ end : true ,
67
+ } ) ;
68
+ }
69
+ else {
70
+ worker . send ( {
71
+ start : startIndex ,
72
+ chunk : chunkS ,
73
+ allData : allData ,
74
+ flag : "second" ,
75
+ end : false ,
76
+ } ) ;
77
+ }
78
+ startIndex += chunkS ;
79
+ iterations ++ ;
80
+ worker . on ( 'message' , function ( message ) {
81
+ if ( message . flag == "second" ) {
82
+ const newData = message . data ;
83
+ for ( var i = 0 ; i < newData . length ; i ++ ) {
84
+
85
+ if ( newData [ i ] && allData [ i ] && newData [ i ] . node == allData [ i ] . node ) {
86
+ if ( newData [ i ] . list . length > allData [ i ] . list . length ) {
87
+ allData [ i ] . list = newData [ i ] . list ;
88
+ }
89
+ }
90
+ }
91
+ }
92
+ } ) ;
93
+
94
+ }
95
+ } ) ;
96
+
97
+ }
98
+
99
+
100
+ var dead = 0 ;
101
+ cluster . on ( 'exit' , function ( worker , code , signal ) {
102
+
103
+ // console.log('Starting a new worker');
104
+ // cluster.fork();
105
+ dead ++ ;
106
+ // if all processes are now dead
107
+
108
+ if ( dead == numWorkers ) {
109
+ first = false ;
110
+ // fork a new set of processors
111
+ for ( var i = 0 ; i < numWorkers ; i ++ ) {
112
+ cluster . fork ( ) ;
113
+ }
114
+
115
+
116
+ }
117
+ // if all workers are dead for the second time
118
+ if ( dead == numWorkers * 2 ) {
119
+ allData . sort ( compare ) ;
120
+ for ( var i = 0 ; i < 4 ; i ++ ) {
121
+ console . log ( "The " , ( i + 1 ) , "th most influential node is " , allData [ i ] . node ) ;
122
+ }
123
+ const results = perf . stop ( ) ;
124
+
125
+ console . log ( "Execution time " , results . time , " ms" ) ;
126
+ }
127
+ } ) ;
128
+ } else {
129
+ process . on ( 'message' , function ( message ) {
130
+ if ( message . flag == "first" ) {
131
+ const friends = message . nodes ;
132
+ if ( friends [ 3 ] . length > 0 ) {
133
+ const power = message . power ;
134
+ const start = message . start ;
135
+ const chunk = message . chunk ;
136
+ const all = message . all ;
137
+ var end ;
138
+ if ( ( start + chunk > all . length ) || all . length - chunk < start ) {
139
+ end = all . length ;
140
+ } else {
141
+ end = start + chunk ;
142
+ }
143
+
144
+ const myNodes = [ ] ;
145
+ for ( var i = start ; i <= end ; i ++ ) {
146
+ if ( all [ i ] != null ) {
147
+ myNodes . push ( all [ i ] ) ;
148
+
149
+ }
150
+ }
151
+ const myData = [ ] ;
152
+
153
+ myNodes . map ( node => {
154
+ if ( node != null ) {
155
+ const l = getInfluenceSet ( node , friends , power ) ;
156
+ myData . push ( {
157
+ node : node ,
158
+ list : l
159
+ } ) ;
160
+ }
161
+ } ) ;
162
+ process . send ( myData ) ;
163
+
164
+ }
165
+ process . exit ( 0 ) ;
166
+ }
167
+
168
+ } ) ;
169
+ // second batch of work
170
+
171
+ process . on ( 'message' , function ( message ) {
172
+ if ( message . flag == "second" ) {
173
+ const start = message . start ;
174
+ const chunk = message . chunk ;
175
+ var end ;
176
+ if ( message . end == true ) {
177
+ end = data . length ;
178
+ }
179
+ else {
180
+ end = start + chunk ;
181
+ }
182
+ const allData = message . allData ;
183
+ const myPart = data . slice ( start , end ) ;
184
+ myPart . map ( set => {
185
+ set . node . map ( node => {
186
+ allData . map ( sets => {
187
+ if ( sets != null && sets . node && node != null ) {
188
+ if ( sets . node == node ) {
189
+ sets . list . map ( a => {
190
+ if ( a != node ) {
191
+ allData . map ( n => {
192
+ if ( n && n . node == a ) {
193
+ n . list . map ( l => {
194
+ if ( includes ( sets . list , l ) == false ) {
195
+ sets . list . push ( l ) ;
196
+ }
197
+ } )
198
+ }
199
+ } )
200
+ }
201
+ } )
202
+ }
203
+ }
204
+ } )
205
+
206
+ } )
207
+ } )
208
+ process . send ( {
209
+ data : allData ,
210
+ flag : 'second'
211
+ } ) ;
212
+ process . exit ( 0 ) ;
213
+ }
214
+ } )
215
+
216
+ }
217
+
218
+ function getFriends ( node ) {
219
+ const list = [ ] ;
220
+ data . map ( set => {
221
+ if ( set . node ) {
222
+ if ( includes ( set . node , node ) ) {
223
+ set . node . map ( a => {
224
+ if ( a != node && ! includes ( list , a ) ) {
225
+ list . push ( a )
226
+ }
227
+ } )
228
+ }
229
+ }
230
+ } ) ;
231
+
232
+ return list ;
233
+ }
234
+
235
+ function processData ( ) {
236
+
237
+ data . map ( set => {
238
+
239
+ set . node . map ( node => {
240
+ edges . push ( set . node ) ;
241
+ if ( nodes ) {
242
+ if ( ! includes ( nodes , node ) ) {
243
+ // add to list
244
+ nodes [ node ] = node ;
245
+ // set friends
246
+ nodesFriends [ node ] = getFriends ( node ) ;
247
+ power [ node ] = {
248
+ value : 0 ,
249
+ total : 0
250
+ } ;
251
+ }
252
+ }
253
+ } )
254
+
255
+ } ) ;
256
+
257
+ setInfluence ( ) ;
258
+ }
259
+
260
+ function setInfluence ( ) {
261
+ data . map ( a => {
262
+ a . node . map ( node => {
263
+ power [ node ] . value += a . power ;
264
+ power [ node ] . total += 1 ;
265
+ } )
266
+ } ) ;
267
+ }
268
+
269
+ function getInfluenceSet ( node , friends , powerSet ) {
270
+ const list = [ ] ;
271
+ if ( friends [ node ] ) {
272
+ friends [ node ] . map ( friend => {
273
+ data . map ( ( set ) => {
274
+ if ( set . node && includes ( set . node , node ) && set . node && includes ( set . node , friend ) ) {
275
+ if ( powerSet [ node ] ) {
276
+ if ( set . power <= ( powerSet [ node ] . value / powerSet [ node ] . total ) ) {
277
+ list . push ( friend ) ;
278
+ }
279
+ }
280
+ }
281
+ } )
282
+
283
+ } )
284
+ }
285
+ return list ;
286
+
287
+ }
288
+
289
+ function compare ( a , b ) {
290
+ if ( a . list . length > b . list . length ) {
291
+ return - 1 ;
292
+ }
293
+ return 1 ;
294
+ }
295
+ function includes ( arr , variable ) {
296
+ for ( var i = 0 ; i < arr . length ; i ++ ) {
297
+ if ( arr [ i ] == variable ) {
298
+ return true ;
299
+ }
300
+ }
301
+ return false ;
302
+ }
0 commit comments