From 243e7e1b83635433f4c0efa2657e0a0839f2309e Mon Sep 17 00:00:00 2001 From: Ajay Thorve Date: Thu, 4 Jun 2026 00:10:06 -0700 Subject: [PATCH 1/2] fix: suppress Claude Code lifecycle noise Signed-off-by: Ajay Thorve --- crates/cli/src/alignment/claude_code.rs | 36 +++ crates/cli/src/alignment/mod.rs | 40 +++ crates/cli/src/gateway.rs | 35 +++ crates/cli/src/session.rs | 55 +++- .../coverage/alignment_claude_code_tests.rs | 67 +++- crates/cli/tests/coverage/server_tests.rs | 108 ++++++- crates/cli/tests/coverage/session_tests.rs | 290 ++++++++++++++++++ docs/nemo-relay-cli/claude-code.mdx | 42 ++- .../coding-agents/claude-code/README.md | 27 ++ 9 files changed, 689 insertions(+), 11 deletions(-) diff --git a/crates/cli/src/alignment/claude_code.rs b/crates/cli/src/alignment/claude_code.rs index e1abcccc..26871522 100644 --- a/crates/cli/src/alignment/claude_code.rs +++ b/crates/cli/src/alignment/claude_code.rs @@ -8,6 +8,8 @@ //! session state machine. use axum::http::HeaderMap; +use nemo_relay::api::llm::LlmRequest; +use serde_json::Value; use crate::alignment::json_string_at; use crate::config::header_string; @@ -26,6 +28,40 @@ pub(crate) fn session_id_from_headers(headers: &HeaderMap) -> Option { header_string(headers, "x-claude-code-session-id") } +// Claude Code can issue a tiny pre-user startup probe through the Anthropic gateway before the +// first UserPromptSubmit hook. Treating it as normal LLM work pollutes traces with an unparented +// `user: test` span, so alignment classifies only this native-header plus exact-body harness probe +// for suppression. +pub(crate) fn is_startup_probe( + provider: &str, + model_name: Option<&str>, + request: &LlmRequest, +) -> bool { + if provider != "anthropic.messages" { + return false; + } + if !request.headers.contains_key("x-claude-code-session-id") { + return false; + } + let model = model_name + .or_else(|| request.content.get("model").and_then(Value::as_str)) + .unwrap_or_default(); + if !model.starts_with("claude-") { + return false; + } + if request.content.get("max_tokens").and_then(Value::as_u64) != Some(1) { + return false; + } + let Some(messages) = request.content.get("messages").and_then(Value::as_array) else { + return false; + }; + let [message] = messages.as_slice() else { + return false; + }; + message.get("role").and_then(Value::as_str) == Some("user") + && message.get("content").and_then(Value::as_str) == Some("test") +} + // Claude's `Agent` tool can report either an asynchronous launch acknowledgement or a terminal // worker result. Only the terminal result should close the subagent scope; otherwise parallel // workers that launch in the background are closed before their later tool/LLM hooks arrive. diff --git a/crates/cli/src/alignment/mod.rs b/crates/cli/src/alignment/mod.rs index 65dbbfdf..3d32a46a 100644 --- a/crates/cli/src/alignment/mod.rs +++ b/crates/cli/src/alignment/mod.rs @@ -50,6 +50,28 @@ pub(crate) enum GatewayRouteKind { AnthropicCountTokens, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) enum GatewayManagementPolicy { + Managed, + UnmanagedProbe { + status: &'static str, + source: &'static str, + }, +} + +impl GatewayManagementPolicy { + pub(crate) fn bypasses_managed_pipeline(self) -> bool { + matches!(self, Self::UnmanagedProbe { .. }) + } + + pub(crate) fn bypass_correlation(self) -> Option<(&'static str, &'static str)> { + match self { + Self::Managed => None, + Self::UnmanagedProbe { status, source } => Some((status, source)), + } + } +} + // Records that a provider-created child session is really a subagent under another session. The // session manager stores this until the child emits its terminal AgentEnded event, then removes the // alias so future unrelated events cannot be reparented through stale state. @@ -406,6 +428,24 @@ pub(crate) fn agent_kind_for_gateway_provider(provider: &str) -> AgentKind { } } +pub(crate) fn gateway_management_policy( + agent_kind: AgentKind, + provider: &str, + model_name: Option<&str>, + request: &LlmRequest, +) -> GatewayManagementPolicy { + if agent_kind == AgentKind::ClaudeCode + && claude_code::is_startup_probe(provider, model_name, request) + { + GatewayManagementPolicy::UnmanagedProbe { + status: "pre_turn_probe", + source: "claude_startup_probe", + } + } else { + GatewayManagementPolicy::Managed + } +} + // Not every harness has a reliable process/session end signal. Claude Code and Codex sessions can // outlive a user-visible run, so the CLI represents their work with bounded turn scopes instead of // exporting a long-lived agent scope that needs synthetic termination. diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index 56154ad8..492507dd 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -181,6 +181,16 @@ async fn run_managed_gateway( prepared: PreparedGatewayRequest, prep: GatewayCallPrep, ) -> Result, CliError> { + if prep.bypass_managed_pipeline { + let session_id = prep.session_id.clone(); + let model = prep.model_name.as_deref().unwrap_or(""); + eprintln!( + "nemo-relay CLI gateway: bypassing managed LLM observability for Claude Code startup probe session={session_id} provider={} model={model}", + prep.provider_name + ); + state.sessions.finish_gateway_call(&session_id).await; + return run_unmanaged_gateway(state, prepared).await; + } let codecs = codecs_for_route(prepared.provider); if prepared.streaming { run_managed_streaming(state, prepared, prep, codecs).await @@ -189,6 +199,29 @@ async fn run_managed_gateway( } } +async fn run_unmanaged_gateway( + state: AppState, + prepared: PreparedGatewayRequest, +) -> Result, CliError> { + if prepared.streaming { + return passthrough_streaming(state, prepared).await; + } + let response = forward_upstream_request( + &state.http, + &prepared.method, + &prepared.upstream_url, + &prepared.body_bytes, + &prepared.headers, + None, + prepared.provider, + ) + .await?; + let status = response.status(); + let headers = response_headers(response.headers()); + let bytes = response.bytes().await?; + build_response(status, headers, Body::from(bytes)) +} + // Codecs registered for each managed provider route. Routes that emit LLM events but lack a typed // codec (count_tokens) return `None` so the runtime still wraps the call but skips annotation. struct RouteCodecs { @@ -246,6 +279,7 @@ async fn run_managed_buffered( metadata, model_name, owner_subagent_id, + bypass_managed_pipeline: _, } = prep; let provider_for_event = provider_name.clone(); let params = LlmCallExecuteParams::builder() @@ -400,6 +434,7 @@ async fn run_managed_streaming( metadata, model_name, owner_subagent_id, + bypass_managed_pipeline: _, } = prep; let params = LlmStreamCallExecuteParams::builder() .name(provider_name) diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index dc5ed526..12703a0f 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -24,8 +24,8 @@ use serde_json::{Map, Value, json}; use tokio::sync::Mutex; use crate::alignment::{ - self, PendingSubagentStart, SessionAlias, SessionAlignmentState, insert_optional, - json_string_at, json_value_at, merge_metadata, + self, GatewayManagementPolicy, PendingSubagentStart, SessionAlias, SessionAlignmentState, + insert_optional, json_string_at, json_value_at, merge_metadata, }; use crate::config::{GatewayConfig, SessionConfig}; use crate::error::CliError; @@ -92,6 +92,7 @@ pub(crate) struct GatewayCallPrep { pub(crate) metadata: Value, pub(crate) model_name: Option, pub(crate) owner_subagent_id: Option, + pub(crate) bypass_managed_pipeline: bool, } struct Session { @@ -965,12 +966,19 @@ impl Session { let stack = self.scope_stack.clone(); let result = TASK_SCOPE_STACK .scope(stack.clone(), async { - self.ensure_turn_started(Value::Null)?; + let policy = self.gateway_management_policy(&start); + if !policy.bypasses_managed_pipeline() { + self.ensure_turn_started(Value::Null)?; + } let mut attributes = LlmAttributes::empty(); if start.streaming { attributes |= LlmAttributes::STREAMING; } - let owner = self.resolve_llm_owner(&start); + let owner = if policy.bypasses_managed_pipeline() { + self.unmanaged_probe_owner(policy) + } else { + self.resolve_llm_owner(&start) + }; self.record_llm_request_affinity( &start.request, owner.subagent_id.as_deref(), @@ -996,6 +1004,7 @@ impl Session { metadata, model_name: start.model_name, owner_subagent_id: owner.subagent_id, + bypass_managed_pipeline: policy.bypasses_managed_pipeline(), }) }) .await; @@ -1065,6 +1074,18 @@ impl Session { self.open_turn(event_metadata, Value::Null, "implicit") } + fn gateway_management_policy(&self, start: &LlmGatewayStart) -> GatewayManagementPolicy { + if self.turn_scope.is_some() { + return GatewayManagementPolicy::Managed; + } + alignment::gateway_management_policy( + self.agent_kind, + &start.provider, + start.model_name.as_deref(), + &start.request, + ) + } + fn open_turn( &mut self, event_metadata: Value, @@ -1319,19 +1340,22 @@ impl Session { Ok(()) } - // Ends a subagent by id. Unknown endings become mark events, while duplicate endings for a - // subagent already closed by another provider-specific completion signal are ignored. Applies - // to Claude Code Agent-tool completion today. + // Ends a subagent by id. Unknown endings usually become mark events, while duplicate endings for + // a subagent already closed by another provider-specific completion signal are ignored. Claude + // Code can also report late orphan stops after a turn has closed; those are logged and ignored + // when there is no active turn so they cannot create lifecycle-only traces. async fn end_subagent(&mut self, event: SubagentEvent) -> Result<(), CliError> { if self.completed_subagents.contains(&event.subagent_id) { return Ok(()); } - self.ensure_turn_started(event.metadata.clone())?; if !self.subagents.contains_key(&event.subagent_id) { eprintln!( "nemo-relay CLI gateway: received {} for subagent {} without a matching start", event.event_name, event.subagent_id ); + if self.agent_kind == AgentKind::ClaudeCode && self.turn_scope.is_none() { + return Ok(()); + } return self.mark( "subagent_end_without_start", SessionEvent { @@ -1343,6 +1367,7 @@ impl Session { }, ); }; + self.ensure_turn_started(event.metadata.clone())?; self.close_subagent_scope(&event.subagent_id, event.payload) .await?; Ok(()) @@ -1826,6 +1851,20 @@ impl Session { } } + fn unmanaged_probe_owner(&self, policy: GatewayManagementPolicy) -> LlmOwnerResolution { + let (status, source) = policy + .bypass_correlation() + .expect("unmanaged probe owner requires unmanaged gateway policy"); + LlmOwnerResolution { + parent: self.root_work_scope(), + subagent_id: None, + status, + source: Some(source.to_string()), + hint: None, + metadata: Value::Null, + } + } + // Converts a consumed hint into an ownership resolution. If the hinted subagent is not // currently active, the LLM is attached to the turn scope but the hint metadata is still // preserved for correlation diagnostics. diff --git a/crates/cli/tests/coverage/alignment_claude_code_tests.rs b/crates/cli/tests/coverage/alignment_claude_code_tests.rs index be590a99..032453ec 100644 --- a/crates/cli/tests/coverage/alignment_claude_code_tests.rs +++ b/crates/cli/tests/coverage/alignment_claude_code_tests.rs @@ -2,7 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use axum::http::HeaderValue; -use serde_json::{Value, json}; +use nemo_relay::api::llm::LlmRequest; +use serde_json::{Map, Value, json}; use super::*; @@ -44,6 +45,70 @@ fn session_id_from_headers_reads_claude_native_header() { ); } +#[test] +fn startup_probe_matches_only_claude_code_preflight_shape() { + let request = LlmRequest { + headers: Map::from_iter([( + "x-claude-code-session-id".to_string(), + json!("claude-session"), + )]), + content: json!({ + "model": "claude-sonnet-4-5", + "max_tokens": 1, + "messages": [ + { + "role": "user", + "content": "test" + } + ] + }), + }; + + assert!(is_startup_probe( + "anthropic.messages", + Some("claude-sonnet-4-5"), + &request + )); + assert!(!is_startup_probe( + "anthropic.count_tokens", + Some("claude-sonnet-4-5"), + &request + )); + assert!(!is_startup_probe( + "anthropic.messages", + Some("gpt-test"), + &request + )); + let missing_claude_header = LlmRequest { + headers: Default::default(), + content: request.content.clone(), + }; + assert!(!is_startup_probe( + "anthropic.messages", + Some("claude-sonnet-4-5"), + &missing_claude_header + )); + + let real_prompt = LlmRequest { + headers: request.headers.clone(), + content: json!({ + "model": "claude-sonnet-4-5", + "max_tokens": 1, + "messages": [ + { + "role": "user", + "content": "real user work" + } + ] + }), + }; + assert!(!is_startup_probe( + "anthropic.messages", + Some("claude-sonnet-4-5"), + &real_prompt + )); +} + #[test] fn completed_subagent_from_agent_tool_accepts_known_result_keys() { for (key, expected) in [ diff --git a/crates/cli/tests/coverage/server_tests.rs b/crates/cli/tests/coverage/server_tests.rs index 12caff1f..1d6ac2ad 100644 --- a/crates/cli/tests/coverage/server_tests.rs +++ b/crates/cli/tests/coverage/server_tests.rs @@ -12,9 +12,11 @@ use axum::response::IntoResponse; use bytes::Bytes; use futures_util::stream; use http_body_util::BodyExt; +use nemo_relay::api::event::ScopeCategory; use nemo_relay::api::registry::{ deregister_tool_conditional_execution_guardrail, register_tool_conditional_execution_guardrail, }; +use nemo_relay::api::subscriber::{deregister_subscriber, flush_subscribers, register_subscriber}; use nemo_relay::plugin::{ ConfigDiagnostic, Plugin, PluginRegistration, PluginRegistrationContext, deregister_plugin, register_plugin, @@ -835,6 +837,95 @@ async fn gateway_forwards_anthropic_count_tokens_without_llm_codec() { assert_eq!(body["input_tokens"], json!(12)); } +#[tokio::test] +async fn gateway_forwards_claude_startup_probe_without_llm_observability() { + let subscriber_name = "server-claude-startup-probe-no-llm-test"; + let _ = deregister_subscriber(subscriber_name); + let captured_llm_starts = Arc::new(std::sync::Mutex::new(Vec::::new())); + let captured = captured_llm_starts.clone(); + register_subscriber( + subscriber_name, + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::Start) + && event.name() == "anthropic.messages" + && event + .metadata() + .and_then(|metadata| metadata.get("gateway_path")) + .and_then(Value::as_str) + == Some("/v1/messages") + && event + .input() + .and_then(|input| input.get("model")) + .and_then(Value::as_str) + == Some("claude-opus-4-8[1m]") + && event + .input() + .and_then(|input| input.get("max_tokens")) + .and_then(Value::as_u64) + == Some(1) + && event + .input() + .and_then(|input| input.get("messages")) + .and_then(Value::as_array) + .and_then(|messages| messages.first()) + .and_then(|message| message.get("content")) + .and_then(Value::as_str) + == Some("test") + { + captured.lock().unwrap().push(json!({ + "input": event.input().cloned().unwrap_or(Value::Null), + "metadata": event.metadata().cloned().unwrap_or(Value::Null) + })); + } + }), + ) + .unwrap(); + + let upstream = spawn_anthropic_upstream().await; + let mut config = test_config(); + config.anthropic_base_url = upstream.url(); + let app = router(config); + let response = app + .oneshot( + Request::builder() + .method("POST") + .uri("/v1/messages") + .header("content-type", "application/json") + .header("x-api-key", "sk-ant-test") + .header("x-claude-code-session-id", "claude-probe") + .body(Body::from( + json!({ + "model": "claude-opus-4-8[1m]", + "max_tokens": 1, + "messages": [ + { + "role": "user", + "content": "test" + } + ] + }) + .to_string(), + )) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let bytes = response.into_body().collect().await.unwrap().to_bytes(); + let body: Value = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(body["path"], json!("/v1/messages")); + assert_eq!(body["model"], json!("claude-opus-4-8[1m]")); + assert_eq!(body["prompt"], json!("test")); + + flush_subscribers().unwrap(); + assert!( + captured_llm_starts.lock().unwrap().is_empty(), + "Claude startup probe must not emit a managed LLM span" + ); + deregister_subscriber(subscriber_name).unwrap(); +} + async fn wait_for_gateway(url: &str) { let client = test_http_client(); for _ in 0..50 { @@ -953,6 +1044,19 @@ async fn spawn_models_upstream() -> TestServer { } async fn spawn_anthropic_upstream() -> TestServer { + async fn messages(headers: HeaderMap, request: Request) -> impl IntoResponse { + let body = request.into_body().collect().await.unwrap().to_bytes(); + let payload: Value = serde_json::from_slice(&body).unwrap(); + Json(json!({ + "path": "/v1/messages", + "x_api_key": headers + .get("x-api-key") + .and_then(|value| value.to_str().ok()), + "model": payload["model"], + "prompt": payload["messages"][0]["content"] + })) + } + async fn count_tokens(headers: HeaderMap, request: Request) -> impl IntoResponse { Json(json!({ "path": request.uri().path(), @@ -963,7 +1067,9 @@ async fn spawn_anthropic_upstream() -> TestServer { })) } - let app = Router::new().route("/v1/messages/count_tokens", post(count_tokens)); + let app = Router::new() + .route("/v1/messages", post(messages)) + .route("/v1/messages/count_tokens", post(count_tokens)); let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let address = listener.local_addr().unwrap(); let handle = tokio::spawn(async move { diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index c848dd96..cbffa042 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -2148,6 +2148,296 @@ async fn llm_lifecycle_starts_implicit_gateway_session() { assert!(sessions.contains_key("llm-session")); } +#[tokio::test] +async fn claude_startup_probe_does_not_open_null_input_turn() { + let subscriber_name = "cli-claude-startup-probe-turn-test"; + let _ = deregister_subscriber(subscriber_name); + let captured_turn_starts = Arc::new(StdMutex::new(Vec::::new())); + let captured = captured_turn_starts.clone(); + register_subscriber( + subscriber_name, + Arc::new(move |event| { + if event.scope_category() == Some(ScopeCategory::Start) + && event.name() == "claude-code-turn" + && event + .metadata() + .and_then(|metadata| metadata.get("session_id")) + .and_then(Value::as_str) + == Some("claude-probe") + { + captured.lock().unwrap().push(json!({ + "input": event.input().cloned().unwrap_or(Value::Null), + "metadata": event.metadata().cloned().unwrap_or(Value::Null) + })); + } + }), + ) + .unwrap(); + + let manager = SessionManager::new(session_test_config()); + manager + .apply_events( + &HeaderMap::new(), + vec![NormalizedEvent::AgentStarted(session_event( + "claude-probe", + "SessionStart", + ))], + ) + .await + .unwrap(); + + let prep = manager + .prepare_gateway_call( + &HeaderMap::new(), + LlmGatewayStart { + session_id: Some("claude-probe".into()), + provider: "anthropic.messages".into(), + model_name: Some("claude-opus-4-8[1m]".into()), + request: LlmRequest { + headers: Map::from_iter([( + "x-claude-code-session-id".to_string(), + json!("claude-probe"), + )]), + content: json!({ + "model": "claude-opus-4-8[1m]", + "max_tokens": 1, + "messages": [ + { + "role": "user", + "content": "test" + } + ] + }), + }, + ..llm_start() + }, + ) + .await + .unwrap(); + assert!(prep.parent.is_none()); + assert_eq!( + prep.metadata["llm_correlation_status"], + json!("pre_turn_probe") + ); + assert_eq!( + prep.metadata["llm_correlation_source"], + json!("claude_startup_probe") + ); + manager.finish_gateway_call(&prep.session_id).await; + + manager + .apply_events( + &HeaderMap::new(), + vec![NormalizedEvent::PromptSubmitted(SessionEvent { + session_id: "claude-probe".into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "UserPromptSubmit".into(), + payload: json!({ "prompt": "list contents of this dir" }), + metadata: json!({}), + })], + ) + .await + .unwrap(); + + { + let sessions = manager.inner.lock().await; + let session = sessions.get("claude-probe").expect("session retained"); + assert!( + session.turn_scope.is_some(), + "prompt should open a Claude turn after the pre-turn probe" + ); + } + + flush_subscribers().unwrap(); + let starts = captured_turn_starts.lock().unwrap().clone(); + assert_eq!(starts.len(), 1, "expected one user-visible Claude turn"); + assert_eq!( + starts[0]["input"], + json!({ "prompt": "list contents of this dir" }), + "startup probe must not create a null-input Claude turn" + ); + assert_eq!(starts[0]["metadata"]["turn_index"], json!(1)); + assert_eq!(starts[0]["metadata"]["turn_source"], json!("user_prompt")); + + deregister_subscriber(subscriber_name).unwrap(); +} + +#[tokio::test] +async fn claude_orphan_subagent_stop_after_closed_turn_does_not_open_null_turn() { + let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; + let temp = tempfile::tempdir().unwrap(); + let atif_dir = temp.path().join("atif"); + install_test_atif_plugin(&atif_dir).await; + let subscriber_name = "cli-claude-orphan-subagent-stop-no-null-turn-test"; + let _ = deregister_subscriber(subscriber_name); + let captured_events = Arc::new(StdMutex::new(Vec::::new())); + let captured = captured_events.clone(); + register_subscriber( + subscriber_name, + Arc::new(move |event| { + let event_session_id = event + .metadata() + .and_then(|metadata| metadata.get("session_id")) + .and_then(Value::as_str); + if event_session_id != Some("claude-orphan-stop") { + return; + } + if event.name() == "claude-code-turn" { + captured.lock().unwrap().push(json!({ + "kind": "turn", + "scope_category": event.scope_category(), + "input": event.input().cloned().unwrap_or(Value::Null), + "output": event.output().cloned().unwrap_or(Value::Null), + "metadata": event.metadata().cloned().unwrap_or(Value::Null) + })); + } else if event.name() == "subagent_end_without_start" { + captured.lock().unwrap().push(json!({ + "kind": "orphan_mark", + "data": event.data().cloned().unwrap_or(Value::Null), + "metadata": event.metadata().cloned().unwrap_or(Value::Null) + })); + } + }), + ) + .unwrap(); + + let manager = SessionManager::new(session_test_config()); + manager + .apply_events( + &HeaderMap::new(), + vec![ + NormalizedEvent::AgentStarted(session_event("claude-orphan-stop", "SessionStart")), + NormalizedEvent::PromptSubmitted(SessionEvent { + session_id: "claude-orphan-stop".into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "UserPromptSubmit".into(), + payload: json!({ "prompt": "thanks!" }), + metadata: json!({}), + }), + ], + ) + .await + .unwrap(); + let active_llm = manager + .start_llm( + &HeaderMap::new(), + llm_start_with_messages_task("claude-orphan-stop", "thanks!"), + ) + .await + .unwrap(); + manager + .end_llm( + active_llm, + json!({ + "id": "msg_thanks", + "type": "message", + "role": "assistant", + "model": "claude-test", + "content": [ + { + "type": "text", + "text": "You're welcome!" + } + ], + "stop_reason": "end_turn", + "usage": { + "input_tokens": 2, + "output_tokens": 4 + } + }), + json!({}), + ) + .await + .unwrap(); + manager + .apply_events( + &HeaderMap::new(), + vec![ + NormalizedEvent::TurnEnded(SessionEvent { + session_id: "claude-orphan-stop".into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "Stop".into(), + payload: json!({ "content": "You're welcome!" }), + metadata: json!({}), + }), + NormalizedEvent::SubagentEnded(SubagentEvent { + session_id: "claude-orphan-stop".into(), + agent_kind: AgentKind::ClaudeCode, + event_name: "SubagentStop".into(), + subagent_id: "missing-worker".into(), + payload: json!({ + "hook_event_name": "SubagentStop", + "last_assistant_message": "add the event logs to .gitignore" + }), + metadata: json!({ + "hook_event_name": "SubagentStop", + "agent_id": "missing-worker" + }), + }), + ], + ) + .await + .unwrap(); + + let closed = manager + .close_idle_sessions_at( + Instant::now() + AGENT_IDLE_TIMEOUT + Duration::from_secs(1), + AGENT_IDLE_TIMEOUT, + "idle_timeout", + ) + .await + .unwrap(); + + flush_subscribers().unwrap(); + clear_plugin_configuration().unwrap(); + let events = captured_events.lock().unwrap().clone(); + let turn_starts: Vec<_> = events + .iter() + .filter(|event| { + event["kind"] == json!("turn") && event["scope_category"] == json!(ScopeCategory::Start) + }) + .collect(); + let idle_turn_closes: Vec<_> = events + .iter() + .filter(|event| { + event["kind"] == json!("turn") + && event["scope_category"] == json!(ScopeCategory::End) + && event["output"]["status"] == json!("idle_timeout") + }) + .collect(); + + assert_eq!(closed, 0, "orphan SubagentStop must not open an idle turn"); + assert_eq!( + turn_starts.len(), + 1, + "orphan SubagentStop must not create a second Claude turn: {events:#?}" + ); + assert_eq!(turn_starts[0]["input"], json!({ "prompt": "thanks!" })); + assert_eq!( + idle_turn_closes.len(), + 0, + "orphan SubagentStop must not create a turn later closed by idle timeout: {events:#?}" + ); + assert!( + events + .iter() + .all(|event| event["kind"] != json!("orphan_mark")), + "uncorrelatable Claude SubagentStop should not emit a turn-scoped orphan mark: {events:#?}" + ); + + let atif = read_atif_for_session(&atif_dir, "claude-orphan-stop"); + assert_eq!(atif["steps"].as_array().unwrap().len(), 2); + assert!( + !serde_json::to_string(&atif) + .unwrap() + .contains("subagent_end_without_start"), + "ATIF should not include uncorrelatable Claude orphan stop diagnostics: {}", + serde_json::to_string_pretty(&atif).unwrap() + ); + + deregister_subscriber(subscriber_name).unwrap(); +} + #[tokio::test] async fn llm_lifecycle_uses_single_active_hook_session_when_header_is_missing() { let config = GatewayConfig { diff --git a/docs/nemo-relay-cli/claude-code.mdx b/docs/nemo-relay-cli/claude-code.mdx index 26738936..a274a652 100644 --- a/docs/nemo-relay-cli/claude-code.mdx +++ b/docs/nemo-relay-cli/claude-code.mdx @@ -107,6 +107,15 @@ Tool hooks preserve canonical fields such as `tool_use_id`, `tool_name`, `tool_input`, `error`, `duration_ms`, and `is_interrupt`. Subagent hooks use `agent_id` as the subagent identifier and preserve `agent_type` in metadata. +Claude Code traces are turn-oriented. A multi-turn conversation can produce one +root `claude-code-turn` agent span or trajectory per user turn. That is expected +when each span has a real `UserPromptSubmit` payload and assistant output. NeMo +Relay excludes the known Claude Code startup/preflight probe and late +uncorrelatable lifecycle hooks from exported user traces so they do not appear +as synthetic `null`, `user: test`, `idle_timeout`, or lifecycle-only turns. +Suppressed startup probes are still logged by the gateway as internal pre-turn +probe bypasses for debugging. + ## Smoke Test Run a small Claude Code prompt that starts a session and uses one simple tool. @@ -119,7 +128,9 @@ printf '{"session_id":"smoke-claude","hook_event_name":"SessionStart"}' \ ``` The response should be valid Claude Code hook JSON. For most lifecycle events it -is an allow/continue response. +is an allow/continue response. A full observability smoke fixture should produce +expected OpenInference spans, raw ATOF events, and ATIF trajectories for at +least one user turn, one LLM call, and one simple tool call. ## Verify Export @@ -147,3 +158,32 @@ If LLM spans exist but attach to the session instead of a subagent, pass `x-nemo-relay-subagent-id` on gateway requests or include shared `conversation_id`, `generation_id`, or `request_id` values in both hook payloads and provider requests. + +When Relay cannot prove a stronger LLM owner, it keeps the span under the active +turn and adds correlation metadata instead of guessing. Inspect +`llm_correlation_status`, `llm_correlation_source`, and +`llm_correlation_subagent_id` on LLM spans. Common statuses include `explicit`, +`single_hint`, `request_affinity`, `agent_fallback`, and +`ambiguous_fallback`. Tool spans expose the same diagnostic pattern with +`tool_correlation_status`, `tool_correlation_source`, and +`tool_correlation_subagent_id`. + +Late `SubagentStop` hooks can arrive after Claude Code has already closed the +turn, especially when Claude reports background stop-hook state. If the +subagent id has no matching `SubagentStart` and there is no active turn, Relay +logs the missing subagent and suppresses the hook from exported observability so +it does not create a synthetic null turn. If an unknown subagent end arrives +while a turn is active, Relay may emit a `subagent_end_without_start` diagnostic +mark under that turn. + +## Hook Limitations + +Claude Code hooks are available only when Claude Code loads the NeMo Relay +plugin, such as through `nemo-relay claude` or `nemo-relay run --agent claude`. +The standalone gateway can still observe Anthropic LLM traffic, but it cannot +invent missing tool, prompt, compaction, notification, or subagent hooks. + +`UserPromptSubmit`, `AfterAgentResponse`, `AfterAgentThought`, and `Stop` are +used as private correlation and turn-boundary hints. They are not exported as +standalone user-visible mark events unless they also produce a scoped turn, +tool, LLM, or lifecycle observation. diff --git a/integrations/coding-agents/claude-code/README.md b/integrations/coding-agents/claude-code/README.md index 3380f656..114782b5 100644 --- a/integrations/coding-agents/claude-code/README.md +++ b/integrations/coding-agents/claude-code/README.md @@ -26,6 +26,14 @@ The bundle forwards `SessionStart`, `SessionEnd`, `SubagentStart`, `UserPromptSubmit`, `AfterAgentResponse`, `AfterAgentThought`, and `Stop` provide private LLM correlation hints for gateway requests. +Claude Code observability is turn-oriented. A multi-turn session can produce one +root `claude-code-turn` span or ATIF trajectory per user turn. That is expected +when each turn has a real prompt input and assistant output. Known startup +probes, uncorrelatable late stop hooks, and other lifecycle-only noise are +excluded from exported user traces so they do not appear as synthetic `null`, +`user: test`, or `idle_timeout` turns. Startup probes are still logged by the +gateway as internal pre-turn probe bypasses for debugging. + ## Transparent Setup Build or install the gateway binary so `nemo-relay` is on `PATH`. @@ -125,3 +133,22 @@ If LLM spans are present but attached to the top-level agent instead of a subagent, include `x-nemo-relay-subagent-id` on gateway requests or share `conversation_id`, `generation_id`, or `request_id` values between hook payloads and provider requests. + +Relay records correlation diagnostics on exported spans instead of guessing +ownership. Inspect `llm_correlation_status`, `llm_correlation_source`, and +`llm_correlation_subagent_id` for LLM routing, and +`tool_correlation_status`, `tool_correlation_source`, and +`tool_correlation_subagent_id` for tool routing. Fallback statuses such as +`agent_fallback` and `ambiguous_fallback` mean Relay kept the span under the +active turn because the hook and gateway payloads did not prove a subagent +owner. + +Late `SubagentStop` hooks with no matching `SubagentStart` are diagnostic-only. +When there is no active turn, Relay logs the missing subagent and suppresses the +hook from ATOF, OpenInference, and ATIF so it cannot create a null turn. When an +unknown subagent end arrives during an active turn, Relay may emit a +`subagent_end_without_start` mark under that turn. + +Hook events are only available when Claude Code loads this plugin. A standalone +gateway observes Anthropic LLM traffic, but it cannot recover missing prompt, +tool, compaction, notification, or subagent hooks. From eee481b9c9677528c331b60281c5a4e31b3c9768 Mon Sep 17 00:00:00 2001 From: Ajay Thorve Date: Thu, 4 Jun 2026 01:11:43 -0700 Subject: [PATCH 2/2] fix: prune Claude startup probe sessions Signed-off-by: Ajay Thorve --- crates/cli/src/gateway.rs | 23 +++++--- crates/cli/src/session.rs | 35 +++++++++++-- crates/cli/tests/coverage/session_tests.rs | 61 +++++++++++++++++++++- 3 files changed, 107 insertions(+), 12 deletions(-) diff --git a/crates/cli/src/gateway.rs b/crates/cli/src/gateway.rs index 492507dd..f19624e1 100644 --- a/crates/cli/src/gateway.rs +++ b/crates/cli/src/gateway.rs @@ -183,12 +183,16 @@ async fn run_managed_gateway( ) -> Result, CliError> { if prep.bypass_managed_pipeline { let session_id = prep.session_id.clone(); + let prune_empty_session = prep.prune_empty_session_on_finish; let model = prep.model_name.as_deref().unwrap_or(""); eprintln!( "nemo-relay CLI gateway: bypassing managed LLM observability for Claude Code startup probe session={session_id} provider={} model={model}", prep.provider_name ); - state.sessions.finish_gateway_call(&session_id).await; + state + .sessions + .finish_gateway_call(&session_id, prune_empty_session) + .await; return run_unmanaged_gateway(state, prepared).await; } let codecs = codecs_for_route(prepared.provider); @@ -280,6 +284,7 @@ async fn run_managed_buffered( model_name, owner_subagent_id, bypass_managed_pipeline: _, + prune_empty_session_on_finish: _, } = prep; let provider_for_event = provider_name.clone(); let params = LlmCallExecuteParams::builder() @@ -301,7 +306,7 @@ async fn run_managed_buffered( .sessions .record_gateway_response_hints(&session_id, owner_subagent_id, response_json) .await; - state.sessions.finish_gateway_call(&session_id).await; + state.sessions.finish_gateway_call(&session_id, false).await; let (status, headers) = upstream_info .lock() .expect("upstream info lock poisoned") @@ -315,7 +320,7 @@ async fn run_managed_buffered( build_response(status, headers, Body::from(bytes)) } Err(error) => { - state.sessions.finish_gateway_call(&session_id).await; + state.sessions.finish_gateway_call(&session_id, false).await; Err(translate_runtime_error(error, &upstream_error)) } } @@ -409,7 +414,10 @@ async fn run_managed_streaming( // collector and finalizer for managed streaming, so without a codec we cannot use the managed // pipeline. This keeps non-LLM streaming paths working while typed codecs remain optional. let Some(streaming_codec) = codecs.streaming else { - state.sessions.finish_gateway_call(&prep.session_id).await; + state + .sessions + .finish_gateway_call(&prep.session_id, false) + .await; return passthrough_streaming(state, prepared).await; }; let collector = streaming_codec.collector(); @@ -435,6 +443,7 @@ async fn run_managed_streaming( model_name, owner_subagent_id, bypass_managed_pipeline: _, + prune_empty_session_on_finish: _, } = prep; let params = LlmStreamCallExecuteParams::builder() .name(provider_name) @@ -457,7 +466,7 @@ async fn run_managed_streaming( let json_stream = match json_stream_result { Ok(json_stream) => json_stream, Err(error) => { - state.sessions.finish_gateway_call(&session_id).await; + state.sessions.finish_gateway_call(&session_id, false).await; return Err(translate_runtime_error(error, &upstream_error)); } }; @@ -649,7 +658,7 @@ impl GatewayCallGuard { ) .await; } - sessions.finish_gateway_call(&self.session_id).await; + sessions.finish_gateway_call(&self.session_id, false).await; } } } @@ -673,7 +682,7 @@ impl Drop for GatewayCallGuard { .record_gateway_response_hints(&session_id, owner_subagent_id, response) .await; } - sessions.finish_gateway_call(&session_id).await; + sessions.finish_gateway_call(&session_id, false).await; }); } } diff --git a/crates/cli/src/session.rs b/crates/cli/src/session.rs index 12703a0f..832080d6 100644 --- a/crates/cli/src/session.rs +++ b/crates/cli/src/session.rs @@ -93,6 +93,7 @@ pub(crate) struct GatewayCallPrep { pub(crate) model_name: Option, pub(crate) owner_subagent_id: Option, pub(crate) bypass_managed_pipeline: bool, + pub(crate) prune_empty_session_on_finish: bool, } struct Session { @@ -395,10 +396,30 @@ impl SessionManager { // request beats its SessionStart hook), label the session by the provider so ATIF and // Phoenix scopes carry the agent identity instead of freezing on "gateway". let inferred_agent_kind = alignment::agent_kind_for_gateway_provider(&start.provider); + let created_session = !sessions.contains_key(&session_id); let session = sessions .entry(session_id.clone()) - .or_insert_with(|| Session::new(session_id, inferred_agent_kind, config)); - session.prepare_gateway_call(start).await + .or_insert_with(|| Session::new(session_id.clone(), inferred_agent_kind, config)); + let result = session.prepare_gateway_call(start).await; + match result { + Ok(mut prep) => { + prep.prune_empty_session_on_finish = prep.bypass_managed_pipeline + && sessions + .get(&session_id) + .is_some_and(|session| session.is_empty()); + Ok(prep) + } + Err(error) => { + if created_session + && sessions + .get(&session_id) + .is_some_and(|session| session.is_empty()) + { + sessions.remove(&session_id); + } + Err(error) + } + } } /// Marks a managed gateway LLM call as finished for idle-timeout purposes. @@ -406,11 +427,18 @@ impl SessionManager { /// Runtime-managed LLM spans are emitted outside the session lock, so the session keeps a small /// in-flight counter to prevent the idle sweeper from closing a turn while an upstream /// provider request or streaming response is still active. - pub(crate) async fn finish_gateway_call(&self, session_id: &str) { + pub(crate) async fn finish_gateway_call(&self, session_id: &str, prune_empty_session: bool) { let mut sessions = self.inner.lock().await; if let Some(session) = sessions.get_mut(session_id) { session.finish_gateway_call(); } + if prune_empty_session + && sessions + .get(session_id) + .is_some_and(|session| session.is_empty() && session.active_gateway_calls == 0) + { + sessions.remove(session_id); + } } /// Legacy manual-lifecycle close paired with [`Self::start_llm`]. Production gateway traffic @@ -1005,6 +1033,7 @@ impl Session { model_name: start.model_name, owner_subagent_id: owner.subagent_id, bypass_managed_pipeline: policy.bypasses_managed_pipeline(), + prune_empty_session_on_finish: false, }) }) .await; diff --git a/crates/cli/tests/coverage/session_tests.rs b/crates/cli/tests/coverage/session_tests.rs index cbffa042..5ba22234 100644 --- a/crates/cli/tests/coverage/session_tests.rs +++ b/crates/cli/tests/coverage/session_tests.rs @@ -2223,7 +2223,9 @@ async fn claude_startup_probe_does_not_open_null_input_turn() { prep.metadata["llm_correlation_source"], json!("claude_startup_probe") ); - manager.finish_gateway_call(&prep.session_id).await; + manager + .finish_gateway_call(&prep.session_id, prep.prune_empty_session_on_finish) + .await; manager .apply_events( @@ -2262,6 +2264,39 @@ async fn claude_startup_probe_does_not_open_null_input_turn() { deregister_subscriber(subscriber_name).unwrap(); } +#[tokio::test] +async fn claude_startup_probe_only_session_is_pruned_after_finish() { + let manager = SessionManager::new(session_test_config()); + let prep = manager + .prepare_gateway_call(&HeaderMap::new(), claude_startup_probe_start("probe-only")) + .await + .unwrap(); + + assert!(prep.bypass_managed_pipeline); + assert!(prep.prune_empty_session_on_finish); + assert!(manager.inner.lock().await.contains_key("probe-only")); + + manager + .finish_gateway_call(&prep.session_id, prep.prune_empty_session_on_finish) + .await; + assert!(!manager.inner.lock().await.contains_key("probe-only")); + + let next = manager + .prepare_gateway_call( + &HeaderMap::new(), + LlmGatewayStart { + session_id: None, + ..llm_start() + }, + ) + .await + .unwrap(); + assert_eq!( + next.session_id, "gateway-gateway", + "probe-only sessions must not become the single-active fallback" + ); +} + #[tokio::test] async fn claude_orphan_subagent_stop_after_closed_turn_does_not_open_null_turn() { let _guard = OBSERVABILITY_PLUGIN_TEST_LOCK.lock().await; @@ -3930,7 +3965,7 @@ async fn idle_timeout_waits_for_active_gateway_llm_call() { .contains_key("active-gateway-call") ); - manager.finish_gateway_call(&prep.session_id).await; + manager.finish_gateway_call(&prep.session_id, false).await; let closed = manager .close_idle_sessions_at( Instant::now() + AGENT_IDLE_TIMEOUT + Duration::from_secs(1), @@ -4666,6 +4701,28 @@ fn llm_start() -> LlmGatewayStart { } } +fn claude_startup_probe_start(session_id: &str) -> LlmGatewayStart { + LlmGatewayStart { + session_id: Some(session_id.into()), + provider: "anthropic.messages".into(), + model_name: Some("claude-opus-4-8[1m]".into()), + request: LlmRequest { + headers: Map::from_iter([("x-claude-code-session-id".to_string(), json!(session_id))]), + content: json!({ + "model": "claude-opus-4-8[1m]", + "max_tokens": 1, + "messages": [ + { + "role": "user", + "content": "test" + } + ] + }), + }, + ..llm_start() + } +} + fn llm_start_with_messages_task(session_id: &str, task: &str) -> LlmGatewayStart { llm_start_with_content( session_id,