Skip to content

Commit b0deee8

Browse files
committed
Add AutoThreadPinning
1 parent 0ca27b5 commit b0deee8

6 files changed

+218
-29
lines changed

docs/src/index.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ The elastic cluster manager automatically adds new workers to an automatically c
1414
Since workers can appear and disappear dynamically, initializing them (loading packages, etc.) via the standard `Distributed.@everywhere` macro is problematic, as workers added afterwards won't be initialized. Parallel processing tools provides the macro [`@always_everywhere`](@ref) to run code globally on all current processes, but also store the code so it can be run again on future new worker processes. Workers that are part of a [`FlexWorkerPool`](@ref) will be updated automatically on `take!` and `onworker`. You can also use [`ensure_procinit`](@ref) to manually update all workers
1515
to all `@always_everywhere` used so far.
1616

17-
The function [`pinthreads_auto`](@ref) (used inside of `@always_everywhere`) provides a convenient way to perform some automatic thread pinning on all processes. Note that it needs to follow an [`import ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/), and that more complex use cases may require customized thread pinning for best performance.
17+
[`AutoThreadPinning`](@ref), in conjunction with the package [`ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/), provides a convenient way to perform automatic thread pinning (e.g. inside of `@always_everywhere`, to apply thread pinning to all processes). Note that `ThreadPinning.pinthreads(AutoThreadPinning())` works on a best-effort basis and that advanced applications may require customized thread pinning for best performance.
1818

1919
Some batch system configurations can result in whole Julia processes, or even a whole batch job, being terminated if a process exceeds its memory limit. In such cases, you can try to gain a softer failure mode by setting a custom (slightly smaller) memory limit using [`memory_limit!`](@ref).
2020

ext/ParallelProcessingToolsThreadPinningExt.jl

+172-12
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ import LinearAlgebra
55
import Distributed
66
import ThreadPinning
77

8+
using ThreadPinning: ispinned, getaffinity,
9+
cpuids, node, core, ncores, socket, nsockets, nnuma, numa, ishyperthread, isefficiencycore,
10+
getcpuids, pinthreads, openblas_getcpuids, openblas_pinthreads
11+
12+
using ThreadPinning.Utility: affinitymask2cpuids
13+
14+
using LinearAlgebra.BLAS: get_num_threads as blas_nthreads
15+
using LinearAlgebra.BLAS: set_num_threads as set_blas_nthreads
16+
using Random
17+
18+
819

920
# ThreadPinning.jl does not fully support all operating systems, currently:
1021
const _threadpinning_supported = try
@@ -17,27 +28,176 @@ end
1728

1829
@static if _threadpinning_supported
1930

20-
function ParallelProcessingTools._pinthreads_auto_impl(::Val{true})
21-
pid = Distributed.myid()
22-
if Distributed.myid() == 1
23-
@debug "On process $pid, leaving Julia threads unpinned"
24-
let n_juliathreads = Threads.nthreads()
25-
if n_juliathreads > 1
26-
LinearAlgebra.BLAS.set_num_threads(n_juliathreads)
27-
end
31+
32+
# Build a lookup table that maps each CPU ID to the index of the core it
# belongs to. Core indices run over `1:ncores()`, and `core(idx)` supplies the
# CPU IDs that are recorded for that index.
function _get_core_map()
    cpu_to_core = IdDict{Int, Int}()
    foreach(1:ncores()) do core_idx
        for cpu in core(core_idx)
            cpu_to_core[cpu] = core_idx
        end
    end
    return cpu_to_core
end
41+
42+
43+
# Adjust the number of BLAS threads unless the user has already chosen one
# through the environment.
#
# * If `OPENBLAS_NUM_THREADS` or `OMP_NUM_THREADS` is present in `ENV`, the
#   BLAS thread count is left untouched.
# * Otherwise, a positive `avail_ncores` becomes the new BLAS thread count.
# * Otherwise (default `avail_ncores = 0`), the BLAS thread count is aligned
#   with `Threads.nthreads()` if it differs.
function _maybe_set_blas_nthreads(avail_ncores::Integer = 0)
    if haskey(ENV, "OPENBLAS_NUM_THREADS")
        @info("OPENBLAS_NUM_THREADS set, not changing number of BLAS threads.")
        return nothing
    end

    if haskey(ENV, "OMP_NUM_THREADS")
        @info("OMP_NUM_THREADS set, not changing number of BLAS threads.")
        return nothing
    end

    if avail_ncores > 0
        @info "Setting number of BLAS threads to $avail_ncores (number of physical CPU cores in affinity mask)"
        set_blas_nthreads(avail_ncores)
        return nothing
    end

    n = Threads.nthreads()
    if blas_nthreads() != n
        @info "Setting number of BLAS threads to $n (same as number of Julia threads)"
        set_blas_nthreads(n)
    end
    return nothing
end
3759

3860

61+
# Pin the Julia threads, and optionally the OpenBLAS threads, to the given
# CPU IDs.
#
# Both CPU ID lists are sorted before use. Nothing is pinned (a warning is
# logged instead) when there are fewer Julia CPU IDs than Julia threads. BLAS
# threads are only pinned when `pin_blas` is true, the BLAS CPU IDs do not
# overlap the Julia CPU IDs, and there are at least `blas_nthreads()` of them.
function _pin_threads_to(unsorted_sel_cpus_julia::AbstractVector{Int}, unsorted_sel_cpus_blas::AbstractVector{Int}, pin_blas::Bool)
    sel_cpus_julia = sort(unsorted_sel_cpus_julia)
    sel_cpus_blas = sort(unsorted_sel_cpus_blas)

    if length(sel_cpus_julia) < Threads.nthreads()
        @warn "Can't pin $(Threads.nthreads()) Julia threads, found only $(length(sel_cpus_julia)) suitable CPU IDs."
        return nothing
    end

    @info "Pinning Julia threads to CPU IDs $sel_cpus_julia"
    pinthreads(sel_cpus_julia)

    pin_blas || return nothing

    if !isempty(intersect(sel_cpus_julia, sel_cpus_blas))
        @warn "Won't pin BLAS threads on same CPU IDs as the Julia threads"
    elseif length(sel_cpus_blas) < blas_nthreads()
        @warn "Can't pin $(blas_nthreads()) BLAS threads, found only $(length(sel_cpus_blas)) suitable CPU IDs."
    else
        @info "Pinning OpenBLAS threads to CPU IDs $sel_cpus_blas"
        openblas_pinthreads(sel_cpus_blas)
        # Partial mitigation for ThreadPinning issue #105: pin the Julia
        # threads once more so that at least they end up pinned correctly.
        pinthreads(sel_cpus_julia)
    end
    return nothing
end
87+
88+
89+
# Log the current pinning state of the Julia threads and of the OpenBLAS
# threads.
#
# Querying the OpenBLAS CPU IDs may throw. An `ErrorException` is turned into
# a log message (a warning if its message mentions "could not load library",
# otherwise an info message); any other exception is rethrown.
function _log_thread_pinning()
    if !ispinned()
        @info "Julia threads not pinned."
    else
        julia_cpuids = sort(getcpuids())
        @info "Julia threads pinned to CPU IDs $julia_cpuids"
    end

    try
        blas_cpuids = sort(openblas_getcpuids())
        @info "OpenBLAS threads pinned to CPU IDs $blas_cpuids"
    catch err
        err isa ErrorException || rethrow()
        if occursin("could not load library", err.msg)
            @warn "Could not get OpenBLAS thread pinning information"
        else
            @info "OpenBLAS threads don't seem to be pinned."
        end
    end
    return nothing
end
112+
113+
114+
# Best-effort automatic thread pinning for `AutoThreadPinning`.
#
# Decision order, as implemented below:
#   1. Threads are already pinned: nothing is changed.
#   2. The affinity mask contains at least one zero entry (treated as "an
#      affinity mask is set"): CPU IDs are taken from the mask.
#   3. Julia runs with fewer than two threads: nothing is pinned.
#   4. `mode.random` is true: CPU IDs are drawn at random, visiting sockets
#      and NUMA domains in shuffled order.
#   5. Otherwise only the BLAS thread count may be adjusted.
#
# BLAS threads are pinned only if `mode.blas` is true. The resulting pinning
# state is always logged, and the function returns `nothing`.
function ThreadPinning.pinthreads(mode::ParallelProcessingTools.AutoThreadPinning)
    pin_blas = mode.blas

    if ispinned()
        @info "Thread pinning already in effect, not changing it."
    elseif any(iszero, getaffinity())
        @info "Thread affinity mask available, using it."

        core_map = _get_core_map()
        # Order Index that has hyperthread CPU IDs last:
        cpuid_order_idx = IdDict(cpu_id => order_idx for (order_idx, cpu_id) in pairs(node()))

        avail_cpuids = sort(affinitymask2cpuids(getaffinity()), by = i -> cpuid_order_idx[i])
        avail_ncores = length(unique([core_map[i] for i in avail_cpuids]))

        _maybe_set_blas_nthreads(avail_ncores)

        # Clamp the selection sizes to the number of CPU IDs in the mask.
        # Without this, a mask smaller than the number of Julia or BLAS
        # threads raises a BoundsError here, before `_pin_threads_to` gets
        # the chance to report the shortfall with a warning.
        n_avail = length(avail_cpuids)
        n_sel_julia = min(Threads.nthreads(), n_avail)
        n_sel_blas = min(blas_nthreads(), n_avail)

        # Julia threads take CPU IDs from the front of the ordered list,
        # BLAS threads from the back.
        sel_cpus_julia = avail_cpuids[begin:begin+n_sel_julia-1]
        sel_cpus_blas = avail_cpuids[end-n_sel_blas+1:end]
        _pin_threads_to(sel_cpus_julia, sel_cpus_blas, pin_blas)
    elseif Threads.nthreads() < 2
        @info "Julia running single-threaded with no thread affinity mask, not pinning threads."
    elseif mode.random
        _maybe_set_blas_nthreads()
        n_julia_rest::Int = Threads.nthreads()
        n_blas_rest::Int = blas_nthreads()
        sel_cpus_julia = Int[]
        sel_cpus_blas = Int[]
        for sid in shuffle(1:nsockets())
            socket_cpus = socket(sid)
            for nid in shuffle(1:nnuma())
                # CPU IDs in this socket and NUMA domain, efficiency cores excluded:
                cids = filter(!isefficiencycore, intersect(socket_cpus, numa(nid)))
                if !isempty(cids)
                    mainthreads_here = filter(!ishyperthread, cids)
                    perm = shuffle(eachindex(mainthreads_here))
                    mainthreads_here = mainthreads_here[perm]

                    # Reuse the same permutation for the hyperthread CPU IDs
                    # when both lists have identical axes, otherwise shuffle
                    # them independently.
                    hyperthreads_here = filter(ishyperthread, cids)
                    if axes(hyperthreads_here) == axes(mainthreads_here)
                        hyperthreads_here = hyperthreads_here[perm]
                    else
                        hyperthreads_here = shuffle(hyperthreads_here)
                    end

                    # Julia threads draw from the non-hyperthread CPU IDs; BLAS
                    # threads draw from the hyperthread CPU IDs if there are any.
                    julia_threadsource = mainthreads_here
                    blas_threadsource = !isempty(hyperthreads_here) ? hyperthreads_here : mainthreads_here
                    if n_julia_rest > 0
                        n_julia_here = min(n_julia_rest, length(julia_threadsource))
                        append!(sel_cpus_julia, julia_threadsource[1:n_julia_here])
                        n_julia_rest -= n_julia_here
                    end
                    if n_blas_rest > 0
                        n_blas_here = min(n_blas_rest, length(blas_threadsource))
                        append!(sel_cpus_blas, blas_threadsource[1:n_blas_here])
                        n_blas_rest -= n_blas_here
                    end
                    !(n_julia_rest > 0) && !(n_blas_rest > 0) && break
                end
            end
            !(n_julia_rest > 0) && !(n_blas_rest > 0) && break
        end

        _pin_threads_to(sel_cpus_julia, sel_cpus_blas, pin_blas)
    else
        _maybe_set_blas_nthreads()
        @info "No thread affinity set and random pinning not enabled, not pinning threads."
    end

    _log_thread_pinning()

    return nothing
end
186+
187+
# Extension-side implementation behind the deprecated `pinthreads_auto()`:
# forwards to `pinthreads` with a default-constructed `AutoThreadPinning`.
ParallelProcessingTools._pinthreads_auto_impl(::Val{true}) =
    pinthreads(ParallelProcessingTools.AutoThreadPinning())
190+
39191
# With ThreadPinning available, CPU IDs come straight from
# `ThreadPinning.getcpuids()`.
function ParallelProcessingTools._getcpuids_impl(::Val{true})
    return ThreadPinning.getcpuids()
end
40192

193+
194+
else #! _threadpinning_supported
195+
196+
197+
# Fallback used when `_threadpinning_supported` is false: pinning with
# `AutoThreadPinning` is a no-op that returns `nothing`. The argument must be
# written as a type annotation (`::T`); a bare dotted name in argument
# position is not a valid argument name and the definition would fail.
ThreadPinning.pinthreads(::ParallelProcessingTools.AutoThreadPinning) = nothing
198+
199+
41200
end # if _threadpinning_supported
201+
42202

43203
end # module ParallelProcessingToolsThreadPinningExt

src/deprecated.jl

+9
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,12 @@ export @mp_async
4747

4848

4949
@deprecate isdefined_local(x) isassigned(x)
50+
51+
52+
# Deprecated entry point, kept for backward compatibility. Emits a
# deprecation warning and then delegates to `_pinthreads_auto_impl(Val(true))`.
@noinline function pinthreads_auto()
    Base.depwarn(
        "`pinthreads_auto()` is deprecated, use `ThreadPinning.pinthreads(AutoThreadPinning())` instead.",
        :pinthreads_auto,
    )
    return _pinthreads_auto_impl(Val(true))
end
56+
export pinthreads_auto
57+
58+
# Generic fallback: does nothing and returns `nothing` for any `Val` that has
# no more specific method.
function _pinthreads_auto_impl(::Val)
    return nothing
end

src/runworkers.jl

+33-12
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,41 @@
11
# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT).
22

33

4-
54
"""
6-
pinthreads_auto()
5+
struct AutoThreadPinning
76
8-
!!! note
9-
Only has an effect if
10-
[`ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/) is
11-
loaded, and only on operating systems supported by `ThreadPinning`.
12-
"""
13-
function pinthreads_auto end
14-
export pinthreads_auto
7+
ParallelProcessingTools default thread pinning mode.
8+
9+
Constructor:
10+
11+
```julia
12+
AutoThreadPinning(; random::Bool = false, blas::Bool = false)
13+
```
14+
15+
Arguments:
1516
16-
pinthreads_auto() = _pinthreads_auto_impl(Val(true))
17-
_pinthreads_auto_impl(::Val) = nothing
17+
- `random`: Use system topology based random thread pinning if no thread
18+
affinity mask is set (e.g. via SLURM, `taskset`).
1819
20+
- `blas`: Try to pin BLAS threads. Not fully functional due to bugs in
21+
BLAS thread pinning (see ThreadPinning issue
22+
[#105](https://github.com/carstenbauer/ThreadPinning.jl/issues/105)).
23+
24+
Use with `ThreadPinning.pinthreads`:
25+
26+
```julia
27+
using ParallelProcessingTools, ThreadPinning
28+
pinthreads(AutoThreadPinning())
29+
```
30+
"""
31+
Base.@kwdef struct AutoThreadPinning
    # Keyword `random`, defaults to `false`.
    random::Bool = false
    # Keyword `blas`, defaults to `false`.
    blas::Bool = false
end
35+
export AutoThreadPinning
1936

2037
# Return whatever `_getcpuids_impl(Val(true))` dispatches to.
function _getcpuids()
    return _getcpuids_impl(Val(true))
end
21-
_getcpuids_impl(::Val) = nothing
38+
# Generic fallback: CPU ID information is reported as `missing` for any `Val`
# that has no more specific method.
function _getcpuids_impl(::Val)
    return missing
end
2239

2340

2441
"""
@@ -28,6 +45,10 @@ Get the distributed Julia worker process resources currently available.
2845
2946
This may take some time as some code needs to be loaded on all processes.
3047
Automatically runs `ensure_procinit()` before querying worker resources.
48+
49+
Note: CPU ID information will only be available if
50+
[`ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl) is
51+
loaded.
3152
"""
3253
function worker_resources()
3354
ensure_procinit()

test/test_deprecated.jl

+2
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,6 @@ include("testtools.jl")
5252
end
5353
end
5454
rmprocs(pids)
55+
56+
@test_deprecated pinthreads_auto() isa Nothing
5557
end

test/test_ext_threadpinning.jl

+1-4
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,5 @@ using Test
66
import ThreadPinning
77

88
@testset "ext_threadpinning" begin
    # Pinning with the default `AutoThreadPinning` mode must return `nothing`.
    pin_result = ThreadPinning.pinthreads(AutoThreadPinning())
    @test pin_result isa Nothing
end

0 commit comments

Comments
 (0)