diff --git a/engine/sdks/rust/envoy-client/src/actor.rs b/engine/sdks/rust/envoy-client/src/actor.rs index 6c2583e6b8..1a6cac63c5 100644 --- a/engine/sdks/rust/envoy-client/src/actor.rs +++ b/engine/sdks/rust/envoy-client/src/actor.rs @@ -146,6 +146,7 @@ pub fn create_actor( #[tracing::instrument( skip_all, fields( + envoy_key = %shared.envoy_key, actor_id = %actor_id, generation = generation, actor_key = %config.key.as_deref().unwrap_or(""), diff --git a/engine/sdks/rust/envoy-client/src/connection/native.rs b/engine/sdks/rust/envoy-client/src/connection/native.rs index 50631c0a33..2c95d2e261 100644 --- a/engine/sdks/rust/envoy-client/src/connection/native.rs +++ b/engine/sdks/rust/envoy-client/src/connection/native.rs @@ -5,6 +5,7 @@ use futures_util::{SinkExt, StreamExt}; use rivet_envoy_protocol as protocol; use tokio::sync::mpsc; use tokio_tungstenite::tungstenite; +use tracing::Instrument; use vbare::OwnedVersionedData; use crate::context::{SharedContext, WsTxMessage}; @@ -14,7 +15,8 @@ use crate::utils::{BackoffOptions, calculate_backoff, parse_ws_close_reason}; const STABLE_CONNECTION_MS: u64 = 60_000; pub fn start_connection(shared: Arc) { - tokio::spawn(connection_loop(shared)); + let span = tracing::debug_span!("envoy_connection", envoy_key = %shared.envoy_key); + tokio::spawn(connection_loop(shared).instrument(span)); } async fn connection_loop(shared: Arc) { @@ -118,31 +120,38 @@ async fn single_connection( // Spawn write task let shared2 = shared.clone(); - let write_handle = tokio::spawn(async move { - super::send_initial_metadata(&shared2).await; - - while let Some(msg) = ws_rx.recv().await { - match msg { - WsTxMessage::Send(data) => { - if let Err(e) = write.send(tungstenite::Message::Binary(data.into())).await { - tracing::error!(?e, "failed to send ws message"); + let write_span = tracing::debug_span!("envoy_ws_write", envoy_key = %shared2.envoy_key); + let write_handle = tokio::spawn( + async move { + super::send_initial_metadata(&shared2).await; + + while let Some(msg) = ws_rx.recv().await { + match msg { + WsTxMessage::Send(data) => { + let result = write + .send(tungstenite::Message::Binary(data.into())) + .await; + if let Err(e) = result { + tracing::error!(?e, "failed to send ws message"); + break; + } + } + WsTxMessage::Close => { + let _ = write + .send(tungstenite::Message::Close(Some( + tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: "envoy.shutdown".into(), + }, + ))) + .await; break; } } - WsTxMessage::Close => { - let _ = write - .send(tungstenite::Message::Close(Some( - tungstenite::protocol::CloseFrame { - code: tungstenite::protocol::frame::coding::CloseCode::Normal, - reason: "envoy.shutdown".into(), - }, - ))) - .await; - break; - } } } - }); + .instrument(write_span), + ); let mut result = None; diff --git a/engine/sdks/rust/envoy-client/src/connection/wasm.rs b/engine/sdks/rust/envoy-client/src/connection/wasm.rs index c9a3d87037..03ac95b111 100644 --- a/engine/sdks/rust/envoy-client/src/connection/wasm.rs +++ b/engine/sdks/rust/envoy-client/src/connection/wasm.rs @@ -7,6 +7,7 @@ mod imp { use js_sys::{Array, Function, Promise, Reflect, Uint8Array}; use rivet_envoy_protocol as protocol; use tokio::sync::mpsc; + use tracing::Instrument; use vbare::OwnedVersionedData; use wasm_bindgen::{JsCast, JsValue, closure::Closure}; use wasm_bindgen_futures::JsFuture; @@ -28,7 +29,8 @@ mod imp { } pub fn start_connection(shared: Arc) { - wasm_bindgen_futures::spawn_local(connection_loop(shared)); + let span = tracing::debug_span!("envoy_connection", envoy_key = %shared.envoy_key); + wasm_bindgen_futures::spawn_local(connection_loop(shared).instrument(span)); } async fn connection_loop(shared: Arc) { @@ -102,10 +104,11 @@ mod imp { let onmessage = { let event_tx = event_tx.clone(); + let envoy_key = shared.envoy_key.clone(); Closure::::wrap(Box::new(move |event| { let data = event.data(); let Some(bytes) = decode_message_data(data) else { - tracing::warn!("received non-binary websocket message"); + tracing::warn!(%envoy_key, "received non-binary websocket message"); return; }; let _ = event_tx.send(ConnectionEvent::Message(bytes)); @@ -168,6 +171,7 @@ mod imp { let shared = shared.clone(); let ws = ws.clone(); let event_tx = event_tx.clone(); + let write_span = tracing::debug_span!("envoy_ws_write", envoy_key = %shared.envoy_key); async move { super::super::send_initial_metadata(&shared).await; @@ -189,6 +193,7 @@ mod imp { } } } + .instrument(write_span) }); let mut result = None; diff --git a/engine/sdks/rust/envoy-client/src/envoy.rs b/engine/sdks/rust/envoy-client/src/envoy.rs index ba040d722e..0218471b8b 100644 --- a/engine/sdks/rust/envoy-client/src/envoy.rs +++ b/engine/sdks/rust/envoy-client/src/envoy.rs @@ -11,6 +11,7 @@ use crate::async_counter::AsyncCounter; use rivet_envoy_protocol as protocol; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tracing::Instrument; use crate::actor::ToActor; use crate::commands::{ACK_COMMANDS_INTERVAL_MS, handle_commands, send_command_ack}; @@ -332,9 +333,9 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle { processed_command_idx: HashMap::new(), }; - tracing::info!("starting envoy"); - - spawn_detached(envoy_loop(ctx, envoy_rx, start_tx)); + tracing::info!(envoy_key = %shared.envoy_key, "starting envoy"); + let span = tracing::info_span!("envoy_client", envoy_key = %shared.envoy_key); + spawn_detached(envoy_loop(ctx, envoy_rx, start_tx).instrument(span)); handle } @@ -603,9 +604,17 @@ async fn handle_shutdown(ctx: &mut EnvoyContext) { .collect(); let envoy_tx = ctx.shared.envoy_tx.clone(); - spawn_detached(async move { - futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await; - tracing::debug!("all actors stopped during graceful shutdown"); - let _ = envoy_tx.send(ToEnvoyMessage::Stop); - }); + let shutdown_span = tracing::debug_span!( + parent: tracing::Span::current(), + "envoy_graceful_shutdown", + envoy_key = %ctx.shared.envoy_key, + ); + spawn_detached( + async move { + futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await; + tracing::debug!("all actors stopped during graceful shutdown"); + let _ = envoy_tx.send(ToEnvoyMessage::Stop); + } + .instrument(shutdown_span), + ); } diff --git a/engine/sdks/rust/envoy-client/src/handle.rs b/engine/sdks/rust/envoy-client/src/handle.rs index 1839c351bf..d724b449d0 100644 --- a/engine/sdks/rust/envoy-client/src/handle.rs +++ b/engine/sdks/rust/envoy-client/src/handle.rs @@ -541,12 +541,18 @@ impl EnvoyHandle { /// Inject a serverless start payload into the envoy. /// The payload is a u16 LE protocol version followed by a serialized ToEnvoy message. pub async fn start_serverless_actor(&self, payload: &[u8]) -> anyhow::Result<()> { + tracing::debug!( + envoy_key = %self.shared.envoy_key, + payload_len = payload.len(), + "received serverless start request" + ); let (message, _) = decode_serverless_actor_start_payload(payload)?; // Wait for envoy to be started before injecting self.started().await?; tracing::debug!( + envoy_key = %self.shared.envoy_key, data = crate::stringify::stringify_to_envoy(&message), "received serverless start" );