diff --git a/LICENSE.md b/LICENSE.md index 77ab41e..40011a5 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,7 +1,7 @@ -ClusterManagers.jl is licensed under the MIT License: +ElasticClusterManager.jl is licensed under the MIT License: -> Copyright (c) 2013: Amit Murthy -> and other contributors: https://github.com/amitmurthy/ClusterManagers.jl/contributors +> Copyright (c) 2013: Amit Murthy and contributors +> and other contributors: https://github.com/JuliaParallel/ElasticClusterManager.jl > > Permission is hereby granted, free of charge, to any person obtaining > a copy of this software and associated documentation files (the diff --git a/Project.toml b/Project.toml index 429f6e4..4083cd4 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "ElasticClusterManager" uuid = "547eee1f-27c8-4193-bfd6-9e092c8e3331" -version = "1.0.0" +version = "2.0.0" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" diff --git a/README.md b/README.md index d52f5bd..0367787 100755 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ port and publish their own host/port information for other workers to connect to On the master, you need to instantiate an instance of `ElasticManager`. 
The constructors defined are: ```julia -ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=()) +ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all) ElasticManager(port) = ElasticManager(;port=port) ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port) ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie) @@ -34,9 +34,20 @@ ElasticManager: Number of workers to be added : 0 Terminated workers : [] Worker connect command : - /home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ClusterManagers; ClusterManagers.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)' + /home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ElasticClusterManager; ElasticClusterManager.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)' ``` -By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false))` to the first form of the `ElasticManager` constructor. +Use `ElasticClusterManager.get_connect_cmd(em; kwargs...)` to generate a suitable system command to start up a +worker process, e.g.: -Once workers are connected, you can print the `em` object again to see them added to the list of active workers. +```julia +print(ElasticClusterManager.get_connect_cmd(em; absolute_exename=false, same_project=false)) +``` + +``` +/home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ElasticClusterManager; ElasticClusterManager.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)' +``` + +By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. 
+ +Once workers have connected, `show(em)` should show them added to the list of active workers. diff --git a/src/ElasticClusterManager.jl b/src/ElasticClusterManager.jl index 9333e9b..e50e8a7 100755 --- a/src/ElasticClusterManager.jl +++ b/src/ElasticClusterManager.jl @@ -11,7 +11,6 @@ import Pkg # Bring some names into scope, just for convenience: using Distributed: launch, manage, kill, init_worker, connect -export launch, manage, kill, init_worker, connect export ElasticManager, elastic_worker # This seems to be currently unused: diff --git a/src/elastic.jl b/src/elastic.jl index b5f1d15..f8d4e98 100644 --- a/src/elastic.jl +++ b/src/elastic.jl @@ -11,22 +11,60 @@ else my_errormonitor(t) = nothing end +""" + ElasticManager <: Distributed.ClusterManager + +A cluster manager that allows for adding and removing workers dynamically. + +Constructor: + +```julia +function ElasticManager(; + addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, + topology=:all_to_all, manage_callback=elastic_no_op_callback +) +``` + +The manager will listen for worker connections on `addr:port`. Workers have +to authenticate themselves with `cookie` (randomly generated by default). +`topology` sets the Julia cluster topology and may be `:all_to_all` or +`:master_worker` (see documentation of `Distributed.addprocs` for details). + +Workers are not added via `Distributed.addprocs`; instead they are started via +[`ElasticClusterManager.elastic_worker`](@ref) from external Julia +processes; this is up to the user. + +If `manage_callback` is set to a user-provided `my_manage_callback`, it will +be called on the primary Julia process when `Distributed.manage` adds or +removes an elastic worker. The callback signature is + +```julia +my_manage_callback(mgr::ElasticManager, id::Integer, :register) +my_manage_callback(mgr::ElasticManager, id::Integer, :deregister) +``` + +This can be used to automatically add workers to a `Distributed.WorkerPool`, +and so on.
+""" struct ElasticManager <: Distributed.ClusterManager active::Dict{Int, Distributed.WorkerConfig} # active workers pending::Channel{Sockets.TCPSocket} # to be added workers terminated::Set{Int} # terminated worker ids topology::Symbol sockname - printing_kwargs + manage_callback - function ElasticManager(;addr=Sockets.IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=()) + function ElasticManager(; + addr=Sockets.IPv4("127.0.0.1"), port=9009, cookie=nothing, + topology=:all_to_all, manage_callback=elastic_no_op_callback + ) Distributed.init_multi() cookie !== nothing && Distributed.cluster_cookie(cookie) # Automatically check for the IP address of the local machine if addr == :auto try - addr = Sockets.getipaddr(Distributed.IPv4) + addr = Sockets.getipaddr(Sockets.IPv4) catch error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.") end @@ -34,7 +72,7 @@ struct ElasticManager <: Distributed.ClusterManager l_sock = Distributed.listen(addr, port) - lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), printing_kwargs) + lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback) t1 = @async begin while true @@ -53,12 +91,14 @@ struct ElasticManager <: Distributed.ClusterManager end end -ElasticManager(port) = ElasticManager(;port=port) -ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port) -ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie) +@deprecate ElasticManager(port) ElasticManager(;port=port) +@deprecate ElasticManager(addr, port) ElasticManager(;addr=addr, port=port) +@deprecate ElasticManager(addr, port, cookie) ElasticManager(;addr=addr, port=port, cookie=cookie) +elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing function 
process_worker_conn(mgr::ElasticManager, s::Sockets.TCPSocket) + @debug "ElasticManager got new worker connection" # Socket is the worker's STDOUT wc = Distributed.WorkerConfig() wc.io = s @@ -94,6 +134,7 @@ end function Distributed.launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition) # The workers have already been started. while isready(mgr.pending) + @debug "ElasticManager.launch new worker" wc=Distributed.WorkerConfig() wc.io = take!(mgr.pending) push!(launched, wc) @@ -104,8 +145,12 @@ end function Distributed.manage(mgr::ElasticManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol) if op == :register + @debug "ElasticManager registering process id $id" mgr.active[id] = config + mgr.manage_callback(mgr, id, op) elseif op == :deregister + @debug "ElasticManager deregistering process id $id" + mgr.manage_callback(mgr, id, op) delete!(mgr.active, id) push!(mgr.terminated, id) end @@ -131,22 +176,49 @@ function Base.show(io::IO, mgr::ElasticManager) seek(iob, position(iob)-1) println(iob, "]") - println(iob, " Worker connect command : ") - print(iob, " ", get_connect_cmd(mgr; mgr.printing_kwargs...)) - print(io, String(take!(iob))) end -# Does not return. If executing from a REPL try -# @async connect_to_cluster(.....) -# addr, port that a ElasticManager on the master processes is listening on. -function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true) +""" + ElasticClusterManager.elastic_worker( + cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009; + forward_stdout::Bool = true, + Base.@nospecialize(env::AbstractVector = [],) + ) + +Start an elastic worker process that connects to the main Julia process +at IP-address `addr` and port `port`. + +Does not return. 
+""" +function elastic_worker( + cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009; + forward_stdout::Bool = true, + Base.@nospecialize(env::AbstractVector = [],) +) + @debug "ElasticManager.elastic_worker(cookie, $addr, $port; forward_stdout=$forward_stdout, env=$env)" + for (k, v) in env + ENV[k] = v + end + c = connect(addr, port) write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN]) - stdout_to_master && redirect_stdout(c) + if forward_stdout + redirect_stdout(c) + end Distributed.start_worker(c, cookie) end +""" + ElasticManager.get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=()) + +Return a suitable system command that starts a Julia process with an elastic +worker that will try to connect the the primary Julia process running the +[`ElasticManager`](@ref) instance `em`. + +The output of `get_connect_cmd` should be taken as a baseline. Complex use +cases (on some batch systems, etc.) may require a more involved command. +""" function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=()) ip = string(em.sockname[1]) port = convert(Int,em.sockname[2]) @@ -160,5 +232,4 @@ function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project project..., "-e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'" ]," ") - end diff --git a/test/elastic.jl b/test/elastic.jl index 4ff2f88..6ac9c1c 100644 --- a/test/elastic.jl +++ b/test/elastic.jl @@ -25,7 +25,6 @@ @test lines[2] == "Active workers : []" @test lines[3] == "Number of workers to be added : 0" @test lines[4] == "Terminated workers : [ 2]" - @test lines[5] == "Worker connect command :" end @testset "Other constructors for ElasticManager()" begin