fix #539: surface SVT-AV1 EOS-flush watchdog trip as node failure#615
Conversation
When the bounded EOS-flush idle watchdog abandons a wedged codec task,
`codec_forward_loop` now reports a `WatchdogAbandoned` outcome instead of
silently finalizing. Encoder/decoder nodes map that outcome via the shared
`finalize_codec_run` helper to a terminal `NodeState::Failed` and an `Err`
return, so a truncated/degraded encode is programmatically distinguishable
from a clean `Stopped("input_closed")` + `Ok(())`.
The deliberate handle-leak tradeoff from #602/#539 is preserved: the
detached worker may still be in a blocking FFI call, so nothing it owns is
dropped.
Refs #539
Signed-off-by: streamkit-devin <devin@streamkit.dev>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #615 +/- ##
========================================
Coverage 84.99% 84.99%
========================================
Files 248 247 -1
Lines 74624 74733 +109
Branches 2381 2378 -3
========================================
+ Hits 63423 63523 +100
- Misses 11196 11205 +9
Partials 5 5
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
📝 Info: Mechanical transformation verified complete across all codec nodes
Confirmed via grep that every file calling codec_forward_loop (opus encoder/decoder, av1 encoder/decoder, dav1d decoder, vp9 encoder/decoder, nv_av1 decoder, vaapi_av1 decoder, vaapi_h264 decoder, vulkan_video h264 encoder/decoder, pixel_convert, and all EncoderNodeRunner implementors via run_encoder) now captures the CodecLoopOutcome and passes it to finalize_codec_run. No call site was missed. SVT-AV1 is covered through the shared run_encoder path in encoder_trait.rs:253.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| #[must_use] | ||
| pub enum CodecLoopOutcome { | ||
| /// The loop finalized cleanly: the codec finished, input or the downstream | ||
| /// channel closed, or a shutdown was requested. | ||
| Completed, | ||
| /// The idle watchdog abandoned a wedged codec worker mid-stream or | ||
| /// mid-flush. Any output already forwarded is truncated, and the worker's | ||
| /// native handle was intentionally leaked (it may still be in a blocking | ||
| /// FFI call), so the run is degraded, not successful. | ||
| WatchdogAbandoned, | ||
| } |
There was a problem hiding this comment.
📝 Info: CodecLoopOutcome is pub, expanding the crate's public API surface
CodecLoopOutcome at codec_utils.rs:38 and finalize_codec_run at codec_utils.rs:62 are both pub (not pub(crate)). Since codec_forward_loop is also pub, this is consistent. However, CODEC_IDLE_TIMEOUT remains pub(crate). If these are only intended for internal use within the nodes crate, pub(crate) would be more restrictive. Currently the nodes crate doesn't appear to be consumed as a library by external code, so this is a minor API hygiene point rather than a real concern.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
A panicking codec task left the forward loop's outcome at its default
Completed, so finalize_codec_run emitted Stopped("input_closed") + Ok(())
for a panic-truncated stream \u2014 the same false success #539 set out to
eliminate. Add a CodecPanicked outcome that maps to Failed + Err.
Also generalize the abandonment message from "encoded output is
truncated" to "output is truncated", since finalize_codec_run is shared
by decoders and the pixel-convert node, not just encoders.
Signed-off-by: streamkit-devin <devin@streamkit.dev>
Signed-off-by: streamkit-devin <devin@streamkit.dev>
After main merged the NodeStateSender refactor (#606), NodeContext.state_tx is a NodeStateSender, not a raw mpsc::Sender<NodeStateUpdate>. Correct the finalize_codec_run parameter type so it matches every call site and the state_helpers::emit_* calls, and wrap the raw channels in the unit tests with NodeStateSender::new. Signed-off-by: streamkit-devin <devin@streamkit.dev>
| #[must_use] | ||
| pub enum CodecLoopOutcome { |
There was a problem hiding this comment.
📝 Info: The #[must_use] on CodecLoopOutcome prevents silent ignoring of degraded outcomes
The #[must_use] attribute on CodecLoopOutcome (codec_utils.rs:37) is a well-chosen compile-time safety net: if any future caller of codec_forward_loop forgets to pass the outcome to finalize_codec_run, the compiler will emit a warning. This is the kind of annotation that prevents the exact regression this PR fixes (silently treating a degraded run as successful).
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| tracing::error!("{reason}"); | ||
| state_helpers::emit_failed(state_tx, node_id, reason.clone()); | ||
| Err(StreamKitError::Codec(reason)) |
There was a problem hiding this comment.
📝 Info: Minor: reason string is cloned where it could be avoided
In finalize_codec_run, reason.clone() at line 88 creates a copy of the error string so it can be passed to both emit_failed and StreamKitError::Codec. Since emit_failed takes impl Into<String>, you could pass &reason (via a &str → String conversion inside emit_failed) or reorder to pass the owned reason to Err(StreamKitError::Codec(reason)) first and a reference to emit_failed. This is a negligible allocation on an error path, so not worth changing — just noting for completeness.
Was this helpful? React with 👍 or 👎 to provide feedback.
Debug
| pub fn finalize_codec_run( | ||
| outcome: CodecLoopOutcome, | ||
| state_tx: &NodeStateSender, | ||
| node_id: &str, | ||
| label: &str, | ||
| ) -> Result<(), StreamKitError> { | ||
| let reason = match outcome { | ||
| CodecLoopOutcome::Completed => { | ||
| state_helpers::emit_stopped(state_tx, node_id, "input_closed"); | ||
| tracing::info!("{label} finished"); | ||
| return Ok(()); | ||
| }, | ||
| CodecLoopOutcome::WatchdogAbandoned => format!( | ||
| "{label} abandoned a wedged codec worker after {CODEC_IDLE_TIMEOUT:?} of output \ | ||
| idleness; output is truncated" | ||
| ), | ||
| CodecLoopOutcome::CodecPanicked => { | ||
| format!("{label} codec task panicked mid-stream; output is truncated") | ||
| }, | ||
| }; | ||
| tracing::error!("{reason}"); | ||
| state_helpers::emit_failed(state_tx, node_id, reason.clone()); | ||
| Err(StreamKitError::Codec(reason)) | ||
| } |
There was a problem hiding this comment.
🚩 Behavioral change: codec nodes now return Err on degraded runs where they previously returned Ok
Before this PR, all codec nodes unconditionally called emit_stopped(... "input_closed") and returned Ok(()) regardless of whether the codec was abandoned by the watchdog or panicked. After this PR, watchdog-abandoned and panicked codec runs return Err(StreamKitError::Codec(...)) and emit a Failed state. This is an intentional behavioral change (per #539), but callers that previously relied on codec run() always returning Ok (e.g., error handling in the engine/pipeline executor) should be verified to handle the new Err gracefully. The engine likely already handles node run() errors, but worth confirming no upstream code treats a codec Err differently from other node errors in a way that could cause issues (e.g., retry loops, error propagation to clients).
Was this helpful? React with 👍 or 👎 to provide feedback.
Summary
run_encoder(and the decoder nodes) still emittedStopped("input_closed")+ returnedOk(()), so a truncated/degraded encode was indistinguishable from a successful one programmatically (issue SVT-AV1 EOS-flush watchdog trip should surface as an explicit node failure #539).codec_forward_loopnow returns aCodecLoopOutcome(Completed|WatchdogAbandoned|CodecPanicked). A new sharedfinalize_codec_runhelper maps it: a clean finish keepsStopped("input_closed")+Ok(()); a watchdog abandonment or a codec-task panic emits a terminalNodeState::Failedand returnsErr(StreamKitError::Codec(..)). All codec node call sites (encoders viarun_encoder, plus the VP9/AV1/dav1d/VA-API/NVENC/Vulkan decoders, pixel-convert and Opus) route through it, so the new failure contract is uniform.spawn_blockingworker may still be inside a blockingget_packetFFI call, so nothing it owns is dropped — this PR only changes the failure-contract/observability, it does not touch the leak.Pseudocode of the new contract:
Review & Validation
WatchdogAbandoned, and the panic branch the only one yieldingCodecPanicked— a normal codec/input/downstream close or shutdown must stayCompleted(i.e. don't regress clean EOS into a spurious failure).get_packetis dropped.cargo test -p streamkit-nodes(870 passed) and default-featurelint(fmt + clippy) pass locally. Tests added:codec_utilsoutcome assertions on the existing watchdog regression tests,finalize_codec_runmapping (Completed→Stopped/Ok, Abandoned→Failed/Err, Panicked→Failed/Err), a loop-levelforward_loop_reports_panic_as_degraded, and an end-to-endrun_encoder_surfaces_watchdog_trip_as_failure.Notes
CodecPanicked→Failed/Err), closing the same false-success gap as the watchdog trip rather than leaving panics reported as a clean stop.200and the node task handles are never awaited for the response (apps/skit/src/server/oneshot.rsreturns the body stream as soon as the graph is spawned). A node returningErronly ends the output channel, so a truncated oneshot encode still returns HTTP 200 with a short body — the newFailedstate is observable only via state/stats subscribers. SVT-AV1 EOS-flush watchdog trip should surface as an explicit node failure #539's "callers can tell a degraded encode from a complete one" is therefore fully met for the dynamic/subscriber path but not for oneshot HTTP clients; surfacing it there (await/collect node results before finalizing the response) is left as a separate change.vaapi,nvcodec,vulkan_video,dav1d) received the same mechanical change but could not be compiled locally (missinglibva/CUDA/Vulkan/dav1d system libs) — relying on the feature-gated CI jobs to validate them.Closes #539
Link to Devin session: https://staging.itsdev.in/sessions/7a7aa10058014ad5a38ef182be036fb0
Requested by: @streamer45
Devin Review
022de6a