mirror of
https://github.com/openai/codex.git
synced 2026-05-03 21:01:55 +03:00
[rollout_trace] Trace tool and code-mode boundaries (#18878)
## Summary Extends rollout tracing across tool dispatch and code-mode runtime boundaries. This records canonical tool-call lifecycle events and links code-mode execution/wait operations back to the model-visible calls that caused them. ## Stack This is PR 3/5 in the rollout trace stack. - [#18876](https://github.com/openai/codex/pull/18876): Add rollout trace crate - [#18877](https://github.com/openai/codex/pull/18877): Record core session rollout traces - [#18878](https://github.com/openai/codex/pull/18878): Trace tool and code-mode boundaries - [#18879](https://github.com/openai/codex/pull/18879): Trace sessions and multi-agent edges - [#18880](https://github.com/openai/codex/pull/18880): Add debug trace reduction command ## Review Notes This PR is about attribution. Reviewers should focus on whether direct tool calls, code-mode-originated tool calls, waits, outputs, and cancellation boundaries are recorded with enough source information for deterministic reduction without coupling the reducer to live runtime internals. The stack remains valid after this layer: tool and code-mode traces reduce through the existing crate model, while the broader session and multi-agent relationships are added in the next PR.
This commit is contained in:
@@ -18,11 +18,13 @@ pub use description::render_json_schema_to_typescript;
|
||||
pub use response::DEFAULT_IMAGE_DETAIL;
|
||||
pub use response::FunctionCallOutputContentItem;
|
||||
pub use response::ImageDetail;
|
||||
pub use runtime::CodeModeNestedToolCall;
|
||||
pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
|
||||
pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
pub use runtime::ExecuteRequest;
|
||||
pub use runtime::RuntimeResponse;
|
||||
pub use runtime::WaitOutcome;
|
||||
pub use runtime::WaitRequest;
|
||||
pub use service::CodeModeService;
|
||||
pub use service::CodeModeTurnHost;
|
||||
|
||||
@@ -10,6 +10,7 @@ use std::sync::mpsc as std_mpsc;
|
||||
use std::thread;
|
||||
|
||||
use codex_protocol::ToolName;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
@@ -25,6 +26,11 @@ const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ExecuteRequest {
|
||||
/// Runtime cell id for this execution.
|
||||
///
|
||||
/// Callers allocate this before execution so tracing, waits, and nested tool
|
||||
/// calls can refer to the cell as soon as JavaScript starts.
|
||||
pub cell_id: String,
|
||||
pub tool_call_id: String,
|
||||
pub enabled_tools: Vec<ToolDefinition>,
|
||||
pub source: String,
|
||||
@@ -40,7 +46,30 @@ pub struct WaitRequest {
|
||||
pub terminate: bool,
|
||||
}
|
||||
|
||||
/// Result of waiting on a code-mode cell.
|
||||
///
|
||||
/// The wrapped `RuntimeResponse` is the model-facing wait result. The enum
|
||||
/// variant carries the extra lifecycle provenance that `RuntimeResponse` cannot:
|
||||
/// a failed real cell and a missing-cell wait both use
|
||||
/// `RuntimeResponse::Result { error_text: Some(..), .. }`, but only the former
|
||||
/// should be treated as a code-cell lifecycle event.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum WaitOutcome {
|
||||
/// The requested code cell was live when the wait command was accepted.
|
||||
LiveCell(RuntimeResponse),
|
||||
/// The requested code cell was not live.
|
||||
MissingCell(RuntimeResponse),
|
||||
}
|
||||
|
||||
impl From<WaitOutcome> for RuntimeResponse {
|
||||
fn from(outcome: WaitOutcome) -> Self {
|
||||
match outcome {
|
||||
WaitOutcome::LiveCell(response) | WaitOutcome::MissingCell(response) => response,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Serialize)]
|
||||
pub enum RuntimeResponse {
|
||||
Yielded {
|
||||
cell_id: String,
|
||||
@@ -58,14 +87,22 @@ pub enum RuntimeResponse {
|
||||
},
|
||||
}
|
||||
|
||||
/// Nested tool request emitted by one code-mode cell.
|
||||
///
|
||||
/// Code mode owns the per-cell runtime id. Hosts should preserve it for
|
||||
/// provenance/debugging, but should still assign their own runtime tool call id
|
||||
/// if their tool-call graph requires globally unique ids.
|
||||
#[derive(Debug)]
|
||||
pub struct CodeModeNestedToolCall {
|
||||
pub cell_id: String,
|
||||
pub runtime_tool_call_id: String,
|
||||
pub tool_name: ToolName,
|
||||
pub input: Option<JsonValue>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum TurnMessage {
|
||||
ToolCall {
|
||||
cell_id: String,
|
||||
id: String,
|
||||
name: ToolName,
|
||||
input: Option<JsonValue>,
|
||||
},
|
||||
ToolCall(CodeModeNestedToolCall),
|
||||
Notify {
|
||||
cell_id: String,
|
||||
call_id: String,
|
||||
@@ -331,6 +368,7 @@ mod tests {
|
||||
|
||||
fn execute_request(source: &str) -> ExecuteRequest {
|
||||
ExecuteRequest {
|
||||
cell_id: "1".to_string(),
|
||||
tool_call_id: "call_1".to_string(),
|
||||
enabled_tools: Vec::new(),
|
||||
source: source.to_string(),
|
||||
|
||||
@@ -5,7 +5,6 @@ use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::ToolName;
|
||||
use serde_json::Value as JsonValue;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -14,12 +13,14 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::FunctionCallOutputContentItem;
|
||||
use crate::runtime::CodeModeNestedToolCall;
|
||||
use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
use crate::runtime::ExecuteRequest;
|
||||
use crate::runtime::RuntimeCommand;
|
||||
use crate::runtime::RuntimeEvent;
|
||||
use crate::runtime::RuntimeResponse;
|
||||
use crate::runtime::TurnMessage;
|
||||
use crate::runtime::WaitOutcome;
|
||||
use crate::runtime::WaitRequest;
|
||||
use crate::runtime::spawn_runtime;
|
||||
|
||||
@@ -27,8 +28,7 @@ use crate::runtime::spawn_runtime;
|
||||
pub trait CodeModeTurnHost: Send + Sync {
|
||||
async fn invoke_tool(
|
||||
&self,
|
||||
tool_name: ToolName,
|
||||
input: Option<JsonValue>,
|
||||
invocation: CodeModeNestedToolCall,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Result<JsonValue, String>;
|
||||
|
||||
@@ -76,24 +76,45 @@ impl CodeModeService {
|
||||
*self.inner.stored_values.lock().await = values;
|
||||
}
|
||||
|
||||
pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
|
||||
let cell_id = self
|
||||
.inner
|
||||
/// Reserves the runtime cell id for a future `execute` request.
|
||||
///
|
||||
/// The runtime can issue nested tool calls before the first `execute`
|
||||
/// response is returned. Hosts that need a parent trace object for those
|
||||
/// nested calls should allocate the cell id up front and pass it back on the
|
||||
/// `ExecuteRequest`.
|
||||
pub fn allocate_cell_id(&self) -> String {
|
||||
self.inner
|
||||
.next_cell_id
|
||||
.fetch_add(1, Ordering::Relaxed)
|
||||
.to_string();
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
|
||||
let cell_id = request.cell_id.clone();
|
||||
let initial_yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS);
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request.clone(), event_tx)?;
|
||||
let (control_tx, control_rx) = mpsc::unbounded_channel();
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
let (runtime_tx, runtime_terminate_handle) = {
|
||||
let mut sessions = self.inner.sessions.lock().await;
|
||||
if sessions.contains_key(&cell_id) {
|
||||
return Err(format!("exec cell {cell_id} already exists"));
|
||||
}
|
||||
|
||||
self.inner.sessions.lock().await.insert(
|
||||
cell_id.clone(),
|
||||
SessionHandle {
|
||||
control_tx: control_tx.clone(),
|
||||
runtime_tx: runtime_tx.clone(),
|
||||
},
|
||||
);
|
||||
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request, event_tx)?;
|
||||
|
||||
// Keep the session registry locked through insertion so a
|
||||
// caller-owned cell id cannot race with another execute and replace
|
||||
// a live runtime.
|
||||
sessions.insert(
|
||||
cell_id.clone(),
|
||||
SessionHandle {
|
||||
control_tx,
|
||||
runtime_tx: runtime_tx.clone(),
|
||||
},
|
||||
);
|
||||
(runtime_tx, runtime_terminate_handle)
|
||||
};
|
||||
|
||||
tokio::spawn(run_session_control(
|
||||
Arc::clone(&self.inner),
|
||||
@@ -105,7 +126,7 @@ impl CodeModeService {
|
||||
event_rx,
|
||||
control_rx,
|
||||
response_tx,
|
||||
request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS),
|
||||
initial_yield_time_ms,
|
||||
));
|
||||
|
||||
response_rx
|
||||
@@ -113,7 +134,7 @@ impl CodeModeService {
|
||||
.map_err(|_| "exec runtime ended unexpectedly".to_string())
|
||||
}
|
||||
|
||||
pub async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
|
||||
pub async fn wait(&self, request: WaitRequest) -> Result<WaitOutcome, String> {
|
||||
let cell_id = request.cell_id.clone();
|
||||
let handle = self
|
||||
.inner
|
||||
@@ -123,7 +144,7 @@ impl CodeModeService {
|
||||
.get(&request.cell_id)
|
||||
.cloned();
|
||||
let Some(handle) = handle else {
|
||||
return Ok(missing_cell_response(cell_id));
|
||||
return Ok(WaitOutcome::MissingCell(missing_cell_response(cell_id)));
|
||||
};
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
let control_message = if request.terminate {
|
||||
@@ -135,11 +156,13 @@ impl CodeModeService {
|
||||
}
|
||||
};
|
||||
if handle.control_tx.send(control_message).is_err() {
|
||||
return Ok(missing_cell_response(cell_id));
|
||||
return Ok(WaitOutcome::MissingCell(missing_cell_response(cell_id)));
|
||||
}
|
||||
match response_rx.await {
|
||||
Ok(response) => Ok(response),
|
||||
Err(_) => Ok(missing_cell_response(request.cell_id)),
|
||||
Ok(response) => Ok(WaitOutcome::LiveCell(response)),
|
||||
Err(_) => Ok(WaitOutcome::MissingCell(missing_cell_response(
|
||||
request.cell_id,
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,18 +192,14 @@ impl CodeModeService {
|
||||
);
|
||||
}
|
||||
}
|
||||
TurnMessage::ToolCall {
|
||||
cell_id,
|
||||
id,
|
||||
name,
|
||||
input,
|
||||
} => {
|
||||
TurnMessage::ToolCall(invocation) => {
|
||||
let host = Arc::clone(&host);
|
||||
let inner = Arc::clone(&inner);
|
||||
tokio::spawn(async move {
|
||||
let response = host
|
||||
.invoke_tool(name, input, CancellationToken::new())
|
||||
.await;
|
||||
let cell_id = invocation.cell_id.clone();
|
||||
let runtime_tool_call_id = invocation.runtime_tool_call_id.clone();
|
||||
let response =
|
||||
host.invoke_tool(invocation, CancellationToken::new()).await;
|
||||
let runtime_tx = inner
|
||||
.sessions
|
||||
.lock()
|
||||
@@ -191,8 +210,14 @@ impl CodeModeService {
|
||||
return;
|
||||
};
|
||||
let command = match response {
|
||||
Ok(result) => RuntimeCommand::ToolResponse { id, result },
|
||||
Err(error_text) => RuntimeCommand::ToolError { id, error_text },
|
||||
Ok(result) => RuntimeCommand::ToolResponse {
|
||||
id: runtime_tool_call_id,
|
||||
result,
|
||||
},
|
||||
Err(error_text) => RuntimeCommand::ToolError {
|
||||
id: runtime_tool_call_id,
|
||||
error_text,
|
||||
},
|
||||
};
|
||||
let _ = runtime_tx.send(command);
|
||||
});
|
||||
@@ -361,12 +386,16 @@ async fn run_session_control(
|
||||
}).await;
|
||||
}
|
||||
RuntimeEvent::ToolCall { id, name, input } => {
|
||||
let _ = inner.turn_message_tx.send(TurnMessage::ToolCall {
|
||||
let tool_call = CodeModeNestedToolCall {
|
||||
cell_id: cell_id.clone(),
|
||||
id,
|
||||
name,
|
||||
runtime_tool_call_id: id,
|
||||
tool_name: name,
|
||||
input,
|
||||
}).await;
|
||||
};
|
||||
let _ = inner
|
||||
.turn_message_tx
|
||||
.send(TurnMessage::ToolCall(tool_call))
|
||||
.await;
|
||||
}
|
||||
RuntimeEvent::Result {
|
||||
stored_values,
|
||||
@@ -479,6 +508,8 @@ mod tests {
|
||||
use super::RuntimeResponse;
|
||||
use super::SessionControlCommand;
|
||||
use super::SessionControlContext;
|
||||
use super::WaitOutcome;
|
||||
use super::WaitRequest;
|
||||
use super::run_session_control;
|
||||
use crate::FunctionCallOutputContentItem;
|
||||
use crate::runtime::ExecuteRequest;
|
||||
@@ -487,6 +518,7 @@ mod tests {
|
||||
|
||||
fn execute_request(source: &str) -> ExecuteRequest {
|
||||
ExecuteRequest {
|
||||
cell_id: "1".to_string(),
|
||||
tool_call_id: "call_1".to_string(),
|
||||
enabled_tools: Vec::new(),
|
||||
source: source.to_string(),
|
||||
@@ -829,6 +861,30 @@ image({
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_reports_missing_cell_separately_from_runtime_results() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.wait(WaitRequest {
|
||||
cell_id: "missing".to_string(),
|
||||
yield_time_ms: 1,
|
||||
terminate: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
WaitOutcome::MissingCell(RuntimeResponse::Result {
|
||||
cell_id: "missing".to_string(),
|
||||
content_items: Vec::new(),
|
||||
stored_values: HashMap::new(),
|
||||
error_text: Some("exec cell missing not found".to_string()),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminate_waits_for_runtime_shutdown_before_responding() {
|
||||
let inner = test_inner();
|
||||
|
||||
Reference in New Issue
Block a user