Skip to content

Commit fb8e062

Browse files
committed
Remove worker scheduler, add new procinit and FlexWorkerPool
1 parent 315c084 commit fb8e062

12 files changed

+1187
-487
lines changed

src/ParallelProcessingTools.jl

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ include("threadlocal.jl")
2929
include("onthreads.jl")
3030
include("onprocs.jl")
3131
include("workpartition.jl")
32+
include("procinit.jl")
33+
include("workerpool.jl")
3234
include("onworkers.jl")
3335
include("addworkers.jl")
3436
include("slurm.jl")

src/addworkers.jl

+44-109
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,5 @@
11
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
22

3-
const _g_processops_lock = ReentrantLock()
4-
5-
const _g_always_everywhere_code = quote
6-
import ParallelProcessingTools
7-
end
8-
9-
10-
"""
11-
always_everywhere(expr)
12-
13-
Runs `expr` on all current Julia processes, but also all future Julia
14-
processes added via [`addworkers`](@ref)).
15-
16-
Similar to `Distributed.everywhere`, but also stores `expr` so that
17-
`addworkers` can execute it automatically on new worker processes.
18-
"""
19-
macro always_everywhere(expr)
20-
return quote
21-
try
22-
lock(_g_processops_lock)
23-
expr = $(esc(Expr(:quote, expr)))
24-
push!(_g_always_everywhere_code.args, expr)
25-
_run_expr_on_procs(expr, Distributed.procs())
26-
finally
27-
unlock(_g_processops_lock)
28-
end
29-
end
30-
end
31-
export @always_everywhere
32-
33-
34-
function _run_expr_on_procs(expr, procs::AbstractVector{<:Integer})
35-
mod_expr = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), expr)
36-
Distributed.remotecall_eval(Main, procs, mod_expr)
37-
end
38-
39-
function _run_always_everywhere_code(@nospecialize(procs::AbstractVector{<:Integer}); pre_always::Expr = :())
40-
code = quote
41-
$pre_always
42-
$_g_always_everywhere_code
43-
end
44-
45-
_run_expr_on_procs(code, procs)
46-
end
47-
483

494
"""
505
pinthreads_auto()
@@ -144,28 +99,19 @@ Abstract supertype for worker process addition modes.
14499
Subtypes must implement:
145100
146101
* `ParallelProcessingTools.addworkers(mode::SomeAddProcsMode)`
147-
148-
and may want to specialize:
149-
150-
* `ParallelProcessingTools.worker_init_code(mode::SomeAddProcsMode)`
151102
"""
152103
abstract type AddProcsMode end
153104

154105

155-
"""
156-
ParallelProcessingTools.worker_init_code(::AddProcsMode)::Expr
157-
158-
Get a Julia code expression to run on new worker processes even before
159-
running [`@always_everywhere`](@ref) code on them.
160-
"""
161-
function worker_init_code end
162-
worker_init_code(::AddProcsMode) = :()
163-
164-
165106

166107
"""
167108
addworkers(mode::ParallelProcessingTools.AddProcsMode)
168109
110+
addworkers(
111+
mode::ParallelProcessingTools.AddProcsMode,
112+
pool::Union{AbstractWorkerPool,Nothing}
113+
)
114+
169115
Add Julia worker processes for LEGEND data processing.
170116
171117
By default ensures that all workers processes use the same Julia project
@@ -198,6 +144,10 @@ See also [`worker_resources()`](@ref).
198144
function addworkers end
199145
export addworkers
200146

147+
function addworkers(mode::ParallelProcessingTools.AddProcsMode)
148+
addworkers(mode, default_flex_worker_pool())
149+
end
150+
201151

202152
"""
203153
LocalProcesses(;
@@ -212,10 +162,13 @@ end
212162
export LocalProcesses
213163

214164

215-
function addworkers(mode::LocalProcesses)
165+
function addworkers(
166+
mode::LocalProcesses,
167+
@nospecialize(pool::Union{AbstractWorkerPool,Nothing})
168+
)
216169
n_workers = mode.nprocs
217170
try
218-
lock(_g_processops_lock)
171+
lock(allprocs_management_lock())
219172

220173
@info "Adding $n_workers Julia processes on current host"
221174

@@ -230,18 +183,26 @@ function addworkers(mode::LocalProcesses)
230183
exeflags = `--project=$julia_project --threads=$worker_nthreads`
231184
)
232185

233-
@info "Configuring $n_workers new Julia worker processes"
234-
235-
_run_always_everywhere_code(new_workers, pre_always = worker_init_code(mode))
236-
_maybe_add_workers_to_scheduler(new_workers)
237-
238-
# Sanity check:
239-
worker_ids = Distributed.remotecall_fetch.(Ref(Distributed.myid), Distributed.workers())
240-
@assert length(worker_ids) == Distributed.nworkers()
186+
_init_new_workers(new_workers, pool)
241187

242188
@info "Added $(length(new_workers)) Julia worker processes on current host"
243189
finally
244-
unlock(_g_processops_lock)
190+
unlock(allprocs_management_lock())
191+
end
192+
end
193+
194+
195+
function _init_new_workers(
196+
new_workers::AbstractVector{<:Integer},
197+
@nospecialize(pool::Union{AbstractWorkerPool,Nothing})
198+
)
199+
@info "Sending initialization code to $(length(new_workers)) new worker processes"
200+
r = ensure_procinit(new_workers)
201+
wait_for_all(values(r))
202+
203+
if !isnothing(pool)
204+
@info "Adding $(length(new_workers)) to worker pool $(getlabel(pool))"
205+
foreach(Base.Fix1(push!, pool), new_workers)
245206
end
246207
end
247208

@@ -268,13 +229,13 @@ end
268229

269230
"""
270231
ParallelProcessingTools.default_elastic_manager()
271-
ParallelProcessingTools.default_elastic_manager(manager::ClusterManagers.ElasticManager)
232+
ParallelProcessingTools.default_elastic_manager(manager::ClusterManager)
272233
273234
Get or set the default elastic cluster manager.
274235
"""
275236
function default_elastic_manager end
276237

277-
const _g_elastic_manager = Ref{Union{Nothing, ClusterManagers.ElasticManager}}(nothing)
238+
const _g_elastic_manager = Ref{Union{Nothing,ClusterManager}}(nothing)
278239

279240
function default_elastic_manager()
280241
if isnothing(_g_elastic_manager[])
@@ -283,7 +244,7 @@ function default_elastic_manager()
283244
return _g_elastic_manager[]
284245
end
285246

286-
function default_elastic_manager(manager::ClusterManagers.ElasticManager)
247+
function default_elastic_manager(manager::ClusterManager)
287248
_g_elastic_manager[] = manager
288249
return _g_elastic_manager[]
289250
end
@@ -298,8 +259,8 @@ elastic cluster manager.
298259
299260
Subtypes must implement:
300261
301-
* `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)`
302-
* `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)`
262+
* `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManager)`
263+
* `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManager)`
303264
304265
and may want to specialize:
305266
@@ -310,7 +271,7 @@ abstract type ElasticAddProcsMode <: AddProcsMode end
310271
"""
311272
ParallelProcessingTools.worker_start_command(
312273
mode::ElasticAddProcsMode,
313-
manager::ClusterManagers.ElasticManager = ParallelProcessingTools.default_elastic_manager()
274+
manager::ClusterManager = ParallelProcessingTools.default_elastic_manager()
314275
)::Tuple{Cmd,Integer}
315276
316277
Return the system command to start worker processes as well as the number of
@@ -384,9 +345,12 @@ a `Task`, `Process` or any other object that supports
384345
function start_elastic_workers end
385346

386347

387-
function addworkers(mode::ElasticAddProcsMode)
348+
function addworkers(
349+
mode::ElasticAddProcsMode,
350+
@nospecialize(pool::Union{AbstractWorkerPool,Nothing})
351+
)
388352
try
389-
lock(_g_processops_lock)
353+
lock(allprocs_management_lock())
390354

391355
manager = default_elastic_manager()
392356

@@ -442,17 +406,15 @@ function addworkers(mode::ElasticAddProcsMode)
442406
new_workers = setdiff(Distributed.workers(), old_procs)
443407
n_new = length(new_workers)
444408

445-
@info "Initializing $n_new new Julia worker processes"
446-
_run_always_everywhere_code(new_workers, pre_always = worker_init_code(mode))
447-
_maybe_add_workers_to_scheduler(new_workers)
409+
_init_new_workers(new_workers, pool)
448410

449411
@info "Added $n_new new Julia worker processes"
450412

451413
if n_new != n_to_add
452414
throw(ErrorException("Tried to add $n_to_add new workers, but added $n_new"))
453415
end
454416
finally
455-
unlock(_g_processops_lock)
417+
unlock(allprocs_management_lock())
456418
end
457419
end
458420

@@ -498,33 +460,6 @@ function start_elastic_workers(mode::ExternalProcesses, manager::ClusterManagers
498460
end
499461

500462

501-
"""
502-
killworkers(worker::Integer)
503-
killworkers(workers::AbstractVector{<:Integer})
504-
505-
Kill one or more worker processes.
506-
"""
507-
function killworkers end
508-
export killworkers
509-
510-
function killworkers(workers::Union{Integer,AbstractVector{<:Integer}})
511-
main_process = Distributed.myid()
512-
if main_process in workers
513-
throw(ArgumentError("Will not kill the main process (process $main_process)"))
514-
end
515-
516-
err = try
517-
Distributed.remotecall_eval(Main, workers, :(exit(1)))
518-
catch err
519-
if !(err isa Distributed.ProcessExitedException)
520-
rethrow()
521-
end
522-
end
523-
524-
return nothing
525-
end
526-
527-
528463
"""
529464
always_addworkers(mode::ParallelProcessingTools.AddProcsMode, min_nworkers::Integer)
530465

0 commit comments

Comments
 (0)