diff --git a/Project.toml b/Project.toml index d5bc672e0..8cb63e662 100644 --- a/Project.toml +++ b/Project.toml @@ -18,7 +18,7 @@ StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91" [compat] Colors = "0.10, 0.11, 0.12" -MemPool = "0.3.3" +MemPool = "0.3.4" Requires = "1" StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33" julia = "1.0" diff --git a/benchmarks/benchmark.jl b/benchmarks/benchmark.jl index fa072c1a8..2fe1b8b63 100644 --- a/benchmarks/benchmark.jl +++ b/benchmarks/benchmark.jl @@ -40,6 +40,10 @@ elseif render == "offline" using FFMPEG, FileIO, ImageMagick end const RENDERS = Dict{Int,Dict}() +const live_port = parse(Int, get(ENV, "BENCHMARK_LIVE_PORT", "8000")) + +const graph = parse(Bool, get(ENV, "BENCHMARK_GRAPH", "0")) +const profile = parse(Bool, get(ENV, "BENCHMARK_PROFILE", "0")) _benches = get(ENV, "BENCHMARK", "cpu,cpu+dagger") const benches = [] @@ -106,7 +110,7 @@ end theory_flops(nrow, ncol, nfeatures) = 11 * ncol * nrow * nfeatures + 2 * (ncol + nrow) * nfeatures -function nmf_suite(; dagger, accel, kwargs...) +function nmf_suite(ctx; dagger, accel) suite = BenchmarkGroup() #= TODO: Re-enable @@ -179,39 +183,49 @@ function nmf_suite(; dagger, accel, kwargs...) ]) elseif accel == "cpu" Dagger.Sch.SchedulerOptions() + else + error("Unknown accelerator $accel") end - ctx = Context(collect((1:nw) .+ 1); kwargs...) p = sum([length(Dagger.get_processors(OSProc(id))) for id in 2:(nw+1)]) + #bsz = ncol ÷ length(workers()) + bsz = ncol ÷ 64 nsuite["Workers: $nw"] = @benchmarkable begin - compute($ctx, nnmf($X[], $W[], $H[]); options=$opts) + _ctx = Context($ctx, workers()[1:$nw]) + compute(_ctx, nnmf($X[], $W[], $H[]); options=$opts) end setup=begin _nw, _scale = $nw, $scale @info "Starting $_nw worker Dagger NNMF (scale by $_scale)" - if render != "" - Dagger.show_gantt($ctx; width=1800, window_length=20, delay=2, port=4040, live=live) - end if $accel == "cuda" # FIXME: Allocate with CUDA.rand if possible - $X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts)) - $W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts)) - $H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts)) + $X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts)) + $W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts)) + $H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts)) elseif $accel == "amdgpu" $X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts)) $W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts)) $H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts)) elseif $accel == "cpu" - $X[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts) - $W[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts) - $H[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts) + $X[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts) + $W[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts) + $H[] = compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); 
options=$opts) end end teardown=begin - if render != "" + if render != "" && !live Dagger.continue_rendering[] = false - video_paths = take!(Dagger.render_results) - try - video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths)) - push!(get!(()->[], RENDERS[$scale], $nw), video_data) - catch + for i in 1:5 + isready(Dagger.render_results) && break + sleep(1) + end + if isready(Dagger.render_results) + video_paths = take!(Dagger.render_results) + try + video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths)) + push!(get!(()->[], RENDERS[$scale], $nw), video_data) + catch err + @error "Failed to process render results" exception=(err,catch_backtrace()) + end + else + @warn "Failed to fetch render results" end end $X[] = nothing @@ -219,7 +233,6 @@ function nmf_suite(; dagger, accel, kwargs...) $H[] = nothing @everywhere GC.gc() end - break nw ÷= 2 end suite["NNMF scaled by: $scale"] = nsuite @@ -234,13 +247,27 @@ function main() output_prefix = "result-$(np)workers-$(nt)threads-$(Dates.now())" suites = Dict() + graph_opts = if graph && render != "" + (log_sink=Dagger.LocalEventLog(), log_file=output_prefix*".dot") + elseif render != "" + (log_sink=Dagger.LocalEventLog(),) + else + NamedTuple() + end + ctx = Context(collect((1:nw) .+ 1); profile=profile, graph_opts...) for bench in benches name = bench.name println("creating $name benchmarks") - suites[name] = if bench.dagger - nmf_suite(; dagger=true, accel=bench.accel, log_sink=Dagger.LocalEventLog(), log_file=output_prefix*".dot", profile=false) - else - nmf_suite(; dagger=false, accel=bench.accel) + suites[name] = nmf_suite(ctx; dagger=bench.dagger, accel=bench.accel) + end + if render != "" + Dagger.show_gantt(ctx; width=1800, window_length=5, delay=2, port=live_port, live=live) + if live + # Make sure server code is compiled + sleep(1) + run(pipeline(`curl -s localhost:$live_port/`; stdout=devnull)) + run(pipeline(`curl -s localhost:$live_port/profile`; stdout=devnull)) + @info "Rendering started on port $live_port" end end res = Dict() @@ -248,14 +275,14 @@ function main() name = bench.name println("running $name benchmarks") res[name] = try - run(suites[name]; samples=5, seconds=10*60, gcsample=true) + run(suites[name]; samples=3, seconds=10*60, gcsample=true) catch err @error "Error running $name benchmarks" exception=(err,catch_backtrace()) nothing end end for bench in benches - println("benchmark results for $(bench.name): $(res[bench.name])") + println("benchmark results for $(bench.name): $(minimum(res[bench.name]))") end println("saving results in $output_prefix.$output_format") @@ -267,6 +294,11 @@ function main() serialize(io, outdict) end end + + if parse(Bool, get(ENV, "BENCHMARK_VISUALIZE", "0")) + run(`$(Base.julia_cmd()) $(joinpath(pwd(), "visualize.jl")) -- $(output_prefix*"."*output_format)`) + end + println("Done.") # TODO: Compare with multiple results diff --git a/benchmarks/visualize.jl b/benchmarks/visualize.jl index c281f6f72..acb218866 100644 --- a/benchmarks/visualize.jl +++ b/benchmarks/visualize.jl @@ -1,27 +1,59 @@ -using JLD +using JLD, Serialization using BenchmarkTools using TypedTables -res = JLD.load(ARGS[1]) +res = if endswith(ARGS[1], ".jld") + JLD.load(ARGS[1]) +elseif endswith(ARGS[1], ".jls") + deserialize(ARGS[1]) +else + error("Unknown file type") +end -serial_results = res["results"]["Serial"] -dagger_results = res["results"]["Dagger"] +serial_results = filter(x->!occursin("dagger", x[1]), res["results"]) +@assert length(keys(serial_results)) > 0 "No serial 
results found" +dagger_results = filter(x->occursin("dagger", x[1]), res["results"]) +@assert length(keys(dagger_results)) > 0 "No Dagger results found" + +scale_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(first(serial_results)[2])]; by=x->x[2]) +nw_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(first(dagger_results)[2][first(first(scale_set))])]; by=x->x[2]) +raw_table = NamedTuple[] +for bset_key in keys(res["results"]) + bset = res["results"][bset_key] + if typeof(bset[first(first(scale_set))]) <: BenchmarkGroup + procs = parse(Int, lstrip(last(split(first(first(bset[first(first(scale_set))])), ':')), ' ')) + for nw in nw_set + for i in 1:length(scale_set) + set_times = [minimum(bset[scale][nw[1]]).time/(10^9) for scale in first.(scale_set)] + push!(raw_table, (name=bset_key, time=set_times[i], scale=last.(scale_set)[i], procs=nw[2])) + end + end + else + set_times = [minimum(bset[scale]).time/(10^9) for scale in first.(scale_set)] + procs = 8 # default for OpenBLAS + for i in 1:length(set_times) + push!(raw_table, (name=bset_key, time=set_times[i], scale=last.(scale_set)[i], procs=procs)) + end + end +end +table = Table(raw_table) -scale_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(serial_results)]; by=x->x[2]) -serial_times = [minimum(serial_results[scale]).time/(10^9) for scale in first.(scale_set)] -nw_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(dagger_results[first(first(scale_set))])]; by=x->x[2]) +btable = copy(table[map(x->!x, occursin.(Ref("dagger"), table.name))]) +dtable = copy(table[occursin.(Ref("dagger"), table.name)]) -table = Table(name=[:Base for _ in 1:3], time=serial_times, scale=last.(scale_set), procs=[8 for _ in 1:3]) +#table = Table(name=[:Base for _ in 1:3], time=serial_times, scale=last.(scale_set), procs=[8 for _ in 1:3]) -btable = copy(table) +#btable = copy(table) +#= for (nw,nw_val) in nw_set dagger_times = [minimum(dagger_results[scale][nw]).time/(10^9) for scale in first.(scale_set)] t = Table(name=[:Dagger for _ in 1:3], time=dagger_times, scale=last.(scale_set), procs=[parse(Int,split(nw, ":")[2]) for _ in 1:3]) append!(table, t) end +=# -dtable = table[table.name .== :Dagger] +#dtable = table[table.name .== :Dagger] # Plotting @@ -45,11 +77,11 @@ legend_names = String[] scales = unique(dtable.scale) -colors = distinguishable_colors(lenght(scales), ColorSchemes.seaborn_deep.colors) +colors = distinguishable_colors(length(scales), ColorSchemes.seaborn_deep.colors) for (i, scale) in enumerate(scales) stable = dtable[dtable.scale .== scale] - t1 = first(stable[stable.procs .== 1].time) + t1 = first(stable[stable.procs .== minimum(dtable.procs)].time) ss_efficiency = strong_scaling.(t1, stable.time, stable.procs) push!(line_plots, lines!(ssp, stable.procs, ss_efficiency, linewidth=3.0, color = colors[i])) push!(legend_names, "scale = $scale") @@ -65,25 +97,32 @@ save("strong_scaling.png", fig) # too little data fig = Figure(resolution = (1200, 800)) -weak_scaling(t1, tn) = t1/tn +weak_scaling(t1, tn, p_prime, p) = t1/((p_prime/p)*tn) -dtable = table[table.name .== :Dagger] -wstable = filter(row->row.scale == row.procs, dtable) -wstable = sort(wstable, by=r->r.scale) -t1 = first(wstable).time +t1 = first(dtable[map(row->(row.scale == 10) && (row.procs == 1), dtable)]).time fig = Figure(resolution = (1200, 800)) -perf = fig[1, 1] = Axis(fig, title = "Weak scaling") -perf.xlabel = "nprocs" -perf.ylabel = "Efficiency" +perf 
= fig[1, 1] = Axis(fig, title = "Weak Scaling") +perf.xlabel = "Number of processes" +perf.ylabel = "Scaling efficiency" + +line_plots = Any[] +legend_names = String[] -lines!(perf, wstable.procs, weak_scaling.(t1, wstable.time), linewidth=3.0) +wstable = similar(dtable, 0) +for pair in [(10,1),(35,4),(85,8)] + append!(wstable, dtable[map(row->(row.scale == pair[1]) && (row.procs == pair[2]), rows(dtable))]) +end +push!(line_plots, lines!(perf, wstable.procs, weak_scaling.(t1, wstable.time, wstable.procs .* 10, wstable.scale), linewidth=3.0)) +push!(legend_names, "cpu+dagger") + +legend = fig[1, 2] = Legend(fig, line_plots, legend_names) save("weak_scaling.png", fig) # 3. Comparision against Base fig = Figure(resolution = (1200, 800)) -perf = fig[1, 1] = Axis(fig, title = "DaggerArrays vs Base") +perf = fig[1, 1] = Axis(fig, title = "Dagger vs Base") perf.xlabel = "Scaling factor" perf.ylabel = "time (s)" @@ -92,7 +131,7 @@ legend_names = String[] procs = unique(dtable.procs) -colors = distinguishable_colors(lenght(procs) + 1, ColorSchemes.seaborn_deep.colors) +colors = distinguishable_colors(length(procs) + 1, ColorSchemes.seaborn_deep.colors) for (i, nproc) in enumerate(procs) stable = dtable[dtable.procs .== nproc] @@ -109,23 +148,22 @@ save("raw_timings.png", fig) # 4. Speedup fig = Figure(resolution = (1200, 800)) -speedup = fig[1, 1] = Axis(fig, title = "DaggerArrays vs Base (8 threads)") -speedup.xlabel = "Scaling factor" -speedup.ylabel = "Speedup Base/Dagger" +speedup = fig[1, 1] = Axis(fig, title = "Speedup vs. 1 processor") +speedup.xlabel = "Number of processors" +speedup.ylabel = "Speedup" line_plots = Any[] legend_names = String[] colors = distinguishable_colors(length(procs), ColorSchemes.seaborn_deep.colors) -sort!(btable, by=r->r.scale) +t1 = sort(dtable[dtable.scale .== 10]; by=r->r.procs) -for (i, nproc) in enumerate(unique(dtable.procs)) - nproc < 8 && continue - stable = dtable[dtable.procs .== nproc] - sort!(stable, by=r->r.scale) - push!(line_plots, lines!(speedup, stable.scale, btable.time ./ stable.time, linewidth=3.0, color = colors[i])) - push!(legend_names, "Dagger (nprocs = $nproc)") +for (i, scale) in enumerate(unique(dtable.scale)) + stable = dtable[dtable.scale .== scale] + sort!(stable, by=r->r.procs) + push!(line_plots, lines!(speedup, stable.procs, stable.time ./ t1.time, linewidth=3.0, color = colors[i])) + push!(legend_names, "Dagger (scale = $scale)") end legend = fig[1, 2] = Legend(fig, line_plots, legend_names) diff --git a/src/chunks.jl b/src/chunks.jl index ba07d62da..e7697add7 100644 --- a/src/chunks.jl +++ b/src/chunks.jl @@ -65,8 +65,12 @@ function unrelease(c::Chunk{<:Any,DRef}) end unrelease(c::Chunk) = c +Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle +Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x) + collect_remote(chunk::Chunk) = move(chunk.processor, OSProc(), poolget(chunk.handle)) + function collect(ctx::Context, chunk::Chunk; options=nothing) # delegate fetching to handle by default. if chunk.handle isa DRef && !(chunk.processor isa OSProc) diff --git a/src/compute.jl b/src/compute.jl index 2337d0d7c..4bb8db9f8 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -82,12 +82,12 @@ end ##### Dag utilities ##### """ - dependents(node::Thunk, deps=Dict{Thunk, Set{Thunk}}()) -> Dict{Thunk, Set{Thunk}} + dependents(node::Thunk) -> Dict{Union{Thunk,Chunk}, Set{Thunk}} Find the set of direct dependents for each task. 
""" function dependents(node::Thunk) - deps = Dict{Thunk, Set{Thunk}}() + deps = Dict{Union{Thunk,Chunk}, Set{Thunk}}() visited = Set{Thunk}() to_visit = Set{Thunk}() push!(to_visit, node) @@ -98,10 +98,12 @@ function dependents(node::Thunk) deps[next] = Set{Thunk}() end for inp in inputs(next) - if inp isa Thunk - s::Set{Thunk} = get!(()->Set{Thunk}(), deps, inp) + if istask(inp) || (inp isa Chunk) + s = get!(()->Set{Thunk}(), deps, inp) push!(s, next) - !(inp in visited) && push!(to_visit, inp) + if istask(inp) && !(inp in visited) + push!(to_visit, inp) + end end end push!(visited, next) @@ -110,14 +112,14 @@ function dependents(node::Thunk) end """ - noffspring(dpents::Dict{Thunk, Set{Thunk}}) -> Dict{Thunk, Int} + noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int} Recursively find the number of tasks dependent on each task in the DAG. Takes a Dict as returned by [`dependents`](@ref). """ -function noffspring(dpents::Dict{Thunk, Set{Thunk}}) +function noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) noff = Dict{Thunk,Int}() - to_visit = collect(keys(dpents)) + to_visit = collect(filter(istask, keys(dpents))) while !isempty(to_visit) next = popfirst!(to_visit) haskey(noff, next) && continue diff --git a/src/lib/logging.jl b/src/lib/logging.jl index 48a59cb6f..64dd17136 100644 --- a/src/lib/logging.jl +++ b/src/lib/logging.jl @@ -116,11 +116,13 @@ function raise_event(ctx, phase, category, id,tl, t, gc_num, prof, async) end end -empty_prof() = ProfilerResult(UInt[], Profile.getdict(UInt[])) +empty_prof() = ProfilerResult(UInt[], Dict{UInt64, Vector{Base.StackTraces.StackFrame}}()) + +const prof_refcount = Ref{Threads.Atomic{Int}}(Threads.Atomic{Int}(0)) function timespan_start(ctx, category, id, tl, async=isasync(ctx.log_sink)) isa(ctx.log_sink, NoOpLog) && return # don't go till raise - if ctx.profile && category == :compute + if ctx.profile && category == :compute && Threads.atomic_add!(prof_refcount[], 1) == 0 Profile.start_timer() end raise_event(ctx, :start, category, id, tl, time_ns(), gc_num(), empty_prof(), async) @@ -129,13 +131,18 @@ end function timespan_end(ctx, category, id, tl, async=isasync(ctx.log_sink)) isa(ctx.log_sink, NoOpLog) && return + time = time_ns() + gcn = gc_num() prof = UInt[] + lidict = Dict{UInt64, Vector{Base.StackTraces.StackFrame}}() if ctx.profile && category == :compute - Profile.stop_timer() - prof = Profile.fetch() + if Threads.atomic_sub!(prof_refcount[], 1) == 1 + Profile.stop_timer() + end + prof, lidict = Profile.retrieve() Profile.clear() end - raise_event(ctx, :finish, category, id, tl,time_ns(), gc_num(), ProfilerResult(prof, Profile.getdict(prof)), async) + raise_event(ctx, :finish, category, id, tl, time, gcn, ProfilerResult(prof, lidict), async) nothing end diff --git a/src/processor.jl b/src/processor.jl index a2261f087..5f5f975d7 100644 --- a/src/processor.jl +++ b/src/processor.jl @@ -168,7 +168,6 @@ iscompatible_arg(proc::ThreadProc, opts, x) = true else # TODO: Use Threads.@threads? function execute!(proc::ThreadProc, f, args...) - sch_handle = task_local_storage(:sch_handle) tls = get_tls() task = @async begin set_tls!(tls) @@ -227,6 +226,9 @@ Context(procs::Vector{P}=Processor[OSProc(w) for w in workers()]; profile=false, options=nothing) where {P<:Processor} = Context(procs, proc_lock, log_sink, log_file, profile, options) Context(xs::Vector{Int}; kwargs...) = Context(map(OSProc, xs); kwargs...) 
+Context(ctx::Context, xs::Vector) = # make a copy + Context(xs; log_sink=ctx.log_sink, log_file=ctx.log_file, + profile=ctx.profile, options=ctx.options) procs(ctx::Context) = lock(ctx) do copy(ctx.procs) end diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index d80a4a924..0c04e1202 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -4,14 +4,39 @@ using Distributed import MemPool: DRef import ..Dagger -import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, choose_processor, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor +import ..Dagger: Context, Processor, Thunk, ThunkFuture, ThunkFailedException, Chunk, OSProc, order, free!, dependents, noffspring, istask, inputs, affinity, tochunk, @dbg, @logmsg, timespan_start, timespan_end, unrelease, procs, move, capacity, chunktype, default_enabled, get_processors, execute!, rmprocs!, addprocs!, thunk_processor const OneToMany = Dict{Thunk, Set{Thunk}} include("util.jl") +if Sys.isunix() + include("unix.jl") +else + # FIXME + cputhreadtime() = time_ns() +end include("fault-handler.jl") include("dynamic.jl") +mutable struct ProcessorCacheEntry + gproc::OSProc + proc::Processor + next::ProcessorCacheEntry + + ProcessorCacheEntry(gproc::OSProc, proc::Processor) = new(gproc, proc) +end +Base.isequal(p1::ProcessorCacheEntry, p2::ProcessorCacheEntry) = + p1.proc === p2.proc +function Base.show(io::IO, entry::ProcessorCacheEntry) + entries = 1 + next = entry.next + while next !== entry + entries += 1 + next = next.next + end + print(io, "ProcessorCacheEntry(pid $(entry.gproc.pid), $(entry.proc), $entries entries)") +end + """ ComputeState @@ -19,45 +44,93 @@ The internal state-holding struct of the scheduler. 
Fields: - uid::UInt64 - Unique identifier for this scheduler instance -- dependents::OneToMany - The result of calling `dependents` on the DAG +- dependents::Dict{Union{Thunk,Chunk},Set{Thunk}} - The result of calling `dependents` on the DAG - finished::Set{Thunk} - The set of completed `Thunk`s - waiting::OneToMany - Map from downstream `Thunk` to upstream `Thunk`s that still need to execute -- waiting_data::OneToMany - Map from upstream `Thunk` to all downstream `Thunk`s, accumulating over time +- waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}} - Map from input `Chunk`/upstream `Thunk` to all unfinished downstream `Thunk`s, to retain caches - ready::Vector{Thunk} - The list of `Thunk`s that are ready to execute - cache::Dict{Thunk, Any} - Maps from a finished `Thunk` to it's cached result, often a DRef - running::Set{Thunk} - The set of currently-running `Thunk`s +- running_on::Dict{Thunk,OSProc} - Map from `Thunk` to the OS process executing it - thunk_dict::Dict{Int, Any} - Maps from thunk IDs to a `Thunk` - node_order::Any - Function that returns the order of a thunk -- worker_procs::Dict{Int,OSProc} - Maps from worker ID to root processor -- worker_pressure::Dict{Int,Dict{Type,Float64}} - Cache of worker pressure -- worker_capacity::Dict{Int,Dict{Type,Float64}} - Maps from worker ID to capacity +- worker_pressure::Dict{Int,Dict{Type,UInt}} - Cache of worker pressure +- worker_capacity::Dict{Int,Dict{Type,UInt}} - Maps from worker ID to capacity +- worker_loadavg::Dict{Int,NTuple{3,Float64}} - Worker load average - worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}} - Communication channels between the scheduler and each worker -- halt::Ref{Bool} - Flag indicating, when set, that the scheduler should halt immediately +- procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}} - Cached linked list of processors ready to be used +- function_cost_cache::Dict{Type{<:Tuple},UInt} - Cache of estimated CPU time required to compute the given signature +- halt::Base.RefValue{Bool} - Flag indicating, when set, that the scheduler should halt immediately - lock::ReentrantLock() - Lock around operations which modify the state - futures::Dict{Thunk, Vector{ThunkFuture}} - Futures registered for waiting on the result of a thunk. 
- errored::Set{Thunk} - Thunks that threw an error -- chan::Channel - Channel for receiving completed thunks +- chan::RemoteChannel{Channel{Any}} - Channel for receiving completed thunks """ struct ComputeState uid::UInt64 - dependents::OneToMany + dependents::Dict{Union{Thunk,Chunk},Set{Thunk}} finished::Set{Thunk} waiting::OneToMany - waiting_data::OneToMany + waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}} ready::Vector{Thunk} cache::Dict{Thunk, Any} running::Set{Thunk} + running_on::Dict{Thunk,OSProc} thunk_dict::Dict{Int, Any} node_order::Any - worker_procs::Dict{Int,OSProc} - worker_pressure::Dict{Int,Dict{Type,Float64}} - worker_capacity::Dict{Int,Dict{Type,Float64}} + worker_pressure::Dict{Int,Dict{Type,UInt}} + worker_capacity::Dict{Int,Dict{Type,UInt}} + worker_loadavg::Dict{Int,NTuple{3,Float64}} worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}} - halt::Ref{Bool} + procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}} + function_cost_cache::Dict{Type{<:Tuple},UInt} + halt::Base.RefValue{Bool} lock::ReentrantLock futures::Dict{Thunk, Vector{ThunkFuture}} errored::Set{Thunk} - chan::Channel{Any} + chan::RemoteChannel{Channel{Any}} +end + +function start_state(deps::Dict, node_order, chan) + state = ComputeState(rand(UInt64), + deps, + Set{Thunk}(), + OneToMany(), + Dict{Union{Thunk,Chunk},Set{Thunk}}(), + Vector{Thunk}(undef, 0), + Dict{Thunk, Any}(), + Set{Thunk}(), + Dict{Thunk,OSProc}(), + Dict{Int, Thunk}(), + node_order, + Dict{Int,Dict{Type,UInt}}(), + Dict{Int,Dict{Type,UInt}}(), + Dict{Int,NTuple{3,Float64}}(), + Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(), + Ref{Union{ProcessorCacheEntry,Nothing}}(nothing), + Dict{Type{<:Tuple},UInt}(), + Ref{Bool}(false), + ReentrantLock(), + Dict{Thunk, Vector{ThunkFuture}}(), + Set{Thunk}(), + chan) + + nodes = sort(collect(keys(deps)), by=node_order) + # N.B. Using merge! here instead would modify deps + for (key,val) in deps + state.waiting_data[key] = copy(val) + end + for k in nodes + if istask(k) + waiting = Set{Thunk}(Iterators.filter(istask, inputs(k))) + if isempty(waiting) + push!(state.ready, k) + else + state.waiting[k] = waiting + end + end + end + state end """ @@ -82,6 +155,8 @@ the current scheduler invocation to persistent storage, for later retrieval by the current scheduler invocation, were it to execute. If this returns a result, all thunks will be skipped. If this throws an error, restoring will be skipped, the error will be displayed, and the scheduler will execute as usual. +- `round_robin::Bool=false`: Whether to schedule in round-robin mode, which +spreads load instead of the default behavior of filling processors to capacity. """ Base.@kwdef struct SchedulerOptions single::Int = 0 @@ -89,6 +164,7 @@ Base.@kwdef struct SchedulerOptions allow_errors::Bool = false checkpoint = nothing restore = nothing + round_robin::Bool = false end """ @@ -138,6 +214,8 @@ function merge(sopts::SchedulerOptions, topts::ThunkOptions) proclist = topts.proclist !== nothing ? 
topts.proclist : sopts.proclist ThunkOptions(single, proclist, topts.procutil, allow_errors, topts.checkpoint, topts.restore) end +merge(sopts::SchedulerOptions, ::Nothing) = + ThunkOptions(sopts.single, sopts.proclist, Dict{Type,Any}()) function isrestricted(task::Thunk, proc::OSProc) if (task.options !== nothing) && (task.options.single != 0) && @@ -146,29 +224,58 @@ function isrestricted(task::Thunk, proc::OSProc) end return false end -merge(sopts::SchedulerOptions, ::Nothing) = - ThunkOptions(sopts.single, sopts.proclist, Dict{Type,Any}()) function cleanup(ctx) end +const WORKER_MONITOR_LOCK = Threads.ReentrantLock() +const WORKER_MONITOR_TASKS = Dict{Int,Task}() +const WORKER_MONITOR_CHANS = Dict{Int,Dict{UInt64,RemoteChannel}}() function init_proc(state, p) # Initialize pressure and capacity proc = OSProc(p.pid) lock(state.lock) do - state.worker_procs[p.pid] = proc - state.worker_pressure[p.pid] = Dict{Type,Float64}() - state.worker_capacity[p.pid] = Dict{Type,Float64}() + state.worker_pressure[p.pid] = Dict{Type,UInt}() + state.worker_capacity[p.pid] = Dict{Type,UInt}() + state.worker_loadavg[p.pid] = (0.0, 0.0, 0.0) for T in unique(typeof.(get_processors(proc))) state.worker_pressure[p.pid][T] = 0 - state.worker_capacity[p.pid][T] = capacity(proc, T) + state.worker_capacity[p.pid][T] = capacity(proc, T) * UInt(1e9) end state.worker_pressure[p.pid][OSProc] = 0 state.worker_capacity[p.pid][OSProc] = 0 end - cap = remotecall_fetch(capacity, p.pid) - @async lock(state.lock) do - state.worker_capacity[p.pid] = cap + cap = remotecall(capacity, p.pid) + @async begin + cap = fetch(cap) * UInt(1e9) + lock(state.lock) do + state.worker_capacity[p.pid] = cap + end + end + lock(WORKER_MONITOR_LOCK) do + wid = p.pid + if !haskey(WORKER_MONITOR_TASKS, wid) + t = @async begin + try + # Wait until this connection is terminated + remotecall_fetch(sleep, wid, typemax(UInt64)) + catch err + if err isa ProcessExitedException + lock(WORKER_MONITOR_LOCK) do + d = WORKER_MONITOR_CHANS[wid] + for uid in keys(d) + put!(d[uid], (wid, OSProc(wid), nothing, (ProcessExitedException(wid), nothing))) + end + empty!(d) + delete!(WORKER_MONITOR_CHANS, wid) + end + end + end + end + WORKER_MONITOR_TASKS[wid] = t + WORKER_MONITOR_CHANS[wid] = Dict{UInt64,RemoteChannel}() + end + WORKER_MONITOR_CHANS[wid][state.uid] = state.chan end # Setup worker-to-scheduler channels @@ -178,9 +285,21 @@ function init_proc(state, p) state.worker_chans[p.pid] = (inp_chan, out_chan) end end +function _cleanup_proc(uid) + empty!(CHUNK_CACHE) # FIXME: Should be keyed on uid! +end +function cleanup_proc(state, p) + lock(WORKER_MONITOR_LOCK) do + wid = p.pid + if haskey(WORKER_MONITOR_CHANS, wid) + delete!(WORKER_MONITOR_CHANS[wid], state.uid) + remote_do(_cleanup_proc, wid, state.uid) + end + end +end "Process-local count of actively-executing Dagger tasks per processor type." -const ACTIVE_TASKS = Dict{UInt64,Dict{Type,Ref{Float64}}}() +const ACTIVE_TASKS = Dict{UInt64,Dict{Type,Ref{UInt}}}() const ACTIVE_TASKS_LOCK = ReentrantLock() "Process-local condition variable indicating task completion." 
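# --- Illustrative usage sketch; an editorial example, not part of the patch above ---
# The new `round_robin` option on SchedulerOptions asks the scheduler to step
# through the processor cache after every scheduled task, spreading work across
# processors instead of filling each one to capacity first. A hedged example of
# passing it through `compute`, mirroring the pattern in benchmarks/benchmark.jl:
using Distributed, Dagger

opts = Dagger.Sch.SchedulerOptions(round_robin=true)
ctx  = Dagger.Context(workers())
a = Dagger.delayed(+)(1, 2)
b = Dagger.delayed(*)(a, 3)
chunk = Dagger.compute(ctx, b; options=opts)  # schedules the two-task DAG
collect(ctx, chunk)                           # == 9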
@@ -204,7 +323,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) master = OSProc(myid()) @dbg timespan_start(ctx, :scheduler_init, 0, master) - chan = Channel{Any}(32) + chan = RemoteChannel(()->Channel(1024)) deps = dependents(d) ord = order(d, noffspring(deps)) @@ -212,7 +331,7 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) state = start_state(deps, node_order, chan) # setup thunk_dict mappings - for node in keys(deps) + for node in filter(istask, keys(deps)) state.thunk_dict[node.id] = node for dep in deps[node] state.thunk_dict[dep.id] = dep @@ -273,13 +392,13 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) thunk_failed = false if res isa Exception if unwrap_nested_exception(res) isa Union{ProcessExitedException, Base.IOError} - @warn "Worker $(pid) died on thunk $thunk_id, rescheduling work" + @warn "Worker $(pid) died, rescheduling work" # Remove dead worker from procs list remove_dead_proc!(ctx, state, gproc) lock(state.lock) do - handle_fault(ctx, state, state.thunk_dict[thunk_id], gproc) + handle_fault(ctx, state, gproc) end continue else @@ -290,10 +409,13 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) end end end + node = state.thunk_dict[thunk_id] if metadata !== nothing state.worker_pressure[pid][typeof(proc)] = metadata.pressure + state.worker_loadavg[pid] = metadata.loadavg + sig = signature(node, state) + state.function_cost_cache[sig] = (metadata.threadtime + get(state.function_cost_cache, sig, 0)) ÷ 2 end - node = state.thunk_dict[thunk_id] state.cache[node] = res if node.options !== nothing && node.options.checkpoint !== nothing try @@ -306,12 +428,16 @@ function compute_dag(ctx, d::Thunk; options=SchedulerOptions()) # FIXME: Move log start and lock to before error check @dbg timespan_start(ctx, :finish, thunk_id, master) lock(state.lock) do - finish_task!(state, node, thunk_failed) + finish_task!(ctx, state, node, thunk_failed) end @dbg timespan_end(ctx, :finish, thunk_id, master) safepoint(state) end + state.halt[] = true + @sync for p in procs_to_use(ctx) + @async cleanup_proc(state, p) + end value = state.cache[d] # TODO: move(OSProc(), state.cache[d]) if d in state.errored throw(value) @@ -337,60 +463,158 @@ end check_integrity(ctx) = @assert !isempty(procs_to_use(ctx)) "No suitable workers available in context." 
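# --- Illustrative sketch; an editorial example, not part of the patch above ---
# The `function_cost_cache` update in the scheduler loop above,
#     state.function_cost_cache[sig] =
#         (metadata.threadtime + get(state.function_cost_cache, sig, 0)) ÷ 2
# is a halving running average of per-signature thread time (in nanoseconds):
# each new measurement and the previous estimate both get weight 1/2, so the
# estimate adapts quickly while damping one-off spikes. For example:
est = UInt(0)
for threadtime in (UInt(4e9), UInt(2e9), UInt(2e9))
    global est = (threadtime + est) ÷ 2
end
est == UInt(2e9)  # true: the successive estimates are 2e9, 2e9, 2e9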
+struct SchedulingException <: Exception + reason::String +end +function Base.show(io::IO, se::SchedulingException) + print(io, "SchedulingException ($(se.reason))") +end + +const CHUNK_CACHE = Dict{Chunk,Dict{Processor,Any}}() + function schedule!(ctx, state, procs=procs_to_use(ctx)) lock(state.lock) do safepoint(state) @assert length(procs) > 0 - proc_keys = map(x->x.pid, procs) - proc_set = Set{Any}() - for p in proc_keys - for proc in get_processors(OSProc(p)) - push!(proc_set, p=>proc) + + # Populate the cache if empty + if state.procs_cache_list[] === nothing + current = nothing + for p in map(x->x.pid, procs) + for proc in get_processors(OSProc(p)) + next = ProcessorCacheEntry(OSProc(p), proc) + if current === nothing + current = next + current.next = current + state.procs_cache_list[] = current + else + current.next = next + current = next + current.next = state.procs_cache_list[] + end + end + end + # FIXME: Sort by lowest absolute utilization + end + + function can_use_proc(task, gproc, proc, opts) + # Check against proclist + if opts.proclist === nothing + if !default_enabled(proc) + return false + end + elseif opts.proclist isa Function + if !opts.proclist(proc) + return false + end + elseif opts.proclist isa Vector + if !(typeof(proc) in opts.proclist) + return false + end + else + throw(SchedulingException("proclist must be a Function, Vector, or nothing")) + end + + # Check against single + if opts.single != 0 + if gproc.pid != opts.single + return false + end end + + return true end + function has_capacity(p, gp, procutil, sig) + T = typeof(p) + # FIXME: MaxUtilization + extra_util = round(UInt, get(procutil, T, 1) * 1e9) + real_util = state.worker_pressure[gp][T] + if (T === Dagger.ThreadProc) && haskey(state.function_cost_cache, sig) + # Assume that the extra pressure is between estimated and measured + # TODO: Generalize this to arbitrary processor types + extra_util = min(extra_util, state.function_cost_cache[sig]) + end + # TODO: update real_util based on loadavg + cap = state.worker_capacity[gp][T] + if ((extra_util isa MaxUtilization) && (real_util > 0)) || + ((extra_util isa Real) && (extra_util + real_util > cap)) + return false, cap, extra_util + end + return true, cap, extra_util + end + + # Schedule tasks + to_fire = Dict{Tuple{OSProc,<:Processor},Vector{Tuple{Thunk,<:Any}}}() failed_scheduling = Thunk[] while !isempty(state.ready) + # Select a new task and get its options task = pop!(state.ready) opts = merge(ctx.options, task.options) - proclist = opts.proclist - proc_set_useable = if proclist === nothing - filter(x->default_enabled(x[2]), proc_set) - elseif proclist isa Function - filter(x->proclist(x[2]), proc_set) + sig = signature(task, state) + + # Try to select a processor + selected_entry = nothing + entry = state.procs_cache_list[] + cap, extra_util = nothing, nothing + procs_found = false + # N.B. 
if we only have one processor, we need to select it now + if can_use_proc(task, entry.gproc, entry.proc, opts) + has_cap, cap, extra_util = has_capacity(entry.proc, entry.gproc.pid, opts.procutil, sig) + if has_cap + selected_entry = entry + else + procs_found = true + entry = entry.next + end else - filter(x->typeof(x[2]) in proclist, proc_set) - end - if opts.single != 0 - proc_set_useable = filter(x->x[1]==opts.single, proc_set_useable) + entry = entry.next end - @assert !isempty(proc_set_useable) "No processors available, try making proclist more liberal" - procutil = opts.procutil - gproc = nothing - proc = nothing - extra_util = nothing - cap = nothing - # FIXME: Sort by lowest utilization - for (gp,p) in proc_set_useable - T = typeof(p) - extra_util = get(procutil, T, 1) - real_util = state.worker_pressure[gp][T] - cap = capacity(OSProc(gp), T) - if ((extra_util isa MaxUtilization) && (real_util > 0)) || - ((extra_util isa Real) && (extra_util + real_util > cap)) - continue + while selected_entry === nothing + if entry === state.procs_cache_list[] + if procs_found + push!(failed_scheduling, task) + break + else + throw(SchedulingException("No processors available, try making proclist more liberal")) + end + end + + if can_use_proc(task, entry.gproc, entry.proc, opts) + has_cap, cap, extra_util = has_capacity(entry.proc, entry.gproc.pid, opts.procutil, sig) + if has_cap + # Select this processor + selected_entry = entry + else + # We could have selected it otherwise + procs_found = true + entry = entry.next + end else - gproc = OSProc(gp) - proc = p - break + # Try next processor + entry = entry.next end end - if proc !== nothing - extra_util = extra_util isa MaxUtilization ? cap : extra_util - fire_task!(ctx, task, (gproc, proc), state; util=extra_util) + selected_entry === nothing && continue + + # Schedule task onto proc + gproc, proc = entry.gproc, entry.proc + extra_util = extra_util isa MaxUtilization ? cap : extra_util + push!(get!(()->Vector{Tuple{Thunk,<:Any}}(), to_fire, (gproc, proc)), (task, extra_util)) + + # Progress through list + if ctx.options.round_robin + # Proceed to next entry to spread work + state.procs_cache_list[] = state.procs_cache_list[].next continue - else - push!(failed_scheduling, task) end + util = state.worker_pressure[gproc.pid][typeof(proc)] + if util >= cap + # Proceed to next entry due to over-pressure + state.procs_cache_list[] = state.procs_cache_list[].next + end + end + @sync for gpp in keys(to_fire) + @async fire_tasks!(ctx, to_fire[gpp], gpp, state) end append!(state.ready, failed_scheduling) end @@ -415,10 +639,11 @@ shall_remove_proc(ctx, proc) = proc ∉ procs_to_use(ctx) function remove_dead_proc!(ctx, state, proc, options=ctx.options) @assert options.single !== proc.pid "Single worker failed, cannot continue." 
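# --- Illustrative sketch; an editorial example, not part of the patch above ---
# `schedule!` above keeps candidate processors in a circular linked list of
# ProcessorCacheEntry nodes: the last entry's `next` points back at the head,
# so a full traversal ends when it returns to where it started. A hypothetical
# helper that visits each cached processor exactly once:
function each_cached_processor(f, head)
    head === nothing && return nothing
    entry = head
    while true
        f(entry.gproc, entry.proc)
        entry = entry.next
        entry === head && break
    end
    nothing
end
# e.g. each_cached_processor((gproc, proc)->println(gproc.pid, " => ", proc),
#                            state.procs_cache_list[])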
rmprocs!(ctx, [proc]) - delete!(state.worker_procs, proc.pid) delete!(state.worker_pressure, proc.pid) delete!(state.worker_capacity, proc.pid) + delete!(state.worker_loadavg, proc.pid) delete!(state.worker_chans, proc.pid) + state.procs_cache_list[] = nothing end function pop_with_affinity!(ctx, tasks, proc) @@ -461,53 +686,9 @@ function pop_with_affinity!(ctx, tasks, proc) return nothing end -function fire_task!(ctx, thunk, (gproc, proc), state; util=1.0) - push!(state.running, thunk) - if thunk.cache && thunk.cache_ref !== nothing - # the result might be already cached - data = unrelease(thunk.cache_ref) # ask worker to keep the data around - # till this compute cycle frees it - if data !== nothing - # cache hit - state.cache[thunk] = data - thunk_failed = thunk in state.errored - finish_task!(state, thunk, thunk_failed; free=false) - return - else - # cache miss - thunk.cache_ref = nothing - end - end - if thunk.options !== nothing && thunk.options.restore !== nothing - try - result = thunk.options.restore(thunk) - state.cache[thunk] = result - finish_task!(state, thunk, false; free=false) - return - catch err - @error "Thunk restore failed" exception=(err,catch_backtrace()) - end - end - - ids = map(enumerate(thunk.inputs)) do (idx,x) - istask(x) ? x.id : -idx - end - - data = map(thunk.inputs) do x - istask(x) ? state.cache[x] : x - end - toptions = thunk.options !== nothing ? thunk.options : ThunkOptions() - options = merge(ctx.options, toptions) - @assert (options.single == 0) || (gproc.pid == options.single) - sch_handle = SchedulerHandle(ThunkID(thunk.id), state.worker_chans[gproc.pid]...) - state.worker_pressure[gproc.pid][typeof(proc)] += util - async_apply((gproc, proc), thunk.id, thunk.f, data, state.chan, - thunk.get_result, thunk.persist, thunk.cache, thunk.meta, - options, ids, ctx, sch_handle, state.uid) -end - -function finish_task!(state, node, thunk_failed; free=true) +function finish_task!(ctx, state, node, thunk_failed; free=true) pop!(state.running, node) + delete!(state.running_on, node) if !thunk_failed push!(state.finished, node) else @@ -538,82 +719,118 @@ function finish_task!(state, node, thunk_failed; free=true) delete!(state.futures, node) end end - # Internal clean-up - for inp in filter(istask, inputs(node)) + + # Chunk clean-up + to_evict = Set{Chunk}() + for inp in filter(t->istask(t) || (t isa Chunk), inputs(node)) if inp in keys(state.waiting_data) s = state.waiting_data[inp] if node in s pop!(s, node) end if free && isempty(s) - if haskey(state.cache, inp) + if istask(inp) && haskey(state.cache, inp) _node = state.cache[inp] + if _node isa Chunk + push!(to_evict, _node) + end free!(_node, force=false, cache=(istask(inp) && inp.cache)) pop!(state.cache, inp) + elseif inp isa Chunk + push!(to_evict, inp) end end end end + if !isempty(to_evict) + @sync for w in map(p->p.pid, procs_to_use(ctx)) + @async remote_do(evict_chunks!, w, to_evict) + end + end end -function start_state(deps::Dict, node_order, chan) - state = ComputeState(rand(UInt64), - deps, - Set{Thunk}(), - OneToMany(), - OneToMany(), - Vector{Thunk}(undef, 0), - Dict{Thunk, Any}(), - Set{Thunk}(), - Dict{Int, Thunk}(), - node_order, - Dict{Int,OSProc}(), - Dict{Int,Dict{Type,Float64}}(), - Dict{Int,Dict{Type,Float64}}(), - Dict{Int, Tuple{RemoteChannel,RemoteChannel}}(), - Ref{Bool}(false), - ReentrantLock(), - Dict{Thunk, Vector{ThunkFuture}}(), - Set{Thunk}(), - chan) - - nodes = sort(collect(keys(deps)), by=node_order) - # N.B. Using merge! 
here instead would modify deps - for (key,val) in deps - state.waiting_data[key] = copy(val) +function evict_chunks!(chunks::Set{Chunk}) + for chunk in chunks + haskey(CHUNK_CACHE, chunk) && delete!(CHUNK_CACHE, chunk) end - for k in nodes - if istask(k) - waiting = Set{Thunk}(Iterators.filter(istask, inputs(k))) - if isempty(waiting) - push!(state.ready, k) + nothing +end + +fire_task!(ctx, thunk::Thunk, p, state; util=10^9) = + fire_task!(ctx, (thunk, util), p, state) +fire_task!(ctx, (thunk, util)::Tuple{Thunk,<:Any}, p, state) = + fire_tasks!(ctx, [(thunk, util)], p, state) +function fire_tasks!(ctx, thunks::Vector{<:Tuple}, (gproc, proc), state) + to_send = [] + for (thunk, util) in thunks + push!(state.running, thunk) + state.running_on[thunk] = gproc + if thunk.cache && thunk.cache_ref !== nothing + # the result might be already cached + data = unrelease(thunk.cache_ref) # ask worker to keep the data around + # till this compute cycle frees it + if data !== nothing + # cache hit + state.cache[thunk] = data + thunk_failed = thunk in state.errored + finish_task!(ctx, state, thunk, thunk_failed; free=false) + continue else - state.waiting[k] = waiting + # cache miss + thunk.cache_ref = nothing end end - end - state + if thunk.options !== nothing && thunk.options.restore !== nothing + try + result = thunk.options.restore(thunk) + state.cache[thunk] = result + finish_task!(ctx, state, thunk, false; free=false) + continue + catch err + @error "Thunk restore failed" exception=(err,catch_backtrace()) + end + end + + ids = map(enumerate(thunk.inputs)) do (idx,x) + istask(x) ? x.id : -idx + end + + data = map(thunk.inputs) do x + istask(x) ? state.cache[x] : x + end + toptions = thunk.options !== nothing ? thunk.options : ThunkOptions() + options = merge(ctx.options, toptions) + @assert (options.single == 0) || (gproc.pid == options.single) + sch_handle = SchedulerHandle(ThunkID(thunk.id), state.worker_chans[gproc.pid]...) + state.worker_pressure[gproc.pid][typeof(proc)] += util + + # FIXME: De-dup common fields (log_sink, uid, etc.) + push!(to_send, (util, thunk.id, thunk.f, data, thunk.get_result, + thunk.persist, thunk.cache, thunk.meta, options, ids, + (log_sink=ctx.log_sink, profile=ctx.profile), + sch_handle, state.uid)) + end + remote_do(do_tasks, gproc.pid, proc, state.chan, to_send) end -function fetch_report(task) - try - fetch(task) - catch err - @static if VERSION >= v"1.1" - stk = Base.catch_stack(task) - err, frames = stk[1] - rethrow(CapturedException(err, frames)) - else - rethrow(task.result) +"Executes a batch of tasks on `to_proc`." +function do_tasks(to_proc, chan, tasks) + for task in tasks + @async begin + try + put!(chan, (myid(), to_proc, task[2], do_task(to_proc, task...))) + catch ex + bt = catch_backtrace() + put!(chan, (myid(), to_proc, task[2], (CapturedException(ex, bt), nothing))) + end end end end - -@noinline function do_task(to_proc, thunk_id, f, data, send_result, persist, cache, meta, options, ids, ctx_vars, sch_handle, uid) +"Executes a single task on `to_proc`." +function do_task(to_proc, extra_util, thunk_id, f, data, send_result, persist, cache, meta, options, ids, ctx_vars, sch_handle, uid) ctx = Context(Processor[]; log_sink=ctx_vars.log_sink, profile=ctx_vars.profile) from_proc = OSProc() - # TODO: Time choose_processor Tdata = map(x->x isa Chunk ? 
chunktype(x) : x, data) # Fetch inputs @@ -623,7 +840,23 @@ end fetch_report.(map(Iterators.zip(data,ids)) do (x, id) @async begin @dbg timespan_start(ctx, :move, (thunk_id, id), (f, id)) - x = move(to_proc, x) + x = if x isa Chunk + if haskey(CHUNK_CACHE, x) + get!(CHUNK_CACHE[x], to_proc) do + # TODO: Choose "closest" processor of same type first + some_proc = first(keys(CHUNK_CACHE[x])) + some_x = CHUNK_CACHE[x][some_proc] + move(some_proc, to_proc, some_x) + end + else + _x = move(to_proc, x) + CHUNK_CACHE[x] = Dict{Processor,Any}() + CHUNK_CACHE[x][to_proc] = _x + _x + end + else + move(to_proc, x) + end @dbg timespan_end(ctx, :move, (thunk_id, id), (f, id)) return x end @@ -631,29 +864,28 @@ end end # Check if we'll go over capacity from running this thunk - extra_util = get(options.procutil, typeof(to_proc), 1) real_util = lock(ACTIVE_TASKS_LOCK) do - AT = get!(()->Dict{Type,Ref{Float64}}(), ACTIVE_TASKS, uid) - get!(()->Ref{Float64}(0.0), AT, typeof(to_proc)) + AT = get!(()->Dict{Type,Ref{UInt}}(), ACTIVE_TASKS, uid) + get!(()->Ref{UInt}(UInt(0)), AT, typeof(to_proc)) end - cap = Float64(capacity(OSProc(), typeof(to_proc))) + cap = UInt(capacity(OSProc(), typeof(to_proc))) * UInt(1e9) while true lock(ACTIVE_TASKS_LOCK) if ((extra_util isa MaxUtilization) && (real_util[] > 0)) || ((extra_util isa Real) && (extra_util + real_util[] > cap)) # Fully subscribed, wait and re-check - @debug "($(myid())) $f Waiting for free $(typeof(to_proc)): $extra_util | $(real_util[])/$cap" + @debug "($(myid())) $f ($thunk_id) Waiting for free $(typeof(to_proc)): $extra_util | $(real_util[])/$cap" unlock(ACTIVE_TASKS_LOCK) wait(TASK_SYNC) else # Under-subscribed, calculate extra utilization and execute thunk - @debug "($(myid())) Using available $to_proc: $extra_util | $(real_util[])/$cap" + @debug "($(myid())) ($thunk_id) Using available $to_proc: $extra_util | $(real_util[])/$cap" extra_util = if extra_util isa MaxUtilization count(c->typeof(c)===typeof(to_proc), children(from_proc)) else extra_util end - real_util[] += Float64(extra_util) + real_util[] += extra_util unlock(ACTIVE_TASKS_LOCK) break end @@ -661,6 +893,7 @@ end @dbg timespan_start(ctx, :compute, thunk_id, (f, to_proc)) res = nothing + threadtime_start = cputhreadtime() result_meta = try # Set TLS variables Dagger.set_tls!(( @@ -679,28 +912,19 @@ end bt = catch_backtrace() RemoteException(myid(), CapturedException(ex, bt)) end + threadtime = cputhreadtime() - threadtime_start @dbg timespan_end(ctx, :compute, thunk_id, (f, to_proc)) lock(ACTIVE_TASKS_LOCK) do - real_util[] -= Float64(extra_util) + real_util[] -= extra_util end - @debug "($(myid())) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap" + @debug "($(myid())) ($thunk_id) Releasing $(typeof(to_proc)): $extra_util | $(real_util[])/$cap" notify(TASK_SYNC) - metadata = (pressure=real_util[],) + metadata = ( + pressure=real_util[], + loadavg=((Sys.loadavg()...,) ./ Sys.CPU_THREADS), + threadtime=threadtime, + ) (result_meta, metadata) end -@noinline function async_apply((gp,p), thunk_id, f, data, chan, send_res, persist, cache, meta, options, ids, ctx, sch_handle, uid) - @async begin - try - put!(chan, (gp.pid, p, thunk_id, remotecall_fetch(do_task, gp.pid, p, thunk_id, f, data, - send_res, persist, cache, meta, options, ids, - (log_sink=ctx.log_sink, profile=ctx.profile), sch_handle, uid))) - catch ex - bt = catch_backtrace() - put!(chan, (gp.pid, p, thunk_id, (CapturedException(ex, bt), nothing))) - end - nothing - end -end - end # module Sch diff --git 
a/src/sch/dynamic.jl b/src/sch/dynamic.jl index 030d0aacd..8733c13db 100644 --- a/src/sch/dynamic.jl +++ b/src/sch/dynamic.jl @@ -19,6 +19,11 @@ sch_handle() = task_local_storage(:_dagger_sch_handle)::SchedulerHandle "Thrown when the scheduler halts before finishing processing the DAG." struct SchedulerHaltedException <: Exception end +"Thrown when a dynamic thunk encounters an exception in Dagger's utilities." +struct DynamicThunkException <: Exception + reason::String +end + function safepoint(state) if state.halt[] # Force dynamic thunks and listeners to terminate @@ -102,11 +107,11 @@ function register_future!(h::SchedulerHandle, id::ThunkID, future::ThunkFuture) exec!(_register_future, h, future, id.id) end function _register_future(ctx, state, task, tid, (future, id)) - @assert tid != id "Cannot fetch own result" + tid != id || throw(DynamicThunkException("Cannot fetch own result")) thunk = state.thunk_dict[id] ownthunk = state.thunk_dict[tid] dominates(target, t) = (t == target) || any(_t->dominates(target, _t), filter(istask, t.inputs)) - @assert !dominates(ownthunk, thunk) "Cannot fetch result of dominated thunk" + !dominates(ownthunk, thunk) || throw(DynamicThunkException("Cannot fetch result of dominated thunk")) if thunk in state.finished || thunk in state.errored error = thunk in state.errored if haskey(state.cache, thunk) diff --git a/src/sch/fault-handler.jl b/src/sch/fault-handler.jl index c0baa6b2e..090df7dd8 100644 --- a/src/sch/fault-handler.jl +++ b/src/sch/fault-handler.jl @@ -2,9 +2,10 @@ handle_fault(...) An internal function to handle a worker dying or being killed by the OS. -Attempts to determine which `Thunk`s require rescheduling based on a -"deadlist", and then corrects the scheduler's internal `ComputeState` struct -to recover from the fault. +Attempts to determine which `Thunk`s were running on (or had their results +cached on) the dead worker, and stores them in a "deadlist". It uses this +deadlist to correct the scheduler's internal `ComputeState` struct to recover +from the fault. Note: The logic for this functionality is not currently perfectly robust to all failure modes, and is only really intended as a last-ditch attempt to @@ -13,67 +14,32 @@ of DAGs, it *may* cause a `KeyError` or other failures in the scheduler due to the complexity of getting the internal state back to a consistent and proper state. """ -function handle_fault(ctx, state, thunk, oldproc) - # Find thunks whose results were cached on the dead worker and place them - # on what's called a "deadlist". This structure will direct the recovery - # of the scheduler's state. - deadlist = Thunk[thunk] - # This thunk is guaranteed to not have valid cached data anymore - thunk.cache = false - thunk.cache_ref = nothing +function handle_fault(ctx, state, deadproc) + @assert !isempty(procs(ctx)) "No workers left for fault handling!" 
+ + deadlist = Thunk[] + + # Evict cache entries that were stored on the worker for t in keys(state.cache) v = state.cache[t] - if v isa Chunk && v.handle isa DRef && v.handle.owner == oldproc.pid + if v isa Chunk && v.handle isa DRef && v.handle.owner == deadproc.pid push!(deadlist, t) - # Any inputs to dead cached thunks must be rescheduled - function bfs!(deadlist, t) - for input in t.inputs - istask(input) || continue - !(input in deadlist) && push!(deadlist, input) - bfs!(deadlist, input) - end - end - bfs!(deadlist, t) + pop!(state.cache, t) end end - # TODO: Find *all* thunks who were actively running on the dead worker - - # Empty cache of dead thunks - for ct in keys(state.cache) - if ct in deadlist - delete!(state.cache, ct) - end - end - - function fix_waitdicts!(state, deadlist, t::Thunk; isleaf=false) - waiting, waiting_data = state.waiting, state.waiting_data - if !(t in keys(waiting)) - waiting[t] = Set{Thunk}() - end - if !isleaf - # If we aren't a leaf thunk, then we may still need to recover - # further into the DAG - for input in t.inputs - istask(input) || continue - @assert haskey(waiting, t) "Error: $t not in state.waiting" - push!(waiting[t], input) - push!(waiting_data[input], t) - isleaf = !(input in deadlist) - fix_waitdicts!(state, deadlist, input; isleaf=isleaf) - end - end - if isempty(waiting[t]) - delete!(waiting, t) + # Remove thunks that were running on the worker + for t in collect(keys(state.running_on)) + pid = state.running_on[t].pid + if pid == deadproc.pid + push!(deadlist, t) + delete!(state.running_on, t) end end - - # Add state.waiting deps back to state.waiting - for ot in keys(state.waiting) - fix_waitdicts!(state, deadlist, ot) + # Clear thunk.cache_ref + for t in deadlist + t.cache_ref = nothing end - fix_waitdicts!(state, deadlist, thunk) - # Remove thunks from state.ready that have inputs on the deadlist for idx in length(state.ready):-1:1 rt = state.ready[idx] @@ -82,40 +48,10 @@ function handle_fault(ctx, state, thunk, oldproc) end end - # Remove dead thunks from state.running, and add state.running - # deps back to state.waiting - wasrunning = copy(state.running) - empty!(state.running) - while !isempty(wasrunning) - temp = pop!(wasrunning) - if temp isa Thunk - if !(temp in deadlist) - push!(state.running, temp) - end - fix_waitdicts!(state, deadlist, temp) - elseif temp isa Vector - newtemp = [] - for t in temp - fix_waitdicts!(state, deadlist, t) - if !(t in deadlist) - push!(newtemp, t) - end - end - isempty(newtemp) || push!(state.running, newtemp) - else - throw("Unexpected type in recovery: $temp") - end - end - # Reschedule inputs from deadlist - @assert !isempty(procs(ctx)) "No workers left for fault handling!" - while length(deadlist) > 0 - dt = popfirst!(deadlist) - if any((input in deadlist) for input in dt.inputs) - # We need to schedule our input thunks first - continue - end - push!(state.ready, dt) + seen = Dict{Thunk,Bool}() + for t in deadlist + reschedule_inputs!(state, t, seen) end schedule!(ctx, state) end diff --git a/src/sch/unix.jl b/src/sch/unix.jl new file mode 100644 index 000000000..4e964d1c0 --- /dev/null +++ b/src/sch/unix.jl @@ -0,0 +1,77 @@ +## +# This file is a part of Dagger.jl. License is MIT +# +# Based upon https://github.com/google/benchmark, which is licensed under Apache v2: +# https://github.com/google/benchmark/blob/master/LICENSE +# +# In compliance with the Apache v2 license, here are the original copyright notices: +# Copyright 2015 Google Inc. All rights reserved. 
+## + +struct TimeSpec + tv_sec :: UInt64 # time_t + tv_nsec :: UInt64 +end + +maketime(ts) = ts.tv_sec * UInt(1e9) + ts.tv_nsec + +# From bits/times.h on a Linux system +# Check if those are the same on BSD +if Sys.islinux() + const CLOCK_MONOTONIC = Cint(1) + const CLOCK_PROCESS_CPUTIME_ID = Cint(2) + const CLOCK_THREAD_CPUTIME_ID = Cint(3) +elseif Sys.KERNEL == :FreeBSD # atleast on FreeBSD 11.1 + const CLOCK_MONOTONIC = Cint(4) + const CLOCK_PROCESS_CPUTIME_ID = Cint(14) +elseif Compat.Sys.isapple() # Version 10.12 required + const CLOCK_MONOTONIC = Cint(6) + const CLOCK_PROCESS_CPUTIME_ID = Cint(12) +else + error(""" + BenchmarkTools doesn't currently support your operating system. + Please file an issue, your kernel is $(Sys.KERNEL) + """) +end + +@inline function clock_gettime(cid) + ts = Ref{TimeSpec}() + ccall(:clock_gettime, Cint, (Cint, Ref{TimeSpec}), cid, ts) + return ts[] +end + +@inline function realtime() + maketime(clock_gettime(CLOCK_MONOTONIC)) +end + +@inline function cputime() + maketime(clock_gettime(CLOCK_PROCESS_CPUTIME_ID)) +end + +@inline function cputhreadtime() + maketime(clock_gettime(CLOCK_THREAD_CPUTIME_ID)) +end + +struct Measurement + realtime::TimeSpec + cputime::TimeSpec + function Measurement() + rtime = clock_gettime(CLOCK_MONOTONIC) + ctime = clock_gettime(CLOCK_PROCESS_CPUTIME_ID) + return new(rtime, ctime) + end +end + +struct MeasurementDelta + realtime::Float64 + cpuratio::Float64 + function MeasurementDelta(t1::Measurement, t0::Measurement) + rt0 = maketime(t0.realtime) + ct0 = maketime(t0.cputime) + rt1 = maketime(t1.realtime) + ct1 = maketime(t1.cputime) + realtime = rt1 - rt0 + cputime = ct1 - ct0 + return new(realtime, cputime/realtime) + end +end diff --git a/src/sch/util.jl b/src/sch/util.jl index 44ec8d2b1..4120c3d88 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -10,13 +10,16 @@ unwrap_nested_exception(err::RemoteException) = unwrap_nested_exception(err) = err "Prepares the scheduler to schedule `thunk`." -function reschedule_inputs!(state, thunk) +function reschedule_inputs!(state, thunk, seen=Dict{Thunk,Bool}()) + haskey(seen, thunk) && return seen[thunk] w = get!(()->Set{Thunk}(), state.waiting, thunk) scheduled = false for input in thunk.inputs + if istask(input) || (input isa Chunk) + push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk) + push!(get!(()->Set{Thunk}(), state.dependents, input), thunk) + end istask(input) || continue - push!(get!(()->Set{Thunk}(), state.waiting_data, input), thunk) - push!(get!(()->Set{Thunk}(), state.dependents, input), thunk) if input in state.errored set_failed!(state, input, thunk) break # TODO: Allow collecting all error'd inputs @@ -24,14 +27,17 @@ function reschedule_inputs!(state, thunk) haskey(state.cache, input) && continue if (input in state.running) || (input in state.ready) || - reschedule_inputs!(state, input) + reschedule_inputs!(state, input, seen) push!(w, input) scheduled = true + else + error("Failed to reschedule $(input.id) for $(thunk.id)") end end if isempty(w) && !(thunk in state.errored) # Inputs are ready push!(state.ready, thunk) + delete!(state.waiting, thunk) return true else return scheduled @@ -59,13 +65,13 @@ function set_failed!(state, origin, thunk=origin) end "Internal utility, useful for debugging scheduler state." -function print_sch_status(state, thunk) +function print_sch_status(state, thunk; kwargs...) iob = IOBuffer() - print_sch_status(iob, state, thunk) + print_sch_status(iob, state, thunk; kwargs...) 
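# --- Illustrative sketch; an editorial example, not part of the patch above ---
# On Unix systems (where src/sch/unix.jl is included), the clock helpers above
# can be combined through Measurement/MeasurementDelta to estimate how
# CPU-bound a region of code is. `cpuratio` is process CPU time divided by
# wall time; it can exceed 1.0 when multiple threads are busy.
t0 = Dagger.Sch.Measurement()
sum(rand(10^7))                            # some work to measure
t1 = Dagger.Sch.Measurement()
delta = Dagger.Sch.MeasurementDelta(t1, t0)
delta.realtime                             # elapsed wall-clock time in ns
delta.cpuratio                             # CPU time / wall time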
seek(iob, 0) write(stderr, iob) end -function print_sch_status(io::IO, state, thunk; offset=0, limit=5) +function print_sch_status(io::IO, state, thunk; offset=0, limit=5, max_inputs=3) function status_string(node) status = "" if node in state.errored @@ -77,15 +83,25 @@ function print_sch_status(io::IO, state, thunk; offset=0, limit=5) status *= "R" elseif haskey(state.cache, node) status *= "C" + elseif node in state.finished + status *= "F" else status *= "?" end status end - offset == 0 && print(io, "($(status_string(thunk))) ") + if offset == 0 + println(io, "Ready ($(length(state.ready))): $(join(map(t->t.id, state.ready), ','))") + println(io, "Running: ($(length(state.running))): $(join(map(t->t.id, collect(state.running)), ','))") + print(io, "($(status_string(thunk))) ") + end println(io, "$(thunk.id): $(thunk.f)") - for input in thunk.inputs + for (idx,input) in enumerate(thunk.inputs) input isa Thunk || continue + if idx > max_inputs + println(io, repeat(' ', offset+2), "…") + break + end status = status_string(input) if haskey(state.waiting, thunk) && input in state.waiting[thunk] status *= "W" @@ -97,7 +113,7 @@ function print_sch_status(io::IO, state, thunk; offset=0, limit=5) status *= "d" end if haskey(state.futures, input) - status *= "f$(length(state.futures[input]))" + status *= "f($(length(state.futures[input])))" end print(io, repeat(' ', offset+2), "($status) ") if limit > 0 @@ -107,3 +123,22 @@ function print_sch_status(io::IO, state, thunk; offset=0, limit=5) end end end + +function fetch_report(task) + try + fetch(task) + catch err + @static if VERSION >= v"1.1" + stk = Base.catch_stack(task) + err, frames = stk[1] + rethrow(CapturedException(err, frames)) + else + rethrow(task.result) + end + end +end + +function signature(task::Thunk, state) + inputs = map(x->istask(x) ? state.cache[x] : x, task.inputs) + Tuple{typeof(task.f), map(x->x isa Chunk ? x.chunktype : typeof(x), inputs)...} +end diff --git a/src/ui/gantt-server.jl b/src/ui/gantt-server.jl index a01cec4a8..7c37b2463 100644 --- a/src/ui/gantt-server.jl +++ b/src/ui/gantt-server.jl @@ -1,12 +1,11 @@ using .Mux -function serve_gantt(svg_path, prof_path; port=8000) +function serve_gantt(svg_path, prof_path; port=8000, delay=5) # Setup Mux app @app gantt_app = ( Mux.defaults, page(req->begin data = String(read(svg_path)) - prof = String(read(prof_path)) html = """ @@ -16,8 +15,31 @@ function serve_gantt(svg_path, prof_path; port=8000) + Go to profile data $data -
+ + + """ + hdrs = Dict( + Symbol("Content-Type")=>"text/html", + Symbol("Cache-Control")=>"no-store", + ) + Dict( + :body=>html, + :headers=>hdrs, + ) + end), + page("/profile", req->begin + prof = String(read(prof_path)) + html = """ + + + + Dagger: Scheduler Profile Flamegraph + + + + $prof @@ -35,5 +57,11 @@ function serve_gantt(svg_path, prof_path; port=8000) ) # Start serving app - Threads.@spawn serve(gantt_app, port) + Threads.@spawn begin + try + serve(gantt_app, port) + catch err + @error exception=(err,catch_backtrace()) + end + end end diff --git a/src/ui/gantt.jl b/src/ui/gantt.jl index b6269ecb3..e3c8cd796 100644 --- a/src/ui/gantt.jl +++ b/src/ui/gantt.jl @@ -188,13 +188,11 @@ function show_gantt(ctx; delay=2, port=8000, width=1000, height=640, window_leng draw_gantt(ctx, svg_path, prof_path; delay=delay, width=width, height=height, window_length=window_length) catch err - Base.showerror(stderr, err) - Base.show_backtrace(stderr, catch_backtrace()) - println(stderr) + @error exception=(err,catch_backtrace()) end end if live - serve_gantt(svg_path, prof_path; port=port) + serve_gantt(svg_path, prof_path; port=port, delay=delay) end end diff --git a/test/processors.jl b/test/processors.jl index 0244fe2a2..39ad43d0c 100644 --- a/test/processors.jl +++ b/test/processors.jl @@ -37,9 +37,9 @@ end end @testset "Processor exhaustion" begin opts = ThunkOptions(proclist=[OptOutProc]) - @test_throws_unwrap AssertionError collect(delayed(sum; options=opts)([1,2,3])) + @test_throws_unwrap Dagger.Sch.SchedulingException reason="No processors available, try making proclist more liberal" collect(delayed(sum; options=opts)([1,2,3])) opts = ThunkOptions(proclist=(proc)->false) - @test_throws_unwrap AssertionError collect(delayed(sum; options=opts)([1,2,3])) + @test_throws_unwrap Dagger.Sch.SchedulingException reason="No processors available, try making proclist more liberal" collect(delayed(sum; options=opts)([1,2,3])) opts = ThunkOptions(proclist=nothing) @test collect(delayed(sum; options=opts)([1,2,3])) == 6 end diff --git a/test/scheduler.jl b/test/scheduler.jl index 65f28a04f..eeb9ff832 100644 --- a/test/scheduler.jl +++ b/test/scheduler.jl @@ -353,12 +353,7 @@ end @test res == 2 @testset "self as input" begin a = delayed(dynamic_add_thunk_self_dominated)(1) - try - collect(Context(), a) - @test false - catch err - @test Dagger.Sch.unwrap_nested_exception(err) isa AssertionError - end + @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch result of dominated thunk" collect(Context(), a) end end @testset "Fetch/Wait" begin @@ -368,21 +363,30 @@ end end @testset "self" begin a = delayed(dynamic_fetch_self)(1) - try - collect(Context(), a) - @test false - catch err - @test Dagger.Sch.unwrap_nested_exception(err) isa AssertionError - end + @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch own result" collect(Context(), a) end @testset "dominated" begin a = delayed(identity)(delayed(dynamic_fetch_dominated)(1)) - try - collect(Context(), a) - @test false - catch err - @test Dagger.Sch.unwrap_nested_exception(err) isa AssertionError - end + @test_throws_unwrap Dagger.Sch.DynamicThunkException reason="Cannot fetch result of dominated thunk" collect(Context(), a) end end end + +c1 = Dagger.tochunk(1) +c2 = Dagger.tochunk(2) +@everywhere begin +function testpresent(x,y) + @assert haskey(Dagger.Sch.CHUNK_CACHE, $c1) + @assert haskey(Dagger.Sch.CHUNK_CACHE, $c2) + x+y +end +function testevicted(x) + sleep(1) + @assert !haskey(Dagger.Sch.CHUNK_CACHE, $c1) 
+ @assert !haskey(Dagger.Sch.CHUNK_CACHE, $c2) + x +end +end +@testset "Caching" begin + compute(delayed(testevicted)(delayed(testpresent)(c1,c2))) +end diff --git a/test/util.jl b/test/util.jl index 2e87a2690..77983071d 100644 --- a/test/util.jl +++ b/test/util.jl @@ -1,10 +1,25 @@ -macro test_throws_unwrap(terr, ex) +macro test_throws_unwrap(terr, args...) + _test_throws_unwrap(terr, args...) +end +function _test_throws_unwrap(terr, ex; to_match=[]) + @gensym rerr + match_expr = Expr(:block) + for m in to_match + @assert m.head == :(=) + push!(match_expr.args, :(@test $rerr.$(m.args[1]) == $(m.args[2]))) + end quote - rerr = try + $rerr = try $(esc(ex)) catch err - err + Dagger.Sch.unwrap_nested_exception(err) end - @test Dagger.Sch.unwrap_nested_exception(rerr) isa $terr + @test $rerr isa $terr + $match_expr end end +function _test_throws_unwrap(terr, args...) + ex = last(args) + to_match = args[1:end-1] + _test_throws_unwrap(terr, ex; to_match=to_match) +end
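For reference, the rewritten `@test_throws_unwrap` macro above roughly expands to the hand-written pattern below (an illustrative sketch, not the literal expansion; the `opts` value is borrowed from test/processors.jl):

    using Test, Distributed, Dagger

    opts = Dagger.Sch.ThunkOptions(proclist=(proc)->false)
    rerr = try
        collect(Dagger.delayed(sum; options=opts)([1,2,3]))
    catch err
        Dagger.Sch.unwrap_nested_exception(err)
    end
    @test rerr isa Dagger.Sch.SchedulingException
    @test rerr.reason == "No processors available, try making proclist more liberal"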