diff --git a/sycl/test-e2e/Regression/queue_submitted_kernel_oom.cpp b/sycl/test-e2e/Regression/queue_submitted_kernel_oom.cpp
new file mode 100644
index 0000000000000..8016e642f29f8
--- /dev/null
+++ b/sycl/test-e2e/Regression/queue_submitted_kernel_oom.cpp
@@ -0,0 +1,107 @@
+// RUN: %{build} -o %t.out
+// RUN: %{run} %t.out
+
+#include <sycl/sycl.hpp>
+
+#include <array>
+#include <cassert>
+#include <cstddef>
+#include <iostream>
+#include <utility>
+#include <vector>
+
+static constexpr std::size_t kUniqueKernels = 256;
+static constexpr std::size_t kConsecutiveDupSubmissions =
+    5000; // same kernel over and over
+static constexpr std::size_t kCyclicSubmissions =
+    8000; // cycle over a small subset
+static constexpr std::size_t kCyclicSubset = 16; // size of the cycled subset
+static constexpr std::size_t kAllKernelsSubmissions =
+    10000; // run all kernels
+
+template <std::size_t ID> struct KernelTag;
+
+template <std::size_t ID>
+static void submit_increment(sycl::queue &Q, int *accum) {
+  Q.submit([&](sycl::handler &CGH) {
+    CGH.single_task<KernelTag<ID>>([=]() {
+      // atomic_ref to avoid data races while we spam submissions.
+      sycl::atomic_ref<int, sycl::memory_order::relaxed,
+                       sycl::memory_scope::device>
+          ref(accum[ID]);
+      ref.fetch_add(1);
+    });
+  });
+}
+
+using SubmitFn = void (*)(sycl::queue &, int *);
+
+template <std::size_t... Is>
+static auto make_fn_table(std::index_sequence<Is...>) {
+  return std::array<SubmitFn, sizeof...(Is)>{
+      &submit_increment<static_cast<std::size_t>(Is)>...};
+}
+
+int main() {
+  sycl::queue Q;
+
+  int *accum = sycl::malloc_shared<int>(kUniqueKernels, Q);
+  assert(accum && "USM alloc failed");
+  for (std::size_t i = 0; i < kUniqueKernels; ++i)
+    accum[i] = 0;
+
+  std::vector<std::size_t> expected(kUniqueKernels, 0);
+
+  auto fns = make_fn_table(std::make_index_sequence<kUniqueKernels>{});
+
+  // Submit the same kernel over and over again. The submitted kernels
+  // vector shouldn't grow at all, since we do a lookback over
+  // a few previous kernels.
+  auto runDuplicates = [&]() {
+    for (size_t i = 0; i < kConsecutiveDupSubmissions; ++i) {
+      fns[0](Q, accum);
+      expected[0]++;
+    }
+  };
+
+  // Run a small subset of kernels in a loop. Likely the most realistic
+  // scenario. Should be mostly absorbed by the lookback duplicate search
+  // and, possibly, compaction.
+  auto runCyclical = [&]() {
+    for (size_t i = 0; i < kCyclicSubmissions; ++i) {
+      size_t id = i % kCyclicSubset;
+      fns[id](Q, accum);
+      expected[id]++;
+    }
+  };
+
+  // Run all kernels in a loop. Should dynamically adjust the
+  // threshold for submitted kernels.
+  auto runAll = [&]() {
+    for (size_t i = 0; i < kAllKernelsSubmissions; ++i) {
+      size_t id = i % kUniqueKernels;
+      fns[id](Q, accum);
+      expected[id]++;
+    }
+  };
+
+  // Go from a small kernel variety to a large one and back to a small one,
+  // to test dynamic threshold changes.
+  runDuplicates();
+  runCyclical();
+  runAll();
+  Q.wait(); // this clears the submitted kernels list, allowing the
+            // threshold to lower.
+
+  runCyclical();
+  runDuplicates();
+
+  Q.wait();
+
+  int ret = 0;
+  for (std::size_t i = 0; i < kUniqueKernels; ++i) {
+    if (static_cast<std::size_t>(accum[i]) != expected[i]) {
+      ret = 1;
+      std::cout << "fail: " << accum[i] << " != " << expected[i] << "\n";
+    }
+  }
+
+  sycl::free(accum, Q);
+  return ret;
+}
diff --git a/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.cpp b/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.cpp
index e3e88f1aa581c..a4f5647926e00 100644
--- a/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.cpp
+++ b/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.cpp
@@ -167,10 +167,60 @@ ur_result_t ur_queue_immediate_in_order_t::queueFinish() {
 
 void ur_queue_immediate_in_order_t::recordSubmittedKernel(
     ur_kernel_handle_t hKernel) {
+
+  bool isDuplicate = std::any_of(
+      submittedKernels.end() - std::min(SUBMITTED_KERNELS_DUPE_CHECK_DEPTH,
+                                        submittedKernels.size()),
+      submittedKernels.end(), [hKernel](auto k) { return k == hKernel; });
+
+  if (isDuplicate) {
+    return;
+  }
+
+  if (submittedKernels.size() > compactionThreshold) {
+    compactSubmittedKernels();
+  }
+
   submittedKernels.push_back(hKernel);
   hKernel->RefCount.increment();
 }
 
+void ur_queue_immediate_in_order_t::compactSubmittedKernels() {
+  size_t beforeSize = submittedKernels.size();
+
+  std::sort(submittedKernels.begin(), submittedKernels.end());
+
+  // Remove all but one entry for each unique kernel. All removed entries
+  // need to have their refcounts decremented.
+  auto newEnd = std::unique(
+      submittedKernels.begin(), submittedKernels.end(), [](auto lhs, auto rhs) {
+        if (lhs == rhs) {
+          [[maybe_unused]] const bool lastEntry =
+              rhs->RefCount.decrementAndTest();
+          assert(!lastEntry); // there should be at least one entry left.
+          return true;        // duplicate.
+        }
+        return false;
+      });
+
+  submittedKernels.erase(newEnd, submittedKernels.end());
+
+  // Adjust the compaction threshold.
+  size_t removed = beforeSize - submittedKernels.size();
+  size_t removedPct = beforeSize > 0 ? (removed * 100) / beforeSize : 0;
+  if (removedPct > 75) {
+    // We removed a lot of entries. Lower the threshold if possible.
+    compactionThreshold =
+        std::max(SUBMITTED_KERNELS_DEFAULT_THRESHOLD, compactionThreshold / 2);
+  } else if (removedPct < 10 &&
+             compactionThreshold < SUBMITTED_KERNELS_MAX_THRESHOLD) {
+    // Increase the threshold if we removed very few entries. This means
+    // there are many unique kernels, and we need to allow the vector to grow
+    // more.
+    compactionThreshold *= 2;
+  }
+}
+
 ur_result_t ur_queue_immediate_in_order_t::queueFlush() {
   return UR_RESULT_SUCCESS;
 }
diff --git a/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.hpp b/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.hpp
index fb7ed9a9b43e9..8567d83e74476 100644
--- a/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.hpp
+++ b/unified-runtime/source/adapters/level_zero/v2/queue_immediate_in_order.hpp
@@ -27,6 +27,24 @@ namespace v2 {
 
 using queue_group_type = ur_device_handle_t_::queue_group_info_t::type;
 
+// When recording submitted kernels, we only care about unique kernels. It's
+// not important whether a kernel has been submitted to the queue just once or
+// dozens of times, and the number of unique kernels should be fairly low.
+// So, in order to reduce the number of entries in the submitted kernels
+// vector, we look back at the 4 previous entries (to try to stay within a
+// cache line) and don't record the kernel again if it is already among them.
+static const size_t SUBMITTED_KERNELS_DUPE_CHECK_DEPTH = 4;
+
+// In scenarios where queue synchronization happens rarely, the submitted
+// kernels vector can grow unbounded. To avoid that, once the vector exceeds
+// this threshold we go through all of it and eliminate any duplicates.
+static const size_t SUBMITTED_KERNELS_DEFAULT_THRESHOLD = 128;
+
+// If we reach this many unique kernels, the application is probably doing
+// something incorrectly. The adapter will still function; compaction will
+// just happen more frequently.
+static const size_t SUBMITTED_KERNELS_MAX_THRESHOLD = 65536;
+
 struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_t_ {
 private:
   ur_context_handle_t hContext;
@@ -35,6 +53,7 @@ struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_t_ {
   lockable<ur_command_list_manager> commandListManager;
 
   std::vector<ur_kernel_handle_t> submittedKernels;
+  std::size_t compactionThreshold = SUBMITTED_KERNELS_DEFAULT_THRESHOLD;
 
   wait_list_view getWaitListView(locked<ur_command_list_manager> &commandList,
@@ -64,6 +83,8 @@ struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_t_ {
 
   void recordSubmittedKernel(ur_kernel_handle_t hKernel);
 
+  void compactSubmittedKernels();
+
 public:
   ur_queue_immediate_in_order_t(ur_context_handle_t, ur_device_handle_t,
                                 const ur_queue_properties_t *);
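
For reference, a minimal host-only sketch of the recording policy above, assuming plain int "kernel handles" in place of ur_kernel_handle_t and ignoring refcounting; the helper names model_record/model_compact, the globals, and the constants are illustrative only, not adapter API. It models the lookback, the full compaction, and the adaptive threshold on the same three submission patterns the test exercises.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

static const std::size_t kDupeCheckDepth = 4;
static const std::size_t kDefaultThreshold = 128;
static const std::size_t kMaxThreshold = 65536;

static std::vector<int> recorded;
static std::size_t threshold = kDefaultThreshold;

static void model_compact() {
  std::size_t before = recorded.size();
  std::sort(recorded.begin(), recorded.end());
  recorded.erase(std::unique(recorded.begin(), recorded.end()), recorded.end());

  std::size_t removed = before - recorded.size();
  std::size_t removedPct = before > 0 ? (removed * 100) / before : 0;
  if (removedPct > 75) {
    // Mostly duplicates: shrink the threshold back towards the default.
    threshold = std::max(kDefaultThreshold, threshold / 2);
  } else if (removedPct < 10 && threshold < kMaxThreshold) {
    // Mostly unique kernels: let the vector grow further before compacting.
    threshold *= 2;
  }
}

static void model_record(int kernel) {
  // Lookback over the last few entries catches back-to-back duplicates.
  std::size_t depth = std::min(kDupeCheckDepth, recorded.size());
  if (std::any_of(recorded.end() - static_cast<std::ptrdiff_t>(depth),
                  recorded.end(), [kernel](int k) { return k == kernel; }))
    return;
  if (recorded.size() > threshold)
    model_compact();
  recorded.push_back(kernel);
}

int main() {
  for (int i = 0; i < 5000; ++i)
    model_record(0); // duplicates: absorbed by the lookback
  for (int i = 0; i < 8000; ++i)
    model_record(i % 16); // small cycle: absorbed by lookback + compaction
  for (int i = 0; i < 10000; ++i)
    model_record(i % 256); // many unique kernels: threshold grows
  std::cout << "recorded=" << recorded.size() << " threshold=" << threshold
            << "\n";
  return 0;
}

Running the sketch shows the recorded vector staying bounded near the number of unique kernels while the threshold only grows in the all-kernels phase, which is the behavior the regression test checks for.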