
Commit ba0b7ce

Add tools for Julia cluster management
1 parent f7ab97e commit ba0b7ce

File tree

5 files changed: +800 −3 lines changed


Project.toml

−2 lines
@@ -9,7 +9,6 @@ LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
 Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a"
 Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
 ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042"
-UnPack = "3a884ed6-31ef-47d7-9d2a-63182c4928ed"
 
 [compat]
 ClusterManagers = "0.4.6"
@@ -18,5 +17,4 @@ LinearAlgebra = "1"
 Parameters = "0.12, 0.13"
 Pkg = "1"
 ThreadPinning = "0.7.22"
-UnPack = "1"
 julia = "1.6"

docs/src/index.md

+61 lines
@@ -1,3 +1,64 @@
# ParallelProcessingTools.jl

This Julia package provides some tools to ease multithreaded and distributed programming.


## Compute cluster management

ParallelProcessingTools helps spin up Julia compute clusters. It currently supports clusters on localhost and on SLURM (using `ClusterManagers.ElasticManager` internally).

On SLURM, `addworkers` will automatically try to perform sensible thread pinning (using the [ThreadPinning](https://github.com/carstenbauer/ThreadPinning.jl) package internally).

```julia
using ParallelProcessingTools, Distributed

@always_everywhere begin
    using Distributions
end

mode = ParallelProcessingTools.SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
#ParallelProcessingTools.worker_start_command(mode)

# Add some workers:
addworkers(mode)

# List resources:
ParallelProcessingTools.worker_resources()

# Confirm that Distributions is loaded on workers:
worker = last(workers())
@fetchfrom worker Normal()

# Add some more workers:
addworkers(mode)
Table(ParallelProcessingTools.worker_resources())

# Add even more workers:
addworkers(mode)
Table(ParallelProcessingTools.worker_resources())
```

SLURM batch scripts can be written like this (e.g. "batchtest.jl"):

```julia
#!/usr/bin/env -S julia --project=@SOME_JULIA_ENVIRONMENT --threads=8
#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G

using ParallelProcessingTools, Distributed

@always_everywhere begin
    using ParallelProcessingTools
end

addworkers(SlurmRun())
resources = ParallelProcessingTools.worker_resources()
show(stdout, MIME"text/plain"(), resources)
```

This should run with a simple

```shell
sbatch -o out.txt batchtest.jl
```

and "out.txt" should then contain a list of the worker resources.

src/ParallelProcessingTools.jl

+2 −1 lines
@@ -14,13 +14,14 @@ import ClusterManagers
 import ThreadPinning
 
 using Parameters: @with_kw
-using UnPack: @unpack
 
 include("threadsafe.jl")
 include("threadlocal.jl")
 include("onthreads.jl")
 include("onprocs.jl")
 include("workpartition.jl")
+include("workers.jl")
+include("slurm.jl")
 include("deprecated.jl")
 
 end # module

src/slurm.jl

+268 lines
@@ -0,0 +1,268 @@
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
    SlurmRun(;
        slurm_flags::Cmd = {defaults}
        julia_flags::Cmd = {defaults}
        dir = pwd()
        user_start::Bool = false
        timeout::Real = 60
    )

Mode to add worker processes via SLURM `srun`.

The `srun` and Julia worker `julia` command line flags are inferred from SLURM
environment variables (e.g. when inside of an `salloc` or batch job), as
well as from `slurm_flags` and `julia_flags`.

Workers are started with their current directory set to `dir`.

Example:

```julia
mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
addworkers(mode)
```

If `user_start` is `true`, the SLURM `srun` command will not be run
automatically; instead it will be logged via `@info` and the user is
responsible for running it. This `srun` command can also be retrieved via
[`worker_start_command(mode)`](@ref).
"""
@with_kw struct SlurmRun <: ElasticAddProcsMode
    slurm_flags::Cmd = _default_slurm_flags()
    julia_flags::Cmd = _default_julia_flags()
    dir = pwd()
    user_start::Bool = false
    timeout::Real = 60
end
export SlurmRun


const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1)

function worker_start_command(mode::SlurmRun, manager::ClusterManagers.ElasticManager)
    slurm_flags = mode.slurm_flags
    julia_flags = mode.julia_flags
    dir = mode.dir

    tc = _get_slurm_taskconf(slurm_flags, ENV)

    n_workers = _slurm_nworkers(tc)
    mem_per_task = _slurm_mem_per_task(tc)

    heap_size_hint_fraction = 0.5
    heap_size_hint_in_MB = isnothing(mem_per_task) ? nothing : ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2)
    jl_heap_size_hint_flag = isnothing(heap_size_hint_in_MB) ? `` : `--heap-size-hint=$(heap_size_hint_in_MB)M`

    jl_threads_flag = isnothing(tc.cpus_per_task) ? `` : `--threads=$(tc.cpus_per_task)`

    additional_julia_flags = `$jl_threads_flag $jl_heap_size_hint_flag $julia_flags`
    jlstep = atomic_add!(_g_slurm_nextjlstep, 1)
    jobname = "julia-$(getpid())-$jlstep"

    worker_cmd = elastic_localworker_startcmd(manager; julia_flags = `$julia_flags $additional_julia_flags`)

    return `srun --job-name=$jobname --chdir=$dir $slurm_flags $worker_cmd`, n_workers
end
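# Worked example (illustrative values): with `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`
# and no further SLURM environment variables, this infers 4 workers with 8 CPUs and 64 GiB
# each, so the worker command roughly gets `--threads=8 --heap-size-hint=32768M` (half the
# per-task memory) and is wrapped in
# `srun --job-name=julia-<pid>-<step> --chdir=<dir> <slurm_flags> <worker command>`.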

function _slurm_nworkers(tc::NamedTuple)
    if !isnothing(tc.n_tasks)
        tc.n_tasks
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.ntasks_per_node)
        tc.n_nodes * tc.ntasks_per_node
    else
        throw(ArgumentError("Could not infer number of tasks/processes from SLURM environment and flags."))
    end
end
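# E.g. a task config with n_tasks = 4 yields 4; with n_tasks unset it falls back to
# n_nodes * ntasks_per_node.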

function _slurm_mem_per_task(tc::NamedTuple)
    if !isnothing(tc.cpus_per_task) && !isnothing(tc.mem_per_cpu)
        tc.cpus_per_task * tc.mem_per_cpu
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.ntasks_per_node)
        div(tc.mem_per_node, tc.ntasks_per_node)
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.n_tasks)
        div(tc.n_nodes * tc.mem_per_node, tc.n_tasks)
    else
        nothing
    end
end
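# E.g. cpus_per_task = 8 and mem_per_cpu = 8 GiB give 64 GiB per task; returns `nothing`
# if the memory configuration cannot be inferred.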


function ParallelProcessingTools.start_elastic_workers(mode::SlurmRun, manager::ClusterManagers.ElasticManager)
    srun_cmd, n_workers = worker_start_command(mode, manager)
    if mode.user_start
        @info "To add Julia worker processes (I'll wait for them), run: $srun_cmd"
    else
        @info "Starting SLURM job: $srun_cmd"
        srun_proc = open(srun_cmd)
    end
    return n_workers
end

function worker_init_code(::SlurmRun)
    quote
        import ParallelProcessingTools
        ParallelProcessingTools.pinthreads_auto()
    end
end

elastic_addprocs_timeout(mode::SlurmRun) = mode.timeout


function _default_slurm_flags()
    # `srun` in `salloc`/`sbatch` doesn't seem to always pick up
    # SLURM_CPUS_PER_TASK, resulting in incorrect thread pinning. So we'll
    # set `--cpus-per-task` explicitly:
    haskey(ENV, "SLURM_CPUS_PER_TASK") ? `--cpus-per-task=$(ENV["SLURM_CPUS_PER_TASK"])` : ``
end


const _slurm_memunits = IdDict{Char,Int}('K' => 1024^1, 'M' => 1024^2, 'G' => 1024^3, 'T' => 1024^4)

const _slurm_memsize_regex = r"^([0-9]+)([KMGT])?$"
function _slurm_parse_memoptval(memsize::AbstractString)
    s = strip(memsize)
    m = match(_slurm_memsize_regex, s)
    if isnothing(m)
        throw(ArgumentError("Invalid SLURM memory size specification \"$s\""))
    else
        value = parse(Int, m.captures[1])
        unitchar = only(something(m.captures[2], 'M'))
        unitmult = _slurm_memunits[unitchar]
        return value * unitmult
    end
end
_slurm_parse_memoptval(::Nothing) = nothing
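# E.g. "8G" -> 8 * 1024^3 and "2048" -> 2048 * 1024^2 (a bare number is taken as MiB).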

_slurm_parse_intoptval(value::AbstractString) = parse(Int, value)
_slurm_parse_intoptval(::Nothing) = nothing

function _slurm_parse_shortopt(opt::Char, args::Vector{String}, i::Int, default)
    if i <= lastindex(args)
        arg = args[i]
        if arg == "-$opt"
            if i < lastindex(args) && !startswith(args[i+1], "-")
                return args[i+1], i+2
            else
                throw(ArgumentError("Missing value for option \"-$opt\""))
            end
        elseif startswith(arg, "-$opt")
            if length(arg) > 2
                return arg[begin+2:end], i+1
            else
                throw(ArgumentError("Missing value for option \"-$opt\""))
            end
        else
            return default, i
        end
    else
        return default, i
    end
end
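# E.g. _slurm_parse_shortopt('n', ["-n", "4"], 1, nothing) == ("4", 3)
# and _slurm_parse_shortopt('n', ["-n4"], 1, nothing) == ("4", 2).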

function _slurm_parse_longopt(opt::String, args::Vector{String}, i::Int, default)
    if i <= lastindex(args)
        arg = args[i]
        if arg == "--$opt"
            if i < lastindex(args) && !startswith(args[i+1], "-")
                return args[i+1], i+2
            else
                throw(ArgumentError("Missing value for option \"--$opt\""))
            end
        elseif startswith(arg, "--$opt=")
            if length(arg) > length(opt) + 3
                return arg[begin+length(opt)+3:end], i+1
            else
                throw(ArgumentError("Missing value for option \"--$opt\""))
            end
        else
            return default, i
        end
    else
        return default, i
    end
end
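# E.g. _slurm_parse_longopt("ntasks", ["--ntasks=4"], 1, nothing) == ("4", 2)
# and _slurm_parse_longopt("ntasks", ["--ntasks", "4"], 1, nothing) == ("4", 3).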

function _get_slurm_taskconf(slurmflags::Cmd, env::AbstractDict{String,String})
    n_tasks = get(env, "SLURM_NTASKS", nothing)
    cpus_per_task = get(env, "SLURM_CPUS_PER_TASK", nothing)
    mem_per_cpu = get(env, "SLURM_MEM_PER_CPU", nothing)
    n_nodes = get(env, "SLURM_JOB_NUM_NODES", nothing)
    ntasks_per_node = get(env, "SLURM_NTASKS_PER_NODE", nothing)
    mem_per_node = get(env, "SLURM_MEM_PER_NODE", nothing)

    args = slurmflags.exec
    i::Int = firstindex(args)
    while i <= lastindex(args)
        last_i = i
        n_tasks, i = _slurm_parse_shortopt('n', args, i, n_tasks)
        n_tasks, i = _slurm_parse_longopt("ntasks", args, i, n_tasks)
        cpus_per_task, i = _slurm_parse_shortopt('c', args, i, cpus_per_task)
        cpus_per_task, i = _slurm_parse_longopt("cpus-per-task", args, i, cpus_per_task)
        mem_per_cpu, i = _slurm_parse_longopt("mem-per-cpu", args, i, mem_per_cpu)
        n_nodes, i = _slurm_parse_shortopt('N', args, i, n_nodes)
        n_nodes, i = _slurm_parse_longopt("nodes", args, i, n_nodes)
        mem_per_node, i = _slurm_parse_longopt("mem", args, i, mem_per_node)
        ntasks_per_node, i = _slurm_parse_longopt("ntasks-per-node", args, i, ntasks_per_node)

        if last_i == i
            i += 1
        end
    end

    return (
        n_tasks = _slurm_parse_intoptval(n_tasks),
        cpus_per_task = _slurm_parse_intoptval(cpus_per_task),
        mem_per_cpu = _slurm_parse_memoptval(mem_per_cpu),
        n_nodes = _slurm_parse_intoptval(n_nodes),
        ntasks_per_node = _slurm_parse_intoptval(ntasks_per_node),
        mem_per_node = _slurm_parse_memoptval(mem_per_node),
    )
end
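# Illustration: _get_slurm_taskconf(`--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`, Dict{String,String}())
# returns (n_tasks = 4, cpus_per_task = 8, mem_per_cpu = 8 * 1024^3,
#          n_nodes = nothing, ntasks_per_node = nothing, mem_per_node = nothing).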


function _addprocs_slurm(; kwargs...)
    slurm_ntasks = parse(Int, ENV["SLURM_NTASKS"])
    slurm_ntasks > 1 || throw(ErrorException("Invalid nprocs=$slurm_ntasks inferred from SLURM environment"))
    _addprocs_slurm(slurm_ntasks; kwargs...)
end

function _addprocs_slurm(
    nprocs::Int;
    job_file_loc::AbstractString = joinpath(homedir(), "slurm-julia-output"),
    retry_delays::AbstractVector{<:Real} = [1, 1, 2, 2, 4, 5, 5, 10, 10, 10, 10, 20, 20, 20]
)
    try
        lock(_g_processops_lock)

        @info "Adding $nprocs Julia processes via SLURM"

        julia_project = dirname(Pkg.project().path)
        slurm_ntasks = nprocs
        slurm_nthreads = parse(Int, ENV["SLURM_CPUS_PER_TASK"])
        slurm_mem_per_cpu = parse(Int, ENV["SLURM_MEM_PER_CPU"]) * 1024^2
        slurm_mem_per_task = slurm_nthreads * slurm_mem_per_cpu

        cluster_manager = ClusterManagers.SlurmManager(slurm_ntasks, retry_delays)
        worker_timeout = round(Int, max(sum(cluster_manager.retry_delays), 60))
        ENV["JULIA_WORKER_TIMEOUT"] = "$worker_timeout"

        mkpath(job_file_loc)
        new_workers = Distributed.addprocs(
            cluster_manager, job_file_loc = job_file_loc,
            exeflags = `--project=$julia_project --threads=$slurm_nthreads --heap-size-hint=$(slurm_mem_per_task÷2)`,
            cpus_per_task = "$slurm_nthreads", mem_per_cpu = "$(slurm_mem_per_cpu >> 30)G", # time="0:10:00",
            mem_bind = "local", cpu_bind = "cores",
        )

        @info "Configuring $nprocs new Julia worker processes"

        _run_always_everywhere_code(new_workers)
        pinthreads_distributed(new_workers)

        @info "Added $(length(new_workers)) Julia worker processes via SLURM"
    finally
        unlock(_g_processops_lock)
    end
end
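# Usage sketch (assumes a SLURM allocation where SLURM_NTASKS, SLURM_CPUS_PER_TASK and
# SLURM_MEM_PER_CPU are set): `_addprocs_slurm()` adds SLURM_NTASKS workers via
# ClusterManagers.SlurmManager, runs the `@always_everywhere` code on them and pins their threads.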
