Merged

Commits (23)
190c82a
WIP async bank processing during import
smirnov-alexey Oct 15, 2025
4c3da31
Always evaluate bank async
smirnov-alexey Oct 16, 2025
7eb64b4
Move resets after bank's finalized
smirnov-alexey Oct 17, 2025
83c43dd
Merge branch 'master' into as/npuw_async_bank_finalization
smirnov-alexey Oct 19, 2025
5e12d4d
Merge branch 'master' into as/npuw_async_bank_finalization
smirnov-alexey Oct 20, 2025
69917c7
Add one more wait() to prevent data race
smirnov-alexey Oct 20, 2025
d414e58
Guard closure for async weights evaluation
smirnov-alexey Oct 21, 2025
c4ebbab
Address review comments
smirnov-alexey Oct 22, 2025
e38bd1e
Clean up
smirnov-alexey Oct 22, 2025
a702e30
Merge branch 'master' into as/npuw_async_bank_closure_guard
smirnov-alexey Oct 22, 2025
5c24d9c
Refactoring
smirnov-alexey Oct 22, 2025
d11e98d
Merge branch 'master' into as/npuw_async_bank_closure_guard
smirnov-alexey Oct 22, 2025
49ff036
Merge branch 'master' into as/npuw_async_bank_closure_guard
smirnov-alexey Oct 24, 2025
7910996
Guard closure-related members as well
smirnov-alexey Oct 24, 2025
764b033
Remove evaluation metric
smirnov-alexey Oct 24, 2025
c7c1318
Remove incorrect test and fix UB
smirnov-alexey Oct 26, 2025
e4e3020
Enable test back
smirnov-alexey Oct 26, 2025
91c714a
Wait in destructor
smirnov-alexey Oct 26, 2025
6e676a2
Keep weights bank alive until future is finished
smirnov-alexey Oct 27, 2025
70153fc
Add futures for all functions
smirnov-alexey Oct 28, 2025
481b660
Address review comments
smirnov-alexey Oct 28, 2025
2fd7a90
Address review comments
smirnov-alexey Oct 28, 2025
9da86b7
Fix missing reference
smirnov-alexey Oct 28, 2025
@@ -411,8 +411,10 @@ void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request)
std::vector<std::size_t> closure_unpack_required;
std::vector<std::size_t> closure_copy_required;

for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.size(); cidx++) {
auto& closure = comp_model_desc.closure[cidx];
auto& desc_closure = comp_model_desc.closure.get().closure;

for (std::size_t cidx = 0u; cidx < desc_closure.size(); cidx++) {
auto& closure = desc_closure[cidx];
const auto closure_param_id = comp_model_desc.param_base + cidx;

if (m_npuw_model->is_gather_closure(idx, cidx)) {
@@ -440,7 +442,7 @@ void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request)
// m_ms_unpack += ov::npuw::perf::ms_to_run([&](){
ov::parallel_for(closure_copy_required.size(), [&](std::size_t j) {
auto cidx = closure_copy_required[j];
auto& closure = comp_model_desc.closure[cidx];
auto& closure = desc_closure[cidx];
const auto closure_param_id = comp_model_desc.param_base + cidx;
auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
auto clparam = request->get_tensor(iport);
@@ -455,7 +457,7 @@ void ov::npuw::IBaseInferRequest::unpack_closure(std::size_t idx, RqPtr request)
auto cidx = closure_unpack_required[j];

// FIXME: zerops are stored with absolute indexing, this needs to be aligned
auto& closure = comp_model_desc.closure[cidx];
auto& closure = desc_closure[cidx];

const auto closure_param_id = comp_model_desc.param_base + cidx;
auto& iport = func_desc.compiled_model->inputs()[closure_param_id];
@@ -565,7 +567,8 @@ void ov::npuw::IBaseInferRequest::bind_global_params(std::size_t idx, RqPtr requ
const auto& gport = comp_model_desc.compiled_model->inputs()[comp_model_desc.host_gather.dst_idx];
const auto gather = request->get_tensor(gport);

const auto& vocab = comp_model_desc.closure[comp_model_desc.host_gather.src_idx - comp_model_desc.param_base];
const auto& vocab =
comp_model_desc.closure.get().closure[comp_model_desc.host_gather.src_idx - comp_model_desc.param_base];
const auto& lport = comp_model_desc.compiled_model->inputs()[comp_model_desc.host_gather.idx_idx];
const auto lookup = request->get_tensor(lport);

@@ -926,7 +929,7 @@ bool ov::npuw::IBaseInferRequest::needs_copy(std::size_t idx, std::size_t cidx)
return false;
}
auto& comp_model_desc = m_npuw_model->m_compiled_submodels[idx];
if (comp_model_desc.is_remote[cidx]) {
if (comp_model_desc.closure.get().is_remote[cidx]) {
// FIXME: Test if the tensor device and the request device are
// the same or compatible!
return false;
164 changes: 98 additions & 66 deletions src/plugins/intel_npu/src/plugin/npuw/compiled_model.cpp
@@ -330,6 +330,7 @@ ov::npuw::CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
// - dump the subgraphs, if necessary
std::map<std::string, std::size_t> compiledFunctions;
m_compiled_submodels.resize(orderedSubgraphs.size());

const std::size_t end_sub_idx = orderedSubgraphs.size();

const std::string dump_sub_opt = m_cfg.get<::intel_npu::NPUW_DUMP_SUBS>();
@@ -382,16 +383,18 @@ ov::npuw::CompiledModel::CompiledModel(const std::shared_ptr<ov::Model>& model,
m_compiled_submodels[id].replaced_by = compiled_fcn_iter->second;
LOG_INFO("Subgraph[" << id << "] is a function call to [" << compiled_fcn_iter->second << "]");
}
auto& closure_desc = m_compiled_submodels[id].closure.get();

m_compiled_submodels[id].host_gather = subgraph._host_gather;
m_compiled_submodels[id].quant_unpack_gather = subgraph._quant_unpack_gather;
m_compiled_submodels[id].param_base = fcn_template._param_offset;
m_compiled_submodels[id].closure = subgraph._closure;
closure_desc.closure = subgraph._closure;
m_compiled_submodels[id].lazy_closure = subgraph._lazy_closure;
m_compiled_submodels[id].closure_uid.resize(m_compiled_submodels[id].closure.size(), -1);
closure_desc.closure_uid.resize(subgraph._closure.size(), -1);
m_compiled_submodels[id].scales = subgraph._scales;
m_compiled_submodels[id].zerops = subgraph._zerops;
m_compiled_submodels[id].forced_to_fcall = subgraph._forced_to_fcall;
m_compiled_submodels[id].is_remote.resize(m_compiled_submodels[id].closure.size(), false);
closure_desc.is_remote.resize(subgraph._closure.size(), false);
} // if(!funcall)

if (!m_compiled_submodels[id].model && !m_compiled_submodels[id].replaced_by) {
@@ -649,22 +652,24 @@ void ov::npuw::CompiledModel::CompiledModelDesc::serialize(std::ostream& stream,
write(stream, spatial);
write(stream, attention);

write(stream, is_remote);
write(stream, closure_uid);
auto& closure_desc = closure.get();

write(stream, closure_desc.is_remote);
write(stream, closure_desc.closure_uid);

if (ctx.is_weightless) {
write_weightless(stream, scales, ctx);
write_weightless(stream, zerops, ctx);

write(stream, closure.size());
write(stream, closure_desc.closure.size());
std::vector<ov::Tensor> cpu_closures;
std::vector<std::size_t> cpu_closure_ids;
std::vector<ov::npuw::weights::LazyTensor> non_cpu_tensors;
std::vector<std::size_t> non_cpu_tensors_ids;
for (std::size_t cidx = 0; cidx < closure.size(); ++cidx) {
if (closure_uid[cidx] == -1) { // CPU closure
for (std::size_t cidx = 0; cidx < closure_desc.closure.size(); ++cidx) {
if (closure_desc.closure_uid[cidx] == -1) { // CPU closure
cpu_closure_ids.push_back(cidx);
cpu_closures.push_back(closure[cidx]);
cpu_closures.push_back(closure_desc.closure[cidx]);
} else {
non_cpu_tensors_ids.push_back(cidx);
non_cpu_tensors.push_back(lazy_closure[cidx]); // must be there
@@ -679,13 +684,13 @@ void ov::npuw::CompiledModel::CompiledModelDesc::serialize(std::ostream& stream,
write(stream, scales);
write(stream, zerops);

write(stream, closure.size());
write(stream, closure_desc.closure.size());
std::vector<ov::Tensor> cpu_closures;
std::vector<std::size_t> cpu_closure_ids;
for (std::size_t cidx = 0; cidx < closure.size(); ++cidx) {
if (closure_uid[cidx] == -1) { // CPU closure, not in the bank
for (std::size_t cidx = 0; cidx < closure_desc.closure.size(); ++cidx) {
if (closure_desc.closure_uid[cidx] == -1) { // CPU closure, not in the bank
cpu_closure_ids.push_back(cidx);
cpu_closures.push_back(closure[cidx]);
cpu_closures.push_back(closure_desc.closure[cidx]);
}
}

@@ -724,16 +729,18 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
read(stream, spatial);
read(stream, attention);

read(stream, is_remote);
read(stream, closure_uid);
auto& closure_desc = closure.get();

read(stream, closure_desc.is_remote);
read(stream, closure_desc.closure_uid);

if (ctx.weights || !ctx.consts_cache.empty()) {
read_weightless(stream, scales, ctx);
read_weightless(stream, zerops, ctx);

std::size_t closure_size = 0;
read(stream, closure_size);
closure.resize(closure_size);
closure_desc.closure.resize(closure_size);
lazy_closure.resize(closure_size);

std::vector<std::size_t> cpu_closure_ids;
@@ -743,7 +750,7 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
read_weightless(stream, cpu_closures, ctx);
std::size_t tidx = 0;
for (const auto& idx : cpu_closure_ids) {
closure[idx] = std::move(cpu_closures[tidx++]);
closure_desc.closure[idx] = std::move(cpu_closures[tidx++]);
}

std::vector<std::size_t> non_cpu_tensors_ids;
@@ -757,8 +764,9 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
}

// Also read weights into LazyTensors
for (std::size_t cidx = 0; cidx < closure.size(); ++cidx) {
if (closure_uid[cidx] != -1 && lazy_closure[cidx]) { // previously registered before serialization
for (std::size_t cidx = 0; cidx < closure_desc.closure.size(); ++cidx) {
if (closure_desc.closure_uid[cidx] != -1 &&
lazy_closure[cidx]) { // previously registered before serialization
lazy_closure[cidx].read_weight(ctx);
}
}
@@ -770,15 +778,21 @@ void ov::npuw::CompiledModel::CompiledModelDesc::deserialize(std::istream& strea
read(stream, closure_size);
std::vector<std::size_t> cpu_closure_ids;
read(stream, cpu_closure_ids);
closure.resize(closure_size);
closure_desc.closure.resize(closure_size);
for (const auto& cidx : cpu_closure_ids) {
read(stream, closure[cidx]);
read(stream, closure_desc.closure[cidx]);
}
}

LOG_DEBUG("DONE.");
}

ov::npuw::CompiledModel::~CompiledModel() {
if (m_eval_future.valid()) {
m_eval_future.wait();
}
Comment on lines +791 to +793 (Contributor):

I bet Coverity will have something to say about that, as wait() may throw, so it may be safer to wrap it in a try {} catch (...) block. But let's see.
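A minimal sketch of the suggested guard (illustrative only; the destructor as merged keeps the bare wait(), and m_eval_future is the member this PR introduces):

```cpp
ov::npuw::CompiledModel::~CompiledModel() {
    try {
        if (m_eval_future.valid()) {
            m_eval_future.wait();  // block until the async bank evaluation finishes
        }
    } catch (...) {
        // Never let an exception escape a destructor: that calls std::terminate().
    }
}
```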

}

void ov::npuw::CompiledModel::export_model(std::ostream& stream) const {
using namespace ov::npuw::s11n;

@@ -912,7 +926,6 @@ std::shared_ptr<ov::npuw::CompiledModel> ov::npuw::CompiledModel::import_model(
if (is_weightless) {
compiled->m_weights_bank = ov::npuw::weights::bank(bank_name, compiled->get_plugin()->get_core(), "");
compiled->finalize_weights_bank();
compiled->m_import_weights_ctx.reset();
} else {
compiled->m_weights_bank =
ov::npuw::weights::Bank::deserialize(model_stream, compiled->get_plugin()->get_core(), bank_name);
@@ -1013,10 +1026,13 @@ void ov::npuw::CompiledModel::serialize(std::ostream& stream, const ov::npuw::s1

// Serialize compiled submodels
write(model_stream, m_compiled_submodels.size());
for (const auto& subm : m_compiled_submodels) {
for (std::size_t i = 0; i < m_compiled_submodels.size(); ++i) {
auto& subm = m_compiled_submodels[i];
auto real_idx = subm.replaced_by.value_or(i);
// Write device idx
std::size_t device_idx = subm.device_it - m_dev_list.begin();
Comment on lines 1017 to -1018 (Contributor):

How did this work before?

(Contributor Author):

No idea, but the behavior is undefined here.
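To spell out the hazard: when a submodel only delegates to a shared function body, its device_it is never assigned, so the old subtraction read an indeterminate iterator, which is undefined behavior. An annotated copy of the fix from this hunk:

```cpp
// Resolve the real submodel first: only it has a valid device_it.
auto real_idx = subm.replaced_by.value_or(i);
std::size_t device_idx = m_compiled_submodels[real_idx].device_it - m_dev_list.begin();
// Delegating submodels serialize 0 instead of reading an unset iterator.
write(model_stream, real_idx == i ? device_idx : 0);
```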

write(model_stream, device_idx);
// FIXME: if there is no compiled submodel, device_it is not set.
std::size_t device_idx = m_compiled_submodels[real_idx].device_it - m_dev_list.begin();
write(model_stream, real_idx == i ? device_idx : 0);
// Write ICompiledModel if it's there
if (subm.compiled_model) {
write(model_stream, true);
@@ -1224,49 +1240,79 @@ void ov::npuw::CompiledModel::reconstruct_closure() {

const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
auto& func_desc = m_compiled_submodels[real_idx];
auto& desc_closure = comp_model_desc.closure.get();

for (std::size_t cidx = 0; cidx < comp_model_desc.closure.size(); ++cidx) {
if (comp_model_desc.closure[cidx]) {
for (std::size_t cidx = 0; cidx < desc_closure.closure.size(); ++cidx) {
if (desc_closure.closure[cidx]) {
// host-side closure - already set, do nothing
NPUW_ASSERT(!comp_model_desc.is_remote[cidx]);
NPUW_ASSERT(!desc_closure.is_remote[cidx]);
continue;
}
NPUW_ASSERT(comp_model_desc.closure_uid[cidx] != -1);
comp_model_desc.closure[cidx] =
m_weights_bank->get(comp_model_desc.closure_uid[cidx], *func_desc.device_it);
NPUW_ASSERT(desc_closure.closure_uid[cidx] != -1);
desc_closure.closure[cidx] = m_weights_bank->get(desc_closure.closure_uid[cidx], *func_desc.device_it);
}
}
}

void ov::npuw::CompiledModel::finalize_weights_bank() {
LOG_INFO("Finalizing weights bank...");
// Register lazy tensors
for (std::size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
auto& comp_model_desc = m_compiled_submodels[idx];
std::shared_future<void> weights_bank_evaluation = std::async(std::launch::async, [&]() {
Comment (Contributor):

Since it is the CompiledModel that initiates the async request, IMO it is right for this same entity to track the status too. Otherwise you trigger it here but wait in each compiled desc for some reason. If you just add this as a class member, you can wait on this object alone.

(Contributor Author):

Done
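A condensed sketch of the pattern adopted after this exchange (the member m_eval_future, the shared_future type, and the std::async launch are all taken from this diff; the reduced class around them is illustrative):

```cpp
#include <future>

struct CompiledModelSketch {
    std::shared_future<void> m_eval_future;  // tracks the async weights-bank evaluation

    void finalize_weights_bank() {
        // Launch the evaluation once and keep the shared handle as a member...
        m_eval_future = std::async(std::launch::async, [this] {
            // evaluate_and_allocate(), assign closures to submodels, etc.
        }).share();
    }

    ~CompiledModelSketch() {
        // ...so teardown can wait on this single object alone.
        if (m_eval_future.valid()) {
            m_eval_future.wait();
        }
    }
};
```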

// Register lazy tensors
for (std::size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
auto& comp_model_desc = m_compiled_submodels[idx];

// Skip optimized out and non-functions
if (!comp_model_desc.compiled_model && !comp_model_desc.replaced_by) {
continue;
}
// Skip optimized out and non-functions
if (!comp_model_desc.compiled_model && !comp_model_desc.replaced_by) {
continue;
}

const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
auto& func_desc = m_compiled_submodels[real_idx];
const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
auto& func_desc = m_compiled_submodels[real_idx];

for (std::size_t tidx = 0; tidx < comp_model_desc.lazy_closure.size(); ++tidx) {
if (comp_model_desc.closure[tidx]) {
continue; // host-side closure
for (std::size_t tidx = 0; tidx < comp_model_desc.lazy_closure.size(); ++tidx) {
if (comp_model_desc.closure.unsafe_get().closure[tidx]) {
continue; // host-side closure
}
comp_model_desc.closure.unsafe_get().closure_uid[tidx] =
m_weights_bank->registerLT(comp_model_desc.lazy_closure[tidx], *func_desc.device_it);
}
comp_model_desc.closure_uid[tidx] =
m_weights_bank->registerLT(comp_model_desc.lazy_closure[tidx], *func_desc.device_it);
}
}

// Evaluate and allocate all LazyTensors inside the bank
m_profile["weights bank"].record([&]() {
// Evaluate and allocate all LazyTensors inside the bank
m_weights_bank->evaluate_and_allocate();

// Set evaluated and allocated ov::Tensors to closures
for (size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
auto& comp_model_desc = m_compiled_submodels[idx];

// Skip optimized out and non-functions
if (!comp_model_desc.compiled_model && !comp_model_desc.replaced_by) {
continue;
}

const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
auto& func_desc = m_compiled_submodels[real_idx];
auto& desc_closure = comp_model_desc.closure.unsafe_get();

for (std::size_t tidx = 0; tidx < desc_closure.closure.size(); ++tidx) {
if (desc_closure.closure[tidx]) {
// host-side closure - already set, do nothing
desc_closure.is_remote[tidx] = false;
continue;
}
const auto& uid = desc_closure.closure_uid[tidx];
NPUW_ASSERT(uid != -1); // All tensors should be registered at this point
desc_closure.closure[tidx] = m_weights_bank->get(uid, *func_desc.device_it);
// FIXME: find a more reliable way to do so
desc_closure.is_remote[tidx] = m_weights_bank->is_remote(uid);
}
}

m_import_weights_ctx.reset();
});

// Set evaluated and allocated ov::Tensors to closures
m_eval_future = weights_bank_evaluation;

for (size_t idx = 0; idx < m_compiled_submodels.size(); ++idx) {
auto& comp_model_desc = m_compiled_submodels[idx];

@@ -1275,21 +1321,7 @@ void ov::npuw::CompiledModel::finalize_weights_bank() {
continue;
}

const auto real_idx = comp_model_desc.replaced_by.value_or(idx);
auto& func_desc = m_compiled_submodels[real_idx];

for (std::size_t tidx = 0; tidx < comp_model_desc.closure.size(); ++tidx) {
if (comp_model_desc.closure[tidx]) {
// host-side closure - already set, do nothing
comp_model_desc.is_remote[tidx] = false;
continue;
}
const auto& uid = comp_model_desc.closure_uid[tidx];
NPUW_ASSERT(uid != -1); // All tensors should be registered at this point
comp_model_desc.closure[tidx] = m_weights_bank->get(uid, *func_desc.device_it);
// FIXME: find a more reliable way to do so
comp_model_desc.is_remote[tidx] = m_weights_bank->is_remote(uid);
}
comp_model_desc.closure.set_future(weights_bank_evaluation);
}

LOG_INFO("Done.");
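The closure data itself is now reached through a guard: get() blocks on the future installed via set_future() above, while the evaluation task uses unsafe_get() so it does not wait on its own completion. The guard's definition lies outside this diff; a minimal sketch under that assumption:

```cpp
#include <future>
#include <utility>

// Hypothetical stand-in for the guard wrapping CompiledModelDesc::closure;
// only the get()/unsafe_get()/set_future() surface is taken from this diff.
template <typename T>
class GuardedClosure {
public:
    void set_future(std::shared_future<void> ready) {
        m_ready = std::move(ready);
    }
    // Consumer-side access: wait until the async weights evaluation finishes.
    T& get() {
        if (m_ready.valid()) {
            m_ready.wait();
        }
        return m_value;
    }
    // Producer-side access: no wait, for use inside the evaluation task itself.
    T& unsafe_get() {
        return m_value;
    }

private:
    T m_value;
    std::shared_future<void> m_ready;
};
```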
@@ -1654,7 +1686,7 @@ std::string ov::npuw::CompiledModel::submodel_device(const std::size_t idx) cons

bool ov::npuw::CompiledModel::unpack_required(const std::size_t idx) const {
auto& comp_model_desc = m_compiled_submodels.at(idx);
for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.size(); cidx++) {
for (std::size_t cidx = 0u; cidx < comp_model_desc.closure.get().closure.size(); cidx++) {
if (unpack_required(idx, cidx)) {
return true;
}
@@ -1671,7 +1703,7 @@ bool ov::npuw::CompiledModel::unpack_required(const std::size_t idx, const std::
const auto real_idx = comp_model_desc.replaced_by.value();
auto& func_desc = m_compiled_submodels.at(real_idx);

auto& closure = comp_model_desc.closure.at(cidx);
auto& closure = comp_model_desc.closure.get().closure.at(cidx);
const auto closure_param_id = comp_model_desc.param_base + cidx;

auto& iport = func_desc.compiled_model->inputs()[closure_param_id];