225
225
# Scheduler cleanup hook for `ctx`. The body is intentionally empty, so the
# call has no effect and evaluates to `nothing`.
cleanup(ctx) = nothing
227
227
228
- function _init_proc (uid)
229
- lock (ACTIVE_TASKS_LOCK) do
230
- @assert ! haskey (ACTIVE_TASKS, uid)
231
- end
232
- end
228
# Lock taken around every access to the two worker-monitor tables below.
const WORKER_MONITOR_LOCK = Threads.ReentrantLock()

# Worker id => the task that watches that worker's connection.
const WORKER_MONITOR_TASKS = Dict{Int,Task}()

# Worker id => (scheduler uid => channel stored for that scheduler), used to
# deliver a ProcessExitedException notification when the worker goes away.
const WORKER_MONITOR_CHANS = Dict{Int,Dict{UInt64,RemoteChannel}}()
233
231
function init_proc (state, p)
234
232
# Initialize pressure and capacity
235
233
proc = OSProc (p. pid)
@@ -251,7 +249,31 @@ function init_proc(state, p)
251
249
state. worker_capacity[p. pid] = cap
252
250
end
253
251
end
254
- # TODO : remotecall_fetch(_init_proc, p.pid, state.uid)
252
+ lock (WORKER_MONITOR_LOCK) do
253
+ wid = p. pid
254
+ if ! haskey (WORKER_MONITOR_TASKS, wid)
255
+ t = @async begin
256
+ try
257
+ # Wait until this connection is terminated
258
+ remotecall_fetch (sleep, wid, typemax (UInt64))
259
+ catch err
260
+ if err isa ProcessExitedException
261
+ lock (WORKER_MONITOR_LOCK) do
262
+ d = WORKER_MONITOR_CHANS[wid]
263
+ for uid in keys (d)
264
+ put! (d[uid], (wid, OSProc (wid), nothing , (ProcessExitedException (wid), nothing )))
265
+ end
266
+ empty! (d)
267
+ delete! (WORKER_MONITOR_CHANS, wid)
268
+ end
269
+ end
270
+ end
271
+ end
272
+ WORKER_MONITOR_TASKS[wid] = t
273
+ WORKER_MONITOR_CHANS[wid] = Dict {UInt64,RemoteChannel} ()
274
+ end
275
+ WORKER_MONITOR_CHANS[wid][state. uid] = state. chan
276
+ end
255
277
256
278
# Setup worker-to-scheduler channels
257
279
inp_chan = RemoteChannel (p. pid)
@@ -261,10 +283,16 @@ function init_proc(state, p)
261
283
end
262
284
end
263
285
# Clears the whole `CHUNK_CACHE`. The `uid` argument is accepted but not
# consulted yet.
function _cleanup_proc(uid)
    # FIXME : Should be keyed on uid!
    return empty!(CHUNK_CACHE)
end
266
288
# Unregisters scheduler `state.uid` from the monitor table of worker `p.pid`
# and asks that worker to run `_cleanup_proc`. Nothing is done when the worker
# has no entry in `WORKER_MONITOR_CHANS`.
function cleanup_proc(state, p)
    worker_id = p.pid
    lock(WORKER_MONITOR_LOCK)
    try
        chans = get(WORKER_MONITOR_CHANS, worker_id, nothing)
        # No monitor entry for this worker: skip both the unregister and the
        # remote cleanup call.
        chans === nothing && return nothing
        delete!(chans, state.uid)
        remote_do(_cleanup_proc, worker_id, state.uid)
    finally
        unlock(WORKER_MONITOR_LOCK)
    end
    return nothing
end
269
297
270
298
" Process-local count of actively-executing Dagger tasks per processor type."
@@ -367,7 +395,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
367
395
remove_dead_proc! (ctx, state, gproc)
368
396
369
397
lock (state. lock) do
370
- handle_fault (ctx, state, state . thunk_dict[thunk_id], gproc)
398
+ handle_fault (ctx, state, gproc)
371
399
end
372
400
continue
373
401
else
@@ -403,6 +431,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions())
403
431
404
432
safepoint (state)
405
433
end
434
+ state. halt[] = true
406
435
@sync for p in procs_to_use (ctx)
407
436
@async cleanup_proc (state, p)
408
437
end
0 commit comments