Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions crates/nodes/src/audio/codecs/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl ProcessorNode for OpusDecoderNode {
tracing::info!("OpusDecoderNode input stream closed");
});

crate::codec_utils::codec_forward_loop(
let outcome = crate::codec_utils::codec_forward_loop(
&mut context,
&mut result_rx,
&mut input_task,
Expand All @@ -201,10 +201,12 @@ impl ProcessorNode for OpusDecoderNode {
)
.await;

state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed");

tracing::info!("OpusDecoderNode finished");
Ok(())
crate::codec_utils::finalize_codec_run(
outcome,
&context.state_tx,
&node_name,
"OpusDecoderNode",
)
}
}

Expand Down Expand Up @@ -408,7 +410,7 @@ impl ProcessorNode for OpusEncoderNode {
tracing::info!("OpusEncoderNode input stream closed after {} frames", frame_count);
});

crate::codec_utils::codec_forward_loop(
let outcome = crate::codec_utils::codec_forward_loop(
&mut context,
&mut result_rx,
&mut input_task,
Expand All @@ -430,10 +432,12 @@ impl ProcessorNode for OpusEncoderNode {
)
.await;

state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed");

tracing::info!("OpusEncoderNode finished");
Ok(())
crate::codec_utils::finalize_codec_run(
outcome,
&context.state_tx,
&node_name,
"OpusEncoderNode",
)
}
}

Expand Down
213 changes: 206 additions & 7 deletions crates/nodes/src/codec_utils.rs
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Mechanical transformation verified complete across all codec nodes

Confirmed via grep that every file calling codec_forward_loop (opus encoder/decoder, av1 encoder/decoder, dav1d decoder, vp9 encoder/decoder, nv_av1 decoder, vaapi_av1 decoder, vaapi_h264 decoder, vulkan_video h264 encoder/decoder, pixel_convert, and all EncoderNodeRunner implementors via run_encoder) now captures the CodecLoopOutcome and passes it to finalize_codec_run. No call site was missed. SVT-AV1 is covered through the shared run_encoder path in encoder_trait.rs:253.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::time::Duration;
use opentelemetry::KeyValue;
use streamkit_core::stats::NodeStatsTracker;
use streamkit_core::types::Packet;
use streamkit_core::NodeContext;
use streamkit_core::{state_helpers, NodeContext, NodeStateSender, StreamKitError};
use tokio::sync::mpsc;

/// Bounds a codec's entire lifetime by output idleness.
Expand All @@ -26,6 +26,69 @@ use tokio::sync::mpsc;
/// `SvtAv1EncoderNode::RECEIVE_THREAD_JOIN_TIMEOUT`).
pub(crate) const CODEC_IDLE_TIMEOUT: Duration = Duration::from_mins(1);

/// How a [`codec_forward_loop`] run terminated.
///
/// Distinguishes a clean finish (input/codec/downstream closed, or shutdown)
/// from a degraded one that leaves the output truncated — an idle-watchdog
/// abandonment or a codec-task panic. A degraded run must surface as a node
/// failure rather than a successful `Stopped("input_closed")` so callers and
/// state subscribers can tell it from a complete one (see #539).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use]
pub enum CodecLoopOutcome {
Comment on lines +37 to +38

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: The #[must_use] on CodecLoopOutcome prevents silent ignoring of degraded outcomes

The #[must_use] attribute on CodecLoopOutcome (codec_utils.rs:37) is a well-chosen compile-time safety net: if any future caller of codec_forward_loop forgets to pass the outcome to finalize_codec_run, the compiler will emit a warning. This is the kind of annotation that prevents the exact regression this PR fixes (silently treating a degraded run as successful).

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

/// The loop finalized cleanly: the codec finished, input or the downstream
/// channel closed, or a shutdown was requested.
Completed,
/// The idle watchdog abandoned a wedged codec worker mid-stream or
/// mid-flush. Any output already forwarded is truncated, and the worker's
/// native handle was intentionally leaked (it may still be in a blocking
/// FFI call), so the run is degraded, not successful.
WatchdogAbandoned,
/// The codec task panicked, so the loop ended before the stream was fully
/// processed. Any output already forwarded is truncated, exactly like a
/// watchdog abandonment, so the run is degraded, not successful.
CodecPanicked,
}
Comment on lines +36 to +51

@staging-devin-ai-integration staging-devin-ai-integration Bot Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: CodecLoopOutcome is pub, expanding the crate's public API surface

CodecLoopOutcome at codec_utils.rs:38 and finalize_codec_run at codec_utils.rs:62 are both pub (not pub(crate)). Since codec_forward_loop is also pub, this is consistent. However, CODEC_IDLE_TIMEOUT remains pub(crate). If these are only intended for internal use within the nodes crate, pub(crate) would be more restrictive. Currently the nodes crate doesn't appear to be consumed as a library by external code, so this is a minor API hygiene point rather than a real concern.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground


/// Emit the terminal node state and return the run result for a codec node,
/// based on how its [`codec_forward_loop`] ended.
///
/// A clean finish emits `Stopped("input_closed")` and returns `Ok(())`. A
/// degraded finish (watchdog abandonment or codec panic) emits a terminal
/// `Failed` state and returns an `Err`, making the truncated output observable
/// to callers and state subscribers instead of masquerading as a successful
/// run (#539).
///
/// # Errors
///
/// Returns [`StreamKitError::Codec`] when `outcome` is
/// [`CodecLoopOutcome::WatchdogAbandoned`] or [`CodecLoopOutcome::CodecPanicked`],
/// i.e. the codec was abandoned or panicked and its output is truncated.
pub fn finalize_codec_run(
outcome: CodecLoopOutcome,
state_tx: &NodeStateSender,
node_id: &str,
label: &str,
) -> Result<(), StreamKitError> {
let reason = match outcome {
CodecLoopOutcome::Completed => {
state_helpers::emit_stopped(state_tx, node_id, "input_closed");
tracing::info!("{label} finished");
return Ok(());
},
CodecLoopOutcome::WatchdogAbandoned => format!(
"{label} abandoned a wedged codec worker after {CODEC_IDLE_TIMEOUT:?} of output \
idleness; output is truncated"
),
CodecLoopOutcome::CodecPanicked => {
format!("{label} codec task panicked mid-stream; output is truncated")
},
};
tracing::error!("{reason}");
state_helpers::emit_failed(state_tx, node_id, reason.clone());
Err(StreamKitError::Codec(reason))
Comment on lines +87 to +89

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📝 Info: Minor: reason string is cloned where it could be avoided

In finalize_codec_run, reason.clone() at line 88 creates a copy of the error string so it can be passed to both emit_failed and StreamKitError::Codec. Since emit_failed takes impl Into<String>, you could pass &reason (via a &strString conversion inside emit_failed) or reorder to pass the owned reason to Err(StreamKitError::Codec(reason)) first and a reference to emit_failed. This is a negligible allocation on an error path, so not worth changing — just noting for completeness.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

}
Comment on lines +67 to +90

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Behavioral change: codec nodes now return Err on degraded runs where they previously returned Ok

Before this PR, all codec nodes unconditionally called emit_stopped(... "input_closed") and returned Ok(()) regardless of whether the codec was abandoned by the watchdog or panicked. After this PR, watchdog-abandoned and panicked codec runs return Err(StreamKitError::Codec(...)) and emit a Failed state. This is an intentional behavioral change (per #539), but callers that previously relied on codec run() always returning Ok (e.g., error handling in the engine/pipeline executor) should be verified to handle the new Err gracefully. The engine likely already handles node run() errors, but worth confirming no upstream code treats a codec Err differently from other node errors in a way that could cause issues (e.g., retry loops, error propagation to clients).

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground


/// Result of [`bounded_thread_join`].
#[cfg(any(feature = "svt_av1", test))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -90,7 +153,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(
stats: &mut NodeStatsTracker,
to_packet: impl Fn(T) -> Packet + Send + Sync,
label: &str,
) {
) -> CodecLoopOutcome {
async fn forward_one(
packet: Packet,
context: &mut NodeContext,
Expand Down Expand Up @@ -124,6 +187,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(

let mut input_done = false;
let mut codec_done = false;
let mut outcome = CodecLoopOutcome::Completed;
// Held in an `Option` so the idle watchdog can observe the codec's input
// backpressure; taken (dropped) to signal end-of-input to the codec.
let mut codec_tx = Some(codec_tx);
Expand Down Expand Up @@ -169,6 +233,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(
if let Err(e) = res {
if e.is_panic() {
tracing::error!("{label} codec task panicked: {e:?}");
outcome = CodecLoopOutcome::CodecPanicked;
break;
}
}
Expand All @@ -188,6 +253,7 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(
finalizing instead of hanging (likely native encoder deadlock)",
if input_done { "closed" } else { "backed up" }
);
outcome = CodecLoopOutcome::WatchdogAbandoned;
break;
}
idle.as_mut().reset(tokio::time::Instant::now() + CODEC_IDLE_TIMEOUT);
Expand All @@ -205,6 +271,8 @@ pub async fn codec_forward_loop<T: Send + 'static, S: Send>(
if !codec_done {
codec_task.abort();
}

outcome
}

#[cfg(test)]
Expand All @@ -214,6 +282,7 @@ mod tests {
use crate::test_utils::create_test_context;
use std::collections::HashMap;
use streamkit_core::stats::NodeStatsTracker;
use streamkit_core::NodeStateUpdate;

fn test_counter() -> opentelemetry::metrics::Counter<u64> {
opentelemetry::global::meter("test").u64_counter("test_drain_packets").build()
Expand All @@ -239,7 +308,7 @@ mod tests {
let codec_task = tokio::task::spawn(async {});
let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1);

tokio::time::timeout(
let outcome = tokio::time::timeout(
Duration::from_secs(5),
codec_forward_loop(
&mut ctx,
Expand All @@ -255,6 +324,12 @@ mod tests {
)
.await
.expect("loop must finalize when the codec finishes and closes the channel");

assert_eq!(
outcome,
CodecLoopOutcome::Completed,
"a clean finish must report Completed, not a watchdog abandonment"
);
}

/// Regression for #540: an SVT-AV1 dimension change can abandon a wedged
Expand All @@ -279,7 +354,7 @@ mod tests {
let codec_task = tokio::task::spawn(async {});
let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1);

tokio::time::timeout(
let outcome = tokio::time::timeout(
Duration::from_secs(5),
codec_forward_loop(
&mut ctx,
Expand All @@ -296,6 +371,11 @@ mod tests {
.await
.expect("loop must finalize once the codec finishes, even with a leaked sender");

assert_eq!(
outcome,
CodecLoopOutcome::Completed,
"a leaked sender that the codec finishes around is still a clean finish"
);
assert!(
matches!(mock_out.try_recv().await, Some((_, _, Packet::Binary { .. }))),
"buffered result must still be forwarded before finalizing"
Expand Down Expand Up @@ -324,7 +404,7 @@ mod tests {
codec_tx.try_send(vec![0]).unwrap();
assert_eq!(codec_tx.capacity(), 0);

tokio::time::timeout(
let outcome = tokio::time::timeout(
Duration::from_mins(10),
codec_forward_loop(
&mut ctx,
Expand All @@ -340,6 +420,12 @@ mod tests {
)
.await
.expect("idle watchdog must abandon the mid-stream-wedged codec instead of hanging");

assert_eq!(
outcome,
CodecLoopOutcome::WatchdogAbandoned,
"a watchdog-abandoned codec must surface as a degraded run, not a clean finish"
);
}

/// The idle watchdog also bounds a flush that never completes: once input
Expand All @@ -359,7 +445,7 @@ mod tests {
let codec_task = tokio::task::spawn(std::future::pending::<()>());
let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1);

tokio::time::timeout(
let outcome = tokio::time::timeout(
Duration::from_mins(10),
codec_forward_loop(
&mut ctx,
Expand All @@ -375,6 +461,12 @@ mod tests {
)
.await
.expect("idle watchdog must finalize a wedged post-input-close flush");

assert_eq!(
outcome,
CodecLoopOutcome::WatchdogAbandoned,
"an EOS flush abandoned by the watchdog is a truncated, degraded run"
);
}

/// Regression for #540: closing input must re-arm the idle watchdog to a
Expand Down Expand Up @@ -405,7 +497,7 @@ mod tests {
// early — exactly the path the re-arm must lengthen.
let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1);

tokio::time::timeout(
let outcome = tokio::time::timeout(
Duration::from_mins(10),
codec_forward_loop(
&mut ctx,
Expand All @@ -422,6 +514,11 @@ mod tests {
.await
.expect("loop must finalize");

assert_eq!(
outcome,
CodecLoopOutcome::Completed,
"a healthy flush that completes within its re-armed budget is a clean finish"
);
assert!(
matches!(mock_out.try_recv().await, Some((_, _, Packet::Binary { .. }))),
"a flush packet emitted after input close must be forwarded, not pre-empted \
Expand Down Expand Up @@ -471,4 +568,106 @@ mod tests {
ThreadJoin::Joined
);
}

/// #539: a clean finish emits `Stopped("input_closed")` and returns `Ok`,
/// so a normal end of input is reported as success.
#[test]
fn finalize_maps_clean_finish_to_stopped_ok() {
let (state_tx, mut state_rx) = mpsc::channel::<NodeStateUpdate>(4);
let state_tx = NodeStateSender::new(state_tx, 0);

let result = finalize_codec_run(CodecLoopOutcome::Completed, &state_tx, "node", "Label");

assert!(result.is_ok(), "a clean finish must return Ok");
let update = state_rx.try_recv().expect("a terminal state must be emitted");
assert!(
matches!(update.state, streamkit_core::NodeState::Stopped { .. }),
"a clean finish must emit Stopped, got {:?}",
update.state
);
}

/// #539: a watchdog abandonment emits a terminal `Failed` state and returns
/// `Err`, so a truncated encode is programmatically distinguishable from a
/// complete one instead of masquerading as `Stopped("input_closed")` + `Ok`.
#[test]
fn finalize_maps_watchdog_abandonment_to_failed_err() {
let (state_tx, mut state_rx) = mpsc::channel::<NodeStateUpdate>(4);
let state_tx = NodeStateSender::new(state_tx, 0);

let result =
finalize_codec_run(CodecLoopOutcome::WatchdogAbandoned, &state_tx, "node", "Label");

assert!(
matches!(result, Err(StreamKitError::Codec(_))),
"a watchdog-abandoned run must return a codec error, not Ok"
);
let update = state_rx.try_recv().expect("a terminal state must be emitted");
assert!(
matches!(update.state, streamkit_core::NodeState::Failed { .. }),
"a truncated encode must surface as Failed, got {:?}",
update.state
);
}

/// #539: a codec-task panic truncates output exactly like a watchdog
/// abandonment, so it must surface as `Failed` + `Err`, not a clean stop.
#[test]
fn finalize_maps_codec_panic_to_failed_err() {
let (state_tx, mut state_rx) = mpsc::channel::<NodeStateUpdate>(4);
let state_tx = NodeStateSender::new(state_tx, 0);

let result =
finalize_codec_run(CodecLoopOutcome::CodecPanicked, &state_tx, "node", "Label");

assert!(
matches!(result, Err(StreamKitError::Codec(_))),
"a panicked codec run must return a codec error, not Ok"
);
let update = state_rx.try_recv().expect("a terminal state must be emitted");
assert!(
matches!(update.state, streamkit_core::NodeState::Failed { .. }),
"a panicked codec run must surface as Failed, got {:?}",
update.state
);
}

/// #539: a panicking codec task must end the loop with `CodecPanicked`, not
/// the default `Completed` — otherwise a panic-truncated stream would be
/// finalized as a clean `Stopped("input_closed")` success.
#[tokio::test]
async fn forward_loop_reports_panic_as_degraded() {
let (mut ctx, _mock_out, _state_rx) = create_test_context(HashMap::new(), 1);
let mut stats = NodeStatsTracker::new("test".to_string(), ctx.stats_tx.clone());
let counter = test_counter();

// No results ever arrive; the codec task panics, which must end the loop.
let (_result_tx, mut result_rx) = mpsc::channel::<Result<Vec<u8>, String>>(8);
let mut input_task = tokio::task::spawn(std::future::pending::<()>());
let codec_task = tokio::task::spawn(async { panic!("codec worker blew up") });
let (codec_tx, _codec_rx) = mpsc::channel::<Vec<u8>>(1);

let outcome = tokio::time::timeout(
Duration::from_secs(5),
codec_forward_loop(
&mut ctx,
&mut result_rx,
&mut input_task,
codec_task,
codec_tx,
&counter,
&mut stats,
to_binary,
"test",
),
)
.await
.expect("loop must finalize when the codec task panics");

assert_eq!(
outcome,
CodecLoopOutcome::CodecPanicked,
"a panicked codec task must surface as a degraded run, not a clean finish"
);
}
}
Loading
Loading