diff --git a/Project.toml b/Project.toml index b3d998c..85e67f4 100644 --- a/Project.toml +++ b/Project.toml @@ -3,20 +3,30 @@ uuid = "8e8a01fc-6193-5ca1-a2f1-20776dae4199" version = "0.4.3" [deps] +ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197" ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" +Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" +[weakdeps] +ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" + +[extensions] +ParallelProcessingToolsThreadPinningExt = "ThreadPinning" + [compat] +ArgCheck = "1, 2" ClusterManagers = "0.4.6" Distributed = "1" LinearAlgebra = "1" Logging = "1" Parameters = "0.12, 0.13" Pkg = "1" +Sockets = "1" ThreadPinning = "0.7.22" julia = "1.6" diff --git a/Test/Project.toml b/Test/Project.toml new file mode 100644 index 0000000..a7159b9 --- /dev/null +++ b/Test/Project.toml @@ -0,0 +1,2 @@ +[deps] +Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" diff --git a/docs/src/index.md b/docs/src/index.md index 326765d..e19b37c 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -3,56 +3,112 @@ This Julia package provides some tools to ease multithreaded and distributed programming. -## Compute cluster management +## Distributed computing -ParallelProcessingTools helps spin-up Julia compute clusters. It currently has support for clusters on localhost and on SLURM (uses `ClusterManagers.ElasticManager` internally). +Julia provides native support for distributed computing on multiple Julia processes that run in parallel on the same or on different machines. ParallelProcessingTools add some machinery to make some aspects of this even easier. -On SLURM, `addworkers` will automatically try to perform a sensible thread-pinning (using the [ThreadPinning](https://github.com/carstenbauer/ThreadPinning.jl) package internally). +An internal elastic cluster manager ([`ppt_cluster_manager`](@ref), a modified version of `ParallelProcessingTools.ElasticManager`), started on demand, allows for starting ([`runworkers`](@ref)) an stopping ([`stopworkers`](@ref)) worker processes in a dynamic fashion. The worker processes can also be started outside of the Julia session ([`worker_start_command`](@ref) and [`write_worker_start_script`](@ref)), this can be useful to add worker to a running Julia session via manually controlled batch jobs, for example. Workers can be started locally ([`OnLocalhost`](@ref)) or via SLURM ([`SlurmRun`](@ref)). Other methods to start workers (e.g. via SSH) may be added in the future (contributions are very welcome). + +The elastic cluster manager automatically adds new workers to an automatically created dynamic worker pool ([`ppt_worker_pool`](@ref)) of type [`FlexWorkerPool`](@ref) that optionally supports oversubscription. Users can `take!` workers from the pool and `put!` them back, or use [`onworker`](@ref) to send work to workers in the pool without exceeding their maximum occupancy. + +Since workers can appear and disappear dynamically, initializing them (loading packages, etc.) via the standard `Distributed.@everywhere` macro is problematic, as workers added afterwards won't be initialized. Parallel processing tools provides the macro [`@always_everywhere`](@ref) to run code globally on all current processes, but also store the code so it can be run again on future new worker processes. Workers that are part of a [`FlexWorkerPool`](@ref) will be updated automatically on `take!` and `onworker`. You can also use [`ensure_procinit`](@ref) to manually update all workers +to all `@always_everywhere` used so far. + +The function [`pinthreads_auto`](@ref) (used inside of `@always_everywhere`) provides a convenient way to perform some automatic thread pinning on all processes. Note that it needs to follow an [`import ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/), and that more complex use cased may require customized thread pinning for best performance. + +For example: ```julia +ENV["JULIA_DEBUG"] = "ParallelProcessingTools" +ENV["JULIA_WORKER_TIMEOUT"] = "120" + using ParallelProcessingTools, Distributed @always_everywhere begin - using Distributions + using ParallelProcessingTools + using Statistics + + import ThreadPinning + pinthreads_auto() end -mode = ParallelProcessingTools.SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) -#ParallelProcessingTools.worker_start_command(mode) +runmode = OnLocalhost(n = 4) +# runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) + +display(worker_start_command(runmode)) -# Add some workers: -addworkers(mode) +# Add some workers and initialize with all `@always_everywhere` code: +old_nprocs = nprocs() +_, n = runworkers(runmode) +@wait_while nprocs() < old_nprocs + n +ensure_procinit() -# List resources: -ParallelProcessingTools.worker_resources() +# Show worker resources: +pool = ppt_worker_pool() +display(pool) +display(worker_resources()) -# Confirm that Distributions is loaded on workers: +# Confirm that Distributions is loaded on a worker: worker = last(workers()) -@fetchfrom worker Normal() +@fetchfrom worker mean(rand(100)) -# Add some more workers: -addworkers(mode) -Table(ParallelProcessingTools.worker_resources()) +# Some more init code +@always_everywhere begin + X = rand(100) +end -# Add even more workers: -addworkers(mode) -Table(ParallelProcessingTools.worker_resources()) +# Add some more workers, we won't run `ensure_procinit()` manually this time: +old_nprocs = nprocs() +_, n = runworkers(runmode) +@wait_while nprocs() < old_nprocs + n + +# Worker hasn't run @always_everywhere code yet, so it doesn't have `mean`: +worker = last(workers()) +display(@return_exceptions @userfriendly_exceptions begin + @fetchfrom worker mean(X) +end) + +# Using `take!` on a `FlexWorkerPool` automatically runs init code as necessary: +pid = take!(pool) +try + remotecall_fetch(() -> mean(X), pid) +finally + put!(pool, pid) +end + +# `onworker` (using the default `FlexWorkerPool` here) does the same: +onworker(mean, X) + +# If we don't need workers processes for a while, let's stop them: +stopworkers() ``` -And we can do SLURM batch scripts like this (e.g. "batchtest.jl"): +We can also use SLURM batch scripts, like this (e.g. "batchtest.jl"): ```julia -#!/usr/bin/env -S julia --project=@SOME_JULIA_ENVIRONMENT --threads=8 -#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G +#!/usr/bin/env julia +#SBATCH --ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G --time=00:15:00 + +using Pkg; pkg"activate @SOME_JULIA_ENVIRONMENT" + +ENV["JULIA_DEBUG"] = "ParallelProcessingTools" +ENV["JULIA_WORKER_TIMEOUT"] = "120" using ParallelProcessingTools, Distributed @always_everywhere begin using ParallelProcessingTools + import ThreadPinning + pinthreads_auto() end -addworkers(SlurmRun()) -resources = ParallelProcessingTools.worker_resources() -show(stdout, MIME"text/plain"(), ParallelProcessingTools.worker_resources()) +_, n = runworkers(SlurmRun(slurm_flags = `--cpu-bind=cores --mem-bind=local`)) +@wait_while maxtime=240 nprocs() < n + 1 + +resources = worker_resources() +display(resources) + +stopworkers() ``` This should run with a simple @@ -61,4 +117,40 @@ This should run with a simple sbatch -o out.txt batchtest.jl ``` -and "out.txt" should then contain a list of the worker resources. +and "out.txt" should then contain debugging output and a list of the worker +resources. + + +## Multithreading + +To test multithreading performance and help debug and optimize multithreaded +code, ParallelProcessingTools provides the utility macros [`@onthreads`](@ref) +to run code explicitly on the selected Julia threads (all threads can be +listed using [`allthreads`](@ref)). + +You can use the macro [`@critical`](@ref) to prevent code that may suffer from race conditions in parallel to other code fenced by `@critical`. + +The macro [`@mt_out_of_order`](@ref) is useful to run different code on in parallel on Julia threads. + + +# Waiting and sleeping + +In a parallel computing scenario, on threads, distributed processes or both, or when dealing with I/O operations, code often needs to wait. In addition a timeout mechanism is often necessary. Julia's standard `wait` function can only waits a single object without a timeout. (`waitany`, requires Julia >= v1.12, can be used to wait for multiple tasks). + +ParallelProcessingTools provides a very flexible macro [`@wait_while`](@ref) to wait for custom conditions with an optional timeout, as well as the functions [`wait_for_all`](@ref) and [`wait_for_any`](@ref) that can wait for different kinds of objects, also with an optional timeout. + +The functions [`sleep_ns`](@ref) and [`idle_sleep`](@ref) can be used to implement custom scenarios that require precise sleeping for both very short and long intervals. + + +# Exception handling + +Exceptions throws during remote code execution can be complex, nested and sometimes hard to understand. You can use the functions [`inner_exception`](@ref), [`onlyfirst_exception`](@ref) and [`original_exception`](@ref) to get to the underlying reason of a failure more easily. The macro [`@userfriendly_exceptions`](@ref) automatizes this to some extent for a given piece of code. + +To get an exception "in hand" for further analysis, you can use the macro [`@return_exceptions`](@ref) to make (possibly failing) code return the exceptions instead of throwing it. + + +# File I/O + +File handling can become more challenging when working in a parallel and possibly distributed fashion. Code or whole workers can crash, resulting in corrupt files, or workers may become disconnected, but still write files and clash with restarted code (resulting in race conditions and may also result in corrupt files). + +ParallelProcessingTools provides the functions [`create_files`](@ref), [`read_files`](@ref) and [`modify_files`](@ref) to implement atomic file operations, on a best-effort basis (depending on the operating system and underlying file systems). diff --git a/ext/ParallelProcessingToolsThreadPinningExt.jl b/ext/ParallelProcessingToolsThreadPinningExt.jl new file mode 100644 index 0000000..2a920e0 --- /dev/null +++ b/ext/ParallelProcessingToolsThreadPinningExt.jl @@ -0,0 +1,38 @@ +module ParallelProcessingToolsThreadPinningExt + +import ParallelProcessingTools +import LinearAlgebra +import Distributed +import ThreadPinning + +# ThreadPinning.jl does not support all operating systems, currently: +const _threadpinning_supported = isdefined(ThreadPinning, :affinitymask2cpuids) + +@static if _threadpinning_supported + + +function ParallelProcessingTools._pinthreads_auto_impl(::Val{true}) + pid = Distributed.myid() + if Distributed.myid() == 1 + @debug "On process $pid, leaving Julia threads unpinned" + let n_juliathreads = Threads.nthreads() + if n_juliathreads > 1 + LinearAlgebra.BLAS.set_num_threads(n_juliathreads) + end + end + else + @debug "On process $pid, pinning threads according to affinity mask" + let available_cpus = ThreadPinning.affinitymask2cpuids(ThreadPinning.get_affinity_mask()) + ThreadPinning.pinthreads(:affinitymask) + LinearAlgebra.BLAS.set_num_threads(length(available_cpus)) + end + end +end + + +ParallelProcessingTools._getcpuids_impl(::Val{true}) = ThreadPinning.getcpuids() + + +end # if _threadpinning_supported + +end # module ChangesOfVariablesInverseFunctionsExt diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 1d502d0..0a63862 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -9,23 +9,41 @@ using Distributed import LinearAlgebra import Pkg +import Sockets import ClusterManagers -import ThreadPinning +using Base: Process using Logging: @logmsg, LogLevel, Info, Debug +using ArgCheck: @argcheck using Parameters: @with_kw +# # ToDo: Remove CustomClusterManagers once changes to ElasticManager have +# # have been upstreamed. +#using CustomClusterManagers: ElasticManager +include("custom_cluster_managers.jl") +using .CustomClusterManagers: ElasticManager + +include("display.jl") +include("waiting.jl") include("exceptions.jl") +include("states.jl") include("fileio.jl") include("threadsafe.jl") include("threadlocal.jl") include("onthreads.jl") include("onprocs.jl") include("workpartition.jl") -include("addworkers.jl") +include("procinit.jl") +include("workerpool.jl") +include("onworkers.jl") +include("runworkers.jl") include("slurm.jl") include("deprecated.jl") +@static if !isdefined(Base, :get_extension) + include("../ext/ParallelProcessingToolsThreadPinningExt.jl") +end + end # module diff --git a/src/addworkers.jl b/src/addworkers.jl deleted file mode 100644 index b036675..0000000 --- a/src/addworkers.jl +++ /dev/null @@ -1,467 +0,0 @@ -# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). - -const _g_processops_lock = ReentrantLock() - -const _g_always_everywhere_code = quote - import ParallelProcessingTools -end - - -""" - always_everywhere(expr) - -Runs `expr` on all current Julia processes, but also all future Julia -processes added via [`addworkers`](@ref)). - -Similar to `Distributed.everywhere`, but also stores `expr` so that -`addworkers` can execute it automatically on new worker processes. -""" -macro always_everywhere(expr) - return quote - try - lock(_g_processops_lock) - expr = $(esc(Expr(:quote, expr))) - push!(_g_always_everywhere_code.args, expr) - _run_expr_on_procs(expr, Distributed.procs()) - finally - unlock(_g_processops_lock) - end - end -end -export @always_everywhere - - -function _run_expr_on_procs(expr, procs::AbstractVector{<:Integer}) - mod_expr = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), expr) - Distributed.remotecall_eval(Main, procs, mod_expr) -end - -function _run_always_everywhere_code(@nospecialize(procs::AbstractVector{<:Integer}); pre_always::Expr = :()) - code = quote - $pre_always - $_g_always_everywhere_code - end - - _run_expr_on_procs(code, procs) -end - - -""" - pinthreads_auto() - -Use default thread-pinning strategy for the current Julia process. -""" -function pinthreads_auto() - if Distributed.myid() == 1 - let n_juliathreads = nthreads() - if n_juliathreads > 1 - LinearAlgebra.BLAS.set_num_threads(n_juliathreads) - end - end - else - let available_cpus = ThreadPinning.affinitymask2cpuids(ThreadPinning.get_affinity_mask()) - ThreadPinning.pinthreads(:affinitymask) - LinearAlgebra.BLAS.set_num_threads(length(available_cpus)) - end - end -end -export pinthreads_auto - - -""" - ParallelProcessingTools.pinthreads_distributed(procs::AbstractVector{<:Integer} = Distrib) - -Use default thread-pinning strategy on all Julia processes processes `procs`. -""" -function pinthreads_distributed(@nospecialize(procs::AbstractVector{<:Integer})) - if 1 in procs - pinthreads_auto() - end - - workerprocs = filter(!isequal(1), procs) - if !isempty(workerprocs) - Distributed.remotecall_eval(Main, workerprocs, - quote - import ParallelProcessingTools - ParallelProcessingTools.pinthreads_auto() - end - ) - end -end - - -""" - ParallelProcessingTools.shutdown_workers_atexit() - -Ensure worker processes are shut down when Julia exits. -""" -function shutdown_workers_atexit() - atexit(() -> Distributed.rmprocs(filter!(!isequal(1), Distributed.workers()), waitfor = 1)) -end - - -""" - worker_resources - -Get the distributed Julia process resources currently available. -""" -function worker_resources() - resources_ft = Distributed.remotecall.(ParallelProcessingTools._current_process_resources, Distributed.workers()) - resources = fetch.(resources_ft) - sorted_resources = sort(resources, by = x -> x.workerid) - sorted_resources -end -export worker_resources - -function _current_process_resources() - return ( - workerid = Distributed.myid(), - hostname = Base.gethostname(), - nthreads = nthreads(), - blas_nthreads = LinearAlgebra.BLAS.get_num_threads(), - cpuids = ThreadPinning.getcpuids() - ) -end - - -""" - abstract type ParallelProcessingTools.AddProcsMode - -Abstract supertype for worker process addition modes. - -Subtypes must implement: - -* `ParallelProcessingTools.addworkers(mode::SomeAddProcsMode)` - -and may want to specialize: - -* `ParallelProcessingTools.worker_init_code(mode::SomeAddProcsMode)` -""" -abstract type AddProcsMode end - - -""" - ParallelProcessingTools.worker_init_code(::AddProcsMode)::Expr - -Get a Julia code expression to run on new worker processes even before -running [`@always_everywhere`](@ref) code on them. -""" -function worker_init_code end -worker_init_code(::AddProcsMode) = :() - - - -""" - addworkers(mode::ParallelProcessingTools.AddProcsMode) - -Add Julia worker processes for LEGEND data processing. - -By default ensures that all workers processes use the same Julia project -environment as the current process (requires that file systems paths are -consistenst across compute hosts). - -Use [`@always_everywhere`](@ref) to run initialization code on all current -processes and all future processes added via `addworkers`: - -```julia -using Distributed, ParallelProcessingTools - -@always_everywhere begin - using SomePackage - import SomeOtherPackage - - get_global_value() = 42 -end - -# ... some code ... - -addworkers(LocalProcesses(nprocs = 4)) - -# `get_global_value` is available even though workers were added later: -remotecall_fetch(get_global_value, last(workers())) -``` - -See also [`worker_resources()`](@ref). -""" -function addworkers end -export addworkers - - -""" - LocalProcesses(; - nprocs::Integer = 1 - ) - -Mode to add `nprocs` worker processes on the current host. -""" -@with_kw struct LocalProcesses <: AddProcsMode - nprocs::Int -end -export LocalProcesses - - -function addworkers(mode::LocalProcesses) - n_workers = mode.nprocs - try - lock(_g_processops_lock) - - @info "Adding $n_workers Julia processes on current host" - - # Maybe wait for shared/distributed file system to get in sync? - # sleep(5) - - julia_project = dirname(Pkg.project().path) - worker_nthreads = nthreads() - - new_workers = Distributed.addprocs( - n_workers, - exeflags = `--project=$julia_project --threads=$worker_nthreads` - ) - - @info "Configuring $n_workers new Julia worker processes" - - _run_always_everywhere_code(new_workers, pre_always = worker_init_code(mode)) - - # Sanity check: - worker_ids = Distributed.remotecall_fetch.(Ref(Distributed.myid), Distributed.workers()) - @assert length(worker_ids) == Distributed.nworkers() - - @info "Added $(length(new_workers)) Julia worker processes on current host" - finally - unlock(_g_processops_lock) - end -end - - -#= -# ToDo: Add SSHWorkers or similar: - -@with_kw struct SSHWorkers <: AddProcsMode - hosts::Vector{Any} - ssd_flags::Cmd = _default_slurm_flags() - julia_flags::Cmd = _default_julia_flags() - dir = ... - env = ... - tunnel::Bool = false - multiplex::Bool = false - shell::Symbol = :posix - max_parallel::Int = 10 - enable_threaded_blas::Bool = true - topology::Symbol = :all_to_all - lazy_connections::Bool = true -end -=# - - -""" - ParallelProcessingTools.default_elastic_manager() - ParallelProcessingTools.default_elastic_manager(manager::ClusterManagers.ElasticManager) - -Get or set the default elastic cluster manager. -""" -function default_elastic_manager end - -const _g_elastic_manager = Ref{Union{Nothing, ClusterManagers.ElasticManager}}(nothing) - -function default_elastic_manager() - if isnothing(_g_elastic_manager[]) - _g_elastic_manager[] = ClusterManagers.ElasticManager(addr=:auto, port=0, topology=:master_worker) - end - return _g_elastic_manager[] -end - -function default_elastic_manager(manager::ClusterManagers.ElasticManager) - _g_elastic_manager[] = manager - return _g_elastic_manager[] -end - - - -""" - abstract type ParallelProcessingTools.ElasticAddProcsMode <: ParallelProcessingTools.AddProcsMode - -Abstract supertype for worker process addition modes that use the -elastic cluster manager. - -Subtypes must implement: - -* `ParallelProcessingTools.worker_start_command(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)` -* `ParallelProcessingTools.start_elastic_workers(mode::SomeElasticAddProcsMode, manager::ClusterManagers.ElasticManager)` - -and may want to specialize: - -* `ParallelProcessingTools.elastic_addprocs_timeout(mode::SomeElasticAddProcsMode)` -""" -abstract type ElasticAddProcsMode <: AddProcsMode end - -""" - ParallelProcessingTools.worker_start_command( - mode::ElasticAddProcsMode, - manager::ClusterManagers.ElasticManager = ParallelProcessingTools.default_elastic_manager() - )::Tuple{Cmd,Integer} - -Return the system command to start worker processes as well as the number of -workers to start. -""" -function worker_start_command end -worker_start_command(mode::ElasticAddProcsMode) = worker_start_command(mode, default_elastic_manager()) - - -function _elastic_worker_startjl(manager::ClusterManagers.ElasticManager) - cookie = Distributed.cluster_cookie() - socket_name = manager.sockname - address = string(socket_name[1]) - port = convert(Int, socket_name[2]) - """import ClusterManagers; ClusterManagers.elastic_worker("$cookie", "$address", $port)""" -end - -const _default_addprocs_params = Distributed.default_addprocs_params() - -_default_julia_cmd() = `$(_default_addprocs_params[:exename]) $(_default_addprocs_params[:exeflags])` -_default_julia_flags() = `` -_default_julia_project() = Pkg.project().path - - -""" - ParallelProcessingTools.elastic_localworker_startcmd( - manager::Distributed.ClusterManager; - julia_cmd::Cmd = _default_julia_cmd(), - julia_flags::Cmd = _default_julia_flags(), - julia_project::AbstractString = _default_julia_project() - )::Cmd - -Return the system command required to start a Julia worker process, that will -connect to `manager`, on the current host. -""" -function elastic_localworker_startcmd( - manager::Distributed.ClusterManager; - julia_cmd::Cmd = _default_julia_cmd(), - julia_flags::Cmd = _default_julia_flags(), - julia_project::AbstractString = _default_julia_project() -) - julia_code = _elastic_worker_startjl(manager) - - `$julia_cmd --project=$julia_project $julia_flags -e $julia_code` -end - - - -""" - ParallelProcessingTools.elastic_addprocs_timeout(mode::ElasticAddProcsMode) - -Get the timeout in seconds for waiting for worker processes to connect. -""" -function elastic_addprocs_timeout end - -elastic_addprocs_timeout(mode::ElasticAddProcsMode) = 60 - - -""" - ParallelProcessingTools.start_elastic_workers(mode::ElasticAddProcsMode, manager::ClusterManagers.ElasticManager)::Int - -Spawn worker processes as specified by `mode` and return the number of -expected additional workers. -""" -function start_elastic_workers end - - -function addworkers(mode::ElasticAddProcsMode) - try - lock(_g_processops_lock) - - manager = default_elastic_manager() - - old_procs = Distributed.procs() - n_previous = length(old_procs) - n_to_add = start_elastic_workers(mode, manager) - - @info "Waiting for $n_to_add workers to connect..." - - sleep(1) - - # ToDo: Add timeout and either prevent workers from connecting after - # or somehow make sure that init and @always everywhere code is still - # run on them before user code is executed on them. - - timeout = elastic_addprocs_timeout(mode) - - t_start = time() - t_waited = zero(t_start) - n_added_last = 0 - while true - t_waited = time() - t_start - if t_waited > timeout - @error "Timeout after waiting for workers to connect for $t_waited seconds" - break - end - n_added = Distributed.nprocs() - n_previous - if n_added > n_added_last - @info "$n_added of $n_to_add additional workers have connected" - end - if n_added == n_to_add - break - elseif n_added > n_to_add - @warn "More workers connected than expected: $n_added > $n_to_add" - break - end - - n_added_last = n_added - sleep(1) - end - - new_workers = setdiff(Distributed.workers(), old_procs) - n_new = length(new_workers) - - @info "Initializing $n_new new Julia worker processes" - _run_always_everywhere_code(new_workers, pre_always = worker_init_code(mode)) - - @info "Added $n_new new Julia worker processes" - - if n_new != n_to_add - throw(ErrorException("Tried to add $n_to_add new workers, but added $n_new")) - end - finally - unlock(_g_processops_lock) - end -end - - -""" - ParallelProcessingTools.ExternalProcesses(; - nprocs::Integer = ... - ) - -Add worker processes by starting them externally. - -Will log (via `@info`) a worker start command and then wait for the workers to -connect. The user is responsible for starting the specified number of workers -externally using that start command. - -Example: - -```julia -mode = ExternalProcesses(nprocs = 4) -addworkers(mode) -``` - -The user now has to start 4 Julia worker processes externally using the logged -start command. This start command can also be retrieved via -[`worker_start_command(mode)`](@ref). -""" -@with_kw struct ExternalProcesses <: ElasticAddProcsMode - nprocs::Int = 1 -end -export ExternalProcesses - - -function worker_start_command(mode::ExternalProcesses, manager::ClusterManagers.ElasticManager) - worker_nthreads = nthreads() - julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads` - elastic_localworker_startcmd(manager, julia_flags = julia_flags), mode.nprocs -end - -function start_elastic_workers(mode::ExternalProcesses, manager::ClusterManagers.ElasticManager) - start_cmd, n_workers = worker_start_command(mode, manager) - @info "To add Julia worker processes, run ($n_workers times in parallel, I'll wait for them): $start_cmd" - return n_workers -end diff --git a/src/custom_cluster_managers.jl b/src/custom_cluster_managers.jl new file mode 100644 index 0000000..f93e5d8 --- /dev/null +++ b/src/custom_cluster_managers.jl @@ -0,0 +1,180 @@ +# This code is a modified version of ClusterManagers.ElasticManager, both +# original code and modifications are licensed under the MIT License (MIT): +# https://github.com/JuliaParallel/ClusterManagers.jl/blob/master/LICENSE.md + +# Modifications are planned to be upstreamed, once tested in the field. + +module CustomClusterManagers + +# ================================================================== +using Distributed +using Sockets +using Pkg + +import Distributed: launch, manage, kill, init_worker, connect +# ================================================================== + + +# The master process listens on a well-known port +# Launched workers connect to the master and redirect their STDOUTs to the same +# Workers can join and leave the cluster on demand. + +export ElasticManager, elastic_worker + +const HDR_COOKIE_LEN = Distributed.HDR_COOKIE_LEN + +struct ElasticManager <: ClusterManager + active::Dict{Int, WorkerConfig} # active workers + pending::Channel{TCPSocket} # to be added workers + terminated::Set{Int} # terminated worker ids + topology::Symbol + sockname + manage_callback + printing_kwargs + + function ElasticManager(; + addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, + topology=:all_to_all, manage_callback=elastic_no_op_callback, printing_kwargs=() + ) + Distributed.init_multi() + cookie !== nothing && cluster_cookie(cookie) + + # Automatically check for the IP address of the local machine + if addr == :auto + try + addr = Sockets.getipaddr(IPv4) + catch + error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.") + end + end + + l_sock = listen(addr, port) + + lman = new(Dict{Int, WorkerConfig}(), Channel{TCPSocket}(typemax(Int)), Set{Int}(), topology, getsockname(l_sock), manage_callback, printing_kwargs) + + @async begin + while true + let s = accept(l_sock) + @async process_worker_conn(lman, s) + end + end + end + + @async process_pending_connections(lman) + + lman + end +end + +ElasticManager(port) = ElasticManager(;port=port) +ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port) +ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie) + +elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing + +function process_worker_conn(mgr::ElasticManager, s::TCPSocket) + @debug "ElasticManager got new worker connection" + # Socket is the worker's STDOUT + wc = WorkerConfig() + wc.io = s + + # Validate cookie + cookie = read(s, HDR_COOKIE_LEN) + if length(cookie) < HDR_COOKIE_LEN + error("Cookie read failed. Connection closed by peer.") + end + self_cookie = cluster_cookie() + for i in 1:HDR_COOKIE_LEN + if UInt8(self_cookie[i]) != cookie[i] + println(i, " ", self_cookie[i], " ", cookie[i]) + error("Invalid cookie sent by remote worker.") + end + end + + put!(mgr.pending, s) +end + +function process_pending_connections(mgr::ElasticManager) + while true + wait(mgr.pending) + try + addprocs(mgr; topology=mgr.topology) + catch e + showerror(stderr, e) + Base.show_backtrace(stderr, Base.catch_backtrace()) + end + end +end + +function launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) + # The workers have already been started. + while isready(mgr.pending) + @debug "ElasticManager.launch new worker" + wc=WorkerConfig() + wc.io = take!(mgr.pending) + push!(launched, wc) + end + + notify(c) +end + +function manage(mgr::ElasticManager, id::Integer, config::WorkerConfig, op::Symbol) + if op == :register + @debug "ElasticManager registering process id $id" + mgr.active[id] = config + mgr.manage_callback(mgr, id, op) + elseif op == :deregister + @debug "ElasticManager deregistering process id $id" + mgr.manage_callback(mgr, id, op) + delete!(mgr.active, id) + push!(mgr.terminated, id) + end +end + +function Base.show(io::IO, mgr::ElasticManager) + iob = IOBuffer() + + println(iob, "ElasticManager:") + print(iob, " Active workers : [ ") + for id in sort(collect(keys(mgr.active))) + print(iob, id, ",") + end + seek(iob, position(iob)-1) + println(iob, "]") + + println(iob, " Number of workers to be added : ", Base.n_avail(mgr.pending)) + + print(iob, " Terminated workers : [ ") + for id in sort(collect(mgr.terminated)) + print(iob, id, ",") + end + seek(iob, position(iob)-1) + println(iob, "]") + + println(iob, " Worker connect command : ") + print(iob, " ", get_connect_cmd(mgr; mgr.printing_kwargs...)) + + print(io, String(take!(iob))) +end + +# Does not return. If executing from a REPL try +# @async elastic_worker(.....) +# addr, port that a ElasticManager on the master processes is listening on. +function elastic_worker( + cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009; + stdout_to_master::Bool = true, + Base.@nospecialize(env::AbstractVector = [],) +) + @debug "ElasticManager.elastic_worker(cookie, $addr, $port; stdout_to_master=$stdout_to_master, env=$env)" + for (k, v) in env + ENV[k] = v + end + + c = connect(addr, port) + write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN]) + stdout_to_master && redirect_stdout(c) + start_worker(c, cookie) +end + + +end # module CustomClusterManagers diff --git a/src/display.jl b/src/display.jl new file mode 100644 index 0000000..256e84d --- /dev/null +++ b/src/display.jl @@ -0,0 +1,80 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +const _g_unicode_vbars = ['\u2800', '\u2581', '▂', '▃', '▄', '▅', '▆', '▇', '█'] + +const _g_unicode_state = ( + green = '🟢', + yellow = '🟡', + orange = '🟠', + red = '🔴' +) + +const _g_unicode_occupancy = ( + sleeping = '💤', + working = '🔧', + onfire = '🔥', + overloaded = '🤯', + waiting = '⏳', + blocked = '🚫', + finished = '🏁', + failed = '❌', + succeeded = '✅', + unknown = '❓' +) + + +""" + ParallelProcessingTools.in_vscode_notebook():Bool + +Test if running within a Visual Studio Code notebook. +""" +in_vscode_notebook() = haskey(ENV, "VSCODE_CWD") + + +""" + ParallelProcessingTools.printover(f_show::Function, io::IOBuffer) + +Runs `f_show(tmpio)` with an IO buffer, then clears the required number of +lines on `io` (typically `stdout`) and prints the output over them. +""" +function printover(f_show, io) + vscode_nb_mode = in_vscode_notebook() + + tmpio = IOBuffer() + f_show(tmpio) + seekstart(tmpio) + output_lines = readlines(tmpio) + if vscode_nb_mode + output_lines = [join(strip.(output_lines), " | ")] + end + + n_lines = length(output_lines) + _move_cursor_up_while_clearing_lines(io, n_lines) + for l in output_lines + _printover_screen(io, l) + println(io) + end +end + +# Taken from ProgressMeter.jl: +function _move_cursor_up_while_clearing_lines(io, numlinesup) + if numlinesup > 0 && (isdefined(Main, :IJulia) && Main.IJulia.inited) + Main.IJulia.clear_output(true) + else + for _ in 1:numlinesup + print(io, "\r\u1b[K\u1b[A") + end + end +end + +# Taken from ProgressMeter.jl: +function _printover_screen(io::IO, s::AbstractString, color::Symbol = :color_normal) + print(io, "\r") + printstyled(io, s; color=color) + if isdefined(Main, :IJulia) + Main.IJulia.stdio_bytes[] = 0 # issue #76: circumvent IJulia I/O throttling + elseif isdefined(Main, :ESS) || isdefined(Main, :Atom) + else + print(io, "\u1b[K") # clear the rest of the line + end +end diff --git a/src/exceptions.jl b/src/exceptions.jl index b58ca59..d424480 100644 --- a/src/exceptions.jl +++ b/src/exceptions.jl @@ -1,17 +1,34 @@ # This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). +""" + ParallelProcessingTools.inner_exception(err) + +Replaces exceptions like a `TaskFailedException` or a `RemoteException` with +their underlying cause. Leaves other exceptions unchanged. +""" +function inner_exception end +export inner_exception + +inner_exception(err) = err +inner_exception(err::CompositeException) = CompositeException(inner_exception.(err.exceptions)) +inner_exception(err::TaskFailedException) = err.task.result +inner_exception(err::RemoteException) = err.captured.ex + + """ ParallelProcessingTools.original_exception(err) -Replaces `TaskFailedException`s and `RemoteException`s with the underlying -exception that originated within the task or on the remote process. +Replaces (possibly nested) exceptions like a `TaskFailedException` or +`RemoteException`s with the innermost exception, likely to be the one that +was thrown originally. Leaves other exceptions unchanged. """ function original_exception end +export original_exception original_exception(err) = err original_exception(err::CompositeException) = CompositeException(original_exception.(err.exceptions)) -original_exception(err::TaskFailedException) = err.task.result -original_exception(err::RemoteException) = err.captured.ex +original_exception(err::TaskFailedException) = original_exception(err.task.result) +original_exception(err::RemoteException) = original_exception(err.captured.ex) """ @@ -19,12 +36,13 @@ original_exception(err::RemoteException) = err.captured.ex Replaces `CompositeException`s with their first exception. -Also employs `original_exception` if `simplify` is `true`. +Also employs `inner_exception` if `simplify` is `true`. """ function onlyfirst_exception end +export onlyfirst_exception onlyfirst_exception(err) = err -onlyfirst_exception(err::CompositeException) = first(err.exceptions) +onlyfirst_exception(err::CompositeException) = first(err) """ @@ -36,15 +54,38 @@ If multiple exceptions originate from parallel code in `expr`, only one is rethrown, and `TaskFailedException`s and `RemoteException`s are replaced by the original exceptions that caused them. -See [`original_exception`] and [`onlyfirst_exception`](@ref). +See [`inner_exception`] and [`onlyfirst_exception`](@ref). """ macro userfriendly_exceptions(expr) quote try $(esc(expr)) catch err - rethrow(original_exception(onlyfirst_exception(err))) + rethrow(inner_exception(onlyfirst_exception(err))) end end end export @userfriendly_exceptions + + +""" + @return_exceptions expr + +Runs `expr` and catches and returns exceptions as values instead of having +them thrown. + +Useful for user-side debugging, especially of parallel and/or remote code +execution. + +See also [`@userfriendly_exceptions`](@ref). +""" +macro return_exceptions(expr) + quote + try + $(esc(expr)) + catch err + err + end + end +end +export @return_exceptions diff --git a/src/fileio.jl b/src/fileio.jl index 71e8d96..eaabadc 100644 --- a/src/fileio.jl +++ b/src/fileio.jl @@ -45,33 +45,78 @@ tmp_filename(fname::AbstractString) = tmp_filename(fname, dirname(fname)) _rand_fname_tag() = String(rand(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", 8)) +const _g_default_cachedir = Ref{String}("") +const _g_default_cachedir_lock = ReentrantLock() + +""" + ParallelProcessingTools.default_cache_dir()::String + +Returns the default cache directory, e.g. for [`create_files`](@ref) and +`read_files`(@ref). + +See also [`default_cache_dir!`](@ref). +""" +function default_cache_dir() + lock(_g_default_cachedir_lock) do + if isempty(_g_default_cachedir[]) + cache_dir = _generate_cache_path() + @info "Setting default cache directory to \"$cache_dir\"" + default_cache_dir!(cache_dir) + end + return _g_default_cachedir[] + end +end + +function _generate_cache_path() + username_var = Sys.iswindows() ? "USERNAME" : "USER" + tag = get(ENV, username_var, _rand_fname_tag()) + return joinpath(tempdir(), "pptjl-cache-$tag") +end + + +""" + ParallelProcessingTools.default_cache_dir!(dir::AbstractString) + +Sets the default cache directory to `dir` and returns it. + +See also [`default_cache_dir!`](@ref). +""" +function default_cache_dir!(dir::AbstractString) + lock(_g_default_cachedir_lock) do + _g_default_cachedir[] = dir + return _g_default_cachedir[] + end +end + + + """ function create_files( - f_write, filenames::AbstractString...; + f_create, filenames::AbstractString...; overwrite::Bool = true, - use_cache::Bool = false, cache_dir::AbstractString = tempdir(), + use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), create_dirs::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = true + verbose::Bool = false ) Creates `filenames` in an atomic fashion via a user-provided function -`f_write`. Returns `nothing`. +`f_create`. Returns `nothing`. -Using temporary filenames, calls `f_write(temporary_filenames...)`. If -`f_write` doesn't throw an exception, the files `temporary_filenames` are -renamed to `filenames`. If `f_write` throws an exception, the temporary files +Using temporary filenames, calls `f_create(temporary_filenames...)`. If +`f_create` doesn't throw an exception, the files `temporary_filenames` are +renamed to `filenames`. If `f_create` throws an exception, the temporary files are either deleted (if `delete_tmp_onerror` is `true`) or left in place (e.g. for debugging purposes). -If `create_dirs` is `true`, the `temporary_filenames` are created in -`cache_dir` and then atomically moved to `filenames`, otherwise, they are +If `use_cache` is `true`, the `temporary_filenames` are created in +`cache_dir` and then atomically moved to `filenames`, otherwise they are created next to `filenames` (in the same directories). If `create_dirs` is `true`, directories are created if necessary. If all of `filenames` already exist and `overwrite` is `false`, takes no action (or, on case the files are created by other code running in parallel, -while `f_write` is running, does not replace them). +while `f_create` is running, does not replace them). If `verbose` is `true`, uses log-level `Logging.Info` to log file creation, otherwise `Logging.Debug`. @@ -94,13 +139,74 @@ intermediate steps. On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use the default Linux RAM disk as an intermediate directory. + +See also [`read_files`](@ref), [`modify_files`](@ref) and +[`ParallelProcessingTools.default_cache_dir`](@ref). """ function create_files( - @nospecialize(f_write), @nospecialize(filenames::AbstractString...); + @nospecialize(f_create), @nospecialize(filenames::AbstractString...); overwrite::Bool = true, - use_cache::Bool = false, cache_dir::AbstractString = tempdir(), + use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), create_dirs::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = true + verbose::Bool = false +) + _create_modify_file_impl(false, f_create, filenames, overwrite, use_cache, String(cache_dir), create_dirs, delete_tmp_onerror, verbose) +end +export create_files + + +""" + function modify_files( + f_modify, filenames::AbstractString...; + use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), + create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = false + ) + +Modifies `filenames` in an atomic fashion via a user-provided function +`f_modify`. Returns `nothing`. + +Using temporary filenames, first copies the files `filenames` to temporary +filenames. Then calls `f_modify(temporary_filenames...)`. If `f_modify` +doesn't throw an exception, the files `temporary_filenames` are then renamed +to `filenames`, replacing them. + +If `use_cache` is `true`, the `temporary_filenames` are created in +`cache_dir`, otherwise they are created next to `filenames` (in the same +directories). + +Otherwise behaves like [`create_files`](@ref) and [`read_files`](@ref) in +regard to logging and cache and error handling. + +Returns `nothing`. + +Example: + +```julia +write("foo.txt", "Nothing"); write("bar.txt", "here") + +modify_files("foo.txt", "bar.txt", use_cache = true) do foo, bar + write(foo, "Hello") + write(bar, "World") +end +``` + +See also [`ParallelProcessingTools.default_cache_dir`](@ref). +""" +function modify_files( + @nospecialize(f_modify), @nospecialize(filenames::AbstractString...); + use_cache::Bool = false, cache_dir::AbstractString = default_cache_dir(), + create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = false +) + _create_modify_file_impl(true, f_modify, filenames, true, use_cache, String(cache_dir), create_cachedir, delete_tmp_onerror, verbose) +end +export modify_files + + +function _create_modify_file_impl( + modify_mode::Bool, f_create_or_modify, filenames, + overwrite::Bool, use_cache::Bool, cache_dir::String, create_dirs::Bool, delete_tmp_onerror::Bool, verbose::Bool ) loglevel = verbose ? Info : Debug @@ -147,9 +253,18 @@ function create_files( @assert !any(isfile, staging_fnames) writeto_fnames = use_cache ? cache_fnames : staging_fnames - @debug "Creating intermediate files $writeto_fnames." - f_write(writeto_fnames...) + if modify_mode + @debug "Copying files $target_fnames to intermediate files $writeto_fnames." + read_files(target_fnames...; use_cache=false) do readfrom_fnames... + _parallel_cp(readfrom_fnames, writeto_fnames) + end + @debug "Modifying intermediate files $writeto_fnames." + else + @debug "Creating intermediate files $writeto_fnames." + end + f_create_or_modify(writeto_fnames...) + post_f_write_existing = isfile.(target_fnames) if any(post_f_write_existing) if all(post_f_write_existing) @@ -164,14 +279,7 @@ function create_files( try if use_cache - @userfriendly_exceptions @sync for (cache_fn, staging_fn) in zip(cache_fnames, staging_fnames) - Threads.@spawn begin - @assert cache_fn != staging_fn - @debug "Moving file \"$cache_fn\" to \"$staging_fn\"." - isfile(cache_fn) || error("Expected file \"$cache_fn\" to exist, but it doesn't.") - mv(cache_fn, staging_fn) - end - end + _parallel_mv(cache_fnames, staging_fnames) empty!(cache_fnames) end @@ -222,15 +330,14 @@ function create_files( return nothing end -export create_files """ function read_files( f_read, filenames::AbstractString...; - use_cache::Bool = true, cache_dir::AbstractString = tempdir(), + use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(), create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = true + verbose::Bool = false ) Reads `filenames` in an atomic fashion (i.e. only if all `filenames` exist) @@ -244,7 +351,7 @@ afterwards. If `create_cachedir` is `true`, then `cache_dir` will be created if it doesn't exist yet. If `delete_tmp_onerror` is true, then temporary files are -deleted even if `f_write` throws an exception. +deleted even if `f_create` throws an exception. If `verbose` is `true`, uses log-level `Logging.Info` to log file reading, otherwise `Logging.Debug`. @@ -262,12 +369,15 @@ intermediate steps. On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use the default Linux RAM disk as an intermediate directory. + +See also [`create_files`](@ref), [`modify_files`](@ref) and +[`ParallelProcessingTools.default_cache_dir`](@ref). """ function read_files( @nospecialize(f_read), @nospecialize(filenames::AbstractString...); - use_cache::Bool = true, cache_dir::AbstractString = tempdir(), + use_cache::Bool = true, cache_dir::AbstractString = default_cache_dir(), create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, - verbose::Bool = true + verbose::Bool = false ) loglevel = verbose ? Info : Debug @@ -290,14 +400,7 @@ function read_files( append!(cache_fnames, tmp_filename.(source_fnames, Ref(cache_dir))) @assert !any(isfile, cache_fnames) - @userfriendly_exceptions @sync for (cache_fn, source_fn) in zip(cache_fnames, source_fnames) - Threads.@spawn begin - @assert cache_fn != source_fn - @debug "Copying file \"$source_fn\" to \"$cache_fn\"." - cp(source_fn, cache_fn) - isfile(cache_fn) || error("Tried to copy file \"$source_fn\" to \"$cache_fn\", but \"$cache_fn\" doesn't exist.") - end - end + _parallel_cp(source_fnames, cache_fnames) end readfrom_fnames = use_cache ? cache_fnames : source_fnames @@ -322,3 +425,27 @@ function read_files( end end export read_files + + +function _parallel_mv(source_fnames, target_fnames) + @userfriendly_exceptions @sync for (source_fn, target_fn) in zip(source_fnames, target_fnames) + Threads.@spawn begin + @assert source_fn != target_fn + @debug "Moving file \"$source_fn\" to \"$target_fn\"." + isfile(source_fn) || error("Expected file \"$source_fn\" to exist, but it doesn't.") + mv(source_fn, target_fn) + end + end +end + + +function _parallel_cp(source_fnames, target_fnames) + @userfriendly_exceptions @sync for (target_fn, source_fn) in zip(target_fnames, source_fnames) + Threads.@spawn begin + @assert target_fn != source_fn + @debug "Copying file \"$source_fn\" to \"$target_fn\"." + cp(source_fn, target_fn) + isfile(target_fn) || error("Tried to copy file \"$source_fn\" to \"$target_fn\", but \"$target_fn\" did't exist afterwards.") + end + end +end diff --git a/src/onworkers.jl b/src/onworkers.jl new file mode 100644 index 0000000..6484c49 --- /dev/null +++ b/src/onworkers.jl @@ -0,0 +1,236 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + + +""" + TimelimitExceeded <: Exception + +Exception thrown something timed out. +""" +struct TimelimitExceeded <: Exception + max_time::Float64 + elapsed_time::Float64 +end + + +""" + MaxTriesExceeded <: Exception + +Exception thrown when a number of (re-)tries was exceeded. +""" +struct MaxTriesExceeded <: Exception + max_tries::Int + n_tries::Int + retry_reason::Exception +end + + +_should_retry(::Any) = false +_should_retry(::Exception) = false +_should_retry(::TimelimitExceeded) = true +_should_retry(err::RemoteException) = _should_retry(err.captured.ex) + + +""" + onworker( + f::Function, args...; + pool::AbstractWorkerPool = ppt_worker_pool(), + maxtime::Real = 0, tries::Integer = 1, label::AbstractString = "" + ) + +Runs `f(args...)` on an available worker process from the given `pool` and +returns the result. + +If `maxtime > 0`, a maximum time for the activity is set. If the activity takes longer +than `maxtime` seconds, the process running it (if not the main process) will be +terminated. + +`label` is used for debug-logging. + +If a problem occurs (maxtime or worker failure) while running the activity, +reschedules the task if the maximum number of tries has not yet been reached, +otherwise throws an exception. +""" +function onworker end +export onworker + +function onworker( + f::Function; + @nospecialize(pool::AbstractWorkerPool = ppt_worker_pool()), + @nospecialize(maxtime::Real = 0), @nospecialize(tries::Integer = 1), @nospecialize(label::AbstractString = "") +) + R = _return_type(f, ()) + untyped_result = _on_worker_impl(f, (), pool, Float64(maxtime), Int(tries), String(label)) + return convert(R, untyped_result)::R +end + +function onworker( + f::Function, arg1, args...; + @nospecialize(pool::AbstractWorkerPool = ppt_worker_pool()), + @nospecialize(maxtime::Real = 0), @nospecialize(tries::Integer = 1), @nospecialize(label::AbstractString = "") +) + all_args = (arg1, args...) + R = _return_type(f, all_args) + untyped_result = _on_worker_impl(f, all_args, pool, Float64(maxtime), Int(tries), String(label)) + + @assert !(untyped_result isa Exception) + return convert(R, untyped_result)::R +end + +_return_type(f, args::Tuple) = Core.Compiler.return_type(f, typeof(args)) + + +@noinline function _on_worker_impl( + @nospecialize(f::Function), @nospecialize(args::Tuple), + @nospecialize(pool::AbstractWorkerPool), maxtime::Float64, tries::Int, label::String +) + n_tries::Int = 0 + while n_tries < tries + n_tries += 1 + activity = _Activity(f, label, tries) + + @debug "Preparing to run $activity, taking a worker from $(getlabel(pool))" + worker = take!(pool) + + start_time = time() + elapsed_time = zero(start_time) + + try + @debug "Running $activity on worker $worker" + + future_result = remotecall(f, worker, args...) + + result_isready = try + if maxtime > 0 + # May throw an exception: + wait_for_any(future_result, maxtime = maxtime) + else + # May throw an exception: + wait(future_result) + end + elapsed_time = time() - start_time + + isready(future_result) + catch err + # Testing if future is ready may throw exceptions from f already: + if _should_retry(err) + if !(n_tries < tries) + inner_err = inner_exception(err) + throw(MaxTriesExceeded(tries, n_tries, inner_err)) + else + @debug "Will retry $activity ($n_tries tries so far) due to" err + end + else + throw(err) + end + true + end + + if result_isready + # With a `remotecall` to the current process, fetch will return exceptions + # originating in the called function, while if run on a remote process they + # will be thrown to the caller of fetch. We need to unify this behavior: + + fetched_result = @return_exceptions fetch(future_result) + + if fetched_result isa Exception + err = fetched_result + if _should_retry(err) + if !(n_tries < tries) + inner_err = inner_exception(err) + throw(MaxTriesExceeded(tries, n_tries, inner_err)) + else + @debug "Will retry $activity ($n_tries tries so far) due to" err + end + else + throw(err) + end + else + @debug "Worker $worker ran $activity successfully in $elapsed_time s" + return fetched_result + end + else + # Sanity check: if we got here, we must have timed out: + @assert maxtime > 0 && elapsed_time > maxtime + + @warn "Running $activity on worker $worker timed out after $elapsed_time s (max runtime $(maxtime) s)" + + if worker == myid() + @warn "Will not terminate main process $worker, making it available again, but it may still running timed-out $activity" + else + @warn "Terminating worker $worker due to activity maxtime" + rmprocs(worker) + end + + if !(n_tries < tries) + err = TimelimitExceeded(maxtime, elapsed_time) + @debug "Giving up on $activity after $n_tries tries due to" err + throw(MaxTriesExceeded(tries, n_tries, err)) + end + end + catch err + if err isa ProcessExitedException + @warn "Worker $worker seems to have terminated during $activity" + # This try doesn't count: + n_tries -= 1 + # Make certain that worker is really gone: + rmprocs(worker) + elseif err isa RemoteException + orig_err = inner_exception(err) + if orig_err isa MethodError + func = orig_err.f + func_args = orig_err.args + func_name = string(typeof(func)) + func_module = nameof(parentmodule(parentmodule(typeof(func)))) + func_hasmethod_local = hasmethod(func, map(typeof, func_args)) + if func_module == :Serialization && func_hasmethod_local + @warn "Function $func_name may be corrupted on worker $worker (missing method), terminating worker." + rmprocs(worker) + # This try doesn't count: + n_tries -= 1 + else + rethrow() + end + else + @debug "Encountered exception while trying to run $activity on worker $worker:" orig_err + rethrow() + end + elseif err isa MaxTriesExceeded + retry_reason = err.retry_reason + @debug "Giving up on $activity after $err.n_tries tries due to" retry_reason + rethrow() + else + @debug "Encountered unexpected exception while trying to run $activity on worker $worker:" err + rethrow() + end + finally + put!(pool, worker) + end + end + # Should never reach this point: + @assert false +end + + +# ToDo: Turn Actitity into a runnable thing, with map and bcast specialiizations: +struct _Activity + f::Function + label::String + max_tries::Int + # n_tries::Int # ToDo - should n_tries be part of activity objects? + # Add max_time::Float64 +end + +function Base.show(io::IO, activity::_Activity) + print(io, "activity ") + if isempty(activity.label) + print(io, nameof(typeof(activity.f))) + else + print(io, "\"$(activity.label)\"") + end + #if activity.n_tries > 1 && activity.max_tries > 1 + # print(io, " (try $(activity.n_tries) of $(activity.max_tries))") + #end +end + + +# ToDo: Add function `async_onworker(f, ...)` ? diff --git a/src/procinit.jl b/src/procinit.jl new file mode 100644 index 0000000..ff079dd --- /dev/null +++ b/src/procinit.jl @@ -0,0 +1,410 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + + +""" + isvalid_pid(pid::Int)::Bool + +Tests if `pid` is a valid Julia process ID. + +Equivalent to `pid in Distributed.procs()`, but faster. +""" +function isvalid_pid end +export isvalid_pid + +# Distributed.id_in_procs is not public API, so we need a fallback when using it: +@static if isdefined(Distributed, :id_in_procs) + isvalid_pid(pid::Int) = Distributed.id_in_procs(pid) +else + isvalid_pid(pid::Int) = pid in Distributed.procs() +end + + +""" + ParallelProcessingTools.allprocs_management_lock()::ReentrantLock + +Returns the global process operations lock. This lock is used to protect +operations that concern the management of all processes. +""" +@inline allprocs_management_lock() = _g_allprocsmgmt_lock + +const _g_allprocsmgmt_lock = ReentrantLock() + +const _g_worker_hosts = IdDict{Int,String}() + +function _register_process(pid::Int) + registered, pidlock = lock(allprocs_management_lock()) do + return haskey(_g_worker_hosts, pid), proc_management_lock(pid) + end + if !registered + lock(pidlock) do + full_hostname = if pid == myid() + Base.gethostname() + else + remotecall_fetch(Base.gethostname, pid) + end + hostname = String(first(split(full_hostname, "."))) + + lock(allprocs_management_lock()) do + _g_worker_hosts[pid] = hostname + end + end + end + return nothing +end + +function _worker_hosts() + lock(allprocs_management_lock()) do + return deepcopy(_g_worker_hosts) + end +end + + +""" + ParallelProcessingTools.proc_management_lock(pid::Integer)::ReentrantLock + +Returns a process-specific lock. This lock is used to protect operations that +concern the management process `pid`. +""" +function proc_management_lock(pid::Integer) + try + lock(allprocs_management_lock()) + # Ensure _g_procmgmt_procinfo has an entry for pid: + get!(_g_procmgmt_initlvl, pid, 0) + return get!(_g_procmgmt_locks, pid, ReentrantLock()) + finally + unlock(allprocs_management_lock()) + end +end + +const _g_procmgmt_locks = IdDict{Integer, ReentrantLock}() +const _g_procmgmt_initlvl = IdDict{Integer,Integer}() + + +""" + ParallelProcessingTools.current_procinit_level() + +Return the init level of the current process. + +See also [`global_procinit_level`](@ref). +""" +function current_procinit_level() + try + lock(allprocs_management_lock()) + return _current_procinit_level[]::Int + finally + unlock(allprocs_management_lock()) + end +end + +const _current_procinit_level = Ref(0) + + +""" + ParallelProcessingTools.global_procinit_level() + +Return the global process init level. + +Returns, e.g., the number of times [`add_procinit_code`](@ref) resp. +[`@always_everywhere`](@ref) have been called. + +See also [`current_procinit_level`](@ref). +""" +function global_procinit_level() + try + lock(allprocs_management_lock()) + return _global_procinit_level[]::Int + finally + unlock(allprocs_management_lock()) + end +end + +const _global_procinit_level = Ref(1) + + +""" + ParallelProcessingTools.get_procinit_code() + +Returns the code that should be run on each process to ensure that desired +packages are loaded and global variable are set up as expected. + +See also [`ParallelProcessingTools.add_procinit_code`](@ref) and +[`ParallelProcessingTools.ensure_procinit`](@ref). +""" +function get_procinit_code() + try + lock(allprocs_management_lock()) + return _g_procinit_code + finally + unlock(allprocs_management_lock()) + end +end + + +const _g_initial_procinit_code = quote + if !isdefined(Main, :ParallelProcessingTools) + import ParallelProcessingTools + end + ParallelProcessingTools._initial_procinit_done() +end + +function _initial_procinit_done() + try + lock(ParallelProcessingTools.allprocs_management_lock()) + if ParallelProcessingTools._current_procinit_level[] < 1 + ParallelProcessingTools._current_procinit_level[] = 1 + end + finally + unlock(ParallelProcessingTools.allprocs_management_lock()) + end +end + +const _g_procinit_code = Expr(:block) + +const _g_wrapped_procinit_code = Expr(:block) + + +function _initial_init_current_process() + # Need to explicitly run _g_initial_procinit_code explicitly on current process once: + if _current_procinit_level[] < 1 + @debug "Running initial process initialization code on current process $(myid())" + Core.eval(Main, _g_initial_procinit_code) + end +end + + +""" + ParallelProcessingTools.add_procinit_code(expr; run_everywhere::Bool = false) + +Add `expr` to process init code. `expr` is run on the current proccess +immediately, but not automatically on remote processes unless `run_everywhere` +is `true`. + +User code should typically not need to call this function, but should use +[`@always_everywhere`](@ref) instead. + +See also [`ParallelProcessingTools.get_procinit_code`](@ref) and +[`ParallelProcessingTools.ensure_procinit`](@ref). +""" +@noinline function add_procinit_code(init_code; run_everywhere::Bool = false) + try + lock(allprocs_management_lock()) + + next_init_level = _global_procinit_level[] + 1 + + _initial_init_current_process() + Core.eval(Main, init_code) + + _store_additional_procinit_code(init_code, next_init_level) + + _global_procinit_level[] = next_init_level + _current_procinit_level[] = next_init_level + + return nothing + finally + unlock(allprocs_management_lock()) + end + + if run_everywhere + ensure_procinit_or_kill(pids) + end +end + + +function _store_additional_procinit_code(init_code::Expr, init_level::Int) + push!(_g_procinit_code.args, _initstep_wrapperexpr(init_code, init_level)) + + wrapped_init_code = _initcode_wrapperexpr(_g_procinit_code, init_level) + _g_wrapped_procinit_code.head = wrapped_init_code.head + _g_wrapped_procinit_code.args = wrapped_init_code.args +end + + +function _initstep_wrapperexpr(init_step_code::Expr, next_init_level::Int) + quote + if ParallelProcessingTools._current_procinit_level[] < $next_init_level + $init_step_code + ParallelProcessingTools._current_procinit_level[] = $next_init_level + end + end +end + + +function _initcode_wrapperexpr(init_code::Expr, target_init_level::Int) + quoted_init_code = Expr(:quote, init_code) + + quote + $_g_initial_procinit_code + + ParallelProcessingTools._execute_procinit_code( + $quoted_init_code, + $target_init_level + ) + end +end + + +function _execute_procinit_code(init_code::Expr, target_level::Int) + current_pid = myid() + try + lock(allprocs_management_lock()) + + if _global_procinit_level[] < target_level + _global_procinit_level[] = target_level + end + + current_level = current_procinit_level() + + if current_level < target_level + #@debug "Raising process $current_pid init level from $current_level to $target_level" + Core.eval(Main, init_code) + if current_procinit_level() != target_level + error("Failed to raise process $current_pid init level to $target_level, worker on level $current_level") + end + elseif current_level == target_level + #@debug "Process $current_pid init level already at $current_level of $target_level" + else + #@debug "Process $current_pid init level $current_level already higher than requested init level $target_level" + end + + return nothing + catch err + @error "Error while running init code on process $current_pid:" err + rethrow() + finally + unlock(allprocs_management_lock()) + end +end + + + +""" + ensure_procinit(pid::Int) + ensure_procinit(pids::AbstractVector{Int} = workers()) + +Run process initialization code on the given process(es) if necessary, +returns after initialization is complete. + +When using a [`FlexWorkerPool`](@ref), worker initialization can safely be run +in the background though, as the pool will only offer workers +(via `take!(pool)`) after it has fully initialized them. + +See also [`ParallelProcessingTools.get_procinit_code`](@ref) +and [`ParallelProcessingTools.add_procinit_code`](@ref). + +See also [`ParallelProcessingTools.get_procinit_code`](@ref), +[`ParallelProcessingTools.ensure_procinit`](@ref), +[`ParallelProcessingTools.global_procinit_level`](@ref) and +[`ParallelProcessingTools.current_procinit_level`](@ref). +""" +function ensure_procinit end +export ensure_procinit + +@noinline function ensure_procinit(pid::Int) + init_level, pid_lock = lock(allprocs_management_lock()) do + _initial_init_current_process() + global_procinit_level(), proc_management_lock(pid) + end + + if pid != myid() + lock(pid_lock) do + pid_initlvl = _g_procmgmt_initlvl[pid] + if pid_initlvl < init_level + wrapped_init_code = _g_wrapped_procinit_code + _init_single_process(pid, pid_lock, init_level, wrapped_init_code) + end + end + else + # Nothing to do: Current process should always be initialized already + end + return nothing +end + +@noinline function _init_single_process(pid::Int, pid_lock::ReentrantLock, init_level::Int, wrapped_init_code::Expr) + try + @debug "Initializing process $pid to init level $init_level." + lock(pid_lock) + + # ToDo: Maybe use fetch with timeout? + remotecall_fetch(Core.eval, pid, Main, wrapped_init_code) + + _g_procmgmt_initlvl[pid] = init_level + #@debug "Initialization of process $pid to init level $init_level complete." + catch err + orig_err = inner_exception(err) + @error "Error while running init code on process $pid:" orig_err + throw(err) + finally + unlock(pid_lock) + end + return nothing +end + +@noinline function ensure_procinit(pids::AbstractVector{Int} = workers()) + @sync for pid in pids + Threads.@spawn ensure_procinit(pid) + end +end + + +""" + ParallelProcessingTools.ensure_procinit_or_kill(pid::Int) + ParallelProcessingTools.ensure_procinit_or_kill(pids::AbstractVector{Int} = workers()) + +Run process initialization code on the given process(es) if necessary, kill +and remove process(es) for which initialization fails. + +See also [`ParallelProcessingTools.ensure_procinit`](@ref). +""" +function ensure_procinit_or_kill end + +function ensure_procinit_or_kill(pid::Int) + try + ensure_procinit(pid) + catch err + orig_err = inner_exception(err) + @warn "Error while initializig process $pid, removing it." orig_err + rmprocs(pid) + end + return nothing +end + +@noinline function ensure_procinit_or_kill(pids::AbstractVector{Int} = workers()) + @sync for pid in pids + Threads.@spawn ensure_procinit_or_kill(pid) + end +end + + +""" + @always_everywhere(expr) + +Runs `expr` on all current Julia processes, but also all future Julia +processes after an [`ensure_procinit()`](@ref)) when managed using a +[`FlexWorkerPool`](@ref). + +Similar to `Distributed.everywhere`, but also stores `expr` so that +`ensure_procinit` can execute it on future worker processes. + +Example: + +```julia +@always_everywhere begin + using SomePackage + using SomeOtherPackage + + some_global_variable = 42 +end +``` + +See also [`ParallelProcessingTools.add_procinit_code`](@ref) and +[`ParallelProcessingTools.ensure_procinit`](@ref). +""" +macro always_everywhere(ex) + # Code partially taken from Distributed.@everywhere + quote + let ex = Expr(:toplevel, :(task_local_storage()[:SOURCE_PATH] = $(get(task_local_storage(), :SOURCE_PATH, nothing))), $(esc(Expr(:quote, ex)))) + add_procinit_code(ex, run_everywhere = true) + end + end +end +export @always_everywhere diff --git a/src/runworkers.jl b/src/runworkers.jl new file mode 100644 index 0000000..75ee0bd --- /dev/null +++ b/src/runworkers.jl @@ -0,0 +1,377 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + + + +""" + pinthreads_auto() + +!!! note + Only has an effect if + [`ThreadPinning`](https://github.com/carstenbauer/ThreadPinning.jl/) is + loaded, and only on operating systems supported by `ThreadPinning`. +""" +function pinthreads_auto end +export pinthreads_auto + +pinthreads_auto() = _pinthreads_auto_impl(Val(true)) +_pinthreads_auto_impl(::Val) = nothing + + +_getcpuids() = _getcpuids_impl(Val(true)) +_getcpuids_impl(::Val) = nothing + + +""" + worker_resources + +Get the distributed Julia worker process resources currently available. + +This may take some time as some code needs to be loaded on all processes. +Automatically runs `ensure_procinit()` before querying worker resources. +""" +function worker_resources() + ensure_procinit() + pids = Distributed.workers() + load_ft = Distributed.remotecall.(Core.eval, pids, Ref(Main), Ref(:(import ParallelProcessingTools))) + fetch.(load_ft) + resources_ft = Distributed.remotecall.(ParallelProcessingTools._current_process_resources, pids) + resources = fetch.(resources_ft) + sorted_resources = sort(resources, by = x -> x.workerid) + sorted_resources +end +export worker_resources + +function _current_process_resources() + return ( + workerid = Distributed.myid(), + hostname = Base.gethostname(), + nthreads = nthreads(), + blas_nthreads = LinearAlgebra.BLAS.get_num_threads(), + cpuids = _getcpuids() + ) +end + + + +""" + abstract type ParallelProcessingTools.RunProcsMode + +Abstract supertype for worker process run modes. + +Subtypes must implement: + +* `ParallelProcessingTools.runworkers(runmode::SomeRunProcsMode, manager::Distributed.AbstractClusterManager)` +""" +abstract type RunProcsMode end + + +""" + runworkers( + runmode::ParallelProcessingTools.RunProcsMode + manager::Distributed.AbstractClusterManager = ppt_cluster_manager() + ) + +Run Julia worker processes. + +By default ensures that all workers processes use the same Julia project +environment as the current process (requires that file systems paths are +consistenst across compute hosts). + +The new workers are managed via [`ppt_cluster_manager()`](@ref) and +automatically added to the [`ppt_worker_pool()`](@ref) + +Returns a tuple `(task, n)`. Here, `task::Task` is done when all workers +have terminated. `n` is either an `Integer`, if the number of workers that +will be started is known, or `Nothing`, if the number of workers can't be +predicted (accurately). + +Example: + +```julia +task, n = runworkers(OnLocalhost(nprocs = 4)) +``` + +See also [`worker_resources()`](@ref). +""" +function runworkers end +export runworkers + +runworkers(runmode::RunProcsMode) = runworkers(runmode, ppt_cluster_manager()) + + +""" + ParallelProcessingTools.ppt_cluster_manager() + ParallelProcessingTools.ppt_cluster_manager(manager::ClusterManager) + +Get the default ParallelProcessingTools cluster manager. +""" +function ppt_cluster_manager end +export ppt_cluster_manager + +const _g_cluster_manager = Ref{Union{Nothing,ClusterManager}}(nothing) + +function ppt_cluster_manager() + if isnothing(_g_cluster_manager[]) + _g_cluster_manager[] = ElasticManager( + addr=:auto, port=0, topology=:master_worker, manage_callback = _get_elasticmgr_add_to_pool_callback() + ) + end + return _g_cluster_manager[] +end + +""" + ParallelProcessingTools.ppt_cluster_manager!(manager::CustomClusterManagers.ElasticManager) + +Set the default ParallelProcessingTools cluster manager. +""" +function ppt_cluster_manager!(manager::ElasticManager) + _g_cluster_manager[] = manager + return _g_cluster_manager[] +end +export ppt_cluster_manager! + +function _get_elasticmgr_add_to_pool_callback(get_workerpool::Function = ppt_worker_pool) + function mgr_add_too_pool(::ElasticManager, pid::Integer, op::Symbol) + pool = get_workerpool()::AbstractWorkerPool + if op == :register + Threads.@async begin + @debug "Adding process $pid to worker pool $(getlabel(pool))." + push!(pool, pid) + @debug "Added process $pid to worker pool $(getlabel(pool))." + end + elseif op == :deregister + @debug "Process $pid is being deregistered." + else + @error "Unknown ElasticManager manage op: $op." + end + end + return mgr_add_too_pool +end + + +""" + abstract type ParallelProcessingTools.DynamicAddProcsMode <: ParallelProcessingTools.RunProcsMode + +Abstract supertype for worker start modes that use an elastic cluster manager +that enables dynamic addition and removal of worker processes. + +Subtypes must implement: + +* `ParallelProcessingTools.worker_start_command(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)` +* `ParallelProcessingTools.runworkers(runmode::SomeDynamicAddProcsMode, manager::ClusterManager)` +""" +abstract type DynamicAddProcsMode <: RunProcsMode end + + +""" + worker_start_command( + runmode::DynamicAddProcsMode, + manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager() + )::Tuple{Cmd,Integer,Integer} + +Return a tuple `(cmd, m, n)`, with system command `cmd` that needs to be +run `m` times (in parallel) to start `n` workers. +""" +function worker_start_command end +export worker_start_command + +worker_start_command(runmode::DynamicAddProcsMode) = worker_start_command(runmode, ppt_cluster_manager()) + + +""" + write_worker_start_script( + filename::AbstractString, + runmode::DynamicAddProcsMode, + manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager() + ) + +Writes the system command to start worker processes to a shell script. +""" +function write_worker_start_script( + filename::AbstractString, + runmode::DynamicAddProcsMode, + manager::ClusterManager = ParallelProcessingTools.ppt_cluster_manager() +) + wstartcmd, m, _ = worker_start_command(runmode, manager) + @assert m isa Integer && (m >= 0) + _, ext = split_basename_ext(basename(filename)) + if Sys.iswindows() + if ext == ".bat" || ext == ".BAT" + error("Worker start script generation isn't supported on Windows OS yet.") + # write(filename, Base.shell_escape_wincmd(wstartcmd)) + else + throw(ArgumentError("Script filename extension \"$ext\" not supported on Windows.")) + end + else + if ext == ".sh" + open(filename, "w") do io + chmod(filename, 0o700) + println(io, "#!/bin/sh") + if m > 0 + if m > 1 + print(io, "printf \"%s\\n\" {1..$m} | xargs -n1 -P$m -I{} ") + end + println(io, Base.shell_escape_posixly(wstartcmd)) + end + end + return filename + else + throw(ArgumentError("Script filename extension \"$ext\" not supported on Posix-like OS.")) + end + end + return nothing +end +export write_worker_start_script + + +function _elastic_worker_startjl( + @nospecialize(manager::ElasticManager), + redirect_output::Bool, + @nospecialize(env::AbstractDict{<:AbstractString,<:AbstractString}) +) + env_withdefaults = Dict{String,String}() + haskey(ENV, "JULIA_WORKER_TIMEOUT") && (env_withdefaults["JULIA_WORKER_TIMEOUT"] = ENV["JULIA_WORKER_TIMEOUT"]) + env_withdefaults["JULIA_REVISE"] = "off" + merge!(env_withdefaults, env) + env_vec = isempty(env_withdefaults) ? [] : collect(env_withdefaults) + + cookie = Distributed.cluster_cookie() + socket_name = manager.sockname + address = string(socket_name[1]) + port = convert(Int, socket_name[2]) + """import ParallelProcessingTools; ParallelProcessingTools.CustomClusterManagers.elastic_worker("$cookie", "$address", $port, stdout_to_master=$redirect_output, env=$env_vec)""" +end + +const _default_addprocs_params = Distributed.default_addprocs_params() + +_default_julia_cmd() = `$(_default_addprocs_params[:exename]) $(_default_addprocs_params[:exeflags])` +_default_julia_flags() = `` +_default_julia_project() = Pkg.project().path + + +""" + ParallelProcessingTools.worker_local_startcmd( + manager::Distributed.ClusterManager; + julia_cmd::Cmd = _default_julia_cmd(), + julia_flags::Cmd = _default_julia_flags(), + julia_project::AbstractString = _default_julia_project() + redirect_output::Bool = true, + env::AbstractDict{<:AbstractString,<:AbstractString} = ..., + )::Cmd + +Return the system command required to start a Julia worker process locally +on some host, so that it will connect to `manager`. +""" +function worker_local_startcmd( + @nospecialize(manager::Distributed.ClusterManager); + julia_cmd::Cmd = _default_julia_cmd(), + julia_flags::Cmd = _default_julia_flags(), + @nospecialize(julia_project::AbstractString = _default_julia_project()), + redirect_output::Bool = true, + @nospecialize(env::AbstractDict{<:AbstractString,<:AbstractString} = Dict{String,String}()) +) + julia_code = _elastic_worker_startjl(manager, redirect_output, env) + + `$julia_cmd --project=$julia_project $julia_flags -e $julia_code` +end + + +""" + OnLocalhost(; + n::Integer = 1 + env::Dict{String,String} = Dict{String,String}() + ) isa DynamicAddProcsMode + +Mode that runs `n` worker processes on the current host. + +Example: + +```julia +runmode = OnLocalhost(n = 4) +task, n = runworkers(runmode) + +Threads.@async begin + wait(task) + @info "SLURM workers have terminated." +end + +@wait_while nprocs()-1 < n) +``` + +Workers can also be started manually, use +[`worker_start_command(runmode)`](@ref) to get the system (shell) command and +run it from a separate process or so. +""" +@with_kw struct OnLocalhost <: DynamicAddProcsMode + n::Int + env::Dict{String,String} = Dict{String,String}() +end +export OnLocalhost + +function worker_start_command(runmode::OnLocalhost, manager::ElasticManager) + worker_nthreads = nthreads() + julia_flags = `$(_default_julia_flags()) --threads=$worker_nthreads` + worker_cmd = worker_local_startcmd( + manager; + julia_flags = julia_flags, + env = runmode.env + ) + return worker_cmd, runmode.n, runmode.n +end + +function runworkers(runmode::OnLocalhost, manager::ElasticManager) + start_cmd, m, n = worker_start_command(runmode, manager) + + task = Threads.@async begin + processes = Base.Process[] + for _ in 1:m + push!(processes, open(start_cmd)) + end + @wait_while any(isactive, processes) + end + + return task, n +end + + +#= +# ToDo: Add SSHWorkers or similar: + +@with_kw struct SSHWorkers <: RunProcsMode + hosts::Vector{Any} + ssd_flags::Cmd = _default_slurm_flags() + julia_flags::Cmd = _default_julia_flags() + dir = ... + env = ... + tunnel::Bool = false + multiplex::Bool = false + shell::Symbol = :posix + max_parallel::Int = 10 + enable_threaded_blas::Bool = true + topology::Symbol = :all_to_all + lazy_connections::Bool = true +end +=# + + +""" + stopworkers() + stopworkers(pid::Int) + stopworkers(pids::AbstractVector{Int}) + +Stops all or the specified worker processes. The current process is ignored. +""" +function stopworkers end +export stopworkers + +stopworkers() = stopworkers(workers()) + +function stopworkers(pid::Int) + pid!=myid() && rmprocs(pid) + return nothing +end + +function stopworkers(pids::AbstractVector{Int}) + rmprocs(filter(!isequal(myid()), pids)) + return nothing +end diff --git a/src/slurm.jl b/src/slurm.jl index adb4a17..e4dad97 100644 --- a/src/slurm.jl +++ b/src/slurm.jl @@ -5,8 +5,8 @@ slurm_flags::Cmd = {defaults} julia_flags::Cmd = {defaults} dir = pwd() - user_start::Bool = false - timeout::Real = 60 + env::Dict{String,String} = Dict{String,String}() + redirect_output::Bool = true ) Mode to add worker processes via SLURM `srun`. @@ -20,31 +20,37 @@ Workers are started with current directory set to `dir`. Example: ```julia -mode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) -addworkers(mode) +runmode = SlurmRun(slurm_flags = `--ntasks=4 --cpus-per-task=8 --mem-per-cpu=8G`) +task = runworkers(runmode) + +Threads.@async begin + wait(task) + @info "SLURM workers have terminated." +end + +@wait_while nprocs()-1 < n ``` -If `user_start` is `true`, then the SLURM srun-command will not be run -automatically, instead it will be logged via `@info` and the user is -responsible for running it. This srun-command can also be retrieved via -[`worker_start_command(mode)`](@ref). +Workers can also be started manually, use +[`worker_start_command(runmode)`](@ref) to get the `srun` start command and +run it from a separate process or so. """ -@with_kw struct SlurmRun <: ElasticAddProcsMode +@with_kw struct SlurmRun <: DynamicAddProcsMode slurm_flags::Cmd = _default_slurm_flags() julia_flags::Cmd = _default_julia_flags() dir = pwd() - user_start::Bool = false - timeout::Real = 60 + env::Dict{String,String} = Dict{String,String}() + redirect_output::Bool = true end export SlurmRun const _g_slurm_nextjlstep = Base.Threads.Atomic{Int}(1) -function worker_start_command(mode::SlurmRun, manager::ClusterManagers.ElasticManager) - slurm_flags = mode.slurm_flags - julia_flags = mode.julia_flags - dir = mode.dir +function worker_start_command(runmode::SlurmRun, manager::ElasticManager) + slurm_flags = runmode.slurm_flags + julia_flags = runmode.julia_flags + dir = runmode.dir tc = _get_slurm_taskconf(slurm_flags, ENV) @@ -61,9 +67,13 @@ function worker_start_command(mode::SlurmRun, manager::ClusterManagers.ElasticMa jlstep = atomic_add!(_g_slurm_nextjlstep, 1) jobname = "julia-$(getpid())-$jlstep" - worker_cmd = elastic_localworker_startcmd(manager; julia_flags = `$julia_flags $additional_julia_flags`) + worker_cmd = worker_local_startcmd( + manager; + julia_flags = `$julia_flags $additional_julia_flags`, + redirect_output = runmode.redirect_output, env = runmode.env + ) - return `srun --job-name=$jobname --chdir=$dir $slurm_flags $worker_cmd`, n_workers + return `srun --job-name=$jobname --chdir=$dir $slurm_flags $worker_cmd`, 1, n_workers end function _slurm_nworkers(tc::NamedTuple) @@ -89,26 +99,17 @@ function _slurm_mem_per_task(tc::NamedTuple) end -function ParallelProcessingTools.start_elastic_workers(mode::SlurmRun, manager::ClusterManagers.ElasticManager) - srun_cmd, n_workers = worker_start_command(mode, manager) - if mode.user_start - @info "To add Julia worker processes (I'll wait for them), run: $srun_cmd" - else - @info "Starting SLURM job: $srun_cmd" - srun_proc = open(srun_cmd) - end - return n_workers -end - -function worker_init_code(::SlurmRun) - quote - import ParallelProcessingTools - ParallelProcessingTools.pinthreads_auto() +function runworkers(runmode::SlurmRun, manager::ElasticManager) + srun_cmd, m, n = worker_start_command(runmode, manager) + @info "Starting SLURM job: $srun_cmd" + task = Threads.@async begin + process = open(srun_cmd) + wait(process) + @info "SLURM job terminated: $srun_cmd" end + return task, n end -elastic_addprocs_timeout(mode::SlurmRun) = mode.timeout - function _default_slurm_flags() # `srun` in `salloc`/`sbatch` doesn't seem to always pick up @@ -120,7 +121,7 @@ end const _slurm_memunits = IdDict{Char,Int}('K' => 1024^1, 'M' => 1024^2, 'G' => 1024^3, 'T' => 1024^4) -const _slurm_memsize_regex = r"^([0-9]+)([KMGT])?$" +const _slurm_memsize_regex = r"^([0-9]+)(([KMGT])B?)?$" function _slurm_parse_memoptval(memsize::AbstractString) s = strip(memsize) m = match(_slurm_memsize_regex, s) @@ -128,7 +129,7 @@ function _slurm_parse_memoptval(memsize::AbstractString) throw(ArgumentError("Invalid SLURM memory size specification \"$s\"")) else value = parse(Int, m.captures[1]) - unitchar = only(something(m.captures[2], 'M')) + unitchar = only(something(m.captures[3], 'M')) unitmult = _slurm_memunits[unitchar] return value * unitmult end @@ -192,7 +193,7 @@ function _get_slurm_taskconf(slurmflags::Cmd, env::AbstractDict{String,String}) ntasks_per_node = get(env, "SLURM_NTASKS_PER_NODE", nothing) mem_per_node = get(env, "SLURM_MEM_PER_NODE", nothing) - args = slurmflags.exec + args = collect(slurmflags) i::Int = firstindex(args) while i <= lastindex(args) last_i = i @@ -220,49 +221,3 @@ function _get_slurm_taskconf(slurmflags::Cmd, env::AbstractDict{String,String}) mem_per_node = _slurm_parse_memoptval(mem_per_node), ) end - - -function _addprocs_slurm(; kwargs...) - slurm_ntasks = parse(Int, ENV["SLURM_NTASKS"]) - slurm_ntasks > 1 || throw(ErrorException("Invalid nprocs=$slurm_ntasks inferred from SLURM environment")) - _addprocs_slurm(slurm_ntasks; kwargs...) -end - -function _addprocs_slurm( - nprocs::Int; - job_file_loc::AbstractString = joinpath(homedir(), "slurm-julia-output"), - retry_delays::AbstractVector{<:Real} = [1, 1, 2, 2, 4, 5, 5, 10, 10, 10, 10, 20, 20, 20] -) - try - lock(_g_processops_lock) - - @info "Adding $nprocs Julia processes via SLURM" - - julia_project = dirname(Pkg.project().path) - slurm_ntasks = nprocs - slurm_nthreads = parse(Int, ENV["SLURM_CPUS_PER_TASK"]) - slurm_mem_per_cpu = parse(Int, ENV["SLURM_MEM_PER_CPU"]) * 1024^2 - slurm_mem_per_task = slurm_nthreads * slurm_mem_per_cpu - - cluster_manager = ClusterManagers.SlurmManager(slurm_ntasks, retry_delays) - worker_timeout = round(Int, max(sum(cluster_manager.retry_delays), 60)) - ENV["JULIA_WORKER_TIMEOUT"] = "$worker_timeout" - - mkpath(job_file_loc) - new_workers = Distributed.addprocs( - cluster_manager, job_file_loc = job_file_loc, - exeflags = `--project=$julia_project --threads=$slurm_nthreads --heap-size-hint=$(slurm_mem_per_task÷2)`, - cpus_per_task = "$slurm_nthreads", mem_per_cpu="$(slurm_mem_per_cpu >> 30)G", # time="0:10:00", - mem_bind = "local", cpu_bind="cores", - ) - - @info "Configuring $nprocs new Julia worker processes" - - _run_always_everywhere_code(new_workers) - pinthreads_distributed(new_workers) - - @info "Added $(length(new_workers)) Julia worker processes via SLURM" - finally - unlock(_g_processops_lock) - end -end diff --git a/src/states.jl b/src/states.jl new file mode 100644 index 0000000..f7e12cf --- /dev/null +++ b/src/states.jl @@ -0,0 +1,150 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +""" + ParallelProcessingTools.NonZeroExitCode(cmd::Cmd, exitcode::Integer) isa Exception + +Exception to indicate that a an external process running `cmd` failed with the +given exit code (not equal zero). +""" +struct NonZeroExitCode <: Exception + exitcode::Int +end + +function NonZeroExitCode(exitcode::Integer) + exitcode == 0 && throw(ArgumentError("NonZeroExitCode exitcode must not be zero")) + NonZeroExitCode(exitcode) +end + + + +""" + ParallelProcessingTools.getlabel(obj) + +Returns a descriptive label for `obj` suitable for using in exceptions and +logging messages. Defaults to `string(obj)`. +""" +function getlabel end + +getlabel(obj) = convert(String, string(obj)) +getlabel(task::Task) = "Task $(nameof(typeof(task.code)))" +getlabel(process::Process) = "Process $(getlabel(process.cmd))" +getlabel(future::Future) = "Future $(future.id)" + + +""" + ParallelProcessingTools.isactive(obj)::Bool + +Checks if `obj` is still active, running or whatever applies to the type of +`obj`. + +Supports `Task`, `Process`, `Future`, `Channel`, `Timer`, +`Base.AsyncCondition` and may be extended to other object types. + +Returns `false` if `isnothing(obj)` and `true` if `ismissing(obj)`. +""" +function isactive end + +isactive(::Nothing) = false +isactive(::Missing) = true +isactive(task::Task) = !istaskdone(task) +isactive(process::Process) = process_running(process) +isactive(future::Future) = !isready(future) +isactive(channel::Channel) = isopen(channel) +isactive(timer::Timer) = isopen(timer) +isactive(condition::Base.AsyncCondition) = isopen(condition) + + +""" + ParallelProcessingTools.wouldwait(obj)::Bool + +Returns `true` if `wait(obj)` would result in waiting and `false` if +`wait(obj)` would return (almost) immediately. + +Supports `Task`, `Process`, `Future`, `Channel`, `Timer`, +`Base.AsyncCondition` and may be extended to other object types. + +Returns `false` if `isnothing(obj)` but `obj` must not be `missing`. +""" +function wouldwait end + +wouldwait(::Nothing) = false +wouldwait(::Missing) = throw(ArgumentError("wouldwait does not support Missing")) +wouldwait(task::Task) = !istaskdone(task) +wouldwait(process::Process) = process_running(process) +wouldwait(future::Future) = !isready(future) +wouldwait(channel::Channel) = isopen(channel) && !isready(channel) +wouldwait(timer::Timer) = isopen(timer) +wouldwait(condition::Base.AsyncCondition) = isopen(condition) + + +""" + ParallelProcessingTools.hasfailed(obj)::Bool + +Checks if `obj` has failed in some way. + +Supports `Task` and `Process` and may be extended to other object types. + +Returns `false` if `isnothing(obj)` or `ismissing(obj)`. +""" +function hasfailed end + +hasfailed(::Nothing) = false +hasfailed(::Missing) = false +hasfailed(task::Task) = istaskfailed(task) +hasfailed(process::Process) = !isactive(process) && !iszero(process.exitcode) + +function hasfailed(channel::Channel) + if isactive(channel) return false + else + err = channel.excp + if err isa InvalidStateException + return err.state == :closed ? false : true + else + return true + end + end +end + + +""" + ParallelProcessingTools.whyfailed(obj)::Exception + +Returns a reason, as an `Exception` instance, why `obj` has failed. + +Supports `Task` and `Process` and may be extended to other object types. + +`obj` must not be `nothing` or `missing`. +""" +function whyfailed end + +whyfailed(::Nothing) = throw(ArgumentError("whyfailed does not support Nothing")) +whyfailed(::Missing) = throw(ArgumentError("whyfailed does not support Missing")) + +function whyfailed(task::Task) + if hasfailed(task) + err = task.result + if err isa Exception + return err + else + return ErrorException("Task failed with non-exception result of type $(nameof(typeof(err)))") + end + else + throw(ArgumentError("Task $(getlabel(task)) did not fail, whyfailed not allowed")) + end +end + +function whyfailed(process::Process) + if hasfailed(process) + return NonZeroExitCode(process.exitcode) + else + throw(ArgumentError("Process $(getlabel(process)) did not fail, whyfailed not allowed")) + end +end + +function whyfailed(channel::Channel) + if hasfailed(channel) + return channel.excp + else + throw(ArgumentError("Channel $(getlabel(channel)) did not fail, whyfailed not allowed")) + end +end diff --git a/src/waiting.jl b/src/waiting.jl new file mode 100644 index 0000000..c9a3c3e --- /dev/null +++ b/src/waiting.jl @@ -0,0 +1,275 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +const _g_yield_time_ns = Int64(250) # typical time for a `yield()` +const _g_sleep_0_time_ns = Int64(1500) # typical time for a `sleep(0)` +const _g_sleep_t_time_ns = Int64(2000000) # typical minimum time for a `sleep(t)` + +const _g_sleep_n_yield = 3 * div(_g_sleep_0_time_ns, _g_yield_time_ns) +const _g_sleep_n_sleep_0 = 3 * div(_g_sleep_t_time_ns, _g_sleep_0_time_ns) +const _g_sleep_yield_threshold = 3 * _g_sleep_0_time_ns +const _g_sleep_sleep_0_threshold = 3 * _g_sleep_t_time_ns + +""" + sleep_ns(t_in_ns::Real) + +Sleep for `t_in_ns` nanoseconds, using a mixture of `yield()`, `sleep(0)` +and `sleep(t)` to be able sleep for short times as well as long times with +good relative precision. + +Guaranteed to `yield()` at least once, even if `t_in_ns` is zero. +""" +function sleep_ns(t_in_ns::Integer) + t_ns = Int64(t_in_ns) + t_remaining_ns::Int64 = t_ns + t0 = time_ns() + yield() + if t_remaining_ns <= _g_sleep_yield_threshold + for _ in 1:_g_sleep_n_yield + t_slept = Int64(time_ns() - t0) + t_remaining_ns = t_ns - t_slept + t_remaining_ns > 0 || return nothing + yield() + end + end + if t_remaining_ns <= _g_sleep_sleep_0_threshold + for _ in 1:_g_sleep_n_sleep_0 + t_slept = Int64(time_ns() - t0) + t_remaining_ns = t_ns - t_slept + t_remaining_ns > 0 || return nothing + sleep(0) + end + end + if t_remaining_ns > 0 + t_remaining_s = 1e-9 * t_remaining_ns + sleep(t_remaining_s) + end + return nothing +end +export sleep_ns + + +""" + idle_sleep(n_idle::Integer, t_interval_s, t_max_s) + +Sleep because something has been idle for `n_idle` times. + +Will sleep for `log2(n_idle + 1) * t_interval_s` seconds, but at most for +`t_max_s` seconds. + +Guaranteed `yield()` at least once, even if `n_idle` is zero. +""" +function idle_sleep(n_idle::Integer, t_interval_s::Real, t_max_s::Real) + sleep_time = min(t_max_s, log2(n_idle + 1) * t_interval_s) + sleep_time_ns = round(Int64, 1e9 * sleep_time) + sleep_ns(sleep_time_ns) +end +export idle_sleep + + +""" + @wait_while [maxtime=nothing] [timeout_error=false] cond + +Wait while `cond` is true, using slowly increasing sleep times in between +evaluating `cond`. + +`cond` may be an arbitrary Julia expression. + +If `maxtime` is given with an real value, will only wait for `maxtime` +seconds, if the value is zero or negative will not wait at all. + +If `timeout_error` is `true`, will throw a `TimelimitExceeded` exception +if the maximum waiting time is exceeded. + +Example, wait for a task with a maxtime: + +```julia +task = Threads.@spawn sleep(10) +timer = Timer(2) +@wait_while !istaskdone(task) && isopen(timer) +istaskdone(task) == false +``` +""" +macro wait_while(args...) + maxtime = :(nothing) + timeout_error = :(false) + for arg in args[begin:end-1] + if arg isa Expr && arg.head == :(=) && length(arg.args) == 2 + optname, optval = arg.args[1], arg.args[2] + if optname == :maxtime + maxtime = optval + elseif optname == :timeout_error + timeout_error = optval + else + return quote + quoted_optname = $(esc(Expr(:quote, optname))) + throw(ArgumentError("Invalid option name for @wait_while: $quoted_optname")) + end + end + else + return quote + quoted_arg = $(esc(Expr(:quote, arg))) + throw(ArgumentError("Invalid option format for @wait_while: $quoted_arg")) + end + end + end + cond = args[end] + quote + maxtime_set, maxtime_s, maxtime_ns = _process_maxtime($(esc(maxtime))) + timeout_error = $(esc(timeout_error)) + t_start = time_ns() + while $(esc(cond)) + _wait_while_inner(maxtime_set, maxtime_s, maxtime_ns, timeout_error, t_start) || break + end + nothing + end +end +export @wait_while + +_process_maxtime(maxtime::Real) = _process_maxtime(Float64(maxtime)) +function _process_maxtime(maxtime::Union{Float64,Nothing}) + maxtime_set = !isnothing(maxtime) + maxtime_s::Float64 = maxtime_set ? max(zero(Float64), maxtime) : zero(Float64) + maxtime_ns::UInt64 = unsigned(round(Int64, maxtime_s * 1e9)) + return maxtime_set, maxtime_s, maxtime_ns +end + +function _wait_while_inner(maxtime_set::Bool, maxtime_s::Float64, maxtime_ns::UInt64, timeout_error::Bool, t_start::UInt64) + t_waited = time_ns() - t_start + if maxtime_set && t_waited > maxtime_ns + if timeout_error + throw(TimelimitExceeded(maxtime_s, t_waited * 1e-9)) + else + return false + end + end + # Wait for 12.5% of the time waited so far, but for one second and until maxtime at most: + max_sleeptime_ns = maxtime_set ? min(maxtime_ns - t_waited, _one_sec_in_ns) : _one_sec_in_ns + t_sleep = min(t_waited >> 3, max_sleeptime_ns) + sleep_ns(t_sleep) + return true +end + +const _one_sec_in_ns = Int64(1000000000) + + +""" + wait_for_any( + objs...; + maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false + ) + + wait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...) + +Wait for any of the objects `objs` to become ready. + +Readiness of objects is as defined by [`wouldwait`](@ref). Objects that are +`Nothing` are ignored, i.e. not waited for. + +See [`@wait_while`](@ref) for the effects of `maxtime` and `timeout_error`. + +Example, wait for a task with a timeout: + +```julia +task1 = Threads.@spawn sleep(1.0) +task2 = Threads.@spawn sleep(5.0) +wait_for_any(task1, task2, maxtime = 3.0) +istaskdone(task1) == true +istaskdone(task2) == false +``` + +Similar to `waitany` (new in Julia v1.12), but applies to a wider range of +object types. +""" +function wait_for_any end +export wait_for_any + +function wait_for_any(obj::Any; maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false) + if isnothing(maxtime) + wait(obj) + else + mt, te = maxtime, timeout_error + @wait_while maxtime=mt timeout_error=te wouldwait(obj) + end +end + +wait_for_any(::Nothing; maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false) = nothing + +wait_for_any(obj, objs...; kwargs...) = _wait_for_any_in_iterable((obj, objs...); kwargs...) + +function wait_for_any(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...) + _wait_for_any_in_iterable(objs; kwargs...) +end + +function _wait_for_any_in_iterable(objs; maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false) + mt, te = maxtime, timeout_error + @wait_while maxtime=mt timeout_error=te all(wouldwait, objs) +end + +# ToDo: Use `waitany` (Julia >= v1.12) in wait_for_any implementation where possible. + + +""" + wait_for_all( + objs...; + maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false + ) + + wait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...) + +Wait for all of the `objs` to become ready. + +Readiness of objects is as defined by [`wouldwait`](@ref). Objects that are +`Nothing` are ignored, i.e. not waited for. + +See [`@wait_while`](@ref) for the effects of `maxtime` and `timeout_error`. + +Example, wait for two tasks to finish: + +```julia +task1 = Threads.@spawn sleep(10) +task2 = Threads.@spawn sleep(2) +wait_for_all(task1, task2) +``` +""" +function wait_for_all end +export wait_for_all + +wait_for_all(obj; kwargs...) = wait_for_any(obj; kwargs...) + +wait_for_all(obj, objs...; kwargs...) = _wait_for_all_in_iterable((obj, objs...); kwargs...) + +function wait_for_all(objs::Union{Tuple,AbstractVector,Base.Generator,Base.ValueIterator}; kwargs...) + _wait_for_all_in_iterable(objs; kwargs...) +end + +function _wait_for_all_in_iterable(objs; maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false) + maxtime_set, maxtime_s, maxtime_ns = _process_maxtime(maxtime) + t_start = time_ns() + te = timeout_error + for o in objs + t_waited_ns = time_ns() - t_start + maxtime_remaining_ns = maxtime_ns > t_waited_ns ? maxtime_ns - t_waited_ns : zero(maxtime_ns) + mt = maxtime_set ? maxtime_remaining_ns * 1e-9 : nothing + @wait_while maxtime=mt timeout_error=te wouldwait(o) + end + return nothing +end + +function _wait_for_all_in_iterable(objs::Tuple; maxtime::Union{Real,Nothing} = nothing, timeout_error::Bool = false) + maxtime_set, maxtime_s, maxtime_ns = _process_maxtime(maxtime) + t_start_ns = time_ns() + _wait_for_all_in_tuple(objs, t_start_ns, maxtime_set, maxtime_ns, timeout_error) +end + + +_wait_for_all_in_tuple(::Tuple{}, ::UInt64, ::Bool, ::UInt64, ::Bool) = nothing + +function _wait_for_all_in_tuple(objs::Tuple, t_start_ns::UInt64, maxtime_set::Bool, maxtime_ns::UInt64, timeout_error::Bool) + t_waited_ns = time_ns() - t_start_ns + maxtime_rest_ns = maxtime_ns > t_waited_ns ? maxtime_ns - t_waited_ns : zero(maxtime_ns) + mt = maxtime_set ? maxtime_rest_ns * 1e-9 : nothing + te = timeout_error + o = objs[1] + @wait_while maxtime=mt timeout_error=te wouldwait(o) + _wait_for_all_in_tuple(Base.tail(objs), t_start_ns, maxtime_set, maxtime_ns, timeout_error) +end diff --git a/src/workerpool.jl b/src/workerpool.jl new file mode 100644 index 0000000..4c62da9 --- /dev/null +++ b/src/workerpool.jl @@ -0,0 +1,391 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +""" + FlexWorkerPool{WP<:AbstractWorkerPool}( + worker_pids::AbstractVector{Int}; + label::AbstractString = "", maxoccupancy::Int = 1, init_workers::Bool = true + )::AbstractWorkerPool + + FlexWorkerPool(; caching = false, withmyid::Bool = true, kwargs...) + +An flexible worker pool, intended to work with cluster managers that may +add and remove Julia processes dynamically. + +If the current process (`Distributed.myid()`) is part of the pool, resp. if +`withmyid` is `true`, it will be used as a fallback when no other workers are +in are members of the pool (e.g. because no other processes have been added +yet or because all other processes in the pool have terminated and been +removed from it). The current process will *not* be used as a fallback when +all other workers are currently in use. + +If `caching` is true, the pool will use a `Distributed.CachingPool` as the +underlying pool, otherwise a `Distributed.WorkerPool`. + +If `maxoccupancy`is greater than one, individual workers can be used +`maxoccupancy` times in parallel. So `take!(pool)` may return the same process +ID `pid` multiple times without a `put!(pool, pid)` in between. Such a +(ideally moderate) oversubscription can be useful to reduce latency-related +idle times on workers: e.g. if communication latency to the worker +is not short compared the the runtime of the function called on them. Or if +the remote functions are often blocked waiting for I/O. Note: Workers still +must be put back the same number of times they were taken from the pool, +in total. + +If `init_workers` is `true`, workers taken from the pool will be guaranteed +to be initialized to the current global initialization level +(see [`@always_everywhere`](@ref)). + +`WP` is the type of the underlying worker pool used, e.g. +`Distributed.WorkerPool` (default) or `Distributed.CachingPool`. + +Example: + +```julia +using ParallelProcessingTools, Distributed + +pool = FlexWorkerPool(withmyid = true, maxoccupancy = 3) + +workers(pool) + +pids = [take!(pool) for _ in 1:3] +@assert pids == repeat([myid()], 3) +foreach(pid -> put!(pool, pid), pids) + +addprocs(4) +worker_procs = workers() +push!.(Ref(pool), worker_procs) + +pids = [take!(pool) for _ in 1:4*3] +@assert pids == repeat(worker_procs, 3) +foreach(pid -> put!(pool, pid), pids) +rmprocs(worker_procs) + +pids = [take!(pool) for _ in 1:3] +@assert pids == repeat([myid()], 3) +foreach(pid -> put!(pool, pid), pids) +``` +""" +struct FlexWorkerPool{WP<:AbstractWorkerPool} <: AbstractWorkerPool + _pool::WP + _mypid_pool::WorkerPool + _label::String + _maxoccupancy::Int + _init_workers::Bool + _spares::Channel{Tuple{Int,Int}} + _worker_mgmt::Threads.Condition + _worker_history::Set{Int} + _worker_occupancy::IdDict{Int,Int} +end +export FlexWorkerPool + + +function FlexWorkerPool{WP}( + worker_pids::AbstractVector{Int} = [Distributed.myid()]; + label::AbstractString = "", + maxoccupancy::Int = 1, init_workers::Bool = true +) where {WP <: AbstractWorkerPool} + @argcheck maxoccupancy >= 1 + + pool = WP(Int[]) + mypid_pool = WorkerPool(Int[]) + spares = Channel{Tuple{Int,Int}}(typemax(Int)) + worker_mgmt = Threads.Condition() + worker_history = Set{Int}() + _worker_occupancy = IdDict{Int,Int}() + + fwp = FlexWorkerPool{WP}( + pool, mypid_pool, label, maxoccupancy, init_workers, + spares, worker_mgmt, worker_history, _worker_occupancy + ) + + for pid in worker_pids + push!(fwp, pid) + end + + return fwp +end + +function FlexWorkerPool(; caching = false, withmyid::Bool = true, kwargs...) + worker_pids = withmyid ? Int[Distributed.myid()] : Int[] + WP = caching ? CachingPool : WorkerPool + FlexWorkerPool{WP}(worker_pids; kwargs...) +end + + +function Base.show(io::IO, @nospecialize(fwp::FlexWorkerPool)) + print(io, "FlexWorkerPool{$(nameof(typeof(fwp._pool)))}(...") + if !isempty(fwp._label) + print(io, ", label=\"", fwp._label, "\"") + end + print(io, ")") +end + +function Base.show(io::IO, ::MIME"text/plain", @nospecialize(fwp::FlexWorkerPool)) + show(io, fwp) + println(io) + + pids, occupancy = lock(fwp._worker_mgmt) do + deepcopy(workers(fwp)), deepcopy(fwp._worker_occupancy) + end + whmap = _worker_hosts() + host_worker_occupancy = IdDict{String,Vector{Int}}() + + for pid in pids + push!(get!(host_worker_occupancy, whmap[pid], Vector{Int}()), occupancy[pid]) + end + + hosts = sort(collect(keys(host_worker_occupancy))) + for hostname in hosts + occupancies = host_worker_occupancy[hostname] + occ_string = String(_worker_occupancy_symbol.(occupancies)) + println(io, " host $hostname (", length(occupancies), " workers): ", occ_string) + end +end + +function _worker_occupancy_symbol(occupancy::Int) + if occupancy == 0 + _g_unicode_occupancy.sleeping + elseif occupancy == 1 + _g_unicode_occupancy.working + elseif occupancy == 2 + _g_unicode_occupancy.onfire + elseif occupancy >= 3 + _g_unicode_occupancy.overloaded + else + _g_unicode_occupancy.unknown + end +end + + +function Base.length(fwp::FlexWorkerPool) + l = length(fwp._pool) + l > 0 ? l : length(fwp._mypid_pool) +end + + +function Base.isready(fwp::FlexWorkerPool) + _use_main_pool(fwp) ? isready(fwp._pool) : isready(fwp._mypid_pool) +end + +function _use_main_pool(fwp::FlexWorkerPool) + length(fwp._pool) > 0 || length(fwp._mypid_pool) == 0 +end + +function Distributed.workers(fwp::FlexWorkerPool) + lock(fwp._worker_mgmt) do + sort(_use_main_pool(fwp) ? workers(fwp._pool) : workers(fwp._mypid_pool)) + end +end + +function Distributed.nworkers(fwp::FlexWorkerPool) + lock(fwp._worker_mgmt) do + _use_main_pool(fwp) ? nworkers(fwp._pool) : nworkers(fwp._mypid_pool) + end +end + + +function Base.push!(fwp::FlexWorkerPool, pid::Int) + if isvalid_pid(pid) + _register_process(pid) + # Adding workers that are already in the pool must not increase maxoccupancy: + if !in(pid, fwp._worker_history) + lock(fwp._worker_mgmt) do + fwp._worker_occupancy[pid] = 0 + push!(fwp._worker_history, pid) + mypid = myid() + if pid == mypid + @assert length(fwp._mypid_pool) == 0 + for _ in 1:fwp._maxoccupancy + push!(fwp._mypid_pool, mypid) + end + return fwp + else + # Add worker to pool only once, hold maxoccupancy in reserve. We + # want to spread it out over the worker queue: + push!(fwp._pool, pid) + if fwp._maxoccupancy > 1 + push!(fwp._spares, (pid, fwp._maxoccupancy - 1)) + end + end + notify(fwp._worker_mgmt) + end + end + else + @warn "Not adding invalid process ID $pid to $(getlabel(fwp))." + end + + return fwp +end + + +function Base.put!(fwp::FlexWorkerPool, pid::Int) + lock(fwp._worker_mgmt) do + fwp._worker_occupancy[pid] -= 1 + pid != myid() ? put!(fwp._pool, pid) : put!(fwp._mypid_pool, pid) + end + return pid +end + + +function Base.take!(fwp::FlexWorkerPool) + while true + pid::Int = _take_worker_noinit!(fwp) + if fwp._init_workers + try + ensure_procinit(pid) + return pid + catch err + orig_err = inner_exception(err) + @warn "Error while initializig process $pid, removing it." orig_err + lock(fwp._worker_mgmt) do + fwp._worker_occupancy[pid] -= 1 + end + rmprocs(pid) + put!(fwp, pid) + end + else + return pid + end + end +end + +function _take_worker_noinit!(fwp::FlexWorkerPool) + while true + if (!isready(fwp._pool) || length(fwp._pool) == 0) && isready(fwp._spares) + _add_spare_to_pool!(fwp._spares, fwp._pool) + end + + try + if _use_main_pool(fwp) + if length(fwp._pool) > 0 + pid = take!(fwp._pool) + lock(fwp._worker_mgmt) do + fwp._worker_occupancy[pid] += 1 + end + return pid + else + yield() + lock(fwp._worker_mgmt) do + if length(fwp._pool) == 0 + wait(fwp._worker_mgmt) + end + end + end + else + pid = take!(fwp._mypid_pool) + lock(fwp._worker_mgmt) do + fwp._worker_occupancy[pid] += 1 + end + return pid + end + catch err + if err isa ErrorException && length(fwp._pool) == 0 + # err probably is `ErrorException("No active worker available in pool")`, + # we can deal with that, so ignore it. + else + rethrow() + end + end + end +end + +const _invalid_pid_counter = Threads.Atomic{UInt}() + +function _add_spare_to_pool!(spares::Channel{Tuple{Int,Int}}, @nospecialize(pool::AbstractWorkerPool)) + # `spares` may not be ready, even if checked before (due to a race condition). + # So we put in an invalid dummy entry to ensure we can take from it + # immediately. No one but us may take it out without putting it back in. + + invalid_pid_counterval = Threads.atomic_add!(_invalid_pid_counter, UInt(1)) + invalid_pid = -Int((invalid_pid_counterval << 2 >> 2) + UInt(1)) + + put!(spares, (invalid_pid, 0)) + while isready(spares) + pid, remaining_occupancy = take!(spares) + if pid == invalid_pid + # Ensure loop terminates, we added dummy_id to the end of spares: + break + elseif pid < 0 + # Invalid dummy id put into spares by someone else, need to put it back: + put!(spares, (pid, remaining_occupancy)) + else + @assert pid > 0 && remaining_occupancy > 0 + push!(pool, pid) + if remaining_occupancy > 1 + put!(spares, (pid, remaining_occupancy - 1)) + end + end + end + return nothing +end + + +""" + clear_worker_caches!(pool::AbstractWorkerPool) + +Clear the worker caches (cached function closures, etc.) on the workers In +`pool`. + +Does nothing if the pool doesn't perform any on-worker caching. +""" +function clear_worker_caches! end +export clear_worker_caches! + +clear_worker_caches!(::AbstractWorkerPool) = nothing + +clear_worker_caches!(fwp::FlexWorkerPool{<:CachingPool}) = clear_worker_caches!(fwp._pool) + +function clear_worker_caches!(wp::CachingPool) + clear!(wp._pool) + return nothing +end + + +# ToDo: Use atomic reference on recent Julia versions: +const _g_default_wp = Ref{Union{AbstractWorkerPool,Nothing}}(nothing) +const _g_default_wp_lock = ReentrantLock() + +""" + ppt_worker_pool() + +Returns the default ParallelProcessingTools worker pool. + +If the default instance doesn't exist yet, then a `FlexWorkerPool` will be +created that initially contains `Distributed.myid()` as the only worker. +""" +function ppt_worker_pool() + lock(_g_default_wp_lock) + wp = _g_default_wp[] + unlock(_g_default_wp_lock) + if isnothing(wp) + lock(_g_default_wp_lock) do + wp = _g_default_wp[] + if isnothing(wp) + return ppt_worker_pool!(FlexWorkerPool(label = "auto_default_flex_worker_pool")) + else + return wp + end + end + else + return wp + end +end +export ppt_worker_pool + + +""" + ppt_worker_pool!(wp::FlexWorkerPool) + +Sets the default ParallelProcessingTools worker pool to `wp` and returns it. + +See [`ppt_worker_pool()`](@ref). +""" +function ppt_worker_pool!(fwp::FlexWorkerPool) + lock(_g_default_wp_lock) do + lock(allprocs_management_lock()) do + _g_default_wp[] = fwp + return _g_default_wp[] + end + end +end +export ppt_worker_pool! diff --git a/test/Project.toml b/test/Project.toml index 74d8dd4..776675d 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -2,7 +2,9 @@ Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" +Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" +ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" [compat] Documenter = "1" diff --git a/test/runtests.jl b/test/runtests.jl index 6e8dfb1..b6cb8f2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,16 +2,24 @@ import Test +import ThreadPinning +ThreadPinning.Prefs.set_os_warning(false) + Test.@testset "Package ParallelProcessingTools" begin @info "Testing with $(Base.Threads.nthreads()) Julia threads." include("test_aqua.jl") + include("test_waiting.jl") + include("test_states.jl") include("test_fileio.jl") include("test_threadsafe.jl") include("test_threadlocal.jl") include("test_workpartition.jl") include("test_onthreads.jl") include("test_onprocs.jl") + include("test_procinit.jl") + include("test_workerpool.jl") + include("test_onworkers.jl") include("test_deprecated.jl") include("test_docs.jl") end # testset diff --git a/test/test_deprecated.jl b/test/test_deprecated.jl index 0b91283..63b6711 100644 --- a/test/test_deprecated.jl +++ b/test/test_deprecated.jl @@ -5,6 +5,8 @@ using ParallelProcessingTools using Distributed +include("testtools.jl") + @testset "deprecated" begin function do_work(n) @@ -35,6 +37,7 @@ using Distributed end end + pids = classic_addprocs(2) @testset "macro mp_async" begin @test_deprecated begin n = 128 @@ -48,4 +51,5 @@ using Distributed fetch.(A) == log.(1:n) end end + rmprocs(pids) end diff --git a/test/test_fileio.jl b/test/test_fileio.jl index 6c01bad..dd7e143 100644 --- a/test/test_fileio.jl +++ b/test/test_fileio.jl @@ -3,7 +3,7 @@ using Test using ParallelProcessingTools -using ParallelProcessingTools: split_basename_ext, tmp_filename +using ParallelProcessingTools: split_basename_ext, tmp_filename, default_cache_dir, default_cache_dir! old_julia_debug = get(ENV, "JULIA_DEBUG", "") ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" @@ -36,6 +36,17 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" end end + @testset "default_cache_dir" begin + @test @inferred(default_cache_dir()) isa String + orig_cache_dir = default_cache_dir() + @test mkpath(orig_cache_dir) == orig_cache_dir + dummy_cache_dir = joinpath("some", "tmp", "dir") + @test @inferred(default_cache_dir!(dummy_cache_dir)) == dummy_cache_dir + @test default_cache_dir() == dummy_cache_dir + @test default_cache_dir!(orig_cache_dir) == orig_cache_dir + @test default_cache_dir() == orig_cache_dir + end + for use_cache in [false, true] @testset "create_files $(use_cache ? "with" : "without") cache" begin mktempdir() do dir @@ -71,7 +82,10 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" @test read(fn1, String) == data1 && read(fn2, String) == data2 # Modify the target files: - write(fn1, "dummy content"); write(fn2, "dummy content"); + modify_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + write(fn1, "modified"); write(fn2, "content") + end + @test read(fn1, String) == "modified" && read(fn2, String) == "content" # Wont't overwrite: create_files(fn1, fn2, use_cache = use_cache, overwrite = false, verbose = true) do fn1, fn2 diff --git a/test/test_onprocs.jl b/test/test_onprocs.jl index fd36b06..97ba661 100644 --- a/test/test_onprocs.jl +++ b/test/test_onprocs.jl @@ -5,11 +5,13 @@ using ParallelProcessingTools using Distributed +include("testtools.jl") + @testset "onprocs" begin @testset "worker-init" begin if length(workers()) < 2 - addprocs(2) + classic_addprocs(2) end eval(:(@everywhere using Distributed)) @test length(workers()) >= 2 diff --git a/test/test_onworkers.jl b/test/test_onworkers.jl new file mode 100644 index 0000000..260c3c4 --- /dev/null +++ b/test/test_onworkers.jl @@ -0,0 +1,129 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +using Test +using ParallelProcessingTools + +using Distributed +using ParallelProcessingTools: isactive + +include("testtools.jl") + +old_julia_debug = get(ENV, "JULIA_DEBUG", "") +ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" +stopworkers() + +if !isdefined(@__MODULE__, :mytask) + @always_everywhere begin + struct MyExceptionRetry <: Exception + msg::String + end + ParallelProcessingTools._should_retry(::MyExceptionRetry) = true + + struct MyExceptionNoRetry <: Exception + msg::String + end + ParallelProcessingTools._should_retry(::MyExceptionNoRetry) = false + + using Distributed + function mytask(runtime::Real = 2, args...) + sleep(runtime) + @info "Hello from worker $(myid()), have to do $args." + return args + end + end + + function gen_mayfail(failure_probability::Real) + function failtask(args...) + @info "Hello from worker $(myid()), have to do $args, but may fail with $(failure_probability)% probably." + if rand() < failure_probability + throw(MyExceptionRetry("Predictably failed doing $args")) + else + return args + end + end + end +end + + +@testset "onworkers" begin + runmode = OnLocalhost(n = 2) + + @testset "runworkers $(nameof(typeof(runmode)))" begin + test_runprocs(2) do + runworkers(runmode)[1] + end + end + + #= + # Run manually for now, fails when run during CI tests for some reason: + + if Sys.isunix() + @testset "write_worker_start_script $(nameof(typeof(runmode)))" begin + mktempdir(prefix = "ppt-startscript-test") do dir + test_runprocs(2) do + startscript = write_worker_start_script(joinpath(dir, "startjlworkers.sh"), runmode) + open(`$startscript`) + end + end + end + end + =# + + #= + # For Debugging: + try; onworker(() -> error("foo"), label = "myactivity") ; catch err; err; end + try; onworker(() -> 42, 2, label = "myactivity") ; catch err; err; end + try; onworker(() -> 42, label = "myactivity") ; catch err; err; end + try; onworker((x) -> 40 + x, 2, label = "myactivity") ; catch err; err; end + try; onworker(() -> sleep(5), label = "myactivity", maxtime = 1) ; catch err; err; end + try; onworker(() -> sleep(5), label = "myactivity", maxtime = 1, tries = 3) ; catch err; err; end + =# + + @test @inferred(onworker(mytask)) == () + @test @inferred(onworker(mytask, 1, "foo")) == ("foo", ) + @test @inferred(onworker(gen_mayfail(0.5), "foo", 42; tries = 20, label = "mayfail")) == ("foo", 42) + + @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(gen_mayfail(1), "bar"; tries = 2, label = "mayfail") + @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", maxtime = 0.5, tries = 2) + + runworkers(OnLocalhost(n = 2)) + + timer = Timer(30) + @wait_while nprocs() < 3 && isopen(timer) + @test nprocs() == 3 + resources = worker_resources() + @test length(resources) == 2 + + @sync begin + for i in 1:8 + @async onworker(mytask, 1, i) + end + end + + @test @inferred(onworker(mytask)) == () + @test @inferred(onworker(mytask, 1, "foo")) == ("foo", ) + @test @inferred(onworker(gen_mayfail(0.5), "foo", 42; tries = 20, label = "mayfail")) == ("foo", 42) + + @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(gen_mayfail(1), "bar"; tries = 2, label = "mayfail") + + + #= + # Run these manually for now. Not sure how to make Test enviroment ignore the + # EOFError exceptions that originate when we kill workers due to timeouts. + + @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", time = 0.5, tries = 2) + @test nprocs() == 1 + + addworkers(OnLocalhost(2)) + + @test @inferred(onworker(mytask)) == () + @test @inferred(onworker(mytask, 1, "foo")) == ("foo", ) + @test @inferred(onworker(gen_mayfail(0.5), "foo", 42; tries = 20, label = "mayfail")) == ("foo", 42) + + @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(gen_mayfail(1), "bar"; tries = 2, label = "mayfail") + @test_throws ParallelProcessingTools.MaxTriesExceeded onworker(mytask, 2, "foo", time = 0.5, tries = 2) + =# +end + +stopworkers() +ENV["JULIA_DEBUG"] = old_julia_debug diff --git a/test/test_procinit.jl b/test/test_procinit.jl new file mode 100644 index 0000000..a6e45cb --- /dev/null +++ b/test/test_procinit.jl @@ -0,0 +1,89 @@ +# This file is a part of jl, licensed under the MIT License (MIT). + +using Test +using ParallelProcessingTools + +using Distributed + +using ParallelProcessingTools: allprocs_management_lock, proc_management_lock, + current_procinit_level, global_procinit_level, get_procinit_code, + add_procinit_code, ensure_procinit + +using ParallelProcessingTools: _global_procinit_level, _current_procinit_level, + _g_initial_procinit_code, _g_procinit_code, _g_wrapped_procinit_code, + _store_additional_procinit_code, _execute_procinit_code + +include("testtools.jl") + +old_julia_debug = get(ENV, "JULIA_DEBUG", "") +ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" + + +@testset "procinit" begin + @test @inferred(allprocs_management_lock()) isa ReentrantLock + @test @inferred(proc_management_lock(1)) isa ReentrantLock + @test @inferred(current_procinit_level()) isa Integer + @test @inferred(global_procinit_level()) isa Integer + + @test @inferred(get_procinit_code()) isa Expr + + # Test that current procces is sure to get initialized on ensure_procinit + push!(_g_initial_procinit_code.args, :(_g_inittest1 = 101)) + cinitlvl = current_procinit_level() + ginitlvl = global_procinit_level() + @test @inferred(ensure_procinit([myid()])) isa Nothing + @test global_procinit_level() == ginitlvl + @test current_procinit_level() == global_procinit_level() + @test Main._g_inittest1 == 101 + + # Reset current process init state for testing: + _current_procinit_level[] = 0 + + # Test that current procces is sure to get initialized on ensure_procinit + push!(_g_initial_procinit_code.args, :(_g_inittest2 = 102)) + cinitlvl = current_procinit_level() + ginitlvl = global_procinit_level() + @test add_procinit_code(:(@info "Begin init")) isa Nothing + @test global_procinit_level() == ginitlvl + 1 + @test current_procinit_level() == global_procinit_level() + @test Main._g_inittest2 == 102 + + # Test that _execute_procinit_code runs cleanly: + _dummy_initstep_expr = :(_g_inittest3 = 103) + _global_procinit_level[] = _global_procinit_level[] + 1 + _store_additional_procinit_code(_dummy_initstep_expr, global_procinit_level()) + @info "The following \"Dummy error\" error message is expected" + @test_throws ErrorException _execute_procinit_code(:(error("Dummy error")), global_procinit_level()) + @test _execute_procinit_code(get_procinit_code(), global_procinit_level()) isa Nothing + @test current_procinit_level() == global_procinit_level() + @test Main._g_inittest3 == 103 + @info "The following \"Failed to raise process 1 init level\" error message is expected" + @test_throws ErrorException _execute_procinit_code(get_procinit_code(), global_procinit_level() + 1) + + # Test that output of _initcode_wrapperexpr runs cleanly: + _dummy_initstep_expr = :(_g_inittest4 = 104) + _global_procinit_level[] = _global_procinit_level[] + 1 + _store_additional_procinit_code(_dummy_initstep_expr, global_procinit_level()) + @test Core.eval(Main, _g_wrapped_procinit_code) isa Nothing + @test current_procinit_level() == global_procinit_level() + @test Main._g_inittest4 == 104 + + add_procinit_code(:(_g_somevar1 = 201)) + @test Main._g_somevar1 == 201 + + @always_everywhere begin + _g_somevar2 = 202 + end + @test Main._g_somevar2 == 202 + + classic_addprocs(2) + ensure_procinit(workers()[end]) + + @test remotecall_fetch(last(workers())) do + _g_inittest1 + _g_inittest2 + _g_inittest3 + _g_inittest4 + _g_somevar1 + _g_somevar2 + end == 813 + + rmprocs(workers()) +end + +ENV["JULIA_DEBUG"] = old_julia_debug diff --git a/test/test_readme_examples.jl b/test/test_readme_examples.jl index 12d53e5..3b36b78 100644 --- a/test/test_readme_examples.jl +++ b/test/test_readme_examples.jl @@ -5,14 +5,16 @@ using Test using Distributed +include("testtools.jl") + if length(workers()) < 2 - addprocs(2) + classic_addprocs(2) end @testset "workpartition" begin @testset "parallel histogramming" begin using Distributed, ParallelProcessingTools - addprocs(2) + classic_addprocs(2) @everywhere using ParallelProcessingTools, Base.Threads, DistributedArrays, Statistics, StatsBase diff --git a/test/test_states.jl b/test/test_states.jl new file mode 100644 index 0000000..4da7e4d --- /dev/null +++ b/test/test_states.jl @@ -0,0 +1,125 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +using Test +using ParallelProcessingTools + +using ParallelProcessingTools: getlabel, isactive, wouldwait, hasfailed, whyfailed + +using Distributed: myid, remotecall + +@testset "states" begin + good_task = Threads.@spawn 42 + bad_task = Threads.@spawn error("Some error") + + @static if Sys.isunix() + good_process = open(`true`) + bad_process = open(`false`) + elseif Sys.iswindows() + good_process = open(`cmd /C "exit 0"`) + bad_process = open(`cmd /C "exit 1"`) + else + error("Unsupported OS") + end + + running_future = remotecall(()->sleep(20), myid()) + complete_future = remotecall(()-> 42, myid()) + + empty_open_channel = Channel{Int}(1) + ready_open_channel = Channel{Int}(1) + put!(ready_open_channel, 42) + good_closed_channel = Channel{Int}(1) + close(good_closed_channel) + bad_closed_channel = Channel{Int}(1) + close(bad_closed_channel, ErrorException("Some error")) + + active_timer = Timer(120) + stopped_timer = Timer(0) + + active_condition = Base.AsyncCondition() + closed_condition = Base.AsyncCondition() + close(closed_condition) + + sleep(2) + + @testset "getlabel" begin + @test getlabel(nothing) isa String + @test getlabel(missing) isa String + @test getlabel(good_task) isa String + @test getlabel(bad_task) isa String + @test getlabel(good_process) isa String + @test getlabel(bad_process) isa String + @test getlabel(active_timer) isa String + @test getlabel(stopped_timer) isa String + @test getlabel(running_future) isa String + @test getlabel(complete_future) isa String + @test getlabel(empty_open_channel) isa String + @test getlabel(ready_open_channel) isa String + @test getlabel(good_closed_channel) isa String + @test getlabel(bad_closed_channel) isa String + @test getlabel(active_condition) isa String + @test getlabel(closed_condition) isa String + end + + @testset "isactive" begin + @test isactive(nothing)== false + @test isactive(missing) == true + @test isactive(good_task) == false + @test isactive(bad_task) == false + @test isactive(good_process) == false + @test isactive(bad_process) == false + @test isactive(active_timer) == true + @test isactive(stopped_timer) == false + @test isactive(running_future) == true + @test isactive(complete_future) == false + @test isactive(empty_open_channel) == true + @test isactive(ready_open_channel) == true + @test isactive(good_closed_channel) == false + @test isactive(bad_closed_channel) == false + @test isactive(active_condition) == true + @test isactive(closed_condition) == false + end + + @testset "wouldwait" begin + @test wouldwait(nothing) == false + @test_throws ArgumentError wouldwait(missing) + @test wouldwait(good_task) == false + @test wouldwait(bad_task) == false + @test wouldwait(good_process) == false + @test wouldwait(bad_process) == false + @test wouldwait(active_timer) == true + @test wouldwait(stopped_timer) == false + @test wouldwait(running_future) == true + @test wouldwait(complete_future) == false + @test wouldwait(empty_open_channel) == true + @test wouldwait(ready_open_channel) == false + @test wouldwait(good_closed_channel) == false + @test wouldwait(bad_closed_channel) == false + @test wouldwait(active_condition) == true + @test wouldwait(closed_condition) == false + end + + @testset "hasfailed" begin + @test hasfailed(nothing) == false + @test hasfailed(missing) == false + @test hasfailed(good_task) == false + @test hasfailed(bad_task) == true + @test hasfailed(good_process) == false + @test hasfailed(bad_process) == true + @test hasfailed(empty_open_channel) == false + @test hasfailed(good_closed_channel) == false + @test hasfailed(bad_closed_channel) == true + end + + @testset "whyfailed" begin + @test_throws ArgumentError whyfailed(nothing) + @test_throws ArgumentError whyfailed(missing) + @test_throws ArgumentError whyfailed(good_task) + @test_throws ArgumentError whyfailed(good_process) + + @test whyfailed(bad_task) isa ErrorException + @test whyfailed(bad_process) == ParallelProcessingTools.NonZeroExitCode(1) + + @test_throws ArgumentError whyfailed(empty_open_channel) + @test whyfailed(bad_closed_channel) isa ErrorException + end +end diff --git a/test/test_waiting.jl b/test/test_waiting.jl new file mode 100644 index 0000000..d10de5a --- /dev/null +++ b/test/test_waiting.jl @@ -0,0 +1,148 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +using Test +using ParallelProcessingTools + +using ParallelProcessingTools: TimelimitExceeded + + +@testset "waiting" begin + if Sys.islinux() + sleep_test_precision = 2 + elseif Sys.isapple() + sleep_test_precision = 50 + else + sleep_test_precision = 3 + end + + @testset "sleep_ns" begin + function measure_sleep_ns(t_s; ntimes) + t_ns = round(Int64, t_s * 1e9) + sleep_ns(t_ns) + minimum(broadcast(1:10) do _ + inv(ntimes) * @elapsed for _ in 1:ntimes + sleep_ns(t_ns) + end + end) + end + + @test measure_sleep_ns(0e-6, ntimes = 10000) < sleep_test_precision * 1e-6 + @test 0.5e-6 < measure_sleep_ns(1e-6, ntimes = 10000) < sleep_test_precision * 2e-6 + @test 5e-6 < measure_sleep_ns(10e-6, ntimes = 1000) < sleep_test_precision * 15e-6 + @test 50e-6 < measure_sleep_ns(100e-6, ntimes = 100) < sleep_test_precision * 150e-6 + @test 500e-6 < measure_sleep_ns(1000e-6, ntimes = 10) < sleep_test_precision * 1500e-6 + @test 5000e-6 < measure_sleep_ns(10000e-6, ntimes = 1) < sleep_test_precision * 15000e-6 + @test 50000e-6 < measure_sleep_ns(100000e-6, ntimes = 1) < sleep_test_precision * 150000e-6 + end + + @testset "idle_sleep" begin + function measure_idle_sleep(n_idle, t_interval_s, t_max_s; ntimes) + idle_sleep(n_idle, t_interval_s, t_max_s) + minimum(broadcast(1:10) do _ + inv(ntimes) * @elapsed for _ in 1:ntimes + idle_sleep(n_idle, t_interval_s, t_max_s) + end + end) + end + + @test measure_idle_sleep(0, 10e-6, 100e-6, ntimes = 10000) < sleep_test_precision * 2e-6 + @test 5e-6 < measure_idle_sleep(1, 10e-6, 100e-6, ntimes = 1000) < sleep_test_precision * 15e-6 + @test 10e-6 < measure_idle_sleep(2, 10e-6, 100e-6, ntimes = 100) < sleep_test_precision * 25e-6 + @test 15e-6 < measure_idle_sleep(5, 10e-6, 100e-6, ntimes = 100) < sleep_test_precision * 35e-6 + @test 30e-6 < measure_idle_sleep(10, 10e-6, 100e-6, ntimes = 100) < sleep_test_precision * 40e-6 + @test 50e-6 < measure_idle_sleep(100, 10e-6, 100e-6, ntimes = 100) < sleep_test_precision * 80e-6 + @test 85e-6 < measure_idle_sleep(100000, 10e-6, 100e-6, ntimes = 100) < sleep_test_precision * 120e-6 + end + + @testset "wait_while" begin + t0 = time() + task = Threads.@spawn sleep(5) + timer = Timer(0.2) + @wait_while !istaskdone(task) && isopen(timer) + @test istaskdone(task) == false + @test time() - t0 < 3 + + @test_throws ArgumentError @wait_while maxtime true + @test_throws ArgumentError @wait_while someopt=1 true + @test_throws TimelimitExceeded @wait_while maxtime=0.25 timeout_error=true true + @timed(@wait_while maxtime=-0.5 true).time < 0.1 + t = Timer(2); 0.3 < @timed(@wait_while maxtime=0.5 isopen(t)).time < 0.7 + t = Timer(0.5); 0.3 < @timed(@wait_while timeout_error=true isopen(t)).time < 0.7 + end + + @testset "wait_for_any" begin + @test wait_for_any(nothing) isa Nothing + @test wait_for_any(nothing, nothing, nothing) isa Nothing + @test wait_for_any([nothing, nothing, nothing]) isa Nothing + + t0 = time() + wait_for_any(Timer(0.5)) + @test 0.1 < time() - t0 < 0.9 + + @test_throws TimelimitExceeded wait_for_any(Timer(0.5), maxtime = 0.1, timeout_error = true) + + t0 = time() + task1 = Threads.@spawn sleep(0.2) + task2 = Threads.@spawn sleep(0.6) + wait_for_any(task1, task2, maxtime = 0.4, timeout_error = true) + @test istaskdone(task1) == true + @test istaskdone(task2) == false + @test 0.1 < time() - t0 < 0.5 + + t0 = time() + task1 = Threads.@spawn sleep(0.4) + task2 = Threads.@spawn sleep(0.6) + @test_throws TimelimitExceeded wait_for_any(task1, task2, maxtime = 0.1, timeout_error = true) + + t0 = time() + task1 = Threads.@spawn sleep(0.2) + task2 = Threads.@spawn sleep(0.6) + wait_for_any([task1, task2], maxtime = 0.4, timeout_error = true) + @test istaskdone(task1) == true + @test istaskdone(task2) == false + @test 0.1 < time() - t0 < 0.5 + + t0 = time() + task1 = Threads.@spawn sleep(0.4) + task2 = Threads.@spawn sleep(0.6) + @test_throws TimelimitExceeded wait_for_any([task1, task2], maxtime = 0.1, timeout_error = true) + end + + @testset "wait_for_all" begin + @test wait_for_all(nothing) isa Nothing + @test wait_for_all(nothing, nothing, nothing) isa Nothing + @test wait_for_all([nothing, nothing, nothing]) isa Nothing + + t0 = time() + wait_for_all(Timer(1)) + @test 0.5 < time() - t0 < 3 + + t0 = time() + @test_throws TimelimitExceeded wait_for_all(Timer(5); maxtime = 0.4, timeout_error = true) + @test 0.2 < time() - t0 < 0.6 + + t0 = time() + task1 = Threads.@spawn sleep(1) + task2 = Threads.@spawn sleep(0.1) + wait_for_all(task1, nothing, task2) + @test 0.8 < time() - t0 < 3 + + t0 = time() + task1 = Threads.@spawn sleep(1) + task2 = Threads.@spawn sleep(0.1) + wait_for_all([task1, nothing, task2]) + @test 0.8 < time() - t0 < 3 + + t0 = time() + task1 = Threads.@spawn sleep(1) + task2 = Threads.@spawn sleep(0.1) + @test_throws TimelimitExceeded wait_for_all(task1, nothing, task2; maxtime = 0.4, timeout_error = true) + @test 0.2 < time() - t0 < 0.6 + + t0 = time() + task1 = Threads.@spawn sleep(1) + task2 = Threads.@spawn sleep(0.1) + @test_throws TimelimitExceeded wait_for_all([task1, nothing, task2]; maxtime = 0.4, timeout_error = true) + @test 0.2 < time() - t0 < 0.6 + end +end diff --git a/test/test_workerpool.jl b/test/test_workerpool.jl new file mode 100644 index 0000000..8d3ed8d --- /dev/null +++ b/test/test_workerpool.jl @@ -0,0 +1,102 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +using Test +using ParallelProcessingTools + +using Distributed +import Pkg + +old_julia_debug = get(ENV, "JULIA_DEBUG", "") +ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" +stopworkers() + +if !isdefined(@__MODULE__, :wp_test_func) + @always_everywhere begin + wp_test_func() = 42 + end +end + +@testset "workerpool" begin + @test wp_test_func() == 42 + + pool = FlexWorkerPool(withmyid = true, caching = false, label = "mypool", maxoccupancy = 3) + + # no workers yet, pool should fall back to using myid(): + @test @inferred(workers(pool)) == [myid()] + @test @inferred(length(pool)) == length(workers(pool)) + pids = [@inferred(take!(pool)) for _ in 1:3] + @test remotecall_fetch(() -> wp_test_func(), first(pids)) == 42 + @test @inferred(isready(pool)) == false + @test sort(pids) == repeat([myid()], 3) + foreach(pid -> @inferred(put!(pool, pid)), pids) + @test isready(pool) == true + + # This should be a no-op, as myid() is already in the pool: + @test push!(pool, myid()) isa FlexWorkerPool + + prev_workers = workers() + classic_addprocs(2) + new_workers = setdiff(workers(), prev_workers) + + # pool2 has no fallback to myid() and doesn_t init workers: + pool2 = FlexWorkerPool{WorkerPool}(new_workers, maxoccupancy = 3, init_workers = false) + + foreach(pid -> push!(pool2, pid), new_workers) + @test workers(pool2) == new_workers + @test length(pool2) == length(workers(pool2)) + pids = [take!(pool2) for _ in 1:2*3] + @test_throws UndefVarError @userfriendly_exceptions remotecall_fetch(() -> wp_test_func(), first(pids)) + @test isready(pool2) == false + @test sort(pids) == sort(repeat(new_workers, 3)) + foreach(pid -> put!(pool2, pid), pids) + @test isready(pool2) == true + + # Add new workers to pool: + foreach(pid -> @inferred(push!(pool, pid)), new_workers) + + @test workers(pool) == new_workers + @test length(pool) == length(workers(pool)) + pids = [take!(pool) for _ in 1:2*3] + @test remotecall_fetch(() -> wp_test_func(), first(pids)) == 42 + @test isready(pool) == false + @test sort(pids) == sort(repeat(new_workers, 3)) + foreach(pid -> put!(pool, pid), pids) + @test isready(pool) == true + + # This should be a no-op, as the workers are already in the pool: + @test push!(pool, first(new_workers)) isa FlexWorkerPool + @test push!(pool2, first(new_workers)) isa FlexWorkerPool + + rmprocs(new_workers) + + # Workers are gone, should show a warning, but not throw an exception + # (ToDo: Use @test_warn): + @test push!(pool, first(new_workers)) isa FlexWorkerPool + @test push!(pool2, first(new_workers)) isa FlexWorkerPool + + # no more workers, pool should fall back to using myid(): + pids = [take!(pool) for _ in 1:3] + # length should be updated now: + @test length(pool) == 1 + @test sort(pids) == repeat([myid()], 3) + foreach(pid -> put!(pool, pid), pids) + + # Trigger update of pool2._pool: + @test_throws ErrorException take!(pool2._pool) + @test length(pool2) == 0 + + # Allow fallback to myid() for pool2: + push!(pool2, myid()) + @test length(pool2) == 1 + + pids = [take!(pool2) for _ in 1:3] + @test sort(pids) == repeat([myid()], 3) + foreach(pid -> put!(pool2, pid), pids) + + pool3 = ppt_worker_pool() + @test pool3 isa FlexWorkerPool + @test workers(pool3) == [myid()] +end + +stopworkers() +ENV["JULIA_DEBUG"] = old_julia_debug diff --git a/test/testtools.jl b/test/testtools.jl new file mode 100644 index 0000000..c60f089 --- /dev/null +++ b/test/testtools.jl @@ -0,0 +1,40 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +import Pkg + +if !isdefined(@__MODULE__, :classic_addprocs) + +function classic_addprocs(n::Integer) + if VERSION >= v"1.10" + addprocs(2) + else + # addprocs doesn't set project automatically on some older Julia versions + addprocs(2; exeflags=`--project=$(Pkg.project().path)`) + end +end + + +function test_runprocs(f_runprocs, additional_n) + old_procs = procs() + old_n = length(old_procs) + expected_n = old_n + additional_n + try + state = @return_exceptions f_runprocs() + @test !(state isa Exception) + if !(state isa Exception) + @wait_while maxtime=30 timeout_error = true ( + nprocs() < expected_n && (isnothing(state) || isactive(state)) + ) + @test isnothing(state) || isactive(state) + @test nprocs() == expected_n + rmprocs(setdiff(procs(), old_procs)) + @test procs() == old_procs + @wait_while maxtime=10 isactive(state) + @test !isactive(state) + end + finally + rmprocs(setdiff(procs(), old_procs)) + end +end + +end # if not already defined