diff --git a/docs/icebug-disk.md b/docs/icebug-disk.md new file mode 100644 index 0000000000..3c7a6245ce --- /dev/null +++ b/docs/icebug-disk.md @@ -0,0 +1,48 @@ +# Icebug-Disk Storage Format + +## Overview + +This is LadybugDB's implementation of [Icebug-Disk](https://github.com/Ladybug-Memory/icebug-format), a read-only graph storage format based on Parquet files. It is designed for efficient analytical queries on large graphs. + +## V1 + +Implements Icebug-Disk v1. + +### Version + +Version is stored in each file's metadata footer as a key-value pair: `icebug_disk_version = 1`. + +### schema.cypher + +The graph schema is declared in a `schema.cypher`, which can be loaded using `lbug -i schema.cypher` to create tables in Ladybug. + +```cypher +CREATE NODE TABLE city(id INT32, name STRING, population INT64, PRIMARY KEY(id)) WITH (storage = 'icebug-disk:'); +CREATE NODE TABLE user(id INT32, name STRING, age INT64, PRIMARY KEY(id)) WITH (storage = 'icebug-disk:'); +CREATE REL TABLE follows(FROM user TO user, since INT32) WITH (storage = 'icebug-disk:'); +CREATE REL TABLE livesin(FROM user TO city) WITH (storage = 'icebug-disk:'); +``` + +File paths can be relative or absolute and are resolved as `/nodes_{tableName}.parquet` for node tables, and `/indices_{tableName}.parquet` and `/indptr_{tableName}.parquet` for relationship tables. + +`storage = 'icebug-disk'` and `storage = 'icebug-disk:'` both resolve to the current working directory. + +Tables can also be created by manually running the above queries in the Ladybug CLI. + +If the directory is moved, the affected tables must be dropped and re-created with the updated path. This updates the file pointers in the catalog. A fresh db instance can also be used to run the `CREATE TABLE` queries with the new paths. + +Mixed tables are not supported — queries involving both `icebug-disk` and non-`icebug-disk` tables will throw a `BinderException`. 
+ +### Node tables + +For each node table, there is a corresponding Parquet file named `nodes_{tableName}.parquet` containing a primary key column and one column per property as declared in the schema. + +### Indices + +Each relationship table has a corresponding `indices_{tableName}.parquet` file containing one row per edge. The first column is always `target` (the destination node offset), followed by zero or more edge property columns as declared in the schema. + +### Indptr + +Each relationship table has a corresponding `indptr_{tableName}.parquet` file containing the CSR row pointers. It has a single integer column with `N+1` entries, where `N` is the number of source nodes. + + diff --git a/src/binder/bind/bind_ddl.cpp b/src/binder/bind/bind_ddl.cpp index 30c8413be4..c01b6f2e9c 100644 --- a/src/binder/bind/bind_ddl.cpp +++ b/src/binder/bind/bind_ddl.cpp @@ -10,6 +10,7 @@ #include "catalog/catalog.h" #include "catalog/catalog_entry/node_table_catalog_entry.h" #include "catalog/catalog_entry/sequence_catalog_entry.h" +#include "common/constants.h" #include "common/enums/extend_direction_util.h" #include "common/exception/binder.h" #include "common/exception/message.h" @@ -170,6 +171,7 @@ static std::string getStorage(const case_insensitive_map_t& options) { if (options.contains(TableOptionConstants::REL_STORAGE_OPTION)) { return options.at(TableOptionConstants::REL_STORAGE_OPTION).toString(); } + return ""; } @@ -217,7 +219,7 @@ BoundCreateTableInfo Binder::bindCreateRelTableGroupInfo(const CreateTableInfo* if (!storage.empty()) { auto dotPos = storage.find('.'); // Check if storage is database.table format by verifying the attached database exists - // Otherwise, treat as file path (e.g., "dataset/demo-db/icebug-disk/demo" or + // Otherwise, treat as file path (e.g., "dataset/demo-db/graph-std/demo" or // "data.parquet") if (dotPos != std::string::npos) { std::string dbName = storage.substr(0, dotPos); @@ -310,6 +312,17 @@ BoundCreateTableInfo 
Binder::bindCreateRelTableGroupInfo(const CreateTableInfo* } } + // For icebug-disk rel tables, validate that FROM and TO are icebug-disk node tables + if (TableOptionConstants::isIceBugDiskStorage(storage)) { + auto srcNodeEntry = srcEntry->ptrCast(); + auto dstNodeEntry = dstEntry->ptrCast(); + if (!TableOptionConstants::isIceBugDiskStorage(srcNodeEntry->getStorage()) || + !TableOptionConstants::isIceBugDiskStorage(dstNodeEntry->getStorage())) { + throw BinderException("icebug-disk rel tables require both FROM and TO tables to " + "be icebug-disk node tables."); + } + } + // Use the actual shadow table IDs, not FOREIGN_TABLE_ID // The shadow tables allow the query planner to distinguish between different node tables auto srcTableID = srcEntry->getTableID(); diff --git a/src/catalog/catalog_entry/rel_group_catalog_entry.cpp b/src/catalog/catalog_entry/rel_group_catalog_entry.cpp index 46da2a4fbe..4b9f145bf2 100644 --- a/src/catalog/catalog_entry/rel_group_catalog_entry.cpp +++ b/src/catalog/catalog_entry/rel_group_catalog_entry.cpp @@ -214,7 +214,7 @@ RelGroupCatalogEntry::getBoundExtraCreateInfo(transaction::Transaction*) const { } return std::make_unique( copyVector(propertyCollection.getDefinitions()), srcMultiplicity, dstMultiplicity, - storageDirection, std::move(nodePairs)); + storageDirection, std::move(nodePairs), storage); } } // namespace catalog diff --git a/src/include/common/constants.h b/src/include/common/constants.h index dcbbb43b53..3a04d3660c 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -86,6 +86,11 @@ struct StorageConstants { struct TableOptionConstants { static constexpr char REL_STORAGE_DIRECTION_OPTION[] = "STORAGE_DIRECTION"; static constexpr char REL_STORAGE_OPTION[] = "STORAGE"; + static constexpr std::string_view ICEBUG_DISK_PREFIX = "icebug-disk"; + + static bool isIceBugDiskStorage(const std::string& storage) { + return storage.starts_with(ICEBUG_DISK_PREFIX); + } }; // Hash Index Configurations 
diff --git a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h index 2da8342a90..c0a2cedd5f 100644 --- a/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h +++ b/src/include/processor/operator/persistent/reader/parquet/parquet_reader.h @@ -48,7 +48,7 @@ class ParquetReader { common::VirtualFileSystem* vfs); bool scanInternal(ParquetReaderScanState& state, common::DataChunk& result); void scan(ParquetReaderScanState& state, common::DataChunk& result); - uint64_t getNumRowsGroups() { return metadata->row_groups.size(); } + uint64_t getNumRowGroups() { return metadata->row_groups.size(); } uint32_t getNumColumns() const { return columnNames.size(); } std::string getColumnName(uint32_t idx) const { return columnNames[idx]; } diff --git a/src/include/processor/operator/scan/scan_multi_rel_tables.h b/src/include/processor/operator/scan/scan_multi_rel_tables.h index fb31640c64..e6b8142a99 100644 --- a/src/include/processor/operator/scan/scan_multi_rel_tables.h +++ b/src/include/processor/operator/scan/scan_multi_rel_tables.h @@ -67,6 +67,7 @@ class ScanMultiRelTable final : public ScanTable { directionInfo{std::move(directionInfo)}, scanState{nullptr}, boundNodeIDVector{nullptr}, scanners{std::move(scanners)}, currentScanner{nullptr} {} + void initGlobalStateInternal(ExecutionContext* context) override; void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; bool getNextTuplesInternal(ExecutionContext* context) override; diff --git a/src/include/processor/operator/scan/scan_rel_table.h b/src/include/processor/operator/scan/scan_rel_table.h index ee94146137..a344f70dbd 100644 --- a/src/include/processor/operator/scan/scan_rel_table.h +++ b/src/include/processor/operator/scan/scan_rel_table.h @@ -80,6 +80,7 @@ class ScanRelTable final : public ScanTable { bool isSource() const override { return sourceMode; } bool 
isParallel() const override { return !sourceMode; } + void initGlobalStateInternal(ExecutionContext* context) override; void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; bool getNextTuplesInternal(ExecutionContext* context) override; diff --git a/src/include/storage/table/columnar_node_table_base.h b/src/include/storage/table/columnar_node_table_base.h index fed8e34492..f38141eb6c 100644 --- a/src/include/storage/table/columnar_node_table_base.h +++ b/src/include/storage/table/columnar_node_table_base.h @@ -44,6 +44,7 @@ class ColumnarNodeTableBase : public NodeTable { virtual ~ColumnarNodeTableBase() = default; + bool supportsPrimaryKeyScan() const override { return false; } // Columnar tables don't support modifications void insert([[maybe_unused]] transaction::Transaction* transaction, [[maybe_unused]] TableInsertState& insertState) final { diff --git a/src/include/storage/table/ice_disk_node_table.h b/src/include/storage/table/ice_disk_node_table.h new file mode 100644 index 0000000000..93b12ed706 --- /dev/null +++ b/src/include/storage/table/ice_disk_node_table.h @@ -0,0 +1,112 @@ +#pragma once + +#include +#include + +#include "processor/operator/persistent/reader/parquet/parquet_reader.h" +#include "storage/table/node_table.h" + +namespace lbug { +namespace storage { + +class IceDiskNodeTable; + +struct IceDiskNodeTableScanState : public TableScanState { + bool initialized = false; + std::unique_ptr parquetReader; + std::unique_ptr parquetScanState; + bool scanCompleted = false; + std::size_t currentRowGroupIdx = static_cast(common::INVALID_NODE_GROUP_IDX); + bool dataReadCompleted = false; + std::vector>> data; // data[rowGroup][column] + std::size_t currentRowGroupBatchOffset = 0; // offset of current rowGroupBatch + + IceDiskNodeTableScanState([[maybe_unused]] MemoryManager& mm, common::ValueVector* nodeIDVector, + std::vector outputVectors, + std::shared_ptr outChunkState) + : TableScanState{nodeIDVector, 
std::move(outputVectors), std::move(outChunkState)} { + parquetScanState = std::make_unique(); + } + + void setToTable(const transaction::Transaction* transaction, Table* table_, + std::vector columnIDs_, + std::vector columnPredicateSets_ = {}, + common::RelDataDirection direction = common::RelDataDirection::INVALID) override; +}; + +// Shared state for morsel assignment across parallel scan threads +struct IceDiskNodeTableScanSharedState { +private: + std::mutex mtx; + std::size_t numRowGroups = 0; + std::size_t currentRowGroupIdx = 0; + +public: + void reset(std::size_t totalRowGroups) { + numRowGroups = totalRowGroups; + currentRowGroupIdx = 0; + } + + bool getNextMorsel(IceDiskNodeTableScanState* scanState) { + std::lock_guard lock(mtx); + + if (currentRowGroupIdx < numRowGroups) { + scanState->currentRowGroupIdx = currentRowGroupIdx; + currentRowGroupIdx++; + return true; + } + + return false; + } +}; + +class IceDiskNodeTable final : public NodeTable { +public: + IceDiskNodeTable(const StorageManager* storageManager, + const catalog::NodeTableCatalogEntry* nodeTableEntry, MemoryManager* memoryManager); + + void initializeScanCoordination(const transaction::Transaction* transaction) override; + + void initScanState(transaction::Transaction* transaction, TableScanState& scanState, + bool resetCachedBoundNodeSelVec = true) const override; + + bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override; + + bool supportsPrimaryKeyScan() const override { return false; } + + void insert(transaction::Transaction*, TableInsertState&) override { + throw common::RuntimeException("Cannot insert into icebug-disk-backed node table"); + } + void update(transaction::Transaction*, TableUpdateState&) override { + throw common::RuntimeException("Cannot update icebug-disk-backed node table"); + } + bool delete_(transaction::Transaction*, TableDeleteState&) override { + throw common::RuntimeException("Cannot delete from icebug-disk-backed node 
table"); + } + + common::row_idx_t getNumTotalRows(const transaction::Transaction* transaction) override; + + const std::string& getParquetFilePath() const { return parquetFilePath; } + const catalog::NodeTableCatalogEntry* getCatalogEntry() const { return nodeTableCatalogEntry; } + IceDiskNodeTableScanSharedState* getTableScanSharedState() const { + return tableScanSharedState.get(); + } + + std::size_t getNumScanMorsels(const transaction::Transaction* transaction) const; + +private: + std::size_t getNumRowGroups(const transaction::Transaction* transaction) const; + void initIceDiskScanForRowGroup(transaction::Transaction* transaction, + IceDiskNodeTableScanState& scanState) const; + void readParquetData(transaction::Transaction* transaction, TableScanState& scanState) const; + +private: + const std::string parquetFilePath; + const catalog::NodeTableCatalogEntry* nodeTableCatalogEntry; + std::vector rowGroupStartOffsets; + mutable std::unique_ptr tableScanSharedState; + constexpr static std::size_t scanRowGroupBatchSize = 2048; // Default batch size +}; + +} // namespace storage +} // namespace lbug diff --git a/src/include/storage/table/ice_disk_rel_table.h b/src/include/storage/table/ice_disk_rel_table.h new file mode 100644 index 0000000000..b56bfc86fd --- /dev/null +++ b/src/include/storage/table/ice_disk_rel_table.h @@ -0,0 +1,118 @@ +#pragma once + +#include +#include + +#include "catalog/catalog_entry/rel_group_catalog_entry.h" +#include "common/exception/runtime.h" +#include "processor/operator/persistent/reader/parquet/parquet_reader.h" +#include "storage/table/rel_table.h" + +namespace lbug { +namespace common { +class VirtualFileSystem; +} // namespace common +namespace main { +class ClientContext; +} // namespace main + +namespace storage { + +class IceDiskRelTable; + +// The scan is reinitialized to the relevant row groups for each bound node. scanBatch is a reusable +// read buffer; it carries no positional state. 
High-degree nodes are handled by resuming across +// multiple calls. +struct IceDiskRelTableScanState : public RelTableScanState { + std::unique_ptr indicesReader; // null until first use + std::unique_ptr indicesScanState; + std::unique_ptr scanBatch; // reusable read buffer, lazily allocated + + // Resume state for the currently active bound node. + // activeEdgeEnd == 0 means no node is active (start fresh from the next bound node). + uint64_t activeEdgePos = 0; // global edge row to resume from + uint64_t activeEdgeEnd = 0; // exclusive end of the active node's edge range + common::sel_t activeSelPos = 0; // sel-vector position of the active bound node + common::offset_t activeNodeOffset = 0; // node offset of the active bound node (BWD filter) + + IceDiskRelTableScanState(MemoryManager& mm, common::ValueVector* nodeIDVector, + std::vector outputVectors, + std::shared_ptr outChunkState) + : RelTableScanState{mm, nodeIDVector, std::move(outputVectors), std::move(outChunkState)}, + indicesScanState{std::make_unique()} {} + + void setToTable(const transaction::Transaction* transaction, Table* table_, + std::vector columnIDs_, + std::vector columnPredicateSets_ = {}, + common::RelDataDirection direction_ = common::RelDataDirection::FWD) override; +}; + +class IceDiskRelTable final : public RelTable { +public: + IceDiskRelTable(catalog::RelGroupCatalogEntry* relGroupEntry, common::table_id_t fromTableID, + common::table_id_t toTableID, const StorageManager* storageManager, + MemoryManager* memoryManager); + + void initializeScanCoordination(transaction::Transaction* transaction); + void initScanState(transaction::Transaction* transaction, TableScanState& scanState, + bool resetCachedBoundNodeSelVec = true) const override; + + bool scanInternal(transaction::Transaction* transaction, TableScanState& scanState) override; + + void insert(transaction::Transaction*, TableInsertState&) override { + throw common::RuntimeException("Cannot insert into icebug-disk-backed rel 
table"); + } + void update(transaction::Transaction*, TableUpdateState&) override { + throw common::RuntimeException("Cannot update icebug-disk-backed rel table"); + } + bool delete_(transaction::Transaction*, TableDeleteState&) override { + throw common::RuntimeException("Cannot delete from icebug-disk-backed rel table"); + } + + common::row_idx_t getNumTotalRows(const transaction::Transaction* transaction) override; + + const std::string& getIndicesFilePath() const { return indicesFilePath; } + const std::string& getIndptrFilePath() const { return indptrFilePath; } + const catalog::RelGroupCatalogEntry* getRelGroupCatalogEntry() const { + return relGroupCatalogEntry; + } + +private: + // Lazy-open the indices parquet reader and allocate the reusable scan batch. + void initIndicesReaderIfNeeded(IceDiskRelTableScanState& iceState, main::ClientContext* context, + common::VirtualFileSystem* vfs, MemoryManager* memMgr) const; + + // Compute the CSR edge range for a node. Returns nullopt when the node has no edges. + struct EdgeRange { + uint64_t start; + uint64_t end; + }; + std::optional getEdgeRange(common::offset_t nodeOffset, bool isFwd) const; + + // Find row groups covering [range.start, range.end), read up to DEFAULT_VECTOR_CAPACITY + // edges starting at range.start. Returns {count, nextEdgePos} where nextEdgePos == range.end + // means the node is fully scanned; otherwise resume from nextEdgePos next call. 
+ struct EdgeScanProgress { + uint64_t count; // edges written to output vectors + uint64_t nextEdgePos; // global edge row to resume from next call + }; + EdgeScanProgress collectNodeEdges(RelTableScanState& state, IceDiskRelTableScanState& iceState, + EdgeRange range, common::offset_t nodeOffset, bool isFwd, common::table_id_t nbrTableID, + common::VirtualFileSystem* vfs) const; + + void loadIndptrData(transaction::Transaction* transaction); + void loadIndicesMetadata(transaction::Transaction* transaction); + void copyCachedBoundNodeSelVector(RelTableScanState& relScanState) const; + std::size_t findSourceNodeForRow(std::size_t globalRowIdx) const; + +private: + std::string indicesFilePath; + std::string indptrFilePath; + const catalog::RelGroupCatalogEntry* relGroupCatalogEntry; + // CSR indptr: element i = start of node i's edges. Size = numNodes + 1. + std::vector indptrData; + std::vector indicesRGStarts; +}; + +} // namespace storage +} // namespace lbug diff --git a/src/include/storage/table/ice_disk_utils.h b/src/include/storage/table/ice_disk_utils.h new file mode 100644 index 0000000000..5b7c6eef05 --- /dev/null +++ b/src/include/storage/table/ice_disk_utils.h @@ -0,0 +1,60 @@ +#pragma once + +#include + +#include "common/constants.h" + +namespace lbug { +namespace storage { + +struct CSRFilePaths { + std::string indices; + std::string indptr; +}; + +class IceDiskUtils { +public: + // Parses "icebug-disk", "icebug-disk:", or "icebug-disk:" and returns the path + // component. Returns empty string for the first two forms (caller interprets as current dir) + static std::string getBasePath(const std::string& storage) { + if (!storage.starts_with(common::TableOptionConstants::ICEBUG_DISK_PREFIX)) { + return ""; + } + std::string_view rest = std::string_view(storage).substr( + common::TableOptionConstants::ICEBUG_DISK_PREFIX.size()); + // Strip the optional ':' separator. 
+ if (!rest.empty() && rest[0] == ':') { + rest = rest.substr(1); + } + return std::string(rest); // empty means "current directory" + } + + // Joins a base path with a filename. When base is empty the filename is returned + // as-is (i.e. relative to the current working directory) + static std::string joinPath(const std::string& base, const std::string& part) { + if (base.empty()) { + return part; + } + const char last = base.back(); + if (last == '/' || last == '\\') { + return base + part; + } + return base + "/" + part; + } + + // Get the file path for a given node table's parquet file + static std::string constructNodeTablePath(const std::string& dir, const std::string& name, + const std::string& suffix) { + return IceDiskUtils::joinPath(dir, "nodes_" + name + suffix); + } + + // Get the file paths for a given rel table's CSR files + static CSRFilePaths constructCSRPaths(const std::string& dir, const std::string& name, + const std::string& suffix) { + return {IceDiskUtils::joinPath(dir, "indices_" + name + suffix), + IceDiskUtils::joinPath(dir, "indptr_" + name + suffix)}; + } +}; + +} // namespace storage +} // namespace lbug diff --git a/src/include/storage/table/node_table.h b/src/include/storage/table/node_table.h index 87c2c0fbf0..5e5f12e5ee 100644 --- a/src/include/storage/table/node_table.h +++ b/src/include/storage/table/node_table.h @@ -154,6 +154,11 @@ class LBUG_API NodeTable : public Table { virtual bool lookupPK(const transaction::Transaction* transaction, common::ValueVector* keyVector, uint64_t vectorPos, common::offset_t& result) const; + // Returns true if this table supports efficient PK index-based lookup (PRIMARY_KEY_SCAN). + // Tables without a hash index (e.g. IceDisk) must return false so the optimizer falls back + // to a FILTER+SCAN plan instead of generating PRIMARY_KEY_SCAN nodes. 
+ virtual bool supportsPrimaryKeyScan() const { return tryGetPKIndex() != nullptr; } + void addIndex(std::unique_ptr index); void dropIndex(const std::string& name); diff --git a/src/include/storage/table/parquet_node_table.h b/src/include/storage/table/parquet_node_table.h index 9554471dc2..c66b0e5f0c 100644 --- a/src/include/storage/table/parquet_node_table.h +++ b/src/include/storage/table/parquet_node_table.h @@ -81,7 +81,6 @@ class ParquetNodeTable final : public ColumnarNodeTableBase { private: std::string parquetFilePath; - void initializeParquetReader(transaction::Transaction* transaction) const; void initParquetScanForRowGroup(transaction::Transaction* transaction, ParquetNodeTableScanState& scanState) const; }; diff --git a/src/optimizer/filter_push_down_optimizer.cpp b/src/optimizer/filter_push_down_optimizer.cpp index f9018b6314..ad38adcf0a 100644 --- a/src/optimizer/filter_push_down_optimizer.cpp +++ b/src/optimizer/filter_push_down_optimizer.cpp @@ -4,12 +4,15 @@ #include "binder/expression/property_expression.h" #include "binder/expression/scalar_function_expression.h" #include "main/client_context.h" +#include "main/database.h" #include "planner/operator/extend/logical_extend.h" #include "planner/operator/logical_empty_result.h" #include "planner/operator/logical_filter.h" #include "planner/operator/logical_hash_join.h" #include "planner/operator/logical_table_function_call.h" #include "planner/operator/scan/logical_scan_node_table.h" +#include "storage/storage_manager.h" +#include "storage/table/node_table.h" using namespace lbug::binder; using namespace lbug::common; @@ -194,13 +197,22 @@ std::shared_ptr FilterPushDownOptimizer::visitScanNodeTableRepl } if (primaryKeyEqualityComparison != nullptr) { // Try rewrite index scan auto rhs = primaryKeyEqualityComparison->getChild(1); + bool canDoPKScan = false; if (isConstantExpression(rhs)) { - auto extraInfo = std::make_unique(rhs); + auto* nodeTable = context->getDatabase() + ->getStorageManager() + 
->getTable(tableIDs[0]) + ->ptrCast(); + canDoPKScan = nodeTable->supportsPrimaryKeyScan(); + } + if (canDoPKScan) { + auto extraInfo = std::make_unique( + primaryKeyEqualityComparison->getChild(1)); scan.setScanType(LogicalScanNodeTableType::PRIMARY_KEY_SCAN); scan.setExtraInfo(std::move(extraInfo)); scan.computeFlatSchema(); } else { - // Cannot rewrite and add predicate back. + // Cannot rewrite (no PK hash index or non-constant RHS); add predicate back. predicateSet.addPredicate(primaryKeyEqualityComparison); } } diff --git a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp index 57939193ea..0972426319 100644 --- a/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp +++ b/src/processor/operator/persistent/reader/parquet/parquet_reader.cpp @@ -596,7 +596,7 @@ ParquetScanSharedState::ParquetScanSharedState(FileScanInfo fileScanInfo, uint64 for (auto i = fileIdx.load(); i < this->fileScanInfo.getNumFiles(); i++) { auto reader = std::make_unique(this->fileScanInfo.filePaths[i], columnSkips, context); - totalRowsGroups += reader->getNumRowsGroups(); + totalRowsGroups += reader->getNumRowGroups(); } numBlocksReadByFiles = 0; } @@ -608,7 +608,7 @@ static bool parquetSharedStateNext(ParquetScanLocalState& localState, if (sharedState.fileIdx >= sharedState.fileScanInfo.getNumFiles()) { return false; } - if (sharedState.blockIdx < sharedState.readers[sharedState.fileIdx]->getNumRowsGroups()) { + if (sharedState.blockIdx < sharedState.readers[sharedState.fileIdx]->getNumRowGroups()) { localState.reader = sharedState.readers[sharedState.fileIdx].get(); localState.reader->initializeScan(*localState.state, {sharedState.blockIdx}, VirtualFileSystem::GetUnsafe(*sharedState.context)); @@ -616,7 +616,7 @@ static bool parquetSharedStateNext(ParquetScanLocalState& localState, return true; } else { sharedState.numBlocksReadByFiles += - 
sharedState.readers[sharedState.fileIdx]->getNumRowsGroups(); + sharedState.readers[sharedState.fileIdx]->getNumRowGroups(); sharedState.blockIdx = 0; sharedState.fileIdx++; if (sharedState.fileIdx >= sharedState.fileScanInfo.getNumFiles()) { diff --git a/src/processor/operator/scan/count_rel_table.cpp b/src/processor/operator/scan/count_rel_table.cpp index 18511f1b72..7c7c15bff4 100644 --- a/src/processor/operator/scan/count_rel_table.cpp +++ b/src/processor/operator/scan/count_rel_table.cpp @@ -10,6 +10,7 @@ #include "storage/table/column.h" #include "storage/table/column_chunk_data.h" #include "storage/table/columnar_rel_table_base.h" +#include "storage/table/ice_disk_rel_table.h" #include "storage/table/csr_chunked_node_group.h" #include "storage/table/csr_node_group.h" #include "storage/table/rel_table_data.h" @@ -39,7 +40,7 @@ bool CountRelTable::getNextTuplesInternal(ExecutionContext* context) { auto* memoryManager = context->clientContext->getDatabase()->getMemoryManager(); for (auto* relTable : relTables) { - if (dynamic_cast(relTable) != nullptr) { + if (dynamic_cast(relTable) != nullptr || dynamic_cast(relTable) != nullptr) { totalCount += relTable->getNumTotalRows(transaction); continue; } diff --git a/src/processor/operator/scan/scan_multi_rel_tables.cpp b/src/processor/operator/scan/scan_multi_rel_tables.cpp index 27f5422379..811b113ded 100644 --- a/src/processor/operator/scan/scan_multi_rel_tables.cpp +++ b/src/processor/operator/scan/scan_multi_rel_tables.cpp @@ -3,6 +3,7 @@ #include "processor/execution_context.h" #include "storage/local_storage/local_storage.h" #include "storage/table/arrow_rel_table.h" +#include "storage/table/ice_disk_rel_table.h" #include "storage/table/parquet_rel_table.h" using namespace lbug::common; @@ -60,6 +61,26 @@ bool RelTableCollectionScanner::scan(main::ClientContext* context, RelTableScanS } } +// only for icebug disk table for now as they have shared state +void 
ScanMultiRelTable::initGlobalStateInternal(ExecutionContext* context) { + auto transaction = Transaction::Get(*context->clientContext); + for (auto& [_, scanner] : scanners) { + bool hasIceDiskTable = false; + for (auto& relInfo : scanner.relInfos) { + if (const auto iceDiskRelTable = + dynamic_cast(relInfo.table)) { + iceDiskRelTable->initializeScanCoordination(transaction); + hasIceDiskTable = true; + break; + } + } + + if (hasIceDiskTable) { + break; + } + } +} + void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { ScanTable::initLocalStateInternal(resultSet, context); auto clientContext = context->clientContext; @@ -69,6 +90,7 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo // Check if any table in any scanner is an external rel table with a custom scan state. bool hasArrowTable = false; bool hasParquetTable = false; + bool hasIceDiskTable = false; for (auto& [_, scanner] : scanners) { for (auto& relInfo : scanner.relInfos) { if (dynamic_cast(relInfo.table) != nullptr) { @@ -79,8 +101,12 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo hasParquetTable = true; break; } + if (dynamic_cast(relInfo.table) != nullptr) { + hasIceDiskTable = true; + break; + } } - if (hasArrowTable || hasParquetTable) { + if (hasArrowTable || hasParquetTable || hasIceDiskTable) { break; } } @@ -95,6 +121,10 @@ void ScanMultiRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionCo scanState = std::make_unique(*MemoryManager::Get(*clientContext), boundNodeIDVector, outVectors, nbrNodeIDVector->state); + } else if (hasIceDiskTable) { + scanState = + std::make_unique(*MemoryManager::Get(*clientContext), + boundNodeIDVector, outVectors, nbrNodeIDVector->state); } else { scanState = std::make_unique(*MemoryManager::Get(*clientContext), boundNodeIDVector, outVectors, nbrNodeIDVector->state); diff --git a/src/processor/operator/scan/scan_node_table.cpp 
b/src/processor/operator/scan/scan_node_table.cpp index b6e8fb175d..5342b16dca 100644 --- a/src/processor/operator/scan/scan_node_table.cpp +++ b/src/processor/operator/scan/scan_node_table.cpp @@ -7,6 +7,7 @@ #include "storage/local_storage/local_node_table.h" #include "storage/local_storage/local_storage.h" #include "storage/table/arrow_node_table.h" +#include "storage/table/ice_disk_node_table.h" #include "storage/table/parquet_node_table.h" using namespace lbug::common; @@ -25,6 +26,12 @@ static std::unique_ptr createNodeTableScanState(NodeTable* table return std::make_unique(*memoryManager, nodeIDVector, outVectors, nodeIDVector->state); } + + if (dynamic_cast(table) != nullptr) { + return std::make_unique(*memoryManager, nodeIDVector, outVectors, + nodeIDVector->state); + } + return std::make_unique(nodeIDVector, outVectors, nodeIDVector->state); } @@ -65,10 +72,12 @@ void ScanNodeTableSharedState::initialize(const transaction::Transaction* transa common::VirtualFileSystem::resolvePath(context, parquetTable->getParquetFilePath()); auto tempReader = std::make_unique(resolvedPath, columnSkips, context); - this->numCommittedNodeGroups = tempReader->getNumRowsGroups(); + this->numCommittedNodeGroups = tempReader->getNumRowGroups(); } catch (const std::exception& e) { this->numCommittedNodeGroups = 1; } + } else if (const auto iceDiskTable = dynamic_cast(table)) { + this->numCommittedNodeGroups = iceDiskTable->getNumScanMorsels(transaction); } else if (const auto arrowTable = dynamic_cast(table)) { // For Arrow tables, set numCommittedNodeGroups to number of morsels this->numCommittedNodeGroups = @@ -90,7 +99,8 @@ void ScanNodeTableSharedState::nextMorsel(TableScanState& scanState, ScanNodeTableProgressSharedState& progressSharedState) { std::unique_lock lck{mtx}; - // ColumnarNodeTables handle morsel assignment internally + // Columnar/Icebug NodeTables handle morsel assignment internally + // TODO: parquet tables https://github.com/LadybugDB/ladybug/issues/245 
if (const auto arrowTable = dynamic_cast(this->table)) { const auto tableSharedState = arrowTable->getTableScanSharedState(); @@ -104,6 +114,18 @@ void ScanNodeTableSharedState::nextMorsel(TableScanState& scanState, return; } + if (const auto iceDiskTable = dynamic_cast(this->table)) { + const auto tableSharedState = iceDiskTable->getTableScanSharedState(); + if (tableSharedState->getNextMorsel(static_cast(&scanState))) { + scanState.source = TableScanSource::COMMITTED; + progressSharedState.numMorselsScanned++; + } else { + scanState.source = TableScanSource::NONE; + } + + return; + } + auto& nodeScanState = scanState.cast(); if (currentCommittedGroupIdx < numCommittedNodeGroups) { nodeScanState.nodeGroupIdx = currentCommittedGroupIdx++; @@ -149,9 +171,10 @@ void ScanNodeTable::initCurrentTable(ExecutionContext* context) { outVectors, MemoryManager::Get(*context->clientContext)); currentInfo.initScanState(*scanState, outVectors, context->clientContext); scanState->semiMask = sharedStates[currentTableIdx]->getSemiMask(); - // Call table->initScanState for ParquetNodeTable or ArrowNodeTable + // Call table->initScanState for ParquetNodeTable or ArrowNodeTable or IceDiskNodeTable if (dynamic_cast(tableInfos[currentTableIdx].table) || - dynamic_cast(tableInfos[currentTableIdx].table)) { + dynamic_cast(tableInfos[currentTableIdx].table) || + dynamic_cast(tableInfos[currentTableIdx].table)) { auto transaction = transaction::Transaction::Get(*context->clientContext); tableInfos[currentTableIdx].table->initScanState(transaction, *scanState); } diff --git a/src/processor/operator/scan/scan_rel_table.cpp b/src/processor/operator/scan/scan_rel_table.cpp index f09b7a9abd..a4f94faa8d 100644 --- a/src/processor/operator/scan/scan_rel_table.cpp +++ b/src/processor/operator/scan/scan_rel_table.cpp @@ -9,6 +9,7 @@ #include "storage/local_storage/local_rel_table.h" #include "storage/table/arrow_rel_table.h" #include "storage/table/foreign_rel_table.h" +#include 
"storage/table/ice_disk_rel_table.h" #include "storage/table/node_table.h" #include "storage/table/parquet_rel_table.h" @@ -69,6 +70,13 @@ void ScanRelTableInfo::initScanState(TableScanState& scanState, initScanStateVectors(scanState, outVectors, MemoryManager::Get(*context)); } +void ScanRelTable::initGlobalStateInternal(ExecutionContext* context) { + if (const auto iceDiskRelTable = dynamic_cast(tableInfo.table)) { + iceDiskRelTable->initializeScanCoordination( + transaction::Transaction::Get(*context->clientContext)); + } +} + void ScanRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { ScanTable::initLocalStateInternal(resultSet, context); auto clientContext = context->clientContext; @@ -78,6 +86,8 @@ void ScanRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext auto* arrowTable = dynamic_cast(tableInfo.table); auto* parquetTable = dynamic_cast(tableInfo.table); auto* foreignTable = dynamic_cast(tableInfo.table); + auto* iceDiskTable = dynamic_cast(tableInfo.table); + if (arrowTable) { scanState = std::make_unique(*MemoryManager::Get(*clientContext), @@ -86,6 +96,10 @@ void ScanRelTable::initLocalStateInternal(ResultSet* resultSet, ExecutionContext scanState = std::make_unique(*MemoryManager::Get(*clientContext), boundNodeIDVector, outVectors, nbrNodeIDVector->state); + } else if (iceDiskTable) { + scanState = + std::make_unique(*MemoryManager::Get(*clientContext), + boundNodeIDVector, outVectors, nbrNodeIDVector->state); } else if (foreignTable) { scanState = std::make_unique(*MemoryManager::Get(*clientContext), diff --git a/src/storage/storage_manager.cpp b/src/storage/storage_manager.cpp index 82a22dd750..b5bbeb768e 100644 --- a/src/storage/storage_manager.cpp +++ b/src/storage/storage_manager.cpp @@ -4,6 +4,7 @@ #include "catalog/catalog_entry/node_table_catalog_entry.h" #include "catalog/catalog_entry/rel_group_catalog_entry.h" #include "common/arrow/arrow.h" +#include "common/constants.h" #include 
"common/file_system/virtual_file_system.h" #include "common/random_engine.h" #include "common/serializer/in_mem_file_writer.h" @@ -19,6 +20,8 @@ #include "storage/table/arrow_rel_table.h" #include "storage/table/arrow_table_support.h" #include "storage/table/foreign_rel_table.h" +#include "storage/table/ice_disk_node_table.h" +#include "storage/table/ice_disk_rel_table.h" #include "storage/table/node_table.h" #include "storage/table/parquet_node_table.h" #include "storage/table/parquet_rel_table.h" @@ -98,19 +101,16 @@ void StorageManager::recover(main::ClientContext& clientContext, bool throwOnWal void StorageManager::createNodeTable(NodeTableCatalogEntry* entry) { tableNameCache[entry->getTableID()] = entry->getName(); if (!entry->getStorage().empty()) { - // Check if storage is Arrow backed - if (entry->getStorage().substr(0, 8) == "arrow://") { - // Extract Arrow ID from storage string + if (TableOptionConstants::isIceBugDiskStorage(entry->getStorage())) { + tables[entry->getTableID()] = + std::make_unique(this, entry, &memoryManager); + } else if (entry->getStorage().substr(0, 8) == "arrow://") { std::string arrowId = entry->getStorage().substr(8); - - // Retrieve Arrow data from registry (as pointers to registry data) ArrowSchemaWrapper* schema = nullptr; std::vector* arrays = nullptr; if (!ArrowTableSupport::getArrowData(arrowId, schema, arrays)) { throw common::RuntimeException("Failed to retrieve Arrow data for ID: " + arrowId); } - - // Create wrappers that reference registry memory while registry keeps ownership. 
ArrowSchemaWrapper schemaCopy = createShallowCopy(*schema); std::vector arraysCopy; arraysCopy.reserve(arrays->size()); @@ -169,6 +169,9 @@ void StorageManager::addRelTable(RelGroupCatalogEntry* entry, const RelTableCata tables[info.oid] = std::make_unique(entry, info.nodePair.srcTableID, info.nodePair.dstTableID, this, &memoryManager, fromNodeTable, toNodeTable, std::move(schemaCopy), std::move(arraysCopy), arrowId); + } else if (TableOptionConstants::isIceBugDiskStorage(entry->getStorage())) { + tables[info.oid] = std::make_unique(entry, info.nodePair.srcTableID, + info.nodePair.dstTableID, this, &memoryManager); } else { // Create parquet-backed rel table tables[info.oid] = std::make_unique(entry, info.nodePair.srcTableID, @@ -437,8 +440,14 @@ void StorageManager::deserialize(main::ClientContext* context, const Catalog* ca ->ptrCast(); tableNameCache[tableID] = tableEntry->getName(); if (!tableEntry->getStorage().empty()) { - // Create parquet-backed node table - tables[tableID] = std::make_unique(this, tableEntry, &memoryManager); + if (TableOptionConstants::isIceBugDiskStorage(tableEntry->getStorage())) { + tables[tableID] = + std::make_unique(this, tableEntry, &memoryManager); + } else { + // Create parquet-backed node table + tables[tableID] = + std::make_unique(this, tableEntry, &memoryManager); + } } else { // Create regular node table tables[tableID] = std::make_unique(this, tableEntry, &memoryManager); @@ -465,9 +474,14 @@ void StorageManager::deserialize(main::ClientContext* context, const Catalog* ca RelTableCatalogInfo info = RelTableCatalogInfo::deserialize(deSer); DASSERT(!tables.contains(info.oid)); if (!relGroupEntry->getStorage().empty()) { - // Create parquet-backed rel table - tables[info.oid] = std::make_unique(relGroupEntry, - info.nodePair.srcTableID, info.nodePair.dstTableID, this, &memoryManager); + if (TableOptionConstants::isIceBugDiskStorage(relGroupEntry->getStorage())) { + tables[info.oid] = std::make_unique(relGroupEntry, + 
info.nodePair.srcTableID, info.nodePair.dstTableID, this, &memoryManager); + } else { + // Create parquet-backed rel table + tables[info.oid] = std::make_unique(relGroupEntry, + info.nodePair.srcTableID, info.nodePair.dstTableID, this, &memoryManager); + } } else { // Create regular rel table tables[info.oid] = std::make_unique(relGroupEntry, diff --git a/src/storage/table/CMakeLists.txt b/src/storage/table/CMakeLists.txt index 54a76cca23..f625831301 100644 --- a/src/storage/table/CMakeLists.txt +++ b/src/storage/table/CMakeLists.txt @@ -19,6 +19,8 @@ add_library(lbug_storage_store dictionary_chunk.cpp dictionary_column.cpp foreign_rel_table.cpp + ice_disk_node_table.cpp + ice_disk_rel_table.cpp in_mem_chunked_node_group_collection.cpp in_memory_exception_chunk.cpp lazy_segment_scanner.cpp diff --git a/src/storage/table/ice_disk_node_table.cpp b/src/storage/table/ice_disk_node_table.cpp new file mode 100644 index 0000000000..fde1a83398 --- /dev/null +++ b/src/storage/table/ice_disk_node_table.cpp @@ -0,0 +1,293 @@ +#include "storage/table/ice_disk_node_table.h" + +#include +#include + +#include "catalog/catalog_entry/node_table_catalog_entry.h" +#include "common/constants.h" +#include "common/data_chunk/sel_vector.h" +#include "common/exception/runtime.h" +#include "common/file_system/virtual_file_system.h" +#include "common/types/value/value.h" +#include "main/client_context.h" +#include "processor/operator/persistent/reader/parquet/parquet_reader.h" +#include "storage/buffer_manager/memory_manager.h" +#include "storage/storage_manager.h" +#include "storage/table/ice_disk_utils.h" +#include "transaction/transaction.h" + +using namespace lbug::catalog; +using namespace lbug::common; +using namespace lbug::processor; +using namespace lbug::transaction; + +namespace lbug { +namespace storage { + +void IceDiskNodeTableScanState::setToTable(const transaction::Transaction* /*transaction*/, + Table* table_, std::vector columnIDs_, + std::vector columnPredicateSets_, 
common::RelDataDirection /*direction*/) { + // TableScanState::setToTable(transaction, table_, columnIDs_, std::move(columnPredicateSets_)); + table = table_; + columnIDs = std::move(columnIDs_); + columnPredicateSets = std::move(columnPredicateSets_); + // IceDisk node tables don't use NodeGroup infrastructure; skip the base class + // which would dereference the uninitialized nodeGroupScanState. +} + +IceDiskNodeTable::IceDiskNodeTable(const StorageManager* storageManager, + const NodeTableCatalogEntry* nodeTableEntry, MemoryManager* memoryManager) + : NodeTable{storageManager, nodeTableEntry, memoryManager}, + parquetFilePath{IceDiskUtils::constructNodeTablePath( + IceDiskUtils::getBasePath(nodeTableEntry->getStorage()), nodeTableEntry->getName(), + ".parquet")}, + nodeTableCatalogEntry{nodeTableEntry}, + tableScanSharedState{std::make_unique()} {} + +void IceDiskNodeTable::initializeScanCoordination(const Transaction* transaction) { + auto context = transaction->getClientContext(); + if (context) { + auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath); + auto tempReader = + std::make_unique(resolvedPath, std::vector(), context); + auto metadata = tempReader->getMetadata(); + uint64_t currentStartOffset = 0; + + rowGroupStartOffsets.clear(); + for (std::size_t i = 0; i < metadata->row_groups.size(); ++i) { + rowGroupStartOffsets.push_back(currentStartOffset); + currentStartOffset += metadata->row_groups[i].num_rows; + } + + tableScanSharedState->reset(tempReader->getNumRowGroups()); + } +} + +void IceDiskNodeTable::initScanState(Transaction* transaction, TableScanState& scanState, + bool /*resetCachedBoundNodeSelVec*/) const { + auto& iceDiskNodeScanState = static_cast(scanState); + + if (iceDiskNodeScanState.currentRowGroupIdx == + static_cast(common::INVALID_NODE_GROUP_IDX)) { + iceDiskNodeScanState.scanCompleted = true; + return; + } + + iceDiskNodeScanState.scanCompleted = false; + iceDiskNodeScanState.dataReadCompleted = false; + 
iceDiskNodeScanState.data.clear(); + iceDiskNodeScanState.currentRowGroupBatchOffset = 0; + + // Each scan state gets its own parquet reader for thread safety and initialized only once + if (!iceDiskNodeScanState.initialized) { + auto context = transaction->getClientContext(); + if (!context) { + throw RuntimeException("Invalid client context for IceDisk scan state initialization"); + } + + try { + auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath); + iceDiskNodeScanState.parquetReader = + std::make_unique(resolvedPath, std::vector(), context); + iceDiskNodeScanState.initialized = true; + } catch (const std::exception& e) { + throw RuntimeException("Failed to initialize parquet reader for file '" + + parquetFilePath + "': " + e.what()); + } + } + + // Initialize scan state for the current row group (assigned via shared state) + initIceDiskScanForRowGroup(transaction, iceDiskNodeScanState); +} + +void IceDiskNodeTable::initIceDiskScanForRowGroup(Transaction* transaction, + IceDiskNodeTableScanState& scanState) const { + auto context = transaction->getClientContext(); + if (!context) { + return; + } + + auto vfs = VirtualFileSystem::GetUnsafe(*context); + if (!vfs) { + return; + } + + // Defensive check: ensure parquet reader exists + if (!scanState.parquetReader) { + return; + } + + // Defensive check: ensure parquet scan state exists + if (!scanState.parquetScanState) { + return; + } + + // Re-initialize scan for the specific row groups + // Note: initializeScan can be called multiple times; the first call populates column metadata + scanState.parquetReader->initializeScan(*scanState.parquetScanState, + {scanState.currentRowGroupIdx}, vfs); +} + +bool IceDiskNodeTable::scanInternal(Transaction* transaction, TableScanState& scanState) { + auto& iceDiskNodeScanState = static_cast(scanState); + if (iceDiskNodeScanState.scanCompleted) { + return false; + } + + scanState.resetOutVectors(); + + // Read data for the current row group if not yet 
done + if (!iceDiskNodeScanState.dataReadCompleted) { + readParquetData(transaction, scanState); + } + + if (iceDiskNodeScanState.currentRowGroupBatchOffset >= iceDiskNodeScanState.data.size()) { + iceDiskNodeScanState.scanCompleted = true; + return false; + } + + auto outputSize = std::min(scanRowGroupBatchSize, + iceDiskNodeScanState.data.size() - iceDiskNodeScanState.currentRowGroupBatchOffset); + auto numColumns = std::min(scanState.outputVectors.size(), + iceDiskNodeScanState.data[iceDiskNodeScanState.currentRowGroupBatchOffset].size()); + + for (std::size_t col = 0; col < numColumns; ++col) { + auto& dstVector = *scanState.outputVectors[col]; + + for (std::size_t i = 0; i < outputSize; ++i) { + auto& value = *iceDiskNodeScanState + .data[iceDiskNodeScanState.currentRowGroupBatchOffset + i][col]; + if (value.isNull()) { + dstVector.setNull(i, true); + } else { + dstVector.copyFromValue(i, value); + } + } + } + + for (std::size_t i = 0; i < outputSize; ++i) { + auto& nodeID = scanState.nodeIDVector->getValue(i); + nodeID.tableID = tableID; + // assign parquet rowIndex + nodeID.offset = rowGroupStartOffsets[iceDiskNodeScanState.currentRowGroupIdx] + + iceDiskNodeScanState.currentRowGroupBatchOffset + i; + } + + iceDiskNodeScanState.currentRowGroupBatchOffset += outputSize; + scanState.outState->getSelVectorUnsafe().setSelSize(outputSize); + return true; +} + +void IceDiskNodeTable::readParquetData(Transaction* transaction, TableScanState& scanState) const { + auto& iceDiskNodeScanState = static_cast(scanState); + auto numColumns = iceDiskNodeScanState.parquetReader->getNumColumns(); + + // Defensive check: ensure parquet file has at least one column + if (numColumns == 0) { + throw RuntimeException("Parquet file '" + parquetFilePath + "' has no columns"); + } + + // Fresh DataChunk with its own state — do NOT share scanState.outState; we accumulate + // rows across batches while the output state is managed by scanInternal() above. 
+ DataChunk parquetDataChunk(numColumns); + auto* memMgr = MemoryManager::Get(*transaction->getClientContext()); + for (uint32_t i = 0; i < numColumns; ++i) { + auto columnType = iceDiskNodeScanState.parquetReader->getColumnType(i).copy(); + parquetDataChunk.insert(i, std::make_shared(std::move(columnType), memMgr)); + } + + // Pre-compute parquet-column → output-column mapping once. + const auto numCols = static_cast(parquetDataChunk.getNumValueVectors()); + std::vector colMap(numCols, INVALID_COLUMN_ID); + for (std::size_t pc = 0; pc < numCols; ++pc) { + const auto& name = iceDiskNodeScanState.parquetReader->getColumnName(pc); + if (!nodeTableCatalogEntry->containsProperty(name)) { + continue; + } + const auto colID = nodeTableCatalogEntry->getColumnID(name); + for (std::size_t oc = 0; oc < scanState.columnIDs.size(); ++oc) { + if (scanState.columnIDs[oc] == colID) { + colMap[pc] = oc; + break; + } + } + } + + // scanInternal() returns true on the initial row-group setup call (batchSize == 0) and on + // each data batch; returns false when the row group is exhausted. Loop to read ALL rows. 
+ while (iceDiskNodeScanState.parquetReader->scanInternal( + *iceDiskNodeScanState.parquetScanState, parquetDataChunk)) { + const auto batchSize = parquetDataChunk.state->getSelVector().getSelSize(); + if (batchSize == 0) { + continue; // row-group setup call — no data yet + } + + const auto base = iceDiskNodeScanState.data.size(); + iceDiskNodeScanState.data.resize(base + batchSize); + + for (std::size_t row = 0; row < batchSize; ++row) { + iceDiskNodeScanState.data[base + row].resize(scanState.outputVectors.size()); + for (std::size_t pc = 0; pc < numCols; ++pc) { + const auto oc = colMap[pc]; + if (oc == INVALID_COLUMN_ID || oc >= iceDiskNodeScanState.data[base + row].size()) { + continue; + } + auto& srcVector = parquetDataChunk.getValueVectorMutable(pc); + if (srcVector.isNull(row)) { + iceDiskNodeScanState.data[base + row][oc] = + std::make_unique(Value::createNullValue()); + } else { + iceDiskNodeScanState.data[base + row][oc] = + std::make_unique(*srcVector.getAsValue(row)); + } + } + } + } + + iceDiskNodeScanState.dataReadCompleted = true; +} + +std::size_t IceDiskNodeTable::getNumTotalRows(const Transaction* transaction) { + auto context = transaction->getClientContext(); + + if (!context) { + return 0; + } + + try { + auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath); + auto tempReader = + std::make_unique(resolvedPath, std::vector(), context); + + return tempReader->getMetadata()->num_rows; + } catch (const std::exception& e) { + // If parquet file is corrupted or invalid, return 0 instead of crashing + return 0; + } +} + +std::size_t IceDiskNodeTable::getNumRowGroups(const transaction::Transaction* transaction) const { + auto context = transaction->getClientContext(); + + if (!context) { + return 0; + } + + try { + auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath); + auto tempReader = + std::make_unique(resolvedPath, std::vector(), context); + + return tempReader->getNumRowGroups(); + } catch (const 
std::exception& e) { + // If parquet file is corrupted or invalid, return 0 instead of crashing + return 0; + } +} + +std::size_t IceDiskNodeTable::getNumScanMorsels(const transaction::Transaction* transaction) const { + return getNumRowGroups(transaction); +} + +} // namespace storage +} // namespace lbug diff --git a/src/storage/table/ice_disk_rel_table.cpp b/src/storage/table/ice_disk_rel_table.cpp new file mode 100644 index 0000000000..fe79626a99 --- /dev/null +++ b/src/storage/table/ice_disk_rel_table.cpp @@ -0,0 +1,325 @@ +#include "storage/table/ice_disk_rel_table.h" + +#include + +#include "catalog/catalog_entry/rel_group_catalog_entry.h" +#include "common/assert.h" +#include "common/constants.h" +#include "common/data_chunk/data_chunk.h" +#include "common/exception/runtime.h" +#include "common/file_system/virtual_file_system.h" +#include "common/types/internal_id_util.h" +#include "processor/operator/persistent/reader/parquet/parquet_reader.h" +#include "storage/storage_manager.h" +#include "storage/table/ice_disk_utils.h" +#include "transaction/transaction.h" + +using namespace lbug::common; +using namespace lbug::transaction; +using namespace lbug::catalog; + +namespace lbug { +namespace storage { + +void IceDiskRelTableScanState::setToTable(const Transaction* transaction, Table* table_, + std::vector columnIDs_, + std::vector columnPredicateSets_, common::RelDataDirection direction_) { + TableScanState::setToTable(transaction, table_, std::move(columnIDs_), + std::move(columnPredicateSets_)); + direction = direction_; +} + +IceDiskRelTable::IceDiskRelTable(RelGroupCatalogEntry* relGroupEntry, + common::table_id_t fromTableID, common::table_id_t toTableID, + const StorageManager* storageManager, MemoryManager* memoryManager) + : RelTable{relGroupEntry, fromTableID, toTableID, storageManager, memoryManager}, + relGroupCatalogEntry{relGroupEntry} { + const auto base = IceDiskUtils::getBasePath(relGroupEntry->getStorage()); + auto paths = 
IceDiskUtils::constructCSRPaths(base, relGroupEntry->getName(), ".parquet"); + indicesFilePath = paths.indices; + indptrFilePath = paths.indptr; +} + +void IceDiskRelTable::initializeScanCoordination(Transaction* transaction) { + loadIndptrData(transaction); + loadIndicesMetadata(transaction); +} + +void IceDiskRelTable::initScanState(Transaction* /*transaction*/, TableScanState& scanState, + bool resetCachedBoundNodeSelVec) const { + auto& relScanState = scanState.cast(); + if (resetCachedBoundNodeSelVec) { + copyCachedBoundNodeSelVector(relScanState); + } + relScanState.currBoundNodeIdx = 0; + + auto& iceState = dynamic_cast(scanState); + iceState.activeEdgePos = 0; + iceState.activeEdgeEnd = 0; +} + +bool IceDiskRelTable::scanInternal(Transaction* transaction, TableScanState& scanState) { + auto& state = scanState.cast(); + auto& iceState = dynamic_cast(scanState); + scanState.resetOutVectors(); + + auto* context = transaction->getClientContext(); + auto* vfs = VirtualFileSystem::GetUnsafe(*context); + auto* memMgr = MemoryManager::Get(*context); + + initIndicesReaderIfNeeded(iceState, context, vfs, memMgr); + + const bool isFwd = state.direction != RelDataDirection::BWD; + const auto nbrTableID = isFwd ? getToNodeTableID() : getFromNodeTableID(); + const auto numBoundNodes = state.cachedBoundNodeSelVector.getSelSize(); + + while (true) { + // If the active node still has edges to emit, resume from where we left off. + // Otherwise advance to the next bound node. 
+ if (iceState.activeEdgePos >= iceState.activeEdgeEnd) { + if (state.currBoundNodeIdx >= numBoundNodes) { + break; + } + const auto selPos = state.cachedBoundNodeSelVector[state.currBoundNodeIdx]; + const auto nodeOffset = state.nodeIDVector->getValue(selPos).offset; + state.currBoundNodeIdx++; + + const auto range = getEdgeRange(nodeOffset, isFwd); + if (!range) { + iceState.activeEdgeEnd = 0; + continue; + } + iceState.activeEdgePos = range->start; + iceState.activeEdgeEnd = range->end; + iceState.activeSelPos = selPos; + iceState.activeNodeOffset = nodeOffset; + } + + const auto [count, nextEdgePos] = + collectNodeEdges(state, iceState, {iceState.activeEdgePos, iceState.activeEdgeEnd}, + iceState.activeNodeOffset, isFwd, nbrTableID, vfs); + iceState.activeEdgePos = nextEdgePos; + + if (count == 0) { + continue; + } + + auto selVec = std::make_shared(static_cast(count)); + selVec->setToUnfiltered(static_cast(count)); + state.outState->setSelVector(selVec); + state.setNodeIDVectorToFlat(iceState.activeSelPos); + return true; + } + + state.outState->setSelVector(std::make_shared(0)); + return false; +} + +void IceDiskRelTable::initIndicesReaderIfNeeded(IceDiskRelTableScanState& iceState, + main::ClientContext* context, VirtualFileSystem* vfs, MemoryManager* memMgr) const { + if (iceState.indicesReader) { + return; + } + auto resolvedPath = VirtualFileSystem::resolvePath(context, indicesFilePath); + iceState.indicesReader = + std::make_unique(resolvedPath, std::vector(), context); + // initializeScan triggers createReader() which populates column metadata. + // Use an empty group list to get the schema only, without reading any data. 
+ iceState.indicesReader->initializeScan(*iceState.indicesScanState, {}, vfs); + const uint32_t numCols = iceState.indicesReader->getNumColumns(); + iceState.scanBatch = std::make_unique(numCols); + for (uint32_t col = 0; col < numCols; ++col) { + iceState.scanBatch->insert(col, + std::make_shared(iceState.indicesReader->getColumnType(col).copy(), + memMgr)); + } +} + +std::optional IceDiskRelTable::getEdgeRange(offset_t nodeOffset, + bool isFwd) const { + uint64_t start, end; + if (isFwd) { + if (nodeOffset + 1 >= indptrData.size()) { + return std::nullopt; + } + start = indptrData[nodeOffset]; + end = indptrData[nodeOffset + 1]; + } else { + start = 0; + end = indicesRGStarts.empty() ? 0 : indicesRGStarts.back(); + } + if (start >= end) { + return std::nullopt; + } + return EdgeRange{start, end}; +} + +IceDiskRelTable::EdgeScanProgress IceDiskRelTable::collectNodeEdges(RelTableScanState& state, + IceDiskRelTableScanState& iceState, EdgeRange range, offset_t nodeOffset, bool isFwd, + table_id_t nbrTableID, VirtualFileSystem* vfs) const { + // Locate the first row group containing range.start. + auto it = std::upper_bound(indicesRGStarts.begin(), indicesRGStarts.end(), range.start); + DASSERT(it != indicesRGStarts.begin()); + --it; + const uint64_t startRG = static_cast(std::distance(indicesRGStarts.begin(), it)); + + // Collect all row groups covering [range.start, range.end). 
+ std::vector rowGroups; + for (uint64_t rg = startRG; rg + 1 < indicesRGStarts.size(); ++rg) { + rowGroups.push_back(rg); + if (indicesRGStarts[rg + 1] >= range.end) { + break; + } + } + iceState.indicesReader->initializeScan(*iceState.indicesScanState, rowGroups, vfs); + + uint64_t batchStart = indicesRGStarts[startRG]; + uint64_t count = 0; + uint64_t nextEdgePos = range.end; // default: node fully scanned + bool done = false; + + while (!done) { + // Reset selSize before each scanInternal call: on a row-group transition scanInternal + // returns true without writing data and without updating selSize, so the stale value + // from the previous batch would otherwise be misread as real data. + iceState.scanBatch->state->getSelVectorUnsafe().setSelSize(0); + if (!iceState.indicesReader->scanInternal(*iceState.indicesScanState, + *iceState.scanBatch)) { + break; + } + const auto& batchSel = iceState.scanBatch->state->getSelVector(); + const auto batchSize = batchSel.getSelSize(); + const auto& batch = *iceState.scanBatch; + + for (uint64_t i = 0; i < batchSize; ++i) { + const uint64_t globalRow = batchStart + i; + if (globalRow < range.start) { + continue; + } + if (globalRow >= range.end) { + done = true; + break; + } + + const auto physIdx = batchSel[static_cast(i)]; + const auto destOffset = batch.getValueVector(0).getValue(physIdx); + + if (isFwd) { + if (!state.outputVectors.empty()) { + state.outputVectors[0]->setValue(count, internalID_t{destOffset, nbrTableID}); + } + } else { + if (destOffset != nodeOffset) { + continue; + } + if (!state.outputVectors.empty()) { + state.outputVectors[0]->setValue(count, + internalID_t{findSourceNodeForRow(globalRow), nbrTableID}); + } + } + + for (uint32_t col = 1; + col < batch.getNumValueVectors() && col < state.outputVectors.size(); ++col) { + const auto& vec = batch.getValueVector(col); + if (vec.isNull(physIdx)) { + state.outputVectors[col]->setNull(count, true); + } else { + 
state.outputVectors[col]->copyFromValue(count, *vec.getAsValue(physIdx)); + } + } + + if (++count >= DEFAULT_VECTOR_CAPACITY) { + // Node has more edges; resume from the next global row on the next call. + nextEdgePos = globalRow + 1; + done = true; + break; + } + } + batchStart += batchSize; + } + + return {count, nextEdgePos}; +} + +common::row_idx_t IceDiskRelTable::getNumTotalRows(const Transaction* transaction) { + auto context = transaction->getClientContext(); + auto resolvedPath = VirtualFileSystem::resolvePath(context, indicesFilePath); + processor::ParquetReader reader(resolvedPath, std::vector(), context); + return reader.getMetadata()->num_rows; +} + +void IceDiskRelTable::loadIndptrData(Transaction* transaction) { + indptrData.clear(); + + auto context = transaction->getClientContext(); + auto* vfs = VirtualFileSystem::GetUnsafe(*context); + auto resolvedPath = VirtualFileSystem::resolvePath(context, indptrFilePath); + auto reader = + std::make_unique(resolvedPath, std::vector(), context); + processor::ParquetReaderScanState scanState; + + std::vector groupsToRead; + for (uint64_t i = 0; i < reader->getMetadata()->row_groups.size(); ++i) { + groupsToRead.push_back(i); + } + reader->initializeScan(scanState, groupsToRead, vfs); + + if (reader->getNumColumns() == 0) { + throw RuntimeException("Indptr parquet file has no columns"); + } + if (!LogicalTypeUtils::isIntegral(reader->getColumnType(0).getLogicalTypeID())) { + throw RuntimeException("Indptr parquet file column must be integer type"); + } + + DataChunk chunk(1); + chunk.insert(0, std::make_shared(reader->getColumnType(0).copy())); + + while (true) { + // Reset selSize before each call so row-group transition calls (which return true + // without updating selSize) are not mistaken for a stale data batch. 
+ chunk.state->getSelVectorUnsafe().setSelSize(0); + if (!reader->scanInternal(scanState, chunk)) { + break; + } + auto& sel = chunk.state->getSelVector(); + for (size_t i = 0; i < sel.getSelSize(); ++i) { + indptrData.push_back(chunk.getValueVector(0).getValue(sel[i])); + } + } +} + +void IceDiskRelTable::loadIndicesMetadata(Transaction* transaction) { + indicesRGStarts.clear(); + auto context = transaction->getClientContext(); + auto resolvedPath = VirtualFileSystem::resolvePath(context, indicesFilePath); + processor::ParquetReader reader(resolvedPath, std::vector(), context); + uint64_t cumulative = 0; + for (const auto& rg : reader.getMetadata()->row_groups) { + indicesRGStarts.push_back(cumulative); + cumulative += static_cast(rg.num_rows); + } + indicesRGStarts.push_back(cumulative); // sentinel = total edge count +} + +void IceDiskRelTable::copyCachedBoundNodeSelVector(RelTableScanState& relScanState) const { + if (relScanState.nodeIDVector->state->getSelVector().isUnfiltered()) { + relScanState.cachedBoundNodeSelVector.setToUnfiltered(); + } else { + relScanState.cachedBoundNodeSelVector.setToFiltered(); + memcpy(relScanState.cachedBoundNodeSelVector.getMutableBuffer().data(), + relScanState.nodeIDVector->state->getSelVector().getMutableBuffer().data(), + relScanState.nodeIDVector->state->getSelVector().getSelSize() * sizeof(sel_t)); + } + relScanState.cachedBoundNodeSelVector.setSelSize( + relScanState.nodeIDVector->state->getSelVector().getSelSize()); +} + +std::size_t IceDiskRelTable::findSourceNodeForRow(std::size_t globalRowIdx) const { + auto it = std::upper_bound(indptrData.begin(), indptrData.end(), globalRowIdx); + DASSERT(it != indptrData.begin()); + --it; + return static_cast(std::distance(indptrData.begin(), it)); +} + +} // namespace storage +} // namespace lbug diff --git a/src/storage/table/parquet_node_table.cpp b/src/storage/table/parquet_node_table.cpp index bd86f4e0df..f632e1ca32 100644 --- a/src/storage/table/parquet_node_table.cpp +++ 
b/src/storage/table/parquet_node_table.cpp @@ -98,7 +98,7 @@ common::node_group_idx_t ParquetNodeTable::getNumBatches(const Transaction* tran try { auto resolvedPath = VirtualFileSystem::resolvePath(context, parquetFilePath); auto tempReader = std::make_unique(resolvedPath, columnSkips, context); - return tempReader->getNumRowsGroups(); + return tempReader->getNumRowGroups(); } catch (const std::exception& e) { return 1; // Fallback } diff --git a/src/storage/table/parquet_rel_table.cpp b/src/storage/table/parquet_rel_table.cpp index d9e01b0087..a0bf775c9f 100644 --- a/src/storage/table/parquet_rel_table.cpp +++ b/src/storage/table/parquet_rel_table.cpp @@ -107,7 +107,7 @@ void ParquetRelTable::initScanState(Transaction* transaction, TableScanState& sc // For now, assign all row groups to this scan state (will be partitioned by the scan operator) parquetRelScanState.startRowGroup = 0; parquetRelScanState.endRowGroup = parquetRelScanState.indicesReader ? - parquetRelScanState.indicesReader->getNumRowsGroups() : + parquetRelScanState.indicesReader->getNumRowGroups() : 0; parquetRelScanState.currentRowGroup = parquetRelScanState.startRowGroup; parquetRelScanState.nextRowToProcess = 0; @@ -149,7 +149,7 @@ void ParquetRelTable::loadIndptrData(Transaction* transaction) const { auto context = transaction->getClientContext(); auto vfs = VirtualFileSystem::GetUnsafe(*context); std::vector groupsToRead; - for (uint64_t i = 0; i < indptrReader->getNumRowsGroups(); ++i) { + for (uint64_t i = 0; i < indptrReader->getNumRowGroups(); ++i) { groupsToRead.push_back(i); } diff --git a/test/include/test_runner/test_group.h b/test/include/test_runner/test_group.h index b0ba4663cd..c956e46f09 100644 --- a/test/include/test_runner/test_group.h +++ b/test/include/test_runner/test_group.h @@ -116,7 +116,8 @@ struct TestGroup { LBUG, JSON, CSV_TO_JSON, - GRAPH_STD + GRAPH_STD, + ICE_DISK }; DatasetType datasetType; diff --git a/test/test_files/demo_db/demo_db_graph_std.test 
b/test/test_files/demo_db/demo_db_graph_std.test index 7917b89fa7..a726462e6b 100644 --- a/test/test_files/demo_db/demo_db_graph_std.test +++ b/test/test_files/demo_db/demo_db_graph_std.test @@ -1,4 +1,4 @@ --DATASET GRAPH-STD demo-db/icebug-disk +-DATASET GRAPH-STD demo-db/graph-std -- @@ -75,3 +75,205 @@ Adam|Karissa|40 ---- 2 Adam|Karissa Adam|Zhang + +-LOG CountStarUsers +-STATEMENT MATCH (u:user) RETURN count(*); +---- 1 +4 + +-LOG CountStarEdgesFollows +-STATEMENT MATCH ()-[:follows]->() RETURN count(*); +---- 1 +4 + +-LOG CountStarEdgesLivesIn +-STATEMENT MATCH ()-[:livesin]->() RETURN count(*); +---- 1 +4 + +-LOG AggregateAvgAge +-STATEMENT MATCH (u:user) RETURN avg(u.age); +---- 1 +36.250000 + +-LOG AggregateMinMaxAge +-STATEMENT MATCH (u:user) RETURN min(u.age), max(u.age); +---- 1 +25|50 + +-LOG AggregateSumSince +-STATEMENT MATCH ()-[f:follows]->() RETURN sum(f.since); +---- 1 +8083 + +-LOG AggregateAvgFolloweeAge +-STATEMENT MATCH (u:user)-[:follows]->(v) RETURN u.name, avg(v.age) ORDER BY u.name; +---- 3 +Adam|45.000000 +Karissa|50.000000 +Zhang|25.000000 + +-LOG OutDegreePerUser +-STATEMENT MATCH (u:user)-[:follows]->(v) RETURN u.name, count(v) ORDER BY u.name; +---- 3 +Adam|2 +Karissa|1 +Zhang|1 + +-LOG TwoHopForward +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:follows]->(c) RETURN a.name, b.name, c.name ORDER BY b.name; +---- 2 +Adam|Karissa|Zhang +Adam|Zhang|Noura + +-LOG TwoHopCrossRel +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:livesin]->(c:city) RETURN b.name, c.name ORDER BY b.name; +---- 2 +Karissa|Waterloo +Zhang|Kitchener + +-LOG TwoHopCount +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:follows]->(c) RETURN count(c); +---- 1 +2 + +-LOG BackwardInNeighbors +-STATEMENT MATCH (u)<-[:follows]-(v) WHERE u.id = 300 RETURN v.name ORDER BY v.name; +---- 2 +Adam +Karissa + +-LOG BackwardInDegree +-STATEMENT MATCH (u)<-[:follows]-(v) WHERE u.id = 300 RETURN count(v); +---- 1 +2 + +-LOG OptionalMatchFollows +-STATEMENT 
MATCH (u:user) OPTIONAL MATCH (u)-[:follows]->(v:user) RETURN u.name, v.name ORDER BY u.name, v.name; +---- 5 +Adam|Karissa +Adam|Zhang +Karissa|Zhang +Noura| +Zhang|Noura + +-LOG UndirectedFollows +-STATEMENT MATCH (a:user)-[:follows]-(b:user) RETURN count(*); +---- 1 +8 + +-LOG UndirectedLivesIn +-STATEMENT MATCH (a:user)-[:livesin]-(c:city) RETURN a.name, c.name ORDER BY a.name; +---- 4 +Adam|Waterloo +Karissa|Waterloo +Noura|Guelph +Zhang|Kitchener + +-LOG FilterRelProperty +-STATEMENT MATCH (a:user)-[e:follows {since: 2020}]->(b:user) RETURN a.name, b.name ORDER BY b.name; +---- 2 +Adam|Karissa +Adam|Zhang + +-LOG ReturnDistinctSince +-STATEMENT MATCH (a:user)-[e:follows]->(b:user) RETURN DISTINCT e.since ORDER BY e.since; +---- 3 +2020 +2021 +2022 + +-LOG CountDistinctId +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:follows]->(c) RETURN count(DISTINCT c.id); +---- 1 +2 + +-LOG SkipRows +-STATEMENT MATCH (u:user) RETURN u.name ORDER BY u.age SKIP 2; +---- 2 +Karissa +Zhang + +-LOG WithPipeline +-STATEMENT MATCH (u:user) WITH avg(u.age) AS avgAge MATCH (b:user) WHERE b.age > avgAge RETURN b.name ORDER BY b.name; +---- 2 +Karissa +Zhang + +-LOG UnionAll +-STATEMENT MATCH (u:user)-[:livesin]->(c:city) WHERE c.name = 'Waterloo' RETURN u.name UNION ALL MATCH (u2:user)-[:livesin]->(c2:city) WHERE c2.name = 'Kitchener' RETURN u2.name; +---- 3 +Zhang +Adam +Karissa + +-LOG UnionAllAge +-STATEMENT MATCH (u:user)-[:follows]->(v:user) WHERE v.name = 'Zhang' RETURN u.age UNION ALL MATCH (u2:user)-[:follows]->(v2:user) WHERE v2.name = 'Karissa' RETURN u2.age; +---- 3 +30 +40 +30 + +-LOG UnionDistinct +-STATEMENT MATCH (u:user)-[:follows]->(v:user) WHERE v.name = 'Zhang' RETURN u.age UNION MATCH (u2:user)-[:follows]->(v2:user) WHERE v2.name = 'Karissa' RETURN u2.age; +---- 2 +30 +40 + +-LOG Unwind1 +-STATEMENT UNWIND ['Amy', 'Bob', 'Carol'] AS x RETURN 'name' AS label, x; +---- 3 +name|Amy +name|Bob +name|Carol + +-LOG Unwind2 +-STATEMENT UNWIND [['Amy'], ['Bob', 
'Carol']] AS x RETURN x; +---- 2 +[Amy] +[Bob,Carol] + +-LOG WhereOR +-STATEMENT MATCH (a:user) WHERE a.age > 45 OR starts_with(a.name, 'Kar') RETURN a.name ORDER BY a.name; +---- 2 +Karissa +Zhang + +-LOG WhereIsNotNull +-STATEMENT MATCH (a:user) WHERE a.age IS NOT NULL AND starts_with(a.name, 'Kar') RETURN a.name; +---- 1 +Karissa + +-LOG WhereExists +-STATEMENT MATCH (u:user) WHERE EXISTS { MATCH (u)-[:follows]->(v) } RETURN u.name ORDER BY u.name; +---- 3 +Adam +Karissa +Zhang + +-LOG WhereExistsWithFilter +-STATEMENT MATCH (a:user) WHERE EXISTS { MATCH (a)-[:follows]->(b:user) WHERE b.age > 45 } RETURN a.name ORDER BY a.name; +---- 2 +Adam +Karissa + +-LOG CyclicTriangle +-STATEMENT MATCH (a:user)-[:follows]->(b:user)-[:follows]->(c:user), (a)-[:follows]->(c) RETURN a.name, b.name, c.name; +---- 1 +Adam|Karissa|Zhang + +-LOG VarLenOneToTwo +-STATEMENT MATCH (a:user)-[:follows*1..2]->(b:user) RETURN a.name, b.name ORDER BY a.name, b.name; +---- 7 +Adam|Karissa +Adam|Noura +Adam|Zhang +Adam|Zhang +Karissa|Noura +Karissa|Zhang +Zhang|Noura + +-LOG VarLenThreeHop +-STATEMENT MATCH (a:user)-[:follows*3..3]->(b:user) RETURN a.name, b.name ORDER BY a.name, b.name; +---- 1 +Adam|Noura diff --git a/test/test_files/demo_db/demo_db_ice_disk.test b/test/test_files/demo_db/demo_db_ice_disk.test new file mode 100644 index 0000000000..c673d64967 --- /dev/null +++ b/test/test_files/demo_db/demo_db_ice_disk.test @@ -0,0 +1,279 @@ +-DATASET ICEBUG-DISK demo-db/icebug-disk + +-- + +-CASE DemoDBIceDiskTest + +-LOG MatchUserLivesInCity +-STATEMENT MATCH (u:user)-[l:livesin]->(c:city) RETURN u.name, u.age, c.name; +---- 4 +Adam|30|Waterloo +Karissa|40|Waterloo +Zhang|50|Kitchener +Noura|25|Guelph + +-LOG MatchSingleNodeLabel +-STATEMENT MATCH (a:user) RETURN a.name, a.age; +---- 4 +Adam|30 +Karissa|40 +Zhang|50 +Noura|25 + +-LOG MatchCityNodes +-STATEMENT MATCH (c:city) RETURN c.name, c.population; +---- 3 +Waterloo|150000 +Kitchener|200000 +Guelph|75000 + +-LOG MatchFollowsRel 
+-STATEMENT MATCH (a:user)-[e:follows]->(b:user) RETURN a.name, b.name, e.since; +---- 4 +Adam|Karissa|2020 +Adam|Zhang|2020 +Karissa|Zhang|2021 +Zhang|Noura|2022 + +-LOG MatchLivesInWithCityPopulation +-STATEMENT MATCH (u:user)-[l:livesin]->(c:city) RETURN u.name, c.name, c.population ORDER BY c.population DESC; +---- 4 +Zhang|Kitchener|200000 +Adam|Waterloo|150000 +Karissa|Waterloo|150000 +Noura|Guelph|75000 + +-LOG MatchLivesInFilterByCity +-STATEMENT MATCH (u:user)-[l:livesin]->(c:city) WHERE c.name = 'Waterloo' RETURN u.name, u.age; +---- 2 +Adam|30 +Karissa|40 + +-LOG MatchLivesInFilterByCityPopulation +-STATEMENT MATCH (u:user)-[l:livesin]->(c:city) WHERE c.population > 100000 RETURN u.name, c.name ORDER BY u.name; +---- 3 +Adam|Waterloo +Karissa|Waterloo +Zhang|Kitchener + +-LOG CountUsersPerCity +-STATEMENT MATCH (u:user)-[l:livesin]->(c:city) RETURN c.name, COUNT(*) AS num_users ORDER BY num_users DESC; +---- 3 +Waterloo|2 +Guelph|1 +Kitchener|1 + +-LOG MatchFollowsWithDestinationAge +-STATEMENT MATCH (a:user)-[e:follows]->(b:user) WHERE b.age > 30 RETURN a.name, b.name, b.age ORDER BY b.age DESC; +---- 3 +Adam|Zhang|50 +Karissa|Zhang|50 +Adam|Karissa|40 + +-LOG MatchFollowsFilterBySourceAndDest +-STATEMENT MATCH (a:user)-[e:follows]->(b:user) WHERE a.age < 40 AND b.age >= 40 RETURN a.name, b.name; +---- 2 +Adam|Karissa +Adam|Zhang + +-LOG CountStarUsers +-STATEMENT MATCH (u:user) RETURN count(*); +---- 1 +4 + +-LOG CountStarEdgesFollows +-STATEMENT MATCH ()-[:follows]->() RETURN count(*); +---- 1 +4 + +-LOG CountStarEdgesLivesIn +-STATEMENT MATCH ()-[:livesin]->() RETURN count(*); +---- 1 +4 + +-LOG AggregateAvgAge +-STATEMENT MATCH (u:user) RETURN avg(u.age); +---- 1 +36.250000 + +-LOG AggregateMinMaxAge +-STATEMENT MATCH (u:user) RETURN min(u.age), max(u.age); +---- 1 +25|50 + +-LOG AggregateSumSince +-STATEMENT MATCH ()-[f:follows]->() RETURN sum(f.since); +---- 1 +8083 + +-LOG AggregateAvgFolloweeAge +-STATEMENT MATCH (u:user)-[:follows]->(v) RETURN 
u.name, avg(v.age) ORDER BY u.name; +---- 3 +Adam|45.000000 +Karissa|50.000000 +Zhang|25.000000 + +-LOG OutDegreePerUser +-STATEMENT MATCH (u:user)-[:follows]->(v) RETURN u.name, count(v) ORDER BY u.name; +---- 3 +Adam|2 +Karissa|1 +Zhang|1 + +-LOG TwoHopForward +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:follows]->(c) RETURN a.name, b.name, c.name ORDER BY b.name; +---- 2 +Adam|Karissa|Zhang +Adam|Zhang|Noura + +-LOG TwoHopCrossRel +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:livesin]->(c:city) RETURN b.name, c.name ORDER BY b.name; +---- 2 +Karissa|Waterloo +Zhang|Kitchener + +-LOG TwoHopCount +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:follows]->(c) RETURN count(c); +---- 1 +2 + +-LOG BackwardInNeighbors +-STATEMENT MATCH (u)<-[:follows]-(v) WHERE u.id = 300 RETURN v.name ORDER BY v.name; +---- 2 +Adam +Karissa + +-LOG BackwardInDegree +-STATEMENT MATCH (u)<-[:follows]-(v) WHERE u.id = 300 RETURN count(v); +---- 1 +2 + +-LOG OptionalMatchFollows +-STATEMENT MATCH (u:user) OPTIONAL MATCH (u)-[:follows]->(v:user) RETURN u.name, v.name ORDER BY u.name, v.name; +---- 5 +Adam|Karissa +Adam|Zhang +Karissa|Zhang +Noura| +Zhang|Noura + +-LOG UndirectedFollows +-STATEMENT MATCH (a:user)-[:follows]-(b:user) RETURN count(*); +---- 1 +8 + +-LOG UndirectedLivesIn +-STATEMENT MATCH (a:user)-[:livesin]-(c:city) RETURN a.name, c.name ORDER BY a.name; +---- 4 +Adam|Waterloo +Karissa|Waterloo +Noura|Guelph +Zhang|Kitchener + +-LOG FilterRelProperty +-STATEMENT MATCH (a:user)-[e:follows {since: 2020}]->(b:user) RETURN a.name, b.name ORDER BY b.name; +---- 2 +Adam|Karissa +Adam|Zhang + +-LOG ReturnDistinctSince +-STATEMENT MATCH (a:user)-[e:follows]->(b:user) RETURN DISTINCT e.since ORDER BY e.since; +---- 3 +2020 +2021 +2022 + +-LOG CountDistinctId +-STATEMENT MATCH (a:user {id: 100})-[:follows]->(b)-[:follows]->(c) RETURN count(DISTINCT c.id); +---- 1 +2 + +-LOG CyclicTriangle +-STATEMENT MATCH (a:user)-[:follows]->(b:user)-[:follows]->(c:user), 
(a)-[:follows]->(c) RETURN a.name, b.name, c.name; +---- 1 +Adam|Karissa|Zhang + +-LOG SkipRows +-STATEMENT MATCH (u:user) RETURN u.name ORDER BY u.age SKIP 2; +---- 2 +Karissa +Zhang + +-LOG WithPipeline +-STATEMENT MATCH (u:user) WITH avg(u.age) AS avgAge MATCH (b:user) WHERE b.age > avgAge RETURN b.name ORDER BY b.name; +---- 2 +Karissa +Zhang + +-LOG UnionAll +-STATEMENT MATCH (u:user)-[:livesin]->(c:city) WHERE c.name = 'Waterloo' RETURN u.name UNION ALL MATCH (u2:user)-[:livesin]->(c2:city) WHERE c2.name = 'Kitchener' RETURN u2.name; +---- 3 +Zhang +Adam +Karissa + +-LOG UnionAllAge +-STATEMENT MATCH (u:user)-[:follows]->(v:user) WHERE v.name = 'Zhang' RETURN u.age UNION ALL MATCH (u2:user)-[:follows]->(v2:user) WHERE v2.name = 'Karissa' RETURN u2.age; +---- 3 +30 +40 +30 + +-LOG UnionDistinct +-STATEMENT MATCH (u:user)-[:follows]->(v:user) WHERE v.name = 'Zhang' RETURN u.age UNION MATCH (u2:user)-[:follows]->(v2:user) WHERE v2.name = 'Karissa' RETURN u2.age; +---- 2 +30 +40 + +-LOG Unwind1 +-STATEMENT UNWIND ['Amy', 'Bob', 'Carol'] AS x RETURN 'name' AS label, x; +---- 3 +name|Amy +name|Bob +name|Carol + +-LOG Unwind2 +-STATEMENT UNWIND [['Amy'], ['Bob', 'Carol']] AS x RETURN x; +---- 2 +[Amy] +[Bob,Carol] + +-LOG WhereOR +-STATEMENT MATCH (a:user) WHERE a.age > 45 OR starts_with(a.name, 'Kar') RETURN a.name ORDER BY a.name; +---- 2 +Karissa +Zhang + +-LOG WhereIsNotNull +-STATEMENT MATCH (a:user) WHERE a.age IS NOT NULL AND starts_with(a.name, 'Kar') RETURN a.name; +---- 1 +Karissa + +-LOG WhereExists +-STATEMENT MATCH (u:user) WHERE EXISTS { MATCH (u)-[:follows]->(v) } RETURN u.name ORDER BY u.name; +---- 3 +Adam +Karissa +Zhang + +-LOG WhereExistsWithFilter +-STATEMENT MATCH (a:user) WHERE EXISTS { MATCH (a)-[:follows]->(b:user) WHERE b.age > 45 } RETURN a.name ORDER BY a.name; +---- 2 +Adam +Karissa + +-LOG VarLenOneToTwo +-STATEMENT MATCH (a:user)-[:follows*1..2]->(b:user) RETURN a.name, b.name ORDER BY a.name, b.name; +---- 7 +Adam|Karissa +Adam|Noura 
+Adam|Zhang +Adam|Zhang +Karissa|Noura +Karissa|Zhang +Zhang|Noura + +-LOG VarLenThreeHop +-STATEMENT MATCH (a:user)-[:follows*3..3]->(b:user) RETURN a.name, b.name ORDER BY a.name, b.name; +---- 1 +Adam|Noura diff --git a/test/test_helper/test_helper.cpp b/test/test_helper/test_helper.cpp index 6155b1faed..5fb1a02b98 100644 --- a/test/test_helper/test_helper.cpp +++ b/test/test_helper/test_helper.cpp @@ -45,6 +45,8 @@ void TestHelper::executeScript(const std::string& cypherScript, Connection& conn } std::string line; while (getline(file, line)) { + // replace single quote with double + std::replace(line.begin(), line.end(), '\'', '"'); // If this is a COPY statement, we need to append the LBUG_ROOT_DIRECTORY to the csv // file path. There maybe multiple csv files in the line, so we need to find all of them. std::vector<std::string> csvFilePaths; @@ -86,16 +88,15 @@ void TestHelper::executeScript(const std::string& cypherScript, Connection& conn fullPath = normalizePathForCypher(std::move(fullPath)); line.replace(line.find(csvFilePath), csvFilePath.length(), fullPath); } - // Also handle storage = 'path' for parquet tables std::vector<std::string> storagePaths; size_t storageIndex = 0; while (true) { - size_t start = line.find("storage = '", storageIndex); if (start == std::string::npos) { break; } - start += 11; // length of "storage = '" - size_t end = line.find("'", start); + size_t start = line.find("storage = \"", storageIndex); if (start == std::string::npos) { break; } + start += 11; // length of "storage = \"" + size_t end = line.find('"', start); if (end == std::string::npos) { break; } @@ -104,6 +105,35 @@ void TestHelper::executeScript(const std::string& cypherScript, Connection& conn storageIndex = end + 1; } for (auto& storagePath : storagePaths) { + static constexpr std::string_view iceBugPrefix = "icebug-disk"; + + if (storagePath.starts_with(iceBugPrefix)) { + // Strip "icebug-disk" prefix and optional ':' separator. 
+ std::string basePath = storagePath.substr(iceBugPrefix.size()); + if (!basePath.empty() && basePath[0] == ':') { + basePath = basePath.substr(1); + } + // Resolve empty or relative paths relative to the schema's directory. + // Absolute paths and object-store URLs (contain "://") are left unchanged. + if (basePath.empty()) { + basePath = normalizePathForCypher(cypherDir.string()); + } else if (basePath.find("://") == std::string::npos && + std::filesystem::path(basePath).is_relative()) { + basePath = normalizePathForCypher((cypherDir / basePath).string()); + } + std::string resolvedStorage = std::string(iceBugPrefix) + ":" + basePath; + size_t pos = line.find(storagePath); + if (pos != std::string::npos) { + line.replace(pos, storagePath.length(), resolvedStorage); + } + continue; + } + + if (storagePath.find("://") != std::string::npos) { + // Non-icebug-disk URI — do not modify. + continue; + } + auto fullPath = storagePath; if (std::filesystem::path(storagePath).is_relative()) { if (std::filesystem::path(storagePath).parent_path().empty()) { diff --git a/test/test_runner/test_parser.cpp b/test/test_runner/test_parser.cpp index dea0ef047d..84e48438a4 100644 --- a/test/test_runner/test_parser.cpp +++ b/test/test_runner/test_parser.cpp @@ -91,6 +91,9 @@ void TestParser::extractDataset() { } else if (datasetType == "GRAPH-STD") { testGroup->datasetType = TestGroup::DatasetType::GRAPH_STD; testGroup->dataset = currentToken.params[2]; + } else if (datasetType == "ICEBUG-DISK") { + testGroup->datasetType = TestGroup::DatasetType::ICE_DISK; + testGroup->dataset = currentToken.params[2]; } else { throw TestException( "Invalid dataset type `" + currentToken.params[1] + "` [" + path + ":" + line + "].");