@threads f(...) syntax to tell callee to use threaded version? #40329

Open
stevengj opened this issue Apr 3, 2021 · 8 comments
Labels
design Design of APIs or of the language itself multithreading Base.Threads and related functionality speculative Whether the change will be implemented is speculative

Comments

@stevengj
Member

stevengj commented Apr 3, 2021

It occurred to me that it might be nice to have @threads f(...) rewrite the function call to something like f(Threads.ThreadsCall{true}(), ...), if only to provide a uniform syntax for requesting a multi-threaded version of an algorithm.

For example, we could write @threads sum(x) to call a sum(::ThreadsCall{true}) method rather than having a sum_threads function. (Of course, in an ideal world with no threading overhead, we could have sum always use a @threads for, but for the foreseeable future this will pay a big price for summing short arrays.)

The reason I suggest ThreadsCall{true} is that this allows you to define functions that share code between the threaded and serial versions:

f(args...) = f(ThreadsCall{false}(), args...) # default is non-threaded
function f(::ThreadsCall{TC}, args...) where {TC}
    ... some calculations ...
    if TC
        ... threaded algorithm ...
    else
        ... serial algorithm ...
    end
    @maybethreads TC for ...    # flag version of @threads for to enable/disable threading
    end
    ... some other calculations ...
    return g(ThreadsCall{TC}(), ...) # call subsidiary function and pass TC
end

in order to share code between the threaded and non-threaded version, and to pass the choice of whether or not to thread through to subsidiary functions.
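The rewrite itself could be a very small macro. Here is a minimal sketch, assuming the `ThreadsCall` singleton type proposed above and a hypothetical macro name `@threadscall` (to avoid clashing with the existing `Threads.@threads`); none of this is an existing API:

```julia
# Hypothetical marker type: the Bool parameter records whether the
# caller requested the threaded variant.
struct ThreadsCall{TC} end

# Sketch of the proposed rewrite:
#   @threadscall f(args...)  ->  f(ThreadsCall{true}(), args...)
#   @threadscall f.(args...) ->  broadcast(ThreadsCall{true}(), f, args...)
macro threadscall(ex)
    if Meta.isexpr(ex, :call)
        f, args = ex.args[1], ex.args[2:end]
        return esc(:($f(ThreadsCall{true}(), $(args...))))
    elseif Meta.isexpr(ex, :.) && Meta.isexpr(ex.args[2], :tuple)
        f, args = ex.args[1], ex.args[2].args
        return esc(:(broadcast(ThreadsCall{true}(), $f, $(args...))))
    end
    error("@threadscall expects a call like f(x) or a dot call like f.(x)")
end
```

With something like this, `@threadscall sum(x)` would dispatch to a `sum(::ThreadsCall{true}, x)` method if one exists, and the dot-call branch covers the broadcast case.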

As suggested above, it would be nice to also make @maybethreads bool for ... turn into

if bool
    @threads for ... end
else
    for ... end
end

so that you could write a single loop and use TC from ::ThreadsCall{TC} to decide whether to thread it. Similarly for @maybespawn and @maybesync, as suggested below.
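A minimal sketch of how such a `@maybethreads` could be implemented; the macro name follows the proposal in this thread, and nothing here is an existing Base API:

```julia
# Hypothetical @maybethreads: emit both a threaded and a serial copy of
# the same loop, selected by a Bool flag. If the flag is a compile-time
# constant (e.g. the TC type parameter above), the dead branch is
# eliminated by the compiler.
macro maybethreads(flag, loop)
    Meta.isexpr(loop, :for) || error("@maybethreads expects a for loop")
    threaded = :(Threads.@threads $loop)
    return quote
        if $(esc(flag))
            $(esc(threaded))
        else
            $(esc(loop))
        end
    end
end

# Usage: one loop body shared by both paths.
function fill_squares!(out, threaded::Bool)
    @maybethreads threaded for i in eachindex(out)
        out[i] = i^2
    end
    return out
end
```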

For #19777, we could write a broadcast(::ThreadsCall{true}, ...) method and then make @threads f.(...) call it.

cc: @jw3126, @baggepinnen, @mohamed82008, who have all written packages with threaded variants of stdlib functions. Eventually it would be nice to fold some of this into base with @threads map(...) etcetera.

@stevengj stevengj added speculative Whether the change will be implemented is speculative multithreading Base.Threads and related functionality labels Apr 3, 2021
@quinnj
Member

quinnj commented Apr 3, 2021

I have found myself wanting something exactly like this and this.

We've also been discussing and working on a lot of multithreading integration in DataFrames.jl for the 1.0 release, and there have been several times where it's been unclear exactly what API to provide users to control threading. cc: @nalimilan @bkamins

@stevengj
Member Author

stevengj commented Apr 3, 2021

@quinnj, your first example could be simplified if @maybespawn also accepted a boolean parameter, similar to what I proposed for @threads for, i.e. you could do

@maybespawn TC for .... end

to use the boolean type parameter TC to decide whether or not to spawn a thread, rather than having to write two identical copies of the loop.

@stevengj
Member Author

stevengj commented Apr 3, 2021

Alternatively, if the boolean-flag versions of @threads and @spawn seem too overloaded/ambiguous, we could have new macros that take a boolean parameter, e.g.

@maybethreads bool for ... end  # uses @threads for if bool == true
@maybethreads bool f(args...)   # calls @threads f(args...) if bool == true
@maybespawn bool something  # uses @spawn something if bool == true
@maybesync bool something  # uses @sync something if bool == true

which you could use with the type parameter TC (which will allow the compiler to eliminate the branch) to unify the threaded and serial implementations of a function.
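The spawn/sync variants could be sketched as follows (hypothetical macros, not an existing API; note that the two branches of `@maybespawn` return different things, a `Task` vs. the expression's value, which callers would need to keep in mind):

```julia
# Hypothetical boolean-flag variants of @spawn and @sync.
macro maybespawn(flag, ex)
    spawned = :(Threads.@spawn $ex)
    return :($(esc(flag)) ? $(esc(spawned)) : $(esc(ex)))
end

macro maybesync(flag, ex)
    synced = :(@sync $ex)
    return :($(esc(flag)) ? $(esc(synced)) : $(esc(ex)))
end
```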

@bkamins
Member

bkamins commented Apr 3, 2021

Let me give one example from DataFrames.jl that would be great to simplify:

    @static if VERSION >= v"1.4"
        if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx
            @sync begin
                for col in eachcol(dfl_noon)
                    cols_i = left_idxs[col_idx]
                    Threads.@spawn _noon_compose_helper!(cols, _similar_left, cols_i,
                                                         col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
                    col_idx += 1
                end
                @assert col_idx == ncol(joiner.dfl) + 1
                for col in eachcol(dfr_noon)
                    cols_i = col_idx
                    Threads.@spawn _noon_compose_helper!(cols, _similar_right, cols_i, col, target_nrow,
                                                         right_ixs, lil + loil + 1, rightonly_ixs, roil)
                    col_idx += 1
                end
            end
        else
            for col in eachcol(dfl_noon)
                _noon_compose_helper!(cols, _similar_left, left_idxs[col_idx],
                                      col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
                col_idx += 1
            end
            @assert col_idx == ncol(joiner.dfl) + 1
            for col in eachcol(dfr_noon)
                _noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow,
                                      right_ixs, lil + loil + 1, rightonly_ixs, roil)
                col_idx += 1
            end
        end
    else
        for col in eachcol(dfl_noon)
            _noon_compose_helper!(cols, _similar_left, left_idxs[col_idx],
                                  col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
            col_idx += 1
        end
        @assert col_idx == ncol(joiner.dfl) + 1
        for col in eachcol(dfr_noon)
            _noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow,
                                  right_ixs, lil + loil + 1, rightonly_ixs, roil)
            col_idx += 1
        end
    end

(Of course, the code is more complex than it needs to be because we also support Julia 1.0; once the new LTS is out, it will be cleaned up. But the point is that it would be great to have something that allows enabling or disabling the @spawn calls with a single bool switch.)
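For illustration, the Julia ≥ 1.4 portion of the snippet above might collapse to something like the following under the proposed @maybesync/@maybespawn (hypothetical macros from this thread; variable names are taken from the snippet above):

```julia
# One Bool switch replaces the duplicated threaded/serial branches.
use_threads = Threads.nthreads() > 1 && target_nrow >= 1_000_000 &&
              length(cols) > col_idx
@maybesync use_threads begin
    for col in eachcol(dfl_noon)
        cols_i = left_idxs[col_idx]
        @maybespawn use_threads _noon_compose_helper!(cols, _similar_left, cols_i,
            col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
        col_idx += 1
    end
    @assert col_idx == ncol(joiner.dfl) + 1
    for col in eachcol(dfr_noon)
        cols_i = col_idx
        @maybespawn use_threads _noon_compose_helper!(cols, _similar_right, cols_i,
            col, target_nrow, right_ixs, lil + loil + 1, rightonly_ixs, roil)
        col_idx += 1
    end
end
```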

@mohamed82008
Contributor

I think all of the proposals here can live in a package, e.g. https://github.com/tkf/ThreadsX.jl. For higher order functions in Base, calling @threads can simply replace f by ThreadsX.f, e.g. for map, mapreduce, foreach etc. I think in practice, people may want to switch threading on or off in different places based on the application. Doing this manually or on a case-by-case basis is probably easier than trying to fit in a centralized approach. The method proposed here only allows a binary choice so it only works if we either want to thread all the way down or only use a single thread throughout. It's probably sufficient in a lot of cases but I don't see a reason it can't live outside of Base.

@stevengj
Member Author

stevengj commented Apr 3, 2021

For higher order functions in Base, calling @threads can simply replace f by ThreadsX.f, e.g. for map, mapreduce, foreach etc.

@threads can't introduce a dependency on an external package. You'd have to call the macro something else, e.g. @threadx f(...) — and of course, you can do that now if you want.

Second, I specifically don't want to replace f(...) with SomeOtherModule.f(...) because I want to have the option of sharing code between the threaded and serial versions of f as in the examples above.

(In my mind, there's not much benefit to writing @threadx f(...) vs. ThreadX.f(...) if that's what you want. Whereas there is a big benefit to not having to write two nearly identical copies of a function, one with @threads for ... and @spawn ... and the other without.)

It's probably sufficient in a lot of cases but I don't see a reason it can't live outside of Base.

It's certainly true that the proposed functionality could be put into a ThreadCalls.jl package to start with (e.g. as @threadcall, @maybethreads, @maybespawn, @maybesync). And this has the advantage of being available immediately, and in earlier versions of Julia.

But a uniform syntax to opt in to threaded versions of functions, and to share code between threaded and serial versions, seems fundamental enough to consider for Base. (Also, I wanted to bounce the general design questions off of the core-dev audience.)

@jw3126
Contributor

jw3126 commented Apr 3, 2021

cc @tkf

@tkf
Member

tkf commented Apr 3, 2021

I think the underlying infrastructure for this already exists in JuliaFolds, especially as the executor. The surface API closest to the one mentioned in OP is provided from Folds.jl. Furthermore, all executors can also be used with FLoops.jl for the familiar for loop syntax and highly customizable reduction and Transducers.jl for a rich set of monoid combinators.

There's a quick introduction in [ANN] Folds.jl: threaded, distributed, and GPU-based high-level data-parallel interface for Julia - Community / Package announcements - JuliaLang. I also discussed how we can build an ecosystem for parallel computing by extending the infrastructure in JuliaFolds.

I'd also note that specifying a "threaded" f is rather limited (and, as an aside, no macro is needed). We can also aim higher, to include GPU-based and distributed versions of a given function. Just as a quick overview, here's a subset of executors that are already implemented:

using Folds
xs = (f(x) for x in 1:10 if p(x))  # arbitrary splittable data collection and iterator comprehension
Folds.sum(xs)  # default to ThreadedEx
Folds.sum(xs, ThreadedEx())
Folds.sum(xs, SequentialEx())   # single-threaded
Folds.sum(xs, DistributedEx())  # Distributed.jl
Folds.sum(xs, CUDAEx())         # using FoldsCUDA.jl (CUDA)
Folds.sum(xs, KAEx())           # using FoldsKernelAbstractions.jl (GPU)
Folds.sum(xs, DaggerEx())       # using FoldsDagger.jl (distributed)

Additionally, there are various thread-based executors in FoldsThreads.jl. See [ANN] FoldsThreads.jl: A zoo of pluggable thread-based data-parallel execution mechanisms - Community / Package announcements - JuliaLang for what it offers and benchmarks demonstrating their performance characteristics. That is to say, even just the "threaded" version of a given function can have a gazillion variants.

There are a lot of things to consider if we want to build a composable and extensible infrastructure for parallel computing. We need to carefully decompose the existing notion of the algorithm, data structure, and execution mechanism. I hope we don't add any premature APIs in Base.

@JeffBezanson JeffBezanson added the design Design of APIs or of the language itself label Apr 6, 2021