Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 66 additions & 13 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,23 @@ struct SVSIndexBase
virtual ~SVSIndexBase() = default;
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
virtual bool isLabelExists(labelType label) const = 0;
virtual size_t indexStorageSize() const = 0;
virtual size_t getNumThreads() const = 0;
virtual void setNumThreads(size_t numThreads) = 0;
virtual size_t getThreadPoolCapacity() const = 0;
virtual bool isCompressed() const = 0;
size_t getNumMarkedDeleted() const { return num_marked_deleted; }

// Abstract handler to manage SVS implementation instance
// declared to avoid unsafe unique_ptr<void> usage
// Derived SVSIndex class should implement it
struct ImplHandler {
virtual ~ImplHandler() = default;
};
virtual std::unique_ptr<ImplHandler> createImpl(const void *vectors_data,
const labelType *labels, size_t n) = 0;
virtual void setImpl(std::unique_ptr<ImplHandler> impl) = 0;
#ifdef BUILD_TESTS
virtual svs::logging::logger_ptr getLogger() const = 0;
#endif
Expand Down Expand Up @@ -144,7 +155,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
// Create SVS index instance with initial data
// Data should not be empty
template <svs::data::ImmutableMemoryDataset Dataset>
void initImpl(const Dataset &points, std::span<const labelType> ids) {
std::unique_ptr<impl_type> initImpl(const Dataset &points,
std::span<const labelType> ids) const {
svs::threads::ThreadPoolHandle threadpool_handle{VecSimSVSThreadPool{threadpool_}};

// Construct SVS index initial storage with compression if needed
Expand All @@ -160,25 +172,26 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

// Construct initial Vamana Graph
auto graph =
graph_builder_t::build_graph(parameters, data, distance, threadpool_, entry_point,
graph_builder_t::build_graph(parameters, data, distance, threadpool_handle, entry_point,
this->blockSize, this->getAllocator(), logger_);

// Create SVS MutableIndex instance
impl_ = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
std::move(distance), ids, threadpool_, logger_);
auto impl = std::make_unique<impl_type>(std::move(graph), std::move(data), entry_point,
std::move(distance), ids, threadpool_, logger_);

// Set SVS MutableIndex build parameters to be used in future updates
impl_->set_construction_window_size(parameters.window_size);
impl_->set_max_candidates(parameters.max_candidate_pool_size);
impl_->set_prune_to(parameters.prune_to);
impl_->set_alpha(parameters.alpha);
impl_->set_full_search_history(parameters.use_full_search_history);
impl->set_construction_window_size(parameters.window_size);
impl->set_max_candidates(parameters.max_candidate_pool_size);
impl->set_prune_to(parameters.prune_to);
impl->set_alpha(parameters.alpha);
impl->set_full_search_history(parameters.use_full_search_history);

// Configure default search parameters
auto sp = impl_->get_search_parameters();
auto sp = impl->get_search_parameters();
sp.buffer_config({this->search_window_size, this->search_buffer_capacity});
impl_->set_search_parameters(sp);
impl_->reset_performance_parameters();
impl->set_search_parameters(sp);
impl->reset_performance_parameters();
return impl;
}

// Preprocess batch of vectors
Expand All @@ -204,6 +217,42 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return processed_blob;
}

// Handler to manage SVS implementation instance
struct SVSImplHandler : public SVSIndexBase::ImplHandler {
std::unique_ptr<impl_type> impl;
SVSImplHandler(std::unique_ptr<impl_type> impl) : impl{std::move(impl)} {}
};

std::unique_ptr<ImplHandler> createImpl(const void *vectors_data, const labelType *labels,
size_t n) override {
// If no data provided, return empty handler
if (n == 0) {
return std::make_unique<SVSImplHandler>(nullptr);
}

std::span<const labelType> ids(labels, n);
auto processed_blob = this->preprocessForBatchStorage(vectors_data, n);
auto typed_vectors_data = static_cast<DataType *>(processed_blob.get());
// Wrap data into SVS SimpleDataView for SVS API
auto points = svs::data::SimpleDataView<DataType>{typed_vectors_data, n, this->dim};

Comment on lines +233 to +238
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems to be a duplication of what we do in addVectorsImpl. Consider unifying these into a single function

return std::make_unique<SVSImplHandler>(initImpl(points, ids));
}

void setImpl(std::unique_ptr<ImplHandler> handler) override {
assert(handler && "SVSIndex::setImpl called with null handler");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a debug-only assert, let's add this to the log in a warning level as well

assert(impl_ == nullptr); // Should be called only on empty impl_
if (impl_ != nullptr) {
throw std::logic_error("SVSIndex::setImpl called on non-empty impl_");
}

SVSImplHandler *svs_handler = dynamic_cast<SVSImplHandler *>(handler.get());
if (!svs_handler) {
throw std::logic_error("Failed to cast to SVSImplHandler");
}
Comment on lines +249 to +252
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation to have an abstract ImplHandler rather than have only SVSImplHandler? The dynamic_cast here seems a bit awkward

this->impl_ = std::move(svs_handler->impl);
}

// Assuming numThreads was updated to reflect the number of available threads before this
// function was called.
// This function assumes that the caller has already set numThreads to the appropriate value
Expand All @@ -230,7 +279,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

if (!impl_) {
// SVS index instance cannot be empty, so we have to construct it at first rows
initImpl(points, ids);
impl_ = initImpl(points, ids);
} else {
// Add new points to existing SVS index
impl_->add_points(points, ids);
Expand Down Expand Up @@ -490,6 +539,10 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return deleteVectorsImpl(labels, n);
}

bool isLabelExists(labelType label) const override {
return impl_ ? impl_->has_id(label) : false;
}

size_t getNumThreads() const override { return threadpool_.size(); }
void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); }

Expand Down
43 changes: 36 additions & 7 deletions src/VecSim/algorithms/svs/svs_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,28 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {

executeTracingCallback("UpdateJob::before_add_to_svs");
{ // lock backend index for writing and add vectors there
std::lock_guard lock(this->mainIndexGuard);
std::shared_lock main_shared_lock(this->mainIndexGuard);
auto svs_index = GetSVSIndex();
assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim());
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
if (this->backendIndex->indexSize() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this also handle re-initialization after the index was emptied? Is that scenario tested?

// If backend index is empty, we need to initialize it first.
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());

// Upgrade to unique lock to set the new impl
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
svs_index->setImpl(std::move(impl));
} else {
// Backend index is initialized - just add the vectors.
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
// Upgrade to unique lock to add vectors
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
}
}
executeTracingCallback("UpdateJob::after_add_to_svs");
// clean-up frontend index
Expand Down Expand Up @@ -801,8 +817,15 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
}
}
// Remove vector from the backend index if it exists in case of non-MULTI.
std::lock_guard lock(this->mainIndexGuard);
ret -= svs_index->deleteVectors(&label, 1);
auto label_exists = [&]() {
std::shared_lock lock(this->mainIndexGuard);
return svs_index->isLabelExists(label);
}();

if (label_exists) {
std::lock_guard lock(this->mainIndexGuard);
ret -= svs_index->deleteVectors(&label, 1);
}
}
{ // Add vector to the frontend index.
std::lock_guard lock(this->flatIndexGuard);
Expand Down Expand Up @@ -887,7 +910,13 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
std::lock_guard lock(this->flatIndexGuard);
ret = this->deleteAndRecordSwaps_Unsafe(label);
}
{

label_exists = [&]() {
std::shared_lock lock(this->mainIndexGuard);
return svs_index->isLabelExists(label);
}();

if (label_exists) {
std::lock_guard lock(this->mainIndexGuard);
ret += svs_index->deleteVectors(&label, 1);
}
Expand Down
Loading