diff --git a/rivetkit-rust/packages/client/src/drivers/mod.rs b/rivetkit-rust/packages/client/src/drivers/mod.rs index 131e7bc2ca..bdfae84b1f 100644 --- a/rivetkit-rust/packages/client/src/drivers/mod.rs +++ b/rivetkit-rust/packages/client/src/drivers/mod.rs @@ -30,11 +30,11 @@ pub enum DriverStopReason { #[derive(Debug)] pub struct DriverHandle { abort_handle: AbortHandle, - sender: mpsc::Sender, + sender: mpsc::UnboundedSender, } impl DriverHandle { - pub fn new(sender: mpsc::Sender, abort_handle: AbortHandle) -> Self { + pub fn new(sender: mpsc::UnboundedSender, abort_handle: AbortHandle) -> Self { Self { sender, abort_handle, @@ -42,7 +42,7 @@ impl DriverHandle { } pub async fn send(&self, msg: Arc) -> Result<()> { - self.sender.send(msg).await?; + self.sender.send(msg)?; Ok(()) } @@ -61,7 +61,7 @@ impl Drop for DriverHandle { pub type DriverConnection = ( DriverHandle, - mpsc::Receiver, + mpsc::UnboundedReceiver, JoinHandle, ); diff --git a/rivetkit-rust/packages/client/src/drivers/ws.rs b/rivetkit-rust/packages/client/src/drivers/ws.rs index 5161e82641..6129cdd82f 100644 --- a/rivetkit-rust/packages/client/src/drivers/ws.rs +++ b/rivetkit-rust/packages/client/src/drivers/ws.rs @@ -37,8 +37,8 @@ pub(crate) async fn connect(args: DriverConnectArgs) -> Result .await .context("Failed to connect to WebSocket via gateway")?; - let (in_tx, in_rx) = mpsc::channel::(32); - let (out_tx, out_rx) = mpsc::channel::(32); + let (in_tx, in_rx) = mpsc::unbounded_channel::(); + let (out_tx, out_rx) = mpsc::unbounded_channel::(); let task = tokio::spawn(start(ws, args.encoding_kind, in_tx, out_rx)); let handle = DriverHandle::new(out_tx, task.abort_handle()); @@ -51,8 +51,8 @@ async fn start( tokio_tungstenite::MaybeTlsStream, >, encoding_kind: EncodingKind, - in_tx: mpsc::Sender, - mut out_rx: mpsc::Receiver, + in_tx: mpsc::UnboundedSender, + mut out_rx: mpsc::UnboundedReceiver, ) -> DriverStopReason { let (mut ws_sink, mut ws_stream) = ws.split(); @@ -85,7 +85,7 @@ async fn start( // Handle ws incoming msg = ws_stream.next() => { let Some(msg) = msg else { - println!("Receiver dropped"); + debug!("Receiver dropped"); return DriverStopReason::ServerDisconnect; }; @@ -97,7 +97,7 @@ async fn start( continue; }; - if let Err(e) = in_tx.send(Arc::new(msg)).await { + if let Err(e) = in_tx.send(Arc::new(msg)) { debug!("Failed to send text message: {}", e); // failure to send means user dropped incoming receiver return DriverStopReason::UserAborted; diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/config.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/config.rs index fac01c866f..bd4cd46de0 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/config.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/config.rs @@ -19,9 +19,6 @@ const DEFAULT_MAX_QUEUE_SIZE: u32 = 1000; const DEFAULT_MAX_QUEUE_MESSAGE_SIZE: u32 = 65_536; const DEFAULT_MAX_INCOMING_MESSAGE_SIZE: u32 = 65_536; const DEFAULT_MAX_OUTGOING_MESSAGE_SIZE: u32 = 1_048_576; -const DEFAULT_LIFECYCLE_COMMAND_INBOX_CAPACITY: usize = 64; -const DEFAULT_DISPATCH_COMMAND_INBOX_CAPACITY: usize = 1024; -const DEFAULT_LIFECYCLE_EVENT_INBOX_CAPACITY: usize = 4096; #[derive(Clone)] pub enum CanHibernateWebSocket { @@ -83,9 +80,6 @@ pub struct ActorConfig { pub max_queue_message_size: u32, pub max_incoming_message_size: u32, pub max_outgoing_message_size: u32, - pub lifecycle_command_inbox_capacity: usize, - pub dispatch_command_inbox_capacity: usize, - pub lifecycle_event_inbox_capacity: usize, pub preload_max_workflow_bytes: Option, pub preload_max_connections_bytes: Option, pub overrides: Option, @@ -233,9 +227,6 @@ impl Default for ActorConfig { max_queue_message_size: DEFAULT_MAX_QUEUE_MESSAGE_SIZE, max_incoming_message_size: DEFAULT_MAX_INCOMING_MESSAGE_SIZE, max_outgoing_message_size: DEFAULT_MAX_OUTGOING_MESSAGE_SIZE, - lifecycle_command_inbox_capacity: DEFAULT_LIFECYCLE_COMMAND_INBOX_CAPACITY, - dispatch_command_inbox_capacity: DEFAULT_DISPATCH_COMMAND_INBOX_CAPACITY, - lifecycle_event_inbox_capacity: DEFAULT_LIFECYCLE_EVENT_INBOX_CAPACITY, preload_max_workflow_bytes: None, preload_max_connections_bytes: None, overrides: None, diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs index 935f65eb4b..a1adace45d 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs @@ -33,8 +33,6 @@ use crate::actor::schedule::{InternalKeepAwakeCallback, LocalAlarmCallback}; use crate::actor::sleep::{CanSleep, SleepState}; use crate::actor::state::{PendingSave, PersistedActor, RequestSaveOpts}; use crate::actor::task::LifecycleEvent; -#[cfg(not(target_arch = "wasm32"))] -use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error}; use crate::actor::task_types::UserTaskKind; use crate::actor::work_registry::{ActorWorkKind, CountGuard, RegionGuard}; use crate::error::{ActorLifecycle as ActorLifecycleError, ActorRuntime}; @@ -140,9 +138,8 @@ pub(crate) struct ActorContextInner { inspector_attach_count: RwLock>>, inspector_overlay_tx: RwLock>>>>, actor_events: RwLock>>, - pub(super) lifecycle_events: RwLock>>, + pub(super) lifecycle_events: RwLock>>, hibernated_connection_liveness_override: RwLock, Vec)>>>, - pub(super) lifecycle_event_inbox_capacity: usize, pub(super) metrics: ActorMetrics, diagnostics: ActorDiagnostics, actor_id: String, @@ -230,7 +227,6 @@ impl ActorContext { #[cfg(feature = "sqlite-local")] sql.set_vfs_metrics(Arc::new(metrics.clone())); let diagnostics = ActorDiagnostics::new(actor_id.clone()); - let lifecycle_event_inbox_capacity = config.lifecycle_event_inbox_capacity; let state_save_interval = config.state_save_interval; let abort_signal = CancellationToken::new(); let shutdown_deadline = CancellationToken::new(); @@ -307,7 +303,6 @@ impl ActorContext { actor_events: RwLock::new(None), lifecycle_events: RwLock::new(None), hibernated_connection_liveness_override: RwLock::new(None), - lifecycle_event_inbox_capacity, metrics, diagnostics, actor_id, @@ -1243,7 +1238,10 @@ impl ActorContext { .await } - pub(crate) fn configure_lifecycle_events(&self, sender: Option>) { + pub(crate) fn configure_lifecycle_events( + &self, + sender: Option>, + ) { *self.0.lifecycle_events.write() = sender; } @@ -1466,28 +1464,12 @@ impl ActorContext { } fn try_send_lifecycle_event(&self, event: LifecycleEvent, operation: &'static str) { - #[cfg(target_arch = "wasm32")] - let _ = operation; - let Some(sender) = self.0.lifecycle_events.read().clone() else { return; }; - match sender.try_reserve() { - Ok(permit) => { - permit.send(event); - } - #[cfg(target_arch = "wasm32")] - Err(_) => {} - #[cfg(not(target_arch = "wasm32"))] - Err(_) => { - let _ = actor_channel_overloaded_error( - LIFECYCLE_EVENT_INBOX_CHANNEL, - self.0.lifecycle_event_inbox_capacity, - operation, - Some(&self.0.metrics), - ); - } + if sender.send(event).is_err() { + tracing::warn!(operation, "failed to enqueue actor lifecycle event"); } } } diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/diagnostics.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/diagnostics.rs index 6fa37232cf..7bcc68bd03 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/diagnostics.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/diagnostics.rs @@ -11,7 +11,6 @@ const WARNING_LIMIT: usize = 3; // Forced-sync: warning windows are updated from synchronous diagnostics paths. static GLOBAL_WARNINGS: OnceLock>>> = OnceLock::new(); -static ACTOR_WARNINGS: OnceLock>>> = OnceLock::new(); #[derive(Debug)] pub(crate) struct ActorDiagnostics { @@ -43,25 +42,6 @@ impl ActorDiagnostics { } } -pub(crate) fn record_actor_warning( - actor_id: &str, - kind: &'static str, -) -> Option { - let actor_key = format!("{actor_id}:{kind}"); - let per_actor = record_limited_warning(actor_warnings(), actor_key, Instant::now()); - let global = record_limited_warning(global_warnings(), kind.to_owned(), Instant::now()); - - if per_actor.emit && global.emit { - Some(WarningSuppression { - actor_id: actor_id.to_owned(), - per_actor_suppressed: per_actor.suppressed, - global_suppressed: global.suppressed, - }) - } else { - None - } -} - #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct WarningSuppression { pub(crate) actor_id: String, @@ -138,7 +118,3 @@ fn record_limited_warning( fn global_warnings() -> &'static SccHashMap>> { GLOBAL_WARNINGS.get_or_init(SccHashMap::new) } - -fn actor_warnings() -> &'static SccHashMap>> { - ACTOR_WARNINGS.get_or_init(SccHashMap::new) -} diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs index ce9ba3fce3..443262cece 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs @@ -15,7 +15,6 @@ use crate::actor::task_types::{ShutdownKind, StateMutationReason, UserTaskKind}; #[derive(Clone)] pub(crate) struct ActorMetrics { - actor_id: Arc, inner: Arc>, } @@ -29,11 +28,8 @@ struct ActorMetricsInner { active_connections: IntGauge, connections_total: IntCounter, lifecycle_inbox_depth: IntGauge, - lifecycle_inbox_overload_total: CounterVec, dispatch_inbox_depth: IntGauge, - dispatch_inbox_overload_total: CounterVec, lifecycle_event_inbox_depth: IntGauge, - lifecycle_event_overload_total: CounterVec, user_tasks_active: IntGaugeVec, user_task_duration_seconds: HistogramVec, shutdown_wait_seconds: HistogramVec, @@ -100,10 +96,7 @@ impl ActorMetrics { } }; - Self { - actor_id: Arc::from(actor_id), - inner: Arc::new(inner), - } + Self { inner: Arc::new(inner) } } fn try_new_inner(actor_id: &str, actor_name: String) -> Result { @@ -154,40 +147,16 @@ impl ActorMetrics { "current actor lifecycle command inbox depth", )) .context("create lifecycle_inbox_depth gauge")?; - let lifecycle_inbox_overload_total = CounterVec::new( - Opts::new( - "lifecycle_inbox_overload_total", - "total actor lifecycle command inbox overloads", - ), - &["command"], - ) - .context("create lifecycle_inbox_overload_total counter")?; let dispatch_inbox_depth = IntGauge::with_opts(Opts::new( "dispatch_inbox_depth", "current actor dispatch command inbox depth", )) .context("create dispatch_inbox_depth gauge")?; - let dispatch_inbox_overload_total = CounterVec::new( - Opts::new( - "dispatch_inbox_overload_total", - "total actor dispatch command inbox overloads", - ), - &["command"], - ) - .context("create dispatch_inbox_overload_total counter")?; let lifecycle_event_inbox_depth = IntGauge::with_opts(Opts::new( "lifecycle_event_inbox_depth", "current actor lifecycle event inbox depth", )) .context("create lifecycle_event_inbox_depth gauge")?; - let lifecycle_event_overload_total = CounterVec::new( - Opts::new( - "lifecycle_event_overload_total", - "total actor lifecycle event inbox overloads", - ), - &["event"], - ) - .context("create lifecycle_event_overload_total counter")?; let user_tasks_active = IntGaugeVec::new( Opts::new("user_tasks_active", "current active actor user tasks"), &["kind"], @@ -385,11 +354,8 @@ impl ActorMetrics { register_metric(®istry, active_connections.clone()); register_metric(®istry, connections_total.clone()); register_metric(®istry, lifecycle_inbox_depth.clone()); - register_metric(®istry, lifecycle_inbox_overload_total.clone()); register_metric(®istry, dispatch_inbox_depth.clone()); - register_metric(®istry, dispatch_inbox_overload_total.clone()); register_metric(®istry, lifecycle_event_inbox_depth.clone()); - register_metric(®istry, lifecycle_event_overload_total.clone()); register_metric(®istry, user_tasks_active.clone()); register_metric(®istry, user_task_duration_seconds.clone()); register_metric(®istry, shutdown_wait_seconds.clone()); @@ -464,11 +430,8 @@ impl ActorMetrics { active_connections, connections_total, lifecycle_inbox_depth, - lifecycle_inbox_overload_total, dispatch_inbox_depth, - dispatch_inbox_overload_total, lifecycle_event_inbox_depth, - lifecycle_event_overload_total, user_tasks_active, user_task_duration_seconds, shutdown_wait_seconds, @@ -520,10 +483,6 @@ impl ActorMetrics { }) } - pub(crate) fn actor_id(&self) -> &str { - &self.actor_id - } - pub(crate) fn render(&self) -> Result { let Some(inner) = self.inner.as_ref().as_ref() else { return Ok(String::new()); @@ -600,16 +559,6 @@ impl ActorMetrics { .set(depth.try_into().unwrap_or(i64::MAX)); } - pub(crate) fn inc_lifecycle_inbox_overload(&self, command: &str) { - let Some(inner) = self.inner.as_ref().as_ref() else { - return; - }; - inner - .lifecycle_inbox_overload_total - .with_label_values(&[command]) - .inc(); - } - pub(crate) fn set_dispatch_inbox_depth(&self, depth: usize) { let Some(inner) = self.inner.as_ref().as_ref() else { return; @@ -619,16 +568,6 @@ impl ActorMetrics { .set(depth.try_into().unwrap_or(i64::MAX)); } - pub(crate) fn inc_dispatch_inbox_overload(&self, command: &str) { - let Some(inner) = self.inner.as_ref().as_ref() else { - return; - }; - inner - .dispatch_inbox_overload_total - .with_label_values(&[command]) - .inc(); - } - pub(crate) fn set_lifecycle_event_inbox_depth(&self, depth: usize) { let Some(inner) = self.inner.as_ref().as_ref() else { return; @@ -638,16 +577,6 @@ impl ActorMetrics { .set(depth.try_into().unwrap_or(i64::MAX)); } - pub(crate) fn inc_lifecycle_event_overload(&self, event: &str) { - let Some(inner) = self.inner.as_ref().as_ref() else { - return; - }; - inner - .lifecycle_event_overload_total - .with_label_values(&[event]) - .inc(); - } - pub(crate) fn begin_user_task(&self, kind: UserTaskKind) { let Some(inner) = self.inner.as_ref().as_ref() else { return; diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/state.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/state.rs index 55f2137bfd..cc1add1321 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/state.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/state.rs @@ -22,8 +22,6 @@ use crate::actor::persist::{ decode_latest_with_embedded_version, encode_latest_with_embedded_version, }; use crate::actor::task::LifecycleEvent; -#[cfg(not(target_arch = "wasm32"))] -use crate::actor::task::{LIFECYCLE_EVENT_INBOX_CHANNEL, actor_channel_overloaded_error}; use crate::actor::task_types::StateMutationReason; use crate::error::ActorRuntime; #[cfg(feature = "wasm-runtime")] @@ -129,9 +127,9 @@ impl ActorContext { /// Fire-and-forget save request helper. /// - /// If the lifecycle event inbox is overloaded or unavailable, this only logs - /// a warning and returns. That `warn!` is the sole failure signal for this - /// path; callers do not receive a `Result`. Call + /// If the lifecycle event inbox is unavailable, this only logs a warning and + /// returns. That `warn!` is the sole failure signal for this path; callers do + /// not receive a `Result`. Call /// [`Self::request_save_and_wait`] when the caller must observe /// save-request delivery failures. pub fn request_save(&self, opts: RequestSaveOpts) { @@ -179,9 +177,7 @@ impl ActorContext { return; } - if let Ok(permit) = sender.try_reserve() { - permit.send(LifecycleEvent::SaveRequested { immediate }); - } + let _ = sender.send(LifecycleEvent::SaveRequested { immediate }); } pub async fn request_save_and_wait(&self, opts: RequestSaveOpts) -> Result<()> { @@ -230,21 +226,15 @@ impl ActorContext { return Ok(save_request_revision); } - match sender.try_reserve() { - Ok(permit) => { - permit.send(LifecycleEvent::SaveRequested { immediate }); - Ok(save_request_revision) - } - #[cfg(target_arch = "wasm32")] - Err(_) => Ok(save_request_revision), - #[cfg(not(target_arch = "wasm32"))] - Err(_) => Err(actor_channel_overloaded_error( - LIFECYCLE_EVENT_INBOX_CHANNEL, - self.0.lifecycle_event_inbox_capacity, - "save_requested", - Some(&self.0.metrics), - )), - } + sender + .send(LifecycleEvent::SaveRequested { immediate }) + .map(|()| save_request_revision) + .map_err(|_| { + ActorRuntime::NotConfigured { + component: "lifecycle events".to_owned(), + } + .build() + }) } pub(crate) async fn wait_for_save_request(&self, save_request_revision: u64) { @@ -585,7 +575,7 @@ impl ActorContext { self.0.state_revision.fetch_add(1, Ordering::SeqCst); } - fn lifecycle_event_sender(&self) -> Option> { + fn lifecycle_event_sender(&self) -> Option> { self.0.lifecycle_events.read().clone() } diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index b1ead46bd2..e71b380f76 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -1,6 +1,6 @@ //! Actor lifecycle task orchestration. //! -//! `ActorTask` deliberately uses four separate bounded `mpsc` receivers instead +//! `ActorTask` deliberately uses four separate unbounded `mpsc` receivers instead //! of one tagged command queue: //! //! - `lifecycle_inbox` carries trusted registry/envoy lifecycle commands: @@ -13,22 +13,17 @@ //! - `actor_event_rx` feeds the user runtime adapter with actor events after //! `ActorTask` accepts dispatch work. //! -//! Keeping these queues split gives the task loop explicit back-pressure and -//! priority boundaries. Client dispatch can fill its own bounded inbox without -//! starving lifecycle stop/destroy commands, while internal save/sleep/inspector -//! events do not compete with untrusted client traffic. The main `tokio::select!` -//! is biased so lifecycle commands are observed first, then internal lifecycle -//! events, then dispatch and timers. During sleep grace, the same priority keeps -//! lifecycle handling live while still draining accepted dispatch replies before -//! final teardown. +//! Keeping these queues split gives the task loop explicit priority boundaries. +//! Client dispatch does not compete directly with lifecycle stop/destroy +//! commands, and internal save/sleep/inspector events do not compete with +//! untrusted client traffic. The main `tokio::select!` is biased so lifecycle +//! commands are observed first, then internal lifecycle events, then dispatch +//! and timers. During sleep grace, the same priority keeps lifecycle handling +//! live while still draining accepted dispatch replies before final teardown. //! -//! Producers reserve capacity with `try_reserve` before constructing channel -//! work. Overload paths therefore fail fast with `actor.overloaded`, record the -//! specific inbox metric (`lifecycle_inbox`, `dispatch_inbox`, -//! `lifecycle_event_inbox`, or `actor_event_inbox`), and avoid orphaning reply -//! oneshots. The sender topology follows the trust boundary: registry/envoy owns -//! lifecycle and dispatch senders, core subsystems enqueue lifecycle events -//! through `ActorContext`, and only `ActorTask` forwards accepted work into the +//! The sender topology follows the trust boundary: registry/envoy owns lifecycle +//! and dispatch senders, core subsystems enqueue lifecycle events through +//! `ActorContext`, and only `ActorTask` forwards accepted work into the //! actor-event stream consumed by user code. use std::future; @@ -50,15 +45,12 @@ use tracing::{Instrument, instrument::WithSubscriber}; use crate::actor::action::ActionDispatchError; use crate::actor::connection::ConnHandle; use crate::actor::context::ActorContext; -#[cfg(not(target_arch = "wasm32"))] -use crate::actor::diagnostics::record_actor_warning; use crate::actor::factory::ActorFactory; use crate::actor::keys::{LAST_PUSHED_ALARM_KEY, PERSIST_DATA_KEY}; use crate::actor::lifecycle_hooks::{ActorEvents, ActorStart, Reply}; use crate::actor::messages::{ ActorEvent, QueueSendResult, Request, Response, SerializeStateReason, StateDelta, }; -use crate::actor::metrics::ActorMetrics; use crate::actor::preload::{PreloadedKv, PreloadedPersistedActor}; use crate::actor::state::{PersistedActor, decode_last_pushed_alarm, decode_persisted_actor}; use crate::actor::task_types::ShutdownKind; @@ -79,9 +71,6 @@ const LONG_SHUTDOWN_DRAIN_WARNING_THRESHOLD: Duration = Duration::from_secs(1); const INSPECTOR_SERIALIZE_STATE_INTERVAL: Duration = Duration::from_millis(50); const INSPECTOR_OVERLAY_CHANNEL_CAPACITY: usize = 32; -pub(crate) const LIFECYCLE_INBOX_CHANNEL: &str = "lifecycle_inbox"; -pub(crate) const DISPATCH_INBOX_CHANNEL: &str = "dispatch_inbox"; -pub(crate) const LIFECYCLE_EVENT_INBOX_CHANNEL: &str = "lifecycle_event_inbox"; pub use crate::actor::task_types::LifecycleState; // Test shim keeps moved tests in crate-root tests/ with private-module access. @@ -198,85 +187,13 @@ impl LifecycleCommand { } } -pub(crate) fn actor_channel_overloaded_error( - channel: &'static str, - capacity: usize, - operation: &'static str, - metrics: Option<&ActorMetrics>, -) -> anyhow::Error { - if let Some(metrics) = metrics { - match channel { - LIFECYCLE_INBOX_CHANNEL => metrics.inc_lifecycle_inbox_overload(operation), - DISPATCH_INBOX_CHANNEL => metrics.inc_dispatch_inbox_overload(operation), - LIFECYCLE_EVENT_INBOX_CHANNEL => metrics.inc_lifecycle_event_overload(operation), - _ => {} - } - } - #[cfg(not(target_arch = "wasm32"))] - { - if let Some(metrics) = metrics { - if let Some(suppression) = - record_actor_warning(metrics.actor_id(), "actor_channel_overloaded") - { - tracing::warn!( - actor_id = %suppression.actor_id, - channel, - capacity, - operation, - event = if channel == LIFECYCLE_EVENT_INBOX_CHANNEL { - operation - } else { - "" - }, - per_actor_suppressed = suppression.per_actor_suppressed, - global_suppressed = suppression.global_suppressed, - "actor bounded channel overloaded" - ); - } - } else { - tracing::warn!( - channel, - capacity, - operation, - "actor bounded channel overloaded" - ); - } - } - #[cfg(target_arch = "wasm32")] - { - let _ = metrics; - anyhow!( - "actor bounded channel overloaded: channel={channel}, capacity={capacity}, operation={operation}" - ) - } - - #[cfg(not(target_arch = "wasm32"))] - { - ActorLifecycleError::Overloaded { - channel: channel.to_owned(), - capacity, - operation: operation.to_owned(), - } - .build() - } -} - pub(crate) fn try_send_lifecycle_command( - sender: &mpsc::Sender, - capacity: usize, - operation: &'static str, + sender: &mpsc::UnboundedSender, command: LifecycleCommand, - metrics: Option<&ActorMetrics>, ) -> Result<()> { - // Reserve capacity before sending so overload paths can return - // `actor.overloaded` without waiting or constructing more channel-owned work. - // Lifecycle callers also avoid creating reply oneshots when a full inbox would - // immediately orphan them. - let permit = sender.try_reserve().map_err(|_| { - actor_channel_overloaded_error(LIFECYCLE_INBOX_CHANNEL, capacity, operation, metrics) - })?; - permit.send(command); - Ok(()) + sender + .send(command) + .map_err(|_| ActorLifecycleError::NotReady.build()) } pub enum DispatchCommand { @@ -328,20 +245,12 @@ impl DispatchCommand { } pub(crate) fn try_send_dispatch_command( - sender: &mpsc::Sender, - capacity: usize, - operation: &'static str, + sender: &mpsc::UnboundedSender, command: DispatchCommand, - metrics: Option<&ActorMetrics>, ) -> Result<()> { - // Match lifecycle command backpressure semantics: capacity is checked before - // handing the value to the channel, which keeps reject paths cheap and avoids - // `try_send` returning a fully built command that must be discarded. - let permit = sender.try_reserve().map_err(|_| { - actor_channel_overloaded_error(DISPATCH_INBOX_CHANNEL, capacity, operation, metrics) - })?; - permit.send(command); - Ok(()) + sender + .send(command) + .map_err(|_| ActorLifecycleError::NotReady.build()) } #[derive(Debug, Clone, PartialEq, Eq)] @@ -392,15 +301,15 @@ pub struct ActorTask { // === INBOX CHANNELS === /// Lifecycle commands (Start / Stop / FireAlarm) sent by the registry /// in response to engine-driven `EnvoyCallbacks` from the envoy client. - pub lifecycle_inbox: mpsc::Receiver, + pub lifecycle_inbox: mpsc::UnboundedReceiver, /// Client-originated work sent by `RegistryDispatcher` in /// `registry/dispatch.rs` (Action, OpenWebSocket, Workflow*) and /// `registry/http.rs` (Http, QueueSend). - pub dispatch_inbox: mpsc::Receiver, + pub dispatch_inbox: mpsc::UnboundedReceiver, /// Internal self-events the actor enqueues onto itself via `ActorContext` /// hooks (save/inspector/activity notifications from /// `actor/state.rs`, `actor/connection.rs`, `actor/context.rs`). - pub lifecycle_events: mpsc::Receiver, + pub lifecycle_events: mpsc::UnboundedReceiver, // === RUNTIME STATE === pub lifecycle: LifecycleState, @@ -463,9 +372,9 @@ impl ActorTask { pub fn new( actor_id: String, generation: u32, - lifecycle_inbox: mpsc::Receiver, - dispatch_inbox: mpsc::Receiver, - lifecycle_events: mpsc::Receiver, + lifecycle_inbox: mpsc::UnboundedReceiver, + dispatch_inbox: mpsc::UnboundedReceiver, + lifecycle_events: mpsc::UnboundedReceiver, factory: Arc, ctx: ActorContext, start_input: Option>, @@ -560,6 +469,7 @@ impl ActorTask { return exit; } } + // TODO: Sample inbox depths periodically instead of on every loop iteration. self.record_inbox_depths(); tokio::select! { biased; diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/dispatch.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/dispatch.rs index e7c2dd981d..c252ea734c 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/dispatch.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/dispatch.rs @@ -3,8 +3,7 @@ use crate::error::ActorLifecycle as ActorLifecycleError; use crate::time; pub(super) async fn dispatch_action_through_task( - dispatch: &mpsc::Sender, - capacity: usize, + dispatch: &mpsc::UnboundedSender, conn: ConnHandle, name: String, args: Vec, @@ -17,15 +16,12 @@ pub(super) async fn dispatch_action_through_task( ); try_send_dispatch_command( dispatch, - capacity, - "dispatch_action", DispatchCommand::Action { name: name.clone(), args, conn, reply: reply_tx, }, - None, ) .map_err(ActionDispatchError::from_anyhow)?; tracing::info!( @@ -80,8 +76,7 @@ where } pub(super) async fn dispatch_websocket_open_through_task( - dispatch: &mpsc::Sender, - capacity: usize, + dispatch: &mpsc::UnboundedSender, conn: ConnHandle, ws: WebSocket, request: Option, @@ -89,15 +84,12 @@ pub(super) async fn dispatch_websocket_open_through_task( let (reply_tx, reply_rx) = oneshot::channel(); try_send_dispatch_command( dispatch, - capacity, - "dispatch_websocket_open", DispatchCommand::OpenWebSocket { conn, ws, request, reply: reply_tx, }, - None, ) .context("actor task stopped before websocket dispatch command could be sent")?; @@ -107,16 +99,12 @@ pub(super) async fn dispatch_websocket_open_through_task( } pub(super) async fn dispatch_workflow_history_through_task( - dispatch: &mpsc::Sender, - capacity: usize, + dispatch: &mpsc::UnboundedSender, ) -> Result>> { let (reply_tx, reply_rx) = oneshot::channel(); try_send_dispatch_command( dispatch, - capacity, - "dispatch_workflow_history", DispatchCommand::WorkflowHistory { reply: reply_tx }, - None, ) .context("actor task stopped before workflow history dispatch command could be sent")?; @@ -126,20 +114,16 @@ pub(super) async fn dispatch_workflow_history_through_task( } pub(super) async fn dispatch_workflow_replay_request_through_task( - dispatch: &mpsc::Sender, - capacity: usize, + dispatch: &mpsc::UnboundedSender, entry_id: Option, ) -> Result>> { let (reply_tx, reply_rx) = oneshot::channel(); try_send_dispatch_command( dispatch, - capacity, - "dispatch_workflow_replay", DispatchCommand::WorkflowReplay { entry_id, reply: reply_tx, }, - None, ) .context("actor task stopped before workflow replay dispatch command could be sent")?; diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs index 0333c4095b..0db700ec60 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs @@ -62,13 +62,10 @@ impl RegistryDispatcher { let (reply_tx, reply_rx) = oneshot::channel(); try_send_dispatch_command( &instance.dispatch, - instance.factory.config().dispatch_command_inbox_capacity, - "dispatch_http", DispatchCommand::Http { request, reply: reply_tx, }, - Some(instance.ctx.metrics()), ) .context("send actor task HTTP dispatch command")?; @@ -181,7 +178,6 @@ impl RegistryDispatcher { config.action_timeout, dispatch_action_through_task( &instance.dispatch, - config.dispatch_command_inbox_capacity, conn.clone(), action_name.clone(), args, @@ -294,8 +290,6 @@ impl RegistryDispatcher { let (reply_tx, reply_rx) = oneshot::channel(); let dispatch_result = try_send_dispatch_command( &instance.dispatch, - config.dispatch_command_inbox_capacity, - "dispatch_queue_send", DispatchCommand::QueueSend { name: queue_name, body: queue_request.body, @@ -305,7 +299,6 @@ impl RegistryDispatcher { timeout_ms: queue_request.timeout, reply: reply_tx, }, - Some(instance.ctx.metrics()), ); let queue_result = match dispatch_result { diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs index 04bd1088e6..4ff10dbe29 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/inspector.rs @@ -261,7 +261,6 @@ impl RegistryDispatcher { ); let output = dispatch_action_through_task( &instance.dispatch, - instance.factory.config().dispatch_command_inbox_capacity, conn.clone(), action_name.to_owned(), args, @@ -349,7 +348,6 @@ impl RegistryDispatcher { .ctx .internal_keep_awake(dispatch_workflow_history_through_task( &instance.dispatch, - instance.factory.config().dispatch_command_inbox_capacity, )) .await .context("load inspector workflow history"); @@ -366,7 +364,6 @@ impl RegistryDispatcher { .ctx .internal_keep_awake(dispatch_workflow_replay_request_through_task( &instance.dispatch, - instance.factory.config().dispatch_command_inbox_capacity, entry_id, )) .await diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index 09fa72bb39..ebaa6b243d 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -41,12 +41,7 @@ use crate::actor::messages::{ActorEvent, QueueSendResult, Request, Response, Sta use crate::actor::preload::{PreloadedKv, PreloadedPersistedActor}; use crate::actor::state::decode_persisted_actor; use crate::actor::task::{ - ActorTask, - DispatchCommand, - LifecycleCommand, - // These helpers reserve bounded-channel capacity before sending; see - // `actor::task` for the backpressure and lifecycle reply rationale. - try_send_dispatch_command, + ActorTask, DispatchCommand, LifecycleCommand, try_send_dispatch_command, try_send_lifecycle_command, }; use crate::actor::task_types::ShutdownKind; @@ -94,8 +89,8 @@ struct ActorTaskHandle { ctx: ActorContext, factory: Arc, inspector: Inspector, - lifecycle: mpsc::Sender, - dispatch: mpsc::Sender, + lifecycle: mpsc::UnboundedSender, + dispatch: mpsc::UnboundedSender, join: Arc>>>>, } @@ -594,30 +589,22 @@ impl RegistryDispatcher { } .build() })?; - let config = factory.config().clone(); - let (lifecycle_tx, lifecycle_rx) = mpsc::channel(config.lifecycle_command_inbox_capacity); - let (dispatch_tx, dispatch_rx) = mpsc::channel(config.dispatch_command_inbox_capacity); - let (lifecycle_events_tx, lifecycle_events_rx) = - mpsc::channel(config.lifecycle_event_inbox_capacity); + let (lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (lifecycle_events_tx, lifecycle_events_rx) = mpsc::unbounded_channel(); request .ctx .configure_lifecycle_events(Some(lifecycle_events_tx)); request.ctx.cancel_sleep_timer(); request.ctx.set_local_alarm_callback(Some(Arc::new({ let lifecycle_tx = lifecycle_tx.clone(); - let metrics = request.ctx.metrics().clone(); - let capacity = config.lifecycle_command_inbox_capacity; move || { let lifecycle_tx = lifecycle_tx.clone(); - let metrics = metrics.clone(); Box::pin(async move { let (reply_tx, reply_rx) = oneshot::channel(); if let Err(error) = try_send_lifecycle_command( &lifecycle_tx, - capacity, - "fire_alarm", LifecycleCommand::FireAlarm { reply: reply_tx }, - Some(&metrics), ) { tracing::warn!(?error, "failed to enqueue actor alarm"); return; @@ -645,10 +632,7 @@ impl RegistryDispatcher { let result: Result> = async { try_send_lifecycle_command( &lifecycle_tx, - config.lifecycle_command_inbox_capacity, - "start_actor", LifecycleCommand::Start { reply: start_tx }, - Some(request.ctx.metrics()), ) .context("send actor task start command")?; start_rx @@ -939,13 +923,10 @@ impl RegistryDispatcher { let (reply_tx, reply_rx) = oneshot::channel(); let shutdown_result = match try_send_lifecycle_command( &instance.lifecycle, - instance.factory.config().lifecycle_command_inbox_capacity, - "stop_actor", LifecycleCommand::Stop { reason: task_stop_reason, reply: reply_tx, }, - Some(instance.ctx.metrics()), ) { Ok(()) => reply_rx .await diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/websocket.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/websocket.rs index acdffac059..8fd698dae2 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/websocket.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/websocket.rs @@ -244,8 +244,6 @@ impl RegistryDispatcher { let on_message_conn = conn.clone(); let on_message_ctx = instance.ctx.clone(); let on_message_dispatch = instance.dispatch.clone(); - let on_message_dispatch_capacity = - instance.factory.config().dispatch_command_inbox_capacity; let on_open: Option EnvoyBoxFuture<()> + Send>> = if is_restoring_hibernatable { @@ -359,7 +357,6 @@ impl RegistryDispatcher { async move { let response = match dispatch_action_through_task( &dispatch, - on_message_dispatch_capacity, conn.clone(), request.name.clone(), request.args.into_vec(), @@ -542,7 +539,6 @@ impl RegistryDispatcher { }; let ctx = instance.ctx.clone(); let dispatch = instance.dispatch.clone(); - let dispatch_capacity = instance.factory.config().dispatch_command_inbox_capacity; let conn_for_close = conn.clone(); let conn_for_message = conn.clone(); let conn_for_open = conn.clone(); @@ -666,7 +662,6 @@ impl RegistryDispatcher { ws.configure_sender(sender); let result = dispatch_websocket_open_through_task( &dispatch, - dispatch_capacity, conn, ws.clone(), Some(request), diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs index c837542f97..b947b6c44e 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs @@ -72,7 +72,7 @@ pub struct ServerlessRequest { pub struct ServerlessResponse { pub status: u16, pub headers: HashMap, - pub body: mpsc::Receiver, ServerlessStreamError>>, + pub body: mpsc::UnboundedReceiver, ServerlessStreamError>>, } #[derive(Clone, Debug, Serialize)] @@ -286,8 +286,8 @@ impl CoreServerlessRuntime { let actor_start = handle.decode_serverless_actor_start(&payload)?; let cancel_token = req.cancel_token; let cache_envoy = self.settings.cache_envoy; - let (tx, rx) = mpsc::channel(16); - let _ = tx.try_send(Ok(SSE_PING_FRAME.to_vec())); + let (tx, rx) = mpsc::unbounded_channel(); + let _ = tx.send(Ok(SSE_PING_FRAME.to_vec())); RuntimeSpawner::spawn(async move { let shutdown_handle = handle.clone(); @@ -302,7 +302,7 @@ impl CoreServerlessRuntime { }; if let Err(error) = result { let error = stream_error(error); - let _ = tx.send(Err(error)).await; + let _ = tx.send(Err(error)); if !cache_envoy { handle.shutdown_and_wait(false).await; } @@ -315,11 +315,11 @@ impl CoreServerlessRuntime { break; } _ = handle.wait_actor_registered_then_stopped(&actor_start.actor_id, actor_start.generation) => { - let _ = tx.send(Ok(SSE_STOPPING_FRAME.to_vec())).await; + let _ = tx.send(Ok(SSE_STOPPING_FRAME.to_vec())); break; } _ = sleep(SSE_PING_INTERVAL) => { - if tx.send(Ok(SSE_PING_FRAME.to_vec())).await.is_err() { + if tx.send(Ok(SSE_PING_FRAME.to_vec())).is_err() { break; } } @@ -607,8 +607,8 @@ fn bytes_response( headers: HashMap, body: Vec, ) -> ServerlessResponse { - let (tx, rx) = mpsc::channel(1); - let _ = tx.try_send(Ok(body)); + let (tx, rx) = mpsc::unbounded_channel(); + let _ = tx.send(Ok(body)); ServerlessResponse { status: status.as_u16(), headers, diff --git a/rivetkit-rust/packages/rivetkit-core/tests/config.rs b/rivetkit-rust/packages/rivetkit-core/tests/config.rs index c4e88504e3..b550599730 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/config.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/config.rs @@ -72,18 +72,6 @@ mod moved_tests { config.max_outgoing_message_size, default.max_outgoing_message_size, ); - assert_eq!( - config.lifecycle_command_inbox_capacity, - default.lifecycle_command_inbox_capacity, - ); - assert_eq!( - config.dispatch_command_inbox_capacity, - default.dispatch_command_inbox_capacity, - ); - assert_eq!( - config.lifecycle_event_inbox_capacity, - default.lifecycle_event_inbox_capacity, - ); assert_eq!( config.preload_max_workflow_bytes, default.preload_max_workflow_bytes, diff --git a/rivetkit-rust/packages/rivetkit-core/tests/connection.rs b/rivetkit-rust/packages/rivetkit-core/tests/connection.rs index f049e4dfba..3128fd6fe1 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/connection.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/connection.rs @@ -22,7 +22,7 @@ mod moved_tests { use crate::kv::Kv; fn next_non_activity_lifecycle_event( - rx: &mut mpsc::Receiver, + rx: &mut mpsc::UnboundedReceiver, ) -> Option { rx.try_recv().ok() } @@ -353,7 +353,7 @@ mod moved_tests { Kv::new_in_memory(), ); let (actor_events_tx, mut actor_events_rx) = mpsc::unbounded_channel(); - let (lifecycle_events_tx, mut lifecycle_events_rx) = mpsc::channel(4); + let (lifecycle_events_tx, mut lifecycle_events_rx) = mpsc::unbounded_channel(); ctx.configure_actor_events(Some(actor_events_tx)); ctx.configure_lifecycle_events(Some(lifecycle_events_tx)); diff --git a/rivetkit-rust/packages/rivetkit-core/tests/context.rs b/rivetkit-rust/packages/rivetkit-core/tests/context.rs index bb1bb2443d..2c9d7cb7d0 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/context.rs @@ -66,7 +66,7 @@ async fn inspector_attach_guard_notifies_on_threshold_edges() { let attach_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); let (overlay_tx, _) = tokio::sync::broadcast::channel(4); ctx.configure_inspector_runtime(std::sync::Arc::clone(&attach_count), overlay_tx); - let (lifecycle_tx, mut lifecycle_rx) = tokio::sync::mpsc::channel(4); + let (lifecycle_tx, mut lifecycle_rx) = tokio::sync::mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(lifecycle_tx)); let first_guard = ctx diff --git a/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs b/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs index 61eccc978f..33ead234ca 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs @@ -34,4 +34,26 @@ mod moved_tests { .count() ); } + + #[test] + fn actor_inbox_depth_metrics_render() { + let metrics = ActorMetrics::new("actor-inbox-depth", "metrics"); + + metrics.set_lifecycle_inbox_depth(1); + metrics.set_dispatch_inbox_depth(2); + metrics.set_lifecycle_event_inbox_depth(3); + + let rendered = metrics.render().expect("metrics should render"); + assert_metric_value(&rendered, "lifecycle_inbox_depth", "1"); + assert_metric_value(&rendered, "dispatch_inbox_depth", "2"); + assert_metric_value(&rendered, "lifecycle_event_inbox_depth", "3"); + } + + fn assert_metric_value(metrics: &str, name: &str, value: &str) { + let line = metrics + .lines() + .find(|line| line.starts_with(name)) + .unwrap_or_else(|| panic!("{name} should render")); + assert!(line.ends_with(value), "{name} should have value {value}: {line}"); + } } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/modules/task_lifecycle.rs b/rivetkit-rust/packages/rivetkit-core/tests/modules/task_lifecycle.rs index 3419da5920..fa45824a54 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/modules/task_lifecycle.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/modules/task_lifecycle.rs @@ -15,9 +15,9 @@ mod moved_tests { use crate::{ActorConfig, ActorContext, ActorFactory}; fn new_task_with_factory(ctx: ActorContext, factory: Arc) -> ActorTask { - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (_events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (_events_tx, events_rx) = mpsc::unbounded_channel(); ActorTask::new( ctx.actor_id().to_owned(), 0, @@ -94,9 +94,9 @@ mod moved_tests { let mut task = ActorTask::new( "actor-preloaded-startup".to_owned(), 0, - mpsc::channel(4).1, - mpsc::channel(4).1, - mpsc::channel(4).1, + mpsc::unbounded_channel().1, + mpsc::unbounded_channel().1, + mpsc::unbounded_channel().1, factory, ctx.clone(), Some(vec![7, 7, 7]), diff --git a/rivetkit-rust/packages/rivetkit-core/tests/state.rs b/rivetkit-rust/packages/rivetkit-core/tests/state.rs index 467a37abd3..400fe53d14 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/state.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/state.rs @@ -180,12 +180,9 @@ mod moved_tests { async fn request_save_coalesces_and_escalates_to_immediate() { let state = ActorContext::new_for_state_tests( new_in_memory(), - ActorConfig { - lifecycle_event_inbox_capacity: 4, - ..ActorConfig::default() - }, + ActorConfig::default(), ); - let (events_tx, mut events_rx) = mpsc::channel(4); + let (events_tx, mut events_rx) = mpsc::unbounded_channel(); state.configure_lifecycle_events(Some(events_tx)); state.request_save(RequestSaveOpts::default()); @@ -223,11 +220,10 @@ mod moved_tests { new_in_memory(), ActorConfig { state_save_interval: Duration::from_secs(5), - lifecycle_event_inbox_capacity: 4, ..ActorConfig::default() }, ); - let (events_tx, mut events_rx) = mpsc::channel(4); + let (events_tx, mut events_rx) = mpsc::unbounded_channel(); state.configure_lifecycle_events(Some(events_tx)); let now = std::time::Instant::now(); @@ -573,7 +569,7 @@ mod moved_tests { "local", new_in_memory(), ); - let (events_tx, _events_rx) = mpsc::channel(4); + let (events_tx, _events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); ctx.request_save(RequestSaveOpts { diff --git a/rivetkit-rust/packages/rivetkit-core/tests/task.rs b/rivetkit-rust/packages/rivetkit-core/tests/task.rs index 3fff1203cd..6b7e2e0b9f 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/task.rs @@ -132,13 +132,13 @@ mod moved_tests { ctx: ActorContext, ) -> ( ActorTask, - mpsc::Sender, - mpsc::Sender, - mpsc::Sender, + mpsc::UnboundedSender, + mpsc::UnboundedSender, + mpsc::UnboundedSender, ) { - let (lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ( ActorTask::new( "actor-drain".into(), @@ -158,9 +158,9 @@ mod moved_tests { } fn new_task_with_factory(ctx: ActorContext, factory: Arc) -> ActorTask { - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (_events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (_events_tx, events_rx) = mpsc::unbounded_channel(); ActorTask::new( "actor-drain".into(), 0, @@ -735,9 +735,9 @@ mod moved_tests { #[tokio::test] async fn save_tick_respects_debounce_and_immediate_requests() { let ctx = new_with_kv("actor-1", "task-save", Vec::new(), "local", new_in_memory()); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let save_ticks = Arc::new(AtomicUsize::new(0)); @@ -804,9 +804,9 @@ mod moved_tests { "local", new_in_memory(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let save_ticks = Arc::new(AtomicUsize::new(0)); @@ -862,9 +862,9 @@ mod moved_tests { "local", kv.clone(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let factory = Arc::new(ActorFactory::new(Default::default(), move |start| { @@ -955,9 +955,9 @@ mod moved_tests { "local", new_in_memory(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let save_ticks = Arc::new(AtomicUsize::new(0)); @@ -1015,9 +1015,9 @@ mod moved_tests { "local", new_in_memory(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let save_ticks = Arc::new(AtomicUsize::new(0)); @@ -1100,9 +1100,9 @@ mod moved_tests { async fn sleep_shutdown_persists_actor_and_hibernation_deltas() { let kv = new_in_memory(); let ctx = new_with_kv("actor-sleep", "task-sleep", Vec::new(), "local", kv.clone()); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let disconnects = Arc::new(Mutex::new(Vec::::new())); @@ -1217,9 +1217,9 @@ mod moved_tests { async fn wake_start_clears_previous_sleep_request() { let kv = new_in_memory(); let ctx = new_with_kv("actor-wake", "task-wake", Vec::new(), "local", kv.clone()); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let factory = Arc::new(ActorFactory::new( @@ -1295,9 +1295,9 @@ mod moved_tests { ); ctx.set_state_initial(vec![1]); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let factory = Arc::new(ActorFactory::new( @@ -1378,9 +1378,9 @@ mod moved_tests { "local", kv.clone(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let disconnects = Arc::new(Mutex::new(Vec::::new())); @@ -1490,9 +1490,9 @@ mod moved_tests { "local", new_in_memory(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let seen_conns = Arc::new(Mutex::new(Vec::>::new())); @@ -1716,9 +1716,9 @@ mod moved_tests { .expect("seed hibernation should persist"); let ctx = new_with_kv("actor-wake", "task-wake", Vec::new(), "local", kv.clone()); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); configure_live_hibernated_pairs(&ctx, [(b"gate".as_slice(), b"req1".as_slice())]); let (started_tx, started_rx) = oneshot::channel(); @@ -1795,9 +1795,9 @@ mod moved_tests { "local", new_in_memory(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let (started_tx, started_rx) = oneshot::channel(); @@ -1913,9 +1913,9 @@ mod moved_tests { let kv = new_in_memory(); let ctx = new_with_kv("actor-hws", "task-hws", Vec::new(), "local", kv.clone()); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let factory = Arc::new(ActorFactory::new(Default::default(), move |start| { @@ -2021,9 +2021,9 @@ mod moved_tests { kv.clone(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let factory = Arc::new(ActorFactory::new(Default::default(), move |start| { @@ -2607,7 +2607,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -2622,7 +2621,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: stop_tx, }) - .await .expect("sleep stop should send"); stop_rx .await @@ -2655,7 +2653,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -2670,7 +2667,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: stop_tx, }) - .await .expect("destroy stop should send"); stop_rx .await @@ -2703,7 +2699,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -2716,7 +2711,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: stop_tx, }) - .await .expect("sleep stop should send"); stop_rx .await @@ -2747,7 +2741,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -2773,7 +2766,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: stop_tx, }) - .await .expect("sleep stop should send"); let stop = tokio::spawn(async move { stop_rx.await }); yield_now().await; @@ -2906,7 +2898,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -2919,7 +2910,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: stop_tx, }) - .await .expect("sleep stop command should send"); hook_rx.await.expect("sleep cleanup hook should fire"); assert_eq!( @@ -3002,7 +2992,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3029,7 +3018,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: stop_tx, }) - .await .expect("destroy stop command should send"); hook_rx.await.expect("destroy cleanup hook should fire"); assert_eq!( @@ -3084,7 +3072,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3109,7 +3096,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: sleep_tx, }) - .await .expect("sleep stop should send"); wait_for_count(&begin_sleep_count, 1).await; assert!(ctx.actor_aborted()); @@ -3120,7 +3106,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: sleep_again_tx, }) - .await .expect("second sleep stop should send"); sleep_again_rx .await @@ -3136,7 +3121,6 @@ mod moved_tests { conn: ConnHandle::new("conn-grace", Vec::new(), Vec::new(), false), reply: action_tx, }) - .await .expect("action should send during sleep grace"); assert_eq!( action_rx @@ -3222,7 +3206,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3247,7 +3230,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: sleep_tx, }) - .await .expect("sleep stop should send"); wait_for_count(&begin_sleep_count, 1).await; @@ -3257,7 +3239,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: destroy_tx, }) - .await .expect("destroy stop should send"); destroy_rx .await @@ -3303,7 +3284,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3316,7 +3296,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: stop_tx, }) - .await .expect("destroy stop should send"); let error = stop_rx .await @@ -3373,7 +3352,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3386,7 +3364,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: stop_tx, }) - .await .expect("destroy stop should send"); stop_rx .await @@ -3532,7 +3509,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3551,7 +3527,6 @@ mod moved_tests { reason: ShutdownKind::Sleep, reply: stop_tx, }) - .await .expect("sleep stop should send"); stop_rx .await @@ -3599,7 +3574,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3618,7 +3592,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: stop_tx, }) - .await .expect("destroy stop should send"); stop_rx .await @@ -3793,9 +3766,9 @@ mod moved_tests { "local", new_in_memory(), ); - let (lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); let factory = Arc::new(ActorFactory::new(Default::default(), |start| { Box::pin(async move { @@ -3836,7 +3809,6 @@ mod moved_tests { let (start_tx, start_rx) = oneshot::channel(); lifecycle_tx .send(LifecycleCommand::Start { reply: start_tx }) - .await .expect("start command should send"); start_rx .await @@ -3851,7 +3823,6 @@ mod moved_tests { conn: ConnHandle::new("conn-log-flow", Vec::new(), Vec::new(), false), reply: action_tx, }) - .await .expect("dispatch command should send"); assert_eq!( action_rx @@ -3867,7 +3838,6 @@ mod moved_tests { reason: ShutdownKind::Destroy, reply: stop_tx, }) - .await .expect("stop command should send"); stop_rx .await @@ -4013,9 +3983,9 @@ mod moved_tests { "local", kv.clone(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); configure_live_hibernated_pairs(&ctx, [(b"gliv".as_slice(), b"rliv".as_slice())]); @@ -4138,9 +4108,9 @@ mod moved_tests { "local", kv.clone(), ); - let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4); - let (_dispatch_tx, dispatch_rx) = mpsc::channel(4); - let (events_tx, events_rx) = mpsc::channel(4); + let (_lifecycle_tx, lifecycle_rx) = mpsc::unbounded_channel(); + let (_dispatch_tx, dispatch_rx) = mpsc::unbounded_channel(); + let (events_tx, events_rx) = mpsc::unbounded_channel(); ctx.configure_lifecycle_events(Some(events_tx)); ctx.set_hibernated_connection_liveness_override(std::iter::empty());