diff --git a/Cargo.lock b/Cargo.lock index 3d01356a..7194911b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2923,6 +2923,7 @@ name = "openshell-sandbox" version = "0.1.0" dependencies = [ "anyhow", + "base64 0.22.1", "bytes", "clap", "hex", @@ -2942,6 +2943,7 @@ dependencies = [ "rustls", "rustls-pemfile", "seccompiler", + "serde", "serde_json", "serde_yaml", "sha2 0.10.9", diff --git a/architecture/inference-routing.md b/architecture/inference-routing.md index 0d3a95af..b2824f0d 100644 --- a/architecture/inference-routing.md +++ b/architecture/inference-routing.md @@ -180,6 +180,15 @@ Before forwarding inference requests, the proxy strips sensitive and hop-by-hop - **Request**: `authorization`, `x-api-key`, `host`, `content-length`, and hop-by-hop headers (`connection`, `keep-alive`, `proxy-authenticate`, `proxy-authorization`, `proxy-connection`, `te`, `trailer`, `transfer-encoding`, `upgrade`). - **Response**: `content-length` and hop-by-hop headers. +### Internal debug capture + +For Milestone 1 attribution work, the sandbox proxy supports an internal append-only JSONL debug log for `inference.local` traffic. + +- Enable it by setting `OPENSHELL_INFERENCE_DEBUG_LOG=/absolute/path/to/inference-debug.jsonl` in the sandbox runtime environment before the proxy starts. +- Each parsed `inference.local` request appends one JSON object with process attribution (`pid`, binary path, ancestor binaries, cmdline paths), request metadata, selected route metadata, sanitized headers, and capped request/response body capture. +- Request and response bodies are captured up to 64 KiB each. The record includes total byte counts plus explicit truncation flags and captured-byte counts when the body exceeds that cap. +- Header logging is separate from forwarding logic. Auth headers are stripped and auth-like values such as cookies or token-bearing headers are redacted before they reach the JSONL file. + ### Response streaming The router supports two response modes: diff --git a/crates/openshell-router/src/backend.rs b/crates/openshell-router/src/backend.rs index 5ac4b38f..8a0784fa 100644 --- a/crates/openshell-router/src/backend.rs +++ b/crates/openshell-router/src/backend.rs @@ -411,6 +411,7 @@ mod tests { name: "inference.local".to_string(), endpoint: endpoint.to_string(), model: "test-model".to_string(), + provider_type: None, api_key: "sk-test".to_string(), protocols: protocols.iter().map(|p| (*p).to_string()).collect(), auth, diff --git a/crates/openshell-router/src/config.rs b/crates/openshell-router/src/config.rs index d9c081d6..a980a571 100644 --- a/crates/openshell-router/src/config.rs +++ b/crates/openshell-router/src/config.rs @@ -39,6 +39,7 @@ pub struct ResolvedRoute { pub name: String, pub endpoint: String, pub model: String, + pub provider_type: Option, pub api_key: String, pub protocols: Vec, /// How to inject the API key on outgoing requests. @@ -53,6 +54,7 @@ impl std::fmt::Debug for ResolvedRoute { .field("name", &self.name) .field("endpoint", &self.endpoint) .field("model", &self.model) + .field("provider_type", &self.provider_type) .field("api_key", &"[REDACTED]") .field("protocols", &self.protocols) .field("auth", &self.auth) @@ -125,6 +127,7 @@ impl RouteConfig { name: self.name.clone(), endpoint: self.endpoint.clone(), model: self.model.clone(), + provider_type: self.provider_type.clone(), api_key: self.resolve_api_key()?, protocols, auth, @@ -252,6 +255,7 @@ routes: name: "test".to_string(), endpoint: "https://api.example.com/v1".to_string(), model: "test-model".to_string(), + provider_type: Some("openai".to_string()), api_key: "sk-super-secret-key-12345".to_string(), protocols: vec!["openai_chat_completions".to_string()], auth: AuthHeader::Bearer, diff --git a/crates/openshell-router/src/mock.rs b/crates/openshell-router/src/mock.rs index 9b6accb6..5e433d5a 100644 --- a/crates/openshell-router/src/mock.rs +++ b/crates/openshell-router/src/mock.rs @@ -127,6 +127,7 @@ mod tests { name: "test".to_string(), endpoint: endpoint.to_string(), model: model.to_string(), + provider_type: None, api_key: "key".to_string(), protocols: protocols.iter().map(ToString::to_string).collect(), auth: crate::config::AuthHeader::Bearer, diff --git a/crates/openshell-router/tests/backend_integration.rs b/crates/openshell-router/tests/backend_integration.rs index 4861bd6d..d1debe1b 100644 --- a/crates/openshell-router/tests/backend_integration.rs +++ b/crates/openshell-router/tests/backend_integration.rs @@ -11,6 +11,7 @@ fn mock_candidates(base_url: &str) -> Vec { name: "inference.local".to_string(), endpoint: base_url.to_string(), model: "meta/llama-3.1-8b-instruct".to_string(), + provider_type: Some("openai".to_string()), api_key: "test-api-key".to_string(), protocols: vec!["openai_chat_completions".to_string()], auth: AuthHeader::Bearer, @@ -113,6 +114,7 @@ async fn proxy_no_compatible_route_returns_error() { name: "inference.local".to_string(), endpoint: "http://localhost:1234".to_string(), model: "test".to_string(), + provider_type: None, api_key: "key".to_string(), protocols: vec!["anthropic_messages".to_string()], auth: AuthHeader::Custom("x-api-key"), @@ -174,6 +176,7 @@ async fn proxy_mock_route_returns_canned_response() { name: "inference.local".to_string(), endpoint: "mock://test".to_string(), model: "mock/test-model".to_string(), + provider_type: None, api_key: "unused".to_string(), protocols: vec!["openai_chat_completions".to_string()], auth: AuthHeader::Bearer, @@ -308,6 +311,7 @@ async fn proxy_uses_x_api_key_for_anthropic_route() { name: "inference.local".to_string(), endpoint: mock_server.uri(), model: "claude-sonnet-4-20250514".to_string(), + provider_type: Some("anthropic".to_string()), api_key: "test-anthropic-key".to_string(), protocols: vec!["anthropic_messages".to_string()], auth: AuthHeader::Custom("x-api-key"), @@ -366,6 +370,7 @@ async fn proxy_anthropic_does_not_send_bearer_auth() { name: "inference.local".to_string(), endpoint: mock_server.uri(), model: "claude-sonnet-4-20250514".to_string(), + provider_type: Some("anthropic".to_string()), api_key: "anthropic-key".to_string(), protocols: vec!["anthropic_messages".to_string()], auth: AuthHeader::Custom("x-api-key"), @@ -410,6 +415,7 @@ async fn proxy_forwards_client_anthropic_version_header() { name: "inference.local".to_string(), endpoint: mock_server.uri(), model: "claude-sonnet-4-20250514".to_string(), + provider_type: Some("anthropic".to_string()), api_key: "test-anthropic-key".to_string(), protocols: vec!["anthropic_messages".to_string()], auth: AuthHeader::Custom("x-api-key"), diff --git a/crates/openshell-sandbox/Cargo.toml b/crates/openshell-sandbox/Cargo.toml index 8a0639a7..12388a59 100644 --- a/crates/openshell-sandbox/Cargo.toml +++ b/crates/openshell-sandbox/Cargo.toml @@ -56,8 +56,10 @@ bytes = { workspace = true } ipnet = "2" # Serialization +serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } +base64 = { workspace = true } # Logging tracing = { workspace = true } diff --git a/crates/openshell-sandbox/src/inference_debug.rs b/crates/openshell-sandbox/src/inference_debug.rs new file mode 100644 index 00000000..fb7a322e --- /dev/null +++ b/crates/openshell-sandbox/src/inference_debug.rs @@ -0,0 +1,287 @@ +// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +use base64::Engine; +use serde::Serialize; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::{info, warn}; + +pub(crate) const INFERENCE_DEBUG_LOG_ENV: &str = "OPENSHELL_INFERENCE_DEBUG_LOG"; +const BODY_CAPTURE_LIMIT_BYTES: usize = 64 * 1024; + +#[derive(Debug)] +pub(crate) struct InferenceDebugLogger { + path: PathBuf, + file: Mutex, +} + +impl InferenceDebugLogger { + pub(crate) fn from_env() -> Option { + let path = std::env::var_os(INFERENCE_DEBUG_LOG_ENV)?; + let path = PathBuf::from(path); + match Self::new(&path) { + Ok(logger) => { + info!(path = %logger.path.display(), "Inference debug logging enabled"); + Some(logger) + } + Err(error) => { + warn!( + path = %path.display(), + error = %error, + "Failed to enable inference debug logging" + ); + None + } + } + } + + pub(crate) fn new(path: &Path) -> std::io::Result { + let file = OpenOptions::new().create(true).append(true).open(path)?; + Ok(Self { + path: path.to_path_buf(), + file: Mutex::new(file), + }) + } + + pub(crate) fn write_record(&self, record: &InferenceDebugRecord) { + let mut line = match serde_json::to_vec(record) { + Ok(line) => line, + Err(error) => { + warn!(error = %error, "Failed to serialize inference debug record"); + return; + } + }; + line.push(b'\n'); + + let write = |file: &mut File| -> std::io::Result<()> { + file.write_all(&line)?; + file.flush() + }; + + match self.file.lock() { + Ok(mut file) => { + if let Err(error) = write(&mut file) { + warn!( + path = %self.path.display(), + error = %error, + "Failed to append inference debug record" + ); + } + } + Err(poisoned) => { + let mut file = poisoned.into_inner(); + if let Err(error) = write(&mut file) { + warn!( + path = %self.path.display(), + error = %error, + "Failed to append inference debug record after mutex poison" + ); + } + } + } + } +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub(crate) struct LoggedHeader { + pub name: String, + pub value: String, +} + +pub(crate) fn logged_headers(headers: &[(String, String)]) -> Vec { + headers + .iter() + .map(|(name, value)| LoggedHeader { + name: name.clone(), + value: value.clone(), + }) + .collect() +} + +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum InferenceDebugOutcome { + Routed, + Denied, + UpstreamError, +} + +#[derive(Debug, Clone, Serialize)] +pub(crate) struct InferenceDebugRecord { + pub timestamp_ms: u64, + pub source_port: u16, + #[serde(skip_serializing_if = "Option::is_none")] + pub pid: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub binary_path: Option, + pub ancestor_binaries: Vec, + pub cmdline_paths: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub identity_error: Option, + pub method: String, + pub raw_path: String, + pub normalized_path: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub protocol: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub kind: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub selected_route: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub selected_provider: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub selected_model: Option, + pub request_headers: Vec, + pub request_body_bytes: usize, + pub request_body_capture_bytes: usize, + pub request_body_truncated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub request_body_b64: Option, + pub response_status: u16, + pub response_headers: Vec, + pub response_body_bytes: usize, + pub response_body_capture_bytes: usize, + pub response_body_truncated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub response_body_b64: Option, + pub streaming: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub time_to_first_chunk_ms: Option, + pub total_duration_ms: u64, + pub outcome: InferenceDebugOutcome, + #[serde(skip_serializing_if = "Option::is_none")] + pub deny_reason: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub router_error: Option, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub(crate) struct BodyCaptureBuffer { + captured: Vec, + total_bytes: usize, + truncated: bool, +} + +impl BodyCaptureBuffer { + pub(crate) fn from_slice(bytes: &[u8]) -> Self { + let mut capture = Self::default(); + capture.push(bytes); + capture + } + + pub(crate) fn push(&mut self, bytes: &[u8]) { + self.total_bytes += bytes.len(); + + let remaining = BODY_CAPTURE_LIMIT_BYTES.saturating_sub(self.captured.len()); + let take = remaining.min(bytes.len()); + self.captured.extend_from_slice(&bytes[..take]); + if take < bytes.len() { + self.truncated = true; + } + } + + pub(crate) const fn total_bytes(&self) -> usize { + self.total_bytes + } + + pub(crate) fn captured_bytes(&self) -> usize { + self.captured.len() + } + + pub(crate) const fn truncated(&self) -> bool { + self.truncated + } + + pub(crate) fn encoded_body(&self) -> Option { + (!self.captured.is_empty()) + .then(|| base64::engine::general_purpose::STANDARD.encode(&self.captured)) + } +} + +pub(crate) fn timestamp_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0, |duration| duration.as_millis() as u64) +} + +pub(crate) fn duration_ms(duration: Duration) -> u64 { + duration.as_millis() as u64 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn body_capture_truncates_and_encodes() { + let data = vec![b'a'; BODY_CAPTURE_LIMIT_BYTES + 32]; + let capture = BodyCaptureBuffer::from_slice(&data); + + assert_eq!(capture.total_bytes(), BODY_CAPTURE_LIMIT_BYTES + 32); + assert_eq!(capture.captured_bytes(), BODY_CAPTURE_LIMIT_BYTES); + assert!(capture.truncated()); + assert_eq!( + capture.encoded_body().unwrap(), + base64::engine::general_purpose::STANDARD.encode(vec![b'a'; BODY_CAPTURE_LIMIT_BYTES]) + ); + } + + #[test] + fn logger_writes_jsonl_record() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("inference-debug.jsonl"); + let logger = InferenceDebugLogger::new(&path).unwrap(); + + logger.write_record(&InferenceDebugRecord { + timestamp_ms: 1234, + source_port: 4567, + pid: Some(42), + binary_path: Some("/usr/bin/test".to_string()), + ancestor_binaries: vec!["/usr/bin/parent".to_string()], + cmdline_paths: vec!["/workspace/app".to_string()], + identity_error: None, + method: "POST".to_string(), + raw_path: "/v1/messages".to_string(), + normalized_path: "/v1/messages".to_string(), + protocol: Some("anthropic_messages".to_string()), + kind: Some("messages".to_string()), + selected_route: Some("inference.local".to_string()), + selected_provider: Some("anthropic".to_string()), + selected_model: Some("claude-sonnet".to_string()), + request_headers: vec![LoggedHeader { + name: "content-type".to_string(), + value: "application/json".to_string(), + }], + request_body_bytes: 2, + request_body_capture_bytes: 2, + request_body_truncated: false, + request_body_b64: Some("e30=".to_string()), + response_status: 200, + response_headers: vec![LoggedHeader { + name: "content-type".to_string(), + value: "application/json".to_string(), + }], + response_body_bytes: 2, + response_body_capture_bytes: 2, + response_body_truncated: false, + response_body_b64: Some("e30=".to_string()), + streaming: false, + time_to_first_chunk_ms: None, + total_duration_ms: 10, + outcome: InferenceDebugOutcome::Routed, + deny_reason: None, + router_error: None, + }); + + let content = std::fs::read_to_string(path).unwrap(); + let line = content.lines().next().unwrap(); + let json: serde_json::Value = serde_json::from_str(line).unwrap(); + assert_eq!(json["source_port"], 4567); + assert_eq!(json["selected_provider"], "anthropic"); + assert_eq!(json["outcome"], "routed"); + } +} diff --git a/crates/openshell-sandbox/src/lib.rs b/crates/openshell-sandbox/src/lib.rs index 754c3be0..1028288f 100644 --- a/crates/openshell-sandbox/src/lib.rs +++ b/crates/openshell-sandbox/src/lib.rs @@ -10,6 +10,7 @@ mod child_env; pub mod denial_aggregator; mod grpc_client; mod identity; +mod inference_debug; pub mod l7; pub mod log_push; pub mod mechanistic_mapper; @@ -805,6 +806,7 @@ pub(crate) fn bundle_to_resolved_routes( name: r.name.clone(), endpoint: r.base_url.clone(), model: r.model_id.clone(), + provider_type: (!r.provider_type.is_empty()).then(|| r.provider_type.clone()), api_key: r.api_key.clone(), protocols: r.protocols.clone(), auth, @@ -1474,6 +1476,7 @@ mod tests { name: "inference.local".to_string(), endpoint: "https://api.openai.com/v1".to_string(), model: "gpt-4o".to_string(), + provider_type: Some("openai".to_string()), api_key: "key1".to_string(), protocols: vec!["openai_chat_completions".to_string()], auth: openshell_core::inference::AuthHeader::Bearer, @@ -1483,6 +1486,7 @@ mod tests { name: "sandbox-system".to_string(), endpoint: "https://api.anthropic.com/v1".to_string(), model: "claude-sonnet-4-20250514".to_string(), + provider_type: Some("anthropic".to_string()), api_key: "key2".to_string(), protocols: vec!["anthropic_messages".to_string()], auth: openshell_core::inference::AuthHeader::Custom("x-api-key"), @@ -1771,6 +1775,7 @@ filesystem_policy: name: "inference.local".to_string(), endpoint: "http://original:8000/v1".to_string(), model: "original-model".to_string(), + provider_type: Some("openai".to_string()), api_key: "key".to_string(), auth: openshell_core::inference::AuthHeader::Bearer, protocols: vec!["openai_chat_completions".to_string()], diff --git a/crates/openshell-sandbox/src/procfs.rs b/crates/openshell-sandbox/src/procfs.rs index ece16c82..62c4825a 100644 --- a/crates/openshell-sandbox/src/procfs.rs +++ b/crates/openshell-sandbox/src/procfs.rs @@ -7,9 +7,16 @@ //! for process-identity binding in the OPA proxy policy engine. use miette::{IntoDiagnostic, Result}; -use std::path::Path; -#[cfg(target_os = "linux")] -use std::path::PathBuf; +use std::path::{Path, PathBuf}; + +/// Process identity snapshot derived from a socket owner. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SocketProcessIdentity { + pub pid: u32, + pub binary_path: PathBuf, + pub ancestor_binaries: Vec, + pub cmdline_paths: Vec, +} /// Read the binary path of a process via `/proc/{pid}/exe` symlink. /// @@ -54,6 +61,29 @@ pub fn resolve_tcp_peer_identity(entrypoint_pid: u32, peer_port: u16) -> Result< Ok((path, pid)) } +/// Resolve the full process identity for the TCP peer inside a sandbox network namespace. +/// +/// This combines socket ownership, ancestor binary lookup, and cmdline-derived +/// absolute paths into one reusable snapshot. +#[cfg(target_os = "linux")] +pub fn resolve_tcp_peer_process_identity( + entrypoint_pid: u32, + peer_port: u16, +) -> Result { + let (binary_path, pid) = resolve_tcp_peer_identity(entrypoint_pid, peer_port)?; + let ancestor_binaries = collect_ancestor_binaries(pid, entrypoint_pid); + let mut exclude = ancestor_binaries.clone(); + exclude.push(binary_path.clone()); + let cmdline_paths = collect_cmdline_paths(pid, entrypoint_pid, &exclude); + + Ok(SocketProcessIdentity { + pid, + binary_path, + ancestor_binaries, + cmdline_paths, + }) +} + /// Read the `PPid` (parent PID) from `/proc//status`. #[cfg(target_os = "linux")] pub fn read_ppid(pid: u32) -> Option { diff --git a/crates/openshell-sandbox/src/proxy.rs b/crates/openshell-sandbox/src/proxy.rs index d662399b..5dbf47c2 100644 --- a/crates/openshell-sandbox/src/proxy.rs +++ b/crates/openshell-sandbox/src/proxy.rs @@ -5,6 +5,10 @@ use crate::denial_aggregator::DenialEvent; use crate::identity::BinaryIdentityCache; +use crate::inference_debug::{ + BodyCaptureBuffer, InferenceDebugLogger, InferenceDebugOutcome, InferenceDebugRecord, + duration_ms, logged_headers, timestamp_ms, +}; use crate::l7::tls::ProxyTlsState; use crate::opa::{NetworkAction, OpaEngine}; use crate::policy::ProxyPolicy; @@ -14,6 +18,7 @@ use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicU32; +use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::mpsc; @@ -36,6 +41,20 @@ struct ConnectDecision { cmdline_paths: Vec, } +#[derive(Debug)] +struct InferenceAttribution { + source_port: u16, + identity: Option, + identity_error: Option, +} + +#[derive(Debug, Clone)] +struct SelectedRouteMeta { + name: String, + provider_type: Option, + model: String, +} + /// Outcome of an inference interception attempt. /// /// Returned by [`handle_inference_interception`] so the call site can emit @@ -151,6 +170,7 @@ impl ProxyHandle { let listener = TcpListener::bind(http_addr).await.into_diagnostic()?; let local_addr = listener.local_addr().into_diagnostic()?; info!(addr = %local_addr, "Proxy listening (tcp)"); + let inference_debug = InferenceDebugLogger::from_env().map(Arc::new); let join = tokio::spawn(async move { loop { @@ -163,9 +183,10 @@ impl ProxyHandle { let inf = inference_ctx.clone(); let resolver = secret_resolver.clone(); let dtx = denial_tx.clone(); + let inf_debug = inference_debug.clone(); tokio::spawn(async move { if let Err(err) = handle_tcp_connection( - stream, opa, cache, spid, tls, inf, resolver, dtx, + stream, opa, cache, spid, tls, inf, resolver, dtx, inf_debug, ) .await { @@ -266,6 +287,7 @@ async fn handle_tcp_connection( inference_ctx: Option>, secret_resolver: Option>, denial_tx: Option>, + inference_debug: Option>, ) -> Result<()> { let mut buf = vec![0u8; MAX_HEADER_BYTES]; let mut used = 0usize; @@ -316,8 +338,10 @@ async fn handle_tcp_connection( let (host, port) = parse_target(target)?; let host_lc = host.to_ascii_lowercase(); + let peer_addr = client.peer_addr().into_diagnostic()?; if host_lc == INFERENCE_LOCAL_HOST { + let attribution = build_inference_attribution(peer_addr, &entrypoint_pid); respond(&mut client, b"HTTP/1.1 200 Connection Established\r\n\r\n").await?; let outcome = handle_inference_interception( client, @@ -325,6 +349,8 @@ async fn handle_tcp_connection( port, tls_state.as_ref(), inference_ctx.as_ref(), + &attribution, + inference_debug.as_deref(), ) .await?; if let InferenceOutcome::Denied { reason } = outcome { @@ -333,7 +359,6 @@ async fn handle_tcp_connection( return Ok(()); } - let peer_addr = client.peer_addr().into_diagnostic()?; let local_addr = client.local_addr().into_diagnostic()?; // Evaluate OPA policy with process-identity binding @@ -662,6 +687,48 @@ async fn handle_tcp_connection( Ok(()) } +#[cfg(target_os = "linux")] +fn resolve_socket_process_identity( + peer_addr: SocketAddr, + entrypoint_pid: &AtomicU32, +) -> std::result::Result { + use std::sync::atomic::Ordering; + + let pid = entrypoint_pid.load(Ordering::Acquire); + if pid == 0 { + return Err("entrypoint process not yet spawned".to_string()); + } + + crate::procfs::resolve_tcp_peer_process_identity(pid, peer_addr.port()) + .map_err(|error| format!("failed to resolve peer identity: {error}")) +} + +#[cfg(not(target_os = "linux"))] +fn resolve_socket_process_identity( + _peer_addr: SocketAddr, + _entrypoint_pid: &AtomicU32, +) -> std::result::Result { + Err("identity binding unavailable on this platform".to_string()) +} + +fn build_inference_attribution( + peer_addr: SocketAddr, + entrypoint_pid: &AtomicU32, +) -> InferenceAttribution { + match resolve_socket_process_identity(peer_addr, entrypoint_pid) { + Ok(identity) => InferenceAttribution { + source_port: peer_addr.port(), + identity: Some(identity), + identity_error: None, + }, + Err(identity_error) => InferenceAttribution { + source_port: peer_addr.port(), + identity: None, + identity_error: Some(identity_error), + }, + } +} + /// Evaluate OPA policy for a TCP connection with identity binding via /proc/net/tcp. #[cfg(target_os = "linux")] fn evaluate_opa_tcp( @@ -673,7 +740,6 @@ fn evaluate_opa_tcp( port: u16, ) -> ConnectDecision { use crate::opa::NetworkInput; - use std::sync::atomic::Ordering; let deny = |reason: String, binary: Option, @@ -690,30 +756,16 @@ fn evaluate_opa_tcp( } }; - let pid = entrypoint_pid.load(Ordering::Acquire); - if pid == 0 { - return deny( - "entrypoint process not yet spawned".into(), - None, - None, - vec![], - vec![], - ); - } - - let peer_port = peer_addr.port(); - let (bin_path, binary_pid) = match crate::procfs::resolve_tcp_peer_identity(pid, peer_port) { - Ok(r) => r, - Err(e) => { - return deny( - format!("failed to resolve peer binary: {e}"), - None, - None, - vec![], - vec![], - ); + let identity = match resolve_socket_process_identity(peer_addr, entrypoint_pid) { + Ok(identity) => identity, + Err(reason) => { + return deny(reason, None, None, vec![], vec![]); } }; + let bin_path = identity.binary_path.clone(); + let binary_pid = identity.pid; + let ancestors = identity.ancestor_binaries.clone(); + let cmdline_paths = identity.cmdline_paths.clone(); // TOFU verify the immediate binary let bin_hash = match identity_cache.verify_or_cache(&bin_path) { @@ -729,9 +781,6 @@ fn evaluate_opa_tcp( } }; - // Walk the process tree upward to collect ancestor binaries - let ancestors = crate::procfs::collect_ancestor_binaries(binary_pid, pid); - // TOFU verify each ancestor binary for ancestor in &ancestors { if let Err(e) = identity_cache.verify_or_cache(ancestor) { @@ -748,12 +797,6 @@ fn evaluate_opa_tcp( } } - // Collect cmdline paths for script-based binary detection. - // Excludes exe paths already captured in bin_path/ancestors to avoid duplicates. - let mut exclude = ancestors.clone(); - exclude.push(bin_path.clone()); - let cmdline_paths = crate::procfs::collect_cmdline_paths(binary_pid, pid, &exclude); - let input = NetworkInput { host: host.to_string(), port, @@ -822,6 +865,8 @@ async fn handle_inference_interception( _port: u16, tls_state: Option<&Arc>, inference_ctx: Option<&Arc>, + attribution: &InferenceAttribution, + debug_logger: Option<&InferenceDebugLogger>, ) -> Result { use crate::l7::inference::{ParseResult, format_http_response, try_parse_http_request}; @@ -879,7 +924,14 @@ async fn handle_inference_interception( // Try to parse a complete HTTP request match try_parse_http_request(&buf[..used]) { ParseResult::Complete(request, consumed) => { - let was_routed = route_inference_request(&request, ctx, &mut tls_client).await?; + let was_routed = route_inference_request( + &request, + ctx, + &mut tls_client, + attribution, + debug_logger, + ) + .await?; if was_routed { routed_any = true; } else if !routed_any { @@ -922,10 +974,13 @@ async fn route_inference_request( request: &crate::l7::inference::ParsedHttpRequest, ctx: &InferenceContext, tls_client: &mut (impl tokio::io::AsyncWrite + Unpin), + attribution: &InferenceAttribution, + debug_logger: Option<&InferenceDebugLogger>, ) -> Result { use crate::l7::inference::{detect_inference_pattern, format_http_response}; let normalized_path = normalize_inference_path(&request.path); + let started_at = Instant::now(); if let Some(pattern) = detect_inference_pattern(&request.method, &normalized_path, &ctx.patterns) @@ -940,8 +995,25 @@ async fn route_inference_request( // Strip credential + framing/hop-by-hop headers. let filtered_headers = sanitize_inference_request_headers(&request.headers); + let request_log_headers = debug_logger + .map(|_| sanitize_inference_request_log_headers(&request.headers)) + .unwrap_or_default(); let routes = ctx.routes.read().await; + let selected_route = debug_logger + .map(|_| select_inference_route_meta(&routes, &pattern.protocol)) + .unwrap_or(None); + let mut debug_record = debug_logger.map(|_| { + make_inference_debug_record( + attribution, + request, + &normalized_path, + Some(pattern.protocol.as_str()), + Some(pattern.kind.as_str()), + selected_route.as_ref(), + &request_log_headers, + ) + }); if routes.is_empty() { let body = serde_json::json!({ @@ -949,12 +1021,26 @@ async fn route_inference_request( "hint": "run: openshell cluster inference set --help" }); let body_bytes = body.to_string(); - let response = format_http_response( - 503, - &[("content-type".to_string(), "application/json".to_string())], - body_bytes.as_bytes(), - ); + let response_headers = + vec![("content-type".to_string(), "application/json".to_string())]; + let response = format_http_response(503, &response_headers, body_bytes.as_bytes()); write_all(tls_client, &response).await?; + if let (Some(logger), Some(mut record)) = (debug_logger, debug_record.take()) { + let response_capture = BodyCaptureBuffer::from_slice(body_bytes.as_bytes()); + finalize_inference_debug_record( + &mut record, + 503, + &sanitize_inference_response_log_headers(response_headers), + &response_capture, + false, + None, + started_at.elapsed(), + InferenceDebugOutcome::UpstreamError, + None, + Some("cluster inference is not configured".to_string()), + ); + logger.write_record(&record); + } return Ok(true); } @@ -975,9 +1061,16 @@ async fn route_inference_request( format_chunk, format_chunk_terminator, format_http_response_header, }; - let resp_headers = sanitize_inference_response_headers( - std::mem::take(&mut resp.headers).into_iter().collect(), - ); + let raw_resp_headers: Vec<(String, String)> = + std::mem::take(&mut resp.headers).into_iter().collect(); + let resp_headers = sanitize_inference_response_headers(raw_resp_headers.clone()); + let response_log_headers = debug_logger + .map(|_| sanitize_inference_response_log_headers(raw_resp_headers)) + .unwrap_or_default(); + let streaming = request_is_streaming(request, &resp_headers); + let mut response_capture = debug_logger.map(|_| BodyCaptureBuffer::default()); + let mut first_chunk_delay = None; + let mut stream_error = None; // Write response headers immediately (chunked TE). let header_bytes = format_http_response_header(resp.status, &resp_headers); @@ -987,12 +1080,19 @@ async fn route_inference_request( loop { match resp.next_chunk().await { Ok(Some(chunk)) => { + if first_chunk_delay.is_none() { + first_chunk_delay = Some(started_at.elapsed()); + } + if let Some(capture) = response_capture.as_mut() { + capture.push(&chunk); + } let encoded = format_chunk(&chunk); write_all(tls_client, &encoded).await?; } Ok(None) => break, Err(e) => { warn!(error = %e, "error reading upstream response chunk"); + stream_error = Some(e.to_string()); break; } } @@ -1000,18 +1100,55 @@ async fn route_inference_request( // Terminate the chunked stream. write_all(tls_client, format_chunk_terminator()).await?; + if let (Some(logger), Some(mut record), Some(response_capture)) = + (debug_logger, debug_record.take(), response_capture) + { + let outcome = if stream_error.is_some() { + InferenceDebugOutcome::UpstreamError + } else { + InferenceDebugOutcome::Routed + }; + finalize_inference_debug_record( + &mut record, + resp.status, + &response_log_headers, + &response_capture, + streaming, + first_chunk_delay, + started_at.elapsed(), + outcome, + None, + stream_error, + ); + logger.write_record(&record); + } } Err(e) => { warn!(error = %e, "inference endpoint detected but upstream service failed"); let (status, msg) = router_error_to_http(&e); let body = serde_json::json!({"error": msg}); let body_bytes = body.to_string(); - let response = format_http_response( - status, - &[("content-type".to_string(), "application/json".to_string())], - body_bytes.as_bytes(), - ); + let response_headers = + vec![("content-type".to_string(), "application/json".to_string())]; + let response = + format_http_response(status, &response_headers, body_bytes.as_bytes()); write_all(tls_client, &response).await?; + if let (Some(logger), Some(mut record)) = (debug_logger, debug_record.take()) { + let response_capture = BodyCaptureBuffer::from_slice(body_bytes.as_bytes()); + finalize_inference_debug_record( + &mut record, + status, + &sanitize_inference_response_log_headers(response_headers), + &response_capture, + false, + None, + started_at.elapsed(), + InferenceDebugOutcome::UpstreamError, + None, + Some(e.to_string()), + ); + logger.write_record(&record); + } } } Ok(true) @@ -1024,12 +1161,35 @@ async fn route_inference_request( ); let body = serde_json::json!({"error": "connection not allowed by policy"}); let body_bytes = body.to_string(); - let response = format_http_response( - 403, - &[("content-type".to_string(), "application/json".to_string())], - body_bytes.as_bytes(), - ); + let response_headers = vec![("content-type".to_string(), "application/json".to_string())]; + let response = format_http_response(403, &response_headers, body_bytes.as_bytes()); write_all(tls_client, &response).await?; + if let Some(logger) = debug_logger { + let request_log_headers = sanitize_inference_request_log_headers(&request.headers); + let mut record = make_inference_debug_record( + attribution, + request, + &normalized_path, + None, + None, + None, + &request_log_headers, + ); + let response_capture = BodyCaptureBuffer::from_slice(body_bytes.as_bytes()); + finalize_inference_debug_record( + &mut record, + 403, + &sanitize_inference_response_log_headers(response_headers), + &response_capture, + false, + None, + started_at.elapsed(), + InferenceDebugOutcome::Denied, + Some("connection not allowed by policy".to_string()), + None, + ); + logger.write_record(&record); + } Ok(false) } } @@ -1058,6 +1218,10 @@ fn sanitize_inference_request_headers(headers: &[(String, String)]) -> Vec<(Stri .collect() } +fn sanitize_inference_request_log_headers(headers: &[(String, String)]) -> Vec<(String, String)> { + redact_sensitive_log_headers(sanitize_inference_request_headers(headers)) +} + fn sanitize_inference_response_headers(headers: Vec<(String, String)>) -> Vec<(String, String)> { headers .into_iter() @@ -1065,6 +1229,25 @@ fn sanitize_inference_response_headers(headers: Vec<(String, String)>) -> Vec<(S .collect() } +fn sanitize_inference_response_log_headers( + headers: Vec<(String, String)>, +) -> Vec<(String, String)> { + redact_sensitive_log_headers(sanitize_inference_response_headers(headers)) +} + +fn redact_sensitive_log_headers(headers: Vec<(String, String)>) -> Vec<(String, String)> { + headers + .into_iter() + .map(|(name, value)| { + if should_redact_log_header(&name) { + (name, "[REDACTED]".to_string()) + } else { + (name, value) + } + }) + .collect() +} + fn should_strip_request_header(name: &str) -> bool { let name_lc = name.to_ascii_lowercase(); matches!( @@ -1078,6 +1261,17 @@ fn should_strip_response_header(name: &str) -> bool { matches!(name_lc.as_str(), "content-length") || is_hop_by_hop_header(&name_lc) } +fn should_redact_log_header(name: &str) -> bool { + let name_lc = name.to_ascii_lowercase(); + matches!( + name_lc.as_str(), + "authorization" | "proxy-authorization" | "x-api-key" | "cookie" | "set-cookie" + ) || name_lc == "api-key" + || name_lc.ends_with("-api-key") + || name_lc.contains("token") + || name_lc.contains("secret") +} + fn is_hop_by_hop_header(name: &str) -> bool { matches!( name, @@ -1093,6 +1287,147 @@ fn is_hop_by_hop_header(name: &str) -> bool { ) } +fn select_inference_route_meta( + routes: &[openshell_router::config::ResolvedRoute], + protocol: &str, +) -> Option { + let normalized_protocol = protocol.trim().to_ascii_lowercase(); + routes + .iter() + .find(|route| route.protocols.iter().any(|p| p == &normalized_protocol)) + .map(|route| SelectedRouteMeta { + name: route.name.clone(), + provider_type: route.provider_type.clone(), + model: route.model.clone(), + }) +} + +fn request_is_streaming( + request: &crate::l7::inference::ParsedHttpRequest, + response_headers: &[(String, String)], +) -> bool { + let request_wants_sse = request.headers.iter().any(|(name, value)| { + name.eq_ignore_ascii_case("accept") + && value.to_ascii_lowercase().contains("text/event-stream") + }); + if request_wants_sse { + return true; + } + + let response_is_sse = response_headers.iter().any(|(name, value)| { + name.eq_ignore_ascii_case("content-type") + && value.to_ascii_lowercase().contains("text/event-stream") + }); + if response_is_sse { + return true; + } + + serde_json::from_slice::(&request.body) + .ok() + .and_then(|json| json.get("stream").and_then(serde_json::Value::as_bool)) + .unwrap_or(false) +} + +fn make_inference_debug_record( + attribution: &InferenceAttribution, + request: &crate::l7::inference::ParsedHttpRequest, + normalized_path: &str, + protocol: Option<&str>, + kind: Option<&str>, + selected_route: Option<&SelectedRouteMeta>, + request_headers: &[(String, String)], +) -> InferenceDebugRecord { + let request_capture = BodyCaptureBuffer::from_slice(&request.body); + let pid = attribution.identity.as_ref().map(|identity| identity.pid); + let binary_path = attribution + .identity + .as_ref() + .map(|identity| identity.binary_path.display().to_string()); + let ancestor_binaries = attribution + .identity + .as_ref() + .map(|identity| { + identity + .ancestor_binaries + .iter() + .map(|path| path.display().to_string()) + .collect() + }) + .unwrap_or_default(); + let cmdline_paths = attribution + .identity + .as_ref() + .map(|identity| { + identity + .cmdline_paths + .iter() + .map(|path| path.display().to_string()) + .collect() + }) + .unwrap_or_default(); + + InferenceDebugRecord { + timestamp_ms: timestamp_ms(), + source_port: attribution.source_port, + pid, + binary_path, + ancestor_binaries, + cmdline_paths, + identity_error: attribution.identity_error.clone(), + method: request.method.clone(), + raw_path: request.path.clone(), + normalized_path: normalized_path.to_string(), + protocol: protocol.map(ToString::to_string), + kind: kind.map(ToString::to_string), + selected_route: selected_route.map(|route| route.name.clone()), + selected_provider: selected_route.and_then(|route| route.provider_type.clone()), + selected_model: selected_route.map(|route| route.model.clone()), + request_headers: logged_headers(request_headers), + request_body_bytes: request_capture.total_bytes(), + request_body_capture_bytes: request_capture.captured_bytes(), + request_body_truncated: request_capture.truncated(), + request_body_b64: request_capture.encoded_body(), + response_status: 0, + response_headers: Vec::new(), + response_body_bytes: 0, + response_body_capture_bytes: 0, + response_body_truncated: false, + response_body_b64: None, + streaming: false, + time_to_first_chunk_ms: None, + total_duration_ms: 0, + outcome: InferenceDebugOutcome::Denied, + deny_reason: None, + router_error: None, + } +} + +fn finalize_inference_debug_record( + record: &mut InferenceDebugRecord, + response_status: u16, + response_headers: &[(String, String)], + response_capture: &BodyCaptureBuffer, + streaming: bool, + time_to_first_chunk: Option, + total_duration: Duration, + outcome: InferenceDebugOutcome, + deny_reason: Option, + router_error: Option, +) { + record.response_status = response_status; + record.response_headers = logged_headers(response_headers); + record.response_body_bytes = response_capture.total_bytes(); + record.response_body_capture_bytes = response_capture.captured_bytes(); + record.response_body_truncated = response_capture.truncated(); + record.response_body_b64 = response_capture.encoded_body(); + record.streaming = streaming; + record.time_to_first_chunk_ms = time_to_first_chunk.map(duration_ms); + record.total_duration_ms = duration_ms(total_duration); + record.outcome = outcome; + record.deny_reason = deny_reason; + record.router_error = router_error; +} + /// Write all bytes to an async writer. async fn write_all(writer: &mut (impl tokio::io::AsyncWrite + Unpin), data: &[u8]) -> Result<()> { use tokio::io::AsyncWriteExt; @@ -2063,6 +2398,33 @@ mod tests { ); } + #[test] + fn sanitize_request_log_headers_redacts_cookie_values() { + let headers = vec![ + ("cookie".to_string(), "session=abc".to_string()), + ("x-session-token".to_string(), "secret".to_string()), + ("content-type".to_string(), "application/json".to_string()), + ]; + + let kept = sanitize_inference_request_log_headers(&headers); + + assert!( + kept.iter() + .any(|(k, v)| k.eq_ignore_ascii_case("cookie") && v == "[REDACTED]"), + "cookie value should be redacted" + ); + assert!( + kept.iter() + .any(|(k, v)| k.eq_ignore_ascii_case("x-session-token") && v == "[REDACTED]"), + "token-like value should be redacted" + ); + assert!( + kept.iter() + .any(|(k, v)| k.eq_ignore_ascii_case("content-type") && v == "application/json"), + "non-sensitive headers should be preserved" + ); + } + // -- router_error_to_http -- #[test] @@ -2158,6 +2520,27 @@ mod tests { ); } + #[test] + fn sanitize_response_log_headers_redacts_set_cookie() { + let headers = vec![ + ("set-cookie".to_string(), "session=abc".to_string()), + ("content-type".to_string(), "application/json".to_string()), + ]; + + let kept = sanitize_inference_response_log_headers(headers); + + assert!( + kept.iter() + .any(|(k, v)| k.eq_ignore_ascii_case("set-cookie") && v == "[REDACTED]"), + "set-cookie value should be redacted" + ); + assert!( + kept.iter() + .any(|(k, v)| k.eq_ignore_ascii_case("content-type") && v == "application/json"), + "non-sensitive response headers should be preserved" + ); + } + // -- is_always_blocked_ip -- #[test] diff --git a/crates/openshell-sandbox/tests/system_inference.rs b/crates/openshell-sandbox/tests/system_inference.rs index 3f6a471e..5209ff84 100644 --- a/crates/openshell-sandbox/tests/system_inference.rs +++ b/crates/openshell-sandbox/tests/system_inference.rs @@ -16,6 +16,7 @@ fn make_system_route() -> ResolvedRoute { name: "sandbox-system".to_string(), endpoint: "mock://system-test".to_string(), model: "system/policy-analyzer".to_string(), + provider_type: None, api_key: "system-key".to_string(), protocols: vec!["openai_chat_completions".to_string()], auth: AuthHeader::Bearer, @@ -28,6 +29,7 @@ fn make_user_route() -> ResolvedRoute { name: "inference.local".to_string(), endpoint: "mock://user-test".to_string(), model: "user/gpt-4o".to_string(), + provider_type: None, api_key: "user-key".to_string(), protocols: vec!["openai_chat_completions".to_string()], auth: AuthHeader::Bearer, @@ -120,6 +122,7 @@ async fn system_inference_with_anthropic_protocol() { name: "sandbox-system".to_string(), endpoint: "mock://anthropic-system".to_string(), model: "claude-sonnet-4-20250514".to_string(), + provider_type: Some("anthropic".to_string()), api_key: "ant-key".to_string(), protocols: vec!["anthropic_messages".to_string()], auth: AuthHeader::Custom("x-api-key"), diff --git a/crates/openshell-server/src/inference.rs b/crates/openshell-server/src/inference.rs index d9498463..246105ae 100644 --- a/crates/openshell-server/src/inference.rs +++ b/crates/openshell-server/src/inference.rs @@ -254,11 +254,12 @@ fn resolve_provider_route(provider: &Provider) -> Result