From 2bced810da2c989b5acf1987e48b824fa121817a Mon Sep 17 00:00:00 2001
From: viyatb-oai
Date: Fri, 13 Feb 2026 01:01:11 -0800
Subject: [PATCH] feat(network-proxy): structured policy signaling and attempt correlation to core (#11662)

## Summary

When network requests were blocked, downstream code often had to infer ask vs deny from free-form response text. That was brittle and led to incorrect approval behavior. This PR fixes the proxy side so blocked decisions are structured and request metadata survives reliably.

## Description

- Blocked proxy responses now carry consistent structured policy decision data.
- Request attempt metadata is preserved across proxy env paths (including ALL_PROXY flows).
- Header stripping was tightened so we still remove unsafe forwarding headers, but keep metadata needed for policy handling.
- Block messages were clarified (for example, allowlist miss vs explicit deny).
- Added unified violation log entries so policy failures can be inspected in one place.
- Added/updated tests for these behaviors.
--------- Co-authored-by: Codex <199175422+chatgpt-codex-connector[bot]@users.noreply.github.com> --- .codespellignore | 4 +- .codespellrc | 2 +- codex-rs/Cargo.lock | 1 + codex-rs/network-proxy/Cargo.toml | 1 + codex-rs/network-proxy/src/admin.rs | 2 +- codex-rs/network-proxy/src/http_proxy.rs | 164 ++++++++++++- codex-rs/network-proxy/src/lib.rs | 2 + codex-rs/network-proxy/src/metadata.rs | 50 ++++ codex-rs/network-proxy/src/network_policy.rs | 25 +- codex-rs/network-proxy/src/proxy.rs | 83 ++++++- codex-rs/network-proxy/src/responses.rs | 66 +----- codex-rs/network-proxy/src/runtime.rs | 237 ++++++++++++++++++- codex-rs/network-proxy/src/socks5.rs | 26 ++ codex-rs/network-proxy/src/state.rs | 1 + 14 files changed, 581 insertions(+), 83 deletions(-) create mode 100644 codex-rs/network-proxy/src/metadata.rs diff --git a/.codespellignore b/.codespellignore index 835c0e538e..947511b0c3 100644 --- a/.codespellignore +++ b/.codespellignore @@ -1,3 +1,5 @@ iTerm iTerm2 -psuedo \ No newline at end of file +psuedo +te +TE diff --git a/.codespellrc b/.codespellrc index ec1f02408e..4f3067ed76 100644 --- a/.codespellrc +++ b/.codespellrc @@ -3,4 +3,4 @@ skip = .git*,vendor,*-lock.yaml,*.lock,.codespellrc,*test.ts,*.jsonl,frame*.txt,*.snap,*.snap.new,*meriyah.umd.min.js check-hidden = true ignore-regex = ^\s*"image/\S+": ".*|\b(afterAll)\b -ignore-words-list = ratatui,ser,iTerm,iterm2,iterm +ignore-words-list = ratatui,ser,iTerm,iterm2,iterm,te,TE diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index ad6ba09107..535396feb0 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2035,6 +2035,7 @@ version = "0.0.0" dependencies = [ "anyhow", "async-trait", + "base64 0.22.1", "clap", "codex-utils-absolute-path", "codex-utils-rustls-provider", diff --git a/codex-rs/network-proxy/Cargo.toml b/codex-rs/network-proxy/Cargo.toml index 48a75c288d..cbcde383b2 100644 --- a/codex-rs/network-proxy/Cargo.toml +++ b/codex-rs/network-proxy/Cargo.toml @@ -14,6 +14,7 @@ 
workspace = true [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } +base64 = { workspace = true } clap = { workspace = true, features = ["derive"] } codex-utils-absolute-path = { workspace = true } codex-utils-rustls-provider = { workspace = true } diff --git a/codex-rs/network-proxy/src/admin.rs b/codex-rs/network-proxy/src/admin.rs index 678972f810..02aebae1c0 100644 --- a/codex-rs/network-proxy/src/admin.rs +++ b/codex-rs/network-proxy/src/admin.rs @@ -89,7 +89,7 @@ async fn handle_admin_request( text_response(StatusCode::INTERNAL_SERVER_ERROR, "error") } }, - ("GET", "/blocked") => match state.drain_blocked().await { + ("GET", "/blocked") => match state.blocked_snapshot().await { Ok(blocked) => json_response(&BlockedResponse { blocked }), Err(err) => { error!("failed to read blocked queue: {err}"); diff --git a/codex-rs/network-proxy/src/http_proxy.rs b/codex-rs/network-proxy/src/http_proxy.rs index e59980cc5f..eed0ad9da6 100644 --- a/codex-rs/network-proxy/src/http_proxy.rs +++ b/codex-rs/network-proxy/src/http_proxy.rs @@ -1,4 +1,5 @@ use crate::config::NetworkMode; +use crate::metadata::attempt_id_from_proxy_authorization; use crate::network_policy::NetworkDecision; use crate::network_policy::NetworkDecisionSource; use crate::network_policy::NetworkPolicyDecider; @@ -16,7 +17,6 @@ use crate::responses::blocked_header_value; use crate::responses::blocked_message_with_policy; use crate::responses::blocked_text_response_with_policy; use crate::responses::json_response; -use crate::responses::policy_decision_prefix; use crate::runtime::unix_socket_permissions_supported; use crate::state::BlockedRequest; use crate::state::BlockedRequestArgs; @@ -36,11 +36,13 @@ use rama_core::layer::AddInputExtensionLayer; use rama_core::rt::Executor; use rama_core::service::service_fn; use rama_http::Body; +use rama_http::HeaderMap; +use rama_http::HeaderName; use rama_http::HeaderValue; use rama_http::Request; use rama_http::Response; use 
rama_http::StatusCode; -use rama_http::layer::remove_header::RemoveRequestHeaderLayer; +use rama_http::header; use rama_http::layer::remove_header::RemoveResponseHeaderLayer; use rama_http::matcher::MethodMatcher; use rama_http_backend::client::proxy::layer::HttpProxyConnector; @@ -119,7 +121,6 @@ async fn run_http_proxy_with_listener( service_fn(http_connect_proxy), ), RemoveResponseHeaderLayer::hop_by_hop(), - RemoveRequestHeaderLayer::hop_by_hop(), ) .into_layer(service_fn({ let policy_decider = policy_decider.clone(); @@ -159,6 +160,7 @@ async fn http_connect_accept( } let client = client_addr(&req); + let network_attempt_id = request_network_attempt_id(&req); let enabled = app_state .enabled() @@ -186,6 +188,7 @@ async fn http_connect_accept( method: Some("CONNECT".to_string()), command: None, exec_policy_hint: None, + attempt_id: network_attempt_id.clone(), }); match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await { @@ -210,6 +213,10 @@ async fn http_connect_accept( method: Some("CONNECT".to_string()), mode: None, protocol: "http-connect".to_string(), + attempt_id: network_attempt_id.clone(), + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(authority.port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -248,6 +255,10 @@ async fn http_connect_accept( method: Some("CONNECT".to_string()), mode: Some(NetworkMode::Limited), protocol: "http-connect".to_string(), + attempt_id: network_attempt_id, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(authority.port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -353,7 +364,7 @@ async fn forward_connect_tunnel( async fn http_plain_proxy( policy_decider: Option>, - req: Request, + mut req: Request, ) -> Result { let app_state = match req.extensions().get::>().cloned() { Some(state) => state, @@ -363,6 +374,7 @@ 
async fn http_plain_proxy( } }; let client = client_addr(&req); + let network_attempt_id = request_network_attempt_id(&req); let method_allowed = match app_state .method_allowed(req.method().as_str()) @@ -492,6 +504,7 @@ async fn http_plain_proxy( method: Some(req.method().as_str().to_string()), command: None, exec_policy_hint: None, + attempt_id: network_attempt_id.clone(), }); match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await { @@ -516,6 +529,10 @@ async fn http_plain_proxy( method: Some(req.method().as_str().to_string()), mode: None, protocol: "http".to_string(), + attempt_id: network_attempt_id.clone(), + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -546,6 +563,10 @@ async fn http_plain_proxy( method: Some(req.method().as_str().to_string()), mode: Some(NetworkMode::Limited), protocol: "http".to_string(), + attempt_id: network_attempt_id, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -578,6 +599,8 @@ async fn http_plain_proxy( UpstreamClient::direct() }; + // Strip hop-by-hop headers only after extracting metadata used for policy correlation. + remove_hop_by_hop_request_headers(req.headers_mut()); match client.serve(req).await { Ok(resp) => Ok(resp), Err(err) => { @@ -602,6 +625,7 @@ async fn proxy_via_unix_socket(req: Request, socket_path: &str) -> Result(input: &T) -> Option { .map(|info| info.peer_addr().to_string()) } +fn request_network_attempt_id(req: &Request) -> Option { + // Some HTTP stacks normalize proxy credentials into `authorization`; accept both. 
+ attempt_id_from_proxy_authorization(req.headers().get("proxy-authorization")) + .or_else(|| attempt_id_from_proxy_authorization(req.headers().get("authorization"))) +} + +fn remove_hop_by_hop_request_headers(headers: &mut HeaderMap) { + while let Some(raw_connection) = headers.get(header::CONNECTION).cloned() { + headers.remove(header::CONNECTION); + if let Ok(raw_connection) = raw_connection.to_str() { + let connection_headers: Vec = raw_connection + .split(',') + .map(str::trim) + .filter(|token| !token.is_empty()) + .map(ToOwned::to_owned) + .collect(); + for token in connection_headers { + if let Ok(name) = HeaderName::from_bytes(token.as_bytes()) { + headers.remove(name); + } + } + } + } + for name in [ + &header::KEEP_ALIVE, + &header::PROXY_CONNECTION, + &header::PROXY_AUTHORIZATION, + &header::TRAILER, + &header::TRANSFER_ENCODING, + &header::UPGRADE, + ] { + headers.remove(name); + } + + // codespell:ignore te,TE + // 0x74,0x65 is ASCII "te" (the HTTP TE hop-by-hop header). + if let Ok(short_hop_header_name) = HeaderName::from_bytes(&[0x74, 0x65]) { + headers.remove(short_hop_header_name); + } +} + fn json_blocked(host: &str, reason: &str, details: Option<&PolicyDecisionDetails<'_>>) -> Response { - let (policy_decision_prefix, message) = details + let (message, decision, source, protocol, port) = details .map(|details| { ( - Some(policy_decision_prefix(details)), Some(blocked_message_with_policy(reason, details)), + Some(details.decision.as_str()), + Some(details.source.as_str()), + Some(details.protocol.as_policy_protocol()), + Some(details.port), ) }) - .unwrap_or((None, None)); + .unwrap_or((None, None, None, None, None)); let response = BlockedResponse { status: "blocked", host, reason, - policy_decision_prefix, + decision, + source, + protocol, + port, message, }; let mut resp = json_response(&response); @@ -667,6 +738,10 @@ async fn proxy_disabled_response( method, mode: None, protocol: protocol.as_policy_protocol().to_string(), + attempt_id: 
None, + decision: Some("deny".to_string()), + source: Some("proxy_state".to_string()), + port: Some(port), })) .await; @@ -703,7 +778,13 @@ struct BlockedResponse<'a> { host: &'a str, reason: &'a str, #[serde(skip_serializing_if = "Option::is_none")] - policy_decision_prefix: Option, + decision: Option<&'static str>, + #[serde(skip_serializing_if = "Option::is_none")] + source: Option<&'static str>, + #[serde(skip_serializing_if = "Option::is_none")] + protocol: Option<&'static str>, + #[serde(skip_serializing_if = "Option::is_none")] + port: Option, #[serde(skip_serializing_if = "Option::is_none")] message: Option, } @@ -715,6 +796,8 @@ mod tests { use crate::config::NetworkMode; use crate::config::NetworkProxySettings; use crate::runtime::network_proxy_state_for_policy; + use base64::Engine; + use base64::engine::general_purpose::STANDARD; use pretty_assertions::assert_eq; use rama_http::Method; use rama_http::Request; @@ -744,4 +827,67 @@ mod tests { "blocked-by-method-policy" ); } + + #[test] + fn request_network_attempt_id_reads_proxy_authorization_header() { + let encoded = STANDARD.encode("codex-net-attempt-attempt-1:"); + let req = Request::builder() + .method(Method::GET) + .uri("http://example.com") + .header("proxy-authorization", format!("Basic {encoded}")) + .body(Body::empty()) + .unwrap(); + assert_eq!( + request_network_attempt_id(&req), + Some("attempt-1".to_string()) + ); + } + + #[test] + fn request_network_attempt_id_reads_authorization_header_fallback() { + let encoded = STANDARD.encode("codex-net-attempt-attempt-2:"); + let req = Request::builder() + .method(Method::GET) + .uri("http://example.com") + .header("authorization", format!("Basic {encoded}")) + .body(Body::empty()) + .unwrap(); + assert_eq!( + request_network_attempt_id(&req), + Some("attempt-2".to_string()) + ); + } + + #[test] + fn remove_hop_by_hop_request_headers_keeps_forwarding_headers() { + let mut headers = HeaderMap::new(); + headers.insert( + header::CONNECTION, + 
HeaderValue::from_static("x-hop, keep-alive"), + ); + headers.insert("x-hop", HeaderValue::from_static("1")); + headers.insert( + header::PROXY_AUTHORIZATION, + HeaderValue::from_static("Basic abc"), + ); + headers.insert( + &header::X_FORWARDED_FOR, + HeaderValue::from_static("127.0.0.1"), + ); + headers.insert(header::HOST, HeaderValue::from_static("example.com")); + + remove_hop_by_hop_request_headers(&mut headers); + + assert_eq!(headers.get(header::CONNECTION), None); + assert_eq!(headers.get("x-hop"), None); + assert_eq!(headers.get(header::PROXY_AUTHORIZATION), None); + assert_eq!( + headers.get(&header::X_FORWARDED_FOR), + Some(&HeaderValue::from_static("127.0.0.1")) + ); + assert_eq!( + headers.get(header::HOST), + Some(&HeaderValue::from_static("example.com")) + ); + } } diff --git a/codex-rs/network-proxy/src/lib.rs b/codex-rs/network-proxy/src/lib.rs index 8ca0d969c8..0983b07dc5 100644 --- a/codex-rs/network-proxy/src/lib.rs +++ b/codex-rs/network-proxy/src/lib.rs @@ -3,6 +3,7 @@ mod admin; mod config; mod http_proxy; +mod metadata; mod network_policy; mod policy; mod proxy; @@ -32,6 +33,7 @@ pub use proxy::NetworkProxyHandle; pub use proxy::PROXY_URL_ENV_KEYS; pub use proxy::has_proxy_url_env_vars; pub use proxy::proxy_url_env_value; +pub use runtime::BlockedRequest; pub use runtime::ConfigReloader; pub use runtime::ConfigState; pub use runtime::NetworkProxyState; diff --git a/codex-rs/network-proxy/src/metadata.rs b/codex-rs/network-proxy/src/metadata.rs new file mode 100644 index 0000000000..2542125c35 --- /dev/null +++ b/codex-rs/network-proxy/src/metadata.rs @@ -0,0 +1,50 @@ +use base64::Engine; +use base64::engine::general_purpose::STANDARD; +use rama_http::HeaderValue; + +pub const NETWORK_ATTEMPT_USERNAME_PREFIX: &str = "codex-net-attempt-"; + +pub fn proxy_username_for_attempt_id(attempt_id: &str) -> String { + format!("{NETWORK_ATTEMPT_USERNAME_PREFIX}{attempt_id}") +} + +pub fn attempt_id_from_proxy_authorization(header: Option<&HeaderValue>) 
-> Option { + let header = header?; + let raw = header.to_str().ok()?; + let encoded = raw.strip_prefix("Basic ")?; + let decoded = STANDARD.decode(encoded.trim()).ok()?; + let decoded = String::from_utf8(decoded).ok()?; + let username = decoded + .split_once(':') + .map(|(user, _)| user) + .unwrap_or(decoded.as_str()); + let attempt_id = username.strip_prefix(NETWORK_ATTEMPT_USERNAME_PREFIX)?; + if attempt_id.is_empty() { + None + } else { + Some(attempt_id.to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use base64::engine::general_purpose::STANDARD; + + #[test] + fn parses_attempt_id_from_proxy_authorization_header() { + let encoded = STANDARD.encode(format!("{NETWORK_ATTEMPT_USERNAME_PREFIX}abc123:")); + let header = HeaderValue::from_str(&format!("Basic {encoded}")).unwrap(); + assert_eq!( + attempt_id_from_proxy_authorization(Some(&header)), + Some("abc123".to_string()) + ); + } + + #[test] + fn ignores_non_attempt_proxy_authorization_header() { + let encoded = STANDARD.encode("normal-user:password"); + let header = HeaderValue::from_str(&format!("Basic {encoded}")).unwrap(); + assert_eq!(attempt_id_from_proxy_authorization(Some(&header)), None); + } +} diff --git a/codex-rs/network-proxy/src/network_policy.rs b/codex-rs/network-proxy/src/network_policy.rs index b70c3e0b17..201a62cda4 100644 --- a/codex-rs/network-proxy/src/network_policy.rs +++ b/codex-rs/network-proxy/src/network_policy.rs @@ -69,6 +69,7 @@ pub struct NetworkPolicyRequest { pub method: Option, pub command: Option, pub exec_policy_hint: Option, + pub attempt_id: Option, } pub struct NetworkPolicyRequestArgs { @@ -79,6 +80,7 @@ pub struct NetworkPolicyRequestArgs { pub method: Option, pub command: Option, pub exec_policy_hint: Option, + pub attempt_id: Option, } impl NetworkPolicyRequest { @@ -91,6 +93,7 @@ impl NetworkPolicyRequest { method, command, exec_policy_hint, + attempt_id, } = args; Self { protocol, @@ -100,6 +103,7 @@ impl NetworkPolicyRequest { method, command, 
exec_policy_hint, + attempt_id, } } } @@ -119,6 +123,10 @@ impl NetworkDecision { Self::deny_with_source(reason, NetworkDecisionSource::Decider) } + pub fn ask(reason: impl Into) -> Self { + Self::ask_with_source(reason, NetworkDecisionSource::Decider) + } + pub fn deny_with_source(reason: impl Into, source: NetworkDecisionSource) -> Self { let reason = reason.into(); let reason = if reason.is_empty() { @@ -216,9 +224,9 @@ fn map_decider_decision(decision: NetworkDecision) -> NetworkDecision { #[cfg(test)] mod tests { use super::*; - use crate::config::NetworkProxySettings; use crate::reasons::REASON_DENIED; + use crate::reasons::REASON_NOT_ALLOWED; use crate::reasons::REASON_NOT_ALLOWED_LOCAL; use crate::state::network_proxy_state_for_policy; use pretty_assertions::assert_eq; @@ -248,6 +256,7 @@ mod tests { method: Some("GET".to_string()), command: None, exec_policy_hint: None, + attempt_id: None, }); let decision = evaluate_host_policy(&state, Some(&decider), &request) @@ -281,6 +290,7 @@ mod tests { method: Some("GET".to_string()), command: None, exec_policy_hint: None, + attempt_id: None, }); let decision = evaluate_host_policy(&state, Some(&decider), &request) @@ -321,6 +331,7 @@ mod tests { method: Some("GET".to_string()), command: None, exec_policy_hint: None, + attempt_id: None, }); let decision = evaluate_host_policy(&state, Some(&decider), &request) @@ -336,4 +347,16 @@ mod tests { ); assert_eq!(calls.load(Ordering::SeqCst), 0); } + + #[test] + fn ask_uses_decider_source_and_ask_decision() { + assert_eq!( + NetworkDecision::ask(REASON_NOT_ALLOWED), + NetworkDecision::Deny { + reason: REASON_NOT_ALLOWED.to_string(), + source: NetworkDecisionSource::Decider, + decision: NetworkPolicyDecision::Ask, + } + ); + } } diff --git a/codex-rs/network-proxy/src/proxy.rs b/codex-rs/network-proxy/src/proxy.rs index 566ac0da25..42f1637c2d 100644 --- a/codex-rs/network-proxy/src/proxy.rs +++ b/codex-rs/network-proxy/src/proxy.rs @@ -1,7 +1,9 @@ use crate::admin; use 
crate::config; use crate::http_proxy; +use crate::metadata::proxy_username_for_attempt_id; use crate::network_policy::NetworkPolicyDecider; +use crate::runtime::BlockedRequest; use crate::runtime::unix_socket_permissions_supported; use crate::socks5; use crate::state::NetworkProxyState; @@ -312,8 +314,12 @@ fn apply_proxy_env_overrides( socks_addr: SocketAddr, socks_enabled: bool, allow_local_binding: bool, + network_attempt_id: Option<&str>, ) { - let http_proxy_url = format!("http://{http_addr}"); + let http_proxy_url = network_attempt_id + .map(proxy_username_for_attempt_id) + .map(|username| format!("http://{username}@{http_addr}")) + .unwrap_or_else(|| format!("http://{http_addr}")); let socks_proxy_url = format!("socks5h://{socks_addr}"); env.insert( ALLOW_LOCAL_BINDING_ENV_KEY.to_string(), @@ -354,18 +360,25 @@ fn apply_proxy_env_overrides( env.insert("ELECTRON_GET_USE_PROXY".to_string(), "true".to_string()); - if socks_enabled { + // Keep HTTP_PROXY/HTTPS_PROXY as HTTP endpoints. A lot of clients break if + // those vars contain SOCKS URLs. We only switch ALL_PROXY here. + // + // For attempt-scoped runs, point ALL_PROXY at the HTTP proxy URL so the + // attempt metadata survives in proxy credentials for correlation. + if socks_enabled && network_attempt_id.is_none() { set_env_keys(env, ALL_PROXY_ENV_KEYS, &socks_proxy_url); set_env_keys(env, FTP_PROXY_ENV_KEYS, &socks_proxy_url); - #[cfg(target_os = "macos")] - { - // Preserve existing SSH wrappers (for example: Secretive/Teleport setups) - // and only provide a SOCKS ProxyCommand fallback when one is not present. 
- env.entry("GIT_SSH_COMMAND".to_string()) - .or_insert_with(|| format!("ssh -o ProxyCommand='nc -X 5 -x {socks_addr} %h %p'")); - } } else { set_env_keys(env, ALL_PROXY_ENV_KEYS, &http_proxy_url); + set_env_keys(env, FTP_PROXY_ENV_KEYS, &http_proxy_url); + } + + #[cfg(target_os = "macos")] + if socks_enabled { + // Preserve existing SSH wrappers (for example: Secretive/Teleport setups) + // and only provide a SOCKS ProxyCommand fallback when one is not present. + env.entry("GIT_SSH_COMMAND".to_string()) + .or_insert_with(|| format!("ssh -o ProxyCommand='nc -X 5 -x {socks_addr} %h %p'")); } } @@ -386,7 +399,22 @@ impl NetworkProxy { self.admin_addr } + pub async fn latest_blocked_request_for_attempt( + &self, + attempt_id: &str, + ) -> Result> { + self.state.latest_blocked_for_attempt(attempt_id).await + } + pub fn apply_to_env(&self, env: &mut HashMap) { + self.apply_to_env_for_attempt(env, None); + } + + pub fn apply_to_env_for_attempt( + &self, + env: &mut HashMap, + network_attempt_id: Option<&str>, + ) { // Enforce proxying for child processes. We intentionally override existing values so // command-level environment cannot bypass the managed proxy endpoint. 
apply_proxy_env_overrides( @@ -395,6 +423,7 @@ impl NetworkProxy { self.socks_addr, self.socks_enabled, self.allow_local_binding, + network_attempt_id, ); } @@ -694,6 +723,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8081), true, false, + None, ); assert_eq!( @@ -736,6 +766,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8081), false, true, + None, ); assert_eq!( @@ -745,6 +776,39 @@ mod tests { assert_eq!(env.get(ALLOW_LOCAL_BINDING_ENV_KEY), Some(&"1".to_string())); } + #[test] + fn apply_proxy_env_overrides_embeds_attempt_id_in_http_proxy_url() { + let mut env = HashMap::new(); + apply_proxy_env_overrides( + &mut env, + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3128), + SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8081), + true, + false, + Some("attempt-123"), + ); + + assert_eq!( + env.get("HTTP_PROXY"), + Some(&"http://codex-net-attempt-attempt-123@127.0.0.1:3128".to_string()) + ); + assert_eq!( + env.get("HTTPS_PROXY"), + Some(&"http://codex-net-attempt-attempt-123@127.0.0.1:3128".to_string()) + ); + assert_eq!( + env.get("ALL_PROXY"), + Some(&"http://codex-net-attempt-attempt-123@127.0.0.1:3128".to_string()) + ); + #[cfg(target_os = "macos")] + assert_eq!( + env.get("GIT_SSH_COMMAND"), + Some(&"ssh -o ProxyCommand='nc -X 5 -x 127.0.0.1:8081 %h %p'".to_string()) + ); + #[cfg(not(target_os = "macos"))] + assert_eq!(env.get("GIT_SSH_COMMAND"), None); + } + #[cfg(target_os = "macos")] #[test] fn apply_proxy_env_overrides_preserves_existing_git_ssh_command() { @@ -759,6 +823,7 @@ mod tests { SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8081), true, false, + None, ); assert_eq!( diff --git a/codex-rs/network-proxy/src/responses.rs b/codex-rs/network-proxy/src/responses.rs index 354d4bbaf2..b848a00743 100644 --- a/codex-rs/network-proxy/src/responses.rs +++ b/codex-rs/network-proxy/src/responses.rs @@ -11,8 +11,6 @@ use rama_http::StatusCode; use serde::Serialize; use tracing::error; -const 
NETWORK_POLICY_DECISION_PREFIX: &str = "CODEX_NETWORK_POLICY_DECISION"; - pub struct PolicyDecisionDetails<'a> { pub decision: NetworkPolicyDecision, pub reason: &'a str, @@ -22,17 +20,6 @@ pub struct PolicyDecisionDetails<'a> { pub port: u16, } -#[derive(Serialize)] -#[serde(rename_all = "camelCase")] -struct PolicyDecisionPayload<'a> { - decision: &'a str, - reason: &'a str, - source: &'a str, - protocol: &'a str, - host: &'a str, - port: u16, -} - pub fn text_response(status: StatusCode, body: &str) -> Response { Response::builder() .status(status) @@ -70,7 +57,9 @@ pub fn blocked_header_value(reason: &str) -> &'static str { pub fn blocked_message(reason: &str) -> &'static str { match reason { - REASON_NOT_ALLOWED => "Codex blocked this request: domain not in allowlist.", + REASON_NOT_ALLOWED => { + "Codex blocked this request: domain not in allowlist (this is not a denylist block)." + } REASON_NOT_ALLOWED_LOCAL => { "Codex blocked this request: local/private addresses not allowed." } @@ -82,31 +71,9 @@ pub fn blocked_message(reason: &str) -> &'static str { } } -pub fn policy_decision_prefix(details: &PolicyDecisionDetails<'_>) -> String { - let payload = PolicyDecisionPayload { - decision: details.decision.as_str(), - reason: details.reason, - source: details.source.as_str(), - protocol: details.protocol.as_policy_protocol(), - host: details.host, - port: details.port, - }; - let payload_json = match serde_json::to_string(&payload) { - Ok(json) => json, - Err(err) => { - error!("failed to serialize policy decision payload: {err}"); - "{}".to_string() - } - }; - format!("{NETWORK_POLICY_DECISION_PREFIX} {payload_json}") -} - pub fn blocked_message_with_policy(reason: &str, details: &PolicyDecisionDetails<'_>) -> String { - format!( - "{}\n{}", - policy_decision_prefix(details), - blocked_message(reason) - ) + let _ = (details.reason, details.host); + blocked_message(reason).to_string() } pub fn blocked_text_response_with_policy( @@ -128,7 +95,7 @@ mod tests { 
use pretty_assertions::assert_eq; #[test] - fn policy_decision_prefix_serializes_expected_payload() { + fn blocked_message_with_policy_returns_human_message() { let details = PolicyDecisionDetails { decision: NetworkPolicyDecision::Ask, reason: REASON_NOT_ALLOWED, @@ -138,29 +105,10 @@ mod tests { port: 443, }; - let line = policy_decision_prefix(&details); - assert_eq!( - line, - r#"CODEX_NETWORK_POLICY_DECISION {"decision":"ask","reason":"not_allowed","source":"decider","protocol":"https_connect","host":"api.example.com","port":443}"# - ); - } - - #[test] - fn blocked_message_with_policy_includes_prefix_and_human_message() { - let details = PolicyDecisionDetails { - decision: NetworkPolicyDecision::Deny, - reason: REASON_NOT_ALLOWED, - source: NetworkDecisionSource::BaselinePolicy, - protocol: NetworkProtocol::Http, - host: "api.example.com", - port: 80, - }; - let message = blocked_message_with_policy(REASON_NOT_ALLOWED, &details); assert_eq!( message, - r#"CODEX_NETWORK_POLICY_DECISION {"decision":"deny","reason":"not_allowed","source":"baseline_policy","protocol":"http","host":"api.example.com","port":80} -Codex blocked this request: domain not in allowlist."# + "Codex blocked this request: domain not in allowlist (this is not a denylist block)." 
); } } diff --git a/codex-rs/network-proxy/src/runtime.rs b/codex-rs/network-proxy/src/runtime.rs index 029c7b5f6c..737a8dd1de 100644 --- a/codex-rs/network-proxy/src/runtime.rs +++ b/codex-rs/network-proxy/src/runtime.rs @@ -28,11 +28,13 @@ use time::OffsetDateTime; use tokio::net::lookup_host; use tokio::sync::RwLock; use tokio::time::timeout; +use tracing::debug; use tracing::info; use tracing::warn; const MAX_BLOCKED_EVENTS: usize = 200; const DNS_LOOKUP_TIMEOUT: Duration = Duration::from_secs(2); +const NETWORK_POLICY_VIOLATION_PREFIX: &str = "CODEX_NETWORK_POLICY_VIOLATION"; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum HostBlockReason { @@ -71,6 +73,14 @@ pub struct BlockedRequest { pub method: Option, pub mode: Option, pub protocol: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub attempt_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub decision: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub source: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub port: Option, pub timestamp: i64, } @@ -81,6 +91,10 @@ pub struct BlockedRequestArgs { pub method: Option, pub mode: Option, pub protocol: String, + pub attempt_id: Option, + pub decision: Option, + pub source: Option, + pub port: Option, } impl BlockedRequest { @@ -92,6 +106,10 @@ impl BlockedRequest { method, mode, protocol, + attempt_id, + decision, + source, + port, } = args; Self { host, @@ -100,11 +118,28 @@ impl BlockedRequest { method, mode, protocol, + attempt_id, + decision, + source, + port, timestamp: unix_timestamp(), } } } +fn blocked_request_violation_log_line(entry: &BlockedRequest) -> String { + match serde_json::to_string(entry) { + Ok(json) => format!("{NETWORK_POLICY_VIOLATION_PREFIX} {json}"), + Err(err) => { + debug!("failed to serialize blocked request for violation log: {err}"); + format!( + "{NETWORK_POLICY_VIOLATION_PREFIX} host={} reason={}", + entry.host, entry.reason + ) + } + } +} + #[derive(Clone)] pub 
struct ConfigState { pub config: NetworkProxyConfig, @@ -112,6 +147,7 @@ pub struct ConfigState { pub deny_set: GlobSet, pub constraints: NetworkProxyConstraints, pub blocked: VecDeque, + pub blocked_total: u64, } #[async_trait] @@ -276,14 +312,59 @@ impl NetworkProxyState { pub async fn record_blocked(&self, entry: BlockedRequest) -> Result<()> { self.reload_if_needed().await?; + let violation_line = blocked_request_violation_log_line(&entry); let mut guard = self.state.write().await; + let host = entry.host.clone(); + let reason = entry.reason.clone(); + let decision = entry.decision.clone(); + let source = entry.source.clone(); + let protocol = entry.protocol.clone(); + let port = entry.port; + let attempt_id = entry.attempt_id.clone(); guard.blocked.push_back(entry); + guard.blocked_total = guard.blocked_total.saturating_add(1); + let total = guard.blocked_total; while guard.blocked.len() > MAX_BLOCKED_EVENTS { guard.blocked.pop_front(); } + debug!( + "recorded blocked request telemetry (total={}, host={}, reason={}, decision={:?}, source={:?}, protocol={}, port={:?}, attempt_id={:?}, buffered={})", + total, + host, + reason, + decision, + source, + protocol, + port, + attempt_id, + guard.blocked.len() + ); + debug!("{violation_line}"); Ok(()) } + /// Returns a snapshot of buffered blocked-request entries without consuming + /// them. + pub async fn blocked_snapshot(&self) -> Result> { + self.reload_if_needed().await?; + let guard = self.state.read().await; + Ok(guard.blocked.iter().cloned().collect()) + } + + pub async fn latest_blocked_for_attempt( + &self, + attempt_id: &str, + ) -> Result> { + self.reload_if_needed().await?; + let guard = self.state.read().await; + Ok(guard + .blocked + .iter() + .rev() + .find(|entry| entry.attempt_id.as_deref() == Some(attempt_id)) + .cloned()) + } + /// Drain and return the buffered blocked-request entries in FIFO order. 
pub async fn drain_blocked(&self) -> Result> { self.reload_if_needed().await?; @@ -380,12 +461,17 @@ impl NetworkProxyState { match self.reloader.maybe_reload().await? { None => Ok(()), Some(mut new_state) => { - let (previous_cfg, blocked) = { + let (previous_cfg, blocked, blocked_total) = { let guard = self.state.read().await; - (guard.config.clone(), guard.blocked.clone()) + ( + guard.config.clone(), + guard.blocked.clone(), + guard.blocked_total, + ) }; log_policy_changes(&previous_cfg, &new_state.config); new_state.blocked = blocked; + new_state.blocked_total = blocked_total; { let mut guard = self.state.write().await; *guard = new_state; @@ -566,6 +652,153 @@ mod tests { ); } + #[tokio::test] + async fn blocked_snapshot_does_not_consume_entries() { + let state = network_proxy_state_for_policy(NetworkProxySettings::default()); + + state + .record_blocked(BlockedRequest::new(BlockedRequestArgs { + host: "google.com".to_string(), + reason: "not_allowed".to_string(), + client: None, + method: Some("GET".to_string()), + mode: None, + protocol: "http".to_string(), + attempt_id: None, + decision: Some("ask".to_string()), + source: Some("decider".to_string()), + port: Some(80), + })) + .await + .expect("entry should be recorded"); + + let snapshot = state + .blocked_snapshot() + .await + .expect("snapshot should succeed"); + assert_eq!(snapshot.len(), 1); + assert_eq!(snapshot[0].host, "google.com"); + assert_eq!(snapshot[0].decision.as_deref(), Some("ask")); + + let drained = state + .drain_blocked() + .await + .expect("drain should include snapshot entry"); + assert_eq!(drained.len(), 1); + assert_eq!(drained[0].host, snapshot[0].host); + assert_eq!(drained[0].reason, snapshot[0].reason); + assert_eq!(drained[0].decision, snapshot[0].decision); + assert_eq!(drained[0].source, snapshot[0].source); + assert_eq!(drained[0].port, snapshot[0].port); + } + + #[tokio::test] + async fn latest_blocked_for_attempt_returns_latest_matching_entry() { + let state = 
network_proxy_state_for_policy(NetworkProxySettings::default()); + + state + .record_blocked(BlockedRequest::new(BlockedRequestArgs { + host: "one.example.com".to_string(), + reason: "not_allowed".to_string(), + client: None, + method: Some("GET".to_string()), + mode: None, + protocol: "http".to_string(), + attempt_id: Some("attempt-1".to_string()), + decision: Some("ask".to_string()), + source: Some("decider".to_string()), + port: Some(80), + })) + .await + .expect("entry should be recorded"); + state + .record_blocked(BlockedRequest::new(BlockedRequestArgs { + host: "two.example.com".to_string(), + reason: "not_allowed".to_string(), + client: None, + method: Some("GET".to_string()), + mode: None, + protocol: "http".to_string(), + attempt_id: Some("attempt-2".to_string()), + decision: Some("ask".to_string()), + source: Some("decider".to_string()), + port: Some(80), + })) + .await + .expect("entry should be recorded"); + state + .record_blocked(BlockedRequest::new(BlockedRequestArgs { + host: "three.example.com".to_string(), + reason: "not_allowed".to_string(), + client: None, + method: Some("GET".to_string()), + mode: None, + protocol: "http".to_string(), + attempt_id: Some("attempt-1".to_string()), + decision: Some("ask".to_string()), + source: Some("decider".to_string()), + port: Some(80), + })) + .await + .expect("entry should be recorded"); + + let latest = state + .latest_blocked_for_attempt("attempt-1") + .await + .expect("lookup should succeed") + .expect("attempt should have a blocked entry"); + assert_eq!(latest.host, "three.example.com"); + } + + #[tokio::test] + async fn drain_blocked_returns_buffered_window() { + let state = network_proxy_state_for_policy(NetworkProxySettings::default()); + + for idx in 0..(MAX_BLOCKED_EVENTS + 5) { + state + .record_blocked(BlockedRequest::new(BlockedRequestArgs { + host: format!("example{idx}.com"), + reason: "not_allowed".to_string(), + client: None, + method: Some("GET".to_string()), + mode: None, + protocol: 
"http".to_string(), + attempt_id: None, + decision: Some("ask".to_string()), + source: Some("decider".to_string()), + port: Some(80), + })) + .await + .expect("entry should be recorded"); + } + + let blocked = state.drain_blocked().await.expect("drain should succeed"); + assert_eq!(blocked.len(), MAX_BLOCKED_EVENTS); + assert_eq!(blocked[0].host, "example5.com"); + } + + #[test] + fn blocked_request_violation_log_line_serializes_payload() { + let entry = BlockedRequest { + host: "google.com".to_string(), + reason: "not_allowed".to_string(), + client: Some("127.0.0.1".to_string()), + method: Some("GET".to_string()), + mode: Some(NetworkMode::Full), + protocol: "http".to_string(), + attempt_id: Some("attempt-1".to_string()), + decision: Some("ask".to_string()), + source: Some("decider".to_string()), + port: Some(80), + timestamp: 1_735_689_600, + }; + + assert_eq!( + blocked_request_violation_log_line(&entry), + r#"CODEX_NETWORK_POLICY_VIOLATION {"host":"google.com","reason":"not_allowed","client":"127.0.0.1","method":"GET","mode":"full","protocol":"http","attempt_id":"attempt-1","decision":"ask","source":"decider","port":80,"timestamp":1735689600}"# + ); + } + #[tokio::test] async fn host_blocked_subdomain_wildcards_exclude_apex() { let state = network_proxy_state_for_policy(NetworkProxySettings { diff --git a/codex-rs/network-proxy/src/socks5.rs b/codex-rs/network-proxy/src/socks5.rs index eaf265a7b9..176055a5d2 100644 --- a/codex-rs/network-proxy/src/socks5.rs +++ b/codex-rs/network-proxy/src/socks5.rs @@ -168,6 +168,10 @@ async fn handle_socks5_tcp( method: None, mode: None, protocol: "socks5".to_string(), + attempt_id: None, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -198,6 +202,10 @@ async fn handle_socks5_tcp( method: None, mode: Some(NetworkMode::Limited), protocol: "socks5".to_string(), + attempt_id: None, 
+ decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -221,6 +229,7 @@ async fn handle_socks5_tcp( method: None, command: None, exec_policy_hint: None, + attempt_id: None, }); match evaluate_host_policy(&app_state, policy_decider.as_ref(), &request).await { @@ -245,6 +254,10 @@ async fn handle_socks5_tcp( method: None, mode: None, protocol: "socks5".to_string(), + attempt_id: None, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -305,6 +318,10 @@ async fn inspect_socks5_udp( method: None, mode: None, protocol: "socks5-udp".to_string(), + attempt_id: None, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); @@ -335,6 +352,10 @@ async fn inspect_socks5_udp( method: None, mode: Some(NetworkMode::Limited), protocol: "socks5-udp".to_string(), + attempt_id: None, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; return Err(policy_denied_error(REASON_METHOD_NOT_ALLOWED, &details)); @@ -354,6 +375,7 @@ async fn inspect_socks5_udp( method: None, command: None, exec_policy_hint: None, + attempt_id: None, }); match evaluate_host_policy(&state, policy_decider.as_ref(), &request).await { @@ -378,6 +400,10 @@ async fn inspect_socks5_udp( method: None, mode: None, protocol: "socks5-udp".to_string(), + attempt_id: None, + decision: Some(details.decision.as_str().to_string()), + source: Some(details.source.as_str().to_string()), + port: Some(port), })) .await; let client = client.as_deref().unwrap_or_default(); diff --git a/codex-rs/network-proxy/src/state.rs 
b/codex-rs/network-proxy/src/state.rs index 509ada760e..97dfcc4b5d 100644 --- a/codex-rs/network-proxy/src/state.rs +++ b/codex-rs/network-proxy/src/state.rs @@ -60,6 +60,7 @@ pub fn build_config_state( deny_set, constraints, blocked: std::collections::VecDeque::new(), + blocked_total: 0, }) }