Merged
20 commits
b23b800
feat(cold): add stream_logs for incremental log streaming
prestwich Feb 14, 2026
ca0c502
fix: resolve broken rustdoc link for LogStream
prestwich Feb 14, 2026
2e3b4f1
refactor: address PR review — DRY streaming, functional rewrites, cle…
prestwich Feb 14, 2026
fc45e45
refactor: delegate produce_log_stream to backends for snapshot consis…
prestwich Feb 15, 2026
43815e9
fix: snapshot isolation, O(1) log index, and streaming correctness
prestwich Feb 15, 2026
e3d5ec1
refactor: remove avoidable Vec allocations in log streaming
prestwich Feb 15, 2026
9eee067
refactor: remove get_block_hash and get_logs_block from ColdStorage t…
prestwich Feb 15, 2026
8aa2303
refactor: make DatabaseEnv cloneable, eliminate SQL row allocations
prestwich Feb 15, 2026
4dc7eb1
refactor: borrow filter params instead of cloning to Vec<Vec<u8>>
prestwich Feb 15, 2026
42a0faf
refactor: remove produce_log_stream default, move helper to stream.rs
prestwich Feb 15, 2026
0fb996e
fix: report correct max_logs in TooManyLogs during streaming
prestwich Feb 15, 2026
7c285bf
refactor: introduce StreamParams to reduce produce_log_stream arguments
prestwich Feb 15, 2026
dba35f4
refactor: reduce streaming error-handling boilerplate with try_stream…
prestwich Feb 15, 2026
e14ae6b
refactor: borrow Filter in get_logs to eliminate per-block clones
prestwich Feb 15, 2026
fa4db39
style: tighten code style across cold storage crates
prestwich Feb 15, 2026
5b9be69
refactor: use alloy-primitives sqlx impls for direct row decoding
prestwich Feb 15, 2026
d867bec
docs: add explanatory comments to log stream production methods
prestwich Feb 15, 2026
16d347a
docs: add explanatory comments to MDBX log stream production
prestwich Feb 15, 2026
627cad9
chore: use u256 sqlx impl
prestwich Feb 15, 2026
5491846
refactor: use alloy-primitives sqlx impls for direct row encoding
prestwich Feb 15, 2026
17 changes: 9 additions & 8 deletions Cargo.toml
@@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.5.0"
version = "0.6.0"
edition = "2024"
rust-version = "1.92"
authors = ["init4"]
@@ -35,13 +35,13 @@ incremental = false

[workspace.dependencies]
# internal
signet-hot = { version = "0.5.0", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.5.0", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.5.0", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.5.0", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.5.0", path = "./crates/cold-sql" }
signet-storage = { version = "0.5.0", path = "./crates/storage" }
signet-storage-types = { version = "0.5.0", path = "./crates/types" }
signet-hot = { version = "0.6.0", path = "./crates/hot" }
signet-hot-mdbx = { version = "0.6.0", path = "./crates/hot-mdbx" }
signet-cold = { version = "0.6.0", path = "./crates/cold" }
signet-cold-mdbx = { version = "0.6.0", path = "./crates/cold-mdbx" }
signet-cold-sql = { version = "0.6.0", path = "./crates/cold-sql" }
signet-storage = { version = "0.6.0", path = "./crates/storage" }
signet-storage-types = { version = "0.6.0", path = "./crates/types" }

# External, in-house
signet-libmdbx = { version = "0.8.0" }
@@ -67,6 +67,7 @@ serde = { version = "1.0.217", features = ["derive"] }
tempfile = "3.20.0"
thiserror = "2.0.18"
tokio = { version = "1.45.0", features = ["full"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["rt"] }
itertools = "0.14"
lru = "0.16"
1 change: 1 addition & 0 deletions crates/cold-mdbx/Cargo.toml
@@ -23,6 +23,7 @@ signet-hot-mdbx.workspace = true
signet-libmdbx.workspace = true
signet-storage-types.workspace = true
thiserror.workspace = true
tokio.workspace = true

[dev-dependencies]
signet-hot-mdbx = { workspace = true, features = ["test-utils"] }
155 changes: 146 additions & 9 deletions crates/cold-mdbx/src/backend.rs
@@ -10,8 +10,9 @@ use crate::{
};
use alloy::{consensus::transaction::Recovered, primitives::BlockNumber};
use signet_cold::{
BlockData, ColdReceipt, ColdResult, ColdStorage, Confirmed, Filter, HeaderSpecifier,
ReceiptSpecifier, SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
ZenithHeaderSpecifier,
};
use signet_hot::{
KeySer, MAX_KEY_SIZE, ValSer,
@@ -77,28 +78,146 @@ fn write_block_to_tx(
Ok(())
}

/// Unwrap a `Result` or send the error through the stream and return.
macro_rules! try_stream {
($sender:expr, $expr:expr) => {
match $expr {
Ok(v) => v,
Err(e) => {
let _ =
$sender.blocking_send(Err(ColdStorageError::backend(MdbxColdError::from(e))));
return;
}
}
};
}

/// Produce a log stream using a single MDBX read transaction.
///
/// Runs synchronously on a blocking thread. The `Tx<Ro>` snapshot
/// provides MVCC consistency — the snapshot is self-consistent and
/// no reorg detection is needed within it.
fn produce_log_stream_blocking(
env: DatabaseEnv,
filter: Filter,
from: BlockNumber,
to: BlockNumber,
max_logs: usize,
sender: tokio::sync::mpsc::Sender<ColdResult<RpcLog>>,
deadline: std::time::Instant,
) {
// Open a read-only transaction. MDBX's MVCC guarantees a
// consistent snapshot for the lifetime of this transaction,
// so no reorg detection is needed.
let tx = try_stream!(sender, env.tx());

// Reuse cursors across blocks to avoid re-opening them on
// every iteration (same pattern as get_logs_inner).
let mut header_cursor = try_stream!(sender, tx.traverse::<ColdHeaders>());
let mut receipt_cursor = try_stream!(sender, tx.traverse_dual::<ColdReceipts>());

let mut total = 0usize;

// Walk through blocks one at a time, filtering and sending
// matching logs over the channel.
for block_num in from..=to {
// Check the deadline before starting each block so we
// don't begin reading after the caller's timeout.
if std::time::Instant::now() > deadline {
let _ = sender.blocking_send(Err(ColdStorageError::StreamDeadlineExceeded));
return;
}

// Look up the block header for its hash and timestamp,
// which are attached to every emitted RpcLog.
let sealed = match try_stream!(sender, header_cursor.exact(&block_num)) {
Some(v) => v,
None => continue,
};
let block_hash = sealed.hash();
let block_timestamp = sealed.timestamp;

// Iterate over all receipts (and their embedded logs) for
// this block, applying the caller's address/topic filter.
let iter = try_stream!(sender, receipt_cursor.iter_k2(&block_num));

let remaining = max_logs.saturating_sub(total);
let mut block_count = 0usize;

for result in iter {
let (tx_idx, ir): (u64, IndexedReceipt) = try_stream!(sender, result);
let tx_hash = ir.tx_hash;
let first_log_index = ir.first_log_index;
for (log_idx, log) in ir.receipt.inner.logs.into_iter().enumerate() {
if !filter.matches(&log) {
continue;
}
// Enforce the global log limit across all blocks.
block_count += 1;
if block_count > remaining {
let _ = sender
.blocking_send(Err(ColdStorageError::TooManyLogs { limit: max_logs }));
return;
}
let rpc_log = RpcLog {
inner: log,
block_hash: Some(block_hash),
block_number: Some(block_num),
block_timestamp: Some(block_timestamp),
transaction_hash: Some(tx_hash),
transaction_index: Some(tx_idx),
log_index: Some(first_log_index + log_idx as u64),
removed: false,
};
// Send the log to the caller via blocking_send
// (we're on a spawn_blocking thread, not async).
if sender.blocking_send(Ok(rpc_log)).is_err() {
return; // receiver dropped
}
}
}

// Early exit if we've already hit the limit — no need to
// read the next block.
total += block_count;
if total >= max_logs {
return;
}
}
}

/// MDBX-based cold storage backend.
///
/// This backend stores historical blockchain data in an MDBX database.
/// It implements the [`ColdStorage`] trait for use with the cold storage
/// task runner.
#[derive(Debug)]
pub struct MdbxColdBackend {
/// The MDBX environment.
env: DatabaseEnv,
}

impl std::fmt::Debug for MdbxColdBackend {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MdbxColdBackend").finish_non_exhaustive()
}
}

impl MdbxColdBackend {
/// Create a new backend from an existing MDBX environment.
const fn from_env(env: DatabaseEnv) -> Self {
Self { env }
}

/// Open an existing MDBX cold storage database in read-only mode.
pub fn open_ro(path: &Path) -> Result<Self, MdbxColdError> {
let env = DatabaseArguments::new().open_ro(path)?;
Ok(Self { env })
Ok(Self::from_env(env))
}

/// Open or create an MDBX cold storage database in read-write mode.
pub fn open_rw(path: &Path) -> Result<Self, MdbxColdError> {
let env = DatabaseArguments::new().open_rw(path)?;
let backend = Self { env };
let backend = Self::from_env(env);
backend.create_tables()?;
Ok(backend)
}
@@ -110,7 +229,7 @@ impl MdbxColdBackend {
args: DatabaseArguments,
) -> Result<Self, MdbxColdError> {
let env = DatabaseEnv::open(path, kind, args)?;
let backend = Self { env };
let backend = Self::from_env(env);
if kind.is_rw() {
backend.create_tables()?;
}
@@ -437,7 +556,7 @@ impl MdbxColdBackend {

fn get_logs_inner(
&self,
filter: Filter,
filter: &Filter,
max_logs: usize,
) -> Result<Vec<signet_cold::RpcLog>, MdbxColdError> {
let tx = self.env.tx()?;
@@ -570,12 +689,30 @@ impl ColdStorage for MdbxColdBackend {

async fn get_logs(
&self,
filter: Filter,
filter: &Filter,
max_logs: usize,
) -> ColdResult<Vec<signet_cold::RpcLog>> {
Ok(self.get_logs_inner(filter, max_logs)?)
}

async fn produce_log_stream(&self, filter: &Filter, params: signet_cold::StreamParams) {
let env = self.env.clone();
let filter = filter.clone();
let std_deadline = params.deadline.into_std();
let _ = tokio::task::spawn_blocking(move || {
produce_log_stream_blocking(
env,
filter,
params.from,
params.to,
params.max_logs,
params.sender,
std_deadline,
);
})
.await;
}

async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
let tx = self.env.tx().map_err(MdbxColdError::from)?;
let mut cursor = tx.new_cursor::<ColdHeaders>().map_err(MdbxColdError::from)?;
@@ -611,6 +748,6 @@ mod tests {
async fn mdbx_backend_conformance() {
let dir = tempdir().unwrap();
let backend = MdbxColdBackend::open_rw(dir.path()).unwrap();
conformance(&backend).await.unwrap();
conformance(backend).await.unwrap();
}
}
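A note on how this backend method is meant to be driven: the MDBX implementation above only resolves its `produce_log_stream` future after the `spawn_blocking` task exits, so a caller has to drain the channel concurrently or a bounded channel can fill and stall the producer. Below is a minimal consumer-side sketch; the `StreamParams` struct-literal fields (`from`, `to`, `max_logs`, `sender`, `deadline`) mirror how they are read in this diff, but the real type may be built through a constructor instead, and the channel capacity and timeout here are arbitrary.

```rust
use std::time::Duration;

use signet_cold::{ColdResult, ColdStorage, Filter, RpcLog, StreamParams};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};

// Sketch: drive `produce_log_stream` while draining the channel in parallel.
// Assumes `StreamParams` exposes these fields publicly; the crate may differ.
async fn collect_streamed_logs<B: ColdStorage>(
    backend: &B,
    filter: &Filter,
    from: u64,
    to: u64,
) -> ColdResult<Vec<RpcLog>> {
    let (sender, receiver) = tokio::sync::mpsc::channel(256);
    let params = StreamParams {
        from,
        to,
        max_logs: 10_000,
        sender,
        deadline: tokio::time::Instant::now() + Duration::from_secs(30),
    };

    // Consume concurrently with the producer: joining both futures keeps the
    // bounded channel from backing up while the blocking reader is running.
    let consumer = async {
        let mut stream = ReceiverStream::new(receiver);
        let mut logs = Vec::new();
        while let Some(item) = stream.next().await {
            logs.push(item?);
        }
        Ok(logs)
    };
    let (_, logs) = tokio::join!(backend.produce_log_stream(filter, params), consumer);
    logs
}
```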
7 changes: 5 additions & 2 deletions crates/cold-sql/Cargo.toml
@@ -17,18 +17,21 @@ rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
alloy = { workspace = true, optional = true }
alloy-primitives = { version = "1.5.6", features = ["sqlx"] }
signet-cold.workspace = true
signet-storage-types = { workspace = true, optional = true }
signet-zenith = { workspace = true, optional = true }
sqlx = { workspace = true }
thiserror.workspace = true
tokio = { workspace = true, optional = true }
tokio-stream = { workspace = true, optional = true }

[dev-dependencies]
signet-cold = { workspace = true, features = ["test-utils"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

[features]
default = []
postgres = ["sqlx/postgres", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith"]
sqlite = ["sqlx/sqlite", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith"]
postgres = ["sqlx/postgres", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith", "dep:tokio", "dep:tokio-stream"]
sqlite = ["sqlx/sqlite", "sqlx/any", "sqlx/runtime-tokio-rustls", "dep:alloy", "dep:signet-storage-types", "dep:signet-zenith", "dep:tokio"]
test-utils = ["signet-cold/test-utils", "sqlite", "postgres"]
1 change: 1 addition & 0 deletions crates/cold-sql/migrations/001_initial.sql
@@ -64,6 +64,7 @@ CREATE TABLE IF NOT EXISTS receipts (
tx_type INTEGER NOT NULL,
success INTEGER NOT NULL,
cumulative_gas_used INTEGER NOT NULL,
first_log_index INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (block_number, tx_index)
);

1 change: 1 addition & 0 deletions crates/cold-sql/migrations/001_initial_pg.sql
@@ -62,6 +62,7 @@ CREATE TABLE IF NOT EXISTS receipts (
tx_type INTEGER NOT NULL,
success INTEGER NOT NULL,
cumulative_gas_used BIGINT NOT NULL,
first_log_index BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (block_number, tx_index)
);

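Both migrations add the same column: `first_log_index` records, per receipt, the block-wide index of that receipt's first log. That is what lets the MDBX reader above compute `log_index = first_log_index + position_within_receipt` in O(1) instead of re-counting logs in earlier receipts. A small illustrative sketch of the per-block offset computation a writer would perform follows; the helper is hypothetical and not part of this PR's write path.

```rust
/// Illustrative only: given the number of logs in each receipt of a block,
/// produce the block-wide index of each receipt's first log (a prefix sum),
/// so readers can compute `log_index = first_log_index + position` without
/// scanning earlier receipts.
fn assign_first_log_indices(log_counts_per_receipt: &[u64]) -> Vec<u64> {
    let mut next = 0u64;
    log_counts_per_receipt
        .iter()
        .map(|&count| {
            let first = next;
            next += count;
            first
        })
        .collect()
}

#[test]
fn offsets_are_cumulative() {
    // Receipts with 2, 0, and 3 logs start at block-wide indices 0, 2, and 2.
    assert_eq!(assign_first_log_indices(&[2, 0, 3]), vec![0, 2, 2]);
}
```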