-
Notifications
You must be signed in to change notification settings - Fork 0
fix #539: surface SVT-AV1 EOS-flush watchdog trip as node failure #615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
88be14f
b161ba2
5ada31a
22d12ff
022de6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 Info: Mechanical transformation verified complete across all codec nodes Confirmed via grep that every file calling Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ use std::time::Duration; | |
| use opentelemetry::KeyValue; | ||
| use streamkit_core::stats::NodeStatsTracker; | ||
| use streamkit_core::types::Packet; | ||
| use streamkit_core::NodeContext; | ||
| use streamkit_core::{state_helpers, NodeContext, NodeStateSender, StreamKitError}; | ||
| use tokio::sync::mpsc; | ||
|
|
||
| /// Bounds a codec's entire lifetime by output idleness. | ||
|
|
@@ -26,6 +26,69 @@ use tokio::sync::mpsc; | |
| /// `SvtAv1EncoderNode::RECEIVE_THREAD_JOIN_TIMEOUT`). | ||
| pub(crate) const CODEC_IDLE_TIMEOUT: Duration = Duration::from_mins(1); | ||
|
|
||
| /// How a [`codec_forward_loop`] run terminated. | ||
| /// | ||
| /// Distinguishes a clean finish (input/codec/downstream closed, or shutdown) | ||
| /// from a degraded one that leaves the output truncated — an idle-watchdog | ||
| /// abandonment or a codec-task panic. A degraded run must surface as a node | ||
| /// failure rather than a successful `Stopped("input_closed")` so callers and | ||
| /// state subscribers can tell it from a complete one (see #539). | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| #[must_use] | ||
| pub enum CodecLoopOutcome { | ||
|
Comment on lines
+37
to
+38
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 Info: The #[must_use] on CodecLoopOutcome prevents silent ignoring of degraded outcomes The Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
||
| /// The loop finalized cleanly: the codec finished, input or the downstream | ||
| /// channel closed, or a shutdown was requested. | ||
| Completed, | ||
| /// The idle watchdog abandoned a wedged codec worker mid-stream or | ||
| /// mid-flush. Any output already forwarded is truncated, and the worker's | ||
| /// native handle was intentionally leaked (it may still be in a blocking | ||
| /// FFI call), so the run is degraded, not successful. | ||
| WatchdogAbandoned, | ||
| /// The codec task panicked, so the loop ended before the stream was fully | ||
| /// processed. Any output already forwarded is truncated, exactly like a | ||
| /// watchdog abandonment, so the run is degraded, not successful. | ||
| CodecPanicked, | ||
| } | ||
|
Comment on lines
+36
to
+51
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 Info: CodecLoopOutcome is pub, expanding the crate's public API surface
Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
||
|
|
||
| /// Emit the terminal node state and return the run result for a codec node, | ||
| /// based on how its [`codec_forward_loop`] ended. | ||
| /// | ||
| /// A clean finish emits `Stopped("input_closed")` and returns `Ok(())`. A | ||
| /// degraded finish (watchdog abandonment or codec panic) emits a terminal | ||
| /// `Failed` state and returns an `Err`, making the truncated output observable | ||
| /// to callers and state subscribers instead of masquerading as a successful | ||
| /// run (#539). | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns [`StreamKitError::Codec`] when `outcome` is | ||
| /// [`CodecLoopOutcome::WatchdogAbandoned`] or [`CodecLoopOutcome::CodecPanicked`], | ||
| /// i.e. the codec was abandoned or panicked and its output is truncated. | ||
| pub fn finalize_codec_run( | ||
| outcome: CodecLoopOutcome, | ||
| state_tx: &NodeStateSender, | ||
| node_id: &str, | ||
| label: &str, | ||
| ) -> Result<(), StreamKitError> { | ||
| let reason = match outcome { | ||
| CodecLoopOutcome::Completed => { | ||
| state_helpers::emit_stopped(state_tx, node_id, "input_closed"); | ||
| tracing::info!("{label} finished"); | ||
| return Ok(()); | ||
| }, | ||
| CodecLoopOutcome::WatchdogAbandoned => format!( | ||
| "{label} abandoned a wedged codec worker after {CODEC_IDLE_TIMEOUT:?} of output \ | ||
| idleness; output is truncated" | ||
| ), | ||
| CodecLoopOutcome::CodecPanicked => { | ||
| format!("{label} codec task panicked mid-stream; output is truncated") | ||
| }, | ||
| }; | ||
| tracing::error!("{reason}"); | ||
| state_helpers::emit_failed(state_tx, node_id, reason.clone()); | ||
| Err(StreamKitError::Codec(reason)) | ||
|
Comment on lines
+87
to
+89
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 📝 Info: Minor: reason string is cloned where it could be avoided In Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
||
| } | ||
|
Comment on lines
+67
to
+90
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚩 Behavioral change: codec nodes now return Err on degraded runs where they previously returned Ok Before this PR, all codec nodes unconditionally called Was this helpful? React with 👍 or 👎 to provide feedback. Debug |
||
|
|
||
| /// Result of [`bounded_thread_join`]. | ||
| #[cfg(any(feature = "svt_av1", test))] | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
|
|
@@ -90,7 +153,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>( | |
| stats: &mut NodeStatsTracker, | ||
| to_packet: impl Fn(T) -> Packet + Send + Sync, | ||
| label: &str, | ||
| ) { | ||
| ) -> CodecLoopOutcome { | ||
| async fn forward_one( | ||
| packet: Packet, | ||
| context: &mut NodeContext, | ||
|
|
@@ -124,6 +187,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>( | |
|
|
||
| let mut input_done = false; | ||
| let mut codec_done = false; | ||
| let mut outcome = CodecLoopOutcome::Completed; | ||
| // Held in an `Option` so the idle watchdog can observe the codec's input | ||
| // backpressure; taken (dropped) to signal end-of-input to the codec. | ||
| let mut codec_tx = Some(codec_tx); | ||
|
|
@@ -169,6 +233,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>( | |
| if let Err(e) = res { | ||
| if e.is_panic() { | ||
| tracing::error!("{label} codec task panicked: {e:?}"); | ||
| outcome = CodecLoopOutcome::CodecPanicked; | ||
| break; | ||
| } | ||
| } | ||
|
|
@@ -188,6 +253,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>( | |
| finalizing instead of hanging (likely native encoder deadlock)", | ||
| if input_done { "closed" } else { "backed up" } | ||
| ); | ||
| outcome = CodecLoopOutcome::WatchdogAbandoned; | ||
| break; | ||
| } | ||
| idle.as_mut().reset(tokio::time::Instant::now() + CODEC_IDLE_TIMEOUT); | ||
|
|
@@ -205,6 +271,8 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>( | |
| if !codec_done { | ||
| codec_task.abort(); | ||
| } | ||
|
|
||
| outcome | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
@@ -214,6 +282,7 @@ mod tests { | |
| use crate::test_utils::create_test_context; | ||
| use std::collections::HashMap; | ||
| use streamkit_core::stats::NodeStatsTracker; | ||
| use streamkit_core::NodeStateUpdate; | ||
|
|
||
| fn test_counter() -> opentelemetry::metrics::Counter<u64> { | ||
| opentelemetry::global::meter("test").u64_counter("test_drain_packets").build() | ||
|
|
@@ -239,7 +308,7 @@ mod tests { | |
| let codec_task = tokio::task::spawn(async {}); | ||
| let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1); | ||
|
|
||
| tokio::time::timeout( | ||
| let outcome = tokio::time::timeout( | ||
| Duration::from_secs(5), | ||
| codec_forward_loop( | ||
| &mut ctx, | ||
|
|
@@ -255,6 +324,12 @@ mod tests { | |
| ) | ||
| .await | ||
| .expect("loop must finalize when the codec finishes and closes the channel"); | ||
|
|
||
| assert_eq!( | ||
| outcome, | ||
| CodecLoopOutcome::Completed, | ||
| "a clean finish must report Completed, not a watchdog abandonment" | ||
| ); | ||
| } | ||
|
|
||
| /// Regression for #540: an SVT-AV1 dimension change can abandon a wedged | ||
|
|
@@ -279,7 +354,7 @@ mod tests { | |
| let codec_task = tokio::task::spawn(async {}); | ||
| let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1); | ||
|
|
||
| tokio::time::timeout( | ||
| let outcome = tokio::time::timeout( | ||
| Duration::from_secs(5), | ||
| codec_forward_loop( | ||
| &mut ctx, | ||
|
|
@@ -296,6 +371,11 @@ mod tests { | |
| .await | ||
| .expect("loop must finalize once the codec finishes, even with a leaked sender"); | ||
|
|
||
| assert_eq!( | ||
| outcome, | ||
| CodecLoopOutcome::Completed, | ||
| "a leaked sender that the codec finishes around is still a clean finish" | ||
| ); | ||
| assert!( | ||
| matches!(mock_out.try_recv().await, Some((_, _, Packet::Binary { .. }))), | ||
| "buffered result must still be forwarded before finalizing" | ||
|
|
@@ -324,7 +404,7 @@ mod tests { | |
| codec_tx.try_send(vec![0]).unwrap(); | ||
| assert_eq!(codec_tx.capacity(), 0); | ||
|
|
||
| tokio::time::timeout( | ||
| let outcome = tokio::time::timeout( | ||
| Duration::from_mins(10), | ||
| codec_forward_loop( | ||
| &mut ctx, | ||
|
|
@@ -340,6 +420,12 @@ mod tests { | |
| ) | ||
| .await | ||
| .expect("idle watchdog must abandon the mid-stream-wedged codec instead of hanging"); | ||
|
|
||
| assert_eq!( | ||
| outcome, | ||
| CodecLoopOutcome::WatchdogAbandoned, | ||
| "a watchdog-abandoned codec must surface as a degraded run, not a clean finish" | ||
| ); | ||
| } | ||
|
|
||
| /// The idle watchdog also bounds a flush that never completes: once input | ||
|
|
@@ -359,7 +445,7 @@ mod tests { | |
| let codec_task = tokio::task::spawn(std::future::pending::<()>()); | ||
| let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1); | ||
|
|
||
| tokio::time::timeout( | ||
| let outcome = tokio::time::timeout( | ||
| Duration::from_mins(10), | ||
| codec_forward_loop( | ||
| &mut ctx, | ||
|
|
@@ -375,6 +461,12 @@ mod tests { | |
| ) | ||
| .await | ||
| .expect("idle watchdog must finalize a wedged post-input-close flush"); | ||
|
|
||
| assert_eq!( | ||
| outcome, | ||
| CodecLoopOutcome::WatchdogAbandoned, | ||
| "an EOS flush abandoned by the watchdog is a truncated, degraded run" | ||
| ); | ||
| } | ||
|
|
||
| /// Regression for #540: closing input must re-arm the idle watchdog to a | ||
|
|
@@ -405,7 +497,7 @@ mod tests { | |
| // early — exactly the path the re-arm must lengthen. | ||
| let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1); | ||
|
|
||
| tokio::time::timeout( | ||
| let outcome = tokio::time::timeout( | ||
| Duration::from_mins(10), | ||
| codec_forward_loop( | ||
| &mut ctx, | ||
|
|
@@ -422,6 +514,11 @@ mod tests { | |
| .await | ||
| .expect("loop must finalize"); | ||
|
|
||
| assert_eq!( | ||
| outcome, | ||
| CodecLoopOutcome::Completed, | ||
| "a healthy flush that completes within its re-armed budget is a clean finish" | ||
| ); | ||
| assert!( | ||
| matches!(mock_out.try_recv().await, Some((_, _, Packet::Binary { .. }))), | ||
| "a flush packet emitted after input close must be forwarded, not pre-empted \ | ||
|
|
@@ -471,4 +568,106 @@ mod tests { | |
| ThreadJoin::Joined | ||
| ); | ||
| } | ||
|
|
||
| /// #539: a clean finish emits `Stopped("input_closed")` and returns `Ok`, | ||
| /// so a normal end of input is reported as success. | ||
| #[test] | ||
| fn finalize_maps_clean_finish_to_stopped_ok() { | ||
| let (state_tx, mut state_rx) = mpsc::channel::<NodeStateUpdate>(4); | ||
| let state_tx = NodeStateSender::new(state_tx, 0); | ||
|
|
||
| let result = finalize_codec_run(CodecLoopOutcome::Completed, &state_tx, "node", "Label"); | ||
|
|
||
| assert!(result.is_ok(), "a clean finish must return Ok"); | ||
| let update = state_rx.try_recv().expect("a terminal state must be emitted"); | ||
| assert!( | ||
| matches!(update.state, streamkit_core::NodeState::Stopped { .. }), | ||
| "a clean finish must emit Stopped, got {:?}", | ||
| update.state | ||
| ); | ||
| } | ||
|
|
||
| /// #539: a watchdog abandonment emits a terminal `Failed` state and returns | ||
| /// `Err`, so a truncated encode is programmatically distinguishable from a | ||
| /// complete one instead of masquerading as `Stopped("input_closed")` + `Ok`. | ||
| #[test] | ||
| fn finalize_maps_watchdog_abandonment_to_failed_err() { | ||
| let (state_tx, mut state_rx) = mpsc::channel::<NodeStateUpdate>(4); | ||
| let state_tx = NodeStateSender::new(state_tx, 0); | ||
|
|
||
| let result = | ||
| finalize_codec_run(CodecLoopOutcome::WatchdogAbandoned, &state_tx, "node", "Label"); | ||
|
|
||
| assert!( | ||
| matches!(result, Err(StreamKitError::Codec(_))), | ||
| "a watchdog-abandoned run must return a codec error, not Ok" | ||
| ); | ||
| let update = state_rx.try_recv().expect("a terminal state must be emitted"); | ||
| assert!( | ||
| matches!(update.state, streamkit_core::NodeState::Failed { .. }), | ||
| "a truncated encode must surface as Failed, got {:?}", | ||
| update.state | ||
| ); | ||
| } | ||
|
|
||
| /// #539: a codec-task panic truncates output exactly like a watchdog | ||
| /// abandonment, so it must surface as `Failed` + `Err`, not a clean stop. | ||
| #[test] | ||
| fn finalize_maps_codec_panic_to_failed_err() { | ||
| let (state_tx, mut state_rx) = mpsc::channel::<NodeStateUpdate>(4); | ||
| let state_tx = NodeStateSender::new(state_tx, 0); | ||
|
|
||
| let result = | ||
| finalize_codec_run(CodecLoopOutcome::CodecPanicked, &state_tx, "node", "Label"); | ||
|
|
||
| assert!( | ||
| matches!(result, Err(StreamKitError::Codec(_))), | ||
| "a panicked codec run must return a codec error, not Ok" | ||
| ); | ||
| let update = state_rx.try_recv().expect("a terminal state must be emitted"); | ||
| assert!( | ||
| matches!(update.state, streamkit_core::NodeState::Failed { .. }), | ||
| "a panicked codec run must surface as Failed, got {:?}", | ||
| update.state | ||
| ); | ||
| } | ||
|
|
||
| /// #539: a panicking codec task must end the loop with `CodecPanicked`, not | ||
| /// the default `Completed` — otherwise a panic-truncated stream would be | ||
| /// finalized as a clean `Stopped("input_closed")` success. | ||
| #[tokio::test] | ||
| async fn forward_loop_reports_panic_as_degraded() { | ||
| let (mut ctx, _mock_out, _state_rx) = create_test_context(HashMap::new(), 1); | ||
| let mut stats = NodeStatsTracker::new("test".to_string(), ctx.stats_tx.clone()); | ||
| let counter = test_counter(); | ||
|
|
||
| // No results ever arrive; the codec task panics, which must end the loop. | ||
| let (_result_tx, mut result_rx) = mpsc::channel::<Result<Vec<u8>, String>>(8); | ||
| let mut input_task = tokio::task::spawn(std::future::pending::<()>()); | ||
| let codec_task = tokio::task::spawn(async { panic!("codec worker blew up") }); | ||
| let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1); | ||
|
|
||
| let outcome = tokio::time::timeout( | ||
| Duration::from_secs(5), | ||
| codec_forward_loop( | ||
| &mut ctx, | ||
| &mut result_rx, | ||
| &mut input_task, | ||
| codec_task, | ||
| codec_tx, | ||
| &counter, | ||
| &mut stats, | ||
| to_binary, | ||
| "test", | ||
| ), | ||
| ) | ||
| .await | ||
| .expect("loop must finalize when the codec task panics"); | ||
|
|
||
| assert_eq!( | ||
| outcome, | ||
| CodecLoopOutcome::CodecPanicked, | ||
| "a panicked codec task must surface as a degraded run, not a clean finish" | ||
| ); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.