Skip to content

Thunk cost estimation, chunk caching, benchmark updates #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"

[compat]
Colors = "0.10, 0.11, 0.12"
MemPool = "0.3.3"
MemPool = "0.3.4"
Requires = "1"
StatsBase = "0.28, 0.29, 0.30, 0.31, 0.32, 0.33"
julia = "1.0"
Expand Down
82 changes: 57 additions & 25 deletions benchmarks/benchmark.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ elseif render == "offline"
using FFMPEG, FileIO, ImageMagick
end
const RENDERS = Dict{Int,Dict}()
const live_port = parse(Int, get(ENV, "BENCHMARK_LIVE_PORT", "8000"))

const graph = parse(Bool, get(ENV, "BENCHMARK_GRAPH", "0"))
const profile = parse(Bool, get(ENV, "BENCHMARK_PROFILE", "0"))

_benches = get(ENV, "BENCHMARK", "cpu,cpu+dagger")
const benches = []
Expand Down Expand Up @@ -106,7 +110,7 @@ end

# Estimate the theoretical floating-point operation count for one NNMF run on
# an `nrow`×`ncol` matrix factored with `nfeatures` latent features.
function theory_flops(nrow, ncol, nfeatures)
    factor_cost = 11 * ncol * nrow * nfeatures
    boundary_cost = 2 * (ncol + nrow) * nfeatures
    return factor_cost + boundary_cost
end

function nmf_suite(; dagger, accel, kwargs...)
function nmf_suite(ctx; dagger, accel)
suite = BenchmarkGroup()

#= TODO: Re-enable
Expand Down Expand Up @@ -179,47 +183,56 @@ function nmf_suite(; dagger, accel, kwargs...)
])
elseif accel == "cpu"
Dagger.Sch.SchedulerOptions()
else
error("Unknown accelerator $accel")
end
ctx = Context(collect((1:nw) .+ 1); kwargs...)
p = sum([length(Dagger.get_processors(OSProc(id))) for id in 2:(nw+1)])
#bsz = ncol ÷ length(workers())
bsz = ncol ÷ 64
nsuite["Workers: $nw"] = @benchmarkable begin
compute($ctx, nnmf($X[], $W[], $H[]); options=$opts)
_ctx = Context($ctx, workers()[1:$nw])
compute(_ctx, nnmf($X[], $W[], $H[]); options=$opts)
end setup=begin
_nw, _scale = $nw, $scale
@info "Starting $_nw worker Dagger NNMF (scale by $_scale)"
if render != ""
Dagger.show_gantt($ctx; width=1800, window_length=20, delay=2, port=4040, live=live)
end
if $accel == "cuda"
# FIXME: Allocate with CUDA.rand if possible
$X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts))
$W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts))
$H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts))
$X[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts))
$W[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts))
$H[] = Dagger.mapchunks(CUDA.cu, compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts))
elseif $accel == "amdgpu"
$X[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts))
$W[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts))
$H[] = Dagger.mapchunks(ROCArray, compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts))
elseif $accel == "cpu"
$X[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $ncol); options=$opts)
$W[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nrow, $nfeatures); options=$opts)
$H[] = compute(rand(Blocks($nrow, $ncol÷$p), Float32, $nfeatures, $ncol); options=$opts)
$X[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $ncol); options=$opts)
$W[] = compute(rand(Blocks($nrow, $bsz), Float32, $nrow, $nfeatures); options=$opts)
$H[] = compute(rand(Blocks($nrow, $bsz), Float32, $nfeatures, $ncol); options=$opts)
end
end teardown=begin
if render != ""
if render != "" && !live
Dagger.continue_rendering[] = false
video_paths = take!(Dagger.render_results)
try
video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths))
push!(get!(()->[], RENDERS[$scale], $nw), video_data)
catch
for i in 1:5
isready(Dagger.render_results) && break
sleep(1)
end
if isready(Dagger.render_results)
video_paths = take!(Dagger.render_results)
try
video_data = Dict(key=>read(video_paths[key]) for key in keys(video_paths))
push!(get!(()->[], RENDERS[$scale], $nw), video_data)
catch err
@error "Failed to process render results" exception=(err,catch_backtrace())
end
else
@warn "Failed to fetch render results"
end
end
$X[] = nothing
$W[] = nothing
$H[] = nothing
@everywhere GC.gc()
end
break
nw ÷= 2
end
suite["NNMF scaled by: $scale"] = nsuite
Expand All @@ -234,28 +247,42 @@ function main()
output_prefix = "result-$(np)workers-$(nt)threads-$(Dates.now())"

suites = Dict()
graph_opts = if graph && render != ""
(log_sink=Dagger.LocalEventLog(), log_file=output_prefix*".dot")
elseif render != ""
(log_sink=Dagger.LocalEventLog(),)
else
NamedTuple()
end
ctx = Context(collect((1:nw) .+ 1); profile=profile, graph_opts...)
for bench in benches
name = bench.name
println("creating $name benchmarks")
suites[name] = if bench.dagger
nmf_suite(; dagger=true, accel=bench.accel, log_sink=Dagger.LocalEventLog(), log_file=output_prefix*".dot", profile=false)
else
nmf_suite(; dagger=false, accel=bench.accel)
suites[name] = nmf_suite(ctx; dagger=bench.dagger, accel=bench.accel)
end
if render != ""
Dagger.show_gantt(ctx; width=1800, window_length=5, delay=2, port=live_port, live=live)
if live
# Make sure server code is compiled
sleep(1)
run(pipeline(`curl -s localhost:$live_port/`; stdout=devnull))
run(pipeline(`curl -s localhost:$live_port/profile`; stdout=devnull))
@info "Rendering started on port $live_port"
end
end
res = Dict()
for bench in benches
name = bench.name
println("running $name benchmarks")
res[name] = try
run(suites[name]; samples=5, seconds=10*60, gcsample=true)
run(suites[name]; samples=3, seconds=10*60, gcsample=true)
catch err
@error "Error running $name benchmarks" exception=(err,catch_backtrace())
nothing
end
end
for bench in benches
println("benchmark results for $(bench.name): $(res[bench.name])")
println("benchmark results for $(bench.name): $(minimum(res[bench.name]))")
end

println("saving results in $output_prefix.$output_format")
Expand All @@ -267,6 +294,11 @@ function main()
serialize(io, outdict)
end
end

if parse(Bool, get(ENV, "BENCHMARK_VISUALIZE", "0"))
run(`$(Base.julia_cmd()) $(joinpath(pwd(), "visualize.jl")) -- $(output_prefix*"."*output_format)`)
end

println("Done.")

# TODO: Compare with multiple results
Expand Down
104 changes: 71 additions & 33 deletions benchmarks/visualize.jl
Original file line number Diff line number Diff line change
@@ -1,27 +1,59 @@
using JLD
using JLD, Serialization
using BenchmarkTools
using TypedTables

res = JLD.load(ARGS[1])
res = if endswith(ARGS[1], ".jld")
JLD.load(ARGS[1])
elseif endswith(ARGS[1], ".jls")
deserialize(ARGS[1])
else
error("Unknown file type")
end

serial_results = res["results"]["Serial"]
dagger_results = res["results"]["Dagger"]
serial_results = filter(x->!occursin("dagger", x[1]), res["results"])
@assert length(keys(serial_results)) > 0 "No serial results found"
dagger_results = filter(x->occursin("dagger", x[1]), res["results"])
@assert length(keys(dagger_results)) > 0 "No Dagger results found"

scale_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(first(serial_results)[2])]; by=x->x[2])
nw_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(first(dagger_results)[2][first(first(scale_set))])]; by=x->x[2])
raw_table = NamedTuple[]
for bset_key in keys(res["results"])
bset = res["results"][bset_key]
if typeof(bset[first(first(scale_set))]) <: BenchmarkGroup
procs = parse(Int, lstrip(last(split(first(first(bset[first(first(scale_set))])), ':')), ' '))
for nw in nw_set
for i in 1:length(scale_set)
set_times = [minimum(bset[scale][nw[1]]).time/(10^9) for scale in first.(scale_set)]
push!(raw_table, (name=bset_key, time=set_times[i], scale=last.(scale_set)[i], procs=nw[2]))
end
end
else
set_times = [minimum(bset[scale]).time/(10^9) for scale in first.(scale_set)]
procs = 8 # default for OpenBLAS
for i in 1:length(set_times)
push!(raw_table, (name=bset_key, time=set_times[i], scale=last.(scale_set)[i], procs=procs))
end
end
end
table = Table(raw_table)

scale_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(serial_results)]; by=x->x[2])
serial_times = [minimum(serial_results[scale]).time/(10^9) for scale in first.(scale_set)]
nw_set = sort([key=>parse(Int, lstrip(last(split(key, ':')), ' ')) for key in keys(dagger_results[first(first(scale_set))])]; by=x->x[2])
btable = copy(table[map(x->!x, occursin.(Ref("dagger"), table.name))])
dtable = copy(table[occursin.(Ref("dagger"), table.name)])

table = Table(name=[:Base for _ in 1:3], time=serial_times, scale=last.(scale_set), procs=[8 for _ in 1:3])
#table = Table(name=[:Base for _ in 1:3], time=serial_times, scale=last.(scale_set), procs=[8 for _ in 1:3])

btable = copy(table)
#btable = copy(table)

#=
for (nw,nw_val) in nw_set
dagger_times = [minimum(dagger_results[scale][nw]).time/(10^9) for scale in first.(scale_set)]
t = Table(name=[:Dagger for _ in 1:3], time=dagger_times, scale=last.(scale_set), procs=[parse(Int,split(nw, ":")[2]) for _ in 1:3])
append!(table, t)
end
=#

dtable = table[table.name .== :Dagger]
#dtable = table[table.name .== :Dagger]

# Plotting

Expand All @@ -45,11 +77,11 @@ legend_names = String[]

scales = unique(dtable.scale)

colors = distinguishable_colors(lenght(scales), ColorSchemes.seaborn_deep.colors)
colors = distinguishable_colors(length(scales), ColorSchemes.seaborn_deep.colors)

for (i, scale) in enumerate(scales)
stable = dtable[dtable.scale .== scale]
t1 = first(stable[stable.procs .== 1].time)
t1 = first(stable[stable.procs .== minimum(dtable.procs)].time)
ss_efficiency = strong_scaling.(t1, stable.time, stable.procs)
push!(line_plots, lines!(ssp, stable.procs, ss_efficiency, linewidth=3.0, color = colors[i]))
push!(legend_names, "scale = $scale")
Expand All @@ -65,25 +97,32 @@ save("strong_scaling.png", fig)
# too little data

fig = Figure(resolution = (1200, 800))
weak_scaling(t1, tn) = t1/tn
# Weak-scaling efficiency: baseline time `t1` divided by the observed time
# `tn`, after normalizing `tn` by the growth in problem size (`p_prime`
# relative to the baseline size `p`).
function weak_scaling(t1, tn, p_prime, p)
    normalized_tn = tn * (p_prime / p)
    return t1 / normalized_tn
end

dtable = table[table.name .== :Dagger]
wstable = filter(row->row.scale == row.procs, dtable)
wstable = sort(wstable, by=r->r.scale)
t1 = first(wstable).time
t1 = first(dtable[map(row->(row.scale == 10) && (row.procs == 1), dtable)]).time

fig = Figure(resolution = (1200, 800))
perf = fig[1, 1] = Axis(fig, title = "Weak scaling")
perf.xlabel = "nprocs"
perf.ylabel = "Efficiency"
perf = fig[1, 1] = Axis(fig, title = "Weak Scaling")
perf.xlabel = "Number of processes"
perf.ylabel = "Scaling efficiency"

line_plots = Any[]
legend_names = String[]

lines!(perf, wstable.procs, weak_scaling.(t1, wstable.time), linewidth=3.0)
wstable = similar(dtable, 0)
for pair in [(10,1),(35,4),(85,8)]
append!(wstable, dtable[map(row->(row.scale == pair[1]) && (row.procs == pair[2]), rows(dtable))])
end
push!(line_plots, lines!(perf, wstable.procs, weak_scaling.(t1, wstable.time, wstable.procs .* 10, wstable.scale), linewidth=3.0))
push!(legend_names, "cpu+dagger")

legend = fig[1, 2] = Legend(fig, line_plots, legend_names)
save("weak_scaling.png", fig)

# 3. Comparison against Base

fig = Figure(resolution = (1200, 800))
perf = fig[1, 1] = Axis(fig, title = "DaggerArrays vs Base")
perf = fig[1, 1] = Axis(fig, title = "Dagger vs Base")
perf.xlabel = "Scaling factor"
perf.ylabel = "time (s)"

Expand All @@ -92,7 +131,7 @@ legend_names = String[]

procs = unique(dtable.procs)

colors = distinguishable_colors(lenght(procs) + 1, ColorSchemes.seaborn_deep.colors)
colors = distinguishable_colors(length(procs) + 1, ColorSchemes.seaborn_deep.colors)

for (i, nproc) in enumerate(procs)
stable = dtable[dtable.procs .== nproc]
Expand All @@ -109,23 +148,22 @@ save("raw_timings.png", fig)

# 4. Speedup
fig = Figure(resolution = (1200, 800))
speedup = fig[1, 1] = Axis(fig, title = "DaggerArrays vs Base (8 threads)")
speedup.xlabel = "Scaling factor"
speedup.ylabel = "Speedup Base/Dagger"
speedup = fig[1, 1] = Axis(fig, title = "Speedup vs. 1 processor")
speedup.xlabel = "Number of processors"
speedup.ylabel = "Speedup"

line_plots = Any[]
legend_names = String[]

colors = distinguishable_colors(length(procs), ColorSchemes.seaborn_deep.colors)

sort!(btable, by=r->r.scale)
t1 = sort(dtable[dtable.scale .== 10]; by=r->r.procs)

for (i, nproc) in enumerate(unique(dtable.procs))
nproc < 8 && continue
stable = dtable[dtable.procs .== nproc]
sort!(stable, by=r->r.scale)
push!(line_plots, lines!(speedup, stable.scale, btable.time ./ stable.time, linewidth=3.0, color = colors[i]))
push!(legend_names, "Dagger (nprocs = $nproc)")
for (i, scale) in enumerate(unique(dtable.scale))
stable = dtable[dtable.scale .== scale]
sort!(stable, by=r->r.procs)
push!(line_plots, lines!(speedup, stable.procs, stable.time ./ t1.time, linewidth=3.0, color = colors[i]))
push!(legend_names, "Dagger (scale = $scale)")
end

legend = fig[1, 2] = Legend(fig, line_plots, legend_names)
Expand Down
4 changes: 4 additions & 0 deletions src/chunks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ function unrelease(c::Chunk{<:Any,DRef})
end
unrelease(c::Chunk) = c

# Identify a Chunk by its underlying data handle: two Chunk objects comparing
# equal means they reference the same stored data, regardless of other fields.
Base.:(==)(c1::Chunk, c2::Chunk) = c1.handle == c2.handle
# Hash only the handle so that `hash` stays consistent with `==` above —
# required for Chunks to behave correctly as Dict keys and Set members.
Base.hash(c::Chunk, x::UInt64) = hash(c.handle, x)

# Fetch a chunk's data via `poolget` on its handle and move it from the
# chunk's processor to the local OS process (OSProc()).
# NOTE(review): presumably called on the worker that owns the data — confirm
# against the caller in `collect`.
collect_remote(chunk::Chunk) =
    move(chunk.processor, OSProc(), poolget(chunk.handle))

function collect(ctx::Context, chunk::Chunk; options=nothing)
# delegate fetching to handle by default.
if chunk.handle isa DRef && !(chunk.processor isa OSProc)
Expand Down
18 changes: 10 additions & 8 deletions src/compute.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ end
##### Dag utilities #####

"""
dependents(node::Thunk, deps=Dict{Thunk, Set{Thunk}}()) -> Dict{Thunk, Set{Thunk}}
dependents(node::Thunk) -> Dict{Union{Thunk,Chunk}, Set{Thunk}}

Find the set of direct dependents for each task.
"""
function dependents(node::Thunk)
deps = Dict{Thunk, Set{Thunk}}()
deps = Dict{Union{Thunk,Chunk}, Set{Thunk}}()
visited = Set{Thunk}()
to_visit = Set{Thunk}()
push!(to_visit, node)
Expand All @@ -98,10 +98,12 @@ function dependents(node::Thunk)
deps[next] = Set{Thunk}()
end
for inp in inputs(next)
if inp isa Thunk
s::Set{Thunk} = get!(()->Set{Thunk}(), deps, inp)
if istask(inp) || (inp isa Chunk)
s = get!(()->Set{Thunk}(), deps, inp)
push!(s, next)
!(inp in visited) && push!(to_visit, inp)
if istask(inp) && !(inp in visited)
push!(to_visit, inp)
end
end
end
push!(visited, next)
Expand All @@ -110,14 +112,14 @@ function dependents(node::Thunk)
end

"""
noffspring(dpents::Dict{Thunk, Set{Thunk}}) -> Dict{Thunk, Int}
noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int}

Recursively find the number of tasks dependent on each task in the DAG.
Takes a Dict as returned by [`dependents`](@ref).
"""
function noffspring(dpents::Dict{Thunk, Set{Thunk}})
function noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}})
noff = Dict{Thunk,Int}()
to_visit = collect(keys(dpents))
to_visit = collect(filter(istask, keys(dpents)))
while !isempty(to_visit)
next = popfirst!(to_visit)
haskey(noff, next) && continue
Expand Down
Loading