107 changes: 107 additions & 0 deletions sycl/test-e2e/Regression/queue_submitted_kernel_oom.cpp
@@ -0,0 +1,107 @@

// RUN: %{build} -o %t.out
// RUN: %{run} %t.out

#include <array>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <sycl/sycl.hpp>
#include <utility>
#include <vector>

static constexpr std::size_t kUniqueKernels = 256;
static constexpr std::size_t kConsecutiveDupSubmissions =
5000; // same kernel over and over
static constexpr std::size_t kCyclicSubmissions =
8000; // cycle over small subset
static constexpr std::size_t kCyclicSubset = 16; // cycle kernel subset
static constexpr std::size_t kAllKernelsSubmissions =
10000; // run all kernels

template <int ID> struct KernelTag;
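// Each KernelTag<ID> instantiation names a distinct SYCL kernel, so
// submit_increment<ID> below gives us kUniqueKernels unique kernels.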

template <int ID> static void submit_increment(sycl::queue &Q, int *accum) {
Q.submit([&](sycl::handler &CGH) {
CGH.single_task<KernelTag<ID>>([=]() {
// atomic_ref to avoid data races while we spam submissions.
sycl::atomic_ref<int, sycl::memory_order::relaxed,
sycl::memory_scope::device>
ref(accum[ID]);
ref.fetch_add(1);
});
});
}

using SubmitFn = void (*)(sycl::queue &, int *);
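// A table of these lets the test pick a submit function by runtime kernel ID.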

template <std::size_t... Is>
static auto make_fn_table(std::index_sequence<Is...>) {
return std::array<SubmitFn, kUniqueKernels>{
&submit_increment<static_cast<int>(Is)>...};
}

int main() {
sycl::queue Q;

int *accum = sycl::malloc_shared<int>(kUniqueKernels, Q);
assert(accum && "USM alloc failed");
for (std::size_t i = 0; i < kUniqueKernels; ++i)
accum[i] = 0;

std::vector<std::size_t> expected(kUniqueKernels, 0);

auto fns = make_fn_table(std::make_index_sequence<kUniqueKernels>{});

// Submit the same kernel over and over again. The submitted kernel
// vector shouldn't grow at all, since we do a lookback over
// a few previous kernels.
auto runDuplicates = [&]() {
for (size_t i = 0; i < kConsecutiveDupSubmissions; ++i) {
fns[0](Q, accum);
expected[0]++;
}
};

// Run a small subset of kernels in a loop. Likely the most realistic
// scenario. Should be mostly absorbed by the lookback duplicate search and,
// possibly, compaction.
auto runCyclical = [&]() {
for (size_t i = 0; i < kCyclicSubmissions; ++i) {
size_t id = i % kCyclicSubset;
fns[id](Q, accum);
expected[id]++;
}
};

// Run all kernels in a loop. This should dynamically adjust the compaction
// threshold for submitted kernels.
auto runAll = [&]() {
for (size_t i = 0; i < kAllKernelsSubmissions; ++i) {
size_t id = i % kUniqueKernels;
fns[id](Q, accum);
expected[id]++;
}
};

// Go from a small kernel variety to a large one and back to small, to test
// dynamic threshold changes.
runDuplicates();
runCyclical();
runAll();
Q.wait(); // This clears the submitted-kernels list, allowing the threshold
// to drop back down.
runCyclical();
runDuplicates();

Q.wait();

int ret = 0;
for (std::size_t i = 0; i < kUniqueKernels; ++i) {
if (static_cast<std::size_t>(accum[i]) != expected[i]) {
ret = 1;
std::cout << "fail: kernel " << i << ": " << accum[i]
<< " != " << expected[i] << "\n";
}
}

sycl::free(accum, Q);
return ret;
}
@@ -167,10 +167,60 @@ ur_result_t ur_queue_immediate_in_order_t::queueFinish() {

void ur_queue_immediate_in_order_t::recordSubmittedKernel(
ur_kernel_handle_t hKernel) {

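// Cheap duplicate check: only look back at the last few submissions, so the
// common case of resubmitting the same kernel stays O(1) and never grows
// the vector.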
bool isDuplicate = std::any_of(
submittedKernels.end() -
std::min(SUBMITTED_KERNELS_DUPE_CHECK_DEPTH, submittedKernels.size()),
submittedKernels.end(), [hKernel](auto k) { return k == hKernel; });

if (isDuplicate) {
return;
}

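// The vector has outgrown the adaptive threshold; compact it before
// appending a new entry.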
if (submittedKernels.size() > compactionThreshold) {
compactSubmittedKernels();
}

submittedKernels.push_back(hKernel);
hKernel->RefCount.increment();
}

void ur_queue_immediate_in_order_t::compactSubmittedKernels() {
size_t beforeSize = submittedKernels.size();

std::sort(submittedKernels.begin(), submittedKernels.end());

// Remove all but one entry for each unique kernel. Every removed entry
// needs to have its refcount decremented.
auto newEnd = std::unique(
submittedKernels.begin(), submittedKernels.end(), [](auto lhs, auto rhs) {
if (lhs == rhs) {
[[maybe_unused]] const bool lastEntry =
rhs->RefCount.decrementAndTest();
assert(!lastEntry); // there should be at least one entry left.
return true; // duplicate.
}
return false;
});

submittedKernels.erase(newEnd, submittedKernels.end());

// Adjust compaction threshold.
size_t removed = beforeSize - submittedKernels.size();
size_t removedPct = beforeSize > 0 ? (removed * 100) / beforeSize : 0;
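// Example: beforeSize == 400 and removed == 320 gives removedPct == 80, so
// the vector was mostly duplicates and the threshold can be halved.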
if (removedPct > 75) {
// We removed a lot of entries. Lower the threshold if possible.
compactionThreshold = std::max<std::size_t>(
SUBMITTED_KERNELS_DEFAULT_THRESHOLD, compactionThreshold / 2);
} else if (removedPct < 10 &&
compactionThreshold < SUBMITTED_KERNELS_MAX_THRESHOLD) {
// Increase the threshold if we removed very little entries. This means
// there are many unique kernels, and we need to allow the vector to grow
// more.
compactionThreshold *= 2;
}
}

ur_result_t ur_queue_immediate_in_order_t::queueFlush() {
return UR_RESULT_SUCCESS;
}
@@ -27,6 +27,24 @@ namespace v2 {

using queue_group_type = ur_device_handle_t_::queue_group_info_t::type;

// When recording submitted kernels, we only care about unique kernels. It
// doesn't matter whether a kernel has been submitted to the queue just once
// or dozens of times, and the number of unique kernels should be fairly low.
// So, to reduce the number of entries in the submitted-kernels vector, we
// look back at the 4 previous entries (to try to stay within a cache line)
// and skip recording a kernel that is already present.
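// For example, the submission pattern A A A B A B leaves the vector at
// {A, B}: every repeat is caught by the 4-entry lookback.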
static const size_t SUBMITTED_KERNELS_DUPE_CHECK_DEPTH = 4;

// In scenarios where queue synchronization happens rarely, the
// submitted-kernels vector can grow without bound. To avoid that, once the
// vector outgrows the compaction threshold (initially this value), we walk
// the entire vector and eliminate any duplicates.
static const size_t SUBMITTED_KERNELS_DEFAULT_THRESHOLD = 128;

// If we reach this many unique kernels, the application is probably doing
// something incorrectly. The adapter will still function; compaction will
// just run more frequently.
static const size_t SUBMITTED_KERNELS_MAX_THRESHOLD = 65536;

struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_t_ {
private:
ur_context_handle_t hContext;
@@ -35,6 +53,7 @@ struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_t_ {

lockable<ur_command_list_manager> commandListManager;
std::vector<ur_kernel_handle_t> submittedKernels;
std::size_t compactionThreshold = SUBMITTED_KERNELS_DEFAULT_THRESHOLD;

wait_list_view
getWaitListView(locked<ur_command_list_manager> &commandList,
@@ -64,6 +83,8 @@ struct ur_queue_immediate_in_order_t : _ur_object, public ur_queue_t_ {

void recordSubmittedKernel(ur_kernel_handle_t hKernel);

void compactSubmittedKernels();

public:
ur_queue_immediate_in_order_t(ur_context_handle_t, ur_device_handle_t,
const ur_queue_properties_t *);