-
Notifications
You must be signed in to change notification settings - Fork 21
[SVS] Implement 2-stage backend SVS index initialization #903
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,12 +39,23 @@ struct SVSIndexBase | |
| virtual ~SVSIndexBase() = default; | ||
| virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; | ||
| virtual int deleteVectors(const labelType *labels, size_t n) = 0; | ||
| virtual bool isLabelExists(labelType label) const = 0; | ||
| virtual size_t indexStorageSize() const = 0; | ||
| virtual size_t getNumThreads() const = 0; | ||
| virtual void setNumThreads(size_t numThreads) = 0; | ||
| virtual size_t getThreadPoolCapacity() const = 0; | ||
| virtual bool isCompressed() const = 0; | ||
| // Returns the current value of the num_marked_deleted member. | ||
| size_t getNumMarkedDeleted() const { | ||
| return num_marked_deleted; | ||
| } | ||
|
|
||
| // Abstract handle type used to manage an SVS implementation instance. | ||
| // Declared so an implementation can be passed around without unsafe unique_ptr<void> usage. | ||
| // The derived SVSIndex class should provide the concrete handler. | ||
| struct ImplHandler { | ||
| virtual ~ImplHandler() = default; | ||
| }; | ||
| virtual std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, | ||
| const labelType *labels, size_t n) = 0; | ||
| virtual void setImpl(std::unique_ptr<ImplHandler> impl) = 0; | ||
| #ifdef BUILD_TESTS | ||
| virtual svs::logging::logger_ptr getLogger() const = 0; | ||
| #endif | ||
|
|
@@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| // Create SVS index instance with initial data | ||
| // Data should not be empty | ||
| template <svs::data::ImmutableMemoryDataset Dataset> | ||
| void initImpl(const Dataset &points, std::span<const labelType> ids) { | ||
| std::unique_ptr<impl_type> initImpl(const Dataset &points, | ||
| std::span<const labelType> ids) const { | ||
| svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}}; | ||
|
|
||
| // Construct SVS index initial storage with compression if needed | ||
|
|
@@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
|
|
||
| // Construct initial Vamana Graph | ||
| auto graph = | ||
| graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point, | ||
| graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point, | ||
| this->blockSize, this->getAllocator(), logger_); | ||
|
|
||
| // Create SVS MutableIndex instance | ||
| impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point, | ||
| std::move(distance), ids, threadpool_, logger_); | ||
| auto impl = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point, | ||
| std::move(distance), ids, threadpool_, logger_); | ||
|
|
||
| // Set SVS MutableIndex build parameters to be used in future updates | ||
| impl_->set_construction_window_size(parameters.window_size); | ||
| impl_->set_max_candidates(parameters.max_candidate_pool_size); | ||
| impl_->set_prune_to(parameters.prune_to); | ||
| impl_->set_alpha(parameters.alpha); | ||
| impl_->set_full_search_history(parameters.use_full_search_history); | ||
| impl->set_construction_window_size(parameters.window_size); | ||
| impl->set_max_candidates(parameters.max_candidate_pool_size); | ||
| impl->set_prune_to(parameters.prune_to); | ||
| impl->set_alpha(parameters.alpha); | ||
| impl->set_full_search_history(parameters.use_full_search_history); | ||
|
|
||
| // Configure default search parameters | ||
| auto sp = impl_->get_search_parameters(); | ||
| auto sp = impl->get_search_parameters(); | ||
| sp.buffer_config({this->search_window_size, this->search_buffer_capacity}); | ||
| impl_->set_search_parameters(sp); | ||
| impl_->reset_performance_parameters(); | ||
| impl->set_search_parameters(sp); | ||
| impl->reset_performance_parameters(); | ||
| return impl; | ||
| } | ||
|
|
||
| // Preprocess batch of vectors | ||
|
|
@@ -204,6 +217,42 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return processed_blob; | ||
| } | ||
|
|
||
| // Concrete ImplHandler for this index: owns an SVS implementation instance (may be null) | ||
| // so that it can be handed over through the SVSIndexBase interface. | ||
| struct SVSImplHandler : public SVSIndexBase::ImplHandler { | ||
| std::unique_ptr<impl_type> impl; | ||
| // explicit: a bare unique_ptr<impl_type> must not silently convert into a handler | ||
| explicit SVSImplHandler(std::unique_ptr<impl_type> impl_ptr) : impl{std::move(impl_ptr)} {} | ||
| }; | ||
|
|
||
| // Builds an SVS implementation from `n` vectors and their labels and returns it wrapped | ||
| // in an SVSImplHandler; impl_ is not assigned here. | ||
| // For n == 0 the returned handler carries a null implementation. | ||
| std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, const labelType *labels, | ||
| size_t n) override { | ||
| // Nothing to build from: hand back a handler with no implementation inside | ||
| if (n == 0) { | ||
| return std::make_unique<SVSImplHandler>(nullptr); | ||
| } | ||
|
|
||
| // Preprocess the raw input, then expose it to the SVS API as a SimpleDataView | ||
| auto blob = this->preprocessForBatchStorage(vectors_data, n); | ||
| auto *typed_blob = static_cast<DataType *>(blob.get()); | ||
| svs::data::SimpleDataView<DataType> points{typed_blob, n, this->dim}; | ||
| std::span<const labelType> ids(labels, n); | ||
|
|
||
| auto built_impl = initImpl(points, ids); | ||
| return std::make_unique<SVSImplHandler>(std::move(built_impl)); | ||
| } | ||
|
|
||
| // Installs an SVS implementation previously produced by createImpl(). | ||
| // Takes ownership of `handler` and moves the implementation it carries into impl_. | ||
| // Throws std::logic_error if `handler` is null, if impl_ is already set, or if | ||
| // `handler` is not an SVSImplHandler. | ||
| void setImpl(std::unique_ptr<ImplHandler> handler) override { | ||
| assert(handler && "SVSIndex::setImpl called with null handler"); | ||
| // The assert is compiled out in release builds, so report a null handler explicitly | ||
| // instead of letting it surface below as a misleading cast failure. | ||
| if (!handler) { | ||
| throw std::logic_error("SVSIndex::setImpl called with null handler"); | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is a debug-only assert, let's add this to the log in a warning level as well |
||
| assert(impl_ == nullptr); // Should be called only on empty impl_ | ||
| if (impl_ != nullptr) { | ||
| throw std::logic_error("SVSIndex::setImpl called on non-empty impl_"); | ||
| } | ||
|
|
||
| // The interface only carries the abstract ImplHandler; recover the concrete handler type | ||
| SVSImplHandler *svs_handler = dynamic_cast<SVSImplHandler *>(handler.get()); | ||
| if (!svs_handler) { | ||
| throw std::logic_error("Failed to cast to SVSImplHandler"); | ||
| } | ||
|
Comment on lines
+249
to
+252
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the motivation to have an abstract |
||
| this->impl_ = std::move(svs_handler->impl); | ||
| } | ||
|
|
||
| // Assuming numThreads was updated to reflect the number of available threads before this | ||
| // function was called. | ||
| // This function assumes that the caller has already set numThreads to the appropriate value | ||
|
|
@@ -230,7 +279,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
|
|
||
| if (!impl_) { | ||
| // SVS index instance cannot be empty, so we have to construct it at first rows | ||
| initImpl(points, ids); | ||
| impl_ = initImpl(points, ids); | ||
| } else { | ||
| // Add new points to existing SVS index | ||
| impl_->add_points(points, ids); | ||
|
|
@@ -490,6 +539,10 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl | |
| return deleteVectorsImpl(labels, n); | ||
| } | ||
|
|
||
| // Reports whether `label` is present; an index with no SVS implementation has no labels. | ||
| bool isLabelExists(labelType label) const override { | ||
| if (!impl_) { | ||
| return false; | ||
| } | ||
| return impl_->has_id(label); | ||
| } | ||
|
|
||
| // Returns the current size of threadpool_. | ||
| size_t getNumThreads() const override { | ||
| return threadpool_.size(); | ||
| } | ||
| // Resizes threadpool_ to numThreads. | ||
| void setNumThreads(size_t numThreads) override { | ||
| threadpool_.resize(numThreads); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -672,12 +672,28 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
|
|
||
| executeTracingCallback("UpdateJob::before_add_to_svs"); | ||
| { // lock backend index for writing and add vectors there | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| std::shared_lock main_shared_lock(this->mainIndexGuard); | ||
| auto svs_index = GetSVSIndex(); | ||
| assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
| if (this->backendIndex->indexSize() == 0) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this also handle re-initialization after the index was emptied? Is that scenario tested? |
||
| // If backend index is empty, we need to initialize it first. | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
|
|
||
| // Upgrade to unique lock to set the new impl | ||
| main_shared_lock.unlock(); | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| svs_index->setImpl(std::move(impl)); | ||
| } else { | ||
| // Backend index is initialized - just add the vectors. | ||
| main_shared_lock.unlock(); | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| // Upgrade to unique lock to add vectors | ||
| svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); | ||
| svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), | ||
| labels_to_move.size()); | ||
| } | ||
| } | ||
| executeTracingCallback("UpdateJob::after_add_to_svs"); | ||
| // clean-up frontend index | ||
|
|
@@ -801,8 +817,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| } | ||
| } | ||
| // Remove vector from the backend index if it exists in case of non-MULTI. | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret -= svs_index->deleteVectors(&label, 1); | ||
| auto label_exists = [&]() { | ||
| std::shared_lock lock(this->mainIndexGuard); | ||
| return svs_index->isLabelExists(label); | ||
| }(); | ||
|
|
||
| if (label_exists) { | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret -= svs_index->deleteVectors(&label, 1); | ||
| } | ||
| } | ||
| { // Add vector to the frontend index. | ||
| std::lock_guard lock(this->flatIndexGuard); | ||
|
|
@@ -887,7 +910,13 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> { | |
| std::lock_guard lock(this->flatIndexGuard); | ||
| ret = this->deleteAndRecordSwaps_Unsafe(label); | ||
| } | ||
| { | ||
|
|
||
| label_exists = [&]() { | ||
| std::shared_lock lock(this->mainIndexGuard); | ||
| return svs_index->isLabelExists(label); | ||
| }(); | ||
|
|
||
| if (label_exists) { | ||
| std::lock_guard lock(this->mainIndexGuard); | ||
| ret += svs_index->deleteVectors(&label, 1); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic seems to duplicate what we do in
addVectorsImpl. Consider unifying the two into a single function.