diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs index 4d4d7fb3e3..01c35a83b8 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs @@ -12,7 +12,7 @@ use crate::{ error::DatabaseError, key_selector::KeySelector, options::{ConflictRangeType, MutationType}, - tx_ops::Operation, + tx_ops::{Operation, range_begin_contains, range_end_contains}, value::{KeyValue, Slice, Values}, versionstamp::{ generate_versionstamp, substitute_raw_versionstamp, substitute_versionstamp_if_incomplete, @@ -438,17 +438,8 @@ impl TransactionTask { let txn = self.create_transaction(); let read_opts = ReadOptions::default(); - // Resolve the begin selector - let resolved_begin = - self.resolve_key_selector_for_range(&txn, &begin, begin_or_equal, begin_offset)?; - - // Resolve the end selector - let resolved_end = - self.resolve_key_selector_for_range(&txn, &end, end_or_equal, end_offset)?; - - // Now execute the range query with resolved keys let iter = txn.iterator_opt( - rocksdb::IteratorMode::From(&resolved_begin, rocksdb::Direction::Forward), + rocksdb::IteratorMode::From(&begin, rocksdb::Direction::Forward), read_opts, ); @@ -457,8 +448,12 @@ impl TransactionTask { for item in iter { let (k, v) = item.context("failed to iterate rocksdb for get range")?; - // Check if we've reached the end key - if k.as_ref() >= resolved_end.as_slice() { + + if !range_begin_contains(k.as_ref(), &begin, begin_or_equal, begin_offset) { + continue; + } + + if !range_end_contains(k.as_ref(), &end, end_or_equal, end_offset) { break; } @@ -477,64 +472,6 @@ impl TransactionTask { Ok(Values::new(results)) } - fn resolve_key_selector_for_range( - &self, - txn: &RocksDbTransaction, - key: &[u8], - or_equal: bool, - offset: i32, - ) -> Result> { - // Based on PostgreSQL's interpretation: - // (false, 1) => first_greater_or_equal - // (true, 1) => first_greater_than - // (false, 0) => last_less_than - // (true, 0) => last_less_or_equal - - let read_opts = ReadOptions::default(); - - match (or_equal, offset) { - (false, 1) => { - // first_greater_or_equal: find first key >= search_key - let iter = txn.iterator_opt( - rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward), - read_opts, - ); - for item in iter { - let (k, _v) = item.context( - "failed to iterate rocksdb for range selector first_greater_or_equal", - )?; - return Ok(k.to_vec()); - } - // If no key found, return a key that will make the range empty - Ok(vec![0xff; 255]) - } - (true, 1) => { - // first_greater_than: find first key > search_key - let iter = txn.iterator_opt( - rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward), - read_opts, - ); - for item in iter { - let (k, _v) = item.context( - "failed to iterate rocksdb for range selector first_greater_than", - )?; - // Skip if it's the exact key - if k.as_ref() == key { - continue; - } - return Ok(k.to_vec()); - } - // If no key found, return a key that will make the range empty - Ok(vec![0xff; 255]) - } - _ => { - // For other cases, just use the key as-is for now - // This is a simplification - full implementation would handle all cases - Ok(key.to_vec()) - } - } - } - async fn handle_get_estimated_range_size(&mut self, begin: &[u8], end: &[u8]) -> Result { let range = rocksdb::Range::new(begin, end); diff --git a/engine/packages/universaldb/src/tx_ops.rs b/engine/packages/universaldb/src/tx_ops.rs index 878ae168e8..1efdb2c040 100644 --- a/engine/packages/universaldb/src/tx_ops.rs +++ b/engine/packages/universaldb/src/tx_ops.rs @@ -342,9 +342,6 @@ impl TransactionOperations { return Ok(db_values); } - let begin = opt.begin.key(); - let end = opt.end.key(); - // Start with database results in a map let mut result_map = BTreeMap::new(); for kv in db_values.into_iter() { @@ -357,7 +354,7 @@ impl TransactionOperations { for op in &*self.operations() { match op { Operation::Set { key, value } => { - if key.as_slice() >= begin && key.as_slice() < end { + if range_contains(key.as_slice(), opt) { result_map.insert(key.clone(), value.clone()); } } @@ -382,7 +379,7 @@ impl TransactionOperations { param, op_type, } => { - if key.as_slice() >= begin && key.as_slice() < end { + if range_contains(key.as_slice(), opt) { // Get current value for this key (from result_map or empty if not exists) let current_value = result_map.get(key); let current_slice = current_value.map(|v| &**v); @@ -423,3 +420,32 @@ impl TransactionOperations { .push((begin.to_vec(), end.to_vec(), conflict_type)); } } + +fn range_contains(key: &[u8], opt: &RangeOption<'_>) -> bool { + range_begin_contains( + key, + opt.begin.key(), + opt.begin.or_equal(), + opt.begin.offset(), + ) && range_end_contains(key, opt.end.key(), opt.end.or_equal(), opt.end.offset()) +} + +pub(crate) fn range_begin_contains(key: &[u8], begin: &[u8], or_equal: bool, offset: i32) -> bool { + match (or_equal, offset) { + (false, 1) => key >= begin, + (true, 1) => key > begin, + (false, 0) => key > begin, + (true, 0) => key >= begin, + _ => key >= begin, + } +} + +pub(crate) fn range_end_contains(key: &[u8], end: &[u8], or_equal: bool, offset: i32) -> bool { + match (or_equal, offset) { + (false, 1) => key < end, + (true, 1) => key <= end, + (false, 0) => key < end, + (true, 0) => key <= end, + _ => key < end, + } +} diff --git a/engine/packages/universaldb/tests/integration.rs b/engine/packages/universaldb/tests/integration.rs index ef990a8146..bb927f3f3f 100644 --- a/engine/packages/universaldb/tests/integration.rs +++ b/engine/packages/universaldb/tests/integration.rs @@ -705,6 +705,41 @@ async fn test_range_options(db: &Database) { assert_eq!(results[2].key(), test_subspace.pack(&("range_e",))); assert_eq!(results[2].value(), b"val_e"); + // Test 5: local writes outside the range should not be merged into the result + let results = db + .run(|tx| async move { + let test_subspace = Subspace::from("test"); + let key_b = test_subspace.pack(&("range_b",)); + let key_d = test_subspace.pack(&("range_d",)); + let key_z = test_subspace.pack(&("range_z",)); + + tx.set(&key_z, b"val_z"); + + let range = RangeOption { + begin: KeySelector::first_greater_or_equal(Cow::Owned(key_b)), + end: KeySelector::first_greater_or_equal(Cow::Owned(key_d)), + limit: None, + reverse: false, + mode: StreamingMode::WantAll, + target_bytes: 0, + ..RangeOption::default() + }; + + let vals = tx.get_range(&range, 1, Serializable).await?; + Ok(vals.into_vec()) + }) + .await + .unwrap(); + + assert_eq!( + results.len(), + 2, + "Expected local key outside [b, d) to be excluded" + ); + assert!(!results + .iter() + .any(|r| r.key() == test_subspace.pack(&("range_z",)))); + // Clear test data db.run(|tx| async move { let test_subspace = Subspace::from("test");