diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index f0b099c1fe..499c088540 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -400,6 +400,10 @@ async fn handle_message( crate::metrics::SQLITE_COMMIT_ENVOY_RESPONSE_DURATION .observe(timed_response.commit_completed_at.elapsed().as_secs_f64()); } + protocol::ToRivet::ToRivetSqlitePersistPreloadHintsRequest(req) => { + let response = handle_sqlite_persist_preload_hints_response(ctx, &conn, req.data).await; + send_sqlite_persist_preload_hints_response(&conn, req.request_id, response).await?; + } protocol::ToRivet::ToRivetSqliteExecRequest(req) => { send_sqlite_exec_response( &conn, @@ -512,6 +516,27 @@ async fn handle_sqlite_commit_response( } } +async fn handle_sqlite_persist_preload_hints_response( + ctx: &StandaloneCtx, + conn: &Conn, + request: protocol::SqlitePersistPreloadHintsRequest, +) -> protocol::SqlitePersistPreloadHintsResponse { + let actor_id = request.actor_id.clone(); + match handle_sqlite_persist_preload_hints(ctx, conn, request).await { + Ok(response) => response, + Err(err) => { + tracing::error!( + actor_id = %actor_id, + ?err, + "sqlite persist preload hints request failed" + ); + protocol::SqlitePersistPreloadHintsResponse::SqliteErrorResponse( + sqlite_error_response(&err), + ) + } + } +} + async fn ack_commands( ctx: &StandaloneCtx, namespace_id: Id, @@ -728,6 +753,15 @@ async fn handle_sqlite_commit( Ok(response) } +async fn handle_sqlite_persist_preload_hints( + ctx: &StandaloneCtx, + conn: &Conn, + request: protocol::SqlitePersistPreloadHintsRequest, +) -> Result { + validate_sqlite_actor(ctx, conn, &request.actor_id).await?; + Ok(protocol::SqlitePersistPreloadHintsResponse::SqlitePersistPreloadHintsOk) +} + async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str) -> Result<()> { let actor_id = Id::parse(actor_id).context("invalid sqlite actor id")?; let actor = ctx @@ -884,6 +918,21 @@ async fn send_sqlite_commit_response( .await } +async fn send_sqlite_persist_preload_hints_response( + conn: &Conn, + request_id: u32, + data: protocol::SqlitePersistPreloadHintsResponse, +) -> Result<()> { + send_to_envoy( + conn, + protocol::ToEnvoy::ToEnvoySqlitePersistPreloadHintsResponse( + protocol::ToEnvoySqlitePersistPreloadHintsResponse { request_id, data }, + ), + "sqlite persist preload hints response", + ) + .await +} + async fn send_sqlite_exec_response( conn: &Conn, request_id: u32, diff --git a/engine/sdks/rust/envoy-client/src/envoy.rs b/engine/sdks/rust/envoy-client/src/envoy.rs index b79e28e4af..85cdafd7bf 100644 --- a/engine/sdks/rust/envoy-client/src/envoy.rs +++ b/engine/sdks/rust/envoy-client/src/envoy.rs @@ -30,7 +30,8 @@ use crate::sqlite::{ fail_sent_remote_sqlite_requests_with_indeterminate_result, fail_sqlite_requests_with_shutdown, handle_remote_sqlite_exec_response, handle_remote_sqlite_execute_response, handle_remote_sqlite_execute_write_response, handle_remote_sqlite_request, - handle_sqlite_commit_response, handle_sqlite_get_pages_response, handle_sqlite_request, + handle_sqlite_commit_response, handle_sqlite_get_pages_response, + handle_sqlite_persist_preload_hints_response, handle_sqlite_request, process_unsent_remote_sqlite_requests, process_unsent_sqlite_requests, }; use crate::tunnel::{ @@ -542,6 +543,9 @@ async fn handle_conn_message( protocol::ToEnvoy::ToEnvoySqliteCommitResponse(response) => { handle_sqlite_commit_response(ctx, response).await; } + protocol::ToEnvoy::ToEnvoySqlitePersistPreloadHintsResponse(response) => { + handle_sqlite_persist_preload_hints_response(ctx, response).await; + } protocol::ToEnvoy::ToEnvoySqliteExecResponse(response) => { handle_remote_sqlite_exec_response(ctx, response).await; } diff --git a/engine/sdks/rust/envoy-client/src/handle.rs b/engine/sdks/rust/envoy-client/src/handle.rs index 5e3af875ec..cb468d1553 100644 --- a/engine/sdks/rust/envoy-client/src/handle.rs +++ b/engine/sdks/rust/envoy-client/src/handle.rs @@ -419,6 +419,34 @@ impl EnvoyHandle { } } + pub async fn sqlite_persist_preload_hints( + &self, + request: protocol::SqlitePersistPreloadHintsRequest, + ) -> anyhow::Result { + match self + .send_sqlite_request(SqliteRequest::PersistPreloadHints(request)) + .await? + { + SqliteResponse::PersistPreloadHints(response) => Ok(response), + _ => anyhow::bail!("unexpected sqlite persist preload hints response type"), + } + } + + pub fn sqlite_persist_preload_hints_fire_and_forget( + &self, + request: protocol::SqlitePersistPreloadHintsRequest, + ) -> anyhow::Result<()> { + let (tx, _rx) = tokio::sync::oneshot::channel(); + self.shared + .envoy_tx + .send(ToEnvoyMessage::SqliteRequest { + request: SqliteRequest::PersistPreloadHints(request), + response_tx: tx, + }) + .map_err(|_| anyhow::anyhow!("envoy channel closed"))?; + Ok(()) + } + pub async fn remote_sqlite_exec( &self, request: protocol::SqliteExecRequest, diff --git a/engine/sdks/rust/envoy-client/src/sqlite.rs b/engine/sdks/rust/envoy-client/src/sqlite.rs index a18e7c3c53..8bd826b34a 100644 --- a/engine/sdks/rust/envoy-client/src/sqlite.rs +++ b/engine/sdks/rust/envoy-client/src/sqlite.rs @@ -10,11 +10,13 @@ use crate::utils::{EnvoyShutdownError, RemoteSqliteIndeterminateResultError}; pub enum SqliteRequest { GetPages(protocol::SqliteGetPagesRequest), Commit(protocol::SqliteCommitRequest), + PersistPreloadHints(protocol::SqlitePersistPreloadHintsRequest), } pub enum SqliteResponse { GetPages(protocol::SqliteGetPagesResponse), Commit(protocol::SqliteCommitResponse), + PersistPreloadHints(protocol::SqlitePersistPreloadHintsResponse), } #[derive(Clone, Debug)] @@ -133,6 +135,18 @@ pub async fn handle_sqlite_commit_response( ); } +pub async fn handle_sqlite_persist_preload_hints_response( + ctx: &mut EnvoyContext, + response: protocol::ToEnvoySqlitePersistPreloadHintsResponse, +) { + handle_sqlite_response( + ctx, + response.request_id, + SqliteResponse::PersistPreloadHints(response.data), + "sqlite_persist_preload_hints", + ); +} + pub async fn handle_remote_sqlite_exec_response( ctx: &mut EnvoyContext, response: protocol::ToEnvoySqliteExecResponse, @@ -222,6 +236,11 @@ pub async fn send_single_sqlite_request(ctx: &mut EnvoyContext, request_id: u32) SqliteRequest::Commit(data) => protocol::ToRivet::ToRivetSqliteCommitRequest( protocol::ToRivetSqliteCommitRequest { request_id, data }, ), + SqliteRequest::PersistPreloadHints(data) => { + protocol::ToRivet::ToRivetSqlitePersistPreloadHintsRequest( + protocol::ToRivetSqlitePersistPreloadHintsRequest { request_id, data }, + ) + } }; ws_send(&ctx.shared, message).await; diff --git a/engine/sdks/rust/envoy-client/src/stringify.rs b/engine/sdks/rust/envoy-client/src/stringify.rs index 9c21fd3a8a..f41dd81694 100644 --- a/engine/sdks/rust/envoy-client/src/stringify.rs +++ b/engine/sdks/rust/envoy-client/src/stringify.rs @@ -275,6 +275,12 @@ pub fn stringify_to_rivet(message: &protocol::ToRivet) -> String { val.request_id ) } + protocol::ToRivet::ToRivetSqlitePersistPreloadHintsRequest(val) => { + format!( + "ToRivetSqlitePersistPreloadHintsRequest{{requestId: {}, actorId: \"{}\", generation: {}}}", + val.request_id, val.data.actor_id, val.data.generation + ) + } protocol::ToRivet::ToRivetSqliteExecRequest(val) => { format!( "ToRivetSqliteExecRequest{{requestId: {}, actorId: \"{}\", generation: {}}}", @@ -345,6 +351,12 @@ pub fn stringify_to_envoy(message: &protocol::ToEnvoy) -> String { val.request_id ) } + protocol::ToEnvoy::ToEnvoySqlitePersistPreloadHintsResponse(val) => { + format!( + "ToEnvoySqlitePersistPreloadHintsResponse{{requestId: {}}}", + val.request_id + ) + } protocol::ToEnvoy::ToEnvoySqliteExecResponse(val) => { format!("ToEnvoySqliteExecResponse{{requestId: {}}}", val.request_id) }