diff --git a/src/mapreduce.jl b/src/mapreduce.jl
index e9a3f979..e6aa7f04 100644
--- a/src/mapreduce.jl
+++ b/src/mapreduce.jl
@@ -1,16 +1,70 @@
 ## COV_EXCL_START
 
 # TODO
-# - serial version for lower latency
-# - group-stride loop to delay need for second kernel launch
-# - let the driver choose the local size
+# - block-stride loop to delay need for second kernel launch
+#=
+function nextwarp(dev, threads::Integer)
+    ws = get_sub_group_size()
+    return threads + (ws - threads % ws) % ws
+end
+
+function prevwarp(dev, threads::Integer)
+    ws = get_sub_group_size()
+    return threads - Base.rem(threads, ws)
+end
+# Reduce a value across a warp
+@inline function reduce_warp(op, val)
+    assume(warpsize() == 32)
+    offset = Int32(1)
+    while offset < warpsize()
+        val = op(val, intel_shfl_down(val, zero(val), offset))
+        offset <<= 1
+    end
+
+    return val
+end
+=#
+#=
+# Reduce a value across a block, using shared memory for communication
+@inline function reduce_block(op, val::T, neutral, shuffle::Val{true}) where T
+    # shared mem for partial sums
+    # assume(warpsize() == 32)
+    shared = CLLocalArray(T, 32)
 
-# Reduce a value across a group, using local memory for communication
-@inline function reduce_group(op, val::T, neutral, ::Val{maxitems}) where {T, maxitems}
+    wid, lane = fldmod1(get_local_id(), get_sub_group_size())
+
+    # each warp performs partial reduction
+    val = reduce_warp(op, val)
+
+    # write reduced value to shared memory
+    if lane == 1
+        @inbounds shared[wid] = val
+    end
+
+    # wait for all partial reductions
+    work_group_barrier(LOCAL_MEM_FENCE)
+
+    # read from shared memory only if that warp existed
+    val = if threadIdx().x <= fld1(get_local_size().x, warpsize())
+        @inbounds shared[lane]
+    else
+        neutral
+    end
+
+    # final reduce within first warp
+    if wid == 1
+        val = reduce_warp(op, val)
+    end
+
+    return val
+end
+=#
+
+@inline function reduce_block(op, val::T, neutral, shuffle::Val{false}, ::Val{maxitems}) where {T, maxitems}
     items = get_local_size()
     item = get_local_id()
 
-    # local mem for a complete reduction
+    # shared mem for a complete reduction
     shared = CLLocalArray(T, (maxitems,))
     @inbounds shared[item] = val
 
@@ -30,7 +84,7 @@
         d *= 2
     end
 
-    # load the final value on the first item
+    # load the final value on the first thread
     if item == 1
        val = @inbounds shared[item]
     end
@@ -45,43 +99,45 @@ Base.@propagate_inbounds _map_getindex(args::Tuple{}, I) = ()
 # Reduce an array across the grid. All elements to be processed can be addressed by the
 # product of the two iterators `Rreduce` and `Rother`, where the latter iterator will have
 # singleton entries for the dimensions that should be reduced (and vice versa).
-function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R, As...)
-    # decompose the 1D hardware indices into separate ones for reduction (across items
-    # and possibly groups if it doesn't fit) and other elements (remaining groups)
-    localIdx_reduce = get_local_id()
-    localDim_reduce = get_local_size()
-    groupIdx_reduce, groupIdx_other = fldmod1(get_group_id(), length(Rother))
-    groupDim_reduce = get_num_groups() ÷ length(Rother)
-
-    # group-based indexing into the values outside of the reduction dimension
-    # (that means we can safely synchronize items within this group)
-    iother = groupIdx_other
+function partial_mapreduce_grid(f, op, neutral, maxitems, Rreduce, Rother, shuffle, R::AbstractArray{T}, As...) where T
+    assume(length(Rother) > 0)
+
+    # decompose the 1D hardware indices into separate ones for reduction (across threads
+    # and possibly blocks if it doesn't fit) and other elements (remaining blocks)
+    threadIdx_reduce = get_local_id()
+    blockDim_reduce = get_local_size()
+    blockIdx_reduce, blockIdx_other = fldmod1(get_group_id(), length(Rother))
+    gridDim_reduce = get_num_groups() ÷ length(Rother)
+
+    # block-based indexing into the values outside of the reduction dimension
+    # (that means we can safely synchronize threads within this block)
+    iother = blockIdx_other
     @inbounds if iother <= length(Rother)
         Iother = Rother[iother]
 
         # load the neutral value
-        Iout = CartesianIndex(Tuple(Iother)..., groupIdx_reduce)
+        Iout = CartesianIndex(Tuple(Iother)..., blockIdx_reduce)
         neutral = if neutral === nothing
             R[Iout]
         else
             neutral
         end
 
-        val = op(neutral, neutral)
+        val::T = op(neutral, neutral)
 
-        # reduce serially across chunks of input vector that don't fit in a group
-        ireduce = localIdx_reduce + (groupIdx_reduce - 1) * localDim_reduce
+        # reduce serially across chunks of input vector that don't fit in a block
+        ireduce = threadIdx_reduce + (blockIdx_reduce - 1) * blockDim_reduce
         while ireduce <= length(Rreduce)
             Ireduce = Rreduce[ireduce]
             J = max(Iother, Ireduce)
             val = op(val, f(_map_getindex(As, J)...))
-            ireduce += localDim_reduce * groupDim_reduce
+            ireduce += blockDim_reduce * gridDim_reduce
         end
 
-        val = reduce_group(op, val, neutral, maxitems)
+        val = reduce_block(op, val, neutral, shuffle, maxitems)
 
         # write back to memory
-        if localIdx_reduce == 1
+        if threadIdx_reduce == 1
             R[Iout] = val
         end
     end
@@ -89,15 +145,55 @@ function partial_mapreduce_device(f, op, neutral, maxitems, Rreduce, Rother, R,
     return
 end
+function serial_mapreduce_kernel(f, op, neutral, Rreduce, Rother, R, As)
+    grid_idx = get_local_id() + (get_group_id() - 1) * get_local_size()
+    @inbounds if grid_idx <= length(Rother)
+        Iother = Rother[grid_idx]
+
+        # load the neutral value
+        neutral = if neutral === nothing
+            R[Iother]
+        else
+            neutral
+        end
+
+        val = op(neutral, neutral)
+
+        Ibegin = Rreduce[1]
+        for Ireduce in Rreduce
+            val = op(val, f(As[Iother + Ireduce - Ibegin]))
+        end
+        R[Iother] = val
+    end
+    return
+end
+
 
 ## COV_EXCL_STOP
 
-function GPUArrays.mapreducedim!(
-    f::F, op::OP, R::WrappedCLArray{T},
+# factored out for use in tests
+function serial_mapreduce_threshold(dev)
+    dev.max_work_group_size * dev.max_compute_units
+end
+
+function GPUArrays.mapreducedim!(f::F, op::OP, R::WrappedCLArray{T},
                                  A::Union{AbstractArray,Broadcast.Broadcasted};
                                  init=nothing) where {F, OP, T}
-    Base.check_reducedims(R, A)
+    if !isa(A, Broadcast.Broadcasted)
+        # XXX: Base.axes isn't defined anymore for Broadcasted, breaking this check
+        Base.check_reducedims(R, A)
+    end
     length(A) == 0 && return R # isempty(::Broadcasted) iterates
+    dev = cl.device()
+    # be conservative about using shuffle instructions
+    #=
+    shuffle = T <: Union{Bool,
+                         UInt8, UInt16, UInt32, UInt64, UInt128,
+                         Int8, Int16, Int32, Int64, Int128,
+                         Float16, Float32, Float64,
+                         ComplexF16, ComplexF32, ComplexF64}
+    =#
+    shuffle = false
 
     # add singleton dimensions to the output container, if needed
     if ndims(R) < ndims(A)
         dims = Base.fill_to_length(size(R), 1, Val(ndims(A)))
@@ -112,73 +208,111 @@ function GPUArrays.mapreducedim!(
     # NOTE: we hard-code `OneTo` (`first.(axes(A))` would work too) or we get a
     # CartesianIndices object with UnitRanges that behave badly on the GPU.
     @assert length(Rall) == length(Rother) * length(Rreduce)
+    @assert length(Rother) > 0
 
-    # allocate an additional, empty dimension to write the reduced value to.
-    # this does not affect the actual location in memory of the final values,
-    # but allows us to write a generalized kernel supporting partial reductions.
-    R′ = reshape(R, (size(R)..., 1))
-
-    # how many items do we want?
+    # If `Rother` is large enough, then a naive loop is more efficient than partial reductions.
+    # @info serial_mapreduce_threshold(dev)
+
+    if !contains(string(cl.device()), "zink") && length(Rother) >= serial_mapreduce_threshold(dev) || contains(string(cl.platform()), "Portable Computing Language")
+        args = (f, op, init, Rreduce, Rother, R, A)
+        kernel = @opencl launch=false serial_mapreduce_kernel(args...)
+        wg_info = cl.work_group_info(kernel.fun, dev)
+        local_size = wg_info.size
+        global_size = cld(length(Rother), local_size) * local_size
+        # @info local_size global_size
+        kernel(args...; local_size, global_size)
+        return R
+    end
+
+    # how many threads do we want?
     #
-    # items in a group work together to reduce values across the reduction dimensions;
+    # threads in a block work together to reduce values across the reduction dimensions;
     # we want as many as possible to improve algorithm efficiency and execution occupancy.
-    wanted_items = length(Rreduce)
-    function compute_items(max_items)
-        if wanted_items > max_items
-            max_items
+
+    wanted_threads = shuffle ? nextwarp(dev, length(Rreduce)) : length(Rreduce)
+    function compute_threads(max_threads)
+        if wanted_threads > max_threads
+            shuffle ? prevwarp(dev, max_threads) : max_threads
         else
-            wanted_items
+            wanted_threads
         end
     end
-
-    # how many items can we launch?
+    max_lmem_elements = dev.local_mem_size ÷ sizeof(T)
+    max_items = min(dev.max_work_group_size,
+                    compute_threads(max_lmem_elements ÷ 2))
+
+    # how many threads can we launch?
     #
-    # we might not be able to launch all those items to reduce each slice in one go.
-    # that's why each items also loops across their inputs, processing multiple values
-    # so that we can span the entire reduction dimension using a single item group.
-
-    # group size is restricted by local memory
-    max_lmem_elements = cl.device().local_mem_size ÷ sizeof(T)
-    max_items = min(cl.device().max_work_group_size,
-                    compute_items(max_lmem_elements ÷ 2))
-    # TODO: dynamic local memory to avoid two compilations
-
-    # let the driver suggest a group size
-    args = (f, op, init, Val(max_items), Rreduce, Rother, R′, A)
-    kernel_args = kernel_convert.(args)
-    kernel_tt = Tuple{Core.Typeof.(kernel_args)...}
-    kernel = clfunction(partial_mapreduce_device, kernel_tt)
-    wg_info = cl.work_group_info(kernel.fun, cl.device())
-    reduce_items = compute_items(wg_info.size)
-
-    # how many groups should we launch?
+    # we might not be able to launch all those threads to reduce each slice in one go.
+    # that's why each threads also loops across their inputs, processing multiple values
+    # so that we can span the entire reduction dimension using a single thread block.
+    # @info f op init Rreduce Rother Val(shuffle) R A shuffle
+    kernel = @opencl launch=false partial_mapreduce_grid(f, op, init, Val(max_items), Rreduce, Rother, Val(shuffle), R, A)
+    # compute_shmem(threads) = shuffle ? 0 : threads*sizeof(T)
+    # kernel_config = launch_configuration(kernel.fun; shmem=compute_shmem∘compute_threads)
+    wg_info = cl.work_group_info(kernel.fun, dev)
+    reduce_threads = compute_threads(wg_info.size)
+    # reduce_shmem = compute_shmem(reduce_threads)
+
+    # how many blocks should we launch?
     #
-    # even though we can always reduce each slice in a single item group, that may not be
-    # optimal as it might not saturate the GPU. we already launch some groups to process
+    # even though we can always reduce each slice in a single thread block, that may not be
+    # optimal as it might not saturate the GPU. we already launch some blocks to process
     # independent dimensions in parallel; pad that number to ensure full occupancy.
-    other_groups = length(Rother)
-    reduce_groups = cld(length(Rreduce), reduce_items)
+    other_blocks = length(Rother)
+    blocks = 1
+    reduce_blocks = if other_blocks >= blocks
+        1
+    else
+        min(cld(length(Rreduce), reduce_threads), # how many we need at most
+            cld(kernel_config.blocks, other_blocks)) # maximize occupancy
+    end
 
     # determine the launch configuration
-    local_size = reduce_items
-    global_size = reduce_items*reduce_groups*other_groups
-
+    local_size = reduce_threads
+    # shmem = reduce_shmem
+    global_size = reduce_blocks*other_blocks*reduce_threads
     # perform the actual reduction
-    if reduce_groups == 1
-        # we can cover the dimensions to reduce using a single group
-        @opencl local_size global_size partial_mapreduce_device(
-            f, op, init, Val(local_size), Rreduce, Rother, R′, A)
+    if reduce_blocks == 1
+        # we can cover the dimensions to reduce using a single block
+        # @info local_size global_size
+        kernel(f, op, init, Val(local_size), Rreduce, Rother, Val(shuffle), R, A; local_size, global_size)
     else
-        # we need multiple steps to cover all values to reduce
-        partial = similar(R, (size(R)..., reduce_groups))
+        @warn "not here"
+        #=
+        # TODO: provide a version that atomically reduces from different blocks
+
+        # temporary empty array whose type will match the final partial array
+        partial = similar(R, ntuple(_ -> 0, Val(ndims(R)+1)))
+
+        # NOTE: we can't use the previously-compiled kernel, or its launch configuration,
+        # since the type of `partial` might not match the original output container
+        # (e.g. if that was a view).
+        partial_kernel = @cuda launch=false partial_mapreduce_grid(f, op, init, Rreduce, Rother, Val(shuffle), partial, A)
+        partial_kernel_config = launch_configuration(partial_kernel.fun; shmem=compute_shmem∘compute_threads)
+        partial_reduce_threads = compute_threads(partial_kernel_config.threads)
+        partial_reduce_shmem = compute_shmem(partial_reduce_threads)
+        partial_reduce_blocks = if other_blocks >= partial_kernel_config.blocks
+            1
+        else
+            min(cld(length(Rreduce), partial_reduce_threads),
+                cld(partial_kernel_config.blocks, other_blocks))
+        end
+        partial_threads = partial_reduce_threads
+        partial_shmem = partial_reduce_shmem
+        partial_blocks = partial_reduce_blocks*other_blocks
+
+        partial = similar(R, (size(R)..., partial_reduce_blocks))
         if init === nothing
             # without an explicit initializer we need to copy from the output container
             partial .= R
         end
-        @opencl local_size global_size partial_mapreduce_device(
-            f, op, init, Val(local_size), Rreduce, Rother, partial, A)
-        GPUArrays.mapreducedim!(identity, op, R′, partial; init=init)
+        partial_kernel(f, op, init, Rreduce, Rother, Val(shuffle), partial, A;
+                       threads=partial_threads, blocks=partial_blocks, shmem=partial_shmem)
+
+        GPUArrays.mapreducedim!(identity, op, R, partial; init)
+        =#
     end
 
     return R
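
Aside for review, not part of the patch: a minimal host-side sketch of the group-id decomposition that partial_mapreduce_grid performs with fldmod1. It is plain Julia with made-up sizes (3 kept slices, 6 work-groups) and needs no OpenCL device; it only shows how the first length(Rother) groups each own one output slice while surplus groups stride across the reduction dimension, matching the `ireduce += blockDim_reduce * gridDim_reduce` loop in the kernel.

# host-side illustration only; the sizes below are hypothetical
length_Rother = 3   # number of non-reduced slices (the `Rother` iterator)
num_groups    = 6   # as if get_num_groups() returned 6
gridDim_reduce = num_groups ÷ length_Rother

for group_id in 1:num_groups
    # same split as in the kernel: fldmod1 turns the flat 1-based group id
    # into (reduction chunk, slice index)
    blockIdx_reduce, blockIdx_other = fldmod1(group_id, length_Rother)
    println("group $group_id -> slice $blockIdx_other, chunk $blockIdx_reduce of $gridDim_reduce")
end

With these numbers, groups 1-3 map to chunk 1 of slices 1-3 and groups 4-6 to chunk 2, so each slice is covered by gridDim_reduce chunks that the serial loop inside the kernel then strides over.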