diff --git a/crates/nodes/src/audio/codecs/opus.rs b/crates/nodes/src/audio/codecs/opus.rs index 7d11b1d98..bdd45cdb8 100644 --- a/crates/nodes/src/audio/codecs/opus.rs +++ b/crates/nodes/src/audio/codecs/opus.rs @@ -188,7 +188,7 @@ impl ProcessorNode for OpusDecoderNode { tracing::info!("OpusDecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -201,10 +201,12 @@ impl ProcessorNode for OpusDecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - - tracing::info!("OpusDecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "OpusDecoderNode", + ) } } @@ -408,7 +410,7 @@ impl ProcessorNode for OpusEncoderNode { tracing::info!("OpusEncoderNode input stream closed after {} frames", frame_count); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -430,10 +432,12 @@ impl ProcessorNode for OpusEncoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - - tracing::info!("OpusEncoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "OpusEncoderNode", + ) } } diff --git a/crates/nodes/src/codec_utils.rs b/crates/nodes/src/codec_utils.rs index 1678a494c..beb2ddc14 100644 --- a/crates/nodes/src/codec_utils.rs +++ b/crates/nodes/src/codec_utils.rs @@ -7,7 +7,7 @@ use std::time::Duration; use opentelemetry::KeyValue; use streamkit_core::stats::NodeStatsTracker; use streamkit_core::types::Packet; -use streamkit_core::NodeContext; +use streamkit_core::{state_helpers, NodeContext, NodeStateSender, StreamKitError}; use tokio::sync::mpsc; /// Bounds a codec's entire lifetime by output idleness. @@ -26,6 +26,69 @@ use tokio::sync::mpsc; /// `SvtAv1EncoderNode::RECEIVE_THREAD_JOIN_TIMEOUT`). pub(crate) const CODEC_IDLE_TIMEOUT: Duration = Duration::from_mins(1); +/// How a [`codec_forward_loop`] run terminated. +/// +/// Distinguishes a clean finish (input/codec/downstream closed, or shutdown) +/// from a degraded one that leaves the output truncated — an idle-watchdog +/// abandonment or a codec-task panic. A degraded run must surface as a node +/// failure rather than a successful `Stopped("input_closed")` so callers and +/// state subscribers can tell it from a complete one (see #539). +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[must_use] +pub enum CodecLoopOutcome { + /// The loop finalized cleanly: the codec finished, input or the downstream + /// channel closed, or a shutdown was requested. + Completed, + /// The idle watchdog abandoned a wedged codec worker mid-stream or + /// mid-flush. Any output already forwarded is truncated, and the worker's + /// native handle was intentionally leaked (it may still be in a blocking + /// FFI call), so the run is degraded, not successful. + WatchdogAbandoned, + /// The codec task panicked, so the loop ended before the stream was fully + /// processed. Any output already forwarded is truncated, exactly like a + /// watchdog abandonment, so the run is degraded, not successful. + CodecPanicked, +} + +/// Emit the terminal node state and return the run result for a codec node, +/// based on how its [`codec_forward_loop`] ended. +/// +/// A clean finish emits `Stopped("input_closed")` and returns `Ok(())`. A +/// degraded finish (watchdog abandonment or codec panic) emits a terminal +/// `Failed` state and returns an `Err`, making the truncated output observable +/// to callers and state subscribers instead of masquerading as a successful +/// run (#539). +/// +/// # Errors +/// +/// Returns [`StreamKitError::Codec`] when `outcome` is +/// [`CodecLoopOutcome::WatchdogAbandoned`] or [`CodecLoopOutcome::CodecPanicked`], +/// i.e. the codec was abandoned or panicked and its output is truncated. +pub fn finalize_codec_run( + outcome: CodecLoopOutcome, + state_tx: &NodeStateSender, + node_id: &str, + label: &str, +) -> Result<(), StreamKitError> { + let reason = match outcome { + CodecLoopOutcome::Completed => { + state_helpers::emit_stopped(state_tx, node_id, "input_closed"); + tracing::info!("{label} finished"); + return Ok(()); + }, + CodecLoopOutcome::WatchdogAbandoned => format!( + "{label} abandoned a wedged codec worker after {CODEC_IDLE_TIMEOUT:?} of output \ + idleness; output is truncated" + ), + CodecLoopOutcome::CodecPanicked => { + format!("{label} codec task panicked mid-stream; output is truncated") + }, + }; + tracing::error!("{reason}"); + state_helpers::emit_failed(state_tx, node_id, reason.clone()); + Err(StreamKitError::Codec(reason)) +} + /// Result of [`bounded_thread_join`]. #[cfg(any(feature = "svt_av1", test))] #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -90,7 +153,7 @@ pub async fn codec_forward_loop( stats: &mut NodeStatsTracker, to_packet: impl Fn(T) -> Packet + Send + Sync, label: &str, -) { +) -> CodecLoopOutcome { async fn forward_one( packet: Packet, context: &mut NodeContext, @@ -124,6 +187,7 @@ pub async fn codec_forward_loop( let mut input_done = false; let mut codec_done = false; + let mut outcome = CodecLoopOutcome::Completed; // Held in an `Option` so the idle watchdog can observe the codec's input // backpressure; taken (dropped) to signal end-of-input to the codec. let mut codec_tx = Some(codec_tx); @@ -169,6 +233,7 @@ pub async fn codec_forward_loop( if let Err(e) = res { if e.is_panic() { tracing::error!("{label} codec task panicked: {e:?}"); + outcome = CodecLoopOutcome::CodecPanicked; break; } } @@ -188,6 +253,7 @@ pub async fn codec_forward_loop( finalizing instead of hanging (likely native encoder deadlock)", if input_done { "closed" } else { "backed up" } ); + outcome = CodecLoopOutcome::WatchdogAbandoned; break; } idle.as_mut().reset(tokio::time::Instant::now() + CODEC_IDLE_TIMEOUT); @@ -205,6 +271,8 @@ pub async fn codec_forward_loop( if !codec_done { codec_task.abort(); } + + outcome } #[cfg(test)] @@ -214,6 +282,7 @@ mod tests { use crate::test_utils::create_test_context; use std::collections::HashMap; use streamkit_core::stats::NodeStatsTracker; + use streamkit_core::NodeStateUpdate; fn test_counter() -> opentelemetry::metrics::Counter { opentelemetry::global::meter("test").u64_counter("test_drain_packets").build() @@ -239,7 +308,7 @@ mod tests { let codec_task = tokio::task::spawn(async {}); let (codec_tx, _codec_rx) = mpsc::channel::>(1); - tokio::time::timeout( + let outcome = tokio::time::timeout( Duration::from_secs(5), codec_forward_loop( &mut ctx, @@ -255,6 +324,12 @@ mod tests { ) .await .expect("loop must finalize when the codec finishes and closes the channel"); + + assert_eq!( + outcome, + CodecLoopOutcome::Completed, + "a clean finish must report Completed, not a watchdog abandonment" + ); } /// Regression for #540: an SVT-AV1 dimension change can abandon a wedged @@ -279,7 +354,7 @@ mod tests { let codec_task = tokio::task::spawn(async {}); let (codec_tx, _codec_rx) = mpsc::channel::>(1); - tokio::time::timeout( + let outcome = tokio::time::timeout( Duration::from_secs(5), codec_forward_loop( &mut ctx, @@ -296,6 +371,11 @@ mod tests { .await .expect("loop must finalize once the codec finishes, even with a leaked sender"); + assert_eq!( + outcome, + CodecLoopOutcome::Completed, + "a leaked sender that the codec finishes around is still a clean finish" + ); assert!( matches!(mock_out.try_recv().await, Some((_, _, Packet::Binary { .. }))), "buffered result must still be forwarded before finalizing" @@ -324,7 +404,7 @@ mod tests { codec_tx.try_send(vec![0]).unwrap(); assert_eq!(codec_tx.capacity(), 0); - tokio::time::timeout( + let outcome = tokio::time::timeout( Duration::from_mins(10), codec_forward_loop( &mut ctx, @@ -340,6 +420,12 @@ mod tests { ) .await .expect("idle watchdog must abandon the mid-stream-wedged codec instead of hanging"); + + assert_eq!( + outcome, + CodecLoopOutcome::WatchdogAbandoned, + "a watchdog-abandoned codec must surface as a degraded run, not a clean finish" + ); } /// The idle watchdog also bounds a flush that never completes: once input @@ -359,7 +445,7 @@ mod tests { let codec_task = tokio::task::spawn(std::future::pending::<()>()); let (codec_tx, _codec_rx) = mpsc::channel::>(1); - tokio::time::timeout( + let outcome = tokio::time::timeout( Duration::from_mins(10), codec_forward_loop( &mut ctx, @@ -375,6 +461,12 @@ mod tests { ) .await .expect("idle watchdog must finalize a wedged post-input-close flush"); + + assert_eq!( + outcome, + CodecLoopOutcome::WatchdogAbandoned, + "an EOS flush abandoned by the watchdog is a truncated, degraded run" + ); } /// Regression for #540: closing input must re-arm the idle watchdog to a @@ -405,7 +497,7 @@ mod tests { // early — exactly the path the re-arm must lengthen. let (codec_tx, _codec_rx) = mpsc::channel::>(1); - tokio::time::timeout( + let outcome = tokio::time::timeout( Duration::from_mins(10), codec_forward_loop( &mut ctx, @@ -422,6 +514,11 @@ mod tests { .await .expect("loop must finalize"); + assert_eq!( + outcome, + CodecLoopOutcome::Completed, + "a healthy flush that completes within its re-armed budget is a clean finish" + ); assert!( matches!(mock_out.try_recv().await, Some((_, _, Packet::Binary { .. }))), "a flush packet emitted after input close must be forwarded, not pre-empted \ @@ -471,4 +568,106 @@ mod tests { ThreadJoin::Joined ); } + + /// #539: a clean finish emits `Stopped("input_closed")` and returns `Ok`, + /// so a normal end of input is reported as success. + #[test] + fn finalize_maps_clean_finish_to_stopped_ok() { + let (state_tx, mut state_rx) = mpsc::channel::(4); + let state_tx = NodeStateSender::new(state_tx, 0); + + let result = finalize_codec_run(CodecLoopOutcome::Completed, &state_tx, "node", "Label"); + + assert!(result.is_ok(), "a clean finish must return Ok"); + let update = state_rx.try_recv().expect("a terminal state must be emitted"); + assert!( + matches!(update.state, streamkit_core::NodeState::Stopped { .. }), + "a clean finish must emit Stopped, got {:?}", + update.state + ); + } + + /// #539: a watchdog abandonment emits a terminal `Failed` state and returns + /// `Err`, so a truncated encode is programmatically distinguishable from a + /// complete one instead of masquerading as `Stopped("input_closed")` + `Ok`. + #[test] + fn finalize_maps_watchdog_abandonment_to_failed_err() { + let (state_tx, mut state_rx) = mpsc::channel::(4); + let state_tx = NodeStateSender::new(state_tx, 0); + + let result = + finalize_codec_run(CodecLoopOutcome::WatchdogAbandoned, &state_tx, "node", "Label"); + + assert!( + matches!(result, Err(StreamKitError::Codec(_))), + "a watchdog-abandoned run must return a codec error, not Ok" + ); + let update = state_rx.try_recv().expect("a terminal state must be emitted"); + assert!( + matches!(update.state, streamkit_core::NodeState::Failed { .. }), + "a truncated encode must surface as Failed, got {:?}", + update.state + ); + } + + /// #539: a codec-task panic truncates output exactly like a watchdog + /// abandonment, so it must surface as `Failed` + `Err`, not a clean stop. + #[test] + fn finalize_maps_codec_panic_to_failed_err() { + let (state_tx, mut state_rx) = mpsc::channel::(4); + let state_tx = NodeStateSender::new(state_tx, 0); + + let result = + finalize_codec_run(CodecLoopOutcome::CodecPanicked, &state_tx, "node", "Label"); + + assert!( + matches!(result, Err(StreamKitError::Codec(_))), + "a panicked codec run must return a codec error, not Ok" + ); + let update = state_rx.try_recv().expect("a terminal state must be emitted"); + assert!( + matches!(update.state, streamkit_core::NodeState::Failed { .. }), + "a panicked codec run must surface as Failed, got {:?}", + update.state + ); + } + + /// #539: a panicking codec task must end the loop with `CodecPanicked`, not + /// the default `Completed` — otherwise a panic-truncated stream would be + /// finalized as a clean `Stopped("input_closed")` success. + #[tokio::test] + async fn forward_loop_reports_panic_as_degraded() { + let (mut ctx, _mock_out, _state_rx) = create_test_context(HashMap::new(), 1); + let mut stats = NodeStatsTracker::new("test".to_string(), ctx.stats_tx.clone()); + let counter = test_counter(); + + // No results ever arrive; the codec task panics, which must end the loop. + let (_result_tx, mut result_rx) = mpsc::channel::, String>>(8); + let mut input_task = tokio::task::spawn(std::future::pending::<()>()); + let codec_task = tokio::task::spawn(async { panic!("codec worker blew up") }); + let (codec_tx, _codec_rx) = mpsc::channel::>(1); + + let outcome = tokio::time::timeout( + Duration::from_secs(5), + codec_forward_loop( + &mut ctx, + &mut result_rx, + &mut input_task, + codec_task, + codec_tx, + &counter, + &mut stats, + to_binary, + "test", + ), + ) + .await + .expect("loop must finalize when the codec task panics"); + + assert_eq!( + outcome, + CodecLoopOutcome::CodecPanicked, + "a panicked codec task must surface as a degraded run, not a clean finish" + ); + } } diff --git a/crates/nodes/src/video/av1.rs b/crates/nodes/src/video/av1.rs index da5c560b9..70a7168b6 100644 --- a/crates/nodes/src/video/av1.rs +++ b/crates/nodes/src/video/av1.rs @@ -322,7 +322,7 @@ impl ProcessorNode for Av1DecoderNode { tracing::info!("Av1DecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -335,9 +335,12 @@ impl ProcessorNode for Av1DecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("Av1DecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "Av1DecoderNode", + ) } } diff --git a/crates/nodes/src/video/dav1d.rs b/crates/nodes/src/video/dav1d.rs index dca8aaf9b..abc044ee5 100644 --- a/crates/nodes/src/video/dav1d.rs +++ b/crates/nodes/src/video/dav1d.rs @@ -201,7 +201,7 @@ impl ProcessorNode for Dav1dDecoderNode { tracing::info!("Dav1dDecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -214,9 +214,12 @@ impl ProcessorNode for Dav1dDecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("Dav1dDecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "Dav1dDecoderNode", + ) } } diff --git a/crates/nodes/src/video/encoder_trait.rs b/crates/nodes/src/video/encoder_trait.rs index c17b63644..bd2cb7a3b 100644 --- a/crates/nodes/src/video/encoder_trait.rs +++ b/crates/nodes/src/video/encoder_trait.rs @@ -233,7 +233,7 @@ pub async fn run_encoder( tracing::info!("{node_label} input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -250,9 +250,7 @@ pub async fn run_encoder( ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("{} finished", E::NODE_LABEL); - Ok(()) + crate::codec_utils::finalize_codec_run(outcome, &context.state_tx, &node_name, E::NODE_LABEL) } // Layer 2: Codec-level trait (single-thread encoders: VP9, AV1) @@ -547,4 +545,67 @@ mod tests { // The repeat path is exercised because consecutive_slow // is a multiple of WARN_REPEAT_INTERVAL while warned is true. } + + /// #539 end-to-end: when the EOS-flush idle watchdog abandons a wedged + /// codec, `run_encoder` must surface a terminal `Failed` state and return + /// `Err` — not `Stopped("input_closed")` + `Ok` — so a truncated encode is + /// observable to callers and state subscribers. + #[tokio::test(start_paused = true)] + #[allow(clippy::expect_used)] // tests should fail loudly on setup errors + async fn run_encoder_surfaces_watchdog_trip_as_failure() { + use crate::test_utils::create_test_context; + use std::collections::HashMap; + + struct StuckEncoder; + + impl EncoderNodeRunner for StuckEncoder { + const CONTENT_TYPE: &'static str = "video/test"; + const NODE_LABEL: &'static str = "StuckEncoderNode"; + const PACKETS_COUNTER_NAME: &'static str = "test_stuck_packets"; + const DURATION_HISTOGRAM_NAME: &'static str = "test_stuck_duration"; + + fn spawn_codec_task( + self, + encode_rx: mpsc::Receiver<(VideoFrame, Option)>, + result_tx: mpsc::Sender>, + _duration_histogram: opentelemetry::metrics::Histogram, + ) -> tokio::task::JoinHandle<()> { + // Hold both channel ends so the result receiver never closes, + // and never produce output or finish: stands in for a codec + // wedged in a blocking flush that only the watchdog can end. + tokio::spawn(async move { + let _held = (encode_rx, result_tx); + std::future::pending::<()>().await; + }) + } + } + + // Input "in" is already closed, so the encoder goes straight to its + // EOS flush, which here never produces output. + let (in_tx, in_rx) = mpsc::channel::(1); + drop(in_tx); + let mut inputs = HashMap::new(); + inputs.insert("in".to_string(), in_rx); + + let (context, _mock_out, mut state_rx) = create_test_context(inputs, 1); + + let result = + tokio::time::timeout(Duration::from_mins(10), run_encoder(StuckEncoder, context)) + .await + .expect("run_encoder must finalize once the watchdog abandons the codec"); + + assert!( + matches!(result, Err(StreamKitError::Codec(_))), + "a watchdog-abandoned flush must return a codec error, got {result:?}" + ); + + let mut terminal = None; + while let Ok(update) = state_rx.try_recv() { + terminal = Some(update.state); + } + assert!( + matches!(terminal, Some(streamkit_core::NodeState::Failed { .. })), + "the terminal state must be Failed, got {terminal:?}" + ); + } } diff --git a/crates/nodes/src/video/nv_av1.rs b/crates/nodes/src/video/nv_av1.rs index a339c631e..41bc6893b 100644 --- a/crates/nodes/src/video/nv_av1.rs +++ b/crates/nodes/src/video/nv_av1.rs @@ -282,7 +282,7 @@ impl ProcessorNode for NvAv1DecoderNode { tracing::info!("NvAv1DecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -295,9 +295,12 @@ impl ProcessorNode for NvAv1DecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("NvAv1DecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "NvAv1DecoderNode", + ) } } diff --git a/crates/nodes/src/video/pixel_convert.rs b/crates/nodes/src/video/pixel_convert.rs index 8afbbfa30..894eb22d9 100644 --- a/crates/nodes/src/video/pixel_convert.rs +++ b/crates/nodes/src/video/pixel_convert.rs @@ -268,7 +268,7 @@ impl ProcessorNode for PixelConvertNode { tracing::info!("PixelConvertNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -281,9 +281,12 @@ impl ProcessorNode for PixelConvertNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("PixelConvertNode shutting down."); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "PixelConvertNode", + ) } } diff --git a/crates/nodes/src/video/vaapi_av1.rs b/crates/nodes/src/video/vaapi_av1.rs index b85cc0621..19b1adc57 100644 --- a/crates/nodes/src/video/vaapi_av1.rs +++ b/crates/nodes/src/video/vaapi_av1.rs @@ -604,7 +604,7 @@ impl ProcessorNode for VaapiAv1DecoderNode { tracing::info!("VaapiAv1DecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -617,9 +617,12 @@ impl ProcessorNode for VaapiAv1DecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("VaapiAv1DecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "VaapiAv1DecoderNode", + ) } } diff --git a/crates/nodes/src/video/vaapi_h264.rs b/crates/nodes/src/video/vaapi_h264.rs index bc21db614..8dad7f910 100644 --- a/crates/nodes/src/video/vaapi_h264.rs +++ b/crates/nodes/src/video/vaapi_h264.rs @@ -207,7 +207,7 @@ impl ProcessorNode for VaapiH264DecoderNode { tracing::info!("VaapiH264DecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -220,9 +220,12 @@ impl ProcessorNode for VaapiH264DecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("VaapiH264DecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "VaapiH264DecoderNode", + ) } } diff --git a/crates/nodes/src/video/vp9.rs b/crates/nodes/src/video/vp9.rs index 4103bb4b0..d3ceec440 100644 --- a/crates/nodes/src/video/vp9.rs +++ b/crates/nodes/src/video/vp9.rs @@ -269,7 +269,7 @@ impl ProcessorNode for Vp9DecoderNode { tracing::info!("Vp9DecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -282,9 +282,12 @@ impl ProcessorNode for Vp9DecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("Vp9DecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "Vp9DecoderNode", + ) } } diff --git a/crates/nodes/src/video/vulkan_video.rs b/crates/nodes/src/video/vulkan_video.rs index 0ac4be238..e84336bf9 100644 --- a/crates/nodes/src/video/vulkan_video.rs +++ b/crates/nodes/src/video/vulkan_video.rs @@ -286,7 +286,7 @@ impl ProcessorNode for VulkanVideoH264DecoderNode { tracing::info!("VulkanVideoH264DecoderNode input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -299,9 +299,12 @@ impl ProcessorNode for VulkanVideoH264DecoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("VulkanVideoH264DecoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run( + outcome, + &context.state_tx, + &node_name, + "VulkanVideoH264DecoderNode", + ) } } @@ -682,7 +685,7 @@ impl ProcessorNode for VulkanVideoH264EncoderNode { tracing::info!("{node_label} input stream closed"); }); - crate::codec_utils::codec_forward_loop( + let outcome = crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -699,9 +702,7 @@ impl ProcessorNode for VulkanVideoH264EncoderNode { ) .await; - state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("VulkanVideoH264EncoderNode finished"); - Ok(()) + crate::codec_utils::finalize_codec_run(outcome, &context.state_tx, &node_name, node_label) } }