Distributed map returning an observable for following worker progress
The opmap function is a wrapper around pmap that allows workers to report their status. The function passed to opmap takes as its first argument a callback, setmessage, that can be used to set a status message for the current worker. Unlike pmap, the mapping is performed asynchronously: opmap returns an Observable holding a summary string of worker statuses, along with the Task performing the mapping.
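Outside a notebook or plot pane, the returned summary can also be followed with a plain listener. The sketch below is only an illustration: it assumes the returned object behaves like an Observables.jl Observable, and it uses a hand-made stand-in (status) with made-up status strings instead of a real opmap call, so it runs without worker processes.

```julia
using Observables

# Stand-in for the observable returned by opmap (assumption: the real one
# is an Observables.jl `Observable` holding a String). The status text
# below is invented for illustration.
status = Observable("worker 2: Initializing...")

# Collect and print every update; with a real `obs` this would run each
# time the worker summary changes.
seen = String[]
on(status) do s
    push!(seen, s)
    println(s)
end

# Simulate two status updates.
status[] = "worker 2: Fooing 1-bars (1/3)"
status[] = "worker 2: All 1-bars successfully fooed."

# The current value can be read at any time with `status[]`.
latest = status[]
```

With the real return values, the same pattern would presumably be on(obs) do s ... end before scheduling or waiting on the task.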
Running some fake work and viewing worker state in IJulia or Juno plot pane using HTML:
using Distributed
addprocs(2)
@everywhere using ObservablePmap
using CSSUtil: vbox

obs, task = opmap(1:5; schedule_now=false, on_error=identity) do setmessage, x
    setmessage("Initializing...")
    sleep(1)
    n = rand(2:10)
    for i = 1:n
        setmessage("Fooing $x-bars ($i/$n) ")
        x == 3 && error("Error at $i")
        sleep(rand())
    end
    setmessage("All $x-bars successfully fooed.")
end

html = map(x -> HTML("<pre>$x</pre>"), obs)
schedule(task)
vbox(html) # not necessary on IJulia
Here's what the output looks like in Juno's plot pane:
When using ologpmap, log messages produced by workers are used as their status message. Accordingly, the function argument to ologpmap does not take an additional setmessage argument. By default, log messages in workers are processed by a Base.CoreLogging.SimpleLogger, and its output is used as the status message. The logger_f keyword argument to ologpmap can be used to specify a different logger: specifically, logger_f(io) should produce a logger writing log messages to io.
This example uses ProgressLogging.jl and TerminalLoggers.jl to show progress bars.
using Distributed
addprocs(2)
@everywhere using ObservablePmap
@everywhere using ProgressLogging
@everywhere using TerminalLoggers
using CSSUtil: vbox

summ, task = ologpmap('a':'e', 2:2:10; logger_f=TerminalLogger) do c, x
    @withprogress name="Processing '$c'" for i = 1:x
        sleep(rand())
        @logprogress i/x
    end
    @info "finished '$c'."
    x
end

html = map(x -> HTML("<pre>$x</pre>"), summ)
vbox(html)
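
Since logger_f only has to map an io to a logger writing to it, a small custom factory can be tried locally before handing it to ologpmap. The following is a minimal sketch using only the Logging standard library; the name debug_logger is hypothetical, and the local check with an IOBuffer merely imitates how a worker's log output might be captured.

```julia
using Logging

# Hypothetical logger factory: takes an IO and returns a logger that writes
# to it, also letting @debug messages through (SimpleLogger defaults to Info).
debug_logger(io) = SimpleLogger(io, Logging.Debug)

# Local check: capture what the logger writes into a buffer.
buf = IOBuffer()
with_logger(debug_logger(buf)) do
    @debug "loading chunk" chunk = 3
    @info "chunk done"
end
status = String(take!(buf))
print(status)

# Assumed usage, following the description of logger_f above:
#   summ, task = ologpmap(f, xs; logger_f=debug_logger)
```

On a multi-process setup the factory would need to be defined on the workers as well, for example with @everywhere.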