diff --git a/Cargo.toml b/Cargo.toml index 91c44bf..5d0753f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.5.0" +version = "0.6.0" edition = "2024" rust-version = "1.92" authors = ["init4"] @@ -35,13 +35,13 @@ incremental = false [workspace.dependencies] # internal -signet-hot = { version = "0.5.0", path = "./crates/hot" } -signet-hot-mdbx = { version = "0.5.0", path = "./crates/hot-mdbx" } -signet-cold = { version = "0.5.0", path = "./crates/cold" } -signet-cold-mdbx = { version = "0.5.0", path = "./crates/cold-mdbx" } -signet-cold-sql = { version = "0.5.0", path = "./crates/cold-sql" } -signet-storage = { version = "0.5.0", path = "./crates/storage" } -signet-storage-types = { version = "0.5.0", path = "./crates/types" } +signet-hot = { version = "0.6.0", path = "./crates/hot" } +signet-hot-mdbx = { version = "0.6.0", path = "./crates/hot-mdbx" } +signet-cold = { version = "0.6.0", path = "./crates/cold" } +signet-cold-mdbx = { version = "0.6.0", path = "./crates/cold-mdbx" } +signet-cold-sql = { version = "0.6.0", path = "./crates/cold-sql" } +signet-storage = { version = "0.6.0", path = "./crates/storage" } +signet-storage-types = { version = "0.6.0", path = "./crates/types" } # External, in-house signet-libmdbx = { version = "0.8.0" } @@ -67,6 +67,7 @@ serde = { version = "1.0.217", features = ["derive"] } tempfile = "3.20.0" thiserror = "2.0.18" tokio = { version = "1.45.0", features = ["full"] } +tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["rt"] } itertools = "0.14" lru = "0.16" diff --git a/crates/cold-mdbx/Cargo.toml b/crates/cold-mdbx/Cargo.toml index 0f10765..93c6f33 100644 --- a/crates/cold-mdbx/Cargo.toml +++ b/crates/cold-mdbx/Cargo.toml @@ -23,6 +23,7 @@ signet-hot-mdbx.workspace = true signet-libmdbx.workspace = true signet-storage-types.workspace = true thiserror.workspace = true +tokio.workspace = true [dev-dependencies] signet-hot-mdbx = { workspace = true, features = ["test-utils"] } diff --git a/crates/cold-mdbx/src/backend.rs b/crates/cold-mdbx/src/backend.rs index 7fe9e3c..14d6856 100644 --- a/crates/cold-mdbx/src/backend.rs +++ b/crates/cold-mdbx/src/backend.rs @@ -10,8 +10,9 @@ use crate::{ }; use alloy::{consensus::transaction::Recovered, primitives::BlockNumber}; use signet_cold::{ - BlockData, ColdReceipt, ColdResult, ColdStorage, Confirmed, Filter, HeaderSpecifier, - ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, + BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter, + HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, + ZenithHeaderSpecifier, }; use signet_hot::{ KeySer, MAX_KEY_SIZE, ValSer, @@ -77,28 +78,146 @@ fn write_block_to_tx( Ok(()) } +/// Unwrap a `Result` or send the error through the stream and return. +macro_rules! try_stream { + ($sender:expr, $expr:expr) => { + match $expr { + Ok(v) => v, + Err(e) => { + let _ = + $sender.blocking_send(Err(ColdStorageError::backend(MdbxColdError::from(e)))); + return; + } + } + }; +} + +/// Produce a log stream using a single MDBX read transaction. +/// +/// Runs synchronously on a blocking thread. The `Tx` snapshot +/// provides MVCC consistency — the snapshot is self-consistent and +/// no reorg detection is needed within it. +fn produce_log_stream_blocking( + env: DatabaseEnv, + filter: Filter, + from: BlockNumber, + to: BlockNumber, + max_logs: usize, + sender: tokio::sync::mpsc::Sender>, + deadline: std::time::Instant, +) { + // Open a read-only transaction. MDBX's MVCC guarantees a + // consistent snapshot for the lifetime of this transaction, + // so no reorg detection is needed. + let tx = try_stream!(sender, env.tx()); + + // Reuse cursors across blocks to avoid re-opening them on + // every iteration (same pattern as get_logs_inner). + let mut header_cursor = try_stream!(sender, tx.traverse::()); + let mut receipt_cursor = try_stream!(sender, tx.traverse_dual::()); + + let mut total = 0usize; + + // Walk through blocks one at a time, filtering and sending + // matching logs over the channel. + for block_num in from..=to { + // Check the deadline before starting each block so we + // don't begin reading after the caller's timeout. + if std::time::Instant::now() > deadline { + let _ = sender.blocking_send(Err(ColdStorageError::StreamDeadlineExceeded)); + return; + } + + // Look up the block header for its hash and timestamp, + // which are attached to every emitted RpcLog. + let sealed = match try_stream!(sender, header_cursor.exact(&block_num)) { + Some(v) => v, + None => continue, + }; + let block_hash = sealed.hash(); + let block_timestamp = sealed.timestamp; + + // Iterate over all receipts (and their embedded logs) for + // this block, applying the caller's address/topic filter. + let iter = try_stream!(sender, receipt_cursor.iter_k2(&block_num)); + + let remaining = max_logs.saturating_sub(total); + let mut block_count = 0usize; + + for result in iter { + let (tx_idx, ir): (u64, IndexedReceipt) = try_stream!(sender, result); + let tx_hash = ir.tx_hash; + let first_log_index = ir.first_log_index; + for (log_idx, log) in ir.receipt.inner.logs.into_iter().enumerate() { + if !filter.matches(&log) { + continue; + } + // Enforce the global log limit across all blocks. + block_count += 1; + if block_count > remaining { + let _ = sender + .blocking_send(Err(ColdStorageError::TooManyLogs { limit: max_logs })); + return; + } + let rpc_log = RpcLog { + inner: log, + block_hash: Some(block_hash), + block_number: Some(block_num), + block_timestamp: Some(block_timestamp), + transaction_hash: Some(tx_hash), + transaction_index: Some(tx_idx), + log_index: Some(first_log_index + log_idx as u64), + removed: false, + }; + // Send the log to the caller via blocking_send + // (we're on a spawn_blocking thread, not async). + if sender.blocking_send(Ok(rpc_log)).is_err() { + return; // receiver dropped + } + } + } + + // Early exit if we've already hit the limit — no need to + // read the next block. + total += block_count; + if total >= max_logs { + return; + } + } +} + /// MDBX-based cold storage backend. /// /// This backend stores historical blockchain data in an MDBX database. /// It implements the [`ColdStorage`] trait for use with the cold storage /// task runner. -#[derive(Debug)] pub struct MdbxColdBackend { /// The MDBX environment. env: DatabaseEnv, } +impl std::fmt::Debug for MdbxColdBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MdbxColdBackend").finish_non_exhaustive() + } +} + impl MdbxColdBackend { + /// Create a new backend from an existing MDBX environment. + const fn from_env(env: DatabaseEnv) -> Self { + Self { env } + } + /// Open an existing MDBX cold storage database in read-only mode. pub fn open_ro(path: &Path) -> Result { let env = DatabaseArguments::new().open_ro(path)?; - Ok(Self { env }) + Ok(Self::from_env(env)) } /// Open or create an MDBX cold storage database in read-write mode. pub fn open_rw(path: &Path) -> Result { let env = DatabaseArguments::new().open_rw(path)?; - let backend = Self { env }; + let backend = Self::from_env(env); backend.create_tables()?; Ok(backend) } @@ -110,7 +229,7 @@ impl MdbxColdBackend { args: DatabaseArguments, ) -> Result { let env = DatabaseEnv::open(path, kind, args)?; - let backend = Self { env }; + let backend = Self::from_env(env); if kind.is_rw() { backend.create_tables()?; } @@ -437,7 +556,7 @@ impl MdbxColdBackend { fn get_logs_inner( &self, - filter: Filter, + filter: &Filter, max_logs: usize, ) -> Result, MdbxColdError> { let tx = self.env.tx()?; @@ -570,12 +689,30 @@ impl ColdStorage for MdbxColdBackend { async fn get_logs( &self, - filter: Filter, + filter: &Filter, max_logs: usize, ) -> ColdResult> { Ok(self.get_logs_inner(filter, max_logs)?) } + async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) { + let env = self.env.clone(); + let filter = filter.clone(); + let std_deadline = params.deadline.into_std(); + let _ = tokio::task::spawn_blocking(move || { + produce_log_stream_blocking( + env, + filter, + params.from, + params.to, + params.max_logs, + params.sender, + std_deadline, + ); + }) + .await; + } + async fn get_latest_block(&self) -> ColdResult> { let tx = self.env.tx().map_err(MdbxColdError::from)?; let mut cursor = tx.new_cursor::().map_err(MdbxColdError::from)?; @@ -611,6 +748,6 @@ mod tests { async fn mdbx_backend_conformance() { let dir = tempdir().unwrap(); let backend = MdbxColdBackend::open_rw(dir.path()).unwrap(); - conformance(&backend).await.unwrap(); + conformance(backend).await.unwrap(); } } diff --git a/crates/cold-sql/Cargo.toml b/crates/cold-sql/Cargo.toml index 0b6828c..5fb6f86 100644 --- a/crates/cold-sql/Cargo.toml +++ b/crates/cold-sql/Cargo.toml @@ -17,11 +17,14 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] alloy = { workspace = true, optional = true } +alloy-primitives = { version = "1.5.6", features = ["sqlx"] } signet-cold.workspace = true signet-storage-types = { workspace = true, optional = true } signet-zenith = { workspace = true, optional = true } sqlx = { workspace = true } thiserror.workspace = true +tokio = { workspace = true, optional = true } +tokio-stream = { workspace = true, optional = true } [dev-dependencies] signet-cold = { workspace = true, features = ["test-utils"] } @@ -29,6 +32,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } [features] default = [] -postgres = ["sqlx/postgres", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith"] -sqlite = ["sqlx/sqlite", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith"] +postgres = ["sqlx/postgres", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith", "dep:tokio", "dep:tokio-stream"] +sqlite = ["sqlx/sqlite", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith", "dep:tokio"] test-utils = ["signet-cold/test-utils", "sqlite", "postgres"] diff --git a/crates/cold-sql/migrations/001_initial.sql b/crates/cold-sql/migrations/001_initial.sql index 47b5fc4..b6d73a3 100644 --- a/crates/cold-sql/migrations/001_initial.sql +++ b/crates/cold-sql/migrations/001_initial.sql @@ -64,6 +64,7 @@ CREATE TABLE IF NOT EXISTS receipts ( tx_type INTEGER NOT NULL, success INTEGER NOT NULL, cumulative_gas_used INTEGER NOT NULL, + first_log_index INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (block_number, tx_index) ); diff --git a/crates/cold-sql/migrations/001_initial_pg.sql b/crates/cold-sql/migrations/001_initial_pg.sql index 280342c..7307d14 100644 --- a/crates/cold-sql/migrations/001_initial_pg.sql +++ b/crates/cold-sql/migrations/001_initial_pg.sql @@ -62,6 +62,7 @@ CREATE TABLE IF NOT EXISTS receipts ( tx_type INTEGER NOT NULL, success INTEGER NOT NULL, cumulative_gas_used BIGINT NOT NULL, + first_log_index BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (block_number, tx_index) ); diff --git a/crates/cold-sql/src/backend.rs b/crates/cold-sql/src/backend.rs index 001e8cc..3921319 100644 --- a/crates/cold-sql/src/backend.rs +++ b/crates/cold-sql/src/backend.rs @@ -10,28 +10,29 @@ use crate::columns::{ COL_BENEFICIARY, COL_BLOB_GAS_USED, COL_BLOB_VERSIONED_HASHES, COL_BLOCK_DATA_HASH, COL_BLOCK_HASH, COL_BLOCK_LOG_INDEX, COL_BLOCK_NUMBER, COL_BLOCK_TIMESTAMP, COL_CHAIN_ID, COL_CNT, COL_CUMULATIVE_GAS_USED, COL_DATA, COL_DIFFICULTY, COL_EVENT_TYPE, - COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FROM_ADDRESS, COL_GAS, COL_GAS_LIMIT, COL_GAS_PRICE, - COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOG_COUNT, COL_LOGS_BLOOM, COL_MAX_BN, - COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS, COL_MIX_HASH, - COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT, COL_PARENT_HASH, - COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS, COL_ROLLUP_CHAIN_ID, - COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, COL_STATE_ROOT, - COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, - COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, COL_VALUE, - COL_WITHDRAWALS_ROOT, + COL_EXCESS_BLOB_GAS, COL_EXTRA_DATA, COL_FIRST_LOG_INDEX, COL_FROM_ADDRESS, COL_GAS, + COL_GAS_LIMIT, COL_GAS_PRICE, COL_GAS_USED, COL_HOST_BLOCK_NUMBER, COL_INPUT, COL_LOGS_BLOOM, + COL_MAX_BN, COL_MAX_FEE_PER_BLOB_GAS, COL_MAX_FEE_PER_GAS, COL_MAX_PRIORITY_FEE_PER_GAS, + COL_MIX_HASH, COL_NONCE, COL_OMMERS_HASH, COL_ORDER_INDEX, COL_PARENT_BEACON_BLOCK_ROOT, + COL_PARENT_HASH, COL_PRIOR_GAS, COL_RECEIPTS_ROOT, COL_REQUESTS_HASH, COL_REWARD_ADDRESS, + COL_ROLLUP_CHAIN_ID, COL_ROLLUP_RECIPIENT, COL_SENDER, COL_SIG_R, COL_SIG_S, COL_SIG_Y_PARITY, + COL_STATE_ROOT, COL_SUCCESS, COL_TIMESTAMP, COL_TO_ADDRESS, COL_TOKEN, COL_TOPIC0, COL_TOPIC1, + COL_TOPIC2, COL_TOPIC3, COL_TRANSACTIONS_ROOT, COL_TX_HASH, COL_TX_INDEX, COL_TX_TYPE, + COL_VALUE, COL_WITHDRAWALS_ROOT, }; use crate::convert::{ EVENT_ENTER, EVENT_ENTER_TOKEN, EVENT_TRANSACT, build_receipt, decode_access_list_or_empty, - decode_authorization_list, decode_b256_vec, decode_u128_required, decode_u256, - encode_access_list, encode_authorization_list, encode_b256_vec, encode_u128, encode_u256, - from_address, from_i64, to_address, to_i64, + decode_authorization_list, decode_b256_vec, decode_u128_required, encode_access_list, + encode_authorization_list, encode_b256_vec, encode_u128, from_i64, to_i64, }; use alloy::{ consensus::{ Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy, TxType, transaction::Recovered, }, - primitives::{Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature}, + primitives::{ + Address, B256, BlockNumber, Bloom, Bytes, Log, LogData, Sealable, Signature, TxKind, U256, + }, }; use signet_cold::{ BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter, @@ -71,6 +72,7 @@ use sqlx::{AnyPool, Row}; #[derive(Debug, Clone)] pub struct SqlColdBackend { pool: AnyPool, + is_postgres: bool, } impl SqlColdBackend { @@ -97,8 +99,9 @@ impl SqlColdBackend { }; // Execute via pool to ensure the migration uses the same // connection that subsequent queries will use. + let is_postgres = backend == "PostgreSQL"; sqlx::raw_sql(migration).execute(&pool).await?; - Ok(Self { pool }) + Ok(Self { pool, is_postgres }) } /// Connect to a database URL and create the backend. @@ -126,9 +129,8 @@ impl SqlColdBackend { match spec { HeaderSpecifier::Number(n) => Ok(Some(n)), HeaderSpecifier::Hash(hash) => { - let hash_bytes = hash.as_slice(); let row = sqlx::query("SELECT block_number FROM headers WHERE block_hash = $1") - .bind(hash_bytes) + .bind(hash) .fetch_optional(&self.pool) .await?; Ok(row.map(|r| from_i64(r.get::(COL_BLOCK_NUMBER)))) @@ -163,47 +165,165 @@ impl SqlColdBackend { tx.commit().await?; Ok(()) } + + // ======================================================================== + // Streaming helpers + // ======================================================================== + + /// Stream logs using a PostgreSQL REPEATABLE READ transaction. + /// + /// The transaction provides a consistent snapshot across all per-block + /// queries, eliminating the need for anchor-hash reorg detection. + /// Rows are streamed individually rather than materialised per block. + #[cfg(feature = "postgres")] + async fn produce_log_stream_pg(&self, filter: &Filter, params: signet_cold::StreamParams) { + use tokio_stream::StreamExt; + + /// Unwrap a `Result` or send the error through the stream and return. + macro_rules! try_stream { + ($sender:expr, $expr:expr) => { + match $expr { + Ok(v) => v, + Err(e) => { + let _ = $sender + .send(Err(ColdStorageError::backend(SqlColdError::from(e)))) + .await; + return; + } + } + }; + } + + let signet_cold::StreamParams { from, to, max_logs, sender, deadline } = params; + + // Open a REPEATABLE READ transaction so all per-block queries see a + // consistent snapshot. This makes reorg detection unnecessary — if a + // reorg lands mid-stream the transaction still reads the old data. + let mut tx = try_stream!(sender, self.pool.begin().await); + try_stream!( + sender, + sqlx::query("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ").execute(&mut *tx).await + ); + + // Build the parameterised query once. $1 is the block number + // (bound per iteration); remaining parameters are the address + // and topic filters from the user's request. + let (filter_clause, filter_params) = build_log_filter_clause(filter, 2); + let data_sql = format!( + "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \ + (r.first_log_index + l.log_index) AS block_log_index \ + FROM logs l \ + JOIN headers h ON l.block_number = h.block_number \ + JOIN transactions t ON l.block_number = t.block_number \ + AND l.tx_index = t.tx_index \ + JOIN receipts r ON l.block_number = r.block_number \ + AND l.tx_index = r.tx_index \ + WHERE l.block_number = $1{filter_clause} \ + ORDER BY l.tx_index, l.log_index" + ); + + let mut total = 0usize; + + // Walk through blocks one at a time, streaming matching log rows + // from each block directly to the channel. + for block_num in from..=to { + // Check the deadline before starting each block so we + // don't begin a new query after the caller's timeout. + if tokio::time::Instant::now() > deadline { + let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await; + return; + } + + let mut query = sqlx::query(&data_sql).bind(to_i64(block_num)); + for param in &filter_params { + query = query.bind(*param); + } + + // Stream rows from this block's query. Each row is converted + // to an RpcLog and sent over the channel immediately rather + // than being collected into a Vec first. + let mut stream = query.fetch(&mut *tx); + while let Some(row_result) = stream.next().await { + let r = try_stream!(sender, row_result); + + // Enforce the global log limit across all blocks. + total += 1; + if total > max_logs { + let _ = + sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await; + return; + } + + let log = log_from_row(&r); + let rpc_log = RpcLog { + inner: log, + block_hash: Some(r.get(COL_BLOCK_HASH)), + block_number: Some(from_i64(r.get::(COL_BLOCK_NUMBER))), + block_timestamp: Some(from_i64(r.get::(COL_BLOCK_TIMESTAMP))), + transaction_hash: Some(r.get(COL_TX_HASH)), + transaction_index: Some(from_i64(r.get::(COL_TX_INDEX))), + log_index: Some(from_i64(r.get::(COL_BLOCK_LOG_INDEX))), + removed: false, + }; + // Send the log to the caller. The timeout ensures we + // stop if the deadline passes while back-pressured. + match tokio::time::timeout_at(deadline, sender.send(Ok(rpc_log))).await { + Ok(Ok(())) => {} + Ok(Err(_)) => return, // receiver dropped + Err(_) => { + let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await; + return; + } + } + } + + // Early exit if we've already hit the limit — no need to + // query the next block. + if total >= max_logs { + return; + } + } + } } // ============================================================================ // Row → domain type conversion (read path) // ============================================================================ -/// Extract a required BLOB column from a row. -fn blob(r: &sqlx::any::AnyRow, col: &str) -> Vec { +/// Extract a required BLOB column from a row as a borrowed slice. +fn blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> &'r [u8] { r.get(col) } -/// Extract an optional BLOB column from a row. -fn opt_blob(r: &sqlx::any::AnyRow, col: &str) -> Option> { +/// Extract an optional BLOB column from a row as a borrowed slice. +fn opt_blob<'r>(r: &'r sqlx::any::AnyRow, col: &str) -> Option<&'r [u8]> { r.get(col) } /// Build a [`Header`] from an [`sqlx::any::AnyRow`]. fn header_from_row(r: &sqlx::any::AnyRow) -> Result { Ok(Header { - parent_hash: B256::from_slice(&blob(r, COL_PARENT_HASH)), - ommers_hash: B256::from_slice(&blob(r, COL_OMMERS_HASH)), - beneficiary: Address::from_slice(&blob(r, COL_BENEFICIARY)), - state_root: B256::from_slice(&blob(r, COL_STATE_ROOT)), - transactions_root: B256::from_slice(&blob(r, COL_TRANSACTIONS_ROOT)), - receipts_root: B256::from_slice(&blob(r, COL_RECEIPTS_ROOT)), - logs_bloom: Bloom::from_slice(&blob(r, COL_LOGS_BLOOM)), - difficulty: decode_u256(&blob(r, COL_DIFFICULTY))?, + parent_hash: r.get(COL_PARENT_HASH), + ommers_hash: r.get(COL_OMMERS_HASH), + beneficiary: r.get(COL_BENEFICIARY), + state_root: r.get(COL_STATE_ROOT), + transactions_root: r.get(COL_TRANSACTIONS_ROOT), + receipts_root: r.get(COL_RECEIPTS_ROOT), + logs_bloom: Bloom::from_slice(blob(r, COL_LOGS_BLOOM)), + difficulty: r.get(COL_DIFFICULTY), number: from_i64(r.get(COL_BLOCK_NUMBER)), gas_limit: from_i64(r.get(COL_GAS_LIMIT)), gas_used: from_i64(r.get(COL_GAS_USED)), timestamp: from_i64(r.get(COL_TIMESTAMP)), - extra_data: Bytes::from(blob(r, COL_EXTRA_DATA)), - mix_hash: B256::from_slice(&blob(r, COL_MIX_HASH)), - nonce: alloy::primitives::B64::from_slice(&blob(r, COL_NONCE)), + extra_data: r.get(COL_EXTRA_DATA), + mix_hash: r.get(COL_MIX_HASH), + nonce: r.get(COL_NONCE), base_fee_per_gas: r.get::, _>(COL_BASE_FEE_PER_GAS).map(from_i64), - withdrawals_root: opt_blob(r, COL_WITHDRAWALS_ROOT).map(|b| B256::from_slice(&b)), + withdrawals_root: r.get(COL_WITHDRAWALS_ROOT), blob_gas_used: r.get::, _>(COL_BLOB_GAS_USED).map(from_i64), excess_blob_gas: r.get::, _>(COL_EXCESS_BLOB_GAS).map(from_i64), - parent_beacon_block_root: opt_blob(r, COL_PARENT_BEACON_BLOCK_ROOT) - .map(|b| B256::from_slice(&b)), - requests_hash: opt_blob(r, COL_REQUESTS_HASH).map(|b| B256::from_slice(&b)), + parent_beacon_block_root: r.get(COL_PARENT_BEACON_BLOCK_ROOT), + requests_hash: r.get(COL_REQUESTS_HASH), }) } @@ -211,11 +331,8 @@ fn header_from_row(r: &sqlx::any::AnyRow) -> Result { fn tx_from_row(r: &sqlx::any::AnyRow) -> Result { use alloy::consensus::EthereumTxEnvelope; - let sig = Signature::new( - decode_u256(&r.get::, _>(COL_SIG_R))?, - decode_u256(&r.get::, _>(COL_SIG_S))?, - r.get::(COL_SIG_Y_PARITY) != 0, - ); + let sig = + Signature::new(r.get(COL_SIG_R), r.get(COL_SIG_S), r.get::(COL_SIG_Y_PARITY) != 0); let tx_type_raw = r.get::(COL_TX_TYPE) as u8; let tx_type = TxType::try_from(tx_type_raw) @@ -224,18 +341,18 @@ fn tx_from_row(r: &sqlx::any::AnyRow) -> Result let chain_id: Option = r.get(COL_CHAIN_ID); let nonce = from_i64(r.get(COL_NONCE)); let gas_limit = from_i64(r.get(COL_GAS_LIMIT)); - let to_addr = opt_blob(r, COL_TO_ADDRESS); - let value = decode_u256(&r.get::, _>(COL_VALUE))?; - let input = Bytes::from(r.get::, _>(COL_INPUT)); + let to_addr: Option
= r.get(COL_TO_ADDRESS); + let value: U256 = r.get(COL_VALUE); + let input: Bytes = r.get(COL_INPUT); match tx_type { TxType::Legacy => { let tx = TxLegacy { chain_id: chain_id.map(from_i64), nonce, - gas_price: decode_u128_required(&opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?, + gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?, gas_limit, - to: from_address(to_addr.as_deref()), + to: to_addr.map_or(TxKind::Create, TxKind::Call), value, input, }; @@ -248,12 +365,12 @@ fn tx_from_row(r: &sqlx::any::AnyRow) -> Result .ok_or_else(|| SqlColdError::Convert("EIP2930 requires chain_id".into()))?, ), nonce, - gas_price: decode_u128_required(&opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?, + gas_price: decode_u128_required(opt_blob(r, COL_GAS_PRICE), COL_GAS_PRICE)?, gas_limit, - to: from_address(to_addr.as_deref()), + to: to_addr.map_or(TxKind::Create, TxKind::Call), value, input, - access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?, + access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?, }; Ok(EthereumTxEnvelope::Eip2930(Signed::new_unhashed(tx, sig))) } @@ -266,82 +383,80 @@ fn tx_from_row(r: &sqlx::any::AnyRow) -> Result nonce, gas_limit, max_fee_per_gas: decode_u128_required( - &opt_blob(r, COL_MAX_FEE_PER_GAS), + opt_blob(r, COL_MAX_FEE_PER_GAS), COL_MAX_FEE_PER_GAS, )?, max_priority_fee_per_gas: decode_u128_required( - &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS), + opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS), COL_MAX_PRIORITY_FEE_PER_GAS, )?, - to: from_address(to_addr.as_deref()), + to: to_addr.map_or(TxKind::Create, TxKind::Call), value, input, - access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?, + access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?, }; Ok(EthereumTxEnvelope::Eip1559(Signed::new_unhashed(tx, sig))) } TxType::Eip4844 => { - let tx = - TxEip4844 { - chain_id: from_i64(chain_id.ok_or_else(|| { - SqlColdError::Convert("EIP4844 requires chain_id".into()) - })?), - nonce, - gas_limit, - max_fee_per_gas: decode_u128_required( - &opt_blob(r, COL_MAX_FEE_PER_GAS), - COL_MAX_FEE_PER_GAS, - )?, - max_priority_fee_per_gas: decode_u128_required( - &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS), - COL_MAX_PRIORITY_FEE_PER_GAS, - )?, - to: Address::from_slice(to_addr.as_deref().ok_or_else(|| { - SqlColdError::Convert("EIP4844 requires to_address".into()) - })?), - value, - input, - access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?, - blob_versioned_hashes: decode_b256_vec( - opt_blob(r, COL_BLOB_VERSIONED_HASHES).as_deref().ok_or_else(|| { - SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into()) - })?, - )?, - max_fee_per_blob_gas: decode_u128_required( - &opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS), - COL_MAX_FEE_PER_BLOB_GAS, - )?, - }; + let tx = TxEip4844 { + chain_id: from_i64( + chain_id + .ok_or_else(|| SqlColdError::Convert("EIP4844 requires chain_id".into()))?, + ), + nonce, + gas_limit, + max_fee_per_gas: decode_u128_required( + opt_blob(r, COL_MAX_FEE_PER_GAS), + COL_MAX_FEE_PER_GAS, + )?, + max_priority_fee_per_gas: decode_u128_required( + opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS), + COL_MAX_PRIORITY_FEE_PER_GAS, + )?, + to: to_addr + .ok_or_else(|| SqlColdError::Convert("EIP4844 requires to_address".into()))?, + value, + input, + access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?, + blob_versioned_hashes: decode_b256_vec( + opt_blob(r, COL_BLOB_VERSIONED_HASHES).ok_or_else(|| { + SqlColdError::Convert("EIP4844 requires blob_versioned_hashes".into()) + })?, + )?, + max_fee_per_blob_gas: decode_u128_required( + opt_blob(r, COL_MAX_FEE_PER_BLOB_GAS), + COL_MAX_FEE_PER_BLOB_GAS, + )?, + }; Ok(EthereumTxEnvelope::Eip4844(Signed::new_unhashed(tx, sig))) } TxType::Eip7702 => { - let tx = - TxEip7702 { - chain_id: from_i64(chain_id.ok_or_else(|| { - SqlColdError::Convert("EIP7702 requires chain_id".into()) - })?), - nonce, - gas_limit, - max_fee_per_gas: decode_u128_required( - &opt_blob(r, COL_MAX_FEE_PER_GAS), - COL_MAX_FEE_PER_GAS, - )?, - max_priority_fee_per_gas: decode_u128_required( - &opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS), - COL_MAX_PRIORITY_FEE_PER_GAS, - )?, - to: Address::from_slice(to_addr.as_deref().ok_or_else(|| { - SqlColdError::Convert("EIP7702 requires to_address".into()) - })?), - value, - input, - access_list: decode_access_list_or_empty(&opt_blob(r, COL_ACCESS_LIST))?, - authorization_list: decode_authorization_list( - opt_blob(r, COL_AUTHORIZATION_LIST).as_deref().ok_or_else(|| { - SqlColdError::Convert("EIP7702 requires authorization_list".into()) - })?, - )?, - }; + let tx = TxEip7702 { + chain_id: from_i64( + chain_id + .ok_or_else(|| SqlColdError::Convert("EIP7702 requires chain_id".into()))?, + ), + nonce, + gas_limit, + max_fee_per_gas: decode_u128_required( + opt_blob(r, COL_MAX_FEE_PER_GAS), + COL_MAX_FEE_PER_GAS, + )?, + max_priority_fee_per_gas: decode_u128_required( + opt_blob(r, COL_MAX_PRIORITY_FEE_PER_GAS), + COL_MAX_PRIORITY_FEE_PER_GAS, + )?, + to: to_addr + .ok_or_else(|| SqlColdError::Convert("EIP7702 requires to_address".into()))?, + value, + input, + access_list: decode_access_list_or_empty(opt_blob(r, COL_ACCESS_LIST))?, + authorization_list: decode_authorization_list( + opt_blob(r, COL_AUTHORIZATION_LIST).ok_or_else(|| { + SqlColdError::Convert("EIP7702 requires authorization_list".into()) + })?, + )?, + }; Ok(EthereumTxEnvelope::Eip7702(Signed::new_unhashed(tx, sig))) } } @@ -349,7 +464,7 @@ fn tx_from_row(r: &sqlx::any::AnyRow) -> Result /// Build a [`RecoveredTx`] from a row that includes `from_address`. fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result { - let sender = Address::from_slice(&r.get::, _>(COL_FROM_ADDRESS)); + let sender: Address = r.get(COL_FROM_ADDRESS); let tx = tx_from_row(r)?; // SAFETY: the sender was recovered at append time and stored in from_address. Ok(Recovered::new_unchecked(tx, sender)) @@ -359,48 +474,35 @@ fn recovered_tx_from_row(r: &sqlx::any::AnyRow) -> Result Log { let topics = [COL_TOPIC0, COL_TOPIC1, COL_TOPIC2, COL_TOPIC3] .into_iter() - .filter_map(|col| r.get::>, _>(col)) - .map(|t| B256::from_slice(&t)) + .filter_map(|col| r.get::, _>(col)) .collect(); - Log { - address: Address::from_slice(&r.get::, _>(COL_ADDRESS)), - data: LogData::new_unchecked(topics, Bytes::from(r.get::, _>(COL_DATA))), - } + Log { address: r.get(COL_ADDRESS), data: LogData::new_unchecked(topics, r.get(COL_DATA)) } } /// Build a [`DbSignetEvent`] from an [`sqlx::any::AnyRow`]. fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result { let event_type = r.get::(COL_EVENT_TYPE) as i16; let order = from_i64(r.get(COL_ORDER_INDEX)); - let rollup_chain_id = decode_u256(&r.get::, _>(COL_ROLLUP_CHAIN_ID))?; + let rollup_chain_id: U256 = r.get(COL_ROLLUP_CHAIN_ID); match event_type { EVENT_TRANSACT => { - let sender = Address::from_slice( - opt_blob(r, COL_SENDER) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?, - ); - let to = Address::from_slice( - opt_blob(r, COL_TO_ADDRESS) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?, - ); - let value = decode_u256( - opt_blob(r, COL_VALUE) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?, - )?; - let gas = decode_u256( - opt_blob(r, COL_GAS) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?, - )?; - let max_fee = - decode_u256(opt_blob(r, COL_MAX_FEE_PER_GAS).as_deref().ok_or_else(|| { - SqlColdError::Convert("Transact requires max_fee_per_gas".into()) - })?)?; - let data = Bytes::from(opt_blob(r, COL_DATA).unwrap_or_default()); + let sender: Address = r + .get::, _>(COL_SENDER) + .ok_or_else(|| SqlColdError::Convert("Transact requires sender".into()))?; + let to: Address = r + .get::, _>(COL_TO_ADDRESS) + .ok_or_else(|| SqlColdError::Convert("Transact requires to".into()))?; + let value: U256 = r + .get::, _>(COL_VALUE) + .ok_or_else(|| SqlColdError::Convert("Transact requires value".into()))?; + let gas: U256 = r + .get::, _>(COL_GAS) + .ok_or_else(|| SqlColdError::Convert("Transact requires gas".into()))?; + let max_fee: U256 = r + .get::, _>(COL_MAX_FEE_PER_GAS) + .ok_or_else(|| SqlColdError::Convert("Transact requires max_fee_per_gas".into()))?; + let data: Bytes = r.get::, _>(COL_DATA).unwrap_or_default(); Ok(DbSignetEvent::Transact( order, @@ -416,15 +518,12 @@ fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result { - let recipient = - Address::from_slice(opt_blob(r, COL_ROLLUP_RECIPIENT).as_deref().ok_or_else( - || SqlColdError::Convert("Enter requires rollup_recipient".into()), - )?); - let amount = decode_u256( - opt_blob(r, COL_AMOUNT) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?, - )?; + let recipient: Address = r + .get::, _>(COL_ROLLUP_RECIPIENT) + .ok_or_else(|| SqlColdError::Convert("Enter requires rollup_recipient".into()))?; + let amount: U256 = r + .get::, _>(COL_AMOUNT) + .ok_or_else(|| SqlColdError::Convert("Enter requires amount".into()))?; Ok(DbSignetEvent::Enter( order, @@ -432,20 +531,16 @@ fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result { - let token = Address::from_slice( - opt_blob(r, COL_TOKEN) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?, - ); - let recipient = - Address::from_slice(opt_blob(r, COL_ROLLUP_RECIPIENT).as_deref().ok_or_else( - || SqlColdError::Convert("EnterToken requires rollup_recipient".into()), - )?); - let amount = decode_u256( - opt_blob(r, COL_AMOUNT) - .as_deref() - .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?, - )?; + let token: Address = r + .get::, _>(COL_TOKEN) + .ok_or_else(|| SqlColdError::Convert("EnterToken requires token".into()))?; + let recipient: Address = + r.get::, _>(COL_ROLLUP_RECIPIENT).ok_or_else(|| { + SqlColdError::Convert("EnterToken requires rollup_recipient".into()) + })?; + let amount: U256 = r + .get::, _>(COL_AMOUNT) + .ok_or_else(|| SqlColdError::Convert("EnterToken requires amount".into()))?; Ok(DbSignetEvent::EnterToken( order, @@ -464,14 +559,11 @@ fn signet_event_from_row(r: &sqlx::any::AnyRow) -> Result Result { Ok(DbZenithHeader(Zenith::BlockHeader { - hostBlockNumber: decode_u256(&blob(r, COL_HOST_BLOCK_NUMBER))?, - rollupChainId: decode_u256(&blob(r, COL_ROLLUP_CHAIN_ID))?, - gasLimit: decode_u256(&blob(r, COL_GAS_LIMIT))?, - rewardAddress: Address::from_slice(&blob(r, COL_REWARD_ADDRESS)), - blockDataHash: alloy::primitives::FixedBytes::<32>::from_slice(&blob( - r, - COL_BLOCK_DATA_HASH, - )), + hostBlockNumber: r.get(COL_HOST_BLOCK_NUMBER), + rollupChainId: r.get(COL_ROLLUP_CHAIN_ID), + gasLimit: r.get(COL_GAS_LIMIT), + rewardAddress: r.get(COL_REWARD_ADDRESS), + blockDataHash: r.get(COL_BLOCK_DATA_HASH), })) } @@ -488,7 +580,7 @@ async fn write_block_to_tx( // Insert header let block_hash = data.header.hash_slow(); - let difficulty = encode_u256(&data.header.difficulty); + let difficulty = &data.header.difficulty; sqlx::query( "INSERT INTO headers ( block_number, block_hash, parent_hash, ommers_hash, beneficiary, @@ -502,27 +594,27 @@ async fn write_block_to_tx( )", ) .bind(bn) - .bind(block_hash.as_slice()) - .bind(data.header.parent_hash.as_slice()) - .bind(data.header.ommers_hash.as_slice()) - .bind(data.header.beneficiary.as_slice()) - .bind(data.header.state_root.as_slice()) - .bind(data.header.transactions_root.as_slice()) - .bind(data.header.receipts_root.as_slice()) - .bind(data.header.logs_bloom.as_slice()) - .bind(difficulty.as_slice()) + .bind(block_hash) + .bind(data.header.parent_hash) + .bind(data.header.ommers_hash) + .bind(data.header.beneficiary) + .bind(data.header.state_root) + .bind(data.header.transactions_root) + .bind(data.header.receipts_root) + .bind(data.header.logs_bloom) + .bind(difficulty) .bind(to_i64(data.header.gas_limit)) .bind(to_i64(data.header.gas_used)) .bind(to_i64(data.header.timestamp)) - .bind(data.header.extra_data.as_ref()) - .bind(data.header.mix_hash.as_slice()) - .bind(data.header.nonce.as_slice()) + .bind(&data.header.extra_data) + .bind(data.header.mix_hash) + .bind(data.header.nonce) .bind(data.header.base_fee_per_gas.map(to_i64)) - .bind(data.header.withdrawals_root.as_ref().map(|r| r.as_slice())) + .bind(data.header.withdrawals_root.as_ref()) .bind(data.header.blob_gas_used.map(to_i64)) .bind(data.header.excess_blob_gas.map(to_i64)) - .bind(data.header.parent_beacon_block_root.as_ref().map(|r| r.as_slice())) - .bind(data.header.requests_hash.as_ref().map(|r| r.as_slice())) + .bind(data.header.parent_beacon_block_root.as_ref()) + .bind(data.header.requests_hash.as_ref()) .execute(&mut **tx) .await?; @@ -531,20 +623,24 @@ async fn write_block_to_tx( insert_transaction(tx, bn, to_i64(idx as u64), recovered_tx).await?; } - // Insert receipts and logs + // Insert receipts and logs, computing first_log_index as a running + // sum of log counts (same algorithm as the MDBX IndexedReceipt path). + let mut first_log_index = 0i64; for (idx, receipt) in data.receipts.iter().enumerate() { let tx_idx = to_i64(idx as u64); sqlx::query( - "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used) - VALUES ($1, $2, $3, $4, $5)", + "INSERT INTO receipts (block_number, tx_index, tx_type, success, cumulative_gas_used, first_log_index) + VALUES ($1, $2, $3, $4, $5, $6)", ) .bind(bn) .bind(tx_idx) .bind(receipt.tx_type as i32) .bind(receipt.inner.status.coerce_status() as i32) .bind(to_i64(receipt.inner.cumulative_gas_used)) + .bind(first_log_index) .execute(&mut **tx) .await?; + first_log_index += receipt.inner.logs.len() as i64; for (log_idx, log) in receipt.inner.logs.iter().enumerate() { let topics = log.topics(); @@ -555,12 +651,12 @@ async fn write_block_to_tx( .bind(bn) .bind(tx_idx) .bind(to_i64(log_idx as u64)) - .bind(log.address.as_slice()) - .bind(topics.first().map(|t| t.as_slice())) - .bind(topics.get(1).map(|t| t.as_slice())) - .bind(topics.get(2).map(|t| t.as_slice())) - .bind(topics.get(3).map(|t| t.as_slice())) - .bind(log.data.data.as_ref()) + .bind(log.address) + .bind(topics.first()) + .bind(topics.get(1)) + .bind(topics.get(2)) + .bind(topics.get(3)) + .bind(&log.data.data) .execute(&mut **tx) .await?; } @@ -574,9 +670,6 @@ async fn write_block_to_tx( // Insert zenith header if let Some(zh) = &data.zenith_header { let h = &zh.0; - let host_bn = encode_u256(&h.hostBlockNumber); - let chain_id = encode_u256(&h.rollupChainId); - let gas_limit = encode_u256(&h.gasLimit); sqlx::query( "INSERT INTO zenith_headers ( block_number, host_block_number, rollup_chain_id, @@ -584,11 +677,11 @@ async fn write_block_to_tx( ) VALUES ($1, $2, $3, $4, $5, $6)", ) .bind(bn) - .bind(host_bn.as_slice()) - .bind(chain_id.as_slice()) - .bind(gas_limit.as_slice()) - .bind(h.rewardAddress.as_slice()) - .bind(h.blockDataHash.as_slice()) + .bind(h.hostBlockNumber) + .bind(h.rollupChainId) + .bind(h.gasLimit) + .bind(h.rewardAddress) + .bind(h.blockDataHash) .execute(&mut **tx) .await?; } @@ -613,7 +706,7 @@ async fn insert_transaction( macro_rules! sig { ($s:expr) => {{ let sig = $s.signature(); - (sig.v() as i32, encode_u256(&sig.r()), encode_u256(&sig.s())) + (sig.v() as i32, sig.r(), sig.s()) }}; } let (sig_y, sig_r, sig_s) = match tx { @@ -643,15 +736,11 @@ async fn insert_transaction( }; let (value, to_addr) = match tx { - EthereumTxEnvelope::Legacy(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)), - EthereumTxEnvelope::Eip2930(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)), - EthereumTxEnvelope::Eip1559(s) => (encode_u256(&s.tx().value), to_address(&s.tx().to)), - EthereumTxEnvelope::Eip4844(s) => { - (encode_u256(&s.tx().value), Some(s.tx().to.as_slice().to_vec())) - } - EthereumTxEnvelope::Eip7702(s) => { - (encode_u256(&s.tx().value), Some(s.tx().to.as_slice().to_vec())) - } + EthereumTxEnvelope::Legacy(s) => (s.tx().value, s.tx().to.to()), + EthereumTxEnvelope::Eip2930(s) => (s.tx().value, s.tx().to.to()), + EthereumTxEnvelope::Eip1559(s) => (s.tx().value, s.tx().to.to()), + EthereumTxEnvelope::Eip4844(s) => (s.tx().value, Some(&s.tx().to)), + EthereumTxEnvelope::Eip7702(s) => (s.tx().value, Some(&s.tx().to)), }; let input: &[u8] = match tx { @@ -720,16 +809,16 @@ async fn insert_transaction( ) .bind(bn) .bind(tx_index) - .bind(tx_hash.as_slice()) + .bind(tx_hash) .bind(tx_type) .bind(sig_y) - .bind(sig_r.as_slice()) - .bind(sig_s.as_slice()) + .bind(sig_r) + .bind(sig_s) .bind(chain_id) .bind(nonce) .bind(gas_limit) - .bind(to_addr.as_deref()) - .bind(value.as_slice()) + .bind(to_addr) + .bind(value) .bind(input) .bind(gas_price.as_ref().map(|v| v.as_slice())) .bind(max_fee.as_ref().map(|v| v.as_slice())) @@ -738,7 +827,7 @@ async fn insert_transaction( .bind(blob_hashes.as_deref()) .bind(access_list.as_deref()) .bind(auth_list.as_deref()) - .bind(sender.as_slice()) + .bind(sender) .execute(&mut *conn) .await?; @@ -753,20 +842,17 @@ async fn insert_signet_event( event: &DbSignetEvent, ) -> Result<(), SqlColdError> { let (event_type, order, chain_id) = match event { - DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), encode_u256(&t.rollupChainId)), - DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), encode_u256(&e.rollupChainId)), - DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), encode_u256(&e.rollupChainId)), + DbSignetEvent::Transact(o, t) => (0i32, to_i64(*o), &t.rollupChainId), + DbSignetEvent::Enter(o, e) => (1i32, to_i64(*o), &e.rollupChainId), + DbSignetEvent::EnterToken(o, e) => (2i32, to_i64(*o), &e.rollupChainId), }; let (value, gas, max_fee, amount) = match event { - DbSignetEvent::Transact(_, t) => ( - Some(encode_u256(&t.value)), - Some(encode_u256(&t.gas)), - Some(encode_u256(&t.maxFeePerGas)), - None, - ), - DbSignetEvent::Enter(_, e) => (None, None, None, Some(encode_u256(&e.amount))), - DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(encode_u256(&e.amount))), + DbSignetEvent::Transact(_, t) => { + (Some(&t.value), Some(&t.gas), Some(&t.maxFeePerGas), None) + } + DbSignetEvent::Enter(_, e) => (None, None, None, Some(&e.amount)), + DbSignetEvent::EnterToken(_, e) => (None, None, None, Some(&e.amount)), }; sqlx::query( @@ -780,30 +866,30 @@ async fn insert_signet_event( .bind(event_index) .bind(event_type) .bind(order) - .bind(chain_id.as_slice()) + .bind(chain_id) .bind(match event { - DbSignetEvent::Transact(_, t) => Some(t.sender.as_slice()), + DbSignetEvent::Transact(_, t) => Some(&t.sender), _ => None, }) .bind(match event { - DbSignetEvent::Transact(_, t) => Some(t.to.as_slice()), + DbSignetEvent::Transact(_, t) => Some(&t.to), _ => None, }) - .bind(value.as_ref().map(|v| v.as_slice())) - .bind(gas.as_ref().map(|v| v.as_slice())) - .bind(max_fee.as_ref().map(|v| v.as_slice())) + .bind(value) + .bind(gas) + .bind(max_fee) .bind(match event { - DbSignetEvent::Transact(_, t) => Some(t.data.as_ref()), + DbSignetEvent::Transact(_, t) => Some(&t.data), _ => None, }) .bind(match event { - DbSignetEvent::Enter(_, e) => Some(e.rollupRecipient.as_slice()), - DbSignetEvent::EnterToken(_, e) => Some(e.rollupRecipient.as_slice()), + DbSignetEvent::Enter(_, e) => Some(&e.rollupRecipient), + DbSignetEvent::EnterToken(_, e) => Some(&e.rollupRecipient), _ => None, }) - .bind(amount.as_ref().map(|v| v.as_slice())) + .bind(amount) .bind(match event { - DbSignetEvent::EnterToken(_, e) => Some(e.token.as_slice()), + DbSignetEvent::EnterToken(_, e) => Some(&e.token), _ => None, }) .execute(&mut *conn) @@ -812,6 +898,75 @@ async fn insert_signet_event( Ok(()) } +// ============================================================================ +// Log filter helpers +// ============================================================================ + +/// Append a SQL filter clause for a set of byte-encoded values. +/// +/// For a single value, generates ` AND {column} = ${idx}`. +/// For multiple values, generates ` AND {column} IN (${idx}, ...)`. +/// Returns the next available parameter index. +fn append_filter_clause<'a>( + clause: &mut String, + params: &mut Vec<&'a [u8]>, + mut idx: u32, + column: &str, + values: impl ExactSizeIterator, +) -> u32 { + use std::fmt::Write; + + let len = values.len(); + if len == 1 { + write!(clause, " AND {column} = ${idx}").unwrap(); + values.for_each(|v| params.push(v)); + return idx + 1; + } + write!(clause, " AND {column} IN (").unwrap(); + for (i, v) in values.enumerate() { + if i > 0 { + clause.push_str(", "); + } + write!(clause, "${idx}").unwrap(); + params.push(v); + idx += 1; + } + clause.push(')'); + idx +} + +fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8]>) { + let mut clause = String::new(); + let mut params: Vec<&[u8]> = Vec::new(); + let mut idx = start_idx; + + if !filter.address.is_empty() { + idx = append_filter_clause( + &mut clause, + &mut params, + idx, + "l.address", + filter.address.iter().map(|a| a.as_slice()), + ); + } + + let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"]; + for (i, topic_filter) in filter.topics.iter().enumerate() { + if topic_filter.is_empty() { + continue; + } + idx = append_filter_clause( + &mut clause, + &mut params, + idx, + topic_cols[i], + topic_filter.iter().map(|v| v.as_slice()), + ); + } + + (clause, params) +} + // ============================================================================ // ColdStorage implementation // ============================================================================ @@ -847,7 +1002,7 @@ impl ColdStorage for SqlColdBackend { JOIN headers h ON t.block_number = h.block_number WHERE t.tx_hash = $1", ) - .bind(hash.as_slice()) + .bind(hash) .fetch_optional(&self.pool) .await .map_err(SqlColdError::from)?, @@ -868,7 +1023,7 @@ impl ColdStorage for SqlColdBackend { JOIN headers h ON t.block_number = h.block_number WHERE h.block_hash = $1 AND t.tx_index = $2", ) - .bind(block_hash.as_slice()) + .bind(block_hash) .bind(to_i64(index)) .fetch_optional(&self.pool) .await @@ -881,8 +1036,7 @@ impl ColdStorage for SqlColdBackend { let block = from_i64(r.get::(COL_BLOCK_NUMBER)); let index = from_i64(r.get::(COL_TX_INDEX)); - let hash_bytes: Vec = r.get(COL_BLOCK_HASH); - let block_hash = B256::from_slice(&hash_bytes); + let block_hash = r.get(COL_BLOCK_HASH); let recovered = recovered_tx_from_row(&r).map_err(ColdStorageError::from)?; let meta = ConfirmationMeta::new(block, block_hash, index); Ok(Some(Confirmed::new(recovered, meta))) @@ -918,7 +1072,7 @@ impl ColdStorage for SqlColdBackend { let row = sqlx::query( "SELECT block_number, tx_index FROM transactions WHERE tx_hash = $1", ) - .bind(hash.as_slice()) + .bind(hash) .fetch_optional(&self.pool) .await .map_err(SqlColdError::from)?; @@ -954,8 +1108,8 @@ impl ColdStorage for SqlColdBackend { let bn: i64 = rr.get(COL_BLOCK_NUMBER); let tx_idx: i64 = rr.get(COL_TX_INDEX); - let tx_hash = B256::from_slice(&rr.get::, _>(COL_TX_HASH)); - let sender = Address::from_slice(&rr.get::, _>(COL_FROM_ADDRESS)); + let tx_hash = rr.get(COL_TX_HASH); + let sender = rr.get(COL_FROM_ADDRESS); let tx_type = rr.get::(COL_TX_TYPE) as i16; let success = rr.get::(COL_SUCCESS) != 0; let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED); @@ -973,12 +1127,11 @@ impl ColdStorage for SqlColdBackend { let built = build_receipt(tx_type, success, cumulative_gas_used, logs) .map_err(ColdStorageError::from)?; - // Compute gas_used and first_log_index by querying prior receipts + // Read first_log_index directly from the receipt row; compute + // gas_used from the prior receipt's cumulative gas. + let first_log_index: u64 = from_i64(rr.get::(COL_FIRST_LOG_INDEX)); let prior = sqlx::query( - "SELECT CAST(SUM( - (SELECT COUNT(*) FROM logs l WHERE l.block_number = $1 AND l.tx_index = r.tx_index) - ) AS bigint) as log_count, - CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas + "SELECT CAST(MAX(r.cumulative_gas_used) AS bigint) as prior_gas FROM receipts r WHERE r.block_number = $1 AND r.tx_index < $2", ) .bind(to_i64(block)) @@ -986,8 +1139,6 @@ impl ColdStorage for SqlColdBackend { .fetch_one(&self.pool) .await .map_err(SqlColdError::from)?; - - let first_log_index: u64 = prior.get::, _>(COL_LOG_COUNT).unwrap_or(0) as u64; let prior_cumulative_gas: u64 = prior.get::, _>(COL_PRIOR_GAS).unwrap_or(0) as u64; let gas_used = built.inner.cumulative_gas_used - prior_cumulative_gas; @@ -1040,8 +1191,8 @@ impl ColdStorage for SqlColdBackend { .enumerate() .map(|(idx, rr)| { let tx_idx: i64 = rr.get(COL_TX_INDEX); - let tx_hash = B256::from_slice(&rr.get::, _>(COL_TX_HASH)); - let sender = Address::from_slice(&rr.get::, _>(COL_FROM_ADDRESS)); + let tx_hash = rr.get(COL_TX_HASH); + let sender = rr.get(COL_FROM_ADDRESS); let tx_type = rr.get::(COL_TX_TYPE) as i16; let success = rr.get::(COL_SUCCESS) != 0; let cumulative_gas_used: i64 = rr.get(COL_CUMULATIVE_GAS_USED); @@ -1139,69 +1290,20 @@ impl ColdStorage for SqlColdBackend { rows.iter().map(|r| zenith_header_from_row(r).map_err(ColdStorageError::from)).collect() } - async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult> { + async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult> { let from = filter.get_from_block().unwrap_or(0); let to = filter.get_to_block().unwrap_or(u64::MAX); - // Build the shared WHERE clause for both count and data queries. - let mut where_clause = String::from("l.block_number >= $1 AND l.block_number <= $2"); - let mut params: Vec> = Vec::new(); - let mut idx = 3u32; - - // Address filter - if !filter.address.is_empty() { - let addrs: Vec<_> = filter.address.iter().collect(); - if addrs.len() == 1 { - where_clause.push_str(&format!(" AND l.address = ${idx}")); - params.push(addrs[0].as_slice().to_vec()); - idx += 1; - } else { - let placeholders: String = addrs - .iter() - .enumerate() - .map(|(i, _)| format!("${}", idx + i as u32)) - .collect::>() - .join(", "); - where_clause.push_str(&format!(" AND l.address IN ({placeholders})")); - for addr in &addrs { - params.push(addr.as_slice().to_vec()); - } - idx += addrs.len() as u32; - } - } - - // Topic filters - let topic_cols = ["l.topic0", "l.topic1", "l.topic2", "l.topic3"]; - for (i, topic_filter) in filter.topics.iter().enumerate() { - if topic_filter.is_empty() { - continue; - } - let values: Vec<_> = topic_filter.iter().collect(); - if values.len() == 1 { - where_clause.push_str(&format!(" AND {} = ${idx}", topic_cols[i])); - params.push(values[0].as_slice().to_vec()); - idx += 1; - } else { - let placeholders: String = values - .iter() - .enumerate() - .map(|(j, _)| format!("${}", idx + j as u32)) - .collect::>() - .join(", "); - where_clause.push_str(&format!(" AND {} IN ({placeholders})", topic_cols[i])); - for v in &values { - params.push(v.as_slice().to_vec()); - } - idx += values.len() as u32; - } - } + // Build WHERE clause: block range ($1, $2) + address/topic filters. + let (filter_clause, params) = build_log_filter_clause(filter, 3); + let where_clause = format!("l.block_number >= $1 AND l.block_number <= $2{filter_clause}"); // Run a cheap COUNT(*) query first to reject queries that exceed // the limit without loading any row data. let count_sql = format!("SELECT COUNT(*) as cnt FROM logs l WHERE {where_clause}"); let mut count_query = sqlx::query(&count_sql).bind(to_i64(from)).bind(to_i64(to)); for param in ¶ms { - count_query = count_query.bind(param.as_slice()); + count_query = count_query.bind(*param); } let count_row = count_query.fetch_one(&self.pool).await.map_err(SqlColdError::from)?; let count = from_i64(count_row.get::(COL_CNT)) as usize; @@ -1213,21 +1315,19 @@ impl ColdStorage for SqlColdBackend { // for block_log_index (absolute position within block). let data_sql = format!( "SELECT l.*, h.block_hash, h.timestamp AS block_timestamp, t.tx_hash, \ - (SELECT COUNT(*) FROM logs l2 \ - WHERE l2.block_number = l.block_number \ - AND (l2.tx_index < l.tx_index \ - OR (l2.tx_index = l.tx_index AND l2.log_index < l.log_index)) \ - ) AS block_log_index \ + (r.first_log_index + l.log_index) AS block_log_index \ FROM logs l \ JOIN headers h ON l.block_number = h.block_number \ JOIN transactions t ON l.block_number = t.block_number \ AND l.tx_index = t.tx_index \ + JOIN receipts r ON l.block_number = r.block_number \ + AND l.tx_index = r.tx_index \ WHERE {where_clause} \ ORDER BY l.block_number, l.tx_index, l.log_index" ); let mut query = sqlx::query(&data_sql).bind(to_i64(from)).bind(to_i64(to)); for param in ¶ms { - query = query.bind(param.as_slice()); + query = query.bind(*param); } let rows = query.fetch_all(&self.pool).await.map_err(SqlColdError::from)?; @@ -1235,15 +1335,12 @@ impl ColdStorage for SqlColdBackend { rows.into_iter() .map(|r| { let log = log_from_row(&r); - let block_number = from_i64(r.get::(COL_BLOCK_NUMBER)); - let block_hash_bytes: Vec = r.get(COL_BLOCK_HASH); - let tx_hash_bytes: Vec = r.get(COL_TX_HASH); Ok(RpcLog { inner: log, - block_hash: Some(B256::from_slice(&block_hash_bytes)), - block_number: Some(block_number), + block_hash: Some(r.get(COL_BLOCK_HASH)), + block_number: Some(from_i64(r.get::(COL_BLOCK_NUMBER))), block_timestamp: Some(from_i64(r.get::(COL_BLOCK_TIMESTAMP))), - transaction_hash: Some(B256::from_slice(&tx_hash_bytes)), + transaction_hash: Some(r.get(COL_TX_HASH)), transaction_index: Some(from_i64(r.get::(COL_TX_INDEX))), log_index: Some(from_i64(r.get::(COL_BLOCK_LOG_INDEX))), removed: false, @@ -1252,6 +1349,14 @@ impl ColdStorage for SqlColdBackend { .collect::>>() } + async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) { + #[cfg(feature = "postgres")] + if self.is_postgres { + return self.produce_log_stream_pg(filter, params).await; + } + signet_cold::produce_log_stream_default(self, filter, params).await; + } + async fn get_latest_block(&self) -> ColdResult> { let row = sqlx::query("SELECT MAX(block_number) as max_bn FROM headers") .fetch_one(&self.pool) @@ -1277,36 +1382,16 @@ impl ColdStorage for SqlColdBackend { let bn = to_i64(block); let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?; - sqlx::query("DELETE FROM logs WHERE block_number > $1") - .bind(bn) - .execute(&mut *tx) - .await - .map_err(SqlColdError::from)?; - sqlx::query("DELETE FROM transactions WHERE block_number > $1") - .bind(bn) - .execute(&mut *tx) - .await - .map_err(SqlColdError::from)?; - sqlx::query("DELETE FROM receipts WHERE block_number > $1") - .bind(bn) - .execute(&mut *tx) - .await - .map_err(SqlColdError::from)?; - sqlx::query("DELETE FROM signet_events WHERE block_number > $1") - .bind(bn) - .execute(&mut *tx) - .await - .map_err(SqlColdError::from)?; - sqlx::query("DELETE FROM zenith_headers WHERE block_number > $1") - .bind(bn) - .execute(&mut *tx) - .await - .map_err(SqlColdError::from)?; - sqlx::query("DELETE FROM headers WHERE block_number > $1") - .bind(bn) - .execute(&mut *tx) - .await - .map_err(SqlColdError::from)?; + // Delete child tables first, then headers (preserves FK ordering). + for table in + ["logs", "transactions", "receipts", "signet_events", "zenith_headers", "headers"] + { + sqlx::query(&format!("DELETE FROM {table} WHERE block_number > $1")) + .bind(bn) + .execute(&mut *tx) + .await + .map_err(SqlColdError::from)?; + } tx.commit().await.map_err(SqlColdError::from)?; Ok(()) @@ -1321,7 +1406,7 @@ mod tests { #[tokio::test] async fn sqlite_conformance() { let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap(); - conformance(&backend).await.unwrap(); + conformance(backend).await.unwrap(); } #[tokio::test] @@ -1331,6 +1416,6 @@ mod tests { return; }; let backend = SqlColdBackend::connect(&url).await.unwrap(); - conformance(&backend).await.unwrap(); + conformance(backend).await.unwrap(); } } diff --git a/crates/cold-sql/src/columns.rs b/crates/cold-sql/src/columns.rs index 087edc4..f0fc8a3 100644 --- a/crates/cold-sql/src/columns.rs +++ b/crates/cold-sql/src/columns.rs @@ -53,6 +53,7 @@ pub(crate) const COL_FROM_ADDRESS: &str = "from_address"; // ── receipt columns ───────────────────────────────────────────────────────── pub(crate) const COL_SUCCESS: &str = "success"; pub(crate) const COL_CUMULATIVE_GAS_USED: &str = "cumulative_gas_used"; +pub(crate) const COL_FIRST_LOG_INDEX: &str = "first_log_index"; // ── log columns ───────────────────────────────────────────────────────────── pub(crate) const COL_ADDRESS: &str = "address"; @@ -79,7 +80,6 @@ pub(crate) const COL_BLOCK_DATA_HASH: &str = "block_data_hash"; // ── query-specific aliases ────────────────────────────────────────────────── pub(crate) const COL_CNT: &str = "cnt"; -pub(crate) const COL_LOG_COUNT: &str = "log_count"; pub(crate) const COL_PRIOR_GAS: &str = "prior_gas"; pub(crate) const COL_BLOCK_LOG_INDEX: &str = "block_log_index"; pub(crate) const COL_BLOCK_TIMESTAMP: &str = "block_timestamp"; diff --git a/crates/cold-sql/src/convert.rs b/crates/cold-sql/src/convert.rs index 4263ea5..b90ae4f 100644 --- a/crates/cold-sql/src/convert.rs +++ b/crates/cold-sql/src/convert.rs @@ -11,7 +11,7 @@ use alloy::{ eip2930::{AccessList, AccessListItem}, eip7702::{Authorization, SignedAuthorization}, }, - primitives::{Address, B256, Log, TxKind, U256}, + primitives::{Address, B256, Log, U256}, }; use signet_storage_types::Receipt; @@ -33,19 +33,6 @@ pub(crate) const fn from_i64(v: i64) -> u64 { // Fixed-size type encoding // ============================================================================ -/// Encode a U256 as 32 big-endian bytes. -pub(crate) const fn encode_u256(v: &U256) -> [u8; 32] { - v.to_be_bytes::<32>() -} - -/// Decode a U256 from big-endian bytes. -pub(crate) fn decode_u256(data: &[u8]) -> Result { - if data.len() < 32 { - return Err(SqlColdError::Convert(format!("U256 requires 32 bytes, got {}", data.len()))); - } - Ok(U256::from_be_slice(data)) -} - /// Encode a u128 as 16 big-endian bytes. pub(crate) const fn encode_u128(v: u128) -> [u8; 16] { v.to_be_bytes() @@ -59,42 +46,18 @@ pub(crate) fn decode_u128(data: &[u8]) -> Result { Ok(u128::from_be_bytes(arr)) } -// ============================================================================ -// Address / TxKind helpers -// ============================================================================ - -/// Encode a [`TxKind`] to an optional address blob for SQL. -pub(crate) fn to_address(kind: &TxKind) -> Option> { - match kind { - TxKind::Call(addr) => Some(addr.as_slice().to_vec()), - TxKind::Create => None, - } -} - -/// Decode an optional address blob back to a [`TxKind`]. -pub(crate) fn from_address(data: Option<&[u8]>) -> TxKind { - data.map_or(TxKind::Create, |b| TxKind::Call(Address::from_slice(b))) -} - // ============================================================================ // Nullable field helpers // ============================================================================ /// Decode a required u128 from a nullable blob column. -pub(crate) fn decode_u128_required( - data: &Option>, - field: &str, -) -> Result { - data.as_ref() - .ok_or_else(|| SqlColdError::Convert(format!("{field} is required"))) - .and_then(|b| decode_u128(b)) +pub(crate) fn decode_u128_required(data: Option<&[u8]>, field: &str) -> Result { + data.ok_or_else(|| SqlColdError::Convert(format!("{field} is required"))).and_then(decode_u128) } /// Decode an access list from a nullable blob column, defaulting to empty. -pub(crate) fn decode_access_list_or_empty( - data: &Option>, -) -> Result { - data.as_ref().map(|b| decode_access_list(b)).transpose().map(|opt| opt.unwrap_or_default()) +pub(crate) fn decode_access_list_or_empty(data: Option<&[u8]>) -> Result { + data.map(decode_access_list).transpose().map(|opt| opt.unwrap_or_default()) } // ============================================================================ diff --git a/crates/cold-sql/src/lib.rs b/crates/cold-sql/src/lib.rs index 55a0cf8..4a2e7cf 100644 --- a/crates/cold-sql/src/lib.rs +++ b/crates/cold-sql/src/lib.rs @@ -32,6 +32,8 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg))] +// Trait impls for sqlx integration (Address, B256, Bytes, FixedBytes). +use alloy_primitives as _; mod error; pub use error::SqlColdError; diff --git a/crates/cold/Cargo.toml b/crates/cold/Cargo.toml index 146d5fb..d9c5779 100644 --- a/crates/cold/Cargo.toml +++ b/crates/cold/Cargo.toml @@ -21,6 +21,7 @@ lru.workspace = true signet-storage-types.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-stream.workspace = true tokio-util.workspace = true tracing.workspace = true diff --git a/crates/cold/src/conformance.rs b/crates/cold/src/conformance.rs index f17b56d..6b0454b 100644 --- a/crates/cold/src/conformance.rs +++ b/crates/cold/src/conformance.rs @@ -5,32 +5,40 @@ //! a custom backend, call the test functions with your backend instance. use crate::{ - BlockData, ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, - ReceiptSpecifier, TransactionSpecifier, + BlockData, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle, ColdStorageTask, + Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, TransactionSpecifier, }; use alloy::{ - consensus::transaction::Recovered, - consensus::{Header, Receipt as AlloyReceipt, Sealable, Signed, TxLegacy}, + consensus::{ + Header, Receipt as AlloyReceipt, Sealable, Signed, TxLegacy, transaction::Recovered, + }, primitives::{ Address, B256, BlockNumber, Bytes, Log, LogData, Signature, TxKind, U256, address, }, }; use signet_storage_types::{Receipt, RecoveredTx, TransactionSigned}; +use std::time::Duration; +use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; /// Run all conformance tests against a backend. /// /// This is the main entry point for testing a custom backend implementation. -pub async fn conformance(backend: &B) -> ColdResult<()> { - test_empty_storage(backend).await?; - test_append_and_read_header(backend).await?; - test_header_hash_lookup(backend).await?; - test_transaction_lookups(backend).await?; - test_receipt_lookups(backend).await?; - test_truncation(backend).await?; - test_batch_append(backend).await?; - test_confirmation_metadata(backend).await?; - test_cold_receipt_metadata(backend).await?; - test_get_logs(backend).await?; +pub async fn conformance(backend: B) -> ColdResult<()> { + let cancel = CancellationToken::new(); + let handle = ColdStorageTask::spawn(backend, cancel.clone()); + test_empty_storage(&handle).await?; + test_append_and_read_header(&handle).await?; + test_header_hash_lookup(&handle).await?; + test_transaction_lookups(&handle).await?; + test_receipt_lookups(&handle).await?; + test_truncation(&handle).await?; + test_batch_append(&handle).await?; + test_confirmation_metadata(&handle).await?; + test_cold_receipt_metadata(&handle).await?; + test_get_logs(&handle).await?; + test_stream_logs(&handle).await?; + cancel.cancel(); Ok(()) } @@ -88,86 +96,84 @@ fn make_test_block_with_txs(block_number: BlockNumber, tx_count: usize) -> Block } /// Test that empty storage returns None/empty for all lookups. -pub async fn test_empty_storage(backend: &B) -> ColdResult<()> { - assert!(backend.get_header(HeaderSpecifier::Number(0)).await?.is_none()); - assert!(backend.get_header(HeaderSpecifier::Hash(B256::ZERO)).await?.is_none()); - assert!(backend.get_latest_block().await?.is_none()); - assert!(backend.get_transactions_in_block(0).await?.is_empty()); - assert!(backend.get_receipts_in_block(0).await?.is_empty()); - assert_eq!(backend.get_transaction_count(0).await?, 0); +pub async fn test_empty_storage(handle: &ColdStorageHandle) -> ColdResult<()> { + assert!(handle.get_header(HeaderSpecifier::Number(0)).await?.is_none()); + assert!(handle.get_header(HeaderSpecifier::Hash(B256::ZERO)).await?.is_none()); + assert!(handle.get_latest_block().await?.is_none()); + assert!(handle.get_transactions_in_block(0).await?.is_empty()); + assert!(handle.get_receipts_in_block(0).await?.is_empty()); + assert_eq!(handle.get_transaction_count(0).await?, 0); Ok(()) } /// Test basic append and read for headers. -pub async fn test_append_and_read_header(backend: &B) -> ColdResult<()> { +pub async fn test_append_and_read_header(handle: &ColdStorageHandle) -> ColdResult<()> { let block_data = make_test_block(100); let expected_header = block_data.header.clone(); - backend.append_block(block_data).await?; + handle.append_block(block_data).await?; - let retrieved = backend.get_header(HeaderSpecifier::Number(100)).await?; - assert!(retrieved.is_some()); - assert_eq!(retrieved.unwrap().hash(), expected_header.hash()); + let retrieved = handle.get_header(HeaderSpecifier::Number(100)).await?.unwrap(); + assert_eq!(retrieved.hash(), expected_header.hash()); Ok(()) } /// Test header lookup by hash. -pub async fn test_header_hash_lookup(backend: &B) -> ColdResult<()> { +pub async fn test_header_hash_lookup(handle: &ColdStorageHandle) -> ColdResult<()> { let block_data = make_test_block(101); let header_hash = block_data.header.hash(); - backend.append_block(block_data).await?; + handle.append_block(block_data).await?; - let retrieved = backend.get_header(HeaderSpecifier::Hash(header_hash)).await?; - assert!(retrieved.is_some()); - assert_eq!(retrieved.unwrap().hash(), header_hash); + let retrieved = handle.get_header(HeaderSpecifier::Hash(header_hash)).await?.unwrap(); + assert_eq!(retrieved.hash(), header_hash); // Non-existent hash should return None - let missing = backend.get_header(HeaderSpecifier::Hash(B256::ZERO)).await?; + let missing = handle.get_header(HeaderSpecifier::Hash(B256::ZERO)).await?; assert!(missing.is_none()); Ok(()) } /// Test transaction lookups by hash and by block+index. -pub async fn test_transaction_lookups(backend: &B) -> ColdResult<()> { +pub async fn test_transaction_lookups(handle: &ColdStorageHandle) -> ColdResult<()> { let block_data = make_test_block(200); - backend.append_block(block_data).await?; + handle.append_block(block_data).await?; - let txs = backend.get_transactions_in_block(200).await?; - let count = backend.get_transaction_count(200).await?; + let txs = handle.get_transactions_in_block(200).await?; + let count = handle.get_transaction_count(200).await?; assert_eq!(txs.len() as u64, count); Ok(()) } /// Test receipt lookups. -pub async fn test_receipt_lookups(backend: &B) -> ColdResult<()> { +pub async fn test_receipt_lookups(handle: &ColdStorageHandle) -> ColdResult<()> { let block_data = make_test_block(201); - backend.append_block(block_data).await?; + handle.append_block(block_data).await?; - let receipts = backend.get_receipts_in_block(201).await?; + let receipts = handle.get_receipts_in_block(201).await?; assert!(receipts.is_empty()); Ok(()) } /// Test that transaction and receipt lookups return correct metadata. -pub async fn test_confirmation_metadata(backend: &B) -> ColdResult<()> { +pub async fn test_confirmation_metadata(handle: &ColdStorageHandle) -> ColdResult<()> { let block = make_test_block_with_txs(600, 3); let expected_hash = block.header.hash(); let tx_hashes: Vec<_> = block.transactions.iter().map(|tx| *tx.tx_hash()).collect(); let expected_senders: Vec<_> = block.transactions.iter().map(|tx| *tx.signer()).collect(); - backend.append_block(block).await?; + handle.append_block(block).await?; // Verify transaction metadata via hash lookup for (idx, tx_hash) in tx_hashes.iter().enumerate() { let confirmed = - backend.get_transaction(TransactionSpecifier::Hash(*tx_hash)).await?.unwrap(); + handle.get_transaction(TransactionSpecifier::Hash(*tx_hash)).await?.unwrap(); assert_eq!(confirmed.meta().block_number(), 600); assert_eq!(confirmed.meta().block_hash(), expected_hash); assert_eq!(confirmed.meta().transaction_index(), idx as u64); @@ -175,7 +181,7 @@ pub async fn test_confirmation_metadata(backend: &B) -> ColdResu } // Verify transaction metadata via block+index lookup - let confirmed = backend + let confirmed = handle .get_transaction(TransactionSpecifier::BlockAndIndex { block: 600, index: 1 }) .await? .unwrap(); @@ -185,7 +191,7 @@ pub async fn test_confirmation_metadata(backend: &B) -> ColdResu assert_eq!(*confirmed.inner().signer(), expected_senders[1]); // Verify transaction metadata via block_hash+index lookup - let confirmed = backend + let confirmed = handle .get_transaction(TransactionSpecifier::BlockHashAndIndex { block_hash: expected_hash, index: 2, @@ -197,13 +203,13 @@ pub async fn test_confirmation_metadata(backend: &B) -> ColdResu assert_eq!(*confirmed.inner().signer(), expected_senders[2]); // Verify receipt metadata via tx hash lookup - let cold_receipt = backend.get_receipt(ReceiptSpecifier::TxHash(tx_hashes[0])).await?.unwrap(); + let cold_receipt = handle.get_receipt(ReceiptSpecifier::TxHash(tx_hashes[0])).await?.unwrap(); assert_eq!(cold_receipt.block_number, 600); assert_eq!(cold_receipt.block_hash, expected_hash); assert_eq!(cold_receipt.transaction_index, 0); // Verify receipt metadata via block+index lookup - let cold_receipt = backend + let cold_receipt = handle .get_receipt(ReceiptSpecifier::BlockAndIndex { block: 600, index: 2 }) .await? .unwrap(); @@ -211,51 +217,51 @@ pub async fn test_confirmation_metadata(backend: &B) -> ColdResu assert_eq!(cold_receipt.transaction_index, 2); // Non-existent lookups return None - assert!(backend.get_transaction(TransactionSpecifier::Hash(B256::ZERO)).await?.is_none()); - assert!(backend.get_receipt(ReceiptSpecifier::TxHash(B256::ZERO)).await?.is_none()); + assert!(handle.get_transaction(TransactionSpecifier::Hash(B256::ZERO)).await?.is_none()); + assert!(handle.get_receipt(ReceiptSpecifier::TxHash(B256::ZERO)).await?.is_none()); Ok(()) } /// Test truncation removes data correctly. -pub async fn test_truncation(backend: &B) -> ColdResult<()> { +pub async fn test_truncation(handle: &ColdStorageHandle) -> ColdResult<()> { // Append blocks 300, 301, 302 - backend.append_block(make_test_block(300)).await?; - backend.append_block(make_test_block(301)).await?; - backend.append_block(make_test_block(302)).await?; + handle.append_block(make_test_block(300)).await?; + handle.append_block(make_test_block(301)).await?; + handle.append_block(make_test_block(302)).await?; // Truncate above 300 (removes 301, 302) - backend.truncate_above(300).await?; + handle.truncate_above(300).await?; // Block 300 should still exist - assert!(backend.get_header(HeaderSpecifier::Number(300)).await?.is_some()); + assert!(handle.get_header(HeaderSpecifier::Number(300)).await?.is_some()); // Blocks 301, 302 should be gone - assert!(backend.get_header(HeaderSpecifier::Number(301)).await?.is_none()); - assert!(backend.get_header(HeaderSpecifier::Number(302)).await?.is_none()); + assert!(handle.get_header(HeaderSpecifier::Number(301)).await?.is_none()); + assert!(handle.get_header(HeaderSpecifier::Number(302)).await?.is_none()); // Latest should now be 300 - assert_eq!(backend.get_latest_block().await?, Some(300)); + assert_eq!(handle.get_latest_block().await?, Some(300)); Ok(()) } /// Test batch append. -pub async fn test_batch_append(backend: &B) -> ColdResult<()> { +pub async fn test_batch_append(handle: &ColdStorageHandle) -> ColdResult<()> { let blocks = vec![make_test_block(400), make_test_block(401), make_test_block(402)]; - backend.append_blocks(blocks).await?; + handle.append_blocks(blocks).await?; - assert!(backend.get_header(HeaderSpecifier::Number(400)).await?.is_some()); - assert!(backend.get_header(HeaderSpecifier::Number(401)).await?.is_some()); - assert!(backend.get_header(HeaderSpecifier::Number(402)).await?.is_some()); + assert!(handle.get_header(HeaderSpecifier::Number(400)).await?.is_some()); + assert!(handle.get_header(HeaderSpecifier::Number(401)).await?.is_some()); + assert!(handle.get_header(HeaderSpecifier::Number(402)).await?.is_some()); Ok(()) } /// Test ColdReceipt metadata: gas_used, first_log_index, tx_hash, /// block_hash, block_number, transaction_index, from. -pub async fn test_cold_receipt_metadata(backend: &B) -> ColdResult<()> { +pub async fn test_cold_receipt_metadata(handle: &ColdStorageHandle) -> ColdResult<()> { // Block with 3 receipts having 2, 3, and 1 logs respectively. let header = Header { number: 700, ..Default::default() }; let sealed = header.seal_slow(); @@ -270,10 +276,10 @@ pub async fn test_cold_receipt_metadata(backend: &B) -> ColdResu ]; let block = BlockData::new(sealed, transactions, receipts, vec![], None); - backend.append_block(block).await?; + handle.append_block(block).await?; // First receipt: gas_used=21000, first log index=0 - let first = backend + let first = handle .get_receipt(ReceiptSpecifier::BlockAndIndex { block: 700, index: 0 }) .await? .unwrap(); @@ -287,7 +293,7 @@ pub async fn test_cold_receipt_metadata(backend: &B) -> ColdResu assert_eq!(first.receipt.logs[1].log_index, Some(1)); // Second receipt: gas_used=21000, first log index=2 - let second = backend + let second = handle .get_receipt(ReceiptSpecifier::BlockAndIndex { block: 700, index: 1 }) .await? .unwrap(); @@ -297,7 +303,7 @@ pub async fn test_cold_receipt_metadata(backend: &B) -> ColdResu assert_eq!(second.receipt.logs.len(), 3); // Third receipt: gas_used=21000, first log index=5 (2+3) - let third = backend + let third = handle .get_receipt(ReceiptSpecifier::BlockAndIndex { block: 700, index: 2 }) .await? .unwrap(); @@ -306,14 +312,14 @@ pub async fn test_cold_receipt_metadata(backend: &B) -> ColdResu assert_eq!(third.receipt.logs[0].log_index, Some(5)); // Lookup by tx hash - let by_hash = backend.get_receipt(ReceiptSpecifier::TxHash(tx_hashes[1])).await?.unwrap(); + let by_hash = handle.get_receipt(ReceiptSpecifier::TxHash(tx_hashes[1])).await?.unwrap(); assert_eq!(by_hash.transaction_index, 1); assert_eq!(by_hash.tx_hash, tx_hashes[1]); assert_eq!(by_hash.from, expected_senders[1]); assert_eq!(by_hash.receipt.logs[0].log_index, Some(2)); // Verify gas_used on all receipts via get_receipts_in_block - let all = backend.get_receipts_in_block(700).await?; + let all = handle.get_receipts_in_block(700).await?; assert_eq!(all.len(), 3); assert_eq!(all[0].gas_used, 21000); assert_eq!(all[1].gas_used, 21000); @@ -321,7 +327,7 @@ pub async fn test_cold_receipt_metadata(backend: &B) -> ColdResu // Non-existent returns None assert!( - backend + handle .get_receipt(ReceiptSpecifier::BlockAndIndex { block: 999, index: 0 }) .await? .is_none() @@ -353,7 +359,7 @@ fn make_test_block_with_receipts(block_number: BlockNumber, receipts: Vec(backend: &B) -> ColdResult<()> { +pub async fn test_get_logs(handle: &ColdStorageHandle) -> ColdResult<()> { let addr_a = Address::with_last_byte(0xAA); let addr_b = Address::with_last_byte(0xBB); let topic0_transfer = B256::with_last_byte(0x01); @@ -383,15 +389,15 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { vec![make_receipt_from_logs(vec![make_test_log(addr_b, vec![topic0_transfer], vec![4])])]; let block_801 = make_test_block_with_receipts(801, receipts_801); - backend.append_block(block_800).await?; - backend.append_block(block_801).await?; + handle.append_block(block_800).await?; + handle.append_block(block_801).await?; // --- Empty range returns empty --- - let empty = backend.get_logs(Filter::new().from_block(900).to_block(999), usize::MAX).await?; + let empty = handle.get_logs(Filter::new().from_block(900).to_block(999), usize::MAX).await?; assert!(empty.is_empty()); // --- All logs in range 800..=801 (no address/topic filter) --- - let all = backend.get_logs(Filter::new().from_block(800).to_block(801), usize::MAX).await?; + let all = handle.get_logs(Filter::new().from_block(800).to_block(801), usize::MAX).await?; assert_eq!(all.len(), 4); // Verify ordering by (block_number, transaction_index) assert_eq!((all[0].block_number, all[0].transaction_index), (Some(800), Some(0))); @@ -415,19 +421,18 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { assert_eq!(all[2].transaction_hash, Some(tx1_hash_800)); // --- Block range filtering --- - let only_800 = - backend.get_logs(Filter::new().from_block(800).to_block(800), usize::MAX).await?; + let only_800 = handle.get_logs(Filter::new().from_block(800).to_block(800), usize::MAX).await?; assert_eq!(only_800.len(), 3); // --- Single address filter --- - let addr_a_logs = backend + let addr_a_logs = handle .get_logs(Filter::new().from_block(800).to_block(801).address(addr_a), usize::MAX) .await?; assert_eq!(addr_a_logs.len(), 2); assert!(addr_a_logs.iter().all(|l| l.inner.address == addr_a)); // --- Multi-address filter --- - let both_addr = backend + let both_addr = handle .get_logs( Filter::new().from_block(800).to_block(801).address(vec![addr_a, addr_b]), usize::MAX, @@ -436,7 +441,7 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { assert_eq!(both_addr.len(), 4); // --- Topic0 filter --- - let transfers = backend + let transfers = handle .get_logs( Filter::new().from_block(800).to_block(801).event_signature(topic0_transfer), usize::MAX, @@ -445,7 +450,7 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { assert_eq!(transfers.len(), 3); // --- Topic0 multi-value (OR within position) --- - let transfer_or_approval = backend + let transfer_or_approval = handle .get_logs( Filter::new() .from_block(800) @@ -457,7 +462,7 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { assert_eq!(transfer_or_approval.len(), 4); // --- Multi-topic: topic0 AND topic1 --- - let specific = backend + let specific = handle .get_logs( Filter::new() .from_block(800) @@ -471,13 +476,13 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { assert_eq!(specific[0].inner.address, addr_a); // --- Topic1 filter with topic0 wildcard --- - let by_sender = backend + let by_sender = handle .get_logs(Filter::new().from_block(800).to_block(801).topic1(topic1_sender), usize::MAX) .await?; assert_eq!(by_sender.len(), 1); // --- Combined address + topic filter --- - let addr_a_transfers = backend + let addr_a_transfers = handle .get_logs( Filter::new() .from_block(800) @@ -490,12 +495,116 @@ pub async fn test_get_logs(backend: &B) -> ColdResult<()> { assert_eq!(addr_a_transfers.len(), 2); // --- max_logs errors when exceeded --- - let err = backend.get_logs(Filter::new().from_block(800).to_block(801), 2).await; + let err = handle.get_logs(Filter::new().from_block(800).to_block(801), 2).await; assert!(matches!(err, Err(ColdStorageError::TooManyLogs { limit: 2 }))); // --- max_logs at exact count succeeds --- - let exact = backend.get_logs(Filter::new().from_block(800).to_block(801), 4).await?; + let exact = handle.get_logs(Filter::new().from_block(800).to_block(801), 4).await?; assert_eq!(exact.len(), 4); Ok(()) } + +/// Collect a [`crate::LogStream`] into a `Vec`, returning the first error if any. +async fn collect_stream(mut stream: crate::LogStream) -> ColdResult> { + let mut results = Vec::new(); + while let Some(item) = stream.next().await { + results.push(item?); + } + Ok(results) +} + +/// Test stream_logs produces identical results to get_logs for all filter +/// combinations from the existing test_get_logs suite. +/// +/// Uses block numbers 850-851 to avoid collisions with test_get_logs data +/// (800-801). Streaming is tested via [`ColdStorageHandle`] since the +/// streaming loop lives in [`ColdStorageTask`]. +pub async fn test_stream_logs(handle: &ColdStorageHandle) -> ColdResult<()> { + let addr_a = Address::with_last_byte(0xAA); + let addr_b = Address::with_last_byte(0xBB); + let topic0_transfer = B256::with_last_byte(0x01); + let topic0_approval = B256::with_last_byte(0x02); + let topic1_sender = B256::with_last_byte(0x10); + + // Block 850: 2 txs, tx0 has 2 logs, tx1 has 1 log + let receipts_850 = vec![ + make_receipt_from_logs(vec![ + make_test_log(addr_a, vec![topic0_transfer, topic1_sender], vec![1]), + make_test_log(addr_b, vec![topic0_approval], vec![2]), + ]), + make_receipt_from_logs(vec![make_test_log(addr_a, vec![topic0_transfer], vec![3])]), + ]; + let block_850 = make_test_block_with_receipts(850, receipts_850); + handle.append_block(block_850).await?; + + // Block 851: 1 tx, 1 log + let receipts_851 = + vec![make_receipt_from_logs(vec![make_test_log(addr_b, vec![topic0_transfer], vec![4])])]; + let block_851 = make_test_block_with_receipts(851, receipts_851); + handle.append_block(block_851).await?; + + // Helper: verify stream matches get_logs for the same filter. + let filters = vec![ + // All logs + Filter::new().from_block(850).to_block(851), + // Single block + Filter::new().from_block(850).to_block(850), + // Address filter + Filter::new().from_block(850).to_block(851).address(addr_a), + // Topic filter + Filter::new().from_block(850).to_block(851).event_signature(topic0_transfer), + // Combined address + topic + Filter::new() + .from_block(850) + .to_block(851) + .address(addr_a) + .event_signature(topic0_transfer), + // Empty range + Filter::new().from_block(900).to_block(999), + ]; + + for filter in filters { + let expected = handle.get_logs(filter.clone(), usize::MAX).await?; + let stream = handle.stream_logs(filter, usize::MAX, Duration::from_secs(60)).await?; + let streamed = collect_stream(stream).await?; + assert_eq!(expected.len(), streamed.len(), "stream length mismatch"); + for (e, s) in expected.iter().zip(streamed.iter()) { + assert_eq!(e.block_number, s.block_number); + assert_eq!(e.transaction_index, s.transaction_index); + assert_eq!(e.log_index, s.log_index); + assert_eq!(e.block_hash, s.block_hash); + assert_eq!(e.transaction_hash, s.transaction_hash); + assert_eq!(e.inner.address, s.inner.address); + } + } + + // --- max_logs: stream yields TooManyLogs error --- + let stream = handle + .stream_logs(Filter::new().from_block(850).to_block(851), 2, Duration::from_secs(60)) + .await?; + let result = collect_stream(stream).await; + assert!(matches!(result, Err(ColdStorageError::TooManyLogs { limit: 2 }))); + + // --- max_logs across block boundary: limit reports the original max_logs --- + // Block 860 has 2 logs, block 861 has 2 logs. With max_logs=3 the limit + // is exceeded during block 861 and the error must report `limit: 3`. + let receipts_860 = vec![make_receipt_from_logs(vec![ + make_test_log(addr_a, vec![topic0_transfer], vec![10]), + make_test_log(addr_b, vec![topic0_transfer], vec![11]), + ])]; + let receipts_861 = vec![make_receipt_from_logs(vec![ + make_test_log(addr_a, vec![topic0_transfer], vec![12]), + make_test_log(addr_b, vec![topic0_transfer], vec![13]), + ])]; + handle.append_block(make_test_block_with_receipts(860, receipts_860)).await?; + handle.append_block(make_test_block_with_receipts(861, receipts_861)).await?; + + let stream = handle + .stream_logs(Filter::new().from_block(860).to_block(861), 3, Duration::from_secs(60)) + .await?; + let result = collect_stream(stream).await; + assert!(matches!(result, Err(ColdStorageError::TooManyLogs { limit: 3 }))); + + Ok(()) +} diff --git a/crates/cold/src/error.rs b/crates/cold/src/error.rs index 1834033..c92fc7c 100644 --- a/crates/cold/src/error.rs +++ b/crates/cold/src/error.rs @@ -41,6 +41,22 @@ pub enum ColdStorageError { limit: usize, }, + /// The streaming operation exceeded its deadline. + /// + /// The backend enforces a wall-clock limit on streaming operations + /// to prevent unbounded resource acquisition. Partial results may + /// have been delivered before this error. + #[error("stream deadline exceeded")] + StreamDeadlineExceeded, + + /// A reorg was detected during a streaming operation. + /// + /// The anchor block hash changed between chunks, indicating that + /// the data being streamed may no longer be consistent. Partial + /// results may have been delivered before this error. + #[error("reorg detected during streaming")] + ReorgDetected, + /// The cold storage task has terminated. /// /// The channel is closed because the task has stopped (panic, cancellation, diff --git a/crates/cold/src/lib.rs b/crates/cold/src/lib.rs index 1cf8003..ec843cd 100644 --- a/crates/cold/src/lib.rs +++ b/crates/cold/src/lib.rs @@ -156,8 +156,10 @@ pub use specifier::{ mod cold_receipt; pub use cold_receipt::ColdReceipt; +mod stream; +pub use stream::{StreamParams, produce_log_stream_default}; mod traits; -pub use traits::{BlockData, ColdStorage}; +pub use traits::{BlockData, ColdStorage, LogStream}; /// Task module containing the storage task runner and handles. pub mod task; diff --git a/crates/cold/src/mem.rs b/crates/cold/src/mem.rs index 057e06d..7ba203c 100644 --- a/crates/cold/src/mem.rs +++ b/crates/cold/src/mem.rs @@ -47,11 +47,16 @@ struct MemColdBackendInner { /// /// This backend is thread-safe and suitable for concurrent access. /// All operations are protected by an async read-write lock. -#[derive(Default)] pub struct MemColdBackend { inner: Arc>, } +impl Default for MemColdBackend { + fn default() -> Self { + Self { inner: Arc::new(RwLock::new(MemColdBackendInner::default())) } + } +} + impl MemColdBackend { /// Create a new empty in-memory backend. pub fn new() -> Self { @@ -206,7 +211,7 @@ impl ColdStorage for MemColdBackend { }) } - async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult> { + async fn get_logs(&self, filter: &Filter, max_logs: usize) -> ColdResult> { let inner = self.inner.read().await; let mut results = Vec::new(); @@ -241,6 +246,10 @@ impl ColdStorage for MemColdBackend { Ok(results) } + async fn produce_log_stream(&self, filter: &Filter, params: crate::StreamParams) { + crate::produce_log_stream_default(self, filter, params).await; + } + async fn get_latest_block(&self) -> ColdResult> { let inner = self.inner.read().await; Ok(inner.headers.last_key_value().map(|(k, _)| *k)) @@ -336,6 +345,6 @@ mod test { #[tokio::test] async fn mem_backend_conformance() { let backend = MemColdBackend::new(); - conformance(&backend).await.unwrap(); + conformance(backend).await.unwrap(); } } diff --git a/crates/cold/src/request.rs b/crates/cold/src/request.rs index 29ff364..4a2f64c 100644 --- a/crates/cold/src/request.rs +++ b/crates/cold/src/request.rs @@ -4,11 +4,12 @@ //! Reads and writes use separate channels with their own request types. use crate::{ - BlockData, ColdReceipt, ColdStorageError, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, - RpcLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, + BlockData, ColdReceipt, ColdStorageError, Confirmed, Filter, HeaderSpecifier, LogStream, + ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, }; use alloy::primitives::BlockNumber; use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader}; +use std::time::Duration; use tokio::sync::oneshot; /// Response sender type alias that propagates Result types. @@ -118,6 +119,17 @@ pub enum ColdReadRequest { /// The response channel. resp: Responder>, }, + /// Stream logs matching a filter. + StreamLogs { + /// The log filter. + filter: Box, + /// Maximum number of logs to stream. + max_logs: usize, + /// Requested stream deadline (clamped to the task's max). + deadline: Duration, + /// Response channel returning the log stream. + resp: Responder, + }, // --- Metadata --- /// Get the latest block number. diff --git a/crates/cold/src/stream.rs b/crates/cold/src/stream.rs new file mode 100644 index 0000000..c709d56 --- /dev/null +++ b/crates/cold/src/stream.rs @@ -0,0 +1,130 @@ +//! Log-streaming helper for backends without snapshot semantics. + +use crate::{ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, RpcLog}; +use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption}; +use tokio::sync::mpsc; + +/// Parameters for a log-streaming request. +/// +/// Bundles the block range, limits, channel, and deadline that every +/// [`ColdStorage::produce_log_stream`] implementation needs. +#[derive(Debug)] +pub struct StreamParams { + /// First block in range (inclusive). + pub from: BlockNumber, + /// Last block in range (inclusive). + pub to: BlockNumber, + /// Maximum number of matching logs before aborting with + /// [`ColdStorageError::TooManyLogs`]. + pub max_logs: usize, + /// Channel for sending log results. + pub sender: mpsc::Sender>, + /// Deadline after which the stream is aborted with + /// [`ColdStorageError::StreamDeadlineExceeded`]. + pub deadline: tokio::time::Instant, +} + +/// Log-streaming implementation for backends without snapshot semantics. +/// +/// Captures an anchor hash from the `to` block at the start and +/// re-checks it before each block to detect reorgs. Uses +/// [`ColdStorage::get_header`] for anchor checks and +/// [`ColdStorage::get_logs`] with single-block filters per block. +/// +/// Backends that hold a consistent read snapshot (MDBX, PostgreSQL +/// with REPEATABLE READ) should provide their own +/// [`ColdStorage::produce_log_stream`] implementation instead. +pub async fn produce_log_stream_default( + backend: &B, + filter: &Filter, + params: StreamParams, +) { + let StreamParams { from, to, max_logs, sender, deadline } = params; + + // Capture the hash of the `to` block before we start iterating. + // Without snapshot isolation we have no guarantee that the + // underlying data stays consistent, so we re-check this hash + // before each block to detect reorgs. + let anchor_hash = match backend.get_header(HeaderSpecifier::Number(to)).await { + Ok(Some(h)) => h.hash(), + Ok(None) => return, // block doesn't exist; empty stream + Err(e) => { + let _ = sender.send(Err(e)).await; + return; + } + }; + + let mut total = 0usize; + + // Clone the filter once; we reuse it across blocks by mutating + // only the block range, avoiding per-block clones of the address + // and topic arrays. + let mut block_filter = filter.clone(); + + // Walk through blocks one at a time, fetching matching logs from + // each block and sending them over the channel individually. + for block_num in from..=to { + // Check the deadline before starting each block so we + // don't begin a new query after the caller's timeout. + if tokio::time::Instant::now() > deadline { + let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await; + return; + } + + // Re-check the anchor hash to detect reorgs that may have + // occurred since we started streaming. + match backend.get_header(HeaderSpecifier::Number(to)).await { + Ok(Some(h)) if h.hash() == anchor_hash => {} + Ok(_) => { + let _ = sender.send(Err(ColdStorageError::ReorgDetected)).await; + return; + } + Err(e) => { + let _ = sender.send(Err(e)).await; + return; + } + } + + // Fetch all matching logs for this single block. The + // remaining budget shrinks as we accumulate results so + // `get_logs` can reject early if a single block overflows. + let remaining = max_logs.saturating_sub(total); + block_filter.block_option = FilterBlockOption::Range { + from_block: Some(block_num.into()), + to_block: Some(block_num.into()), + }; + let block_logs = match backend.get_logs(&block_filter, remaining).await { + Ok(logs) => logs, + Err(ColdStorageError::TooManyLogs { .. }) => { + let _ = sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await; + return; + } + Err(e) => { + let _ = sender.send(Err(e)).await; + return; + } + }; + + total += block_logs.len(); + + // Send each log individually over the channel. The timeout + // ensures we stop if the deadline passes while back-pressured + // by a slow receiver. + for log in block_logs { + match tokio::time::timeout_at(deadline, sender.send(Ok(log))).await { + Ok(Ok(())) => {} + Ok(Err(_)) => return, // receiver dropped + Err(_) => { + let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await; + return; + } + } + } + + // Early exit if we've already hit the limit — no need to + // query the next block. + if total >= max_logs { + return; + } + } +} diff --git a/crates/cold/src/task/handle.rs b/crates/cold/src/task/handle.rs index d05b425..be49132 100644 --- a/crates/cold/src/task/handle.rs +++ b/crates/cold/src/task/handle.rs @@ -11,11 +11,12 @@ use crate::{ AppendBlockRequest, BlockData, ColdReadRequest, ColdReceipt, ColdResult, ColdStorageError, - ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, + ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, }; use alloy::primitives::{B256, BlockNumber}; use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader}; +use std::time::Duration; use tokio::sync::{mpsc, oneshot}; /// Map a [`mpsc::error::TrySendError`] to the appropriate @@ -266,6 +267,40 @@ impl ColdStorageReadHandle { self.send(ColdReadRequest::GetLogs { filter: Box::new(filter), max_logs, resp }, rx).await } + /// Stream logs matching a filter. + /// + /// Returns a [`LogStream`] that yields matching logs in order. + /// Consume with `StreamExt::next()` until `None`. If the last item + /// is `Err(...)`, an error occurred (deadline, too many logs, etc.). + /// + /// The `deadline` is clamped to the task's configured maximum. + /// + /// # Partial Delivery + /// + /// One or more `Ok(log)` items may be delivered before a terminal + /// `Err(...)`. Consumers must be prepared for partial results — for + /// example, a reorg or deadline expiry can interrupt a stream that + /// has already yielded some logs. + /// + /// # Resource Management + /// + /// The stream holds a backend concurrency permit. Dropping the + /// stream releases the permit. Drop early if results are no + /// longer needed. + pub async fn stream_logs( + &self, + filter: Filter, + max_logs: usize, + deadline: Duration, + ) -> ColdResult { + let (resp, rx) = oneshot::channel(); + self.send( + ColdReadRequest::StreamLogs { filter: Box::new(filter), max_logs, deadline, resp }, + rx, + ) + .await + } + // ========================================================================== // Metadata // ========================================================================== diff --git a/crates/cold/src/task/runner.rs b/crates/cold/src/task/runner.rs index ecb2ba6..b0a2114 100644 --- a/crates/cold/src/task/runner.rs +++ b/crates/cold/src/task/runner.rs @@ -8,18 +8,30 @@ //! //! Transaction, receipt, and header lookups are served from an LRU cache, //! avoiding repeated backend reads for frequently queried items. +//! +//! # Log Streaming +//! +//! The task owns the streaming configuration (max deadline, concurrency +//! limit) and delegates the streaming loop to the backend via +//! [`ColdStorage::produce_log_stream`]. Callers supply a per-request +//! deadline that is clamped to the task's configured maximum. use super::cache::ColdCache; use crate::{ - ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageHandle, ColdWriteRequest, - Confirmed, HeaderSpecifier, ReceiptSpecifier, TransactionSpecifier, + ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle, + ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier, + TransactionSpecifier, }; use signet_storage_types::{RecoveredTx, SealedHeader}; -use std::sync::Arc; -use tokio::sync::{Mutex, mpsc}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::{Mutex, Semaphore, mpsc}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::{debug, instrument}; +/// Default maximum deadline for streaming operations. +const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60); + /// Channel size for cold storage read requests. const READ_CHANNEL_SIZE: usize = 256; @@ -29,6 +41,12 @@ const WRITE_CHANNEL_SIZE: usize = 256; /// Maximum concurrent read request handlers. const MAX_CONCURRENT_READERS: usize = 64; +/// Maximum concurrent streaming operations. +const MAX_CONCURRENT_STREAMS: usize = 8; + +/// Channel buffer size for streaming operations. +const STREAM_CHANNEL_BUFFER: usize = 256; + /// Shared state for the cold storage task, holding the backend and cache. /// /// This is wrapped in an `Arc` so that spawned read handlers can access @@ -36,6 +54,8 @@ const MAX_CONCURRENT_READERS: usize = 64; struct ColdStorageTaskInner { backend: B, cache: Mutex, + max_stream_deadline: Duration, + stream_semaphore: Arc, } impl ColdStorageTaskInner { @@ -80,7 +100,7 @@ impl ColdStorageTaskInner { } /// Handle a read request, checking the cache first where applicable. - async fn handle_read(&self, req: ColdReadRequest) { + async fn handle_read(self: &Arc, req: ColdReadRequest) { match req { ColdReadRequest::GetHeader { spec, resp } => { let result = if let HeaderSpecifier::Number(n) = &spec { @@ -140,7 +160,10 @@ impl ColdStorageTaskInner { let _ = resp.send(self.backend.get_zenith_headers(spec).await); } ColdReadRequest::GetLogs { filter, max_logs, resp } => { - let _ = resp.send(self.backend.get_logs(*filter, max_logs).await); + let _ = resp.send(self.backend.get_logs(&filter, max_logs).await); + } + ColdReadRequest::StreamLogs { filter, max_logs, deadline, resp } => { + let _ = resp.send(self.handle_stream_logs(*filter, max_logs, deadline).await); } ColdReadRequest::GetLatestBlock { resp } => { let _ = resp.send(self.backend.get_latest_block().await); @@ -148,6 +171,51 @@ impl ColdStorageTaskInner { } } + /// Stream logs matching a filter. + /// + /// Acquires a concurrency permit, resolves the block range, then + /// spawns a producer task that delegates to + /// [`ColdStorage::produce_log_stream`]. + async fn handle_stream_logs( + self: &Arc, + filter: crate::Filter, + max_logs: usize, + deadline: Duration, + ) -> ColdResult { + let permit = self + .stream_semaphore + .clone() + .acquire_owned() + .await + .map_err(|_| ColdStorageError::Cancelled)?; + + let from = filter.get_from_block().unwrap_or(0); + let to = match filter.get_to_block() { + Some(to) => to, + None => { + let Some(latest) = self.backend.get_latest_block().await? else { + let (_tx, rx) = mpsc::channel(1); + return Ok(ReceiverStream::new(rx)); + }; + latest + } + }; + + let effective = deadline.min(self.max_stream_deadline); + let deadline_instant = tokio::time::Instant::now() + effective; + let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER); + let inner = Arc::clone(self); + + tokio::spawn(async move { + let _permit = permit; + let params = + crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant }; + inner.backend.produce_log_stream(&filter, params).await; + }); + + Ok(ReceiverStream::new(rx)) + } + /// Handle a write request, invalidating the cache on truncation. async fn handle_write(&self, req: ColdWriteRequest) { match req { @@ -186,6 +254,13 @@ impl ColdStorageTaskInner { /// This design prioritizes write ordering for correctness while allowing /// read throughput to scale with concurrency. /// +/// # Log Streaming +/// +/// The task owns the streaming configuration (max deadline, concurrency +/// limit) and delegates the streaming loop to the backend via +/// [`ColdStorage::produce_log_stream`]. Callers supply a per-request +/// deadline that is clamped to the task's configured maximum. +/// /// # Caching /// /// Transaction, receipt, and header lookups are served from an LRU cache @@ -212,7 +287,12 @@ impl ColdStorageTask { let (read_sender, read_receiver) = mpsc::channel(READ_CHANNEL_SIZE); let (write_sender, write_receiver) = mpsc::channel(WRITE_CHANNEL_SIZE); let task = Self { - inner: Arc::new(ColdStorageTaskInner { backend, cache: Mutex::new(ColdCache::new()) }), + inner: Arc::new(ColdStorageTaskInner { + backend, + cache: Mutex::new(ColdCache::new()), + max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE, + stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)), + }), read_receiver, write_receiver, cancel_token, diff --git a/crates/cold/src/traits.rs b/crates/cold/src/traits.rs index 88c76db..247ef17 100644 --- a/crates/cold/src/traits.rs +++ b/crates/cold/src/traits.rs @@ -7,13 +7,33 @@ use crate::{ ColdReceipt, ColdResult, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, - SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, + SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier, }; use alloy::primitives::BlockNumber; use signet_storage_types::{ DbSignetEvent, DbZenithHeader, ExecutedBlock, Receipt, RecoveredTx, SealedHeader, }; use std::future::Future; +use tokio_stream::wrappers::ReceiverStream; + +/// A stream of log results backed by a bounded channel. +/// +/// Each item is a `ColdResult`. The stream produces `Ok(log)` items +/// until complete, or yields a final `Err(e)` on failure. The stream ends +/// (`None`) when all matching logs have been delivered or after an error. +/// +/// # Partial Delivery +/// +/// One or more `Ok(log)` items may be delivered before a terminal +/// `Err(...)`. Consumers must be prepared for partial results — for +/// example, a reorg or deadline expiry can interrupt a stream that has +/// already yielded some logs. +/// +/// # Resource Management +/// +/// The stream holds a backend concurrency permit. Dropping the stream +/// releases the permit. Drop early if results are no longer needed. +pub type LogStream = ReceiverStream>; /// Data for appending a complete block to cold storage. #[derive(Debug, Clone)] @@ -179,10 +199,35 @@ pub trait ColdStorage: Send + Sync + 'static { /// [`ColdStorageError::TooManyLogs`]: crate::ColdStorageError::TooManyLogs fn get_logs( &self, - filter: Filter, + filter: &Filter, max_logs: usize, ) -> impl Future>> + Send; + // --- Streaming --- + + /// Produce a log stream by iterating blocks and sending matching logs. + /// + /// Implementations should hold a consistent read snapshot for the + /// duration when possible — backends with snapshot semantics (MDBX, + /// PostgreSQL with REPEATABLE READ) need no additional reorg detection. + /// + /// Backends without snapshot semantics can delegate to + /// [`produce_log_stream_default`], which uses per-block + /// [`get_header`] / [`get_logs`] calls with anchor-hash reorg + /// detection. + /// + /// All errors are sent through `sender`. When this method returns, + /// the sender is dropped, closing the stream. + /// + /// [`get_header`]: ColdStorage::get_header + /// [`get_logs`]: ColdStorage::get_logs + /// [`produce_log_stream_default`]: crate::produce_log_stream_default + fn produce_log_stream( + &self, + filter: &Filter, + params: StreamParams, + ) -> impl Future + Send; + // --- Write operations --- /// Append a single block to cold storage. diff --git a/crates/cold/tests/conformance.rs b/crates/cold/tests/conformance.rs index 7d38dd7..a0360f1 100644 --- a/crates/cold/tests/conformance.rs +++ b/crates/cold/tests/conformance.rs @@ -5,5 +5,5 @@ use signet_cold::{conformance::conformance, mem::MemColdBackend}; #[tokio::test] async fn mem_backend_conformance() { let backend = MemColdBackend::new(); - conformance(&backend).await.unwrap(); + conformance(backend).await.unwrap(); } diff --git a/crates/hot-mdbx/src/lib.rs b/crates/hot-mdbx/src/lib.rs index 25e7836..d0cc255 100644 --- a/crates/hot-mdbx/src/lib.rs +++ b/crates/hot-mdbx/src/lib.rs @@ -248,7 +248,7 @@ impl DatabaseArguments { /// MDBX database environment. Wraps the low-level [Environment], and /// implements the [`HotKv`] trait. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DatabaseEnv { /// Libmdbx-sys environment. inner: Environment,