diff --git a/doc/main/tbb_userguide/Cancellation_Without_An_Exception.rst b/doc/main/tbb_userguide/Cancellation_Without_An_Exception.rst index 90353ac569..4160275266 100644 --- a/doc/main/tbb_userguide/Cancellation_Without_An_Exception.rst +++ b/doc/main/tbb_userguide/Cancellation_Without_An_Exception.rst @@ -44,3 +44,29 @@ The example below shows how to use ``current_context()->cancel_group_execution() return 0; } +task_group Cancellation Example +=============================== + +The interface to a ``task_group`` provides shortcuts to access its asoociated ``task_group_context``. +The function ``oneapi::tbb::task_group::cancel()`` cancels the ``task_group_context`` associated +with the ``task_group`` instance. And the free function +``oneapi::tbb::is_current_task_group_canceling()`` returns ``true`` if the innermost ``task_group`` +executing on the calling thread is cancelling its tasks. + +Here is an example of how to use task cancellation with ``oneapi::tbb::task_group``. This code +uses ``struct TreeNode`` and the function ``sequential_tree_search`` that are described in +:ref:`creating_tasks_with_parallel_invoke`. + +The function ``parallel_tree_search_cancellable_impl`` cancels the ``task_group`` when a result +is found. + +.. literalinclude:: ./examples/task_examples.cpp + :language: c++ + :start-after: /*begin_parallel_search_cancellation*/ + :end-before: /*end_parallel_search_cancellation*/ + +The call to ``tg.cancel()`` cancels the tasks in the ``task_group`` that have been submitted but +have not started to execute. The task scheduler will not execute these tasks. Those tasks that +have already started will not be interrupted but they can query the cancellation status of the +``task_group`` by calling ``oneapi::tbb::is_current_task_group_canceling()`` and exit early if +they detect cancellation. \ No newline at end of file diff --git a/doc/main/tbb_userguide/Images/concurrent_tasks.png b/doc/main/tbb_userguide/Images/concurrent_tasks.png new file mode 100644 index 0000000000..cccc435310 Binary files /dev/null and b/doc/main/tbb_userguide/Images/concurrent_tasks.png differ diff --git a/doc/main/tbb_userguide/Images/concurrent_tasks_canceled.png b/doc/main/tbb_userguide/Images/concurrent_tasks_canceled.png new file mode 100644 index 0000000000..ddcbb688f3 Binary files /dev/null and b/doc/main/tbb_userguide/Images/concurrent_tasks_canceled.png differ diff --git a/doc/main/tbb_userguide/Parallelizing_with_Tasks.rst b/doc/main/tbb_userguide/Parallelizing_with_Tasks.rst new file mode 100644 index 0000000000..d33c553264 --- /dev/null +++ b/doc/main/tbb_userguide/Parallelizing_with_Tasks.rst @@ -0,0 +1,17 @@ +.. _Parallelizing_with_Tasks: + +Parallelizing with Tasks +======================== + +When parallel loops or the flow graph are not sufficient, the |full_name| +library supports parallelization directly with tasks. Tasks +can be created using the function ``oneapi::tbb::parallel_invoke`` or +the class ``oneapi::tbb::task_group``. + + +.. toctree:: + :maxdepth: 4 + + ../tbb_userguide/creating_tasks_with_parallel_invoke + ../tbb_userguide/creating_tasks_with_task_group + ../tbb_userguide/task_group_thread_safety diff --git a/doc/main/tbb_userguide/creating_tasks_with_parallel_invoke.rst b/doc/main/tbb_userguide/creating_tasks_with_parallel_invoke.rst new file mode 100644 index 0000000000..de13b66ae5 --- /dev/null +++ b/doc/main/tbb_userguide/creating_tasks_with_parallel_invoke.rst @@ -0,0 +1,55 @@ +.. _creating_tasks_with_parallel_invoke: + +Creating Tasks with parallel_invoke +=================================== + +Suppose you want to search a binary tree for the node that contains a specific value. +Here is sequential code to do this: + +Nodes are represented by ``struct TreeNode``: + +.. literalinclude:: ./examples/task_examples.cpp + :language: c++ + :start-after: /*begin_treenode*/ + :end-before: /*end_treenode*/ + +The function ``serial_tree_search`` is a recursive algorithm that checks the current node +and, if the value is not found, calls itself on the left and right subtree. + +.. literalinclude:: ./examples/task_examples.cpp + :language: c++ + :start-after: /*begin_search_serial*/ + :end-before: /*end_search_serial*/ + + +To improve performance, you can use ``oneapi::tbb::parallel_invoke`` to search the tree +in parallel: + +A recursive base case is used after a minimum size threshold is reached to avoid parallel overheads. +Since more than one thread can call the base case concurrently as part of the same tree, ``result`` +is held in an atomic variable. + +.. literalinclude:: ./examples/task_examples.cpp + :language: c++ + :start-after: /*begin_sequential_tree_search*/ + :end-before: /*end_sequential_tree_search*/ + +The function ``oneapi::tbb::parallel_invoke`` runs multiple independent tasks in parallel. +Here is a function ``parallel_invoke_search``, where two lambdas are passed that define tasks +that search the left and right subtrees of the current node in parallel: + +.. literalinclude:: ./examples/task_examples.cpp + :language: c++ + :start-after: /*begin_parallel_invoke_search*/ + :end-before: /*end_parallel_invoke_search*/ + +If the value is found, the pointer to the node that contains the value is stored +in the ``std::atomic result``. This example uses recursion to create many tasks, instead of +just two. The depth of the parallel recursion is limited by the ``depth_threshold`` parameter. After this depth is +reached, the search falls back to a sequential approach. The value of ``result`` is periodically checked +to see if the value has been found by other concurrent tasks, and if so, the search in the current task is +terminated. Because multiple threads may access ``result`` concurrently, an atomic variable is used, even +from the sequential base case. + +Because ``oneapi::tbb::parallel_invoke`` is a fork-join algorithm, each level of the recursion does not +complete until both the left and right subtrees have completed. \ No newline at end of file diff --git a/doc/main/tbb_userguide/creating_tasks_with_task_group.rst b/doc/main/tbb_userguide/creating_tasks_with_task_group.rst new file mode 100644 index 0000000000..6c888b8f4e --- /dev/null +++ b/doc/main/tbb_userguide/creating_tasks_with_task_group.rst @@ -0,0 +1,34 @@ +.. _creating_tasks_with_task_group: + +Creating Tasks with task_group +============================== + +The |full_name| library supports parallelization directly with tasks. The class +``oneapi::tbb::task_group`` is used to run and wait for tasks in a less +structured way than ``oneapi::tbb::parallel_invoke``. It is useful when you want to +create a set of tasks that can be run in parallel, but you do not know how many there +will be in advance. + +Here is code that uses ``oneapi::tbb::task_group`` to implement a parallel search in a +binary tree. This code uses ``struct TreeNode`` and the function ``sequential_tree_search`` +that are described in :ref:`creating_tasks_with_parallel_invoke`. + +In ``parallel_tree_search_impl``, ``task_group::run`` is used to create new tasks for searching +in the subtrees. The recursion does not wait on the ``task_group`` at each level. + +.. literalinclude:: ./examples/task_examples.cpp + :language: c++ + :start-after: /*begin_parallel_search*/ + :end-before: /*end_parallel_search*/ + +This example uses recursion to create many tasks. The depth of the parallel recursion is +limited by the ``depth_threshold`` parameter. After this depth is reached, no new tasks +are created. The value of ``result`` is periodically checked to see if the value has been +found by other concurrent tasks, and if so, the search in the current task is terminated. + +In this example, tasks that execute within the ``task_group tg`` create additional tasks +by calling ``run`` on the same ``task_group`` object. These calls are thread-safe and the +call to ``tg.wait()`` will block until all of these tasks are complete. Although +tasks might be added from different worker threads, these additions are logically nested +within the top-most calls to ``tg.run``. There is therefore no race in adding these tasks from +the worker threads and waiting for them in the main thread. \ No newline at end of file diff --git a/doc/main/tbb_userguide/examples/task_examples.cpp b/doc/main/tbb_userguide/examples/task_examples.cpp new file mode 100644 index 0000000000..104bd06e59 --- /dev/null +++ b/doc/main/tbb_userguide/examples/task_examples.cpp @@ -0,0 +1,315 @@ +/* + Copyright (c) 2025 UXL Foundation Contributors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "oneapi/tbb.h" + +const int initial_depth_threshold = 10; + +/*begin_treenode*/ +struct TreeNode { + int value; + TreeNode* left = nullptr; + TreeNode* right = nullptr; + + ~TreeNode() { + delete left; + delete right; + } +}; +/*end_treenode*/ + +/*begin_search_serial*/ +void serial_tree_search(TreeNode* node, int target, TreeNode*& result) { + if (node && !result) { + if (node->value == target) { + result = node; + } else { + serial_tree_search(node->left, target, result); + serial_tree_search(node->right, target, result); + } + } +} +/*end_search_serial*/ + +/*begin_sequential_tree_search*/ +void sequential_tree_search(TreeNode* node, int target, std::atomic& result) { + if (node && !result.load()) { + if (node->value == target) { + result.store(node); // overwrite is ok since any result is valid + } else { + sequential_tree_search(node->left, target, result); + sequential_tree_search(node->right, target, result); + } + } +} +/*end_sequential_tree_search*/ + +/*begin_parallel_invoke_search*/ +void parallel_invoke_search(TreeNode* node, int target, std::atomic& result, + size_t depth_threshold = initial_depth_threshold) { + if (node && !result.load()) { + if (node->value == target) { + result.store(node); // overwrite is ok since any result is valid + } else if (depth_threshold == 0) { + sequential_tree_search(node, target, result); + } else { + tbb::parallel_invoke( + [&] { parallel_invoke_search(node->left, target, result, depth_threshold - 1); }, + [&] { parallel_invoke_search(node->right, target, result, depth_threshold - 1); } + ); + } + } +} +/*end_parallel_invoke_search*/ + +/*begin_parallel_search*/ +void parallel_tree_search_impl(tbb::task_group& tg, TreeNode* node, int target, + std::atomic& result, + size_t depth_threshold = initial_depth_threshold) { + if (node && !result.load()) { + if (node->value == target) { + result.store(node); // overwrite is ok since any result is valid + } else if (depth_threshold == 0) { + sequential_tree_search(node, target, result); + } else { + // Run on left and right subtrees in parallel + tg.run([node, target, &result, &tg, depth_threshold] { + parallel_tree_search_impl(tg, node->left, target, result, depth_threshold - 1); + }); + tg.run([node, target, &result, &tg, depth_threshold] { + parallel_tree_search_impl(tg, node->right, target, result, depth_threshold - 1); + }); + } + } +} + +TreeNode* parallel_tree_search(TreeNode* root, int target) { + if (!root) return nullptr; + + std::atomic result{nullptr}; + tbb::task_group tg; + + // Start the divide and conquer search with a single task group + parallel_tree_search_impl(tg, root, target, result); + + // Wait for all tasks to complete at the outermost level + tg.wait(); + + return result.load(); +} +/*end_parallel_search*/ + +/*begin_parallel_search_cancellation*/ +void parallel_tree_search_cancellable_impl(tbb::task_group& tg, TreeNode* node, int target, + std::atomic& result, + size_t depth_threshold = initial_depth_threshold) { + // tbb::is_current_task_group_canceling() checks asoociated task_group_context + if (node && !tbb::is_current_task_group_canceling()) { + if (node->value == target) { + result.store(node); // overwrite is ok since any result is valid + // cancel the task_group_context associated with task_group + tg.cancel(); // multiple cancellations are ok due to single wait + } else if (depth_threshold == 0) { + sequential_tree_search(node, target, result); + if (result.load() != nullptr) { + // cancel the task_group_context associated with task_group + tg.cancel(); // multiple cancellations are ok due to single wait + } + } else { + // Run on left and right subtrees in parallel + tg.run([node, target, &result, &tg, depth_threshold] { + parallel_tree_search_cancellable_impl(tg, node->left, target, result, + depth_threshold - 1); + }); + tg.run([node, target, &result, &tg, depth_threshold] { + parallel_tree_search_cancellable_impl(tg, node->right, target, result, + depth_threshold - 1); + }); + } + } +} + +TreeNode* parallel_tree_search_cancellable(TreeNode* root, int target) { + if (!root) return nullptr; + + std::atomic result{nullptr}; + tbb::task_group tg; + + parallel_tree_search_cancellable_impl(tg, root, target, result); + + tg.wait(); + + return result.load(); +} +/*end_parallel_search_cancellation*/ + +// Helper function to generate a random binary tree +TreeNode* generate_random_tree(size_t num_nodes, std::mt19937& gen, + std::uniform_int_distribution& dist, int& target) { + if (num_nodes == 0) return nullptr; + + // Generate unique values for all nodes + std::size_t universe_size = dist.b(); + std::vector universe(universe_size); + std::iota(universe.begin(), universe.end(), 1); + std::vector unique_values(num_nodes); + for (size_t i = 0; i < num_nodes; ++i, --universe_size) { + const std::size_t index = (dist(gen) - 1) % universe_size; + unique_values[i] = universe[index]; + std::swap(universe[index], universe[universe_size - 1]); + } + + + // Build tree using unique values + auto root = new TreeNode{unique_values[0]}; + std::queue queue; + queue.push(root); + + size_t value_index = 1; + + while (!queue.empty() && value_index < num_nodes) { + TreeNode* current = queue.front(); + queue.pop(); + + // Add left child + if (value_index < num_nodes) { + current->left = new TreeNode{unique_values[value_index]}; + queue.push(current->left); + value_index++; + if (value_index == num_nodes) + target = current->left->value; + } + + // Add right child + if (value_index < num_nodes) { + current->right = new TreeNode{unique_values[value_index]}; + queue.push(current->right); + value_index++; + if (value_index == num_nodes) + target = current->right->value; + } + } + + return root; +} + +// Example usage and test function +int main() { + // Generate binary tree with 1 million nodes + const size_t num_nodes = 10000000; + + // Random number generation setup + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution dist(1, 100000000); + + std::cout << "Generating binary tree with " << num_nodes << " nodes...\n"; + int target = -1; + TreeNode* root = generate_random_tree(num_nodes, gen, dist, target); + + // Find a value that exists in the tree (use the root value as target) + + std::cout << "Target value: " << target << " (guaranteed to exist)\n"; + std::cout << "Testing tree search algorithms on " << num_nodes << " nodes:\n"; + + // Serial version with timing + std::cout << "\nSerial tree search:\n"; + auto start = std::chrono::high_resolution_clock::now(); + TreeNode* serial_result = nullptr; + serial_tree_search(root, target, serial_result); + auto end = std::chrono::high_resolution_clock::now(); + auto serial_duration = std::chrono::duration_cast(end - start); + + if (serial_result) { + std::cout << "Found " << target << " (node address: " << serial_result << ")\n"; + } else { + std::cout << "Target " << target << " not found" << std::endl; + } + std::cout << "Serial search time: " << serial_duration.count() << " us\n"; + + // parallel_invoke version with timing + std::cout << "\nparallel_invoke search:\n"; + start = std::chrono::high_resolution_clock::now(); + TreeNode* parallel_invoke_result = parallel_tree_search(root, target); + end = std::chrono::high_resolution_clock::now(); + auto parallel_invoke_duration = std::chrono::duration_cast(end - start); + if (parallel_invoke_result) { + std::cout << "Found " << target << " (node address: " << parallel_invoke_result << ")\n"; + } else { + std::cout << "Target " << target << " not found" << std::endl; + } + std::cout << "Parallel search time: " << parallel_invoke_duration.count() << " us\n"; + + // Parallel version (basic) with timing + std::cout << "\nParallel tree search (basic):\n"; + start = std::chrono::high_resolution_clock::now(); + TreeNode* parallel_result = parallel_tree_search(root, target); + end = std::chrono::high_resolution_clock::now(); + auto parallel_duration = std::chrono::duration_cast(end - start); + + if (parallel_result) { + std::cout << "Found " << target << " (node address: " << parallel_result << ")\n"; + } else { + std::cout << "Target " << target << " not found" << std::endl; + } + std::cout << "Parallel search time: " << parallel_duration.count() << " us\n"; + + // Parallel version with cancellation and timing + std::cout << "\nParallel tree search (with cancellation):\n"; + start = std::chrono::high_resolution_clock::now(); + TreeNode* cancellation_result = parallel_tree_search_cancellable(root, target); + end = std::chrono::high_resolution_clock::now(); + auto cancellation_duration = std::chrono::duration_cast(end - start); + + if (cancellation_result) { + std::cout << "Found " << target << " (node address: " << cancellation_result << ")\n"; + } else { + std::cout << "Target " << target << " not found" << std::endl; + } + std::cout << "Parallel search with cancellation time: " << cancellation_duration.count() << " us\n"; + + // Performance comparison + std::cout << "\nPerformance Summary:\n"; + std::cout << "Serial search: " << serial_duration.count() << " us\n"; + std::cout << "Parallel search (invoke): " << parallel_invoke_duration.count() << " us\n"; + std::cout << "Parallel search (basic): " << parallel_duration.count() << " us\n"; + std::cout << "Parallel search (cancellation): " << cancellation_duration.count() << " us\n"; + + if (serial_duration.count() > 0) { + double speedup_invoke = static_cast(serial_duration.count()) / parallel_invoke_duration.count(); + double speedup_basic = static_cast(serial_duration.count()) / parallel_duration.count(); + double speedup_cancel = static_cast(serial_duration.count()) / cancellation_duration.count(); + std::cout << "Speedup (invoke): " << speedup_invoke << "x\n"; + std::cout << "Speedup (basic): " << speedup_basic << "x\n"; + std::cout << "Speedup (cancellation): " << speedup_cancel << "x\n"; + } + + // Clean up the tree + delete root; + + return 0; +} diff --git a/doc/main/tbb_userguide/task_group_thread_safety.rst b/doc/main/tbb_userguide/task_group_thread_safety.rst new file mode 100644 index 0000000000..1fff4bf741 --- /dev/null +++ b/doc/main/tbb_userguide/task_group_thread_safety.rst @@ -0,0 +1,92 @@ +.. _task_group_thread_safety: + +task_group Thread Safety +======================== + +The use of a shared ``task_group`` object across different threads is safe and easy to reason about +in many common cases, such as recursive algorithms. But sometimes it is difficult +to reason about the concurrent use of a single shared ``task_group`` across threads. + +In :ref:`creating_tasks_with_task_group`, tasks that are executing within a ``task_group`` add more tasks +by calling ``run`` on the same ``task_group`` object. The call to ``task_group::wait`` is then made +from the single thread that started the recursive parallel algorithm. The calls to ``run`` are logically nested +within a recursive algorithm and that single call to ``wait`` is guaranteed to wait +for all of the children tasks, even those added from other worker threads. + +In less structured cases with calls to both ``task_group::run`` and ``task_group::wait`` on the same object +from different threads, the behavior is more sophisticated. Here is a diagram that shows a single ``task_group`` +object that is accessed in parallel by three different threads. Each thread runs some tasks and then calls wait +on the shared ``task_group``: + + +.. container:: fignone + :name: concurrent_tasks + + + .. container:: imagecenter + + + |image0| + + +If none of the tasks run in the ``task_group`` throw an exception or cancel the execution of the +``task_group``, there are two execution guarantees. + +First, all tasks created by calls to ``run`` that *happen before* a call to ``wait`` on the same thread +are guaranteed to be complete when the call to wait returns. So for example, the thread that runs the +`A` tasks is guaranteed to wait for all the `A` tasks in its call to ``wait``. + +Second, any ``run`` that *inter-thread happens before* a call to ``wait`` on another thread will be complete +when that call to ``wait`` returns. + +Both of these guarantees mean that if you use C++ mechanisms to order the calls to ``run`` +and ``wait`` on the same ``task_group``, this ordering will be respected. But if you do not +enforce an ordering, task submissions and waits on different threads are not synchronized. +That is, it is unknown if a task in the ``task_group`` will be complete when +a ``wait`` on that ``task_group`` by another thread returns. + +Use of cancellation or exceptions complicates the semantics of concurrent calls to ``wait`` +on the same ``task_group`` object. An example of task cancellation in a ``task_group`` can +be found in :ref:`Cancellation_Without_An_Exception`. + +The ``task_group::wait`` function resets the ``task_group_context`` associated +with the ``task_group``. Cancellations or exceptions combined with concurrent calls to ``wait`` on a shared +``task_group`` can result in behavior that may seem unexpected or difficult to reason about. + +The following diagram shows three threads that call ``run``, ``wait`` and ``cancel`` on a +shared ``task_group``. The execution guarantees described above no longer hold for this example. + +.. container:: fignone + :name: concurrent_tasks_canceled + + + .. container:: imagecenter + + + |image1| + +.. |image0| image:: Images/concurrent_tasks.png + :width: 600px +.. |image1| image:: Images/concurrent_tasks_canceled.png + :width: 600px + +In this diagram, there is no guarantee that all of the `A` tasks complete, since an intervening call +to ``cancel`` on another thread may cancel their execution. + +For the thread that runs the `B` tasks, we may expect the call to ``wait`` to return a status of +``canceled``. However, an intervening call to ``wait`` on another thread, which results in a reset of the +``task_group_context``, may cause the call to ``wait`` on the thread that executes the `B` tasks to return +a status other than ``canceled``. + +A thread that has canceled a ``task_group`` and then ``runs`` more tasks may see those tasks execute because +a ``wait`` on another thread completed and reset the ``task_group`` before those tasks were ``run``, effectively +uncanceling the ``task_group``. + +Exceptions that are thrown from a ``task_group`` task cause the cancellation of the ``task_group``, so +applications that throw exceptions and have concurrent waits can have similarly complicated behavior. In +addition, exceptions that originate in tasks ``run`` in a ``task_group`` by one thread may propagate to the call +to ``wait`` on another thread. + +Due to the lack of meaningful guarantees for cancellation and exception handling in these situations, +we recommend that concurrent calls to ``wait`` be used only in cases where there is no possibility of +concurrent cancellations or exceptions. diff --git a/doc/main/tbb_userguide/title.rst b/doc/main/tbb_userguide/title.rst index 8adb7093fe..435d0335d2 100644 --- a/doc/main/tbb_userguide/title.rst +++ b/doc/main/tbb_userguide/title.rst @@ -11,6 +11,7 @@ ../tbb_userguide/Package_Contents_os ../tbb_userguide/Parallelizing_Simple_Loops_os ../tbb_userguide/Parallelizing_Complex_Loops + ../tbb_userguide/Parallelizing_with_Tasks ../tbb_userguide/Flow_Graph ../tbb_userguide/work_isolation ../tbb_userguide/Exceptions_and_Cancellation