diff --git a/Cargo.toml b/Cargo.toml index 5df714a..a4b7dc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bf-tree" -version = "0.4.8" +version = "0.4.9" edition = "2021" license = "MIT" description = "Bf-Tree is a modern read-write-optimized concurrent larger-than-memory range index in Rust from Microsoft Research." diff --git a/src/config.rs b/src/config.rs index 52f3145..d439c45 100644 --- a/src/config.rs +++ b/src/config.rs @@ -339,6 +339,11 @@ impl Config { self.leaf_page_size } + /// Returns `true` if the storage backend is in-memory (no file-backed storage). + pub fn is_memory_backend(&self) -> bool { + self.storage_backend == StorageBackend::Memory + } + /// Validate the configuration and report any invalid parameter, if found. pub fn validate(&self) -> Result<(), ConfigError> { // Sanity check of the input parameters diff --git a/src/snapshot.rs b/src/snapshot.rs index bfc9b03..5579261 100644 --- a/src/snapshot.rs +++ b/src/snapshot.rs @@ -20,13 +20,18 @@ use crate::{ circular_buffer::{CircularBuffer, TombstoneHandle}, error::ConfigError, fs::VfsImpl, - nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE}, + mini_page_op::LeafOperations, + nodes::{ + leaf_node::{LeafNode, OpType}, + InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE, + }, + range_scan::ScanReturnField, storage::{make_vfs, LeafStorage, PageLocation, PageTable}, sync::atomic::AtomicU64, tree::eviction_callback, - utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo}, + utils::{inner_lock::ReadGuard, stats::LeafStats, BfsVisitor, NodeInfo}, wal::{LogEntry, LogEntryImpl, WriteAheadLog}, - BfTree, Config, WalReader, + BfTree, Config, StorageBackend, WalReader, }; const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN"; @@ -291,6 +296,168 @@ impl BfTree { self.storage.vfs.flush(); self.config.file_path.clone() } + + /// Snapshot an in-memory Bf-Tree to a file on disk. 
+ /// + /// This works by scanning all live records from the in-memory tree and inserting + /// them one-by-one into a new file-backed tree, then calling [`snapshot`] on it. + /// Records are streamed without buffering the entire dataset in memory. + /// + /// For `cache_only` trees (where scan is not supported), falls back to collecting + /// records via BFS traversal before inserting. + /// + /// Returns the snapshot file path. + /// + /// # Panics + /// Panics if `snapshot_path` already exists. + pub fn snapshot_memory_to_disk(&self, snapshot_path: impl AsRef<Path>) -> PathBuf { + let snapshot_path = snapshot_path.as_ref(); + assert!( + !snapshot_path.exists(), + "snapshot_memory_to_disk: target file already exists: {:?}", + snapshot_path + ); + + // Build a disk-backed config from the current config. + let mut disk_config = self.config.as_ref().clone(); + disk_config.storage_backend(StorageBackend::Std); + disk_config.cache_only(false); + disk_config.file_path(snapshot_path); + + let disk_tree = BfTree::with_config(disk_config, None) + .expect("Failed to create disk-backed BfTree for snapshot"); + + if self.cache_only { + // cache_only mode does not support scan, so fall back to BFS traversal. + Self::copy_records_via_traversal(self, &disk_tree); + } else { + // Use the scan operator to stream records directly into the disk tree + // without buffering the entire dataset in memory. + Self::copy_records_via_scan(self, &disk_tree); + } + + disk_tree.snapshot() + } + + /// Copy all live records from `src` to `dst` using the scan operator. + /// This streams one record at a time, avoiding large intermediate allocations. + fn copy_records_via_scan(src: &BfTree, dst: &BfTree) { + // Allocate a single reusable buffer large enough for any key+value. + let buf_size = src.config.leaf_page_size; + let mut scan_buf = vec![0u8; buf_size]; + + // Start scanning from the smallest possible key (a single 0x00 byte). 
+ let start_key: &[u8] = &[0u8]; + let mut scan_iter = + match src.scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue) { + Ok(iter) => iter, + Err(_) => return, // empty tree or other issue + }; + + while let Some((key_len, value_len)) = scan_iter.next(&mut scan_buf) { + let key = &scan_buf[..key_len]; + let value = &scan_buf[key_len..key_len + value_len]; + dst.insert(key, value); + } + } + + /// Copy all live records from `src` to `dst` by traversing leaf nodes directly. + /// Used as a fallback when scan is not available (e.g. cache_only mode). + fn copy_records_via_traversal(src: &BfTree, dst: &BfTree) { + let visitor = BfsVisitor::new_all_nodes(src); + for node_info in visitor { + match node_info { + NodeInfo::Leaf { page_id, .. } => { + let mut leaf_entry = src.mapping_table().get_mut(&page_id); + let page_loc = leaf_entry.get_page_location().clone(); + match page_loc { + PageLocation::Mini(ptr) if src.cache_only => { + let leaf_node: &LeafNode = unsafe { &*ptr }; + for kv_meta in leaf_node.meta_iter() { + match kv_meta.op_type() { + OpType::Insert | OpType::Cache => { + let key = leaf_node.get_full_key(kv_meta); + let value = leaf_node.get_value(kv_meta); + dst.insert(&key, value); + } + OpType::Delete | OpType::Phantom => {} + } + } + } + PageLocation::Mini(_) | PageLocation::Full(_) | PageLocation::Base(_) => { + let stats = leaf_entry.get_stats(); + Self::insert_live_records(&stats, dst); + if let Some(base_stats) = &stats.base_node { + Self::insert_live_records(base_stats, dst); + } + } + PageLocation::Null => {} + } + } + NodeInfo::Inner { .. } => {} + } + } + } + + /// Insert live (non-deleted) records from `LeafStats` directly into a target tree. 
+ fn insert_live_records(stats: &LeafStats, dst: &BfTree) { + for (i, key) in stats.keys.iter().enumerate() { + match stats.op_types[i] { + OpType::Insert | OpType::Cache => { + dst.insert(key, &stats.values[i]); + } + OpType::Delete | OpType::Phantom => {} + } + } + } + + /// Load an on-disk Bf-Tree snapshot and return a new in-memory Bf-Tree + /// containing all its records. + /// + /// The caller provides a `Config` that controls the in-memory tree's parameters + /// (circular buffer size, record sizes, leaf page size, etc.). + /// `storage_backend` and `cache_only` are overridden automatically. + /// + /// # Arguments + /// * `snapshot_path` – path to an existing snapshot file on disk. + /// * `memory_config` – configuration for the resulting in-memory tree. + /// + /// # Panics + /// Panics if the snapshot file does not exist. + pub fn new_from_snapshot_disk_to_memory( + snapshot_path: impl AsRef<Path>, + memory_config: Config, + ) -> Result<Self, ConfigError> { + let snapshot_path = snapshot_path.as_ref(); + assert!( + snapshot_path.exists(), + "new_from_snapshot_disk_to_memory: snapshot file does not exist: {:?}", + snapshot_path + ); + + // Step 1: Open the on-disk snapshot as a normal disk-backed tree. + let mut disk_config = memory_config.clone(); + disk_config.storage_backend(StorageBackend::Std); + disk_config.cache_only(false); + disk_config.file_path(snapshot_path); + + let disk_tree = BfTree::new_from_snapshot(disk_config, None)?; + + // Step 2: Create the in-memory tree. + // Use Memory backend with cache_only=false so that evicted pages + // are stored as heap-backed base pages (no data loss). + let mut mem_config = memory_config; + mem_config.storage_backend(StorageBackend::Memory); + mem_config.cache_only(false); + + let mem_tree = BfTree::with_config(mem_config, None)?; + + // Step 3: Stream records from the disk tree into the memory tree via scan. + // The disk tree is never cache_only, so scan is always available. 
+ Self::copy_records_via_scan(&disk_tree, &mem_tree); + + Ok(mem_tree) + } } /// We use repr(C) for simplicity, maybe flatbuffer or bincode or even repr(Rust) is better. @@ -441,4 +608,187 @@ mod tests { std::fs::remove_file(tmp_file_path).unwrap(); } + + #[test] + fn snapshot_memory_to_disk_roundtrip() { + let snapshot_path = std::path::PathBuf::from_str("target/test_mem_to_disk.bftree").unwrap(); + // Clean up in case a previous run left the file behind + let _ = std::fs::remove_file(&snapshot_path); + + let min_record_size: usize = 64; + let max_record_size: usize = 2048; + let leaf_page_size: usize = 8192; + + // Create an in-memory (non-cache_only) BfTree + let mut config = Config::new(":memory:", leaf_page_size * 16); + config.cb_min_record_size = min_record_size; + config.cb_max_record_size = max_record_size; + config.leaf_page_size = leaf_page_size; + config.max_fence_len = max_record_size; + + let bftree = BfTree::with_config(config.clone(), None).unwrap(); + + let key_len: usize = min_record_size / 2; + let record_cnt = 200; + let mut key_buffer = vec![0usize; key_len / 8]; + + for r in 0..record_cnt { + let key = install_value_to_buffer(&mut key_buffer, r); + bftree.insert(key, key); + } + + // Snapshot the in-memory tree to disk + let path = bftree.snapshot_memory_to_disk(&snapshot_path); + assert_eq!(path, snapshot_path); + assert!(snapshot_path.exists()); + drop(bftree); + + // Reload the snapshot into a disk-backed tree and verify all records + let mut disk_config = Config::new(&snapshot_path, leaf_page_size * 16); + disk_config.storage_backend(crate::StorageBackend::Std); + disk_config.cb_min_record_size = min_record_size; + disk_config.cb_max_record_size = max_record_size; + disk_config.leaf_page_size = leaf_page_size; + disk_config.max_fence_len = max_record_size; + + let loaded = BfTree::new_from_snapshot(disk_config, None).unwrap(); + let mut out_buffer = vec![0u8; key_len]; + for r in 0..record_cnt { + let key = install_value_to_buffer(&mut 
key_buffer, r); + let result = loaded.read(key, &mut out_buffer); + match result { + LeafReadResult::Found(v) => { + assert_eq!(v as usize, key_len); + assert_eq!(&out_buffer[..key_len], key); + } + other => { + panic!("Key {r} not found, got: {:?}", other); + } + } + } + + std::fs::remove_file(snapshot_path).unwrap(); + } + + #[test] + fn snapshot_memory_to_disk_roundtrip_cache_only() { + let snapshot_path = + std::path::PathBuf::from_str("target/test_cache_to_disk.bftree").unwrap(); + let _ = std::fs::remove_file(&snapshot_path); + + let min_record_size: usize = 64; + let max_record_size: usize = 2048; + let leaf_page_size: usize = 8192; + + // Create a cache-only BfTree + let mut config = Config::new(":cache:", leaf_page_size * 16); + config.cb_min_record_size = min_record_size; + config.cb_max_record_size = max_record_size; + config.leaf_page_size = leaf_page_size; + config.max_fence_len = max_record_size; + + let bftree = BfTree::with_config(config.clone(), None).unwrap(); + + let key_len: usize = min_record_size / 2; + let record_cnt = 200; + let mut key_buffer = vec![0usize; key_len / 8]; + + for r in 0..record_cnt { + let key = install_value_to_buffer(&mut key_buffer, r); + bftree.insert(key, key); + } + + // Snapshot the cache-only tree to disk + let path = bftree.snapshot_memory_to_disk(&snapshot_path); + assert_eq!(path, snapshot_path); + assert!(snapshot_path.exists()); + drop(bftree); + + // Reload the snapshot into a disk-backed tree and verify all records + let mut disk_config = Config::new(&snapshot_path, leaf_page_size * 16); + disk_config.storage_backend(crate::StorageBackend::Std); + disk_config.cb_min_record_size = min_record_size; + disk_config.cb_max_record_size = max_record_size; + disk_config.leaf_page_size = leaf_page_size; + disk_config.max_fence_len = max_record_size; + + let loaded = BfTree::new_from_snapshot(disk_config, None).unwrap(); + let mut out_buffer = vec![0u8; key_len]; + for r in 0..record_cnt { + let key = 
install_value_to_buffer(&mut key_buffer, r); + let result = loaded.read(key, &mut out_buffer); + match result { + LeafReadResult::Found(v) => { + assert_eq!(v as usize, key_len); + assert_eq!(&out_buffer[..key_len], key); + } + other => { + panic!("Key {r} not found, got: {:?}", other); + } + } + } + + std::fs::remove_file(snapshot_path).unwrap(); + } + + #[test] + fn snapshot_disk_to_memory_roundtrip() { + let snapshot_path = std::path::PathBuf::from_str("target/test_disk_to_mem.bftree").unwrap(); + let _ = std::fs::remove_file(&snapshot_path); + + let min_record_size: usize = 64; + let max_record_size: usize = 2048; + let leaf_page_size: usize = 8192; + let record_cnt: usize = 500; + + // Step 1: Build a disk-backed tree, populate it, snapshot to disk. + { + let mut config = Config::new(&snapshot_path, leaf_page_size * 16); + config.storage_backend(crate::StorageBackend::Std); + config.cb_min_record_size = min_record_size; + config.cb_max_record_size = max_record_size; + config.leaf_page_size = leaf_page_size; + config.max_fence_len = max_record_size; + + let tree = BfTree::with_config(config, None).unwrap(); + let key_len = min_record_size / 2; + let mut key_buffer = vec![0usize; key_len / 8]; + + for r in 0..record_cnt { + let key = install_value_to_buffer(&mut key_buffer, r); + tree.insert(key, key); + } + tree.snapshot(); + } + + // Step 2: Load the snapshot into an in-memory (cache_only) tree. + let mut mem_config = Config::new(":memory:", leaf_page_size * 16); + mem_config.cb_min_record_size = min_record_size; + mem_config.cb_max_record_size = max_record_size; + mem_config.leaf_page_size = leaf_page_size; + mem_config.max_fence_len = max_record_size; + + let mem_tree = + BfTree::new_from_snapshot_disk_to_memory(&snapshot_path, mem_config).unwrap(); + + // Step 3: Verify all records are present. 
+ let key_len = min_record_size / 2; + let mut key_buffer = vec![0usize; key_len / 8]; + let mut out_buffer = vec![0u8; key_len]; + for r in 0..record_cnt { + let key = install_value_to_buffer(&mut key_buffer, r); + let result = mem_tree.read(key, &mut out_buffer); + match result { + LeafReadResult::Found(v) => { + assert_eq!(v as usize, key_len); + assert_eq!(&out_buffer[..key_len], key); + } + other => { + panic!("Key {r} not found, got: {:?}", other); + } + } + } + + std::fs::remove_file(snapshot_path).unwrap(); + } } diff --git a/src/tests/concurrent.rs b/src/tests/concurrent.rs index 2c6868d..ef1db26 100644 --- a/src/tests/concurrent.rs +++ b/src/tests/concurrent.rs @@ -56,8 +56,7 @@ fn concurrent_ops() { for _ in 0..config.thread_cnt { let tree_clone = bf_tree.clone(); let handle = thread::spawn(move || { - let mut buffer = Vec::with_capacity(4096); - unsafe { buffer.set_len(4096) }; + let mut buffer = vec![0u8; 4096]; let mut rng = thread_rng(); let current_tid = thread::current().id();