diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index 30e778787..d1bcec99f 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -757,14 +757,16 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) inputs = map(last, collect_task_inputs(state, task)) opts = populate_defaults(opts, chunktype(task.f), map(chunktype, inputs)) - local_procs, costs = estimate_task_costs(state, local_procs, task, inputs) + local_procs, costs = estimate_task_costs(state, local_procs, task, inputs; sig) scheduled = false - # Move our corresponding ThreadProc to be the last considered + # Move our corresponding ThreadProc to be the last considered, + # if the task is expected to run for longer than the time it takes to + # schedule it onto another worker (estimated at 1ms). if length(local_procs) > 1 sch_threadproc = Dagger.ThreadProc(myid(), Threads.threadid()) sch_thread_idx = findfirst(proc->proc==sch_threadproc, local_procs) - if sch_thread_idx !== nothing + if sch_thread_idx !== nothing && costs[sch_threadproc] > 1_000_000 # 1ms deleteat!(local_procs, sch_thread_idx) push!(local_procs, sch_threadproc) end @@ -786,7 +788,7 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) state.worker_time_pressure[gproc.pid][proc] = get(state.worker_time_pressure[gproc.pid], proc, 0) + est_time_util - @dagdebug task :schedule "Scheduling to $gproc -> $proc" + @dagdebug task :schedule "Scheduling to $gproc -> $proc (cost: $(costs[proc]))" @goto pop_task end end diff --git a/src/sch/util.jl b/src/sch/util.jl index 2cd96c38e..4e1d2bfff 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -462,7 +462,7 @@ current estimated per-processor compute pressure, and transfer costs for each `Chunk` argument to `task`. Returns `(procs, costs)`, with `procs` sorted in order of ascending cost. """ -function estimate_task_costs(state, procs, task, inputs) +function estimate_task_costs(state, procs, task, inputs; sig=nothing) tx_rate = state.transfer_rate[] # Find all Chunks @@ -473,9 +473,17 @@ function estimate_task_costs(state, procs, task, inputs) end end + # Estimate the cost of executing the task itself + if sig === nothing + sig = signature(task.f, inputs) + end + est_time_util = get(state.signature_time_cost, sig, 1000^3) + + # Estimate total cost for executing this task on each candidate processor costs = Dict{Processor,Float64}() for proc in procs - chunks_filt = Iterators.filter(c->get_parent(processor(c))!=get_parent(proc), chunks) + gproc = get_parent(proc) + chunks_filt = Iterators.filter(c->get_parent(processor(c))!=gproc, chunks) # Estimate network transfer costs based on data size # N.B. `affinity(x)` really means "data size of `x`" @@ -485,8 +493,14 @@ function estimate_task_costs(state, procs, task, inputs) tx_cost = impute_sum(affinity(chunk)[2] for chunk in chunks_filt) # Estimate total cost to move data and get task running after currently-scheduled tasks - est_time_util = get(state.worker_time_pressure[get_parent(proc).pid], proc, 0) - costs[proc] = est_time_util + (tx_cost/tx_rate) + est_business = get(state.worker_time_pressure[get_parent(proc).pid], proc, 0) + + # Add fixed cost for cross-worker task transfer (esimated at 1ms) + # TODO: Actually estimate/benchmark this + task_xfer_cost = gproc.pid != myid() ? 1_000_000 : 0 # 1ms + + # Compute final cost + costs[proc] = est_time_util + est_business + (tx_cost/tx_rate) + task_xfer_cost end # Shuffle procs around, so equally-costly procs are equally considered