mirror of
https://github.com/openai/codex.git
synced 2026-04-28 18:32:04 +03:00
[codex] add responses proxy JSON dumps (#16753)
This makes Responses API proxy request/response dumping first-class by adding an optional `--dump-dir` flag that emits paired JSON files with shared sequence/timestamp prefixes, captures full request and response headers and records parsed JSON bodies.
This commit is contained in:
committed by
GitHub
parent
13d828d236
commit
6edb865cc6
360
codex-rs/responses-api-proxy/src/dump.rs
Normal file
360
codex-rs/responses-api-proxy/src/dump.rs
Normal file
@@ -0,0 +1,360 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use reqwest::header::HeaderMap;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tiny_http::Header;
|
||||
use tiny_http::Method;
|
||||
|
||||
const AUTHORIZATION_HEADER_NAME: &str = "authorization";
|
||||
const REDACTED_HEADER_VALUE: &str = "[REDACTED]";
|
||||
|
||||
pub(crate) struct ExchangeDumper {
|
||||
dump_dir: PathBuf,
|
||||
next_sequence: AtomicU64,
|
||||
}
|
||||
|
||||
impl ExchangeDumper {
|
||||
pub(crate) fn new(dump_dir: PathBuf) -> io::Result<Self> {
|
||||
fs::create_dir_all(&dump_dir)?;
|
||||
|
||||
Ok(Self {
|
||||
dump_dir,
|
||||
next_sequence: AtomicU64::new(1),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn dump_request(
|
||||
&self,
|
||||
method: &Method,
|
||||
url: &str,
|
||||
headers: &[Header],
|
||||
body: &[u8],
|
||||
) -> io::Result<ExchangeDump> {
|
||||
let sequence = self.next_sequence.fetch_add(1, Ordering::Relaxed);
|
||||
let timestamp_ms = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map_or(0, |duration| duration.as_millis());
|
||||
let prefix = format!("{sequence:06}-{timestamp_ms}");
|
||||
|
||||
let request_path = self.dump_dir.join(format!("{prefix}-request.json"));
|
||||
let response_path = self.dump_dir.join(format!("{prefix}-response.json"));
|
||||
|
||||
let request_dump = RequestDump {
|
||||
method: method.as_str().to_string(),
|
||||
url: url.to_string(),
|
||||
headers: headers.iter().map(HeaderDump::from).collect(),
|
||||
body: dump_body(body),
|
||||
};
|
||||
|
||||
write_json_dump(&request_path, &request_dump)?;
|
||||
|
||||
Ok(ExchangeDump { response_path })
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ExchangeDump {
|
||||
response_path: PathBuf,
|
||||
}
|
||||
|
||||
impl ExchangeDump {
|
||||
pub(crate) fn tee_response_body<R: Read>(
|
||||
self,
|
||||
status: u16,
|
||||
headers: &HeaderMap,
|
||||
response_body: R,
|
||||
) -> ResponseBodyDump<R> {
|
||||
ResponseBodyDump {
|
||||
response_body,
|
||||
response_path: self.response_path,
|
||||
status,
|
||||
headers: headers.iter().map(HeaderDump::from).collect(),
|
||||
body: Vec::new(),
|
||||
dump_written: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct ResponseBodyDump<R> {
|
||||
response_body: R,
|
||||
response_path: PathBuf,
|
||||
status: u16,
|
||||
headers: Vec<HeaderDump>,
|
||||
body: Vec<u8>,
|
||||
dump_written: bool,
|
||||
}
|
||||
|
||||
impl<R> ResponseBodyDump<R> {
|
||||
fn write_dump_if_needed(&mut self) {
|
||||
if self.dump_written {
|
||||
return;
|
||||
}
|
||||
|
||||
self.dump_written = true;
|
||||
|
||||
let response_dump = ResponseDump {
|
||||
status: self.status,
|
||||
headers: std::mem::take(&mut self.headers),
|
||||
body: dump_body(&self.body),
|
||||
};
|
||||
|
||||
if let Err(err) = write_json_dump(&self.response_path, &response_dump) {
|
||||
eprintln!(
|
||||
"responses-api-proxy failed to write {}: {err}",
|
||||
self.response_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Read> Read for ResponseBodyDump<R> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let bytes_read = self.response_body.read(buf)?;
|
||||
if bytes_read == 0 {
|
||||
self.write_dump_if_needed();
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
self.body.extend_from_slice(&buf[..bytes_read]);
|
||||
Ok(bytes_read)
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> Drop for ResponseBodyDump<R> {
|
||||
fn drop(&mut self) {
|
||||
self.write_dump_if_needed();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct RequestDump {
|
||||
method: String,
|
||||
url: String,
|
||||
headers: Vec<HeaderDump>,
|
||||
body: Value,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ResponseDump {
|
||||
status: u16,
|
||||
headers: Vec<HeaderDump>,
|
||||
body: Value,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct HeaderDump {
|
||||
name: String,
|
||||
value: String,
|
||||
}
|
||||
|
||||
impl From<&Header> for HeaderDump {
|
||||
fn from(header: &Header) -> Self {
|
||||
let name = header.field.as_str().to_string();
|
||||
let value = if should_redact_header(&name) {
|
||||
REDACTED_HEADER_VALUE.to_string()
|
||||
} else {
|
||||
header.value.as_str().to_string()
|
||||
};
|
||||
|
||||
Self { name, value }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<(&reqwest::header::HeaderName, &reqwest::header::HeaderValue)> for HeaderDump {
|
||||
fn from(header: (&reqwest::header::HeaderName, &reqwest::header::HeaderValue)) -> Self {
|
||||
let name = header.0.as_str();
|
||||
let value = if should_redact_header(name) {
|
||||
REDACTED_HEADER_VALUE.to_string()
|
||||
} else {
|
||||
String::from_utf8_lossy(header.1.as_bytes()).into_owned()
|
||||
};
|
||||
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_redact_header(name: &str) -> bool {
|
||||
name.eq_ignore_ascii_case(AUTHORIZATION_HEADER_NAME)
|
||||
|| name.to_ascii_lowercase().contains("cookie")
|
||||
}
|
||||
|
||||
fn dump_body(body: &[u8]) -> Value {
|
||||
serde_json::from_slice(body)
|
||||
.unwrap_or_else(|_| Value::String(String::from_utf8_lossy(body).into_owned()))
|
||||
}
|
||||
|
||||
fn write_json_dump(path: &PathBuf, dump: &impl Serialize) -> io::Result<()> {
|
||||
let mut bytes = serde_json::to_vec_pretty(dump)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
|
||||
bytes.push(b'\n');
|
||||
fs::write(path, bytes)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::fs;
|
||||
use std::io::Cursor;
|
||||
use std::io::Read;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use reqwest::header::AUTHORIZATION;
|
||||
use reqwest::header::CONTENT_TYPE;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderValue;
|
||||
use serde_json::json;
|
||||
use tiny_http::Header;
|
||||
use tiny_http::Method;
|
||||
|
||||
use super::ExchangeDumper;
|
||||
|
||||
static NEXT_TEST_DIR: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
#[test]
|
||||
fn dump_request_writes_redacted_headers_and_json_body() {
|
||||
let dump_dir = test_dump_dir();
|
||||
let dumper = ExchangeDumper::new(dump_dir.clone()).expect("create dumper");
|
||||
let headers = vec![
|
||||
Header::from_bytes(&b"Authorization"[..], &b"Bearer secret"[..])
|
||||
.expect("authorization header"),
|
||||
Header::from_bytes(&b"Cookie"[..], &b"user-session=secret"[..]).expect("cookie header"),
|
||||
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..])
|
||||
.expect("content-type header"),
|
||||
];
|
||||
|
||||
let exchange_dump = dumper
|
||||
.dump_request(
|
||||
&Method::Post,
|
||||
"/v1/responses",
|
||||
&headers,
|
||||
br#"{"model":"gpt-5.4"}"#,
|
||||
)
|
||||
.expect("dump request");
|
||||
|
||||
let request_dump = fs::read_to_string(dump_file_with_suffix(&dump_dir, "-request.json"))
|
||||
.expect("read request dump");
|
||||
|
||||
assert_eq!(
|
||||
serde_json::from_str::<serde_json::Value>(&request_dump).expect("parse request dump"),
|
||||
json!({
|
||||
"method": "POST",
|
||||
"url": "/v1/responses",
|
||||
"headers": [
|
||||
{
|
||||
"name": "Authorization",
|
||||
"value": "[REDACTED]"
|
||||
},
|
||||
{
|
||||
"name": "Cookie",
|
||||
"value": "[REDACTED]"
|
||||
},
|
||||
{
|
||||
"name": "Content-Type",
|
||||
"value": "application/json"
|
||||
}
|
||||
],
|
||||
"body": {
|
||||
"model": "gpt-5.4"
|
||||
}
|
||||
})
|
||||
);
|
||||
assert!(
|
||||
exchange_dump
|
||||
.response_path
|
||||
.file_name()
|
||||
.expect("response dump file name")
|
||||
.to_string_lossy()
|
||||
.ends_with("-response.json")
|
||||
);
|
||||
|
||||
fs::remove_dir_all(dump_dir).expect("remove test dump dir");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn response_body_dump_streams_body_and_writes_response_file() {
|
||||
let dump_dir = test_dump_dir();
|
||||
let dumper = ExchangeDumper::new(dump_dir.clone()).expect("create dumper");
|
||||
let exchange_dump = dumper
|
||||
.dump_request(&Method::Post, "/v1/responses", &[], b"{}")
|
||||
.expect("dump request");
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
|
||||
headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer secret"));
|
||||
headers.insert(
|
||||
"set-cookie",
|
||||
HeaderValue::from_static("user-session=secret"),
|
||||
);
|
||||
|
||||
let mut response_body = String::new();
|
||||
exchange_dump
|
||||
.tee_response_body(
|
||||
/*status*/ 200,
|
||||
&headers,
|
||||
Cursor::new(b"data: hello\n\n".to_vec()),
|
||||
)
|
||||
.read_to_string(&mut response_body)
|
||||
.expect("read response body");
|
||||
|
||||
let response_dump = fs::read_to_string(dump_file_with_suffix(&dump_dir, "-response.json"))
|
||||
.expect("read response dump");
|
||||
|
||||
assert_eq!(response_body, "data: hello\n\n");
|
||||
assert_eq!(
|
||||
serde_json::from_str::<serde_json::Value>(&response_dump).expect("parse response dump"),
|
||||
json!({
|
||||
"status": 200,
|
||||
"headers": [
|
||||
{
|
||||
"name": "content-type",
|
||||
"value": "text/event-stream"
|
||||
},
|
||||
{
|
||||
"name": "authorization",
|
||||
"value": "[REDACTED]"
|
||||
},
|
||||
{
|
||||
"name": "set-cookie",
|
||||
"value": "[REDACTED]"
|
||||
}
|
||||
],
|
||||
"body": "data: hello\n\n"
|
||||
})
|
||||
);
|
||||
|
||||
fs::remove_dir_all(dump_dir).expect("remove test dump dir");
|
||||
}
|
||||
|
||||
fn test_dump_dir() -> std::path::PathBuf {
|
||||
let test_id = NEXT_TEST_DIR.fetch_add(1, Ordering::Relaxed);
|
||||
let dump_dir = std::env::temp_dir().join(format!(
|
||||
"codex-responses-api-proxy-dump-test-{}-{test_id}",
|
||||
std::process::id()
|
||||
));
|
||||
fs::create_dir_all(&dump_dir).expect("create test dump dir");
|
||||
dump_dir
|
||||
}
|
||||
|
||||
fn dump_file_with_suffix(dump_dir: &std::path::Path, suffix: &str) -> std::path::PathBuf {
|
||||
let mut matches = fs::read_dir(dump_dir)
|
||||
.expect("read dump dir")
|
||||
.map(|entry| entry.expect("read dump entry").path())
|
||||
.filter(|path| path.to_string_lossy().ends_with(suffix))
|
||||
.collect::<Vec<_>>();
|
||||
matches.sort();
|
||||
|
||||
assert_eq!(matches.len(), 1);
|
||||
matches.pop().expect("single dump file")
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::fs::File;
|
||||
use std::fs::{self};
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::net::SocketAddr;
|
||||
use std::net::TcpListener;
|
||||
@@ -27,7 +28,9 @@ use tiny_http::Response;
|
||||
use tiny_http::Server;
|
||||
use tiny_http::StatusCode;
|
||||
|
||||
mod dump;
|
||||
mod read_api_key;
|
||||
use dump::ExchangeDumper;
|
||||
use read_api_key::read_auth_header_from_stdin;
|
||||
|
||||
/// CLI arguments for the proxy.
|
||||
@@ -49,6 +52,10 @@ pub struct Args {
|
||||
/// Absolute URL the proxy should forward requests to (defaults to OpenAI).
|
||||
#[arg(long, default_value = "https://api.openai.com/v1/responses")]
|
||||
pub upstream_url: String,
|
||||
|
||||
/// Directory where request/response dumps should be written as JSON.
|
||||
#[arg(long, value_name = "DIR")]
|
||||
pub dump_dir: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -79,6 +86,12 @@ pub fn run_main(args: Args) -> Result<()> {
|
||||
upstream_url,
|
||||
host_header,
|
||||
});
|
||||
let dump_dir = args
|
||||
.dump_dir
|
||||
.map(ExchangeDumper::new)
|
||||
.transpose()
|
||||
.context("creating --dump-dir")?
|
||||
.map(Arc::new);
|
||||
|
||||
let (listener, bound_addr) = bind_listener(args.port)?;
|
||||
if let Some(path) = args.server_info.as_ref() {
|
||||
@@ -100,13 +113,20 @@ pub fn run_main(args: Args) -> Result<()> {
|
||||
for request in server.incoming_requests() {
|
||||
let client = client.clone();
|
||||
let forward_config = forward_config.clone();
|
||||
let dump_dir = dump_dir.clone();
|
||||
std::thread::spawn(move || {
|
||||
if http_shutdown && request.method() == &Method::Get && request.url() == "/shutdown" {
|
||||
let _ = request.respond(Response::new_empty(StatusCode(200)));
|
||||
std::process::exit(0);
|
||||
}
|
||||
|
||||
if let Err(e) = forward_request(&client, auth_header, &forward_config, request) {
|
||||
if let Err(e) = forward_request(
|
||||
&client,
|
||||
auth_header,
|
||||
&forward_config,
|
||||
dump_dir.as_deref(),
|
||||
request,
|
||||
) {
|
||||
eprintln!("forwarding error: {e}");
|
||||
}
|
||||
});
|
||||
@@ -144,6 +164,7 @@ fn forward_request(
|
||||
client: &Client,
|
||||
auth_header: &'static str,
|
||||
config: &ForwardConfig,
|
||||
dump_dir: Option<&ExchangeDumper>,
|
||||
mut req: Request,
|
||||
) -> Result<()> {
|
||||
// Only allow POST /v1/responses exactly, no query string.
|
||||
@@ -159,8 +180,18 @@ fn forward_request(
|
||||
|
||||
// Read request body
|
||||
let mut body = Vec::new();
|
||||
let mut reader = req.as_reader();
|
||||
std::io::Read::read_to_end(&mut reader, &mut body)?;
|
||||
let reader = req.as_reader();
|
||||
reader.read_to_end(&mut body)?;
|
||||
|
||||
let exchange_dump = dump_dir.and_then(|dump_dir| {
|
||||
dump_dir
|
||||
.dump_request(&method, &url_path, req.headers(), &body)
|
||||
.map_err(|err| {
|
||||
eprintln!("responses-api-proxy failed to dump request: {err}");
|
||||
err
|
||||
})
|
||||
.ok()
|
||||
});
|
||||
|
||||
// Build headers for upstream, forwarding everything from the incoming
|
||||
// request except Authorization (we replace it below).
|
||||
@@ -224,10 +255,17 @@ fn forward_request(
|
||||
}
|
||||
});
|
||||
|
||||
let response_body: Box<dyn Read + Send> = if let Some(exchange_dump) = exchange_dump {
|
||||
let headers = upstream_resp.headers().clone();
|
||||
Box::new(exchange_dump.tee_response_body(status.as_u16(), &headers, upstream_resp))
|
||||
} else {
|
||||
Box::new(upstream_resp)
|
||||
};
|
||||
|
||||
let response = Response::new(
|
||||
StatusCode(status.as_u16()),
|
||||
response_headers,
|
||||
upstream_resp,
|
||||
response_body,
|
||||
content_length,
|
||||
None,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user