diff --git a/Cargo.lock b/Cargo.lock index a6d1c14287..cea8c345bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1855,7 +1855,6 @@ dependencies = [ "moka", "parking_lot", "rivet-config", - "rivet-envoy-client", "rivet-envoy-protocol", "rivet-pools", "rivet-test-deps", @@ -6000,6 +5999,7 @@ name = "rivetkit-core" version = "2.3.0-rc.4" dependencies = [ "anyhow", + "async-trait", "base64 0.22.1", "ciborium", "depot-client", diff --git a/engine/packages/depot-client/Cargo.toml b/engine/packages/depot-client/Cargo.toml index 45f4dad735..d9861bf1c6 100644 --- a/engine/packages/depot-client/Cargo.toml +++ b/engine/packages/depot-client/Cargo.toml @@ -12,9 +12,9 @@ crate-type = ["lib"] [dependencies] anyhow.workspace = true +async-trait.workspace = true crossbeam-channel = "0.5" libsqlite3-sys = { version = "0.30", features = ["bundled"] } -rivet-envoy-client = { workspace = true, features = ["native-transport"] } tokio.workspace = true tracing.workspace = true getrandom = "0.2" @@ -25,7 +25,6 @@ moka = { version = "0.12", default-features = false, features = ["sync"] } parking_lot.workspace = true [dev-dependencies] -async-trait.workspace = true depot = { workspace = true, features = ["test-faults"] } futures-util.workspace = true gas.workspace = true diff --git a/engine/packages/depot-client/src/database.rs b/engine/packages/depot-client/src/database.rs index dcf847bb18..b6d621eb39 100644 --- a/engine/packages/depot-client/src/database.rs +++ b/engine/packages/depot-client/src/database.rs @@ -1,14 +1,15 @@ use std::sync::Arc; use anyhow::{Result, anyhow}; -use rivet_envoy_client::handle::EnvoyHandle; use tokio::runtime::Handle; use crate::{ query::{BindParam, ExecResult, ExecuteResult, QueryResult}, + transport::EmbeddedDepotSqliteTransport, vfs::{ - NativeVfsHandle, SqliteTransport, SqliteVfs, SqliteVfsMetrics, SqliteVfsMetricsSnapshot, - VfsConfig, VfsPreloadHintSnapshot, fetch_initial_main_page_for_registration, + NativeVfsHandle, SqliteTransportHandle, SqliteVfs, SqliteVfsMetrics, + SqliteVfsMetricsSnapshot, VfsConfig, VfsPreloadHintSnapshot, + fetch_initial_main_page_for_registration, }, worker::SqliteWorkerHandle, }; @@ -23,16 +24,15 @@ pub fn vfs_name_for_actor_database(actor_id: &str, generation: u64) -> String { format!("envoy-sqlite-{actor_id}-g{generation}") } -pub async fn open_database_from_envoy( - handle: EnvoyHandle, +pub async fn open_database_from_transport( + transport: SqliteTransportHandle, actor_id: String, generation: u64, rt_handle: Handle, metrics: Option>, ) -> Result { let vfs_name = vfs_name_for_actor_database(&actor_id, generation); - let transport = SqliteTransport::from_envoy(handle); - let initial_main_page = fetch_initial_main_page_for_registration(&transport, &actor_id) + let initial_main_page = fetch_initial_main_page_for_registration(transport.clone(), &actor_id) .await .map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?; let vfs = Arc::new( @@ -53,34 +53,21 @@ pub async fn open_database_from_envoy( Ok(native_db) } -pub async fn open_database_from_conveyer( +pub async fn open_database_from_embedded_depot( db: Arc, actor_id: String, generation: u64, rt_handle: Handle, metrics: Option>, ) -> Result { - let vfs_name = vfs_name_for_actor_database(&actor_id, generation); - let transport = SqliteTransport::from_conveyer(db); - let initial_main_page = fetch_initial_main_page_for_registration(&transport, &actor_id) - .await - .map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?; - let vfs = Arc::new( - SqliteVfs::register_with_transport_and_initial_page( - &vfs_name, - transport, - actor_id.clone(), - rt_handle, - VfsConfig::default(), - initial_main_page, - metrics.clone(), - ) - .map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?, - ); - - let native_db = NativeDatabaseHandle::new_with_metrics(vfs, actor_id, metrics)?; - native_db.initialize().await?; - Ok(native_db) + open_database_from_transport( + Arc::new(EmbeddedDepotSqliteTransport::new(db)), + actor_id, + generation, + rt_handle, + metrics, + ) + .await } impl NativeDatabaseHandle { diff --git a/engine/packages/depot-client/src/lib.rs b/engine/packages/depot-client/src/lib.rs index e4222e258e..758cd81e7f 100644 --- a/engine/packages/depot-client/src/lib.rs +++ b/engine/packages/depot-client/src/lib.rs @@ -23,6 +23,9 @@ pub mod optimization_flags; /// SQLite query execution helpers. pub mod query; +/// SQLite transport adapters for same-process Depot usage. +pub mod transport; + pub use depot_client_types as types; /// Custom SQLite VFS for actor-side depot transport. diff --git a/engine/packages/depot-client/src/transport.rs b/engine/packages/depot-client/src/transport.rs new file mode 100644 index 0000000000..fc7e8b2ebf --- /dev/null +++ b/engine/packages/depot-client/src/transport.rs @@ -0,0 +1,86 @@ +//! SQLite transport adapters. +//! +//! `EmbeddedDepotSqliteTransport` is for deployments where the SQLite VFS runs in the +//! same process or server as the Depot backend. It calls `depot::conveyer::Db` +//! directly instead of routing page operations through an actor Envoy transport. + +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use rivet_envoy_protocol as protocol; + +use crate::vfs::SqliteTransport; + +pub struct EmbeddedDepotSqliteTransport { + db: Arc, +} + +impl EmbeddedDepotSqliteTransport { + pub fn new(db: Arc) -> Self { + Self { db } + } +} + +#[async_trait] +impl SqliteTransport for EmbeddedDepotSqliteTransport { + async fn get_pages( + &self, + request: protocol::SqliteGetPagesRequest, + ) -> Result { + match self.db.get_pages(request.pgnos).await { + Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( + protocol::SqliteGetPagesOk { + pages: pages + .into_iter() + .map(|page| protocol::SqliteFetchedPage { + pgno: page.pgno, + bytes: page.bytes, + }) + .collect(), + }, + )), + Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse( + protocol::SqliteErrorResponse { + message: sqlite_error_reason(&err), + }, + )), + } + } + + async fn commit( + &self, + request: protocol::SqliteCommitRequest, + ) -> Result { + match self + .db + .commit( + request + .dirty_pages + .into_iter() + .map(|page| depot::types::DirtyPage { + pgno: page.pgno, + bytes: page.bytes, + }) + .collect(), + request.db_size_pages, + request.now_ms, + ) + .await + { + Ok(()) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk), + Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( + protocol::SqliteErrorResponse { + message: sqlite_error_reason(&err), + }, + )), + } + } +} + +fn sqlite_error_reason(err: &anyhow::Error) -> String { + err.chain() + .map(ToString::to_string) + .collect::>() + .join(": ") +} diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index f6566247b2..a419739e33 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -4,8 +4,6 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::ffi::{CStr, CString, c_char, c_int, c_void}; -#[cfg(test)] -use std::future::Future; use std::ptr; use std::slice; use std::sync::Arc; @@ -13,14 +11,12 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{Duration, Instant}; use anyhow::Result; +use async_trait::async_trait; use libsqlite3_sys::*; use moka::sync::Cache; use parking_lot::{Mutex, RwLock}; -use rivet_envoy_client::handle::EnvoyHandle; use rivet_envoy_protocol as protocol; use tokio::runtime::Handle; -#[cfg(test)] -use tokio::runtime::RuntimeFlavor; use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags}; @@ -95,40 +91,6 @@ fn panic_message(payload: &Box) -> String { } } -#[cfg(test)] -fn block_on_runtime(runtime: &Handle, future: F) -> std::result::Result -where - F: Future + Send + 'static, - T: Send + 'static, -{ - if Handle::try_current().is_err() { - return Ok(runtime.block_on(future)); - } - - if runtime.runtime_flavor() == RuntimeFlavor::CurrentThread { - return Err( - "sqlite VFS registration cannot block on a current-thread Tokio runtime".to_string(), - ); - } - - let runtime = runtime.clone(); - // VFS registration is synchronous because SQLite VFS callbacks are synchronous, but native - // actor startup often opens SQLite while already running on a Tokio worker. Blocking the - // current worker with `Handle::block_on` would panic, so only the open-time metadata fetch is - // bridged through a short standalone thread. SQL execution itself stays on the SQLite worker. - std::thread::Builder::new() - .name("sqlite-vfs-runtime-bridge".to_string()) - .spawn(move || runtime.block_on(future)) - .map_err(|err| format!("spawn sqlite VFS runtime bridge: {err}"))? - .join() - .map_err(|panic| { - format!( - "sqlite VFS runtime bridge panicked: {}", - panic_message(&panic) - ) - }) -} - macro_rules! vfs_catch_unwind { ($err_val:expr, $body:expr) => { match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| $body)) { @@ -141,168 +103,20 @@ macro_rules! vfs_catch_unwind { }; } -#[derive(Clone)] -pub(crate) struct SqliteTransport { - inner: Arc, -} - -enum SqliteTransportInner { - Envoy(EnvoyHandle), - Conveyer(Arc), - #[cfg(test)] - Direct(Arc), -} - -impl SqliteTransport { - pub(crate) fn from_envoy(handle: EnvoyHandle) -> Self { - Self { - inner: Arc::new(SqliteTransportInner::Envoy(handle)), - } - } - - pub(crate) fn from_conveyer(db: Arc) -> Self { - Self { - inner: Arc::new(SqliteTransportInner::Conveyer(db)), - } - } - - #[cfg(test)] - fn from_direct(storage: Arc) -> Self { - Self { - inner: Arc::new(SqliteTransportInner::Direct(storage)), - } - } - - #[cfg(test)] - fn direct_hooks(&self) -> Option> { - match &*self.inner { - SqliteTransportInner::Direct(storage) => Some(Arc::clone(&storage.hooks)), - SqliteTransportInner::Envoy(_) => None, - SqliteTransportInner::Conveyer(_) => None, - } - } - +#[async_trait] +pub trait SqliteTransport: Send + Sync { async fn get_pages( &self, - req: protocol::SqliteGetPagesRequest, - ) -> Result { - match &*self.inner { - SqliteTransportInner::Envoy(handle) => handle.sqlite_get_pages(req).await, - SqliteTransportInner::Conveyer(db) => match db.get_pages(req.pgnos).await { - Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( - protocol::SqliteGetPagesOk { - pages: pages - .into_iter() - .map(|page| protocol::SqliteFetchedPage { - pgno: page.pgno, - bytes: page.bytes, - }) - .collect(), - }, - )), - Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse( - protocol::SqliteErrorResponse { - message: sqlite_error_reason(&err), - }, - )), - }, - #[cfg(test)] - SqliteTransportInner::Direct(storage) => { - let pgnos = req.pgnos.clone(); - match storage.get_pages(&req.actor_id, &pgnos).await { - Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( - protocol::SqliteGetPagesOk { - pages: pages - .into_iter() - .map(tests::protocol_fetched_page) - .collect(), - }, - )), - Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse( - tests::sqlite_error_response(&err), - )), - } - } - } - } + request: protocol::SqliteGetPagesRequest, + ) -> Result; async fn commit( &self, - req: protocol::SqliteCommitRequest, - ) -> Result { - match &*self.inner { - SqliteTransportInner::Envoy(handle) => handle.sqlite_commit(req).await, - SqliteTransportInner::Conveyer(db) => match db - .commit( - req.dirty_pages - .into_iter() - .map(|page| depot::types::DirtyPage { - pgno: page.pgno, - bytes: page.bytes, - }) - .collect(), - req.db_size_pages, - req.now_ms, - ) - .await - { - Ok(()) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk), - Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( - protocol::SqliteErrorResponse { - message: sqlite_error_reason(&err), - }, - )), - }, - #[cfg(test)] - SqliteTransportInner::Direct(storage) => { - storage.hooks.record_commit_request(req.clone()); - if storage.hooks.take_commit_hang() { - std::future::pending().await - } - if let Some(message) = storage.hooks.take_commit_error() { - return Err(anyhow::anyhow!(message)); - } - storage.hooks.pause_commit_if_requested(); - - let actor_id = req.actor_id.clone(); - let dirty_pages = req - .dirty_pages - .into_iter() - .map(tests::storage_dirty_page) - .collect::>(); - let actor_db = storage.actor_db(actor_id.clone()).await; - match actor_db - .commit(dirty_pages.clone(), req.db_size_pages, req.now_ms) - .await - { - Ok(_) => { - if !storage.is_strict_mode() { - if let Err(err) = storage - .apply_commit(&actor_id, dirty_pages, req.db_size_pages) - .await - { - return Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( - tests::sqlite_error_response(&err), - )); - } - } - Ok(protocol::SqliteCommitResponse::SqliteCommitOk) - } - Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( - tests::sqlite_error_response(&err), - )), - } - } - } - } + request: protocol::SqliteCommitRequest, + ) -> Result; } -fn sqlite_error_reason(err: &anyhow::Error) -> String { - err.chain() - .map(ToString::to_string) - .collect::>() - .join(": ") -} +pub type SqliteTransportHandle = Arc; fn sqlite_now_ms() -> Result { use std::time::{SystemTime, UNIX_EPOCH}; @@ -475,7 +289,7 @@ enum CommitWait { pub struct VfsContext { actor_id: String, runtime: Handle, - transport: SqliteTransport, + transport: SqliteTransportHandle, config: VfsConfig, state: RwLock, aux_files: RwLock>>, @@ -979,7 +793,7 @@ impl VfsContext { fn new( actor_id: String, runtime: Handle, - transport: SqliteTransport, + transport: SqliteTransportHandle, config: VfsConfig, io_methods: sqlite3_io_methods, initial_main_page: Option>, @@ -989,31 +803,6 @@ impl VfsContext { if let Some(page) = initial_main_page { state.seed_main_page(page); } - #[cfg(test)] - if let SqliteTransportInner::Direct(storage) = &*transport.inner { - if storage.is_strict_mode() { - if let Some(page) = - fetch_initial_main_page_for_test(&transport, &runtime, &actor_id)? - { - state.seed_main_page(page); - } - } else { - let storage = Arc::clone(storage); - let actor_id = actor_id.clone(); - let snapshot = - block_on_runtime( - &runtime, - async move { storage.snapshot_pages(&actor_id).await }, - )?; - if snapshot.db_size_pages > 0 { - state.db_size_pages = snapshot.db_size_pages; - state.page_cache.invalidate_all(); - for (pgno, bytes) in snapshot.pages { - state.page_cache.insert(pgno, bytes); - } - } - } - } Ok(Self { actor_id, @@ -1114,7 +903,7 @@ impl VfsContext { CommitWait<(BufferedCommitOutcome, CommitTransportMetrics)>, CommitBufferError, > { - let commit = commit_buffered_pages(&self.transport, request); + let commit = commit_buffered_pages(&*self.transport, request); let result = if let Some(timeout) = timeout { match self .runtime @@ -1261,6 +1050,7 @@ impl VfsContext { seed_pgno, prediction_budget, predicted_pgnos, + db_size_pages, ) = { let mut state = self.state.write(); for pgno in target_pgnos.iter().copied() { @@ -1302,6 +1092,7 @@ impl VfsContext { seed_pgno, prediction_budget, predicted_pgnos, + state.db_size_pages, ) }; @@ -1355,12 +1146,57 @@ impl VfsContext { match response { protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok) => { let page_cache = { self.state.read().page_cache.clone() }; + #[cfg(debug_assertions)] + let mut returned_pgnos = HashSet::new(); + #[cfg(debug_assertions)] + let mut returned_missing_pages = Vec::new(); + #[cfg(debug_assertions)] + let mut returned_missing_in_range_pages = Vec::new(); for fetched in ok.pages { + #[cfg(debug_assertions)] + { + returned_pgnos.insert(fetched.pgno); + if fetched.bytes.is_none() { + returned_missing_pages.push(fetched.pgno); + if fetched.pgno <= db_size_pages { + returned_missing_in_range_pages.push(fetched.pgno); + } + } + } if let Some(bytes) = &fetched.bytes { page_cache.insert(fetched.pgno, bytes.clone()); } resolved.insert(fetched.pgno, fetched.bytes); } + #[cfg(debug_assertions)] + { + let absent_response_pages = to_fetch + .iter() + .copied() + .filter(|pgno| !returned_pgnos.contains(pgno)) + .collect::>(); + let absent_in_range_pages = absent_response_pages + .iter() + .copied() + .filter(|pgno| *pgno <= db_size_pages) + .collect::>(); + if !returned_missing_in_range_pages.is_empty() + || !absent_in_range_pages.is_empty() + { + tracing::warn!( + actor_id = %self.actor_id, + requested_pages = ?target_pgnos, + missing_pages = ?missing, + fetch_pages = ?to_fetch, + db_size_pages, + returned_missing_pages = ?returned_missing_pages, + returned_missing_in_range_pages = ?returned_missing_in_range_pages, + absent_response_pages = ?absent_response_pages, + absent_in_range_pages = ?absent_in_range_pages, + "sqlite get_pages returned missing pages within declared db size" + ); + } + } for pgno in missing { resolved.entry(pgno).or_insert(None); } @@ -1443,12 +1279,26 @@ impl VfsContext { tracing::debug!( dirty_pages = request.dirty_pages.len(), path = ?outcome.path, + requested_db_size_pages = request.new_db_size_pages, db_size_pages = outcome.db_size_pages, request_build_ns, serialize_ns = transport_metrics.serialize_ns, transport_ns = transport_metrics.transport_ns, "vfs commit complete (flush)" ); + #[cfg(debug_assertions)] + { + if outcome.db_size_pages != request.new_db_size_pages { + tracing::warn!( + actor_id = %self.actor_id, + dirty_pages = request.dirty_pages.len(), + path = ?outcome.path, + requested_db_size_pages = request.new_db_size_pages, + outcome_db_size_pages = outcome.db_size_pages, + "sqlite flush commit returned db size different from request" + ); + } + } let state_update_start = Instant::now(); let mut state = self.state.write(); state.db_size_pages = request.new_db_size_pages; @@ -1545,6 +1395,19 @@ impl VfsContext { transport_ns = transport_metrics.transport_ns, "vfs commit complete (atomic)" ); + #[cfg(debug_assertions)] + { + if outcome.db_size_pages != request.new_db_size_pages { + tracing::warn!( + actor_id = %self.actor_id, + dirty_pages = request.dirty_pages.len(), + path = ?outcome.path, + requested_db_size_pages = request.new_db_size_pages, + outcome_db_size_pages = outcome.db_size_pages, + "sqlite atomic commit returned db size different from request" + ); + } + } self.clear_last_error(); let state_update_start = Instant::now(); let mut state = self.state.write(); @@ -1650,14 +1513,14 @@ fn mark_dead_from_fence_commit_error(ctx: &VfsContext, err: &CommitBufferError) } pub(crate) async fn fetch_initial_main_page_for_registration( - transport: &SqliteTransport, + transport: SqliteTransportHandle, actor_id: &str, ) -> std::result::Result>, String> { - fetch_initial_main_page(transport.clone(), actor_id.to_string()).await + fetch_initial_main_page(transport, actor_id.to_string()).await } async fn fetch_initial_main_page( - transport: SqliteTransport, + transport: SqliteTransportHandle, actor_id: String, ) -> std::result::Result>, String> { let request_actor_id = actor_id.clone(); @@ -1694,24 +1557,9 @@ async fn fetch_initial_main_page( } } -#[cfg(test)] -fn fetch_initial_main_page_for_test( - transport: &SqliteTransport, - runtime: &Handle, - actor_id: &str, -) -> std::result::Result>, String> { - let transport = transport.clone(); - let actor_id = actor_id.to_string(); - block_on_runtime(runtime, async move { - fetch_initial_main_page(transport, actor_id).await - })? -} - fn is_initial_main_page_missing(message: &str) -> bool { message.contains("sqlite database was not found in this bucket branch") || message.contains("sqlite meta missing for get_pages") - || message - .contains("strict DirectStorage forbids mirror fallback for missing depot metadata") } fn next_temp_aux_path() -> String { @@ -1726,7 +1574,7 @@ unsafe fn get_aux_state(file: &VfsFile) -> Option<&AuxFileHandle> { } async fn commit_buffered_pages( - transport: &SqliteTransport, + transport: &dyn SqliteTransport, request: BufferedCommitRequest, ) -> std::result::Result<(BufferedCommitOutcome, CommitTransportMetrics), CommitBufferError> { let mut metrics = CommitTransportMetrics::default(); @@ -2077,9 +1925,12 @@ unsafe extern "C" fn io_read( Err(_) => return SQLITE_IOERR_READ, }; let page_size = ctx.page_size(); - let file_size = { + let (file_size, db_size_pages) = { let state = ctx.state.read(); - state.db_size_pages as usize * state.page_size + ( + state.db_size_pages as usize * state.page_size, + state.db_size_pages, + ) }; let resolved = match ctx.resolve_pages(&requested_pages, true) { @@ -2091,6 +1942,29 @@ unsafe extern "C" fn io_read( }; ctx.clear_last_error(); + #[cfg(debug_assertions)] + { + let missing_in_range_pages = requested_pages + .iter() + .copied() + .filter(|pgno| *pgno <= db_size_pages) + .filter(|pgno| !matches!(resolved.get(pgno), Some(Some(_)))) + .collect::>(); + if !missing_in_range_pages.is_empty() { + tracing::warn!( + actor_id = %ctx.actor_id, + offset = i_offset, + amount = i_amt, + page_size, + db_size_pages, + file_size, + requested_pages = ?requested_pages, + missing_in_range_pages = ?missing_in_range_pages, + "sqlite xRead would zero-fill pages within declared db size" + ); + } + } + buf.fill(0); for pgno in requested_pages { let Some(Some(bytes)) = resolved.get(&pgno) else { @@ -2165,8 +2039,8 @@ unsafe extern "C" fn io_write( let amt = i_amt as usize; let is_aligned_full_page = offset % page_size == 0 && amt % page_size == 0; - let resolved = if is_aligned_full_page { - HashMap::new() + let (resolved, existing_db_size_pages) = if is_aligned_full_page { + (HashMap::new(), None) } else { let (db_size_pages, pages_to_resolve): (u32, Vec) = { let state = ctx.state.read(); @@ -2197,8 +2071,31 @@ unsafe extern "C" fn io_write( resolved.entry(*pgno).or_insert(None); } } - resolved + (resolved, Some(db_size_pages)) }; + #[cfg(debug_assertions)] + { + if let Some(db_size_pages) = existing_db_size_pages { + let missing_existing_pages = target_pages + .iter() + .copied() + .filter(|pgno| *pgno <= db_size_pages) + .filter(|pgno| !matches!(resolved.get(pgno), Some(Some(_)))) + .collect::>(); + if !missing_existing_pages.is_empty() { + tracing::warn!( + actor_id = %ctx.actor_id, + offset = i_offset, + amount = i_amt, + page_size, + db_size_pages, + target_pages = ?target_pages, + missing_existing_pages = ?missing_existing_pages, + "sqlite xWrite partial update would synthesize existing pages from zeros" + ); + } + } + } let mut dirty_pages = BTreeMap::new(); for pgno in target_pages { @@ -2588,24 +2485,6 @@ unsafe extern "C" fn vfs_get_last_error( } impl SqliteVfs { - pub fn register( - name: &str, - handle: EnvoyHandle, - actor_id: String, - runtime: Handle, - config: VfsConfig, - metrics: Option>, - ) -> std::result::Result { - Self::register_with_transport( - name, - SqliteTransport::from_envoy(handle), - actor_id, - runtime, - config, - metrics, - ) - } - pub(crate) fn take_last_error(&self) -> Option { self.ctx.take_last_error() } @@ -2622,9 +2501,10 @@ impl SqliteVfs { self.ctx.sqlite_vfs_metrics() } + #[cfg(test)] pub(crate) fn register_with_transport( name: &str, - transport: SqliteTransport, + transport: SqliteTransportHandle, actor_id: String, runtime: Handle, config: VfsConfig, @@ -2637,7 +2517,7 @@ impl SqliteVfs { pub(crate) fn register_with_transport_and_initial_page( name: &str, - transport: SqliteTransport, + transport: SqliteTransportHandle, actor_id: String, runtime: Handle, config: VfsConfig, diff --git a/engine/packages/depot-client/src/worker.rs b/engine/packages/depot-client/src/worker.rs index cd4fa5e10f..ad1e5fb94c 100644 --- a/engine/packages/depot-client/src/worker.rs +++ b/engine/packages/depot-client/src/worker.rs @@ -269,8 +269,8 @@ impl SqliteWorkerHandle { join.join() .map_err(|panic| anyhow!("sqlite worker panicked: {}", panic_message(&panic))) }) - .await - .context("join sqlite worker join task")? + .await + .context("join sqlite worker join task")? } fn join_worker_in_background(&self, start: Instant) { diff --git a/engine/packages/depot-client/tests/inline/fault/CLAUDE.md b/engine/packages/depot-client/tests/inline/fault/CLAUDE.md index aa13dabf56..49ffb02999 100644 --- a/engine/packages/depot-client/tests/inline/fault/CLAUDE.md +++ b/engine/packages/depot-client/tests/inline/fault/CLAUDE.md @@ -1,8 +1,8 @@ # CLAUDE.md -- Fault-injection tests must exercise the real SQLite VFS through DirectStorage into depot. +- Fault-injection tests must exercise the real SQLite VFS through the explicit DirectDepotTransport. - Do not add mock, envoy, or alternate transport variants to this fault suite. -- Strict DirectStorage mode must fail if reads are served from mirrors or seeded VFS cache state. +- DirectDepotTransport must fail if reads require mirror fallback or seeded VFS cache state. - Faults must be depot semantic faults: fail, pause, delay, or drop depot-owned artifacts. - Do not add arbitrary byte corruption, UDB driver faults, Gasoline correctness tests, or global Rivet fault injection here. - Tests must validate with native SQLite oracle comparison and depot invariant scanning. diff --git a/engine/packages/depot-client/tests/inline/fault/scenario.rs b/engine/packages/depot-client/tests/inline/fault/scenario.rs index 4f22141d5e..1489b5d224 100644 --- a/engine/packages/depot-client/tests/inline/fault/scenario.rs +++ b/engine/packages/depot-client/tests/inline/fault/scenario.rs @@ -42,8 +42,8 @@ use universaldb::{ }; use super::super::{ - DirectStorage, DirectStorageStats, NativeDatabase, SqliteTransport, SqliteVfs, VfsConfig, - open_database, + DirectDepotTransport, DirectStorage, DirectStorageStats, NativeDatabase, SqliteVfs, VfsConfig, + fetch_initial_main_page_for_registration, open_database, }; use super::oracle::{ AmbiguousOracleOutcome, NativeSqliteOracle, OracleCommitSemantics, OracleVerification, @@ -359,7 +359,6 @@ impl FaultScenarioCtx { pub(crate) async fn reload_database(&self) -> Result<()> { self.close_database(); - self.inner.storage.enable_strict_mode(); self.inner .storage .evict_actor_db(&self.inner.actor_id) @@ -386,13 +385,6 @@ impl FaultScenarioCtx { after.stats.mirror_reads ); } - if after.stats.mirror_fills != before.stats.mirror_fills { - bail!( - "strict workload used mirror fills: before={}, after={}", - before.stats.mirror_fills, - after.stats.mirror_fills - ); - } if after.stats.mirror_seeds != before.stats.mirror_seeds { bail!( "strict workload used mirror seeds: before={}, after={}", @@ -571,7 +563,6 @@ impl FaultScenarioCtx { async fn enter_strict_workload_mode(&self) -> Result<()> { self.close_database(); - self.inner.storage.enable_strict_mode(); self.inner .storage .evict_actor_db(&self.inner.actor_id) @@ -737,7 +728,6 @@ impl FaultScenarioCtx { } pub(crate) async fn read_page_from_depot(&self, pgno: u32) -> Result<()> { - self.inner.storage.enable_strict_mode(); self.inner .storage .evict_actor_db(&self.inner.actor_id) @@ -874,12 +864,20 @@ fn open_fault_database( ) -> Result { let mut config = VfsConfig::default(); config.assert_batch_atomic = false; - let vfs = SqliteVfs::register_with_transport( + let transport = Arc::new(DirectDepotTransport::new(storage)); + let initial_main_page = handle + .block_on(fetch_initial_main_page_for_registration( + transport.clone(), + actor_id, + )) + .map_err(anyhow::Error::msg)?; + let vfs = SqliteVfs::register_with_transport_and_initial_page( &super::super::next_test_name("sqlite-fault-vfs"), - SqliteTransport::from_direct(storage), + transport, actor_id.to_string(), handle.clone(), config, + initial_main_page, None, ) .map_err(anyhow::Error::msg)?; diff --git a/engine/packages/depot-client/tests/inline/vfs.rs b/engine/packages/depot-client/tests/inline/vfs.rs index 8847ec5214..679f114d4c 100644 --- a/engine/packages/depot-client/tests/inline/vfs.rs +++ b/engine/packages/depot-client/tests/inline/vfs.rs @@ -2,8 +2,8 @@ mod fault; mod vfs_support; pub(super) use vfs_support::{ - DirectStorage, DirectStorageStats, DirectTransportHooks, protocol_fetched_page, - sqlite_error_response, storage_dirty_page, + DirectDepotTransport, DirectMirrorTransport, DirectStorage, DirectStorageStats, + storage_dirty_page, }; use std::sync::atomic::{AtomicU64, Ordering}; @@ -87,12 +87,20 @@ impl DirectEngineHarness { actor_id: &str, config: VfsConfig, ) -> NativeDatabase { - let vfs = SqliteVfs::register_with_transport( + let transport = Arc::new(DirectDepotTransport::new(engine)); + let initial_main_page = runtime + .block_on(fetch_initial_main_page_for_registration( + transport.clone(), + actor_id, + )) + .expect("initial main page preload should succeed"); + let vfs = SqliteVfs::register_with_transport_and_initial_page( &next_test_name("sqlite-direct-vfs"), - SqliteTransport::from_direct(engine), + transport, actor_id.to_string(), runtime.handle().clone(), config, + initial_main_page, None, ) .expect("v2 vfs should register"); @@ -110,7 +118,7 @@ impl DirectEngineHarness { VfsContext::new( self.actor_id.clone(), runtime.handle().clone(), - SqliteTransport::from_direct(engine), + Arc::new(DirectDepotTransport::new(engine)), VfsConfig::default(), unsafe { std::mem::zeroed() }, None, @@ -137,13 +145,21 @@ fn open_worker_handle_with_metrics( metrics: Option>, ) -> crate::database::NativeDatabaseHandle { let engine = runtime.block_on(harness.open_engine()); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let initial_main_page = runtime + .block_on(fetch_initial_main_page_for_registration( + transport.clone(), + &harness.actor_id, + )) + .expect("initial main page preload should succeed"); let vfs = Arc::new( - SqliteVfs::register_with_transport( + SqliteVfs::register_with_transport_and_initial_page( &next_test_name("sqlite-worker-vfs"), - SqliteTransport::from_direct(engine), + transport, harness.actor_id.clone(), runtime.handle().clone(), VfsConfig::default(), + initial_main_page, None, ) .expect("worker vfs should register"), @@ -638,29 +654,27 @@ fn direct_engine_open_engine_is_concurrency_safe() { } #[test] -fn vfs_register_inside_runtime_worker_does_not_block_on_current_thread() { +fn initial_page_fetch_inside_block_in_place_can_use_runtime_handle() { let runtime = direct_runtime(); runtime.block_on(async { let harness = Arc::new(DirectEngineHarness::new()); let engine = harness.open_engine().await; - engine.enable_strict_mode(); let actor_id = harness.actor_id.clone(); - let runtime = tokio::runtime::Handle::current(); - - tokio::task::spawn(async move { - let vfs = SqliteVfs::register_with_transport( - &next_test_name("sqlite-runtime-worker-vfs"), - SqliteTransport::from_direct(engine), - actor_id, - runtime, - VfsConfig::default(), - None, - ) - .expect("vfs should register from a runtime worker"); - drop(vfs); + let handle = tokio::runtime::Handle::current(); + + let initial_main_page = tokio::task::spawn(async move { + tokio::task::block_in_place(|| { + handle.block_on(fetch_initial_main_page_for_registration( + Arc::new(DirectDepotTransport::new(engine)), + &actor_id, + )) + }) }) .await - .expect("runtime worker task should finish"); + .expect("runtime worker task should finish") + .expect("initial main page preload should succeed"); + + assert!(initial_main_page.is_none()); }); } @@ -805,7 +819,6 @@ fn strict_direct_reopen_ignores_poisoned_mirror_and_reads_depot() { let engine = runtime.block_on(harness.open_engine()); runtime.block_on(engine.poison_mirror_page(&harness.actor_id, 1, vec![0xdb; 4096], page_count)); - engine.enable_strict_mode(); runtime.block_on(engine.evict_actor_db(&harness.actor_id)); let before = engine.stats(); @@ -818,12 +831,11 @@ fn strict_direct_reopen_ignores_poisoned_mirror_and_reads_depot() { let after = engine.stats(); assert!(after.depot_get_pages > before.depot_get_pages); assert_eq!(after.mirror_reads, before.mirror_reads); - assert_eq!(after.mirror_fills, before.mirror_fills); assert_eq!(after.mirror_seeds, before.mirror_seeds); } #[test] -fn strict_direct_mode_rejects_mirror_fallback_and_seed_paths() { +fn direct_depot_transport_rejects_mirror_fallback() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); @@ -839,31 +851,69 @@ fn strict_direct_mode_rejects_mirror_fallback_and_seed_paths() { )) .expect("non-strict mirror seed should succeed"); - engine.enable_strict_mode(); runtime.block_on(engine.evict_actor_db(&harness.actor_id)); let before = engine.stats(); let err = runtime .block_on(engine.get_pages(&harness.actor_id, &[1])) - .expect_err("strict mode should not read from the mirror"); + .expect_err("depot transport should not read from the mirror"); assert!(!err.to_string().is_empty()); let after = engine.stats(); assert_eq!(after.mirror_reads, before.mirror_reads); - assert_eq!(after.mirror_fills, before.mirror_fills); +} - let err = runtime - .block_on(engine.apply_commit( +#[test] +fn direct_mirror_transport_reopens_from_mirror_pages() { + let runtime = direct_runtime(); + let harness = DirectEngineHarness::new(); + let engine = runtime.block_on(harness.open_engine()); + let transport = Arc::new(DirectMirrorTransport::new(engine.clone())); + let hooks = transport.direct_hooks(); + + { + let vfs = SqliteVfs::register_with_transport( + &next_test_name("sqlite-mirror-vfs"), + transport.clone(), + harness.actor_id.clone(), + runtime.handle().clone(), + VfsConfig::default(), + None, + ) + .expect("mirror vfs should register"); + let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open"); + sqlite_exec(db.as_ptr(), "PRAGMA user_version = 314;") + .expect("user_version write should succeed"); + } + + let initial_main_page = runtime + .block_on(fetch_initial_main_page_for_registration( + transport.clone(), &harness.actor_id, - vec![storage_dirty_page(protocol::SqliteDirtyPage { - pgno: 1, - bytes: empty_db_page(), - })], - 1, )) - .expect_err("strict mode should reject mirror seed"); + .expect("mirror preload should succeed"); + let vfs = SqliteVfs::register_with_transport_and_initial_page( + &next_test_name("sqlite-mirror-vfs"), + transport, + harness.actor_id.clone(), + runtime.handle().clone(), + VfsConfig::default(), + initial_main_page, + None, + ) + .expect("mirror vfs should register"); + let reopened = open_database(vfs, &harness.actor_id).expect("sqlite database should reopen"); + assert_eq!( + sqlite_query_i64(reopened.as_ptr(), "PRAGMA user_version;") + .expect("mirror user_version read should succeed"), + 314 + ); assert!( - err.to_string() - .contains("forbids mirror-backed cache seeding") + !hooks.commit_requests().is_empty(), + "mirror transport should record commit requests", ); + let stats = engine.stats(); + assert_eq!(stats.depot_get_pages, 0); + assert!(stats.mirror_reads > 0); + assert!(stats.mirror_seeds > 0); } #[test] @@ -882,17 +932,17 @@ fn strict_direct_reopen_counts_cold_tier_get_for_cold_covered_page() { let engine = runtime.block_on(harness.open_engine()); let page = runtime - .block_on(engine.snapshot_pages(&harness.actor_id)) - .pages - .get(&1) - .cloned() + .block_on(engine.get_pages(&harness.actor_id, &[1])) + .expect("page 1 should be fetched from depot") + .into_iter() + .find(|page| page.pgno == 1) + .and_then(|page| page.bytes) .expect("page 1 should be present"); assert_eq!(&page[..16], b"SQLite format 3\0"); runtime .block_on(engine.seed_page_as_cold_ref(&harness.actor_id, 1, page)) .expect("cold ref should seed"); runtime.block_on(engine.poison_mirror_page(&harness.actor_id, 1, vec![0xcd; 4096], page_count)); - engine.enable_strict_mode(); runtime.block_on(engine.evict_actor_db(&harness.actor_id)); let before = engine.stats(); @@ -906,7 +956,6 @@ fn strict_direct_reopen_counts_cold_tier_get_for_cold_covered_page() { assert!(after.depot_get_pages > before.depot_get_pages); assert!(after.cold_gets > before.cold_gets); assert_eq!(after.mirror_reads, before.mirror_reads); - assert_eq!(after.mirror_fills, before.mirror_fills); assert_eq!(after.mirror_seeds, before.mirror_seeds); } @@ -926,17 +975,17 @@ fn strict_direct_warmed_shard_cache_does_not_count_as_cold_tier_evidence() { let engine = runtime.block_on(harness.open_engine()); let page = runtime - .block_on(engine.snapshot_pages(&harness.actor_id)) - .pages - .get(&1) - .cloned() + .block_on(engine.get_pages(&harness.actor_id, &[1])) + .expect("page 1 should be fetched from depot") + .into_iter() + .find(|page| page.pgno == 1) + .and_then(|page| page.bytes) .expect("page 1 should be present"); assert_eq!(&page[..16], b"SQLite format 3\0"); runtime .block_on(engine.seed_page_as_cold_ref(&harness.actor_id, 1, page)) .expect("cold ref should seed"); runtime.block_on(engine.poison_mirror_page(&harness.actor_id, 1, vec![0xcd; 4096], page_count)); - engine.enable_strict_mode(); runtime.block_on(engine.evict_actor_db(&harness.actor_id)); let before_warm = engine.stats(); @@ -966,7 +1015,6 @@ fn strict_direct_warmed_shard_cache_does_not_count_as_cold_tier_evidence() { assert!(after.depot_get_pages > before.depot_get_pages); assert_eq!(after.cold_gets, before.cold_gets); assert_eq!(after.mirror_reads, before.mirror_reads); - assert_eq!(after.mirror_fills, before.mirror_fills); assert_eq!(after.mirror_seeds, before.mirror_seeds); } @@ -2073,10 +2121,8 @@ fn direct_engine_marks_vfs_dead_after_transport_errors() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -2117,10 +2163,8 @@ fn flush_dirty_pages_marks_vfs_dead_after_transport_error() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -2163,10 +2207,8 @@ fn commit_atomic_write_marks_vfs_dead_after_transport_error() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -2211,7 +2253,7 @@ fn commit_atomic_write_clears_last_error_on_success() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); + let transport = Arc::new(DirectDepotTransport::new(engine)); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -2252,10 +2294,8 @@ fn concurrent_reader_during_commit_atomic_observes_consistent_snapshot() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); let ctx = VfsContext::new( harness.actor_id.clone(), runtime.handle().clone(), @@ -2366,7 +2406,7 @@ fn vfs_delete_main_db_resets_in_memory_state() { let harness = DirectEngineHarness::new(); let vfs_name = next_test_name("sqlite-direct-vfs"); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); + let transport = Arc::new(DirectDepotTransport::new(engine)); let vfs = SqliteVfs::register_with_transport( &vfs_name, transport, @@ -2697,10 +2737,8 @@ fn direct_engine_fresh_reopen_recovers_after_poisoned_handle() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine.clone()); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine.clone())); + let hooks = transport.direct_hooks(); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -2785,10 +2823,8 @@ fn direct_engine_crash_with_dirty_buffer_recovers_last_commit() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine.clone()); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine.clone())); + let hooks = transport.direct_hooks(); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -2951,10 +2987,8 @@ fn native_database_drop_times_out_pending_commit() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); let vfs = SqliteVfs::register_with_transport( &next_test_name("sqlite-direct-vfs"), transport, @@ -3190,11 +3224,20 @@ fn truncate_main_file_discards_pages_beyond_eof() { fn resolve_pages_surfaces_read_path_error_response() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); - let ctx = harness.open_context(&runtime); - ctx.transport - .direct_hooks() - .expect("direct transport should expose test hooks") - .fail_next_get_pages("InjectedGetPagesError: read path dropped"); + let engine = runtime.block_on(harness.open_engine()); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); + let ctx = VfsContext::new( + harness.actor_id.clone(), + runtime.handle().clone(), + transport, + VfsConfig::default(), + unsafe { std::mem::zeroed() }, + None, + None, + ) + .expect("vfs context should build"); + hooks.fail_next_get_pages("InjectedGetPagesError: read path dropped"); let err = ctx .resolve_pages(&[2], false) @@ -3211,14 +3254,12 @@ fn commit_buffered_pages_uses_fast_path() { let runtime = direct_runtime(); let harness = DirectEngineHarness::new(); let engine = runtime.block_on(harness.open_engine()); - let transport = SqliteTransport::from_direct(engine); - let hooks = transport - .direct_hooks() - .expect("direct transport should expose test hooks"); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); let outcome = runtime .block_on(commit_buffered_pages( - &transport, + &*transport, BufferedCommitRequest { actor_id: harness.actor_id.clone(), new_db_size_pages: 1, diff --git a/engine/packages/depot-client/tests/inline/vfs_support.rs b/engine/packages/depot-client/tests/inline/vfs_support.rs index 969d090545..65182e9696 100644 --- a/engine/packages/depot-client/tests/inline/vfs_support.rs +++ b/engine/packages/depot-client/tests/inline/vfs_support.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::sync::{ Arc, - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicU64, Ordering}, mpsc, }; @@ -10,7 +10,6 @@ use async_trait::async_trait; use depot::{ cold_tier::{ColdTier, ColdTierObjectMetadata}, conveyer::{Db, db::CompactionSignaler}, - error::SqliteStorageError, fault::DepotFaultController, keys::{ SHARD_SIZE, branch_compaction_cold_shard_key, branch_compaction_root_key, @@ -18,7 +17,7 @@ use depot::{ }, ltx::{LtxHeader, encode_ltx_v3}, types::{ - ColdShardRef, CompactionRoot, DatabaseBranchId, DirtyPage, decode_db_head, + ColdShardRef, CompactionRoot, DBHead, DatabaseBranchId, DirtyPage, decode_db_head, encode_cold_shard_ref, encode_compaction_root, }, workflows::compaction::DeltasAvailable, @@ -29,6 +28,8 @@ use rivet_pools::{__rivet_util::Id, NodeId}; use sha2::{Digest, Sha256}; use universaldb::utils::IsolationLevel::Serializable; +use super::super::SqliteTransport; + pub(crate) struct DirectStorage { db: Arc, node_id: NodeId, @@ -36,7 +37,6 @@ pub(crate) struct DirectStorage { actor_dbs: scc::HashMap>, page_mirrors: scc::HashMap>>, compaction_signals: Arc>>, - strict: AtomicBool, counters: Arc, fault_controller: Option, pub(crate) hooks: Arc, @@ -88,7 +88,6 @@ impl DirectStorage { actor_dbs: scc::HashMap::new(), page_mirrors: scc::HashMap::new(), compaction_signals: Arc::new(Mutex::new(Vec::new())), - strict: AtomicBool::new(false), counters, fault_controller, hooks: Arc::new(DirectTransportHooks::default()), @@ -140,19 +139,10 @@ impl DirectStorage { let _ = self.actor_dbs.remove_async(&actor_id.to_string()).await; } - pub(crate) fn enable_strict_mode(&self) { - self.strict.store(true, Ordering::SeqCst); - } - - pub(crate) fn is_strict_mode(&self) -> bool { - self.strict.load(Ordering::SeqCst) - } - pub(crate) fn stats(&self) -> DirectStorageStats { DirectStorageStats { depot_get_pages: self.counters.depot_get_pages.load(Ordering::SeqCst), mirror_reads: self.counters.mirror_reads.load(Ordering::SeqCst), - mirror_fills: self.counters.mirror_fills.load(Ordering::SeqCst), mirror_seeds: self.counters.mirror_seeds.load(Ordering::SeqCst), cold_gets: self.counters.cold_gets.load(Ordering::SeqCst), } @@ -185,18 +175,47 @@ impl DirectStorage { pgno: u32, bytes: Vec, ) -> Result<()> { + let head = self.read_head(actor_id).await?; + let shard_id = pgno / SHARD_SIZE; let snapshot = self.snapshot_pages(actor_id).await; - let mut dirty_pages = snapshot - .pages + let shard_pgnos = (1..=head.db_size_pages) + .filter(|candidate_pgno| *candidate_pgno / SHARD_SIZE == shard_id) + .collect::>(); + let missing_pgnos = shard_pgnos .iter() - .filter(|(candidate_pgno, _)| **candidate_pgno / SHARD_SIZE == pgno / SHARD_SIZE) - .map(|(pgno, bytes)| DirtyPage { - pgno: *pgno, - bytes: bytes.clone(), + .filter(|candidate_pgno| !snapshot.pages.contains_key(candidate_pgno)) + .copied() + .collect::>(); + let fetched_pages = if missing_pgnos.is_empty() { + Vec::new() + } else { + self.actor_db(actor_id.to_string()) + .await + .get_pages(missing_pgnos) + .await? + }; + let mut fetched_by_pgno = fetched_pages + .into_iter() + .filter_map(|page| page.bytes.map(|bytes| (page.pgno, bytes))) + .collect::>(); + fetched_by_pgno.entry(pgno).or_insert(bytes); + + let dirty_pages = shard_pgnos + .into_iter() + .filter_map(|candidate_pgno| { + snapshot + .pages + .get(&candidate_pgno) + .cloned() + .or_else(|| fetched_by_pgno.remove(&candidate_pgno)) + .map(|bytes| DirtyPage { + pgno: candidate_pgno, + bytes, + }) }) .collect::>(); if dirty_pages.is_empty() { - dirty_pages.push(DirtyPage { pgno, bytes }); + bail!("cold-ref seed could not load pages for shard {shard_id}"); } self.seed_pages_as_cold_ref(actor_id, pgno, dirty_pages) .await @@ -272,6 +291,11 @@ impl DirectStorage { } pub(crate) async fn read_branch_head(&self, actor_id: &str) -> Result<(DatabaseBranchId, u64)> { + let head = self.read_head(actor_id).await?; + Ok((head.branch_id, head.head_txid)) + } + + async fn read_head(&self, actor_id: &str) -> Result { let actor_id = actor_id.to_string(); self.db .run(move |tx| { @@ -290,7 +314,7 @@ impl DirectStorage { .get(&branch_meta_head_key(branch_id), Serializable) .await? .context("database head should exist")?; - Ok((branch_id, decode_db_head(&head)?.head_txid)) + decode_db_head(&head) } }) .await @@ -316,67 +340,14 @@ impl DirectStorage { let actor_db = self.actor_db(actor_id.to_string()).await; self.counters.depot_get_pages.fetch_add(1, Ordering::SeqCst); - match actor_db.get_pages(pgnos.to_vec()).await { - Ok(pages) if self.strict.load(Ordering::SeqCst) => Ok(pages), - Ok(pages) => self.fill_from_mirror(actor_id, pgnos, pages).await, - Err(err) => { - if matches!( - depot_error(&err), - Some(SqliteStorageError::MetaMissing { operation }) - if *operation == "get_pages" - ) { - if self.strict.load(Ordering::SeqCst) { - return Err(anyhow::anyhow!( - "strict DirectStorage forbids mirror fallback for missing depot metadata" - )); - } - Ok(self.read_mirror(actor_id, pgnos).await) - } else { - Err(err) - } - } - } + actor_db.get_pages(pgnos.to_vec()).await } - async fn fill_from_mirror( + pub(crate) async fn read_mirror( &self, actor_id: &str, pgnos: &[u32], - pages: Vec, - ) -> anyhow::Result> { - self.counters.mirror_fills.fetch_add(1, Ordering::SeqCst); - if self.strict.load(Ordering::SeqCst) { - return Err(anyhow::anyhow!( - "strict DirectStorage forbids mirror-backed cache seeding" - )); - } - - let mut by_pgno = pages - .into_iter() - .map(|page| (page.pgno, page)) - .collect::>(); - let mirror_pages = self.read_mirror(actor_id, pgnos).await; - for page in mirror_pages { - if page.bytes.is_some() - || by_pgno - .get(&page.pgno) - .is_none_or(|existing| existing.bytes.is_none()) - { - by_pgno.insert(page.pgno, page); - } - } - Ok(pgnos - .iter() - .map(|pgno| { - by_pgno.remove(pgno).unwrap_or(depot::types::FetchedPage { - pgno: *pgno, - bytes: None, - }) - }) - .collect()) - } - - async fn read_mirror(&self, actor_id: &str, pgnos: &[u32]) -> Vec { + ) -> Vec { self.counters.mirror_reads.fetch_add(1, Ordering::SeqCst); let mirror = self.page_mirror(actor_id.to_string()).await; let mirror = mirror.lock(); @@ -400,11 +371,6 @@ impl DirectStorage { db_size_pages: u32, ) -> anyhow::Result<()> { self.counters.mirror_seeds.fetch_add(1, Ordering::SeqCst); - if self.strict.load(Ordering::SeqCst) { - return Err(anyhow::anyhow!( - "strict DirectStorage forbids mirror-backed cache seeding" - )); - } let mirror = self.page_mirror(actor_id.to_string()).await; let mut mirror = mirror.lock(); @@ -425,11 +391,134 @@ impl DirectStorage { } } +pub(crate) struct DirectDepotTransport { + storage: Arc, +} + +impl DirectDepotTransport { + pub(crate) fn new(storage: Arc) -> Self { + Self { storage } + } + + pub(crate) fn direct_hooks(&self) -> Arc { + Arc::clone(&self.storage.hooks) + } +} + +#[async_trait] +impl SqliteTransport for DirectDepotTransport { + async fn get_pages( + &self, + request: protocol::SqliteGetPagesRequest, + ) -> Result { + let pgnos = request.pgnos.clone(); + match self.storage.get_pages(&request.actor_id, &pgnos).await { + Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( + protocol::SqliteGetPagesOk { + pages: pages.into_iter().map(protocol_fetched_page).collect(), + }, + )), + Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse( + sqlite_error_response(&err), + )), + } + } + + async fn commit( + &self, + request: protocol::SqliteCommitRequest, + ) -> Result { + self.storage + .hooks + .apply_commit_hooks(request.clone()) + .await?; + + let actor_id = request.actor_id.clone(); + let dirty_pages = request + .dirty_pages + .into_iter() + .map(storage_dirty_page) + .collect::>(); + let actor_db = self.storage.actor_db(actor_id).await; + match actor_db + .commit(dirty_pages, request.db_size_pages, request.now_ms) + .await + { + Ok(_) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk), + Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( + sqlite_error_response(&err), + )), + } + } +} + +pub(crate) struct DirectMirrorTransport { + storage: Arc, +} + +impl DirectMirrorTransport { + pub(crate) fn new(storage: Arc) -> Self { + Self { storage } + } + + pub(crate) fn direct_hooks(&self) -> Arc { + Arc::clone(&self.storage.hooks) + } +} + +#[async_trait] +impl SqliteTransport for DirectMirrorTransport { + async fn get_pages( + &self, + request: protocol::SqliteGetPagesRequest, + ) -> Result { + if let Some(message) = self.storage.hooks.take_get_pages_error() { + return Err(anyhow::anyhow!(message)); + } + + let pages = self + .storage + .read_mirror(&request.actor_id, &request.pgnos) + .await; + Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk( + protocol::SqliteGetPagesOk { + pages: pages.into_iter().map(protocol_fetched_page).collect(), + }, + )) + } + + async fn commit( + &self, + request: protocol::SqliteCommitRequest, + ) -> Result { + self.storage + .hooks + .apply_commit_hooks(request.clone()) + .await?; + + let actor_id = request.actor_id.clone(); + let dirty_pages = request + .dirty_pages + .into_iter() + .map(storage_dirty_page) + .collect::>(); + match self + .storage + .apply_commit(&actor_id, dirty_pages, request.db_size_pages) + .await + { + Ok(()) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk), + Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( + sqlite_error_response(&err), + )), + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct DirectStorageStats { pub(crate) depot_get_pages: u64, pub(crate) mirror_reads: u64, - pub(crate) mirror_fills: u64, pub(crate) mirror_seeds: u64, pub(crate) cold_gets: u64, } @@ -438,7 +527,6 @@ pub(crate) struct DirectStorageStats { struct DirectStorageCounters { depot_get_pages: AtomicU64, mirror_reads: AtomicU64, - mirror_fills: AtomicU64, mirror_seeds: AtomicU64, cold_gets: AtomicU64, } @@ -535,6 +623,21 @@ impl DirectTransportHooks { let _ = gate.reached.send(()); let _ = gate.resume.recv(); } + + pub(crate) async fn apply_commit_hooks( + &self, + req: protocol::SqliteCommitRequest, + ) -> Result<()> { + self.record_commit_request(req); + if self.take_commit_hang() { + std::future::pending().await + } + if let Some(message) = self.take_commit_error() { + return Err(anyhow::anyhow!(message)); + } + self.pause_commit_if_requested(); + Ok(()) + } } pub(crate) struct DirectCommitPause { @@ -573,10 +676,6 @@ pub(crate) fn storage_dirty_page(page: protocol::SqliteDirtyPage) -> depot::type } } -fn depot_error(err: &anyhow::Error) -> Option<&SqliteStorageError> { - err.downcast_ref::() -} - fn sqlite_error_reason(err: &anyhow::Error) -> String { err.chain() .map(ToString::to_string) diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 657771e6ac..804af9c913 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -30,9 +30,7 @@ use url::Url; use crate::RouteTarget; use crate::request_context::RequestContext; use crate::response_body::ResponseBody; -use crate::route::{ - CacheKeyFn, ResolveRouteOutput, RouteCache, RoutingFn, RoutingOutput, -}; +use crate::route::{CacheKeyFn, ResolveRouteOutput, RouteCache, RoutingFn, RoutingOutput}; use crate::utils::{InFlightCounter, RateLimiter}; use crate::{ WebSocketHandle, custom_serve::HibernationResult, errors, metrics, task_group::TaskGroup, utils, diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index 8c18f3ef30..381cf12667 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -8,7 +8,7 @@ use depot::{ }, }; use depot_client::{ - database::{NativeDatabaseHandle, open_database_from_conveyer}, + database::{NativeDatabaseHandle, open_database_from_embedded_depot}, types::{BindParam, ColumnValue, ExecuteResult, QueryResult}, }; use futures_util::{FutureExt, TryStreamExt}; @@ -467,10 +467,20 @@ async fn handle_sqlite_get_pages_response( request: protocol::SqliteGetPagesRequest, ) -> protocol::SqliteGetPagesResponse { let actor_id = request.actor_id.clone(); + let pgnos = request.pgnos.clone(); + let expected_generation = request.expected_generation; + let expected_head_txid = request.expected_head_txid; match handle_sqlite_get_pages(ctx, conn, request).await { Ok(response) => response, Err(err) => { - tracing::error!(actor_id = %actor_id, ?err, "sqlite get_pages request failed"); + tracing::error!( + actor_id = %actor_id, + ?pgnos, + ?expected_generation, + ?expected_head_txid, + ?err, + "sqlite get_pages request failed" + ); protocol::SqliteGetPagesResponse::SqliteErrorResponse(sqlite_error_response(&err)) } } @@ -916,7 +926,7 @@ async fn remote_sqlite_executor_from_parts( let actor_id = actor_id.to_string(); let database = cell .get_or_try_init(|| async move { - open_database_from_conveyer( + open_database_from_embedded_depot( actor_db, actor_id, generation, diff --git a/engine/sdks/rust/envoy-client/src/context.rs b/engine/sdks/rust/envoy-client/src/context.rs index b3363daace..06487bd708 100644 --- a/engine/sdks/rust/envoy-client/src/context.rs +++ b/engine/sdks/rust/envoy-client/src/context.rs @@ -5,9 +5,9 @@ use std::sync::atomic::AtomicBool; use crate::async_counter::AsyncCounter; use rivet_envoy_protocol as protocol; -use tokio::sync::mpsc; use tokio::sync::Mutex; use tokio::sync::Notify; +use tokio::sync::mpsc; use tokio::sync::watch; use crate::actor::ToActor; diff --git a/engine/sdks/rust/envoy-client/src/handle.rs b/engine/sdks/rust/envoy-client/src/handle.rs index 86bde62913..1839c351bf 100644 --- a/engine/sdks/rust/envoy-client/src/handle.rs +++ b/engine/sdks/rust/envoy-client/src/handle.rs @@ -570,53 +570,52 @@ impl EnvoyHandle { fn decode_serverless_actor_start_payload( payload: &[u8], ) -> anyhow::Result<(protocol::ToEnvoy, ServerlessActorStart)> { - use vbare::OwnedVersionedData; + use vbare::OwnedVersionedData; - if payload.len() < 2 { - anyhow::bail!("serverless start payload too short"); - } + if payload.len() < 2 { + anyhow::bail!("serverless start payload too short"); + } - let version = u16::from_le_bytes([payload[0], payload[1]]); - if version != protocol::PROTOCOL_VERSION { - anyhow::bail!( - "serverless start payload does not match protocol version: {version} vs {}", - protocol::PROTOCOL_VERSION + let version = u16::from_le_bytes([payload[0], payload[1]]); + if version != protocol::PROTOCOL_VERSION { + anyhow::bail!( + "serverless start payload does not match protocol version: {version} vs {}", + protocol::PROTOCOL_VERSION + ); + } + + let message = match crate::protocol::versioned::ToEnvoy::deserialize(&payload[2..], version) { + Ok(message) => message, + Err(err) if version == protocol::PROTOCOL_VERSION => { + tracing::debug!( + ?err, + "serverless start payload failed current-version decode, retrying as v1-compatible body" ); + crate::protocol::versioned::ToEnvoy::deserialize( + &payload[2..], + protocol::PROTOCOL_VERSION - 1, + )? } + Err(err) => return Err(err), + }; - let message = match crate::protocol::versioned::ToEnvoy::deserialize(&payload[2..], version) - { - Ok(message) => message, - Err(err) if version == protocol::PROTOCOL_VERSION => { - tracing::debug!( - ?err, - "serverless start payload failed current-version decode, retrying as v1-compatible body" - ); - crate::protocol::versioned::ToEnvoy::deserialize( - &payload[2..], - protocol::PROTOCOL_VERSION - 1, - )? - } - Err(err) => return Err(err), - }; - - let protocol::ToEnvoy::ToEnvoyCommands(ref commands) = message else { - anyhow::bail!("invalid serverless payload: expected ToEnvoyCommands"); - }; - if commands.len() != 1 { - anyhow::bail!("invalid serverless payload: expected exactly 1 command"); - } - if !matches!(commands[0].inner, protocol::Command::CommandStartActor(_)) { - anyhow::bail!("invalid serverless payload: expected CommandStartActor"); - } + let protocol::ToEnvoy::ToEnvoyCommands(ref commands) = message else { + anyhow::bail!("invalid serverless payload: expected ToEnvoyCommands"); + }; + if commands.len() != 1 { + anyhow::bail!("invalid serverless payload: expected exactly 1 command"); + } + if !matches!(commands[0].inner, protocol::Command::CommandStartActor(_)) { + anyhow::bail!("invalid serverless payload: expected CommandStartActor"); + } - let actor_start = ServerlessActorStart { - actor_id: commands[0].checkpoint.actor_id.clone(), - generation: commands[0].checkpoint.generation, - }; + let actor_start = ServerlessActorStart { + actor_id: commands[0].checkpoint.actor_id.clone(), + generation: commands[0].checkpoint.generation, + }; - Ok((message, actor_start)) - } + Ok((message, actor_start)) +} impl EnvoyHandle { async fn send_kv_request( diff --git a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts index c77d7b3cea..a450f09bf6 100644 --- a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts +++ b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts @@ -1206,13 +1206,40 @@ async function runActorDriver( const lateMs = Math.max(0, Date.now() - scheduledAt); const startedAt = performance.now(); - const result = await handle.runCycle({ + const input = { seed: `${args.seed}:${actorIndex}`, cycle, insertRows: args.insertRows, rowBytes: args.rowBytes, scanRows: args.scanRows, + }; + writeEvent(jsonlPath, { + kind: "cycle_start", + actorIndex, + key, + cycle, + scheduledAt: new Date(scheduledAt).toISOString(), + lateMs, + input, + timestamp: new Date().toISOString(), }); + let result: Awaited>; + try { + result = await handle.runCycle(input); + } catch (err) { + writeEvent(jsonlPath, { + kind: "cycle_error", + actorIndex, + key, + cycle, + scheduledAt: new Date(scheduledAt).toISOString(), + lateMs, + durationMs: performance.now() - startedAt, + error: stringifyError(err), + timestamp: new Date().toISOString(), + }); + throw err; + } assertCycle(result); const durationMs = performance.now() - startedAt; if (!wroteActorWake) { @@ -1305,13 +1332,40 @@ async function runChurnActorDriver( const lateMs = Math.max(0, Date.now() - scheduledAt); const startedAt = performance.now(); - const result = await handle.runCycle({ + const input = { seed: `${args.seed}:${actorIndex}`, cycle, insertRows: args.insertRows, rowBytes: args.rowBytes, scanRows: args.scanRows, + }; + writeEvent(jsonlPath, { + kind: "cycle_start", + actorIndex, + key, + cycle, + scheduledAt: new Date(scheduledAt).toISOString(), + lateMs, + input, + timestamp: new Date().toISOString(), }); + let result: Awaited>; + try { + result = await handle.runCycle(input); + } catch (err) { + writeEvent(jsonlPath, { + kind: "cycle_error", + actorIndex, + key, + cycle, + scheduledAt: new Date(scheduledAt).toISOString(), + lateMs, + durationMs: performance.now() - startedAt, + error: stringifyError(err), + timestamp: new Date().toISOString(), + }); + throw err; + } assertCycle(result); const durationMs = performance.now() - startedAt; writeEvent(jsonlPath, { diff --git a/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts b/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts index 2279dafa88..7eca794c8a 100644 --- a/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts +++ b/examples/kitchen-sink/src/actors/testing/sqlite-memory-pressure.ts @@ -223,8 +223,54 @@ export const sqliteMemoryPressure = actor({ const scanRows = Math.max(1, finiteInt(input.scanRows, DEFAULT_SCAN_ROWS)); const now = Date.now(); let insertedRows = 0; + const logStage = ( + stage: string, + phase: "start" | "end" | "error", + fields: Record = {}, + ) => { + console.log( + JSON.stringify({ + kind: "sqlite_memory_pressure_run_cycle_stage", + actorId: c.actorId, + seed: input.seed, + cycle: input.cycle, + stage, + phase, + elapsedMs: performance.now() - startedAt, + timestamp: new Date().toISOString(), + ...fields, + }), + ); + }; + const executeTimed = async ( + stage: string, + sql: string, + ...args: unknown[] + ) => { + const stageStartedAt = performance.now(); + logStage(stage, "start", { argCount: args.length }); + try { + const rows = await c.db.execute(sql, ...args); + logStage(stage, "end", { + durationMs: performance.now() - stageStartedAt, + rowCount: rows.length, + }); + return rows; + } catch (err) { + logStage(stage, "error", { + durationMs: performance.now() - stageStartedAt, + error: err instanceof Error ? err.message : String(err), + }); + throw err; + } + }; + logStage("run_cycle", "start", { + insertRows, + rowBytes, + scanRows, + }); - await c.db.execute("BEGIN"); + await executeTimed("begin", "BEGIN"); try { while (insertedRows < insertRows) { const batchRows = Math.min( @@ -246,28 +292,36 @@ export const sqliteMemoryPressure = actor({ ); } - await c.db.execute( + await executeTimed( + "insert_batch", `INSERT INTO pressure_rows (seed, cycle, bucket, payload, touched_count, created_at) VALUES ${placeholders.join(", ")}`, ...args, ); insertedRows += batchRows; + logStage("insert_batch_progress", "end", { + insertedRows, + batchRows, + }); } - await c.db.execute("COMMIT"); + await executeTimed("commit", "COMMIT"); } catch (err) { - await c.db.execute("ROLLBACK").catch(() => undefined); + await executeTimed("rollback", "ROLLBACK").catch(() => undefined); throw err; } - const scan = await c.db.execute( + const scan = await executeTimed( + "scan_recent", "SELECT id, length(payload) AS payload_bytes FROM pressure_rows ORDER BY id DESC LIMIT ?", scanRows, ); - const bucketAgg = await c.db.execute( + const bucketAgg = await executeTimed( + "bucket_agg", "SELECT bucket, COUNT(*) AS rows, SUM(length(payload)) AS bytes FROM pressure_rows WHERE bucket BETWEEN ? AND ? GROUP BY bucket ORDER BY bucket", input.cycle % 16, (input.cycle % 16) + 15, ); - await c.db.execute( + await executeTimed( + "touch_recent", "UPDATE pressure_rows SET touched_count = touched_count + 1 WHERE id IN (SELECT id FROM pressure_rows ORDER BY id DESC LIMIT ?)", Math.min(scanRows, insertRows), ); @@ -291,20 +345,31 @@ export const sqliteMemoryPressure = actor({ // deletedRows = afterDelete.count; // } - const rowStats = await queryOne<{ - active_rows: number; - active_bytes: number | null; - }>( - c.db, + const rowStatsRows = await executeTimed( + "row_stats", "SELECT COUNT(*) AS active_rows, COALESCE(SUM(length(payload)), 0) AS active_bytes FROM pressure_rows", ); - const integrity = await queryOne<{ integrity_check: string }>( - c.db, + const rowStats = rowStatsRows[0] as + | { + active_rows: number; + active_bytes: number | null; + } + | undefined; + if (!rowStats) throw new Error("query returned no rows: row_stats"); + const integrityRows = await executeTimed( + "integrity_check", "PRAGMA integrity_check", ); + const integrity = integrityRows[0] as + | { integrity_check: string } + | undefined; + if (!integrity) { + throw new Error("query returned no rows: integrity_check"); + } const durationMs = performance.now() - startedAt; - await c.db.execute( + await executeTimed( + "record_cycle", "INSERT OR REPLACE INTO pressure_cycles (cycle, seed, inserted_rows, deleted_rows, active_rows, active_bytes, duration_ms, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", input.cycle, input.seed, @@ -316,6 +381,22 @@ export const sqliteMemoryPressure = actor({ now, ); + const storageStartedAt = performance.now(); + logStage("storage_stats", "start"); + const storage = await storageStats(c.db); + logStage("storage_stats", "end", { + durationMs: performance.now() - storageStartedAt, + pageCount: storage.page_count, + dbSizePages: storage.vfs?.dbSizePages ?? null, + pageCacheEntries: storage.vfs?.pageCacheEntries ?? null, + }); + logStage("run_cycle", "end", { + durationMs, + activeRows: rowStats.active_rows, + activeBytes: rowStats.active_bytes ?? 0, + pageCount: storage.page_count, + }); + return { seed: input.seed, cycle: input.cycle, @@ -326,7 +407,7 @@ export const sqliteMemoryPressure = actor({ scannedRows: scan.length, bucketsRead: bucketAgg.length, integrityCheck: integrity.integrity_check, - storage: await storageStats(c.db), + storage, durationMs, }; }, diff --git a/examples/kitchen-sink/src/server.ts b/examples/kitchen-sink/src/server.ts index cc9bc0aae5..ea5b8cb8ef 100644 --- a/examples/kitchen-sink/src/server.ts +++ b/examples/kitchen-sink/src/server.ts @@ -54,9 +54,16 @@ async function memoryBreakdown(forceGc: boolean) { const heap = v8.getHeapStatistics(); const spaces = v8.getHeapSpaceStatistics(); const nativeNonV8Estimate = Math.max(0, memory.rss - heap.total_heap_size); - const registryDiagnostics = await registry.diagnostics().catch((error: unknown) => ({ - error: error instanceof Error ? error.message : String(error), - })); + const diagnostics = + "diagnostics" in registry && + typeof registry.diagnostics === "function" + ? registry.diagnostics.bind(registry) + : undefined; + const registryDiagnostics = diagnostics + ? await diagnostics().catch((error: unknown) => ({ + error: error instanceof Error ? error.message : String(error), + })) + : { error: "registry diagnostics unavailable" }; return { pid: process.pid, diff --git a/rivetkit-rust/packages/rivetkit-core/Cargo.toml b/rivetkit-rust/packages/rivetkit-core/Cargo.toml index f79ba7883a..7cccabfd73 100644 --- a/rivetkit-rust/packages/rivetkit-core/Cargo.toml +++ b/rivetkit-rust/packages/rivetkit-core/Cargo.toml @@ -17,11 +17,12 @@ native-runtime = [ ] wasm-runtime = ["rivet-envoy-client/wasm-transport"] sqlite = ["sqlite-local"] -sqlite-local = ["native-runtime", "dep:depot-client"] +sqlite-local = ["native-runtime", "dep:async-trait", "dep:depot-client"] sqlite-remote = [] [dependencies] anyhow.workspace = true +async-trait = { workspace = true, optional = true } base64.workspace = true ciborium.workspace = true futures.workspace = true diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs index 40fff2898b..0b1a9b9ce6 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs @@ -19,6 +19,9 @@ use tokio::sync::Mutex as AsyncMutex; #[cfg(feature = "sqlite-local")] use tokio::task::JoinHandle; +#[cfg(feature = "sqlite-local")] +mod envoy_sqlite_transport; + #[cfg(feature = "sqlite-local")] use crate::error::ActorLifecycle; use crate::error::SqliteRuntimeError; @@ -27,13 +30,15 @@ use crate::runtime::RuntimeSpawner; #[cfg(feature = "sqlite-local")] use depot_client::{ - database::{NativeDatabaseHandle, open_database_from_envoy}, + database::{NativeDatabaseHandle, open_database_from_transport}, vfs::{SqliteVfsMetrics, SqliteVfsMetricsSnapshot}, worker::{ SQLITE_WORKER_QUEUE_CAPACITY, SqliteWorkerCloseTimeoutError, SqliteWorkerClosingError, SqliteWorkerDeadError, SqliteWorkerOverloadedError, }, }; +#[cfg(feature = "sqlite-local")] +use envoy_sqlite_transport::EnvoySqliteTransport; #[cfg(not(feature = "sqlite-local"))] #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] @@ -163,8 +168,8 @@ impl SqliteDb { let rt_handle = tokio::runtime::Handle::try_current() .context("open sqlite database requires a tokio runtime")?; - let native_db = open_database_from_envoy( - config.handle.clone(), + let native_db = open_database_from_transport( + Arc::new(EnvoySqliteTransport::new(config.handle.clone())), config.actor_id.clone(), config .generation diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite/envoy_sqlite_transport.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite/envoy_sqlite_transport.rs new file mode 100644 index 0000000000..5b8678c728 --- /dev/null +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite/envoy_sqlite_transport.rs @@ -0,0 +1,31 @@ +use anyhow::Result; +use async_trait::async_trait; +use depot_client::vfs::SqliteTransport; +use rivet_envoy_client::{handle::EnvoyHandle, protocol}; + +pub(super) struct EnvoySqliteTransport { + handle: EnvoyHandle, +} + +impl EnvoySqliteTransport { + pub(super) fn new(handle: EnvoyHandle) -> Self { + Self { handle } + } +} + +#[async_trait] +impl SqliteTransport for EnvoySqliteTransport { + async fn get_pages( + &self, + request: protocol::SqliteGetPagesRequest, + ) -> Result { + self.handle.sqlite_get_pages(request).await + } + + async fn commit( + &self, + request: protocol::SqliteCommitRequest, + ) -> Result { + self.handle.sqlite_commit(request).await + } +}