mirror of
https://github.com/openai/codex.git
synced 2026-04-30 19:32:04 +03:00
feat(network-proxy): structured policy signaling and attempt correlation to core (#11662)
## Summary When network requests were blocked, downstream code often had to infer ask vs deny from free-form response text. That was brittle and led to incorrect approval behavior. This PR fixes the proxy side so blocked decisions are structured and request metadata survives reliably. ## Description - Blocked proxy responses now carry consistent structured policy decision data. - Request attempt metadata is preserved across proxy env paths (including ALL_PROXY flows). - Header stripping was tightened so we still remove unsafe forwarding headers, but keep metadata needed for policy handling. - Block messages were clarified (for example, allowlist miss vs explicit deny). - Added unified violation log entries so policy failures can be inspected in one place. - Added/updated tests for these behaviors. --------- Co-authored-by: Codex <199175422+chatgpt-codex-connector[bot]@users.noreply.github.com>
This commit is contained in:
@@ -28,11 +28,13 @@ use time::OffsetDateTime;
|
||||
use tokio::net::lookup_host;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::timeout;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
const MAX_BLOCKED_EVENTS: usize = 200;
|
||||
const DNS_LOOKUP_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const NETWORK_POLICY_VIOLATION_PREFIX: &str = "CODEX_NETWORK_POLICY_VIOLATION";
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum HostBlockReason {
|
||||
@@ -71,6 +73,14 @@ pub struct BlockedRequest {
|
||||
pub method: Option<String>,
|
||||
pub mode: Option<NetworkMode>,
|
||||
pub protocol: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub attempt_id: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub decision: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub source: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub port: Option<u16>,
|
||||
pub timestamp: i64,
|
||||
}
|
||||
|
||||
@@ -81,6 +91,10 @@ pub struct BlockedRequestArgs {
|
||||
pub method: Option<String>,
|
||||
pub mode: Option<NetworkMode>,
|
||||
pub protocol: String,
|
||||
pub attempt_id: Option<String>,
|
||||
pub decision: Option<String>,
|
||||
pub source: Option<String>,
|
||||
pub port: Option<u16>,
|
||||
}
|
||||
|
||||
impl BlockedRequest {
|
||||
@@ -92,6 +106,10 @@ impl BlockedRequest {
|
||||
method,
|
||||
mode,
|
||||
protocol,
|
||||
attempt_id,
|
||||
decision,
|
||||
source,
|
||||
port,
|
||||
} = args;
|
||||
Self {
|
||||
host,
|
||||
@@ -100,11 +118,28 @@ impl BlockedRequest {
|
||||
method,
|
||||
mode,
|
||||
protocol,
|
||||
attempt_id,
|
||||
decision,
|
||||
source,
|
||||
port,
|
||||
timestamp: unix_timestamp(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn blocked_request_violation_log_line(entry: &BlockedRequest) -> String {
|
||||
match serde_json::to_string(entry) {
|
||||
Ok(json) => format!("{NETWORK_POLICY_VIOLATION_PREFIX} {json}"),
|
||||
Err(err) => {
|
||||
debug!("failed to serialize blocked request for violation log: {err}");
|
||||
format!(
|
||||
"{NETWORK_POLICY_VIOLATION_PREFIX} host={} reason={}",
|
||||
entry.host, entry.reason
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConfigState {
|
||||
pub config: NetworkProxyConfig,
|
||||
@@ -112,6 +147,7 @@ pub struct ConfigState {
|
||||
pub deny_set: GlobSet,
|
||||
pub constraints: NetworkProxyConstraints,
|
||||
pub blocked: VecDeque<BlockedRequest>,
|
||||
pub blocked_total: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -276,14 +312,59 @@ impl NetworkProxyState {
|
||||
|
||||
pub async fn record_blocked(&self, entry: BlockedRequest) -> Result<()> {
|
||||
self.reload_if_needed().await?;
|
||||
let violation_line = blocked_request_violation_log_line(&entry);
|
||||
let mut guard = self.state.write().await;
|
||||
let host = entry.host.clone();
|
||||
let reason = entry.reason.clone();
|
||||
let decision = entry.decision.clone();
|
||||
let source = entry.source.clone();
|
||||
let protocol = entry.protocol.clone();
|
||||
let port = entry.port;
|
||||
let attempt_id = entry.attempt_id.clone();
|
||||
guard.blocked.push_back(entry);
|
||||
guard.blocked_total = guard.blocked_total.saturating_add(1);
|
||||
let total = guard.blocked_total;
|
||||
while guard.blocked.len() > MAX_BLOCKED_EVENTS {
|
||||
guard.blocked.pop_front();
|
||||
}
|
||||
debug!(
|
||||
"recorded blocked request telemetry (total={}, host={}, reason={}, decision={:?}, source={:?}, protocol={}, port={:?}, attempt_id={:?}, buffered={})",
|
||||
total,
|
||||
host,
|
||||
reason,
|
||||
decision,
|
||||
source,
|
||||
protocol,
|
||||
port,
|
||||
attempt_id,
|
||||
guard.blocked.len()
|
||||
);
|
||||
debug!("{violation_line}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a snapshot of buffered blocked-request entries without consuming
|
||||
/// them.
|
||||
pub async fn blocked_snapshot(&self) -> Result<Vec<BlockedRequest>> {
|
||||
self.reload_if_needed().await?;
|
||||
let guard = self.state.read().await;
|
||||
Ok(guard.blocked.iter().cloned().collect())
|
||||
}
|
||||
|
||||
pub async fn latest_blocked_for_attempt(
|
||||
&self,
|
||||
attempt_id: &str,
|
||||
) -> Result<Option<BlockedRequest>> {
|
||||
self.reload_if_needed().await?;
|
||||
let guard = self.state.read().await;
|
||||
Ok(guard
|
||||
.blocked
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|entry| entry.attempt_id.as_deref() == Some(attempt_id))
|
||||
.cloned())
|
||||
}
|
||||
|
||||
/// Drain and return the buffered blocked-request entries in FIFO order.
|
||||
pub async fn drain_blocked(&self) -> Result<Vec<BlockedRequest>> {
|
||||
self.reload_if_needed().await?;
|
||||
@@ -380,12 +461,17 @@ impl NetworkProxyState {
|
||||
match self.reloader.maybe_reload().await? {
|
||||
None => Ok(()),
|
||||
Some(mut new_state) => {
|
||||
let (previous_cfg, blocked) = {
|
||||
let (previous_cfg, blocked, blocked_total) = {
|
||||
let guard = self.state.read().await;
|
||||
(guard.config.clone(), guard.blocked.clone())
|
||||
(
|
||||
guard.config.clone(),
|
||||
guard.blocked.clone(),
|
||||
guard.blocked_total,
|
||||
)
|
||||
};
|
||||
log_policy_changes(&previous_cfg, &new_state.config);
|
||||
new_state.blocked = blocked;
|
||||
new_state.blocked_total = blocked_total;
|
||||
{
|
||||
let mut guard = self.state.write().await;
|
||||
*guard = new_state;
|
||||
@@ -566,6 +652,153 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn blocked_snapshot_does_not_consume_entries() {
|
||||
let state = network_proxy_state_for_policy(NetworkProxySettings::default());
|
||||
|
||||
state
|
||||
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
|
||||
host: "google.com".to_string(),
|
||||
reason: "not_allowed".to_string(),
|
||||
client: None,
|
||||
method: Some("GET".to_string()),
|
||||
mode: None,
|
||||
protocol: "http".to_string(),
|
||||
attempt_id: None,
|
||||
decision: Some("ask".to_string()),
|
||||
source: Some("decider".to_string()),
|
||||
port: Some(80),
|
||||
}))
|
||||
.await
|
||||
.expect("entry should be recorded");
|
||||
|
||||
let snapshot = state
|
||||
.blocked_snapshot()
|
||||
.await
|
||||
.expect("snapshot should succeed");
|
||||
assert_eq!(snapshot.len(), 1);
|
||||
assert_eq!(snapshot[0].host, "google.com");
|
||||
assert_eq!(snapshot[0].decision.as_deref(), Some("ask"));
|
||||
|
||||
let drained = state
|
||||
.drain_blocked()
|
||||
.await
|
||||
.expect("drain should include snapshot entry");
|
||||
assert_eq!(drained.len(), 1);
|
||||
assert_eq!(drained[0].host, snapshot[0].host);
|
||||
assert_eq!(drained[0].reason, snapshot[0].reason);
|
||||
assert_eq!(drained[0].decision, snapshot[0].decision);
|
||||
assert_eq!(drained[0].source, snapshot[0].source);
|
||||
assert_eq!(drained[0].port, snapshot[0].port);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn latest_blocked_for_attempt_returns_latest_matching_entry() {
|
||||
let state = network_proxy_state_for_policy(NetworkProxySettings::default());
|
||||
|
||||
state
|
||||
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
|
||||
host: "one.example.com".to_string(),
|
||||
reason: "not_allowed".to_string(),
|
||||
client: None,
|
||||
method: Some("GET".to_string()),
|
||||
mode: None,
|
||||
protocol: "http".to_string(),
|
||||
attempt_id: Some("attempt-1".to_string()),
|
||||
decision: Some("ask".to_string()),
|
||||
source: Some("decider".to_string()),
|
||||
port: Some(80),
|
||||
}))
|
||||
.await
|
||||
.expect("entry should be recorded");
|
||||
state
|
||||
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
|
||||
host: "two.example.com".to_string(),
|
||||
reason: "not_allowed".to_string(),
|
||||
client: None,
|
||||
method: Some("GET".to_string()),
|
||||
mode: None,
|
||||
protocol: "http".to_string(),
|
||||
attempt_id: Some("attempt-2".to_string()),
|
||||
decision: Some("ask".to_string()),
|
||||
source: Some("decider".to_string()),
|
||||
port: Some(80),
|
||||
}))
|
||||
.await
|
||||
.expect("entry should be recorded");
|
||||
state
|
||||
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
|
||||
host: "three.example.com".to_string(),
|
||||
reason: "not_allowed".to_string(),
|
||||
client: None,
|
||||
method: Some("GET".to_string()),
|
||||
mode: None,
|
||||
protocol: "http".to_string(),
|
||||
attempt_id: Some("attempt-1".to_string()),
|
||||
decision: Some("ask".to_string()),
|
||||
source: Some("decider".to_string()),
|
||||
port: Some(80),
|
||||
}))
|
||||
.await
|
||||
.expect("entry should be recorded");
|
||||
|
||||
let latest = state
|
||||
.latest_blocked_for_attempt("attempt-1")
|
||||
.await
|
||||
.expect("lookup should succeed")
|
||||
.expect("attempt should have a blocked entry");
|
||||
assert_eq!(latest.host, "three.example.com");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn drain_blocked_returns_buffered_window() {
|
||||
let state = network_proxy_state_for_policy(NetworkProxySettings::default());
|
||||
|
||||
for idx in 0..(MAX_BLOCKED_EVENTS + 5) {
|
||||
state
|
||||
.record_blocked(BlockedRequest::new(BlockedRequestArgs {
|
||||
host: format!("example{idx}.com"),
|
||||
reason: "not_allowed".to_string(),
|
||||
client: None,
|
||||
method: Some("GET".to_string()),
|
||||
mode: None,
|
||||
protocol: "http".to_string(),
|
||||
attempt_id: None,
|
||||
decision: Some("ask".to_string()),
|
||||
source: Some("decider".to_string()),
|
||||
port: Some(80),
|
||||
}))
|
||||
.await
|
||||
.expect("entry should be recorded");
|
||||
}
|
||||
|
||||
let blocked = state.drain_blocked().await.expect("drain should succeed");
|
||||
assert_eq!(blocked.len(), MAX_BLOCKED_EVENTS);
|
||||
assert_eq!(blocked[0].host, "example5.com");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blocked_request_violation_log_line_serializes_payload() {
|
||||
let entry = BlockedRequest {
|
||||
host: "google.com".to_string(),
|
||||
reason: "not_allowed".to_string(),
|
||||
client: Some("127.0.0.1".to_string()),
|
||||
method: Some("GET".to_string()),
|
||||
mode: Some(NetworkMode::Full),
|
||||
protocol: "http".to_string(),
|
||||
attempt_id: Some("attempt-1".to_string()),
|
||||
decision: Some("ask".to_string()),
|
||||
source: Some("decider".to_string()),
|
||||
port: Some(80),
|
||||
timestamp: 1_735_689_600,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
blocked_request_violation_log_line(&entry),
|
||||
r#"CODEX_NETWORK_POLICY_VIOLATION {"host":"google.com","reason":"not_allowed","client":"127.0.0.1","method":"GET","mode":"full","protocol":"http","attempt_id":"attempt-1","decision":"ask","source":"decider","port":80,"timestamp":1735689600}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_blocked_subdomain_wildcards_exclude_apex() {
|
||||
let state = network_proxy_state_for_policy(NetworkProxySettings {
|
||||
|
||||
Reference in New Issue
Block a user