Skip to content

Commit a7834da

Browse files
committed
Fix/improve threadpooling
1 parent c0314c3 commit a7834da

File tree

2 files changed

+55
-16
lines changed

2 files changed

+55
-16
lines changed

src/VecSim/algorithms/svs/svs.h

+29-4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ struct SVSIndexBase {
2020
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
2121
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
2222
virtual size_t indexStorageSize() const = 0;
23+
virtual size_t getNumThreads() const = 0;
2324
virtual void setNumThreads(size_t numThreads) = 0;
2425
virtual size_t getThreadPoolCapacity() const = 0;
2526
};
@@ -180,14 +181,25 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
180181
// Wrap data into SVS SimpleDataView for SVS API
181182
auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim};
182183

183-
// SVS index instance cannot be empty, so we have to construct it at first rows
184+
// If n == 1, we should ensure single-threading
185+
const size_t num_threads = (n == 1) ? getNumThreads() : 1;
186+
if (num_threads > 1) {
187+
setNumThreads(1);
188+
}
189+
184190
if (!impl_) {
191+
// SVS index instance cannot be empty, so we have to construct it at first rows
185192
initImpl(points, ids);
186-
return n - deleted_num;
193+
} else {
194+
// Add new points to existing SVS index
195+
impl_->add_points(points, ids);
196+
}
197+
198+
// Restore multi-threading if needed
199+
if (num_threads > 1) {
200+
setNumThreads(num_threads);
187201
}
188202

189-
// Add new points to existing SVS index
190-
impl_->add_points(points, ids);
191203
return n - deleted_num;
192204
}
193205

@@ -209,7 +221,19 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
209221
return 0;
210222
}
211223

224+
// If entries_to_delete.size() == 1, we should ensure single-threading
225+
const size_t num_threads = (entries_to_delete.size() == 1) ? getNumThreads() : 1;
226+
if (num_threads > 1) {
227+
setNumThreads(1);
228+
}
229+
212230
impl_->delete_entries(entries_to_delete);
231+
232+
// Restore multi-threading if needed
233+
if (num_threads > 1) {
234+
setNumThreads(num_threads);
235+
}
236+
213237
this->markIndexUpdate(entries_to_delete.size());
214238
return entries_to_delete.size();
215239
}
@@ -307,6 +331,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
307331
return deleteVectorsImpl(labels, n);
308332
}
309333

334+
size_t getNumThreads() const override { return threadpool_.size(); }
310335
void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); }
311336

312337
size_t getThreadPoolCapacity() const override { return threadpool_.capacity(); }

src/VecSim/algorithms/svs/svs_tiered.h

+26-12
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,25 @@
99
#include <memory>
1010
#include <mutex>
1111

12+
/**
13+
* @class SVSMultiThreadJob
14+
* @brief Represents a multi-threaded asynchronous job for the SVS algorithm.
15+
*
16+
* This class is responsible for managing multi-threaded jobs, including thread reservation,
17+
* synchronization, and execution of tasks. It uses a control block to coordinate threads
18+
* and ensure proper execution of the job.
19+
*
20+
* @details
21+
* The SVSMultiThreadJob class supports creating multiple threads for a task and ensures
22+
* synchronization between them. It uses a nested ControlBlock class to manage thread
23+
* reservations and job completion. Additionally, it includes a nested ReserveThreadJob
24+
* class to handle individual thread reservations.
25+
*
26+
* The main job executes a user-defined task with the number of reserved threads, while
27+
* additional threads wait for the main job to complete.
28+
*
29+
* @note This class is designed to work with the AsyncJob framework.
30+
*/
1231
class SVSMultiThreadJob : public AsyncJob {
1332

1433
// Thread reservation control block shared between all threads
@@ -32,16 +51,11 @@ class SVSMultiThreadJob : public AsyncJob {
3251
// reserve a thread and wait for the job to be done
3352
void reserveThreadAndWait() {
3453
// count current thread
35-
{
36-
std::lock_guard lock{mutex};
37-
++reservedThreads;
38-
}
39-
cv.notify_one();
40-
// wait for the job to be done
41-
{
42-
std::unique_lock lock{mutex};
43-
cv.wait(lock, [&] { return jobDone; });
44-
}
54+
std::unique_lock lock{mutex};
55+
++reservedThreads;
56+
cv.notify_all();
57+
// Wait until the job is marked as done, handling potential spurious wakeups.
58+
cv.wait(lock, [&] { return jobDone; });
4559
}
4660

4761
// wait for threads to be reserved
@@ -528,7 +542,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
528542
// Use the frontend parameters to manually prepare the blob for its transfer to the SVS
529543
// index.
530544
auto storage_blob = this->frontendIndex->preprocessForStorage(blob);
531-
std::unique_lock<std::shared_mutex> svs_lock(this->mainIndexGuard);
545+
std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard);
532546
return svs_index->addVectors(storage_blob.get(), &label, 1);
533547
}
534548
bool index_update_needed = false;
@@ -557,7 +571,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
557571
return !this->frontendIndex->isLabelExists(label);
558572
}());
559573

560-
std::unique_lock<std::shared_mutex> svs_lock(this->mainIndexGuard);
574+
std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard);
561575
return svs_index->deleteVectors(&label, 1);
562576
}
563577

0 commit comments

Comments
 (0)