Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/VecSim/algorithms/hnsw/hnsw_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
// associated swap jobs.
std::mutex idToRepairJobsGuard;

// Counter for vectors inserted directly into HNSW by the main thread (bypassing flat buffer).
// This happens in WriteInPlace mode or when the flat buffer is full.
// Not atomic since it's only accessed from the main thread.
size_t directHNSWInsertions{0};

void executeInsertJob(HNSWInsertJob *job);
void executeRepairJob(HNSWRepairJob *job);

Expand Down Expand Up @@ -211,6 +216,7 @@ class TieredHNSWIndex : public VecSimTieredIndex<DataType, DistType> {
// needed.
VecSimIndexDebugInfo debugInfo() const override;
VecSimIndexBasicInfo basicInfo() const override;
VecSimIndexStatsInfo statisticInfo() const override;
VecSimDebugInfoIterator *debugInfoIterator() const override;
VecSimBatchIterator *newBatchIterator(const void *queryBlob,
VecSimQueryParams *queryParams) const override {
Expand Down Expand Up @@ -729,6 +735,8 @@ int TieredHNSWIndex<DataType, DistType>::addVector(const void *blob, labelType l
this->lockMainIndexGuard();
hnsw_index->addVector(storage_blob.get(), label);
this->unlockMainIndexGuard();
// Track direct insertion to HNSW (bypassing flat buffer)
++this->directHNSWInsertions;
return ret;
}
if (this->frontendIndex->indexSize() >= this->flatBufferLimit) {
Expand All @@ -746,6 +754,8 @@ int TieredHNSWIndex<DataType, DistType>::addVector(const void *blob, labelType l
// index.
auto storage_blob = this->frontendIndex->preprocessForStorage(blob);
this->insertVectorToHNSW<false>(hnsw_index, label, storage_blob.get());
// Track direct insertion to HNSW (flat buffer was full)
++this->directHNSWInsertions;
return ret;
}
// Otherwise, we fall back to the "regular" insertion into the flat buffer
Expand Down Expand Up @@ -1151,6 +1161,13 @@ void TieredHNSWIndex<DataType, DistType>::TieredHNSW_BatchIterator::filter_irrel
results.resize(cur_end - results.begin());
}

template <typename DataType, typename DistType>
VecSimIndexStatsInfo TieredHNSWIndex<DataType, DistType>::statisticInfo() const {
auto stats = VecSimTieredIndex<DataType, DistType>::statisticInfo();
stats.directHNSWInsertions = this->directHNSWInsertions;
return stats;
}

template <typename DataType, typename DistType>
VecSimIndexDebugInfo TieredHNSWIndex<DataType, DistType>::debugInfo() const {
auto info = VecSimTieredIndex<DataType, DistType>::debugInfo();
Expand Down
3 changes: 3 additions & 0 deletions src/VecSim/vec_sim_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ typedef struct {
size_t memory;
size_t numberOfMarkedDeleted; // The number of vectors that are marked as deleted (HNSW/tiered
// only).
size_t directHNSWInsertions; // Count of vectors inserted directly into HNSW by main thread
// (bypassing flat buffer). Tiered HNSW only.
size_t flatBufferSize; // Current flat buffer size. Tiered indexes only.
} VecSimIndexStatsInfo;

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions src/VecSim/vec_sim_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ struct VecSimIndexAbstract : public VecSimIndexInterface {
return VecSimIndexStatsInfo{
.memory = this->getAllocationSize(),
.numberOfMarkedDeleted = 0,
.directHNSWInsertions = 0,
.flatBufferSize = 0,
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/VecSim/vec_sim_tiered_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ VecSimIndexStatsInfo VecSimTieredIndex<DataType, DistType>::statisticInfo() cons
auto stats = VecSimIndexStatsInfo{
.memory = this->getAllocationSize(),
.numberOfMarkedDeleted = this->getNumMarkedDeleted(),
.directHNSWInsertions = 0, // Base tiered index returns 0; TieredHNSWIndex overrides
.flatBufferSize = this->frontendIndex->indexSize(),
};

return stats;
Expand Down
77 changes: 77 additions & 0 deletions tests/unit/test_hnsw_tiered.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2761,6 +2761,9 @@ TYPED_TEST(HNSWTieredIndexTest, testInfo) {
EXPECT_EQ(info.tieredInfo.backgroundIndexing, false);
EXPECT_EQ(info.tieredInfo.bufferLimit, 1000);
EXPECT_EQ(info.tieredInfo.specificTieredBackendInfo.hnswTieredInfo.pendingSwapJobsThreshold, 1);
// Verify new tiered-specific stats
EXPECT_EQ(stats.flatBufferSize, 0);
EXPECT_EQ(stats.directHNSWInsertions, 0);

// Validate that Static info returns the right restricted info as well.
VecSimIndexBasicInfo s_info = VecSimIndex_BasicInfo(tiered_index);
Expand All @@ -2787,6 +2790,9 @@ TYPED_TEST(HNSWTieredIndexTest, testInfo) {
info.tieredInfo.frontendCommonInfo.memory);
EXPECT_EQ(info.commonInfo.memory, stats.memory);
EXPECT_EQ(info.tieredInfo.backgroundIndexing, true);
// Vector is in flat buffer, no direct insertions yet
EXPECT_EQ(stats.flatBufferSize, 1);
EXPECT_EQ(stats.directHNSWInsertions, 0);

mock_thread_pool.thread_iteration();
info = tiered_index->debugInfo();
Expand All @@ -2803,6 +2809,9 @@ TYPED_TEST(HNSWTieredIndexTest, testInfo) {
info.tieredInfo.frontendCommonInfo.memory);
EXPECT_EQ(info.commonInfo.memory, stats.memory);
EXPECT_EQ(info.tieredInfo.backgroundIndexing, false);
// Vector moved from flat buffer to HNSW by background thread
EXPECT_EQ(stats.flatBufferSize, 0);
EXPECT_EQ(stats.directHNSWInsertions, 0);

if (TypeParam::isMulti()) {
GenerateAndAddVector<TEST_DATA_T>(tiered_index, dim, 1, 1);
Expand Down Expand Up @@ -2839,6 +2848,74 @@ TYPED_TEST(HNSWTieredIndexTest, testInfo) {
EXPECT_EQ(info.tieredInfo.backgroundIndexing, false);
}

TYPED_TEST(HNSWTieredIndexTest, testDirectHNSWInsertionsStats) {
// Test that directHNSWInsertions counter is incremented when flat buffer is full.
size_t dim = 4;
size_t buffer_limit = 5;
HNSWParams params = {.type = TypeParam::get_index_type(),
.dim = dim,
.metric = VecSimMetric_L2,
.multi = TypeParam::isMulti()};
VecSimParams hnsw_params = CreateParams(params);
auto mock_thread_pool = tieredIndexMock();

// Create index with small buffer limit
auto *tiered_index =
this->CreateTieredHNSWIndex(hnsw_params, mock_thread_pool, 1, buffer_limit);

// Fill the flat buffer
for (size_t i = 0; i < buffer_limit; i++) {
GenerateAndAddVector<TEST_DATA_T>(tiered_index, dim, i, i);
}

VecSimIndexStatsInfo stats = tiered_index->statisticInfo();
EXPECT_EQ(stats.flatBufferSize, buffer_limit);
EXPECT_EQ(stats.directHNSWInsertions, 0);

// Add more vectors - these should go directly to HNSW
size_t extra_vectors = 3;
for (size_t i = buffer_limit; i < buffer_limit + extra_vectors; i++) {
GenerateAndAddVector<TEST_DATA_T>(tiered_index, dim, i, i);
}

stats = tiered_index->statisticInfo();
EXPECT_EQ(stats.flatBufferSize, buffer_limit);
EXPECT_EQ(stats.directHNSWInsertions, extra_vectors);

// Drain the flat buffer by starting threads and waiting for them to finish
mock_thread_pool.init_threads();
mock_thread_pool.thread_pool_join();

stats = tiered_index->statisticInfo();
EXPECT_EQ(stats.flatBufferSize, 0);
// Direct insertions counter should be preserved
EXPECT_EQ(stats.directHNSWInsertions, extra_vectors);

// Test write-in-place mode: vectors should go directly to HNSW even when buffer is not full
VecSim_SetWriteMode(VecSim_WriteInPlace);

size_t write_in_place_vectors = 4;
size_t label_offset = buffer_limit + extra_vectors;
for (size_t i = 0; i < write_in_place_vectors; i++) {
GenerateAndAddVector<TEST_DATA_T>(tiered_index, dim, label_offset + i, label_offset + i);
}

stats = tiered_index->statisticInfo();
// Flat buffer should still be empty (vectors went directly to HNSW)
EXPECT_EQ(stats.flatBufferSize, 0);
// Direct insertions counter should include the write-in-place vectors
EXPECT_EQ(stats.directHNSWInsertions, extra_vectors + write_in_place_vectors);

// Verify all vectors are in the backend index
VecSimIndexDebugInfo info = tiered_index->debugInfo();
EXPECT_EQ(info.tieredInfo.backendCommonInfo.indexSize,
buffer_limit + extra_vectors + write_in_place_vectors);
EXPECT_EQ(info.tieredInfo.frontendCommonInfo.indexSize, 0);

// Reset to async mode
VecSim_SetWriteMode(VecSim_WriteAsync);
}

TYPED_TEST(HNSWTieredIndexTest, testInfoIterator) {
// Create TieredHNSW index instance with a mock queue.
size_t dim = 4;
Expand Down
Loading