Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions engine/packages/depot-client/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ pub struct BufferedCommitOutcome {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommitBufferError {
FenceMismatch(String),
StageNotFound(u64),
Other(String),
}

Expand Down Expand Up @@ -338,6 +337,7 @@ pub struct VfsContext {
state: RwLock<VfsState>,
aux_files: RwLock<BTreeMap<String, Arc<AuxFileState>>>,
last_error: Mutex<Option<String>>,
transient_commit_error: Mutex<Option<String>>,
fatal_error: RwLock<Option<String>>,
#[cfg(test)]
fail_next_aux_open: Mutex<Option<String>>,
Expand Down Expand Up @@ -432,7 +432,7 @@ struct RecentPageAccess {

#[derive(Debug)]
enum GetPagesError {
Fatal(String),
FenceMismatch(String),
Other(String),
}

Expand Down Expand Up @@ -980,6 +980,7 @@ impl VfsContext {
state: RwLock::new(state),
aux_files: RwLock::new(BTreeMap::new()),
last_error: Mutex::new(None),
transient_commit_error: Mutex::new(None),
fatal_error: RwLock::new(None),
#[cfg(test)]
fail_next_aux_open: Mutex::new(None),
Expand Down Expand Up @@ -1018,6 +1019,14 @@ impl VfsContext {
self.fatal_error.read().clone()
}

fn defer_transient_commit_error(&self, message: String) {
*self.transient_commit_error.lock() = Some(message);
}

fn take_transient_commit_error(&self) -> Option<String> {
self.transient_commit_error.lock().take()
}

pub(crate) fn take_last_error(&self) -> Option<String> {
self.last_error.lock().take()
}
Expand Down Expand Up @@ -1143,13 +1152,9 @@ impl VfsContext {
self.state.read().dead
}

fn mark_dead(&self, message: String) {
self.set_last_error(message);
self.state.write().dead = true;
}

fn mark_fatal(&self, message: String) {
self.mark_dead(message.clone());
self.set_last_error(message.clone());
self.state.write().dead = true;
let mut fatal_error = self.fatal_error.write();
if fatal_error.is_none() {
*fatal_error = Some(message);
Expand Down Expand Up @@ -1455,7 +1460,7 @@ impl VfsContext {
return Ok(resolved);
}
if is_head_fence_mismatch_response(&error) {
return Err(GetPagesError::Fatal(error.message));
return Err(GetPagesError::FenceMismatch(error.message));
}
Err(GetPagesError::Other(error.message))
}
Expand Down Expand Up @@ -1755,12 +1760,7 @@ fn assert_batch_atomic_probe(db: *mut sqlite3, vfs: &SqliteVfs) -> std::result::
fn handle_non_finalize_commit_error(ctx: &VfsContext, err: &CommitBufferError) {
match err {
CommitBufferError::FenceMismatch(message) => ctx.mark_fatal(message.clone()),
CommitBufferError::StageNotFound(stage_id) => {
ctx.mark_dead(format!(
"sqlite stage {stage_id} missing during commit finalize"
));
}
CommitBufferError::Other(message) => ctx.mark_dead(message.clone()),
CommitBufferError::Other(message) => ctx.set_last_error(message.clone()),
}
}

Expand Down Expand Up @@ -2238,7 +2238,7 @@ unsafe extern "C" fn io_read(

let resolved = match ctx.resolve_pages(&requested_pages, true) {
Ok(pages) => pages,
Err(GetPagesError::Fatal(message)) => {
Err(GetPagesError::FenceMismatch(message)) => {
tracing::error!(
actor_id = %ctx.actor_id,
requested_pages = ?requested_pages,
Expand All @@ -2255,7 +2255,7 @@ unsafe extern "C" fn io_read(
error = %message,
"sqlite xRead failed to resolve pages"
);
ctx.mark_dead(message);
ctx.set_last_error(message);
return SQLITE_IOERR_READ;
}
};
Expand Down Expand Up @@ -2380,12 +2380,12 @@ unsafe extern "C" fn io_write(
} else {
match ctx.resolve_pages(&pages_to_resolve, false) {
Ok(pages) => pages,
Err(GetPagesError::Fatal(message)) => {
Err(GetPagesError::FenceMismatch(message)) => {
ctx.mark_fatal(message);
return SQLITE_IOERR_WRITE;
}
Err(GetPagesError::Other(message)) => {
ctx.mark_dead(message);
ctx.set_last_error(message);
return SQLITE_IOERR_WRITE;
}
}
Expand Down Expand Up @@ -2489,6 +2489,10 @@ unsafe extern "C" fn io_sync(p_file: *mut sqlite3_file, _flags: c_int) -> c_int
return SQLITE_OK;
}
let ctx = &*file.ctx;
if let Some(message) = ctx.take_transient_commit_error() {
ctx.set_last_error(message);
return SQLITE_IOERR_FSYNC;
}
match ctx.flush_dirty_pages() {
Ok(_) => SQLITE_OK,
Err(err) => {
Expand Down Expand Up @@ -2575,6 +2579,9 @@ unsafe extern "C" fn io_file_control(
?err,
"sqlite atomic write file control failed"
);
if let CommitBufferError::Other(message) = &err {
ctx.defer_transient_commit_error(message.clone());
}
handle_finalize_fence_error(ctx, &err);
SQLITE_IOERR
}
Expand Down Expand Up @@ -3029,6 +3036,8 @@ impl Drop for NativeDatabase {
Err(err) => {
handle_non_finalize_commit_error(ctx, &err);
tracing::warn!(?err, "failed to flush sqlite database before close");
self.db = ptr::null_mut();
return;
}
}
}
Expand Down
162 changes: 143 additions & 19 deletions engine/packages/depot-client/tests/inline/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2399,29 +2399,36 @@ fn direct_engine_marks_vfs_dead_after_transport_errors() {
)
.expect("v2 vfs should register");
let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open");
let ctx = direct_vfs_ctx(&db);

{
let mut state = ctx.state.write();
state.write_buffer.dirty.insert(1, empty_db_page());
state.db_size_pages = 1;
}

hooks.fail_next_commit("InjectedTransportError: commit transport dropped");
let err = sqlite_exec(
db.as_ptr(),
"CREATE TABLE broken (id INTEGER PRIMARY KEY, value TEXT NOT NULL);",
)
.expect_err("failing transport commit should surface as an IO error");
let err = ctx
.flush_dirty_pages()
.expect_err("failing transport commit should surface as a VFS error");
assert!(
err.contains("I/O") || err.contains("disk I/O"),
"sqlite should surface transport failure as an IO error: {err}",
matches!(err, CommitBufferError::Other(ref message) if message.contains("InjectedTransportError")),
"VFS should surface transport failure as an IO error: {err:?}",
);
assert!(
direct_vfs_ctx(&db).is_dead(),
"transport error should kill the v2 VFS"
!ctx.is_dead(),
"transport error should not poison the v2 VFS"
);
assert!(
ctx.clone_fatal_error().is_none(),
"transport error should not be stored as fatal"
);
assert_eq!(
db.take_last_kv_error().as_deref(),
Some("InjectedTransportError: commit transport dropped"),
);
assert!(
sqlite_query_i64(db.as_ptr(), "PRAGMA page_count;").is_err(),
"subsequent reads should fail once the VFS is dead",
);
ctx.flush_dirty_pages()
.expect("retry after unapplied transport failure should succeed");
}

#[test]
Expand Down Expand Up @@ -2459,13 +2466,23 @@ fn flush_dirty_pages_marks_vfs_dead_after_transport_error() {
"flush failure should surface as a transport error: {err:?}",
);
assert!(
ctx.is_dead(),
"flush transport failure should poison the VFS"
!ctx.is_dead(),
"flush transport failure should not poison the VFS"
);
assert!(
ctx.clone_fatal_error().is_none(),
"flush transport failure should not be stored as fatal"
);
assert_eq!(
db.take_last_kv_error().as_deref(),
Some("InjectedTransportError: flush transport dropped"),
);
ctx.flush_dirty_pages()
.expect("retry after unapplied transport failure should succeed");
assert!(
ctx.state.read().write_buffer.dirty.is_empty(),
"successful retry should clear dirty pages",
);
}

#[test]
Expand Down Expand Up @@ -2505,13 +2522,120 @@ fn commit_atomic_write_marks_vfs_dead_after_transport_error() {
"atomic-write failure should surface as a transport error: {err:?}",
);
assert!(
ctx.is_dead(),
"commit_atomic_write transport failure should poison the VFS",
!ctx.is_dead(),
"commit_atomic_write transport failure should not poison the VFS",
);
assert!(
ctx.clone_fatal_error().is_none(),
"commit_atomic_write transport failure should not be stored as fatal",
);
assert_eq!(
db.take_last_kv_error().as_deref(),
Some("InjectedTransportError: atomic transport dropped"),
);
ctx.commit_atomic_write()
.expect("retry after unapplied atomic transport failure should succeed");
assert!(
!ctx.state.read().write_buffer.in_atomic_write,
"successful retry should leave atomic-write mode",
);
}

#[test]
fn lost_commit_response_fails_later_on_head_fence_mismatch() {
let runtime = direct_runtime();
let harness = DirectEngineHarness::new();
let engine = runtime.block_on(harness.open_engine());
let transport = Arc::new(DirectDepotTransport::new(engine));
let hooks = transport.direct_hooks();
let vfs = SqliteVfs::register_with_transport(
&next_test_name("sqlite-direct-vfs"),
transport,
harness.actor_id.clone(),
runtime.handle().clone(),
VfsConfig::default(),
None,
)
.expect("v2 vfs should register");
let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open");
let ctx = direct_vfs_ctx(&db);

{
let mut state = ctx.state.write();
state.write_buffer.dirty.insert(1, empty_db_page());
state.db_size_pages = 1;
}

hooks.fail_next_commit_after_apply("InjectedTransportError: commit response lost");
let err = ctx
.flush_dirty_pages()
.expect_err("lost commit response should surface as a transport error");
assert!(
matches!(err, CommitBufferError::Other(ref message) if message.contains("commit response lost")),
"lost response should be ambiguous before the next fence check: {err:?}",
);
assert!(
!ctx.is_dead(),
"ambiguous lost response should not immediately poison the VFS"
);
assert!(ctx.clone_fatal_error().is_none());

let err = ctx
.flush_dirty_pages()
.expect_err("retry after applied lost response should hit the stale head fence");
assert!(
matches!(err, CommitBufferError::FenceMismatch(ref message) if message.contains("head fence mismatch")),
"retry should confirm the stale fence: {err:?}",
);
assert!(ctx.is_dead());
assert!(
ctx.clone_fatal_error()
.is_some_and(|message| message.contains("head fence mismatch")),
"confirmed fence mismatch should be stored as fatal"
);
}

#[test]
fn unapplied_commit_transport_failure_can_retry_from_same_head() {
let runtime = direct_runtime();
let harness = DirectEngineHarness::new();
let engine = runtime.block_on(harness.open_engine());
let transport = Arc::new(DirectDepotTransport::new(engine));
let hooks = transport.direct_hooks();
let vfs = SqliteVfs::register_with_transport(
&next_test_name("sqlite-direct-vfs"),
transport,
harness.actor_id.clone(),
runtime.handle().clone(),
VfsConfig::default(),
None,
)
.expect("v2 vfs should register");
let db = open_database(vfs, &harness.actor_id).expect("sqlite database should open");
let ctx = direct_vfs_ctx(&db);

{
let mut state = ctx.state.write();
state.write_buffer.dirty.insert(1, empty_db_page());
state.db_size_pages = 1;
}

hooks.fail_next_commit("InjectedTransportError: commit dropped before apply");
let err = ctx
.flush_dirty_pages()
.expect_err("pre-apply transport error should surface");
assert!(
matches!(err, CommitBufferError::Other(ref message) if message.contains("before apply")),
"pre-apply failure should be generic transport error: {err:?}",
);
assert!(!ctx.is_dead());
assert!(ctx.clone_fatal_error().is_none());

ctx.flush_dirty_pages()
.expect("retry from same head should succeed");
let state = ctx.state.read();
assert!(state.write_buffer.dirty.is_empty());
assert!(state.head_txid.is_some());
}

#[test]
Expand Down Expand Up @@ -3038,8 +3162,8 @@ fn direct_engine_fresh_reopen_recovers_after_poisoned_handle() {
"sqlite should surface transport failure as an IO error: {err}",
);
assert!(
direct_vfs_ctx(&db).is_dead(),
"transport error should kill the live VFS",
!direct_vfs_ctx(&db).is_dead(),
"transport error should not poison the live VFS",
);

drop(db);
Expand Down
Loading
Loading