Port improvements from ParallelProcessingTools #19

Open
wants to merge 6 commits into base: master
6 changes: 3 additions & 3 deletions LICENSE.md
@@ -1,7 +1,7 @@
ClusterManagers.jl is licensed under the MIT License:
ElasticClusterManager.jl is licensed under the MIT License:

> Copyright (c) 2013: Amit Murthy
> and other contributors: https://github.com/amitmurthy/ClusterManagers.jl/contributors
> Copyright (c) 2013: Amit Murthy and contributors
> and other contributors: https://github.com/JuliaParallel/ElasticClusterManager.jl
>
> Permission is hereby granted, free of charge, to any person obtaining
> a copy of this software and associated documentation files (the
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,6 +1,6 @@
name = "ElasticClusterManager"
uuid = "547eee1f-27c8-4193-bfd6-9e092c8e3331"
version = "1.0.0"
version = "2.0.0"

[deps]
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
19 changes: 15 additions & 4 deletions README.md
@@ -19,7 +19,7 @@ port and publish their own host/port information for other workers to connect to
On the master, you need to instantiate an instance of `ElasticManager`. The constructors defined are:

```julia
ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all)
ElasticManager(port) = ElasticManager(;port=port)
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
@@ -34,9 +34,20 @@ ElasticManager:
Number of workers to be added : 0
Terminated workers : []
Worker connect command :
/home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ClusterManagers; ClusterManagers.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
/home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ElasticClusterManager; ElasticClusterManager.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
```

By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session. You can change either of these defaults by passing `printing_kwargs=(absolute_exename=false, same_project=false))` to the first form of the `ElasticManager` constructor.
Use `ElasticClusterManager.get_connect_cmd(em; kwargs...)` to generate a suitable system command to start up a
worker process, e.g.:

Once workers are connected, you can print the `em` object again to see them added to the list of active workers.
```julia
print(ElasticClusterManager.get_connect_cmd(em; absolute_exename=false, same_project=false))
```

```
/home/user/bin/julia --project=/home/user/myproject/Project.toml -e 'using ElasticClusterManager; ElasticClusterManager.elastic_worker("4cOSyaYpgSl6BC0C","127.0.1.1",36275)'
```

By default, the printed command uses the absolute path to the current Julia executable and activates the same project as the current session.

Once workers have connected, `show(em)` should show them added to the list of active workers.
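
For example, a minimal end-to-end sketch on the master (the port is illustrative; workers are started externally using the printed connect command):

```julia
using ElasticClusterManager

em = ElasticManager(port = 9009)  # listen for incoming worker connections
# ...start one or more workers externally using the connect command...
show(em)  # connected workers appear under "Active workers"
```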
103 changes: 87 additions & 16 deletions src/elastic.jl
@@ -11,30 +11,68 @@
my_errormonitor(t) = nothing
end

"""
ElasticManager <: Distributed.ClusterManager

A cluster manager that allows for adding and removing workers dynamically.

Constructor:

```julia
function ElasticManager(;
addr=IPv4("127.0.0.1"), port=9009, cookie=nothing,
topology=:all_to_all, manage_callback=elastic_no_op_callback
)
```

The manager will listen for worker connections on `addr:port`. Workers have
to authenticate themselves with `cookie` (randomly generated by default).
`topology` sets the Julia cluster topology and may be `:all_to_all` or
`:master_worker` (see the documentation of `Distributed.addprocs` for details).

Workers are not added via `Distributed.addprocs`. Instead, they are started
via [`ElasticClusterManager.elastic_worker`](@ref) from external Julia
processes; launching those processes is up to the user.

If `manage_callback` is set to a user-provided `my_manage_callback`, it will
be called on the primary Julia process when `Distributed.manage` adds or
removes an elastic worker. The callback signature is

```julia
my_manage_callback(mgr::ElasticManager, id::Integer, :register)
my_manage_callback(mgr::ElasticManager, id::Integer, :deregister)
```

This can be used to automatically add workers to a `Distributed.WorkerPool`,
and so on.
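
For example, a minimal sketch that collects elastic workers in a pool (the
pool and callback names are illustrative, not part of the package):

```julia
const MY_POOL = Distributed.WorkerPool()

function my_manage_callback(mgr::ElasticManager, id::Integer, op::Symbol)
    op == :register && push!(MY_POOL, id)  # make new workers available via the pool
    return nothing
end

em = ElasticManager(manage_callback = my_manage_callback)
```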
"""
struct ElasticManager <: Distributed.ClusterManager
active::Dict{Int, Distributed.WorkerConfig} # active workers
pending::Channel{Sockets.TCPSocket} # to be added workers
terminated::Set{Int} # terminated worker ids
topology::Symbol
sockname
printing_kwargs
manage_callback

function ElasticManager(;addr=Sockets.IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
function ElasticManager(;
addr=Sockets.IPv4("127.0.0.1"), port=9009, cookie=nothing,
topology=:all_to_all, manage_callback=elastic_no_op_callback
)
Distributed.init_multi()
cookie !== nothing && Distributed.cluster_cookie(cookie)

# Automatically check for the IP address of the local machine
if addr == :auto
try
addr = Sockets.getipaddr(Distributed.IPv4)
addr = Sockets.getipaddr(Sockets.IPv4)
catch
error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
end
end

l_sock = Distributed.listen(addr, port)

lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), printing_kwargs)
lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback)

t1 = @async begin
while true
@@ -53,12 +91,14 @@
end
end

ElasticManager(port) = ElasticManager(;port=port)
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
@deprecate ElasticManager(port) ElasticManager(;port=port)
@deprecate ElasticManager(addr, port) ElasticManager(;addr=addr, port=port)
@deprecate ElasticManager(addr, port, cookie) ElasticManager(;addr=addr, port=port, cookie=cookie)

elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing

function process_worker_conn(mgr::ElasticManager, s::Sockets.TCPSocket)
@debug "ElasticManager got new worker connection"
# Socket is the worker's STDOUT
wc = Distributed.WorkerConfig()
wc.io = s
@@ -94,6 +134,7 @@
function Distributed.launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
# The workers have already been started.
while isready(mgr.pending)
@debug "ElasticManager.launch new worker"
wc=Distributed.WorkerConfig()
wc.io = take!(mgr.pending)
push!(launched, wc)
@@ -104,8 +145,12 @@

function Distributed.manage(mgr::ElasticManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol)
if op == :register
@debug "ElasticManager registering process id $id"
mgr.active[id] = config
mgr.manage_callback(mgr, id, op)
elseif op == :deregister
@debug "ElasticManager deregistering process id $id"
mgr.manage_callback(mgr, id, op)
delete!(mgr.active, id)
push!(mgr.terminated, id)
end
@@ -131,22 +176,49 @@
seek(iob, position(iob)-1)
println(iob, "]")

println(iob, " Worker connect command : ")
print(iob, " ", get_connect_cmd(mgr; mgr.printing_kwargs...))

print(io, String(take!(iob)))
end

# Does not return. If executing from a REPL try
# @async connect_to_cluster(.....)
# addr, port that a ElasticManager on the master processes is listening on.
function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
"""
ElasticClusterManager.elastic_worker(
cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009;
forward_stdout::Bool = true,
Base.@nospecialize(env::AbstractVector = [],)
)

Start an elastic worker process that connects to the primary Julia process
at IP address `addr` and port `port`. Key-value pairs in `env` are set in
`ENV` before connecting; if `forward_stdout` is `true`, the worker's standard
output is redirected over the connection to the manager.

Does not return.
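
For example (cookie, address, and port are illustrative and must match the
`ElasticManager` listening on the primary process):

```julia
using ElasticClusterManager

# Blocks forever; from a REPL, consider wrapping the call in `@async`.
ElasticClusterManager.elastic_worker("4cOSyaYpgSl6BC0C", "127.0.1.1", 36275)
```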
"""
function elastic_worker(
cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009;
forward_stdout::Bool = true,
Base.@nospecialize(env::AbstractVector = [],)
)
@debug "ElasticManager.elastic_worker(cookie, $addr, $port; forward_stdout=$forward_stdout, env=$env)"
for (k, v) in env
ENV[k] = v
end

c = connect(addr, port)
write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN])
stdout_to_master && redirect_stdout(c)
if forward_stdout
redirect_stdout(c)
end
Distributed.start_worker(c, cookie)
end

"""
ElasticClusterManager.get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=())

Return a suitable system command that starts a Julia process with an elastic
worker that will try to connect to the primary Julia process running the
[`ElasticManager`](@ref) instance `em`.

The output of `get_connect_cmd` should be taken as a baseline. Complex use
cases (on some batch systems, etc.) may require a more involved command.
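
For example, a sketch (the exact output depends on the local Julia installation
and the active project):

```julia
em = ElasticManager(port = 9009)
println(get_connect_cmd(em; absolute_exename = false))
# Prints something like:
# julia --project=<active project> -e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker("<cookie>","127.0.0.1",9009)'
```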
"""
function get_connect_cmd(em::ElasticManager; absolute_exename=true, same_project=true, exeflags::Tuple=())
ip = string(em.sockname[1])
port = convert(Int,em.sockname[2])
@@ -160,5 +232,4 @@
project...,
"-e 'import ElasticClusterManager; ElasticClusterManager.elastic_worker(\"$cookie\",\"$ip\",$port)'"
]," ")

end
1 change: 0 additions & 1 deletion test/elastic.jl
@@ -25,7 +25,6 @@
@test lines[2] == "Active workers : []"
@test lines[3] == "Number of workers to be added : 0"
@test lines[4] == "Terminated workers : [ 2]"
@test lines[5] == "Worker connect command :"
end

@testset "Other constructors for ElasticManager()" begin