Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions crates/cli/src/alignment/claude_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
//! session state machine.

use axum::http::HeaderMap;
use nemo_relay::api::llm::LlmRequest;
use serde_json::Value;

use crate::alignment::json_string_at;
use crate::config::header_string;
Expand All @@ -26,6 +28,40 @@ pub(crate) fn session_id_from_headers(headers: &HeaderMap) -> Option<String> {
header_string(headers, "x-claude-code-session-id")
}

// Claude Code can issue a tiny pre-user startup probe through the Anthropic gateway before the
// first UserPromptSubmit hook. Treating it as normal LLM work pollutes traces with an unparented
// `user: test` span, so alignment classifies only this native-header plus exact-body harness probe
// for suppression.
pub(crate) fn is_startup_probe(
provider: &str,
model_name: Option<&str>,
request: &LlmRequest,
) -> bool {
if provider != "anthropic.messages" {
return false;
}
if !request.headers.contains_key("x-claude-code-session-id") {
return false;
}
let model = model_name
.or_else(|| request.content.get("model").and_then(Value::as_str))
.unwrap_or_default();
if !model.starts_with("claude-") {
return false;
}
if request.content.get("max_tokens").and_then(Value::as_u64) != Some(1) {
return false;
}
let Some(messages) = request.content.get("messages").and_then(Value::as_array) else {
return false;
};
let [message] = messages.as_slice() else {
return false;
};
message.get("role").and_then(Value::as_str) == Some("user")
&& message.get("content").and_then(Value::as_str) == Some("test")
}

// Claude's `Agent` tool can report either an asynchronous launch acknowledgement or a terminal
// worker result. Only the terminal result should close the subagent scope; otherwise parallel
// workers that launch in the background are closed before their later tool/LLM hooks arrive.
Expand Down
40 changes: 40 additions & 0 deletions crates/cli/src/alignment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@ pub(crate) enum GatewayRouteKind {
AnthropicCountTokens,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum GatewayManagementPolicy {
Managed,
UnmanagedProbe {
status: &'static str,
source: &'static str,
},
}

impl GatewayManagementPolicy {
pub(crate) fn bypasses_managed_pipeline(self) -> bool {
matches!(self, Self::UnmanagedProbe { .. })
}

pub(crate) fn bypass_correlation(self) -> Option<(&'static str, &'static str)> {
match self {
Self::Managed => None,
Self::UnmanagedProbe { status, source } => Some((status, source)),
}
}
}

// Records that a provider-created child session is really a subagent under another session. The
// session manager stores this until the child emits its terminal AgentEnded event, then removes the
// alias so future unrelated events cannot be reparented through stale state.
Expand Down Expand Up @@ -406,6 +428,24 @@ pub(crate) fn agent_kind_for_gateway_provider(provider: &str) -> AgentKind {
}
}

pub(crate) fn gateway_management_policy(
agent_kind: AgentKind,
provider: &str,
model_name: Option<&str>,
request: &LlmRequest,
) -> GatewayManagementPolicy {
if agent_kind == AgentKind::ClaudeCode
&& claude_code::is_startup_probe(provider, model_name, request)
{
GatewayManagementPolicy::UnmanagedProbe {
status: "pre_turn_probe",
source: "claude_startup_probe",
}
} else {
GatewayManagementPolicy::Managed
}
}

// Not every harness has a reliable process/session end signal. Claude Code and Codex sessions can
// outlive a user-visible run, so the CLI represents their work with bounded turn scopes instead of
// exporting a long-lived agent scope that needs synthetic termination.
Expand Down
56 changes: 50 additions & 6 deletions crates/cli/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,20 @@ async fn run_managed_gateway(
prepared: PreparedGatewayRequest,
prep: GatewayCallPrep,
) -> Result<Response<Body>, CliError> {
if prep.bypass_managed_pipeline {
let session_id = prep.session_id.clone();
let prune_empty_session = prep.prune_empty_session_on_finish;
let model = prep.model_name.as_deref().unwrap_or("<unknown>");
eprintln!(
"nemo-relay CLI gateway: bypassing managed LLM observability for Claude Code startup probe session={session_id} provider={} model={model}",
prep.provider_name
);
state
.sessions
.finish_gateway_call(&session_id, prune_empty_session)
.await;
return run_unmanaged_gateway(state, prepared).await;
}
let codecs = codecs_for_route(prepared.provider);
if prepared.streaming {
run_managed_streaming(state, prepared, prep, codecs).await
Expand All @@ -189,6 +203,29 @@ async fn run_managed_gateway(
}
}

async fn run_unmanaged_gateway(
state: AppState,
prepared: PreparedGatewayRequest,
) -> Result<Response<Body>, CliError> {
if prepared.streaming {
return passthrough_streaming(state, prepared).await;
}
let response = forward_upstream_request(
&state.http,
&prepared.method,
&prepared.upstream_url,
&prepared.body_bytes,
&prepared.headers,
None,
prepared.provider,
)
.await?;
let status = response.status();
let headers = response_headers(response.headers());
let bytes = response.bytes().await?;
build_response(status, headers, Body::from(bytes))
}

// Codecs registered for each managed provider route. Routes that emit LLM events but lack a typed
// codec (count_tokens) return `None` so the runtime still wraps the call but skips annotation.
struct RouteCodecs {
Expand Down Expand Up @@ -246,6 +283,8 @@ async fn run_managed_buffered(
metadata,
model_name,
owner_subagent_id,
bypass_managed_pipeline: _,
prune_empty_session_on_finish: _,
} = prep;
let provider_for_event = provider_name.clone();
let params = LlmCallExecuteParams::builder()
Expand All @@ -267,7 +306,7 @@ async fn run_managed_buffered(
.sessions
.record_gateway_response_hints(&session_id, owner_subagent_id, response_json)
.await;
state.sessions.finish_gateway_call(&session_id).await;
state.sessions.finish_gateway_call(&session_id, false).await;
let (status, headers) = upstream_info
.lock()
.expect("upstream info lock poisoned")
Expand All @@ -281,7 +320,7 @@ async fn run_managed_buffered(
build_response(status, headers, Body::from(bytes))
}
Err(error) => {
state.sessions.finish_gateway_call(&session_id).await;
state.sessions.finish_gateway_call(&session_id, false).await;
Err(translate_runtime_error(error, &upstream_error))
}
}
Expand Down Expand Up @@ -375,7 +414,10 @@ async fn run_managed_streaming(
// collector and finalizer for managed streaming, so without a codec we cannot use the managed
// pipeline. This keeps non-LLM streaming paths working while typed codecs remain optional.
let Some(streaming_codec) = codecs.streaming else {
state.sessions.finish_gateway_call(&prep.session_id).await;
state
.sessions
.finish_gateway_call(&prep.session_id, false)
.await;
return passthrough_streaming(state, prepared).await;
};
let collector = streaming_codec.collector();
Expand All @@ -400,6 +442,8 @@ async fn run_managed_streaming(
metadata,
model_name,
owner_subagent_id,
bypass_managed_pipeline: _,
prune_empty_session_on_finish: _,
} = prep;
let params = LlmStreamCallExecuteParams::builder()
.name(provider_name)
Expand All @@ -422,7 +466,7 @@ async fn run_managed_streaming(
let json_stream = match json_stream_result {
Ok(json_stream) => json_stream,
Err(error) => {
state.sessions.finish_gateway_call(&session_id).await;
state.sessions.finish_gateway_call(&session_id, false).await;
return Err(translate_runtime_error(error, &upstream_error));
}
};
Expand Down Expand Up @@ -614,7 +658,7 @@ impl GatewayCallGuard {
)
.await;
}
sessions.finish_gateway_call(&self.session_id).await;
sessions.finish_gateway_call(&self.session_id, false).await;
}
}
}
Expand All @@ -638,7 +682,7 @@ impl Drop for GatewayCallGuard {
.record_gateway_response_hints(&session_id, owner_subagent_id, response)
.await;
}
sessions.finish_gateway_call(&session_id).await;
sessions.finish_gateway_call(&session_id, false).await;
});
}
}
Expand Down
Loading
Loading