11# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
22
3- const _g_processops_lock = ReentrantLock ()
4-
5- const _g_always_everywhere_code = quote
6- import ParallelProcessingTools
7- end
8-
9-
10- """
11- always_everywhere(expr)
12-
13- Runs `expr` on all current Julia processes, but also all future Julia
14- processes added via [`addworkers`](@ref)).
15-
16- Similar to `Distributed.everywhere`, but also stores `expr` so that
17- `addworkers` can execute it automatically on new worker processes.
18- """
19- macro always_everywhere (expr)
20- return quote
21- try
22- lock (_g_processops_lock)
23- expr = $ (esc (Expr (:quote , expr)))
24- push! (_g_always_everywhere_code. args, expr)
25- _run_expr_on_procs (expr, Distributed. procs ())
26- finally
27- unlock (_g_processops_lock)
28- end
29- end
30- end
31- export @always_everywhere
32-
33-
34- function _run_expr_on_procs (expr, procs:: AbstractVector{<:Integer} )
35- mod_expr = Expr (:toplevel , :(task_local_storage ()[:SOURCE_PATH ] = $ (get (task_local_storage (), :SOURCE_PATH , nothing ))), expr)
36- Distributed. remotecall_eval (Main, procs, mod_expr)
37- end
38-
39- function _run_always_everywhere_code (@nospecialize (procs:: AbstractVector{<:Integer} ); pre_always:: Expr = :())
40- code = quote
41- $ pre_always
42- $ _g_always_everywhere_code
43- end
44-
45- _run_expr_on_procs (code, procs)
46- end
47-
483
494"""
505 pinthreads_auto()
@@ -144,28 +99,19 @@ Abstract supertype for worker process addition modes.
14499Subtypes must implement:
145100
146101* `ParallelProcessingTools.addworkers(mode::SomeAddProcsMode)`
147-
148- and may want to specialize:
149-
150- * `ParallelProcessingTools.worker_init_code(mode::SomeAddProcsMode)`
151102"""
152103abstract type AddProcsMode end
153104
154105
155- """
156- ParallelProcessingTools.worker_init_code(::AddProcsMode)::Expr
157-
158- Get a Julia code expression to run on new worker processes even before
159- running [`@always_everywhere`](@ref) code on them.
160- """
161- function worker_init_code end
162- worker_init_code (:: AddProcsMode ) = :()
163-
164-
165106
166107"""
167108 addworkers(mode::ParallelProcessingTools.AddProcsMode)
168109
110+ addworkers(
111+ mode::ParallelProcessingTools.AddProcsMode,
112+ pool::Union{AbstractWorkerPool,Nothing}
113+ )
114+
169115Add Julia worker processes for LEGEND data processing.
170116
171117By default ensures that all workers processes use the same Julia project
@@ -198,6 +144,10 @@ See also [`worker_resources()`](@ref).
198144function addworkers end
199145export addworkers
200146
147+ function addworkers (mode:: ParallelProcessingTools.AddProcsMode )
148+ addworkers (mode, default_flex_worker_pool ())
149+ end
150+
201151
202152"""
203153 LocalProcesses(;
@@ -212,10 +162,13 @@ end
212162export LocalProcesses
213163
214164
215- function addworkers (mode:: LocalProcesses )
165+ function addworkers (
166+ mode:: LocalProcesses ,
167+ @nospecialize (pool:: Union{AbstractWorkerPool,Nothing} )
168+ )
216169 n_workers = mode. nprocs
217170 try
218- lock (_g_processops_lock )
171+ lock (allprocs_management_lock () )
219172
220173 @info " Adding $n_workers Julia processes on current host"
221174
@@ -230,18 +183,26 @@ function addworkers(mode::LocalProcesses)
230183 exeflags = ` --project=$julia_project --threads=$worker_nthreads `
231184 )
232185
233- @info " Configuring $n_workers new Julia worker processes"
234-
235- _run_always_everywhere_code (new_workers, pre_always = worker_init_code (mode))
236- _maybe_add_workers_to_scheduler (new_workers)
237-
238- # Sanity check:
239- worker_ids = Distributed. remotecall_fetch .(Ref (Distributed. myid), Distributed. workers ())
240- @assert length (worker_ids) == Distributed. nworkers ()
186+ _init_new_workers (new_workers, pool)
241187
242188 @info " Added $(length (new_workers)) Julia worker processes on current host"
243189 finally
244- unlock (_g_processops_lock)
190+ unlock (allprocs_management_lock ())
191+ end
192+ end
193+
194+
195+ function _init_new_workers (
196+ new_workers:: AbstractVector{<:Integer} ,
197+ @nospecialize (pool:: Union{AbstractWorkerPool,Nothing} )
198+ )
199+ @info " Sending initialization code to $(length (new_workers)) new worker processes"
200+ r = ensure_procinit (new_workers)
201+ wait_for_all (values (r))
202+
203+ if ! isnothing (pool)
204+ @info " Adding $(length (new_workers)) to worker pool $(getlabel (pool)) "
205+ foreach (Base. Fix1 (push!, pool), new_workers)
245206 end
246207end
247208
@@ -268,13 +229,13 @@ end
268229
269230"""
270231 ParallelProcessingTools.default_elastic_manager()
271- ParallelProcessingTools.default_elastic_manager(manager::ClusterManagers.ElasticManager )
232+ ParallelProcessingTools.default_elastic_manager(manager::ClusterManager )
272233
273234Get or set the default elastic cluster manager.
274235"""
275236function default_elastic_manager end
276237
277- const _g_elastic_manager = Ref {Union{Nothing, ClusterManagers.ElasticManager }} (nothing )
238+ const _g_elastic_manager = Ref {Union{Nothing,ClusterManager }} (nothing )
278239
279240function default_elastic_manager ()
280241 if isnothing (_g_elastic_manager[])
@@ -283,7 +244,7 @@ function default_elastic_manager()
283244 return _g_elastic_manager[]
284245end
285246
286- function default_elastic_manager (manager:: ClusterManagers.ElasticManager )
247+ function default_elastic_manager (manager:: ClusterManager )
287248 _g_elastic_manager[] = manager
288249 return _g_elastic_manager[]
289250end
@@ -298,8 +259,8 @@ elastic cluster manager.
298259
299260Subtypes must implement:
300261
301- * `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager )`
302- * `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager )`
262+ * `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManager )`
263+ * `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManager )`
303264
304265and may want to specialize:
305266
@@ -310,7 +271,7 @@ abstract type ElasticAddProcsMode <: AddProcsMode end
310271"""
311272 ParallelProcessingTools.worker_start_command(
312273 mode::ElasticAddProcsMode,
313- manager::ClusterManagers.ElasticManager = ParallelProcessingTools.default_elastic_manager()
274+ manager::ClusterManager = ParallelProcessingTools.default_elastic_manager()
314275 )::Tuple{Cmd,Integer}
315276
316277Return the system command to start worker processes as well as the number of
@@ -384,9 +345,12 @@ a `Task`, `Process` or any other object that supports
384345function start_elastic_workers end
385346
386347
387- function addworkers (mode:: ElasticAddProcsMode )
348+ function addworkers (
349+ mode:: ElasticAddProcsMode ,
350+ @nospecialize (pool:: Union{AbstractWorkerPool,Nothing} )
351+ )
388352 try
389- lock (_g_processops_lock )
353+ lock (allprocs_management_lock () )
390354
391355 manager = default_elastic_manager ()
392356
@@ -442,17 +406,15 @@ function addworkers(mode::ElasticAddProcsMode)
442406 new_workers = setdiff (Distributed. workers (), old_procs)
443407 n_new = length (new_workers)
444408
445- @info " Initializing $n_new new Julia worker processes"
446- _run_always_everywhere_code (new_workers, pre_always = worker_init_code (mode))
447- _maybe_add_workers_to_scheduler (new_workers)
409+ _init_new_workers (new_workers, pool)
448410
449411 @info " Added $n_new new Julia worker processes"
450412
451413 if n_new != n_to_add
452414 throw (ErrorException (" Tried to add $n_to_add new workers, but added $n_new " ))
453415 end
454416 finally
455- unlock (_g_processops_lock )
417+ unlock (allprocs_management_lock () )
456418 end
457419end
458420
@@ -498,33 +460,6 @@ function start_elastic_workers(mode::ExternalProcesses, manager::ClusterManagers
498460end
499461
500462
501- """
502- killworkers(worker::Integer)
503- killworkers(workers::AbstractVector{<:Integer})
504-
505- Kill one or more worker processes.
506- """
507- function killworkers end
508- export killworkers
509-
510- function killworkers (workers:: Union{Integer,AbstractVector{<:Integer}} )
511- main_process = Distributed. myid ()
512- if main_process in workers
513- throw (ArgumentError (" Will not kill the main process (process $main_process )" ))
514- end
515-
516- err = try
517- Distributed. remotecall_eval (Main, workers, :(exit (1 )))
518- catch err
519- if ! (err isa Distributed. ProcessExitedException)
520- rethrow ()
521- end
522- end
523-
524- return nothing
525- end
526-
527-
528463"""
529464 always_addworkers(mode::ParallelProcessingTools.AddProcsMode, min_nworkers::Integer)
530465
0 commit comments