From e054f950da2688e40cc9c857807a883ae4c24764 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 4 May 2026 01:00:17 -0700 Subject: [PATCH] fix(rivetkit-core): allow dispatch during sleep grace --- .../packages/rivetkit-core/src/actor/task.rs | 10 +---- .../packages/rivetkit-core/tests/context.rs | 2 + .../packages/rivetkit-core/tests/schedule.rs | 1 + .../packages/rivetkit-core/tests/task.rs | 45 ++++++++++++++++--- 4 files changed, 45 insertions(+), 13 deletions(-) diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index b69d548787..c7e70713f1 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -1164,16 +1164,10 @@ impl ActorTask { self.ctx.warn_work_sent_to_stopping_instance("dispatch"); return Some(ActorLifecycleError::Destroying.build()); } - if self.ctx.sleep_requested() { - self.ctx.warn_work_sent_to_stopping_instance("dispatch"); - return Some(ActorLifecycleError::Stopping.build()); - } match self.lifecycle { - LifecycleState::Started => None, - LifecycleState::SleepGrace - | LifecycleState::SleepFinalize - | LifecycleState::DestroyGrace => { + LifecycleState::Started | LifecycleState::SleepGrace => None, + LifecycleState::SleepFinalize | LifecycleState::DestroyGrace => { self.ctx.warn_work_sent_to_stopping_instance("dispatch"); Some(ActorLifecycleError::Stopping.build()) } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/context.rs b/rivetkit-rust/packages/rivetkit-core/tests/context.rs index 54e091db7d..bb1bb2443d 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/context.rs @@ -312,6 +312,7 @@ mod moved_tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(std::sync::Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests, pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::from([( actor_id.to_owned(), @@ -360,6 +361,7 @@ mod moved_tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(std::sync::Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new( diff --git a/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs b/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs index 38c5775ded..81cf3c6abb 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/schedule.rs @@ -88,6 +88,7 @@ mod moved_tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(EnvoySharedMutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(EnvoySharedMutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(EnvoySharedMutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new( diff --git a/rivetkit-rust/packages/rivetkit-core/tests/task.rs b/rivetkit-rust/packages/rivetkit-core/tests/task.rs index 0bb047c67a..9649d3c568 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/task.rs @@ -242,6 +242,7 @@ mod moved_tests { envoy_key: "test-envoy".to_string(), envoy_tx, actors: Arc::new(Mutex::new(HashMap::new())), + actors_notify: Arc::new(tokio::sync::Notify::new()), live_tunnel_requests: Arc::new(Mutex::new(HashMap::new())), pending_hibernation_restores: Arc::new(Mutex::new(HashMap::new())), ws_tx: Arc::new(tokio::sync::Mutex::new( @@ -2812,11 +2813,14 @@ mod moved_tests { }) .await .expect("action should send during sleep grace"); - let _error = action_rx - .await - .expect("action reply should send") - .expect_err("sleep grace should reject new dispatch"); - assert_eq!(action_count.load(Ordering::SeqCst), 0); + assert_eq!( + action_rx + .await + .expect("action reply should send") + .expect("sleep grace should accept new dispatch"), + vec![7, 7, 7] + ); + assert_eq!(action_count.load(Ordering::SeqCst), 1); assert_eq!(destroy_count.load(Ordering::SeqCst), 0); release_tx.send(()).expect("keep-awake release should send"); @@ -2833,6 +2837,37 @@ mod moved_tests { .expect("task run should succeed"); } + #[tokio::test] + async fn sleep_finalize_rejects_new_dispatch() { + let ctx = new_with_kv( + "actor-sleep-finalize-dispatch", + "task-sleep-finalize-dispatch", + Vec::new(), + "local", + new_in_memory(), + ); + let mut task = new_task(ctx); + task.lifecycle = LifecycleState::SleepFinalize; + + let (reply_tx, reply_rx) = oneshot::channel(); + task.handle_dispatch(DispatchCommand::Action { + name: "ping".to_owned(), + args: Vec::new(), + conn: ConnHandle::new("conn-finalize", Vec::new(), Vec::new(), false), + reply: reply_tx, + }) + .await; + + let error = reply_rx + .await + .expect("action reply should send") + .expect_err("sleep finalize should reject new dispatch"); + assert!( + error.to_string().contains("Actor is stopping"), + "expected actor stopping error, got {error:#}" + ); + } + #[cfg(not(debug_assertions))] #[tokio::test] async fn duplicate_destroy_during_sleep_grace_is_acked_and_ignored_in_release() {