diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 59b056d3c..6fb48098a 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -39,12 +39,23 @@ struct SVSIndexBase virtual ~SVSIndexBase() = default; virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; virtual int deleteVectors(const labelType *labels, size_t n) = 0; + virtual bool isLabelExists(labelType label) const = 0; virtual size_t indexStorageSize() const = 0; virtual size_t getNumThreads() const = 0; virtual void setNumThreads(size_t numThreads) = 0; virtual size_t getThreadPoolCapacity() const = 0; virtual bool isCompressed() const = 0; size_t getNumMarkedDeleted() const { return num_marked_deleted; } + + // Abstract handler to manage SVS implementation instance + // declared to avoid unsafe unique_ptr usage + // Derived SVSIndex class should implement it + struct ImplHandler { + virtual ~ImplHandler() = default; + }; + virtual std::unique_ptr createImpl(const void *vectors_data, + const labelType *labels, size_t n) = 0; + virtual void setImpl(std::unique_ptr impl) = 0; #ifdef BUILD_TESTS virtual svs::logging::logger_ptr getLogger() const = 0; #endif @@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract, fl // Create SVS index instance with initial data // Data should not be empty template - void initImpl(const Dataset &points, std::span ids) { + std::unique_ptr initImpl(const Dataset &points, + std::span ids) const { svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}}; // Construct SVS index initial storage with compression if needed @@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract, fl // Construct initial Vamana Graph auto graph = - graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point, + graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point, this->blockSize, this->getAllocator(), logger_); // Create SVS MutableIndex instance - impl_ = std::make_unique(std::move(graph), std::move(data), entry_point, - std::move(distance), ids, threadpool_, logger_); + auto impl = std::make_unique(std::move(graph), std::move(data), entry_point, + std::move(distance), ids, threadpool_, logger_); // Set SVS MutableIndex build parameters to be used in future updates - impl_->set_construction_window_size(parameters.window_size); - impl_->set_max_candidates(parameters.max_candidate_pool_size); - impl_->set_prune_to(parameters.prune_to); - impl_->set_alpha(parameters.alpha); - impl_->set_full_search_history(parameters.use_full_search_history); + impl->set_construction_window_size(parameters.window_size); + impl->set_max_candidates(parameters.max_candidate_pool_size); + impl->set_prune_to(parameters.prune_to); + impl->set_alpha(parameters.alpha); + impl->set_full_search_history(parameters.use_full_search_history); // Configure default search parameters - auto sp = impl_->get_search_parameters(); + auto sp = impl->get_search_parameters(); sp.buffer_config({this->search_window_size, this->search_buffer_capacity}); - impl_->set_search_parameters(sp); - impl_->reset_performance_parameters(); + impl->set_search_parameters(sp); + impl->reset_performance_parameters(); + return impl; } // Preprocess batch of vectors @@ -204,6 +217,42 @@ class SVSIndex : public VecSimIndexAbstract, fl return processed_blob; } + // Handler to manage SVS implementation instance + struct SVSImplHandler : public SVSIndexBase::ImplHandler { + std::unique_ptr impl; + SVSImplHandler(std::unique_ptr impl) : impl{std::move(impl)} {} + }; + + std::unique_ptr createImpl(const void *vectors_data, const labelType *labels, + size_t n) override { + // If no data provided, return empty handler + if (n == 0) { + return std::make_unique(nullptr); + } + + std::span ids(labels, n); + auto processed_blob = this->preprocessForBatchStorage(vectors_data, n); + auto typed_vectors_data = static_cast(processed_blob.get()); + // Wrap data into SVS SimpleDataView for SVS API + auto points = svs::data::SimpleDataView{typed_vectors_data, n, this->dim}; + + return std::make_unique(initImpl(points, ids)); + } + + void setImpl(std::unique_ptr handler) override { + assert(handler && "SVSIndex::setImpl called with null handler"); + assert(impl_ == nullptr); // Should be called only on empty impl_ + if (impl_ != nullptr) { + throw std::logic_error("SVSIndex::setImpl called on non-empty impl_"); + } + + SVSImplHandler *svs_handler = dynamic_cast(handler.get()); + if (!svs_handler) { + throw std::logic_error("Failed to cast to SVSImplHandler"); + } + this->impl_ = std::move(svs_handler->impl); + } + // Assuming numThreads was updated to reflect the number of available threads before this // function was called. // This function assumes that the caller has already set numThreads to the appropriate value @@ -230,7 +279,7 @@ class SVSIndex : public VecSimIndexAbstract, fl if (!impl_) { // SVS index instance cannot be empty, so we have to construct it at first rows - initImpl(points, ids); + impl_ = initImpl(points, ids); } else { // Add new points to existing SVS index impl_->add_points(points, ids); @@ -490,6 +539,10 @@ class SVSIndex : public VecSimIndexAbstract, fl return deleteVectorsImpl(labels, n); } + bool isLabelExists(labelType label) const override { + return impl_ ? impl_->has_id(label) : false; + } + size_t getNumThreads() const override { return threadpool_.size(); } void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); } diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 351122367..f037a82f6 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -672,12 +672,28 @@ class TieredSVSIndex : public VecSimTieredIndex { executeTracingCallback("UpdateJob::before_add_to_svs"); { // lock backend index for writing and add vectors there - std::lock_guard lock(this->mainIndexGuard); + std::shared_lock main_shared_lock(this->mainIndexGuard); auto svs_index = GetSVSIndex(); assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); - svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); - svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), - labels_to_move.size()); + if (this->backendIndex->indexSize() == 0) { + // If backend index is empty, we need to initialize it first. + svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), + labels_to_move.size()); + + // Upgrade to unique lock to set the new impl + main_shared_lock.unlock(); + std::lock_guard lock(this->mainIndexGuard); + svs_index->setImpl(std::move(impl)); + } else { + // Backend index is initialized - just add the vectors. + main_shared_lock.unlock(); + std::lock_guard lock(this->mainIndexGuard); + // Upgrade to unique lock to add vectors + svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), + labels_to_move.size()); + } } executeTracingCallback("UpdateJob::after_add_to_svs"); // clean-up frontend index @@ -801,8 +817,15 @@ class TieredSVSIndex : public VecSimTieredIndex { } } // Remove vector from the backend index if it exists in case of non-MULTI. - std::lock_guard lock(this->mainIndexGuard); - ret -= svs_index->deleteVectors(&label, 1); + auto label_exists = [&]() { + std::shared_lock lock(this->mainIndexGuard); + return svs_index->isLabelExists(label); + }(); + + if (label_exists) { + std::lock_guard lock(this->mainIndexGuard); + ret -= svs_index->deleteVectors(&label, 1); + } } { // Add vector to the frontend index. std::lock_guard lock(this->flatIndexGuard); @@ -887,7 +910,13 @@ class TieredSVSIndex : public VecSimTieredIndex { std::lock_guard lock(this->flatIndexGuard); ret = this->deleteAndRecordSwaps_Unsafe(label); } - { + + label_exists = [&]() { + std::shared_lock lock(this->mainIndexGuard); + return svs_index->isLabelExists(label); + }(); + + if (label_exists) { std::lock_guard lock(this->mainIndexGuard); ret += svs_index->deleteVectors(&label, 1); }