48 changes: 48 additions & 0 deletions docs/icebug-disk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Icebug-Disk Storage Format

## Overview
<!-- TODO: update link to Icebug-Disk spec -->
This is LadybugDB's implementation of [Icebug-Disk](https://github.com/Ladybug-Memory/icebug-format), a read-only graph storage format based on Parquet files. It is designed for efficient analytical queries on large graphs.

## V1

Implements Icebug-Disk v1.

### Version

Version is stored in each file's metadata footer as a key-value pair: `icebug_disk_version = 1`.

### schema.cypher

The graph schema is declared in a `schema.cypher` file, which can be loaded with `lbug -i schema.cypher` to create the corresponding tables in Ladybug.

```cypher
CREATE NODE TABLE city(id INT32, name STRING, population INT64, PRIMARY KEY(id)) WITH (storage = 'icebug-disk:<path-to-dir>');
CREATE NODE TABLE user(id INT32, name STRING, age INT64, PRIMARY KEY(id)) WITH (storage = 'icebug-disk:<path-to-dir>');
CREATE REL TABLE follows(FROM user TO user, since INT32) WITH (storage = 'icebug-disk:<path-to-dir>');
CREATE REL TABLE livesin(FROM user TO city) WITH (storage = 'icebug-disk:<path-to-dir>');
```

File paths can be relative or absolute and are resolved as `<path-to-dir>/nodes_{tableName}.parquet` for node tables, and `<path-to-dir>/indices_{tableName}.parquet` and `<path-to-dir>/indptr_{tableName}.parquet` for relationship tables.
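As a hedged sketch of the naming rule above (the real resolution happens inside LadybugDB's catalog; the helper below is purely hypothetical):

```python
# Illustrative only: mimics the documented resolution rule for the
# 'storage' option. The function name and shape are hypothetical.
import os

def resolve_files(storage_option, table_name, is_rel):
    # 'icebug-disk' and 'icebug-disk:' both resolve to the cwd ('.')
    path = storage_option[len("icebug-disk"):].lstrip(":") or "."
    if is_rel:
        return [os.path.join(path, f"indices_{table_name}.parquet"),
                os.path.join(path, f"indptr_{table_name}.parquet")]
    return [os.path.join(path, f"nodes_{table_name}.parquet")]

print(resolve_files("icebug-disk:data/graph", "city", is_rel=False))
# ['data/graph/nodes_city.parquet']
```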

`storage = 'icebug-disk'` and `storage = 'icebug-disk:'` both resolve to the current working directory.

Tables can also be created by manually running the above queries in the Ladybug CLI.

If the directory is moved, the affected tables must be dropped and re-created with the updated path, which refreshes the file pointers in the catalog. Alternatively, a fresh database instance can be used to run the `CREATE TABLE` queries with the new paths.

Mixed tables are not supported — queries involving both `icebug-disk` and non-`icebug-disk` tables will throw a `BinderException`.

### Node tables

For each node table, there is a corresponding Parquet file named `nodes_{tableName}.parquet` containing a primary key column and one column per property as declared in the schema.

### Indices

Each relationship table has a corresponding `indices_{tableName}.parquet` file containing one row per edge. The first column is always `target` (the destination node offset), followed by zero or more edge property columns as declared in the schema.

### Indptr

Each relationship table has a corresponding `indptr_{tableName}.parquet` file containing the CSR row pointers. It has a single integer column with `N+1` entries, where `N` is the number of source nodes.
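To make the two-file layout concrete, here is a minimal worked example of the CSR scheme described above, using a hypothetical three-user `follows` graph (all names and values are illustrative):

```python
# CSR layout: edges of source node i occupy rows indptr[i] .. indptr[i+1]-1
# of the indices file. Data below is a made-up 3-node 'follows' graph.
indptr  = [0, 2, 3, 3]        # N+1 entries for N = 3 source nodes
targets = [1, 2, 2]           # 'target' column of indices_follows.parquet
since   = [2019, 2021, 2020]  # edge property column, same row order

def out_edges(src):
    start, end = indptr[src], indptr[src + 1]
    return list(zip(targets[start:end], since[start:end]))

print(out_edges(0))  # [(1, 2019), (2, 2021)]
print(out_edges(2))  # []  (node 2 has no outgoing edges)
```

Note that `indptr` has four entries for three source nodes; its final entry equals the total edge count.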

<!--TODO: reference converter utility-->
15 changes: 14 additions & 1 deletion src/binder/bind/bind_ddl.cpp
@@ -10,6 +10,7 @@
#include "catalog/catalog.h"
#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "catalog/catalog_entry/sequence_catalog_entry.h"
#include "common/constants.h"
#include "common/enums/extend_direction_util.h"
#include "common/exception/binder.h"
#include "common/exception/message.h"
@@ -170,6 +171,7 @@ static std::string getStorage(const case_insensitive_map_t<Value>& options) {
if (options.contains(TableOptionConstants::REL_STORAGE_OPTION)) {
return options.at(TableOptionConstants::REL_STORAGE_OPTION).toString();
}

return "";
}

@@ -217,7 +219,7 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo*
if (!storage.empty()) {
auto dotPos = storage.find('.');
// Check if storage is database.table format by verifying the attached database exists
// Otherwise, treat as file path (e.g., "dataset/demo-db/icebug-disk/demo" or
// Otherwise, treat as file path (e.g., "dataset/demo-db/graph-std/demo" or
// "data.parquet")
if (dotPos != std::string::npos) {
std::string dbName = storage.substr(0, dotPos);
@@ -310,6 +312,17 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo*
}
}

// For icebug-disk rel tables, validate that FROM and TO are icebug-disk node tables
if (TableOptionConstants::isIceBugDiskStorage(storage)) {
auto srcNodeEntry = srcEntry->ptrCast<NodeTableCatalogEntry>();
auto dstNodeEntry = dstEntry->ptrCast<NodeTableCatalogEntry>();
if (!TableOptionConstants::isIceBugDiskStorage(srcNodeEntry->getStorage()) ||
!TableOptionConstants::isIceBugDiskStorage(dstNodeEntry->getStorage())) {
throw BinderException("icebug-disk rel tables require both FROM and TO tables to "
"be icebug-disk node tables.");
}
}

// Use the actual shadow table IDs, not FOREIGN_TABLE_ID
// The shadow tables allow the query planner to distinguish between different node tables
auto srcTableID = srcEntry->getTableID();
2 changes: 1 addition & 1 deletion src/catalog/catalog_entry/rel_group_catalog_entry.cpp
@@ -214,7 +214,7 @@ RelGroupCatalogEntry::getBoundExtraCreateInfo(transaction::Transaction*) const {
}
return std::make_unique<binder::BoundExtraCreateRelTableGroupInfo>(
copyVector(propertyCollection.getDefinitions()), srcMultiplicity, dstMultiplicity,
storageDirection, std::move(nodePairs));
storageDirection, std::move(nodePairs), storage);
}

} // namespace catalog
5 changes: 5 additions & 0 deletions src/include/common/constants.h
@@ -86,6 +86,11 @@ struct StorageConstants {
struct TableOptionConstants {
static constexpr char REL_STORAGE_DIRECTION_OPTION[] = "STORAGE_DIRECTION";
static constexpr char REL_STORAGE_OPTION[] = "STORAGE";
static constexpr std::string_view ICEBUG_DISK_PREFIX = "icebug-disk";

static bool isIceBugDiskStorage(const std::string& storage) {
return storage.starts_with(ICEBUG_DISK_PREFIX);
}
};

// Hash Index Configurations
@@ -48,7 +48,7 @@ class ParquetReader {
common::VirtualFileSystem* vfs);
bool scanInternal(ParquetReaderScanState& state, common::DataChunk& result);
void scan(ParquetReaderScanState& state, common::DataChunk& result);
uint64_t getNumRowsGroups() { return metadata->row_groups.size(); }
uint64_t getNumRowGroups() { return metadata->row_groups.size(); }

uint32_t getNumColumns() const { return columnNames.size(); }
std::string getColumnName(uint32_t idx) const { return columnNames[idx]; }
@@ -67,6 +67,7 @@ class ScanMultiRelTable final : public ScanTable {
directionInfo{std::move(directionInfo)}, scanState{nullptr}, boundNodeIDVector{nullptr},
scanners{std::move(scanners)}, currentScanner{nullptr} {}

void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal(ExecutionContext* context) override;
1 change: 1 addition & 0 deletions src/include/processor/operator/scan/scan_rel_table.h
@@ -80,6 +80,7 @@ class ScanRelTable final : public ScanTable {
bool isSource() const override { return sourceMode; }
bool isParallel() const override { return !sourceMode; }

void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal(ExecutionContext* context) override;
1 change: 1 addition & 0 deletions src/include/storage/table/columnar_node_table_base.h
@@ -44,6 +44,7 @@ class ColumnarNodeTableBase : public NodeTable {

virtual ~ColumnarNodeTableBase() = default;

bool supportsPrimaryKeyScan() const override { return false; }
// Columnar tables don't support modifications
void insert([[maybe_unused]] transaction::Transaction* transaction,
[[maybe_unused]] TableInsertState& insertState) final {
112 changes: 112 additions & 0 deletions src/include/storage/table/ice_disk_node_table.h
@@ -0,0 +1,112 @@
#pragma once

#include <atomic>
#include <vector>

#include "processor/operator/persistent/reader/parquet/parquet_reader.h"
#include "storage/table/node_table.h"

namespace lbug {
namespace storage {

class IceDiskNodeTable;

struct IceDiskNodeTableScanState : public TableScanState {
bool initialized = false;
std::unique_ptr<processor::ParquetReader> parquetReader;
std::unique_ptr<processor::ParquetReaderScanState> parquetScanState;
bool scanCompleted = false;
std::size_t currentRowGroupIdx = static_cast<std::size_t>(common::INVALID_NODE_GROUP_IDX);
bool dataReadCompleted = false;
std::vector<std::vector<std::unique_ptr<common::Value>>> data; // data[rowGroup][column]
std::size_t currentRowGroupBatchOffset = 0; // offset of current rowGroupBatch

IceDiskNodeTableScanState([[maybe_unused]] MemoryManager& mm, common::ValueVector* nodeIDVector,
std::vector<common::ValueVector*> outputVectors,
std::shared_ptr<common::DataChunkState> outChunkState)
: TableScanState{nodeIDVector, std::move(outputVectors), std::move(outChunkState)} {
parquetScanState = std::make_unique<processor::ParquetReaderScanState>();
}

void setToTable(const transaction::Transaction* transaction, Table* table_,
std::vector<common::column_id_t> columnIDs_,
std::vector<ColumnPredicateSet> columnPredicateSets_ = {},
common::RelDataDirection direction = common::RelDataDirection::INVALID) override;
};

// Shared state for morsel assignment across parallel scan threads
struct IceDiskNodeTableScanSharedState {
private:
std::mutex mtx;
std::size_t numRowGroups = 0;
std::size_t currentRowGroupIdx = 0;

public:
void reset(std::size_t totalRowGroups) {
numRowGroups = totalRowGroups;
currentRowGroupIdx = 0;
}

bool getNextMorsel(IceDiskNodeTableScanState* scanState) {
std::lock_guard<std::mutex> lock(mtx);

if (currentRowGroupIdx < numRowGroups) {
scanState->currentRowGroupIdx = currentRowGroupIdx;
currentRowGroupIdx++;
return true;
}

return false;
}
};

class IceDiskNodeTable final : public NodeTable {
public:
IceDiskNodeTable(const StorageManager* storageManager,
const catalog::NodeTableCatalogEntry* nodeTableEntry, MemoryManager* memoryManager);

void initializeScanCoordination(const transaction::Transaction* transaction) override;

void initScanState(transaction::Transaction* transaction, TableScanState& scanState,
bool resetCachedBoundNodeSelVec = true) const override;

bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override;

bool supportsPrimaryKeyScan() const override { return false; }

void insert(transaction::Transaction*, TableInsertState&) override {
throw common::RuntimeException("Cannot insert into icebug-disk-backed node table");
}
void update(transaction::Transaction*, TableUpdateState&) override {
throw common::RuntimeException("Cannot update icebug-disk-backed node table");
}
bool delete_(transaction::Transaction*, TableDeleteState&) override {
throw common::RuntimeException("Cannot delete from icebug-disk-backed node table");
}

common::row_idx_t getNumTotalRows(const transaction::Transaction* transaction) override;

const std::string& getParquetFilePath() const { return parquetFilePath; }
const catalog::NodeTableCatalogEntry* getCatalogEntry() const { return nodeTableCatalogEntry; }
IceDiskNodeTableScanSharedState* getTableScanSharedState() const {
return tableScanSharedState.get();
}

std::size_t getNumScanMorsels(const transaction::Transaction* transaction) const;

private:
std::size_t getNumRowGroups(const transaction::Transaction* transaction) const;
void initIceDiskScanForRowGroup(transaction::Transaction* transaction,
IceDiskNodeTableScanState& scanState) const;
void readParquetData(transaction::Transaction* transaction, TableScanState& scanState) const;

private:
const std::string parquetFilePath;
const catalog::NodeTableCatalogEntry* nodeTableCatalogEntry;
std::vector<std::size_t> rowGroupStartOffsets;
mutable std::unique_ptr<IceDiskNodeTableScanSharedState> tableScanSharedState;
constexpr static std::size_t scanRowGroupBatchSize = 2048; // Default batch size
};

} // namespace storage
} // namespace lbug
118 changes: 118 additions & 0 deletions src/include/storage/table/ice_disk_rel_table.h
@@ -0,0 +1,118 @@
#pragma once

#include <cstdint>
#include <optional>

#include "catalog/catalog_entry/rel_group_catalog_entry.h"
#include "common/exception/runtime.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"
#include "storage/table/rel_table.h"

namespace lbug {
namespace common {
class VirtualFileSystem;
} // namespace common
namespace main {
class ClientContext;
} // namespace main

namespace storage {

class IceDiskRelTable;

// The scan is reinitialized to the relevant row groups for each bound node. scanBatch is a reusable
// read buffer; it carries no positional state. High-degree nodes are handled by resuming across
// multiple calls.
struct IceDiskRelTableScanState : public RelTableScanState {
std::unique_ptr<processor::ParquetReader> indicesReader; // null until first use
std::unique_ptr<processor::ParquetReaderScanState> indicesScanState;
std::unique_ptr<common::DataChunk> scanBatch; // reusable read buffer, lazily allocated

// Resume state for the currently active bound node.
// activeEdgeEnd == 0 means no node is active (start fresh from the next bound node).
uint64_t activeEdgePos = 0; // global edge row to resume from
uint64_t activeEdgeEnd = 0; // exclusive end of the active node's edge range
common::sel_t activeSelPos = 0; // sel-vector position of the active bound node
common::offset_t activeNodeOffset = 0; // node offset of the active bound node (BWD filter)

IceDiskRelTableScanState(MemoryManager& mm, common::ValueVector* nodeIDVector,
std::vector<common::ValueVector*> outputVectors,
std::shared_ptr<common::DataChunkState> outChunkState)
: RelTableScanState{mm, nodeIDVector, std::move(outputVectors), std::move(outChunkState)},
indicesScanState{std::make_unique<processor::ParquetReaderScanState>()} {}

void setToTable(const transaction::Transaction* transaction, Table* table_,
std::vector<common::column_id_t> columnIDs_,
std::vector<ColumnPredicateSet> columnPredicateSets_ = {},
common::RelDataDirection direction_ = common::RelDataDirection::FWD) override;
};

class IceDiskRelTable final : public RelTable {
public:
IceDiskRelTable(catalog::RelGroupCatalogEntry* relGroupEntry, common::table_id_t fromTableID,
common::table_id_t toTableID, const StorageManager* storageManager,
MemoryManager* memoryManager);

void initializeScanCoordination(transaction::Transaction* transaction);
void initScanState(transaction::Transaction* transaction, TableScanState& scanState,
bool resetCachedBoundNodeSelVec = true) const override;

bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override;

void insert(transaction::Transaction*, TableInsertState&) override {
throw common::RuntimeException("Cannot insert into icebug-disk-backed rel table");
}
void update(transaction::Transaction*, TableUpdateState&) override {
throw common::RuntimeException("Cannot update icebug-disk-backed rel table");
}
bool delete_(transaction::Transaction*, TableDeleteState&) override {
throw common::RuntimeException("Cannot delete from icebug-disk-backed rel table");
}

common::row_idx_t getNumTotalRows(const transaction::Transaction* transaction) override;

const std::string& getIndicesFilePath() const { return indicesFilePath; }
const std::string& getIndptrFilePath() const { return indptrFilePath; }
const catalog::RelGroupCatalogEntry* getRelGroupCatalogEntry() const {
return relGroupCatalogEntry;
}

private:
// Lazy-open the indices parquet reader and allocate the reusable scan batch.
void initIndicesReaderIfNeeded(IceDiskRelTableScanState& iceState, main::ClientContext* context,
common::VirtualFileSystem* vfs, MemoryManager* memMgr) const;

// Compute the CSR edge range for a node. Returns nullopt when the node has no edges.
struct EdgeRange {
uint64_t start;
uint64_t end;
};
std::optional<EdgeRange> getEdgeRange(common::offset_t nodeOffset, bool isFwd) const;

// Find row groups covering [range.start, range.end), read up to DEFAULT_VECTOR_CAPACITY
// edges starting at range.start. Returns {count, nextEdgePos} where nextEdgePos == range.end
// means the node is fully scanned; otherwise resume from nextEdgePos next call.
struct EdgeScanProgress {
uint64_t count; // edges written to output vectors
uint64_t nextEdgePos; // global edge row to resume from next call
};
EdgeScanProgress collectNodeEdges(RelTableScanState& state, IceDiskRelTableScanState& iceState,
EdgeRange range, common::offset_t nodeOffset, bool isFwd, common::table_id_t nbrTableID,
common::VirtualFileSystem* vfs) const;

void loadIndptrData(transaction::Transaction* transaction);
void loadIndicesMetadata(transaction::Transaction* transaction);
void copyCachedBoundNodeSelVector(RelTableScanState& relScanState) const;
std::size_t findSourceNodeForRow(std::size_t globalRowIdx) const;

private:
std::string indicesFilePath;
std::string indptrFilePath;
const catalog::RelGroupCatalogEntry* relGroupCatalogEntry;
// CSR indptr: element i = start of node i's edges. Size = numNodes + 1.
std::vector<std::size_t> indptrData;
std::vector<std::size_t> indicesRGStarts;
};

} // namespace storage
} // namespace lbug