Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/oneapi/tbb/detail/_template_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,13 @@ struct type_identity {
template <typename T>
using type_identity_t = typename type_identity<T>::type;

//! Invokes the callable when the compile-time tag is std::true_type.
template <typename F>
void invoke_if(std::true_type, F&& body) {
    std::forward<F>(body)();
}
//! No-op overload selected when the compile-time tag is std::false_type.
template <typename F>
void invoke_if(std::false_type, F&&) {}

} // inline namespace d0
} // namespace detail
} // namespace tbb
Expand Down
28 changes: 8 additions & 20 deletions src/tbb/arena.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,29 +544,17 @@ inline d1::task* arena::steal_task(unsigned arena_index, FastRandom& frnd, execu
++k; // Adjusts random distribution to exclude self
}
arena_slot* victim = &my_slots[k];
d1::task **pool = victim->task_pool.load(std::memory_order_relaxed);
d1::task *t = nullptr;
if (pool == EmptyTaskPool || !(t = victim->steal_task(*this, isolation, k))) {
d1::task** pool = victim->task_pool.load(std::memory_order_relaxed);
d1::task* result = nullptr;
if (pool == EmptyTaskPool || !(result = victim->steal_task(*this, isolation, k))) {
return nullptr;
}
if (task_accessor::is_proxy_task(*t)) {
task_proxy &tp = *(task_proxy*)t;
d1::slot_id slot = tp.slot;
t = tp.extract_task<task_proxy::pool_bit>();
if (!t) {
// Proxy was empty, so it's our responsibility to free it
tp.allocator.delete_object(&tp, ed);
return nullptr;
}
// Note affinity is called for any stolen task (proxy or general)
ed.affinity_slot = slot;
} else {
// Note affinity is called for any stolen task (proxy or general)
ed.affinity_slot = d1::any_slot;
result = task_proxy::try_extract_task_from( result, ed, /*stolen=*/std::true_type{} );
if (result) {
// Update the task owner slot id to identify stealing
ed.original_slot = k;
}
// Update task owner thread id to identify stealing
ed.original_slot = k;
return t;
return result;
}

template<task_stream_accessor_type accessor>
Expand Down
75 changes: 56 additions & 19 deletions src/tbb/arena_slot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,61 @@ namespace r1 {
// Arena Slot
//------------------------------------------------------------------------

d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation) {
//! Retrieves a task from the tail of the owner's local task pool.
//! This overload (selected by the std::false_type tag) is used when task isolation
//! is disabled, so every task in the pool is eligible; it pops positions LIFO from
//! the tail, arbitrating with thieves over the head, and resets the pool when empty.
//! Returns the extracted task, or nullptr if the pool was exhausted.
d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation, std::false_type) {
    suppress_unused_warning(isolation);
    __TBB_ASSERT(isolation == no_isolation, nullptr);
    __TBB_ASSERT(is_task_pool_published(), nullptr);
    bool all_tasks_checked = false;
    // The current task position in the task pool.
    std::size_t T0 = tail.load(std::memory_order_relaxed);
    // The bounds of available tasks in the task pool. H0 is only used when the head bound is reached.
    std::size_t H0 = (std::size_t)-1, T = T0;
    d1::task* result = nullptr;
    do {
        // The full fence is required to sync the store of `tail` with the load of `head` (write-read barrier)
        T = --tail;
        // The acquire load of head is required to guarantee consistency of our task pool
        // when a thief rolls back the head.
        if ( (std::intptr_t)( head.load(std::memory_order_acquire) ) > (std::intptr_t)T ) {
            // A thief may be competing for the same position; take the pool lock to arbitrate.
            acquire_task_pool();
            H0 = head.load(std::memory_order_relaxed);
            if ( (std::intptr_t)H0 > (std::intptr_t)T ) {
                // The thief has not backed off - nothing to grab.
                __TBB_ASSERT( H0 == head.load(std::memory_order_relaxed)
                    && T == tail.load(std::memory_order_relaxed)
                    && H0 == T + 1, "victim/thief arbitration algorithm failure" );
                reset_task_pool_and_leave();
                all_tasks_checked = true;
                break /*do-while*/;
            } else if ( H0 == T ) {
                // We won the last task; the pool is now empty and can be reset.
                reset_task_pool_and_leave();
                all_tasks_checked = true;
            } else {
                // Release task pool if there are still some tasks.
                // After the release, the tail will be less than T, thus a thief
                // will not attempt to get a task at the position T.
                release_task_pool();
            }
        }
        // Get a task from the pool at the position T.
        __TBB_ASSERT(tail.load(std::memory_order_relaxed) <= T || is_local_task_pool_quiescent(),
            "Is it safe to get a task at position T?");
        __TBB_ASSERT( !all_tasks_checked || H0 == T, nullptr );
        if ( task_pool_ptr[T] ) {
            // The slot may hold a proxy; extract the real task or free an empty proxy.
            result = task_proxy::try_extract_task_from( task_pool_ptr[T], ed );
        }
        // The position T is now consumed; poison it so stale reads are detectable in debug.
        poison_pointer( task_pool_ptr[T] );
        if ( result ) break /*do-while*/;
        __TBB_ASSERT( T0 == T+1, nullptr );
        T0 = T;
    } while ( /*!result &&*/ !all_tasks_checked );

    __TBB_ASSERT( (std::intptr_t)tail.load(std::memory_order_relaxed) >= 0, nullptr );
    __TBB_ASSERT( result || is_quiescent_local_task_pool_reset(), nullptr );
    return result;
}

d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation, std::true_type) {
bool all_tasks_checked = false;
bool tasks_skipped = false;

Expand All @@ -43,23 +97,6 @@ d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation)
tasks_skipped = true;
return nullptr;
};
// A helper function to detect and handle proxy tasks.
// Returns the pointer to the real task, or nullptr if there is no task to execute.
auto check_task_proxy = [&](d1::task* task_candidate) -> d1::task* {
__TBB_ASSERT(task_candidate, nullptr);
__TBB_ASSERT(!is_poisoned( task_candidate ), "A poisoned task cannot be processed");
if (!task_accessor::is_proxy_task(*task_candidate)){
return task_candidate;
}
task_proxy& tp = static_cast<task_proxy&>(*task_candidate);
if ( d1::task *t = tp.extract_task<task_proxy::pool_bit>() ) {
ed.affinity_slot = tp.slot;
return t;
}
// Proxy was empty, so it's our responsibility to free it
tp.allocator.delete_object(&tp, ed);
return nullptr;
};

__TBB_ASSERT(is_task_pool_published(), nullptr);
accessed_by_owner.store(true, std::memory_order_relaxed);
Expand Down Expand Up @@ -108,7 +145,7 @@ d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation)
}
if ( result ) {
// Isolation matches; check if there is a real task
result = check_task_proxy( result );
result = task_proxy::try_extract_task_from( result, ed );
// If some tasks were skipped, mark the position as a hole, otherwise poison it.
if ( tasks_skipped ) {
task_pool_ptr[T] = nullptr;
Expand Down
6 changes: 4 additions & 2 deletions src/tbb/arena_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ class arena_slot : private arena_slot_shared_state, private arena_slot_private_s
//! Get a task from the local pool.
/** Called only by the pool owner.
Returns the pointer to the task or nullptr if a suitable task is not found.
Resets the pool if it is empty. **/
d1::task* get_task(execution_data_ext&, isolation_type);
Resets the pool if it is empty.
The last parameter is used to differentiate overloads with vs. without task isolation. **/
d1::task* get_task(execution_data_ext&, isolation_type, std::false_type);
d1::task* get_task(execution_data_ext&, isolation_type, std::true_type);

//! Steal task from slot's ready pool
d1::task* steal_task(arena&, isolation_type, std::size_t);
Expand Down
30 changes: 25 additions & 5 deletions src/tbb/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ struct task_proxy : public d1::task {
return (tat & location_mask) == location_mask;
}

//! Returns a pointer to the encapsulated task or nullptr.
//! Cleans a task pointer from a tag
static task* task_ptr ( intptr_t tat ) {
return (task*)(tat & ~location_mask);
}

//! Returns a pointer to the encapsulated task or nullptr, and frees proxy if necessary.
//! Returns a pointer to the encapsulated task or nullptr.
template<intptr_t from_bit>
inline task* extract_task () {
// __TBB_ASSERT( prefix().extra_state == es_task_proxy, "Normal task misinterpreted as a proxy?" );
__TBB_ASSERT( task_accessor::is_proxy_task(*this), "A supposed task proxy does not have the proxy trait" );
intptr_t tat = task_and_tag.load(std::memory_order_acquire);
__TBB_ASSERT( tat == from_bit || (is_shared(tat) && task_ptr(tat)),
"Proxy's tag cannot specify both locations if the proxy "
Expand All @@ -71,17 +71,37 @@ struct task_proxy : public d1::task {
const intptr_t cleaner_bit = location_mask & ~from_bit;
// Attempt to transition the proxy to the "empty" state with
// cleaner_bit specifying entity responsible for its eventual freeing.
// Explicit cast to void* is to work around a seeming ICC 11.1 bug.
if ( task_and_tag.compare_exchange_strong(tat, cleaner_bit) ) {
// Successfully grabbed the task, and left new owner with the job of freeing the proxy
return task_ptr(tat);
}
}
// Proxied task has already been claimed from another proxy location.
// The proxied task has already been claimed from the other location.
__TBB_ASSERT( task_and_tag.load(std::memory_order_relaxed) == from_bit, "Empty proxy cannot contain non-zero task pointer" );
return nullptr;
}

//! Checks if a given task is a proxy, then either extracts the real task or frees the proxy.
template<bool B = false>
static task* try_extract_task_from ( task* t, execution_data_ext& ed, std::integral_constant<bool, B> stolen = {} ) {
__TBB_ASSERT(t && !is_poisoned(t), "Not a valid task pointer");
if (!task_accessor::is_proxy_task(*t)){
invoke_if( stolen, [&ed](){
ed.affinity_slot = d1::any_slot;
});
return t;
}
task_proxy& tp = static_cast<task_proxy&>(*t);
t = tp.extract_task<pool_bit>();
if (t) {
ed.affinity_slot = tp.slot;
return t;
}
// The task has been consumed, so it's our responsibility to free the proxy.
tp.allocator.delete_object(&tp, ed);
return nullptr;
}

task* execute(d1::execution_data&) override {
__TBB_ASSERT_RELEASE(false, nullptr);
return nullptr;
Expand Down
4 changes: 2 additions & 2 deletions src/tbb/scheduler_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,8 @@ class alignas (max_nfs_size) task_dispatcher {
d1::task* receive_or_steal_task(thread_data& tls, execution_data_ext& ed, Waiter& waiter,
isolation_type isolation, bool outermost, bool criticality_absence);

template <bool ITTPossible, typename Waiter>
d1::task* local_wait_for_all(d1::task * t, Waiter& waiter);
template <bool ITTPossible, bool Isolated, typename Waiter>
d1::task* local_wait_for_all(d1::task* t, Waiter& waiter);

task_dispatcher(const task_dispatcher&) = delete;

Expand Down
21 changes: 14 additions & 7 deletions src/tbb/task_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ d1::task* task_dispatcher::receive_or_steal_task(
return t;
}

template <bool ITTPossible, typename Waiter>
template <bool ITTPossible, bool Isolated, typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
assert_pointer_valid(m_thread_data);
__TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);
Expand Down Expand Up @@ -268,8 +268,11 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
// The context guard to track fp setting and itt tasks.
context_guard_helper</*report_tasks=*/ITTPossible> context_guard;

constexpr std::integral_constant<bool, Isolated> use_isolation;
// Current isolation context
const isolation_type isolation = dl_guard.old_execute_data_ext.isolation;
__TBB_ASSERT((isolation == no_isolation) != Isolated,
"Isolation mismatch; using a wrong instantiation of local_wait_for_all?");

// Critical work inflection point. Once turned false current execution context has taken
// critical task on the previous stack frame and cannot take more until that critical path is
Expand Down Expand Up @@ -352,7 +355,7 @@ d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter ) {
break;
}
// Retrieve the task from local task pool
if (t || (slot.is_task_pool_published() && (t = slot.get_task(ed, isolation)))) {
if (t || (slot.is_task_pool_published() && (t = slot.get_task(ed, isolation, use_isolation)))) {
__TBB_ASSERT(ed.original_slot == m_thread_data->my_arena_index, nullptr);
ed.context = task_accessor::context(*t);
ed.isolation = task_accessor::isolation(*t);
Expand Down Expand Up @@ -464,11 +467,15 @@ inline d1::task* task_dispatcher::get_mailbox_task(mail_inbox& my_inbox, executi

template <typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    // Dispatch to the proper instantiation of the main wait loop. The superseded
    // two-way (ITT-only) dispatch has been removed: it shadowed the table below
    // and no longer matches the <ITTPossible, Isolated, Waiter> template signature.
    // The table index folds two runtime predicates into a 2-bit value:
    //   bit 1 - whether ITT notifications may be emitted,
    //   bit 0 - whether the current execution context is isolated.
    using instantiation_type = d1::task* (task_dispatcher::*)(d1::task*, Waiter&);
    // const: the dispatch table is immutable after (constant) initialization.
    static const instantiation_type instantiations[] = {
        &task_dispatcher::local_wait_for_all</*ITTPossible=*/false, /*Isolated=*/false, Waiter>,
        &task_dispatcher::local_wait_for_all</*ITTPossible=*/false, /*Isolated=*/true,  Waiter>,
        &task_dispatcher::local_wait_for_all</*ITTPossible=*/true,  /*Isolated=*/false, Waiter>,
        &task_dispatcher::local_wait_for_all</*ITTPossible=*/true,  /*Isolated=*/true,  Waiter>
    };
    const int call_idx = (int(governor::is_itt_present()) << 1) + int(m_execute_data_ext.isolation != no_isolation);
    return (this->*instantiations[call_idx])(t, waiter);
}

} // namespace r1
Expand Down