diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index 3fe8639604..f9ac4dcd41 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -260,7 +260,6 @@ pub struct BufferedCommitOutcome { #[derive(Debug, Clone, PartialEq, Eq)] pub enum CommitBufferError { FenceMismatch(String), - StageNotFound(u64), Other(String), } @@ -338,6 +337,7 @@ pub struct VfsContext { state: RwLock, aux_files: RwLock>>, last_error: Mutex>, + transient_commit_error: Mutex>, fatal_error: RwLock>, #[cfg(test)] fail_next_aux_open: Mutex>, @@ -432,7 +432,7 @@ struct RecentPageAccess { #[derive(Debug)] enum GetPagesError { - Fatal(String), + FenceMismatch(String), Other(String), } @@ -980,6 +980,7 @@ impl VfsContext { state: RwLock::new(state), aux_files: RwLock::new(BTreeMap::new()), last_error: Mutex::new(None), + transient_commit_error: Mutex::new(None), fatal_error: RwLock::new(None), #[cfg(test)] fail_next_aux_open: Mutex::new(None), @@ -1018,6 +1019,14 @@ impl VfsContext { self.fatal_error.read().clone() } + fn defer_transient_commit_error(&self, message: String) { + *self.transient_commit_error.lock() = Some(message); + } + + fn take_transient_commit_error(&self) -> Option { + self.transient_commit_error.lock().take() + } + pub(crate) fn take_last_error(&self) -> Option { self.last_error.lock().take() } @@ -1143,13 +1152,9 @@ impl VfsContext { self.state.read().dead } - fn mark_dead(&self, message: String) { - self.set_last_error(message); - self.state.write().dead = true; - } - fn mark_fatal(&self, message: String) { - self.mark_dead(message.clone()); + self.set_last_error(message.clone()); + self.state.write().dead = true; let mut fatal_error = self.fatal_error.write(); if fatal_error.is_none() { *fatal_error = Some(message); @@ -1455,7 +1460,7 @@ impl VfsContext { return Ok(resolved); } if is_head_fence_mismatch_response(&error) { - return Err(GetPagesError::Fatal(error.message)); + return Err(GetPagesError::FenceMismatch(error.message)); } Err(GetPagesError::Other(error.message)) } @@ -1755,12 +1760,7 @@ fn assert_batch_atomic_probe(db: *mut sqlite3, vfs: &SqliteVfs) -> std::result:: fn handle_non_finalize_commit_error(ctx: &VfsContext, err: &CommitBufferError) { match err { CommitBufferError::FenceMismatch(message) => ctx.mark_fatal(message.clone()), - CommitBufferError::StageNotFound(stage_id) => { - ctx.mark_dead(format!( - "sqlite stage {stage_id} missing during commit finalize" - )); - } - CommitBufferError::Other(message) => ctx.mark_dead(message.clone()), + CommitBufferError::Other(message) => ctx.set_last_error(message.clone()), } } @@ -2238,7 +2238,7 @@ unsafe extern "C" fn io_read( let resolved = match ctx.resolve_pages(&requested_pages, true) { Ok(pages) => pages, - Err(GetPagesError::Fatal(message)) => { + Err(GetPagesError::FenceMismatch(message)) => { tracing::error!( actor_id = %ctx.actor_id, requested_pages = ?requested_pages, @@ -2255,7 +2255,7 @@ unsafe extern "C" fn io_read( error = %message, "sqlite xRead failed to resolve pages" ); - ctx.mark_dead(message); + ctx.set_last_error(message); return SQLITE_IOERR_READ; } }; @@ -2380,12 +2380,12 @@ unsafe extern "C" fn io_write( } else { match ctx.resolve_pages(&pages_to_resolve, false) { Ok(pages) => pages, - Err(GetPagesError::Fatal(message)) => { + Err(GetPagesError::FenceMismatch(message)) => { ctx.mark_fatal(message); return SQLITE_IOERR_WRITE; } Err(GetPagesError::Other(message)) => { - ctx.mark_dead(message); + ctx.set_last_error(message); return SQLITE_IOERR_WRITE; } } @@ -2489,6 +2489,10 @@ unsafe extern "C" fn io_sync(p_file: *mut sqlite3_file, _flags: c_int) -> c_int return SQLITE_OK; } let ctx = &*file.ctx; + if let Some(message) = ctx.take_transient_commit_error() { + ctx.set_last_error(message); + return SQLITE_IOERR_FSYNC; + } match ctx.flush_dirty_pages() { Ok(_) => SQLITE_OK, Err(err) => { @@ -2575,6 +2579,9 @@ unsafe extern "C" fn io_file_control( ?err, "sqlite atomic write file control failed" ); + if let CommitBufferError::Other(message) = &err { + ctx.defer_transient_commit_error(message.clone()); + } handle_finalize_fence_error(ctx, &err); SQLITE_IOERR } @@ -3029,6 +3036,8 @@ impl Drop for NativeDatabase { Err(err) => { handle_non_finalize_commit_error(ctx, &err); tracing::warn!(?err, "failed to flush sqlite database before close"); + self.db = ptr::null_mut(); + return; } } } diff --git a/engine/packages/depot-client/tests/inline/vfs.rs b/engine/packages/depot-client/tests/inline/vfs.rs index e537e0292b..1deb13e0ab 100644 --- a/engine/packages/depot-client/tests/inline/vfs.rs +++ b/engine/packages/depot-client/tests/inline/vfs.rs @@ -2399,29 +2399,36 @@ fn direct_engine_marks_vfs_dead_after_transport_errors() { ) .expect("v2 vfs should register"); let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open"); + let ctx = direct_vfs_ctx(&db); + + { + let mut state = ctx.state.write(); + state.write_buffer.dirty.insert(1, empty_db_page()); + state.db_size_pages = 1; + } hooks.fail_next_commit("InjectedTransportError: commit transport dropped"); - let err = sqlite_exec( - db.as_ptr(), - "CREATE TABLE broken (id INTEGER PRIMARY KEY, value TEXT NOT NULL);", - ) - .expect_err("failing transport commit should surface as an IO error"); + let err = ctx + .flush_dirty_pages() + .expect_err("failing transport commit should surface as a VFS error"); assert!( - err.contains("I/O") || err.contains("disk I/O"), - "sqlite should surface transport failure as an IO error: {err}", + matches!(err, CommitBufferError::Other(ref message) if message.contains("InjectedTransportError")), + "VFS should surface transport failure as an IO error: {err:?}", ); assert!( - direct_vfs_ctx(&db).is_dead(), - "transport error should kill the v2 VFS" + !ctx.is_dead(), + "transport error should not poison the v2 VFS" + ); + assert!( + ctx.clone_fatal_error().is_none(), + "transport error should not be stored as fatal" ); assert_eq!( db.take_last_kv_error().as_deref(), Some("InjectedTransportError: commit transport dropped"), ); - assert!( - sqlite_query_i64(db.as_ptr(), "PRAGMA page_count;").is_err(), - "subsequent reads should fail once the VFS is dead", - ); + ctx.flush_dirty_pages() + .expect("retry after unapplied transport failure should succeed"); } #[test] @@ -2459,13 +2466,23 @@ fn flush_dirty_pages_marks_vfs_dead_after_transport_error() { "flush failure should surface as a transport error: {err:?}", ); assert!( - ctx.is_dead(), - "flush transport failure should poison the VFS" + !ctx.is_dead(), + "flush transport failure should not poison the VFS" + ); + assert!( + ctx.clone_fatal_error().is_none(), + "flush transport failure should not be stored as fatal" ); assert_eq!( db.take_last_kv_error().as_deref(), Some("InjectedTransportError: flush transport dropped"), ); + ctx.flush_dirty_pages() + .expect("retry after unapplied transport failure should succeed"); + assert!( + ctx.state.read().write_buffer.dirty.is_empty(), + "successful retry should clear dirty pages", + ); } #[test] @@ -2505,13 +2522,120 @@ fn commit_atomic_write_marks_vfs_dead_after_transport_error() { "atomic-write failure should surface as a transport error: {err:?}", ); assert!( - ctx.is_dead(), - "commit_atomic_write transport failure should poison the VFS", + !ctx.is_dead(), + "commit_atomic_write transport failure should not poison the VFS", + ); + assert!( + ctx.clone_fatal_error().is_none(), + "commit_atomic_write transport failure should not be stored as fatal", ); assert_eq!( db.take_last_kv_error().as_deref(), Some("InjectedTransportError: atomic transport dropped"), ); + ctx.commit_atomic_write() + .expect("retry after unapplied atomic transport failure should succeed"); + assert!( + !ctx.state.read().write_buffer.in_atomic_write, + "successful retry should leave atomic-write mode", + ); +} + +#[test] +fn lost_commit_response_fails_later_on_head_fence_mismatch() { + let runtime = direct_runtime(); + let harness = DirectEngineHarness::new(); + let engine = runtime.block_on(harness.open_engine()); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); + let vfs = SqliteVfs::register_with_transport( + &next_test_name("sqlite-direct-vfs"), + transport, + harness.actor_id.clone(), + runtime.handle().clone(), + VfsConfig::default(), + None, + ) + .expect("v2 vfs should register"); + let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open"); + let ctx = direct_vfs_ctx(&db); + + { + let mut state = ctx.state.write(); + state.write_buffer.dirty.insert(1, empty_db_page()); + state.db_size_pages = 1; + } + + hooks.fail_next_commit_after_apply("InjectedTransportError: commit response lost"); + let err = ctx + .flush_dirty_pages() + .expect_err("lost commit response should surface as a transport error"); + assert!( + matches!(err, CommitBufferError::Other(ref message) if message.contains("commit response lost")), + "lost response should be ambiguous before the next fence check: {err:?}", + ); + assert!( + !ctx.is_dead(), + "ambiguous lost response should not immediately poison the VFS" + ); + assert!(ctx.clone_fatal_error().is_none()); + + let err = ctx + .flush_dirty_pages() + .expect_err("retry after applied lost response should hit the stale head fence"); + assert!( + matches!(err, CommitBufferError::FenceMismatch(ref message) if message.contains("head fence mismatch")), + "retry should confirm the stale fence: {err:?}", + ); + assert!(ctx.is_dead()); + assert!( + ctx.clone_fatal_error() + .is_some_and(|message| message.contains("head fence mismatch")), + "confirmed fence mismatch should be stored as fatal" + ); +} + +#[test] +fn unapplied_commit_transport_failure_can_retry_from_same_head() { + let runtime = direct_runtime(); + let harness = DirectEngineHarness::new(); + let engine = runtime.block_on(harness.open_engine()); + let transport = Arc::new(DirectDepotTransport::new(engine)); + let hooks = transport.direct_hooks(); + let vfs = SqliteVfs::register_with_transport( + &next_test_name("sqlite-direct-vfs"), + transport, + harness.actor_id.clone(), + runtime.handle().clone(), + VfsConfig::default(), + None, + ) + .expect("v2 vfs should register"); + let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open"); + let ctx = direct_vfs_ctx(&db); + + { + let mut state = ctx.state.write(); + state.write_buffer.dirty.insert(1, empty_db_page()); + state.db_size_pages = 1; + } + + hooks.fail_next_commit("InjectedTransportError: commit dropped before apply"); + let err = ctx + .flush_dirty_pages() + .expect_err("pre-apply transport error should surface"); + assert!( + matches!(err, CommitBufferError::Other(ref message) if message.contains("before apply")), + "pre-apply failure should be generic transport error: {err:?}", + ); + assert!(!ctx.is_dead()); + assert!(ctx.clone_fatal_error().is_none()); + + ctx.flush_dirty_pages() + .expect("retry from same head should succeed"); + let state = ctx.state.read(); + assert!(state.write_buffer.dirty.is_empty()); + assert!(state.head_txid.is_some()); } #[test] @@ -3038,8 +3162,8 @@ fn direct_engine_fresh_reopen_recovers_after_poisoned_handle() { "sqlite should surface transport failure as an IO error: {err}", ); assert!( - direct_vfs_ctx(&db).is_dead(), - "transport error should kill the live VFS", + !direct_vfs_ctx(&db).is_dead(), + "transport error should not poison the live VFS", ); drop(db); diff --git a/engine/packages/depot-client/tests/inline/vfs_support.rs b/engine/packages/depot-client/tests/inline/vfs_support.rs index 803819cd8d..3ff9b5a561 100644 --- a/engine/packages/depot-client/tests/inline/vfs_support.rs +++ b/engine/packages/depot-client/tests/inline/vfs_support.rs @@ -480,11 +480,16 @@ impl SqliteTransport for DirectDepotTransport { ) .await { - Ok(result) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk( - protocol::SqliteCommitOk { - head_txid: Some(result.head_txid), - }, - )), + Ok(result) => { + if let Some(message) = self.storage.hooks.take_commit_after_apply_error() { + return Err(anyhow::anyhow!(message)); + } + Ok(protocol::SqliteCommitResponse::SqliteCommitOk( + protocol::SqliteCommitOk { + head_txid: Some(result.head_txid), + }, + )) + } Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse( sqlite_error_response(&err), )), @@ -603,6 +608,7 @@ impl ColdTier for CountingColdTier { #[derive(Default)] pub(crate) struct DirectTransportHooks { fail_next_commit: Mutex>, + fail_next_commit_after_apply: Mutex>, fail_next_get_pages: Mutex>, hang_next_commit: Mutex, pause_next_commit: Mutex>, @@ -615,6 +621,10 @@ impl DirectTransportHooks { *self.fail_next_commit.lock() = Some(message.into()); } + pub(crate) fn fail_next_commit_after_apply(&self, message: impl Into) { + *self.fail_next_commit_after_apply.lock() = Some(message.into()); + } + pub(crate) fn fail_next_get_pages(&self, message: impl Into) { *self.fail_next_get_pages.lock() = Some(message.into()); } @@ -660,6 +670,10 @@ impl DirectTransportHooks { self.fail_next_commit.lock().take() } + pub(crate) fn take_commit_after_apply_error(&self) -> Option { + self.fail_next_commit_after_apply.lock().take() + } + pub(crate) fn take_get_pages_error(&self) -> Option { self.fail_next_get_pages.lock().take() }