From 1613073fdb27b40eecace5e767f4c80cd094da04 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 14:48:51 +0200 Subject: [PATCH 1/9] Require Julia v1.6, update CI --- .github/workflows/CompatHelper.yml | 43 +++++++++++++---- .github/workflows/TagBot.yml | 5 ++ .github/workflows/ci.yml | 75 ++++++++++++++---------------- Project.toml | 9 +--- README.md | 1 + docs/Project.toml | 3 +- docs/make.jl | 12 ++++- docs/src/api.md | 14 +++--- test/Project.toml | 8 ++++ test/REQUIRE | 2 - test/runtests.jl | 6 ++- test/test_aqua.jl | 16 +++++++ test/test_docs.jl | 13 ++++++ 13 files changed, 137 insertions(+), 70 deletions(-) create mode 100644 test/Project.toml delete mode 100644 test/REQUIRE create mode 100644 test/test_aqua.jl create mode 100644 test/test_docs.jl diff --git a/.github/workflows/CompatHelper.yml b/.github/workflows/CompatHelper.yml index 1f8a579..8ad0284 100644 --- a/.github/workflows/CompatHelper.yml +++ b/.github/workflows/CompatHelper.yml @@ -1,20 +1,45 @@ name: CompatHelper - on: schedule: - - cron: '00 00 * * *' - push: - branches: - - actions/trigger/CompatHelper + - cron: 0 0 * * * workflow_dispatch: +permissions: + contents: write + pull-requests: write jobs: CompatHelper: runs-on: ubuntu-latest steps: - - name: Pkg.add("CompatHelper") - run: julia -e 'using Pkg; Pkg.add("CompatHelper")' - - name: CompatHelper.main() + - name: Check if Julia is already available in the PATH + id: julia_in_path + run: which julia + continue-on-error: true + - name: Install Julia, but only if it is not already available in the PATH + uses: julia-actions/setup-julia@v2 + with: + version: '1' + arch: ${{ runner.arch }} + if: steps.julia_in_path.outcome != 'success' + - name: "Add the General registry via Git" + run: | + import Pkg + ENV["JULIA_PKG_SERVER"] = "" + Pkg.Registry.add("General") + shell: julia --color=yes {0} + - name: "Install CompatHelper" + run: | + import Pkg + name = "CompatHelper" + uuid = "aa819f21-2bde-4658-8897-bab36330d9b7" + version 
= "3" + Pkg.add(; name, uuid, version) + shell: julia --color=yes {0} + - name: "Run CompatHelper" + run: | + import CompatHelper + CompatHelper.main() + shell: julia --color=yes {0} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} COMPATHELPER_PRIV: ${{ secrets.DOCUMENTER_KEY }} - run: julia -e 'using CompatHelper; CompatHelper.main()' + # COMPATHELPER_PRIV: ${{ secrets.COMPATHELPER_PRIV }} diff --git a/.github/workflows/TagBot.yml b/.github/workflows/TagBot.yml index f49313b..f389611 100644 --- a/.github/workflows/TagBot.yml +++ b/.github/workflows/TagBot.yml @@ -4,6 +4,11 @@ on: types: - created workflow_dispatch: + inputs: + lookback: + default: 3 +permissions: + contents: write jobs: TagBot: if: github.event_name == 'workflow_dispatch' || github.actor == 'JuliaTagBot' diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 07f5768..e71c285 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,5 +1,7 @@ name: CI +env: + JULIA_NUM_THREADS: 2 on: push: branches: @@ -10,6 +12,12 @@ on: pull_request: release: +concurrency: + # Skip intermediate builds: always. + # Cancel intermediate builds: only if it is a pull request build. 
+ group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }} + jobs: test: name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} @@ -19,68 +27,53 @@ jobs: fail-fast: false matrix: version: - - '1.0' + - '1.6' - '1' - 'nightly' os: - ubuntu-latest - - macOS-latest - - windows-latest arch: - x64 - - x86 - exclude: - # 32-bit Julia binaries are not available on macOS - - os: macOS-latest - arch: x86 - - os: windows-latest - arch: x86 - - version: nightly + include: + - version: 1 + os: ubuntu-latest arch: x86 + - version: 1 + os: macOS-latest + arch: x64 + - version: 1 + os: windows-latest + arch: x64 steps: - - uses: actions/checkout@v2 - - uses: julia-actions/setup-julia@latest + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v2 with: version: ${{ matrix.version }} arch: ${{ matrix.arch }} - - name: Cache artifacts - uses: actions/cache@v2 - env: - cache-name: cache-artifacts + - uses: julia-actions/cache@v1 + - uses: julia-actions/julia-buildpkg@v1 + - uses: julia-actions/julia-runtest@v1 with: - path: ~/.julia/artifacts - key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} - restore-keys: | - ${{ runner.os }}-test-${{ env.cache-name }}- - ${{ runner.os }}-test- - ${{ runner.os }}- - - uses: julia-actions/julia-buildpkg@latest - - uses: julia-actions/julia-runtest@latest + coverage: ${{ matrix.version == '1' && matrix.os == 'ubuntu-latest' && matrix.arch == 'x64' }} - uses: julia-actions/julia-processcoverage@v1 - - uses: codecov/codecov-action@v1 + if: matrix.version == '1' && matrix.os == 'ubuntu-latest' && matrix.arch == 'x64' + - uses: codecov/codecov-action@v4 + if: matrix.version == '1' && matrix.os == 'ubuntu-latest' && matrix.arch == 'x64' with: + fail_ci_if_error: true + token: ${{ secrets.CODECOV_TOKEN }} file: lcov.info docs: name: Documentation runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 
- - uses: julia-actions/setup-julia@latest + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v2 with: version: '1' - - name: Cache artifacts - uses: actions/cache@v2 - env: - cache-name: cache-artifacts - with: - path: ~/.julia/artifacts - key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/docs/Project.toml') }} - restore-keys: | - ${{ runner.os }}-test-${{ env.cache-name }}- - ${{ runner.os }}-test- - ${{ runner.os }}- - - uses: julia-actions/julia-buildpkg@latest - - uses: julia-actions/julia-docdeploy@latest + - uses: julia-actions/cache@v1 + - uses: julia-actions/julia-buildpkg@v1 + - uses: julia-actions/julia-docdeploy@v1 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed due to https://github.com/JuliaDocs/Documenter.jl/issues/1177 diff --git a/Project.toml b/Project.toml index eb410d8..a9cb1fd 100644 --- a/Project.toml +++ b/Project.toml @@ -6,10 +6,5 @@ version = "0.4.3" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" [compat] -julia = "1" - -[extras] -Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" - -[targets] -test = ["Test"] +Distributed = "1" +julia = "1.6" diff --git a/README.md b/README.md index 49380a5..e658b7d 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ [![License](http://img.shields.io/badge/license-MIT-brightgreen.svg?style=flat)](LICENSE.md) [![Build Status](https://github.com/oschulz/ParallelProcessingTools.jl/workflows/CI/badge.svg?branch=main)](https://github.com/oschulz/ParallelProcessingTools.jl/actions?query=workflow%3ACI) [![Codecov](https://codecov.io/gh/oschulz/ParallelProcessingTools.jl/branch/main/graph/badge.svg)](https://codecov.io/gh/oschulz/ParallelProcessingTools.jl) +[![Aqua QA](https://raw.githubusercontent.com/JuliaTesting/Aqua.jl/master/badge.svg)](https://github.com/JuliaTesting/Aqua.jl) This Julia package provides some tools to ease multithreaded and distributed programming, especially for more complex use cases and when using multiple processes with multiple threads on 
each process. It also provides functions and macros designed to ease the transition to the new multi-threading model introduced in Julia v1.3. diff --git a/docs/Project.toml b/docs/Project.toml index ba02242..935ec3b 100644 --- a/docs/Project.toml +++ b/docs/Project.toml @@ -1,6 +1,7 @@ [deps] +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a" [compat] -Documenter = "~0.26" +Documenter = "1" diff --git a/docs/make.jl b/docs/make.jl index 2cca2df..78497aa 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -7,6 +7,14 @@ using Documenter using ParallelProcessingTools +# Doctest setup +DocMeta.setdocmeta!( + ParallelProcessingTools, + :DocTestSetup, + :(using ParallelProcessingTools); + recursive=true, +) + makedocs( sitename = "ParallelProcessingTools", modules = [ParallelProcessingTools], @@ -20,8 +28,8 @@ makedocs( "LICENSE" => "LICENSE.md", ], doctest = ("fixdoctests" in ARGS) ? :fix : true, - linkcheck = ("linkcheck" in ARGS), - strict = !("nonstrict" in ARGS), + linkcheck = !("nonstrict" in ARGS), + warnonly = ("nonstrict" in ARGS), ) deploydocs( diff --git a/docs/src/api.md b/docs/src/api.md index 677c1f2..f8aa26d 100644 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -1,15 +1,15 @@ # API -```@meta -DocTestSetup = quote - using ParallelProcessingTools -end +## Modules + +```@index +Order = [:module] ``` -## Types +## Types and constants ```@index -Order = [:type] +Order = [:type, :constant] ``` ## Functions and macros @@ -22,5 +22,5 @@ Order = [:macro, :function] ```@autodocs Modules = [ParallelProcessingTools] -Order = [:type, :macro, :function] +Order = [:module, :type, :constant, :macro, :function] ``` diff --git a/test/Project.toml b/test/Project.toml new file mode 100644 index 0000000..74d8dd4 --- /dev/null +++ b/test/Project.toml @@ -0,0 +1,8 @@ +[deps] +Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" 
+Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[compat] +Documenter = "1" diff --git a/test/REQUIRE b/test/REQUIRE deleted file mode 100644 index 2fc270f..0000000 --- a/test/REQUIRE +++ /dev/null @@ -1,2 +0,0 @@ -DistributedArrays -StatsBase diff --git a/test/runtests.jl b/test/runtests.jl index 4fe3298..949449c 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,12 +1,16 @@ # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). import Test + Test.@testset "Package ParallelProcessingTools" begin @info "Testing with $(Base.Threads.nthreads()) Julia threads." + include("test_aqua.jl") include("test_threadsafe.jl") include("test_threadlocal.jl") include("test_workpartition.jl") include("test_onthreads.jl") include("test_onprocs.jl") -end + include("test_docs.jl") +end # testset + diff --git a/test/test_aqua.jl b/test/test_aqua.jl new file mode 100644 index 0000000..9a4e2b1 --- /dev/null +++ b/test/test_aqua.jl @@ -0,0 +1,16 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +import Test +import Aqua +import ParallelProcessingTools + +Test.@testset "Package ambiguities" begin + Test.@test isempty(Test.detect_ambiguities(ParallelProcessingTools)) +end # testset + +Test.@testset "Aqua tests" begin + Aqua.test_all( + ParallelProcessingTools, + ambiguities = true + ) +end # testset diff --git a/test/test_docs.jl b/test/test_docs.jl new file mode 100644 index 0000000..ff79581 --- /dev/null +++ b/test/test_docs.jl @@ -0,0 +1,13 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). 
+ +using Test +using ParallelProcessingTools +import Documenter + +Documenter.DocMeta.setdocmeta!( + ParallelProcessingTools, + :DocTestSetup, + :(using ParallelProcessingTools); + recursive=true, +) +Documenter.doctest(ParallelProcessingTools) From cc050a008bf44ac395463aa6c1420057f8cdb470 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 15:33:09 +0200 Subject: [PATCH 2/9] Clear docs main page --- docs/src/index.md | 160 +--------------------------------------------- 1 file changed, 1 insertion(+), 159 deletions(-) diff --git a/docs/src/index.md b/docs/src/index.md index c17b1af..62656cb 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -1,161 +1,3 @@ # ParallelProcessingTools.jl -This Julia package provides some tools to ease multithreaded and distributed programming, especially for more complex use cases and when using multiple processes with multiple threads on each process. - -This package follows the SPMD (Single Program Multiple Data) paradigm (like, e.g MPI, Cuda, OpenCL and -`DistributedArrays.SPMD`): Run the same code on every execution unit (process or thread) and make the code responsible for figuring out which part of the data it should process. This differs from the approach of `Base.Threads.@threads` and `Distributed.@distributed`. SPMD is more appropriate for complex cases that the latter do not handle well (e.g. because some initial setup is required on each execution unit and/or iteration scheme over the data is more complex, control over SIMD processing is required, etc.). - -This package also implements thread-local variables and tooling to handle non-thread-safe code. - -In addition, `ParallelProcessingTools` provides functions and macros designed to ease the transition to the new multi-threading model introduced in Julia v1.3. - -Note: Some features may not work on Windows, currently. - - -## Work partitions - -`workpart` partitions an `AbstractArray` across a a specified set of workers (i.e. processes or threads). 
E.g. - -```julia -A = rand(100) -workpart(A, 4:7, 5) == view(A, 26:50) -``` - -returns a views into the array that worker `5` out of a set or workers `4:7` will be responsible for. The intended usage is - -```julia -using Distributed -@everywhere using Base.Threads, ParallelProcessingTools -@everywhere data = rand(1000) -@everywhere procsel = workers() -@onprocs procsel begin - sub_A = workpart(data, procsel, myid()) - threadsel = allthreads() - @onthreads threadsel begin - # ... some initialization, create local buffers, etc. - idxs = workpart(eachindex(sub_A), threadsel, threadid()) - for i in idxs - # ... A[i] ... - end - end -end -``` - -see below for a full example. - -If `data` is a `DistributedArrays.DArray`, then `DistributedArrays.localpart(data)` should be used instead of `workpart(data, workers(), myid())`. - - -## Thread-safety - -Use `@critical` to mark non thread-safe code, e.g. for logging. For example - -```julia -@onthreads allthreads() begin - @critical @info Base.Threads.threadid() -end -``` - -would crash Julia without `@critical` because `@info` is not thread-safe. - -Note: This doesn't always work for multithreaded code on other processes yet. - - -## Thread-local variables - -Thread-local variable can be created and initialized via - -```julia -tl = ThreadLocal(0.0) -``` - -The API is the similar to `Ref`: `tl[]` gets the value of `tl` for the current thread, `tl[] = 4.2` sets the value for the current thread. `getallvalues(tl)` returns the values for all threads as a vector, and can only be called from single-threaded code. - - -## Multithreaded code execution - -The macro `@onthreads threadsel expr` will run the code in `expr` on the threads in `threadsel` (typically a range of thread IDs). For convenience, the package exports `allthreads() = 1:nthreads()`. 
Here's a simple example on how to use thread-local variables and `@onthreads` to sum up numbers in parallel: - -```julia -tlsum = ThreadLocal(0.0) -data = rand(100) -@onthreads allthreads() begin - tlsum[] = sum(workpart(data, allthreads(), Base.Threads.threadid())) -end -sum(getallvalues(tlsum)) ≈ sum(data) -``` - -`@onthreads` forwards exceptions thrown by the code in `expr` to the caller (in contrast to, `Base.Threads.@threads`, that will currently print an exception but not forward it, so when using `@threads` program execution simply continues after a failure in multithreaded code). - -Note: Julia can currently run only one function on multiple threads at the same time (this restriction is likely to disappear in the the future). So even if `threadsel` does not include all threads, the rest of the threads will be idle but blocked and cannot be used to run other code in parallel. However, the ability to run on a subset of the available threads is still useful to measure the scaling behavior of multithreaded code (without restarting Julia with a different value for `$JULIA_NUM_THREADS`). - - - -## Multiprocess code execution - -The macro `@onprocs procsel expr` will run the code in `expr` on the processes in `procsel` (typically an -array of process IDs). `@onprocs` returns a vector with the result of `expr` on each process and -will wait until all the results are available (but may of course be wrapped in `@async`). A -simple example to get the process ID on each worker process: - -```julia -using Distributed -addprocs(2) -workers() == @onprocs workers() myid() -``` - -Note: If the data can be expressed in terms of a `DistributedArrays.DArray`, it may be more appropriate and convenient to use the multiprocess execution tooling available in the package `DistributedArrays` (possibly combined with `ParallelProcessingTools.@onthreads`). 
- - -### Example use case: - -As a simple real-world use case, let's histogram distributed data on multiple processes and threads: - -Set up a cluster of multithreaded workers and load the required packages: - -```julia -using Distributed, ParallelProcessingTools -addprocs(2) -@everywhere using ParallelProcessingTools, Base.Threads, - DistributedArrays, Statistics, StatsBase -``` - -Create some distributed data and check how the data is distributed: - -```julia -data = drandn(10^8) -procsel = procs(data) -@onprocs procsel size(localpart(data)) -``` - -Check the number of threads on each worker holding a part of the data: - -```julia -@onprocs procsel nthreads() -``` - -Create histograms in parallel on all threads of all workers and merge: - -```julia -proc_hists = @onprocs procsel begin - local_data = localpart(data) - tl_hist = ThreadLocal(Histogram((-6:0.1:6,), :left)) - @onthreads allthreads() begin - data_for_this_thread = workpart(local_data, allthreads(), threadid()) - append!(tl_hist[], data_for_this_thread) - end - merged_hist = merge(getallvalues(tl_hist)...) -end -final_hist = merge(proc_hists...) -``` - -Check result: - -``` -sum(final_hist.weights) ≈ length(data) - -using Plots -plot(final_hist) -``` - -Note: This example is meant to show how to combine the features of this package. The multi-process part of this particular use case can be written in a simpler way using functionality from `DistributedArrays`. +This Julia package provides some tools to ease multithreaded and distributed programming. 
From b119f34d13e5880bf538dcce372979761b0d1d01 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 15:33:37 +0200 Subject: [PATCH 3/9] Deprecate macros mp_async and mt_async --- src/ParallelProcessingTools.jl | 1 + src/deprecated.jl | 49 ++++++++++++++++++++++++++++++++ src/onprocs.jl | 25 ----------------- src/onthreads.jl | 41 ++------------------------- src/threadlocal.jl | 3 -- test/runtests.jl | 2 +- test/test_deprecated.jl | 51 ++++++++++++++++++++++++++++++++++ test/test_onprocs.jl | 14 ---------- test/test_onthreads.jl | 13 --------- 9 files changed, 104 insertions(+), 95 deletions(-) create mode 100644 src/deprecated.jl create mode 100644 test/test_deprecated.jl diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 6c3f3ba..506640f 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -12,5 +12,6 @@ include("threadlocal.jl") include("onthreads.jl") include("onprocs.jl") include("workpartition.jl") +include("deprecated.jl") end # module diff --git a/src/deprecated.jl b/src/deprecated.jl new file mode 100644 index 0000000..0cef2ea --- /dev/null +++ b/src/deprecated.jl @@ -0,0 +1,49 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). 
+ + +macro onallthreads(expr) + quote + Base.depwarn("`@onallthreads expr` is deprecated, use `@onthreads allthreads() expr` instead.", nothing) + @onthreads allthreads() $(esc(expr)) + end +end +export @onallthreads + + +macro mt_async(expr) + # Code taken from Base.@async and Base.Threads.@spawn: + thunk = esc(:(()->($expr))) + var = esc(Base.sync_varname) + quote + Base.depwarn("`@mt_async expr` is deprecated, use `Base.Threads.@spawn expr` instead.", nothing) + local task = Task($thunk) + @static if VERSION >= v"1.3.0-alpha.0" + task.sticky = false + end + if $(Expr(:isdefined, var)) + push!($var, task) + end + schedule(task) + task + end +end +export @mt_async + + +macro mp_async(expr) + # Code taken from Distributed.@spawn: + thunk = esc(:(()->($expr))) + var = esc(Base.sync_varname) + quote + Base.depwarn("`@mp_async expr` is deprecated, use `Distributed.@spawn expr` instead.", nothing) + local ref = Distributed.spawn_somewhere($thunk) + if $(Expr(:isdefined, var)) + push!($var, ref) + end + ref + end +end +export @mp_async + + +@deprecate isdefined_local(x) isassigned(x) diff --git a/src/onprocs.jl b/src/onprocs.jl index da70cd1..9933a4f 100644 --- a/src/onprocs.jl +++ b/src/onprocs.jl @@ -31,28 +31,3 @@ macro onprocs(procsel, expr) end end export @onprocs - - -""" - @mp_async expr - -Run `expr` asynchronously on a worker process. - -Compatible with `@sync`. - -Equivalent to `Distributed.@spawn expr` on Julia <= v1.2, equivalent to -`Distributed.@spawn :any expr` on Julia >= v1.3. 
-""" -macro mp_async(expr) - # Code taken from Distributed.@spawn: - thunk = esc(:(()->($expr))) - var = esc(Base.sync_varname) - quote - local ref = Distributed.spawn_somewhere($thunk) - if $(Expr(:isdefined, var)) - push!($var, ref) - end - ref - end -end -export @mp_async diff --git a/src/onthreads.jl b/src/onthreads.jl index b97ca04..9961689 100644 --- a/src/onthreads.jl +++ b/src/onthreads.jl @@ -178,15 +178,6 @@ end export @onthreads -macro onallthreads(expr) - quote - Base.depwarn("`@onallthreads expr` is deprecated, use `@onthreads allthreads() expr` instead.", nothing) - $(_thread_exec_func(:(ParallelProcessingTools.allthreads()), expr)) - end -end -export @onallthreads - - function ThreadLocal{T}(f::Base.Callable) where {T} result = ThreadLocal{T}(undef) result.value @@ -195,34 +186,6 @@ function ThreadLocal{T}(f::Base.Callable) where {T} end -""" - @mt_async expr - -Spawn a Julia task running `expr` asynchronously. - -Compatible with `@sync`. Uses a multi-threaded task scheduler if available (on -Julia >= v1.3). - -Equivalent to `Base.@async` on Julia <= v1.2, equivalent to -`Base.Threads.@spawn` on Julia >= v1.3. 
-""" -macro mt_async(expr) - # Code taken from Base.@async and Base.Threads.@spawn: - thunk = esc(:(()->($expr))) - var = esc(Base.sync_varname) - quote - local task = Task($thunk) - @static if VERSION >= v"1.3.0-alpha.0" - task.sticky = false - end - if $(Expr(:isdefined, var)) - push!($var, task) - end - schedule(task) - task - end -end -export @mt_async """ @@ -257,14 +220,14 @@ macro mt_out_of_order(ex) trg = exprs[i].args[1] val = exprs[i].args[2] if val isa Expr - exprs[i] = :(push!($tasks, @mt_async($(esc(val))))) + exprs[i] = :(push!($tasks, Base.Threads.@spawn($(esc(val))))) push!(handle_results, :($(esc(trg)) = fetch(popfirst!($tasks)))) else exprs[i] = esc(exprs[i]) end elseif exprs[i] isa Expr ftvar = gensym() - exprs[i] = :(push!($tasks, @mt_async($(esc(exprs[i]))))) + exprs[i] = :(push!($tasks, Base.Threads.@spawn($(esc(exprs[i]))))) push!(handle_results, :(wait(popfirst!($tasks)))) else exprs[i] = esc(exprs[i]) diff --git a/src/threadlocal.jl b/src/threadlocal.jl index 288a638..95593f5 100644 --- a/src/threadlocal.jl +++ b/src/threadlocal.jl @@ -21,9 +21,6 @@ abstract type AbstractThreadLocal{T} end export AbstractThreadLocal -@deprecate isdefined_local(x) isassigned(x) - - """ getlocalvalue(x::Any) = x getlocalvalue(x::ThreadLocal) = x[] diff --git a/test/runtests.jl b/test/runtests.jl index 949449c..11cf428 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,6 +11,6 @@ Test.@testset "Package ParallelProcessingTools" begin include("test_workpartition.jl") include("test_onthreads.jl") include("test_onprocs.jl") + include("test_deprecated.jl") include("test_docs.jl") end # testset - diff --git a/test/test_deprecated.jl b/test/test_deprecated.jl new file mode 100644 index 0000000..0b91283 --- /dev/null +++ b/test/test_deprecated.jl @@ -0,0 +1,51 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). 
+ +using Test +using ParallelProcessingTools + +using Distributed + + +@testset "deprecated" begin + function do_work(n) + if n < 0 + throw(ArgumentError("n must be >= 0")) + end + s::Float64 = 0 + for i in 1:n + if n % 1000 == 0 + yield() + end + s += log(abs(asin(sin(Complex(log(i), log(i))))) + 1) + end + s + end + + @testset "macro mt_async" begin + @test_deprecated begin + n = 128 + A = zeros(n) + @sync for i in eachindex(A) + @mt_async begin + do_work(10^3) + A[i] = log(i) + end + end + A == log.(1:n) + end + end + + @testset "macro mp_async" begin + @test_deprecated begin + n = 128 + A = Vector{Future}(undef, n) + @sync for i in 1:n + A[i] = @mp_async begin + @assert myid() != 1 + log(i) + end + end + fetch.(A) == log.(1:n) + end + end +end diff --git a/test/test_onprocs.jl b/test/test_onprocs.jl index ef637e7..fd36b06 100644 --- a/test/test_onprocs.jl +++ b/test/test_onprocs.jl @@ -30,20 +30,6 @@ using Distributed end) == ref_result end - @testset "macro mp_async" begin - @test begin - n = 128 - A = Vector{Future}(undef, n) - @sync for i in 1:n - A[i] = @mp_async begin - @assert myid() != 1 - log(i) - end - end - fetch.(A) == log.(1:n) - end - end - @testset "Examples" begin @test begin workers() == (@onprocs workers() myid()) diff --git a/test/test_onthreads.jl b/test/test_onthreads.jl index 0a186c0..b195f27 100644 --- a/test/test_onthreads.jl +++ b/test/test_onthreads.jl @@ -35,19 +35,6 @@ using Base.Threads end) == 1:nthreads() end - @testset "macro mt_async" begin - @test begin - n = 128 - A = zeros(n) - @sync for i in eachindex(A) - @mt_async begin - do_work(10^3) - A[i] = log(i) - end - end - A == log.(1:n) - end - end @testset "macro mt_out_of_order" begin @test begin From af4b813872fd8ced0e4bd699311a53d5c417e757 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 15:49:31 +0200 Subject: [PATCH 4/9] Remove code specific to older Julia versions --- src/onthreads.jl | 89 ++--------------------------------------- src/threadlocal.jl | 10 
+---- src/threadsafe.jl | 34 +--------------- test/test_threadsafe.jl | 8 ---- 4 files changed, 6 insertions(+), 135 deletions(-) diff --git a/src/onthreads.jl b/src/onthreads.jl index 9961689..83ec6c5 100644 --- a/src/onthreads.jl +++ b/src/onthreads.jl @@ -7,9 +7,6 @@ function _current_thread_selected(threadsel::Union{Integer,AbstractVector{<:Inte end -@static if VERSION >= v"1.3.0-alpha.0" - - # From Julia PR 32477: function _run_on(t::Task, tid) @assert !istaskstarted(t) @@ -52,82 +49,12 @@ function _thread_exec_func(threadsel, expr) end -else #VERSION < v"1.3.0-alpha.0" - - -const _thread_local_error_err = ThreadLocal{Any}(undef) -const _thread_local_error_set = ThreadLocal{Bool}(undef) - - -_clear_thread_local_errors() = fill!(getallvalues(_thread_local_error_set), false) - -function _check_thread_local_errors() - i = something(findfirst(isequal(true), getallvalues(_thread_local_error_set)), 0) - (i > 0) && throw(getallvalues(_thread_local_error_err)[i]) - nothing -end - -function _set_thread_local_error(err) - _thread_local_error_err[] = err - _thread_local_error_set[] = true -end - -function _check_threadsel(threadsel::Union{Integer,AbstractVector{<:Integer}}) - if !checkindex(Bool, allthreads(), threadsel) - throw(ArgumentError("Thread selection not within available threads")) - end - threadsel -end - - -function _run_on_threads(f) - try - @assert(!Base.Threads.in_threaded_loop[], "Can't nest threaded execution") - _clear_thread_local_errors() - Base.Threads.in_threaded_loop[] = true - ccall(:jl_threading_run, Ref{Cvoid}, (Any,), f) - finally - Base.Threads.in_threaded_loop[] = false - _check_thread_local_errors() - end -end - - -function _thread_exec_func(threadsel, expr) - quote - local thread_body_wrapper_fun - let threadsel_eval = $(esc(threadsel)) - function thread_body_wrapper_fun() - try - if Base.Threads.threadid() in threadsel_eval - $(esc(expr)) - end - catch err - _set_thread_local_error(err) - rethrow() - end - end - if 
_current_thread_selected(threadsel_eval) - thread_body_wrapper_fun() - else - _run_on_threads(thread_body_wrapper_fun) - end - nothing - end - end -end - - -end # Julia version-dependent code - - - """ allthreads() -Convencience function, returns `1:Base.Threads.nthreads()`. +Convencience function, returns an equivalent of `1:Base.Threads.nthreads()`. """ -allthreads() = 1:nthreads() +allthreads() = Base.OneTo(Base.Threads.nthreads()) export allthreads @@ -140,14 +67,6 @@ Execute code in `expr` in parallel on the threads in `threadsel`. If `threadsel == Base.Threads.threadid()`, `expr` is run on the current tread with only minimal overhead. -Note: Currently, multiple `@onthreads` sections will not run in parallel -to each other, even if they use disjunct sets of threads, due to limitations -of the Julia multithreading implementation. This restriction is likely to -disappear in future Julia versions. - -In contrast to `Base.Threads.@threads`, `@onthreads` does forward -exceptions to the caller. - Example 1: ```juliaexpr @@ -191,8 +110,8 @@ end """ @mt_out_of_order begin expr... end -Runs all top-level expressions in `begin expr... end` on parallel tasks. -On Julia >= v1.3, the tasks will run multi-threaded. +Runs all top-level expressions in `begin expr... end` on parallel +multi-threaded tasks. 
Example: diff --git a/src/threadlocal.jl b/src/threadlocal.jl index 95593f5..a6252a3 100644 --- a/src/threadlocal.jl +++ b/src/threadlocal.jl @@ -124,13 +124,5 @@ Base.get!(x::ThreadLocal, default) = get!(() -> default, x) Base.isassigned(x::ThreadLocal) = isassigned(x.value, threadid()) function getallvalues(x::ThreadLocal) - @static if VERSION >= v"1.3.0-alpha.0" - x.value - else - if !Base.Threads.in_threaded_loop[] - x.value - else - throw(InvalidStateException("Can not access thread local values across threads in multi-threaded code sections")) - end - end + x.value end diff --git a/src/threadsafe.jl b/src/threadsafe.jl index 5d3f826..9e66308 100644 --- a/src/threadsafe.jl +++ b/src/threadsafe.jl @@ -6,39 +6,7 @@ abstract type ThreadSafe{T} end export ThreadSafeReentrantLock - -@static if VERSION >= v"1.2-DEV.28" - const ThreadSafeReentrantLock = ReentrantLock -else - struct ThreadSafeReentrantLock - thread_lock::RecursiveSpinLock - task_lock::ReentrantLock - - ThreadSafeReentrantLock() = new(RecursiveSpinLock(), ReentrantLock()) - end - - function Base.lock(l::ThreadSafeReentrantLock) - # @debug "LOCKING $l" - lock(l.thread_lock) - try - lock(l.task_lock) - catch err - unlock(l.thread_lock) - rethrow() - end - end - - - function Base.unlock(l::ThreadSafeReentrantLock) - # @debug "UNLOCKING $l" - try - unlock(l.task_lock) - finally - unlock(l.thread_lock) - end - end -end - +const ThreadSafeReentrantLock = ReentrantLock export LockableValue diff --git a/test/test_threadsafe.jl b/test/test_threadsafe.jl index e955860..66eeef3 100644 --- a/test/test_threadsafe.jl +++ b/test/test_threadsafe.jl @@ -9,14 +9,6 @@ using Base.Threads @testset "ThreadSafeReentrantLock" begin tsReLock = @inferred ThreadSafeReentrantLock() - @static if VERSION < v"1.2-DEV.28" - lock(tsReLock) - @test islocked(tsReLock.thread_lock) - @test islocked(tsReLock.task_lock) - unlock(tsReLock) - @test !islocked(tsReLock.thread_lock) - @test !islocked(tsReLock.task_lock) - end end @testset 
"LockableValue" begin From 7e99081924ca8cc52c8c471f6c34f8281435dc59 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 15:53:08 +0200 Subject: [PATCH 5/9] Require ClusterManagers and ThreadPinning --- Project.toml | 4 ++++ src/ParallelProcessingTools.jl | 3 +++ 2 files changed, 7 insertions(+) diff --git a/Project.toml b/Project.toml index a9cb1fd..855d2f8 100644 --- a/Project.toml +++ b/Project.toml @@ -3,8 +3,12 @@ uuid = "8e8a01fc-6193-5ca1-a2f1-20776dae4199" version = "0.4.3" [deps] +ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" [compat] +ClusterManagers = "0.4.6" Distributed = "1" +ThreadPinning = "0.7.22" julia = "1.6" diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 506640f..cf1b14a 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -7,6 +7,9 @@ module ParallelProcessingTools using Base.Threads using Distributed +import ClusterManagers +import ThreadPinning + include("threadsafe.jl") include("threadlocal.jl") include("onthreads.jl") From 6eb6cb0d69a768e245d15496be71951ce9c64427 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 22:26:52 +0200 Subject: [PATCH 6/9] Require LinearAlgebra and Pkg --- Project.toml | 4 ++++ src/ParallelProcessingTools.jl | 3 +++ 2 files changed, 7 insertions(+) diff --git a/Project.toml b/Project.toml index 855d2f8..bbaa471 100644 --- a/Project.toml +++ b/Project.toml @@ -5,10 +5,14 @@ version = "0.4.3" [deps] ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" +LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" [compat] ClusterManagers = "0.4.6" Distributed = "1" +LinearAlgebra = "1" +Pkg = "1" ThreadPinning = "0.7.22" julia = "1.6" diff --git 
a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index cf1b14a..749ba93 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -7,6 +7,9 @@ module ParallelProcessingTools using Base.Threads using Distributed +import LinearAlgebra +import Pkg + import ClusterManagers import ThreadPinning From 02f345e81528d8575d1d7cb8e91cb163e6ecf641 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 22:42:25 +0200 Subject: [PATCH 7/9] Requires Parameters and UnPack --- Project.toml | 4 ++++ src/ParallelProcessingTools.jl | 3 +++ 2 files changed, 7 insertions(+) diff --git a/Project.toml b/Project.toml index bbaa471..3a7e6d8 100644 --- a/Project.toml +++ b/Project.toml @@ -6,13 +6,17 @@ version = "0.4.3" ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" +UnPack = "3a884ed6-31ef-47d7-9d2a-63182c4928ed" [compat] ClusterManagers = "0.4.6" Distributed = "1" LinearAlgebra = "1" +Parameters = "0.12, 0.13" Pkg = "1" ThreadPinning = "0.7.22" +UnPack = "1" julia = "1.6" diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 749ba93..7dafa9d 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -13,6 +13,9 @@ import Pkg import ClusterManagers import ThreadPinning +using Parameters: @with_kw +using Unpack: @unpack + include("threadsafe.jl") include("threadlocal.jl") include("onthreads.jl") From b650c0cd7a659b386810f7e8426b4ee0471f75eb Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Mon, 22 Apr 2024 15:54:19 +0200 Subject: [PATCH 8/9] Add tools for Julia cluster management --- Project.toml | 2 - docs/src/index.md | 61 +++++ src/ParallelProcessingTools.jl | 3 +- src/slurm.jl | 268 +++++++++++++++++++ 
src/workers.jl | 467 +++++++++++++++++++++++++++++++++ 5 files changed, 798 insertions(+), 3 deletions(-) create mode 100644 src/slurm.jl create mode 100644 src/workers.jl diff --git a/Project.toml b/Project.toml index 3a7e6d8..e5e5749 100644 --- a/Project.toml +++ b/Project.toml @@ -9,7 +9,6 @@ LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" -UnPack = "3a884ed6-31ef-47d7-9d2a-63182c4928ed" [compat] ClusterManagers = "0.4.6" @@ -18,5 +17,4 @@ LinearAlgebra = "1" Parameters = "0.12, 0.13" Pkg = "1" ThreadPinning = "0.7.22" -UnPack = "1" julia = "1.6" diff --git a/docs/src/index.md b/docs/src/index.md index 62656cb..326765d 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -1,3 +1,64 @@ # ParallelProcessingTools.jl This Julia package provides some tools to ease multithreaded and distributed programming. + + +## Compute cluster management + +ParallelProcessingTools helps spin-up Julia compute clusters. It currently has support for clusters on localhost and on SLURM (uses `ClusterManagers.ElasticManager` internally). + +On SLURM, `addworkers` will automatically try to perform a sensible thread-pinning (using the [ThreadPinning](https://github.com/carstenbauer/ThreadPinning.jl) package internally). 
+ +```julia +using ParallelProcessingTools, Distributed + +@always_everywhere begin + using Distributions +end + +mode = ParallelProcessingTools.SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) +#ParallelProcessingTools.worker_start_command(mode) + +# Add some workers: +addworkers(mode) + +# List resources: +ParallelProcessingTools.worker_resources() + +# Confirm that Distributions is loaded on workers: +worker = last(workers()) +@fetchfrom worker Normal() + +# Add some more workers: +addworkers(mode) +Table(ParallelProcessingTools.worker_resources()) + +# Add even more workers: +addworkers(mode) +Table(ParallelProcessingTools.worker_resources()) +``` + +And we can do SLURM batch scripts like this (e.g. "batchtest.jl"): + +```julia +#!/usr/bin/env -S julia --project=@SOME_JULIA_ENVIRONMENT --threads=8 +#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G + +using ParallelProcessingTools, Distributed + +@always_everywhere begin + using ParallelProcessingTools +end + +addworkers(SlurmRun()) +resources = ParallelProcessingTools.worker_resources() +show(stdout, MIME"text/plain"(), ParallelProcessingTools.worker_resources()) +``` + +This should run with a simple + +```shell +sbatch -o out.txt batchtest.jl +``` + +and "out.txt" should then contain a list of the worker resources. 
diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 7dafa9d..16ce833 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -14,13 +14,14 @@ import ClusterManagers import ThreadPinning using Parameters: @with_kw -using Unpack: @unpack include("threadsafe.jl") include("threadlocal.jl") include("onthreads.jl") include("onprocs.jl") include("workpartition.jl") +include("workers.jl") +include("slurm.jl") include("deprecated.jl") end # module diff --git a/src/slurm.jl b/src/slurm.jl new file mode 100644 index 0000000..adb4a17 --- /dev/null +++ b/src/slurm.jl @@ -0,0 +1,268 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +""" + SlurmRun(; + slurm_flags::Cmd = {defaults} + julia_flags::Cmd = {defaults} + dir = pwd() + user_start::Bool = false + timeout::Real = 60 + ) + +Mode to add worker processes via SLURM `srun`. + +`srun` and Julia worker `julia` command line flags are inferred from SLURM +environment variables (e.g. when inside of an `salloc` or batch job), as +well as `slurm_flags` and `julia_flags`. + +Workers are started with current directory set to `dir`. + +Example: + +```julia +mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) +addworkers(mode) +``` + +If `user_start` is `true`, then the SLURM srun-command will not be run +automatically, instead it will be logged via `@info` and the user is +responsible for running it. This srun-command can also be retrieved via +[`worker_start_command(mode)`](@ref). 
+""" +@with_kw struct SlurmRun <: ElasticAddProcsMode + slurm_flags::Cmd = _default_slurm_flags() + julia_flags::Cmd = _default_julia_flags() + dir = pwd() + user_start::Bool = false + timeout::Real = 60 +end +export SlurmRun + + +const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1) + +function worker_start_command(mode::SlurmRun, manager::ClusterManagers.ElasticManager) + slurm_flags = mode.slurm_flags + julia_flags = mode.julia_flags + dir = mode.dir + + tc = _get_slurm_taskconf(slurm_flags, ENV) + + n_workers = _slurm_nworkers(tc) + mem_per_task = _slurm_mem_per_task(tc) + + heap_size_hint_fraction = 0.5 + heap_size_hint_in_MB = isnothing(mem_per_task) ? nothing : ceil(Int, mem_per_task * heap_size_hint_fraction / 1024^2) + jl_heap_size_hint_flag = isnothing(heap_size_hint_in_MB) ? `` : `--heap-size-hint=$(heap_size_hint_in_MB)M` + + jl_threads_flag = isnothing(tc.cpus_per_task) ? `` : `--threads=$(tc.cpus_per_task)` + + additional_julia_flags = `$jl_threads_flag $jl_heap_size_hint_flag $julia_flags` + jlstep = atomic_add!(_g_slurm_nextjlstep, 1) + jobname = "julia-$(getpid())-$jlstep" + + worker_cmd = elastic_localworker_startcmd(manager; julia_flags = `$julia_flags $additional_julia_flags`) + + return `srun --job-name=$jobname --chdir=$dir $slurm_flags $worker_cmd`, n_workers +end + +function _slurm_nworkers(tc::NamedTuple) + if !isnothing(tc.n_tasks) + tc.n_tasks + elseif !isnothing(tc.n_nodes) && !isnothing(tc.ntasks_per_node) + tc.n_nodes * tc.ntasks_per_node + else + throw(ArgumentError("Could not infer number of tasks/processes from SLURM environment and flags.")) + end +end + +function _slurm_mem_per_task(tc::NamedTuple) + if !isnothing(tc.cpus_per_task) && !isnothing(tc.mem_per_cpu) + tc.cpus_per_task * tc.mem_per_cpu + elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.ntasks_per_node) + div(tc.mem_per_node, tc.ntasks_per_node) + elseif !isnothing(tc.n_nodes) && !isnothing(tc.mem_per_node) && !isnothing(tc.n_tasks) + 
div(tc.n_nodes * tc.mem_per_node, tc.n_tasks) + else + nothing + end +end + + +function ParallelProcessingTools.start_elastic_workers(mode::SlurmRun, manager::ClusterManagers.ElasticManager) + srun_cmd, n_workers = worker_start_command(mode, manager) + if mode.user_start + @info "To add Julia worker processes (I'll wait for them), run: $srun_cmd" + else + @info "Starting SLURM job: $srun_cmd" + srun_proc = open(srun_cmd) + end + return n_workers +end + +function worker_init_code(::SlurmRun) + quote + import ParallelProcessingTools + ParallelProcessingTools.pinthreads_auto() + end +end + +elastic_addprocs_timeout(mode::SlurmRun) = mode.timeout + + +function _default_slurm_flags() + # `srun` in `salloc`/`sbatch` doesn't seem to always pick up + # SLURM_CPUS_PER_TASK, resulting in incorrect thread pinning. So we'll + # set `--cpus-per-task` explicitly: + haskey(ENV, "SLURM_CPUS_PER_TASK") ? `--cpus-per-task=$(ENV["SLURM_CPUS_PER_TASK"])` : `` +end + + +const _slurm_memunits = IdDict{Char,Int}('K' => 1024^1, 'M' => 1024^2, 'G' => 1024^3, 'T' => 1024^4) + +const _slurm_memsize_regex = r"^([0-9]+)([KMGT])?$" +function _slurm_parse_memoptval(memsize::AbstractString) + s = strip(memsize) + m = match(_slurm_memsize_regex, s) + if isnothing(m) + throw(ArgumentError("Invalid SLURM memory size specification \"$s\"")) + else + value = parse(Int, m.captures[1]) + unitchar = only(something(m.captures[2], 'M')) + unitmult = _slurm_memunits[unitchar] + return value * unitmult + end +end +_slurm_parse_memoptval(::Nothing) = nothing + +_slurm_parse_intoptval(value::AbstractString) = parse(Int, value) +_slurm_parse_intoptval(::Nothing) = nothing + +function _slurm_parse_shortopt(opt::Char, args::Vector{String}, i::Int, default) + if i <= lastindex(args) + arg = args[i] + if arg == "-$opt" + if i < lastindex(args) && !startswith(args[i+1], "-") + return args[i+1], i+2 + else + throw(ArgumentError("Missing value for option \"-$opt\"")) + end + elseif startswith(arg, "-$opt") + if 
length(arg) > 2 + return arg[begin+2:end], i+1 + else + throw(ArgumentError("Missing value for option \"-$opt\"")) + end + else + return default, i + end + else + return default, i + end +end + +function _slurm_parse_longopt(opt::String, args::Vector{String}, i::Int, default) + if i <= lastindex(args) + arg = args[i] + if arg == "--$opt" + if i < lastindex(args) && !startswith(args[i+1], "-") + return args[i+1], i+2 + else + throw(ArgumentError("Missing value for option \"--$opt\"")) + end + elseif startswith(arg, "--$opt=") + if length(arg) > length(opt) + 3 + return arg[begin+length(opt)+3:end], i+1 + else + throw(ArgumentError("Missing value for option \"--$opt\"")) + end + else + return default, i + end + else + return default, i + end +end + +function _get_slurm_taskconf(slurmflags::Cmd, env::AbstractDict{String,String}) + n_tasks = get(env, "SLURM_NTASKS", nothing) + cpus_per_task = get(env, "SLURM_CPUS_PER_TASK", nothing) + mem_per_cpu = get(env, "SLURM_MEM_PER_CPU", nothing) + n_nodes = get(env, "SLURM_JOB_NUM_NODES", nothing) + ntasks_per_node = get(env, "SLURM_NTASKS_PER_NODE", nothing) + mem_per_node = get(env, "SLURM_MEM_PER_NODE", nothing) + + args = slurmflags.exec + i::Int = firstindex(args) + while i <= lastindex(args) + last_i = i + n_tasks, i = _slurm_parse_shortopt('n', args, i, n_tasks) + n_tasks, i = _slurm_parse_longopt("ntasks", args, i, n_tasks) + cpus_per_task, i = _slurm_parse_shortopt('c', args, i, cpus_per_task) + cpus_per_task, i = _slurm_parse_longopt("cpus-per-task", args, i, cpus_per_task) + mem_per_cpu, i = _slurm_parse_longopt("mem-per-cpu", args, i, mem_per_cpu) + n_nodes, i = _slurm_parse_shortopt('N', args, i, n_nodes) + n_nodes, i = _slurm_parse_longopt("nodes", args, i, n_nodes) + mem_per_node, i = _slurm_parse_longopt("mem", args, i, mem_per_node) + ntasks_per_node, i = _slurm_parse_longopt("ntasks-per-node", args, i, ntasks_per_node) + + if last_i == i + i += 1 + end + end + + return ( + n_tasks = 
_slurm_parse_intoptval(n_tasks), + cpus_per_task = _slurm_parse_intoptval(cpus_per_task), + mem_per_cpu = _slurm_parse_memoptval(mem_per_cpu), + n_nodes = _slurm_parse_intoptval(n_nodes), + ntasks_per_node = _slurm_parse_intoptval(ntasks_per_node), + mem_per_node = _slurm_parse_memoptval(mem_per_node), + ) +end + + +function _addprocs_slurm(; kwargs...) + slurm_ntasks = parse(Int, ENV["SLURM_NTASKS"]) + slurm_ntasks > 1 || throw(ErrorException("Invalid nprocs=$slurm_ntasks inferred from SLURM environment")) + _addprocs_slurm(slurm_ntasks; kwargs...) +end + +function _addprocs_slurm( + nprocs::Int; + job_file_loc::AbstractString = joinpath(homedir(), "slurm-julia-output"), + retry_delays::AbstractVector{<:Real} = [1, 1, 2, 2, 4, 5, 5, 10, 10, 10, 10, 20, 20, 20] +) + try + lock(_g_processops_lock) + + @info "Adding $nprocs Julia processes via SLURM" + + julia_project = dirname(Pkg.project().path) + slurm_ntasks = nprocs + slurm_nthreads = parse(Int, ENV["SLURM_CPUS_PER_TASK"]) + slurm_mem_per_cpu = parse(Int, ENV["SLURM_MEM_PER_CPU"]) * 1024^2 + slurm_mem_per_task = slurm_nthreads * slurm_mem_per_cpu + + cluster_manager = ClusterManagers.SlurmManager(slurm_ntasks, retry_delays) + worker_timeout = round(Int, max(sum(cluster_manager.retry_delays), 60)) + ENV["JULIA_WORKER_TIMEOUT"] = "$worker_timeout" + + mkpath(job_file_loc) + new_workers = Distributed.addprocs( + cluster_manager, job_file_loc = job_file_loc, + exeflags = `--project=$julia_project --threads=$slurm_nthreads --heap-size-hint=$(slurm_mem_per_task÷2)`, + cpus_per_task = "$slurm_nthreads", mem_per_cpu="$(slurm_mem_per_cpu >> 30)G", # time="0:10:00", + mem_bind = "local", cpu_bind="cores", + ) + + @info "Configuring $nprocs new Julia worker processes" + + _run_always_everywhere_code(new_workers) + pinthreads_distributed(new_workers) + + @info "Added $(length(new_workers)) Julia worker processes via SLURM" + finally + unlock(_g_processops_lock) + end +end diff --git a/src/workers.jl b/src/workers.jl new 
file mode 100644 index 0000000..b036675 --- /dev/null +++ b/src/workers.jl @@ -0,0 +1,467 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +const _g_processops_lock = ReentrantLock() + +const _g_always_everywhere_code = quote + import ParallelProcessingTools +end + + +""" + @always_everywhere(expr) + +Runs `expr` on all current Julia processes, but also all future Julia +processes added via [`addworkers`](@ref). + +Similar to `Distributed.@everywhere`, but also stores `expr` so that +`addworkers` can execute it automatically on new worker processes. +""" +macro always_everywhere(expr) + return quote + try + lock(_g_processops_lock) + expr = $(esc(Expr(:quote, expr))) + push!(_g_always_everywhere_code.args, expr) + _run_expr_on_procs(expr, Distributed.procs()) + finally + unlock(_g_processops_lock) + end + end +end +export @always_everywhere + + +function _run_expr_on_procs(expr, procs::AbstractVector{<:Integer}) + mod_expr = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), expr) + Distributed.remotecall_eval(Main, procs, mod_expr) +end + +function _run_always_everywhere_code(@nospecialize(procs::AbstractVector{<:Integer}); pre_always::Expr = :()) + code = quote + $pre_always + $_g_always_everywhere_code + end + + _run_expr_on_procs(code, procs) +end + + +""" + pinthreads_auto() + +Use default thread-pinning strategy for the current Julia process.
+""" +function pinthreads_auto() + if Distributed.myid() == 1 + let n_juliathreads = nthreads() + if n_juliathreads > 1 + LinearAlgebra.BLAS.set_num_threads(n_juliathreads) + end + end + else + let available_cpus = ThreadPinning.affinitymask2cpuids(ThreadPinning.get_affinity_mask()) + ThreadPinning.pinthreads(:affinitymask) + LinearAlgebra.BLAS.set_num_threads(length(available_cpus)) + end + end +end +export pinthreads_auto + + +""" + ParallelProcessingTools.pinthreads_distributed(procs::AbstractVector{<:Integer}) + +Use default thread-pinning strategy on all Julia processes `procs`. +""" +function pinthreads_distributed(@nospecialize(procs::AbstractVector{<:Integer})) + if 1 in procs + pinthreads_auto() + end + + workerprocs = filter(!isequal(1), procs) + if !isempty(workerprocs) + Distributed.remotecall_eval(Main, workerprocs, + quote + import ParallelProcessingTools + ParallelProcessingTools.pinthreads_auto() + end + ) + end +end + + +""" + ParallelProcessingTools.shutdown_workers_atexit() + +Ensure worker processes are shut down when Julia exits. +""" +function shutdown_workers_atexit() + atexit(() -> Distributed.rmprocs(filter!(!isequal(1), Distributed.workers()), waitfor = 1)) +end + + +""" + worker_resources + +Get the distributed Julia process resources currently available. +""" +function worker_resources() + resources_ft = Distributed.remotecall.(ParallelProcessingTools._current_process_resources, Distributed.workers()) + resources = fetch.(resources_ft) + sorted_resources = sort(resources, by = x -> x.workerid) + sorted_resources +end +export worker_resources + +function _current_process_resources() + return ( + workerid = Distributed.myid(), + hostname = Base.gethostname(), + nthreads = nthreads(), + blas_nthreads = LinearAlgebra.BLAS.get_num_threads(), + cpuids = ThreadPinning.getcpuids() + ) +end + + +""" + abstract type ParallelProcessingTools.AddProcsMode + +Abstract supertype for worker process addition modes.
+ +Subtypes must implement: + +* `ParallelProcessingTools.addworkers(mode::SomeAddProcsMode)` + +and may want to specialize: + +* `ParallelProcessingTools.worker_init_code(mode::SomeAddProcsMode)` +""" +abstract type AddProcsMode end + + +""" + ParallelProcessingTools.worker_init_code(::AddProcsMode)::Expr + +Get a Julia code expression to run on new worker processes even before +running [`@always_everywhere`](@ref) code on them. +""" +function worker_init_code end
worker_init_code(::AddProcsMode) = :() + + + +""" + addworkers(mode::ParallelProcessingTools.AddProcsMode) + +Add Julia worker processes. + +By default ensures that all worker processes use the same Julia project +environment as the current process (requires that file system paths are +consistent across compute hosts). + +Use [`@always_everywhere`](@ref) to run initialization code on all current +processes and all future processes added via `addworkers`: + +```julia +using Distributed, ParallelProcessingTools + +@always_everywhere begin + using SomePackage + import SomeOtherPackage + + get_global_value() = 42 +end + +# ... some code ... + +addworkers(LocalProcesses(nprocs = 4)) + +# `get_global_value` is available even though workers were added later: +remotecall_fetch(get_global_value, last(workers())) +``` + +See also [`worker_resources()`](@ref). +""" +function addworkers end +export addworkers + + +""" + LocalProcesses(; + nprocs::Integer = 1 + ) + +Mode to add `nprocs` worker processes on the current host. +""" +@with_kw struct LocalProcesses <: AddProcsMode + nprocs::Int +end +export LocalProcesses + + +function addworkers(mode::LocalProcesses) + n_workers = mode.nprocs + try + lock(_g_processops_lock) + + @info "Adding $n_workers Julia processes on current host" + + # Maybe wait for shared/distributed file system to get in sync?
+ # sleep(5) + + julia_project = dirname(Pkg.project().path) + worker_nthreads = nthreads() + + new_workers = Distributed.addprocs( + n_workers, + exeflags = `--project=$julia_project --threads=$worker_nthreads` + ) + + @info "Configuring $n_workers new Julia worker processes" + + _run_always_everywhere_code(new_workers, pre_always = worker_init_code(mode)) + + # Sanity check: + worker_ids = Distributed.remotecall_fetch.(Ref(Distributed.myid), Distributed.workers()) + @assert length(worker_ids) == Distributed.nworkers() + + @info "Added $(length(new_workers)) Julia worker processes on current host" + finally + unlock(_g_processops_lock) + end +end + + +#= +# ToDo: Add SSHWorkers or similar: + +@with_kw struct SSHWorkers <: AddProcsMode + hosts::Vector{Any} + ssd_flags::Cmd = _default_slurm_flags() + julia_flags::Cmd = _default_julia_flags() + dir = ... + env = ... + tunnel::Bool = false + multiplex::Bool = false + shell::Symbol = :posix + max_parallel::Int = 10 + enable_threaded_blas::Bool = true + topology::Symbol = :all_to_all + lazy_connections::Bool = true +end +=# + + +""" + ParallelProcessingTools.default_elastic_manager() + ParallelProcessingTools.default_elastic_manager(manager::ClusterManagers.ElasticManager) + +Get or set the default elastic cluster manager. +""" +function default_elastic_manager end + +const _g_elastic_manager = Ref{Union{Nothing, ClusterManagers.ElasticManager}}(nothing) + +function default_elastic_manager() + if isnothing(_g_elastic_manager[]) + _g_elastic_manager[] = ClusterManagers.ElasticManager(addr=:auto, port=0, topology=:master_worker) + end + return _g_elastic_manager[] +end + +function default_elastic_manager(manager::ClusterManagers.ElasticManager) + _g_elastic_manager[] = manager + return _g_elastic_manager[] +end + + + +""" + abstract type ParallelProcessingTools.ElasticAddProcsMode <: ParallelProcessingTools.AddProcsMode + +Abstract supertype for worker process addition modes that use the +elastic cluster manager. 
+ +Subtypes must implement: + +* `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)` +* `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)` + +and may want to specialize: + +* `ParallelProcessingTools.elastic_addprocs_timeout(mode::SomeElasticAddProcsMode)` +""" +abstract type ElasticAddProcsMode <: AddProcsMode end + +""" + ParallelProcessingTools.worker_start_command( + mode::ElasticAddProcsMode, + manager::ClusterManagers.ElasticManager = ParallelProcessingTools.default_elastic_manager() + )::Tuple{Cmd,Integer} + +Return the system command to start worker processes as well as the number of +workers to start. +""" +function worker_start_command end +worker_start_command(mode::ElasticAddProcsMode) = worker_start_command(mode, default_elastic_manager()) + + +function _elastic_worker_startjl(manager::ClusterManagers.ElasticManager) + cookie = Distributed.cluster_cookie() + socket_name = manager.sockname + address = string(socket_name[1]) + port = convert(Int, socket_name[2]) + """import ClusterManagers; ClusterManagers.elastic_worker("$cookie", "$address", $port)""" +end + +const _default_addprocs_params = Distributed.default_addprocs_params() + +_default_julia_cmd() = `$(_default_addprocs_params[:exename]) $(_default_addprocs_params[:exeflags])` +_default_julia_flags() = `` +_default_julia_project() = Pkg.project().path + + +""" + ParallelProcessingTools.elastic_localworker_startcmd( + manager::Distributed.ClusterManager; + julia_cmd::Cmd = _default_julia_cmd(), + julia_flags::Cmd = _default_julia_flags(), + julia_project::AbstractString = _default_julia_project() + )::Cmd + +Return the system command required to start a Julia worker process, that will +connect to `manager`, on the current host. 
+""" +function elastic_localworker_startcmd( + manager::Distributed.ClusterManager; + julia_cmd::Cmd = _default_julia_cmd(), + julia_flags::Cmd = _default_julia_flags(), + julia_project::AbstractString = _default_julia_project() +) + julia_code = _elastic_worker_startjl(manager) + + `$julia_cmd --project=$julia_project $julia_flags -e $julia_code` +end + + + +""" + ParallelProcessingTools.elastic_addprocs_timeout(mode::ElasticAddProcsMode) + +Get the timeout in seconds for waiting for worker processes to connect. +""" +function elastic_addprocs_timeout end + +elastic_addprocs_timeout(mode::ElasticAddProcsMode) = 60 + + +""" + ParallelProcessingTools.start_elastic_workers(mode::ElasticAddProcsMode, manager::ClusterManagers.ElasticManager)::Int + +Spawn worker processes as specified by `mode` and return the number of +expected additional workers. +""" +function start_elastic_workers end + + +function addworkers(mode::ElasticAddProcsMode) + try + lock(_g_processops_lock) + + manager = default_elastic_manager() + + old_procs = Distributed.procs() + n_previous = length(old_procs) + n_to_add = start_elastic_workers(mode, manager) + + @info "Waiting for $n_to_add workers to connect..." + + sleep(1) + + # ToDo: Add timeout and either prevent workers from connecting after + # or somehow make sure that init and @always everywhere code is still + # run on them before user code is executed on them. 
+ + timeout = elastic_addprocs_timeout(mode) + + t_start = time() + t_waited = zero(t_start) + n_added_last = 0 + while true + t_waited = time() - t_start + if t_waited > timeout + @error "Timeout after waiting for workers to connect for $t_waited seconds" + break + end + n_added = Distributed.nprocs() - n_previous + if n_added > n_added_last + @info "$n_added of $n_to_add additional workers have connected" + end + if n_added == n_to_add + break + elseif n_added > n_to_add + @warn "More workers connected than expected: $n_added > $n_to_add" + break + end + + n_added_last = n_added + sleep(1) + end + + new_workers = setdiff(Distributed.workers(), old_procs) + n_new = length(new_workers) + + @info "Initializing $n_new new Julia worker processes" + _run_always_everywhere_code(new_workers, pre_always = worker_init_code(mode)) + + @info "Added $n_new new Julia worker processes" + + if n_new != n_to_add + throw(ErrorException("Tried to add $n_to_add new workers, but added $n_new")) + end + finally + unlock(_g_processops_lock) + end +end + + +""" + ParallelProcessingTools.ExternalProcesses(; + nprocs::Integer = ... + ) + +Add worker processes by starting them externally. + +Will log (via `@info`) a worker start command and then wait for the workers to +connect. The user is responsible for starting the specified number of workers +externally using that start command. + +Example: + +```julia +mode = ExternalProcesses(nprocs = 4) +addworkers(mode) +``` + +The user now has to start 4 Julia worker processes externally using the logged +start command. This start command can also be retrieved via +[`worker_start_command(mode)`](@ref). 
+""" +@with_kw struct ExternalProcesses <: ElasticAddProcsMode + nprocs::Int = 1 +end +export ExternalProcesses + + +function worker_start_command(mode::ExternalProcesses, manager::ClusterManagers.ElasticManager) + worker_nthreads = nthreads() + julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads` + elastic_localworker_startcmd(manager, julia_flags = julia_flags), mode.nprocs +end + +function start_elastic_workers(mode::ExternalProcesses, manager::ClusterManagers.ElasticManager) + start_cmd, n_workers = worker_start_command(mode, manager) + @info "To add Julia worker processes, run ($n_workers times in parallel, I'll wait for them): $start_cmd" + return n_workers +end From 213b0cfc5a22c5a56cc9b14a7c454691ee53cb07 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Tue, 23 Apr 2024 19:59:33 +0200 Subject: [PATCH 9/9] Rename workers.jl to addworkers.jl --- src/ParallelProcessingTools.jl | 2 +- src/{workers.jl => addworkers.jl} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/{workers.jl => addworkers.jl} (100%) diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 16ce833..9face71 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -20,7 +20,7 @@ include("threadlocal.jl") include("onthreads.jl") include("onprocs.jl") include("workpartition.jl") -include("workers.jl") +include("addworkers.jl") include("slurm.jl") include("deprecated.jl") diff --git a/src/workers.jl b/src/addworkers.jl similarity index 100% rename from src/workers.jl rename to src/addworkers.jl