# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).

"""
    SlurmRun(;
        slurm_flags::Cmd = {defaults}
        julia_flags::Cmd = {defaults}
        dir = pwd()
        user_start::Bool = false
        timeout::Real = 60
    )

Mode to add worker processes via SLURM `srun`.

Command-line flags for `srun` and for the `julia` worker processes are
inferred from SLURM environment variables (e.g. when running inside an
`salloc` or batch job), as well as from `slurm_flags` and `julia_flags`.

Workers are started with their current directory set to `dir`.

Example:

```julia
mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`)
addworkers(mode)
```

If `user_start` is `true`, the SLURM `srun` command will not be run
automatically; instead, it is logged via `@info` and the user is
responsible for running it. The `srun` command can also be retrieved via
[`worker_start_command(mode)`](@ref).
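
For example, to start the workers manually (an illustrative sketch; the
flag values shown here are placeholders):

```julia
mode = SlurmRun(slurm_flags = `--ntasks=2`, user_start = true)
addworkers(mode)  # logs the `srun` command and waits for the workers to connect
```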
"""
@with_kw struct SlurmRun <: ElasticAddProcsMode
    slurm_flags::Cmd = _default_slurm_flags()
    julia_flags::Cmd = _default_julia_flags()
    dir = pwd()
    user_start::Bool = false
    timeout::Real = 60
end
export SlurmRun


const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1)

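# Builds the `srun` command that will start the Julia worker processes and
# returns it together with the number of workers that command is expected
# to launch.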
function worker_start_command(mode::SlurmRun, manager::ClusterManagers.ElasticManager)
    slurm_flags = mode.slurm_flags
    julia_flags = mode.julia_flags
    dir = mode.dir

    tc = _get_slurm_taskconf(slurm_flags, ENV)

    n_workers = _slurm_nworkers(tc)
    mem_per_task = _slurm_mem_per_task(tc)

    # Hint the Julia GC to use about half of the memory available per task:
    heap_size_hint_fraction = 0.5
    heap_size_hint_in_MB = isnothing(mem_per_task) ? nothing : ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2)
    jl_heap_size_hint_flag = isnothing(heap_size_hint_in_MB) ? `` : `--heap-size-hint=$(heap_size_hint_in_MB)M`

    jl_threads_flag = isnothing(tc.cpus_per_task) ? `` : `--threads=$(tc.cpus_per_task)`

    # Inferred flags come first, so that user-provided `julia_flags` can override them:
    additional_julia_flags = `$jl_threads_flag $jl_heap_size_hint_flag $julia_flags`
    jlstep = atomic_add!(_g_slurm_nextjlstep, 1)
    jobname = "julia-$(getpid())-$jlstep"

    worker_cmd = elastic_localworker_startcmd(manager; julia_flags = additional_julia_flags)

    return `srun --job-name=$jobname --chdir=$dir $slurm_flags $worker_cmd`, n_workers
end

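# Number of worker processes implied by the SLURM task configuration `tc`.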
function _slurm_nworkers(tc::NamedTuple)
    if !isnothing(tc.n_tasks)
        tc.n_tasks
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.ntasks_per_node)
        tc.n_nodes * tc.ntasks_per_node
    else
        throw(ArgumentError("Could not infer number of tasks/processes from SLURM environment and flags."))
    end
end

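# Memory in bytes available to each task, if it can be inferred from the
# SLURM task configuration `tc`; returns `nothing` otherwise.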
function _slurm_mem_per_task(tc::NamedTuple)
    if !isnothing(tc.cpus_per_task) && !isnothing(tc.mem_per_cpu)
        tc.cpus_per_task * tc.mem_per_cpu
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.ntasks_per_node)
        div(tc.mem_per_node, tc.ntasks_per_node)
    elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.n_tasks)
        div(tc.n_nodes * tc.mem_per_node, tc.n_tasks)
    else
        nothing
    end
end


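# Starts the workers via `srun` (or, with `user_start = true`, only logs the
# command) and returns the number of workers to wait for.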
function ParallelProcessingTools.start_elastic_workers(mode::SlurmRun, manager::ClusterManagers.ElasticManager)
    srun_cmd, n_workers = worker_start_command(mode, manager)
    if mode.user_start
        @info "To add Julia worker processes (I'll wait for them), run: $srun_cmd"
    else
        @info "Starting SLURM job: $srun_cmd"
        srun_proc = open(srun_cmd)
    end
    return n_workers
end

function worker_init_code(::SlurmRun)
    quote
        import ParallelProcessingTools
        ParallelProcessingTools.pinthreads_auto()
    end
end

elastic_addprocs_timeout(mode::SlurmRun) = mode.timeout


function _default_slurm_flags()
    # `srun` in `salloc`/`sbatch` doesn't seem to always pick up
    # SLURM_CPUS_PER_TASK, resulting in incorrect thread pinning. So we'll
    # set `--cpus-per-task` explicitly:
    haskey(ENV, "SLURM_CPUS_PER_TASK") ? `--cpus-per-task=$(ENV["SLURM_CPUS_PER_TASK"])` : ``
end


const _slurm_memunits = IdDict{Char,Int}('K' => 1024^1, 'M' => 1024^2, 'G' => 1024^3, 'T' => 1024^4)

const _slurm_memsize_regex = r"^([0-9]+)([KMGT])?$"
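# Parses a SLURM memory size specification into bytes, e.g. "8G" -> 8 * 1024^3.
# A bare number is interpreted as MB (SLURM's default unit), e.g. "4000" -> 4000 * 1024^2.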
function _slurm_parse_memoptval(memsize::AbstractString)
    s = strip(memsize)
    m = match(_slurm_memsize_regex, s)
    if isnothing(m)
        throw(ArgumentError("Invalid SLURM memory size specification \"$s\""))
    else
        value = parse(Int, m.captures[1])
        unitchar = only(something(m.captures[2], 'M'))
        unitmult = _slurm_memunits[unitchar]
        return value * unitmult
    end
end
_slurm_parse_memoptval(::Nothing) = nothing

_slurm_parse_intoptval(value::AbstractString) = parse(Int, value)
_slurm_parse_intoptval(::Nothing) = nothing

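# Extracts the value of a short option (e.g. `-n 4` or `-n4`) from `args` at
# position `i`. Returns `(value, next_i)`, or `(default, i)` if the option is
# not present at that position.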
function _slurm_parse_shortopt(opt::Char, args::Vector{String}, i::Int, default)
    if i <= lastindex(args)
        arg = args[i]
        if arg == "-$opt"
            if i < lastindex(args) && !startswith(args[i+1], "-")
                return args[i+1], i+2
            else
                throw(ArgumentError("Missing value for option \"-$opt\""))
            end
        elseif startswith(arg, "-$opt")
            if length(arg) > 2
                return arg[begin+2:end], i+1
            else
                throw(ArgumentError("Missing value for option \"-$opt\""))
            end
        else
            return default, i
        end
    else
        return default, i
    end
end

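# Extracts the value of a long option (e.g. `--ntasks 4` or `--ntasks=4`) from
# `args` at position `i`. Returns `(value, next_i)`, or `(default, i)` if the
# option is not present at that position.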
function _slurm_parse_longopt(opt::String, args::Vector{String}, i::Int, default)
    if i <= lastindex(args)
        arg = args[i]
        if arg == "--$opt"
            if i < lastindex(args) && !startswith(args[i+1], "-")
                return args[i+1], i+2
            else
                throw(ArgumentError("Missing value for option \"--$opt\""))
            end
        elseif startswith(arg, "--$opt=")
            if length(arg) > length(opt) + 3
                return arg[begin+length(opt)+3:end], i+1
            else
                throw(ArgumentError("Missing value for option \"--$opt\""))
            end
        else
            return default, i
        end
    else
        return default, i
    end
end

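# Collects the SLURM task configuration from environment variables (as set by
# `salloc`/`sbatch`) and from explicit `srun` flags; values given via flags
# take precedence. Returns a NamedTuple with `nothing` for settings that
# could not be determined.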
function _get_slurm_taskconf(slurmflags::Cmd, env::AbstractDict{String,String})
    n_tasks = get(env, "SLURM_NTASKS", nothing)
    cpus_per_task = get(env, "SLURM_CPUS_PER_TASK", nothing)
    mem_per_cpu = get(env, "SLURM_MEM_PER_CPU", nothing)
    n_nodes = get(env, "SLURM_JOB_NUM_NODES", nothing)
    ntasks_per_node = get(env, "SLURM_NTASKS_PER_NODE", nothing)
    mem_per_node = get(env, "SLURM_MEM_PER_NODE", nothing)

    args = slurmflags.exec
    i::Int = firstindex(args)
    while i <= lastindex(args)
        last_i = i
        n_tasks, i = _slurm_parse_shortopt('n', args, i, n_tasks)
        n_tasks, i = _slurm_parse_longopt("ntasks", args, i, n_tasks)
        cpus_per_task, i = _slurm_parse_shortopt('c', args, i, cpus_per_task)
        cpus_per_task, i = _slurm_parse_longopt("cpus-per-task", args, i, cpus_per_task)
        mem_per_cpu, i = _slurm_parse_longopt("mem-per-cpu", args, i, mem_per_cpu)
        n_nodes, i = _slurm_parse_shortopt('N', args, i, n_nodes)
        n_nodes, i = _slurm_parse_longopt("nodes", args, i, n_nodes)
        mem_per_node, i = _slurm_parse_longopt("mem", args, i, mem_per_node)
        ntasks_per_node, i = _slurm_parse_longopt("ntasks-per-node", args, i, ntasks_per_node)

        if last_i == i
            i += 1
        end
    end

    return (
        n_tasks = _slurm_parse_intoptval(n_tasks),
        cpus_per_task = _slurm_parse_intoptval(cpus_per_task),
        mem_per_cpu = _slurm_parse_memoptval(mem_per_cpu),
        n_nodes = _slurm_parse_intoptval(n_nodes),
        ntasks_per_node = _slurm_parse_intoptval(ntasks_per_node),
        mem_per_node = _slurm_parse_memoptval(mem_per_node),
    )
end


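# Adds workers via `ClusterManagers.SlurmManager` (classic `addprocs`-based
# startup), with the process count, thread count and memory settings taken
# from the SLURM environment of the current job.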
function _addprocs_slurm(; kwargs...)
    slurm_ntasks = parse(Int, ENV["SLURM_NTASKS"])
    slurm_ntasks > 1 || throw(ErrorException("Invalid nprocs=$slurm_ntasks inferred from SLURM environment"))
    _addprocs_slurm(slurm_ntasks; kwargs...)
end

function _addprocs_slurm(
    nprocs::Int;
    job_file_loc::AbstractString = joinpath(homedir(), "slurm-julia-output"),
    retry_delays::AbstractVector{<:Real} = [1, 1, 2, 2, 4, 5, 5, 10, 10, 10, 10, 20, 20, 20]
)
    # Acquire the lock before entering `try`, so that `unlock` in `finally`
    # only runs if the lock was actually taken:
    lock(_g_processops_lock)
    try
        @info "Adding $nprocs Julia processes via SLURM"

        julia_project = dirname(Pkg.project().path)
        slurm_ntasks = nprocs
        slurm_nthreads = parse(Int, ENV["SLURM_CPUS_PER_TASK"])
        slurm_mem_per_cpu = parse(Int, ENV["SLURM_MEM_PER_CPU"]) * 1024^2
        slurm_mem_per_task = slurm_nthreads * slurm_mem_per_cpu

        cluster_manager = ClusterManagers.SlurmManager(slurm_ntasks, retry_delays)
        worker_timeout = round(Int, max(sum(cluster_manager.retry_delays), 60))
        ENV["JULIA_WORKER_TIMEOUT"] = "$worker_timeout"

        mkpath(job_file_loc)
        new_workers = Distributed.addprocs(
            cluster_manager, job_file_loc = job_file_loc,
            exeflags = `--project=$julia_project --threads=$slurm_nthreads --heap-size-hint=$(slurm_mem_per_task÷2)`,
            cpus_per_task = "$slurm_nthreads", mem_per_cpu="$(slurm_mem_per_cpu >> 30)G", # time="0:10:00",
            mem_bind = "local", cpu_bind="cores",
        )

        @info "Configuring $nprocs new Julia worker processes"

        _run_always_everywhere_code(new_workers)
        pinthreads_distributed(new_workers)

        @info "Added $(length(new_workers)) Julia worker processes via SLURM"
    finally
        unlock(_g_processops_lock)
    end
end