Compare commits

..

2 Commits

Author SHA1 Message Date
Michael Bolin
dfa9a44202 feat: support dotenv 2025-07-22 10:19:42 -07:00
Michael Bolin
d5809ef6ef chore: install an extension for TOML syntax highlighting in the devcontainer 2025-07-22 10:11:07 -07:00
31 changed files with 121 additions and 729 deletions

View File

@@ -93,7 +93,7 @@ jobs:
sudo apt install -y musl-tools pkg-config
- name: Cargo build
run: cargo build --target ${{ matrix.target }} --release --bin codex --bin codex-exec --bin codex-linux-sandbox
run: cargo build --target ${{ matrix.target }} --release --all-targets --all-features
- name: Stage artifacts
shell: bash

3
codex-rs/Cargo.lock generated
View File

@@ -636,6 +636,7 @@ dependencies = [
"codex-login",
"codex-mcp-server",
"codex-tui",
"dotenvy",
"serde_json",
"tokio",
"tracing",
@@ -756,9 +757,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"clap",
"codex-common",
"codex-core",
"dotenvy",
"landlock",
"libc",
"seccompiler",

View File

@@ -26,6 +26,7 @@ codex-login = { path = "../login" }
codex-linux-sandbox = { path = "../linux-sandbox" }
codex-mcp-server = { path = "../mcp-server" }
codex-tui = { path = "../tui" }
dotenvy = "0.15.7"
serde_json = "1"
tokio = { version = "1", features = [
"io-std",

View File

@@ -99,6 +99,12 @@ fn main() -> anyhow::Result<()> {
}
async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
// Load env vars from ~/.codex/.env and `$(pwd)/.env`.
if let Ok(codex_home) = codex_core::config::find_codex_home() {
dotenvy::from_path(codex_home.join(".env")).ok();
}
dotenvy::dotenv().ok();
let cli = MultitoolCli::parse();
match cli.subcommand {

View File

@@ -41,12 +41,7 @@ pub(crate) async fn stream_chat_completions(
for item in &prompt.input {
match item {
ResponseItem::Message {
// id will always be None for input items
id: _,
role,
content,
} => {
ResponseItem::Message { role, content } => {
let mut text = String::new();
for c in content {
match c {
@@ -260,7 +255,6 @@ async fn process_chat_sse<S>(
.and_then(|c| c.as_str())
{
let item = ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: content.to_string(),
@@ -408,7 +402,6 @@ where
}))) => {
if !this.cumulative.is_empty() {
let aggregated_item = crate::models::ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![crate::models::ContentItem::OutputText {
text: std::mem::take(&mut this.cumulative),
@@ -437,8 +430,8 @@ where
// will never appear in a Chat Completions stream.
continue;
}
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta { .. })))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta { .. }))) => {
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(_))))
| Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
// Deltas are ignored here since aggregation waits for the
// final OutputItemDone.
continue;

View File

@@ -230,7 +230,6 @@ struct SseEvent {
response: Option<Value>,
item: Option<Value>,
delta: Option<String>,
item_id: Option<String>,
}
#[derive(Debug, Deserialize)]
@@ -361,22 +360,21 @@ async fn process_sse<S>(
};
let event = ResponseEvent::OutputItemDone(item);
trace!(?event, "output_item.done");
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
"response.output_text.delta" => {
if let (Some(delta), Some(item_id)) = (event.delta, event.item_id) {
let event = ResponseEvent::OutputTextDelta { delta, item_id };
if let Some(delta) = event.delta {
let event = ResponseEvent::OutputTextDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}
}
}
"response.reasoning_summary_text.delta" => {
if let (Some(delta), Some(item_id)) = (event.delta, event.item_id) {
let event = ResponseEvent::ReasoningSummaryDelta { delta, item_id };
if let Some(delta) = event.delta {
let event = ResponseEvent::ReasoningSummaryDelta(delta);
if tx_event.send(Ok(event)).await.is_err() {
return;
}

View File

@@ -34,18 +34,11 @@ pub struct Prompt {
/// the "fully qualified" tool name (i.e., prefixed with the server name),
/// which should be reported to the model in place of Tool::name.
pub extra_tools: HashMap<String, mcp_types::Tool>,
/// Optional override for the built-in BASE_INSTRUCTIONS.
pub base_instructions_override: Option<String>,
}
impl Prompt {
pub(crate) fn get_full_instructions(&self, model: &str) -> Cow<'_, str> {
let base = self
.base_instructions_override
.as_deref()
.unwrap_or(BASE_INSTRUCTIONS);
let mut sections: Vec<&str> = vec![base];
let mut sections: Vec<&str> = vec![BASE_INSTRUCTIONS];
if let Some(ref user) = self.user_instructions {
sections.push(user);
}
@@ -56,8 +49,6 @@ impl Prompt {
}
}
/// Events emitted by the response stream.
/// https://platform.openai.com/docs/api-reference/responses-streaming/response
#[derive(Debug)]
pub enum ResponseEvent {
Created,
@@ -66,14 +57,8 @@ pub enum ResponseEvent {
response_id: String,
token_usage: Option<TokenUsage>,
},
OutputTextDelta {
delta: String,
item_id: String,
},
ReasoningSummaryDelta {
delta: String,
item_id: String,
},
OutputTextDelta(String),
ReasoningSummaryDelta(String),
}
#[derive(Debug, Serialize)]

View File

@@ -108,15 +108,13 @@ impl Codex {
let (tx_sub, rx_sub) = async_channel::bounded(64);
let (tx_event, rx_event) = async_channel::bounded(1600);
let user_instructions = get_user_instructions(&config).await;
let instructions = get_user_instructions(&config).await;
let configure_session = Op::ConfigureSession {
provider: config.model_provider.clone(),
model: config.model.clone(),
model_reasoning_effort: config.model_reasoning_effort,
model_reasoning_summary: config.model_reasoning_summary,
user_instructions,
base_instructions: config.base_instructions.clone(),
instructions,
approval_policy: config.approval_policy,
sandbox_policy: config.sandbox_policy.clone(),
disable_response_storage: config.disable_response_storage,
@@ -185,8 +183,7 @@ pub(crate) struct Session {
/// the model as well as sandbox policies are resolved against this path
/// instead of `std::env::current_dir()`.
cwd: PathBuf,
base_instructions: Option<String>,
user_instructions: Option<String>,
instructions: Option<String>,
approval_policy: AskForApproval,
sandbox_policy: SandboxPolicy,
shell_environment_policy: ShellEnvironmentPolicy,
@@ -580,8 +577,7 @@ async fn submission_loop(
model,
model_reasoning_effort,
model_reasoning_summary,
user_instructions,
base_instructions,
instructions,
approval_policy,
sandbox_policy,
disable_response_storage,
@@ -629,17 +625,15 @@ async fn submission_loop(
let rollout_recorder = match rollout_recorder {
Some(rec) => Some(rec),
None => {
match RolloutRecorder::new(&config, session_id, user_instructions.clone())
.await
{
Ok(r) => Some(r),
Err(e) => {
warn!("failed to initialise rollout recorder: {e}");
None
}
None => match RolloutRecorder::new(&config, session_id, instructions.clone())
.await
{
Ok(r) => Some(r),
Err(e) => {
warn!("failed to initialise rollout recorder: {e}");
None
}
}
},
};
let client = ModelClient::new(
@@ -705,8 +699,7 @@ async fn submission_loop(
client,
tx_event: tx_event.clone(),
ctrl_c: Arc::clone(&ctrl_c),
user_instructions,
base_instructions,
instructions,
approval_policy,
sandbox_policy,
shell_environment_policy: config.shell_environment_policy.clone(),
@@ -1074,10 +1067,9 @@ async fn run_turn(
let prompt = Prompt {
input,
prev_id,
user_instructions: sess.user_instructions.clone(),
user_instructions: sess.instructions.clone(),
store,
extra_tools,
base_instructions_override: sess.base_instructions.clone(),
};
let mut retries = 0;
@@ -1248,17 +1240,17 @@ async fn try_run_turn(
state.previous_response_id = Some(response_id);
return Ok(output);
}
ResponseEvent::OutputTextDelta { delta, item_id } => {
ResponseEvent::OutputTextDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, item_id }),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
}
ResponseEvent::ReasoningSummaryDelta { delta, item_id } => {
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, item_id }),
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }),
};
sess.tx_event.send(event).await.ok();
}
@@ -1273,32 +1265,26 @@ async fn handle_response_item(
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
let output = match item {
ResponseItem::Message { content, id, .. } => {
ResponseItem::Message { content, .. } => {
for item in content {
if let ContentItem::OutputText { text } = item {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentMessage(AgentMessageEvent {
id: id.clone(),
message: text,
}),
msg: EventMsg::AgentMessage(AgentMessageEvent { message: text }),
};
sess.tx_event.send(event).await.ok();
}
}
None
}
ResponseItem::Reasoning { id, summary } => {
ResponseItem::Reasoning { id: _, summary } => {
for item in summary {
let text = match item {
ReasoningItemReasoningSummary::SummaryText { text } => text,
};
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
id: id.clone(),
text,
}),
msg: EventMsg::AgentReasoning(AgentReasoningEvent { text }),
};
sess.tx_event.send(event).await.ok();
}
@@ -2098,7 +2084,7 @@ fn format_exec_output(output: &str, exit_code: i32, duration: Duration) -> Strin
fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -> Option<String> {
responses.iter().rev().find_map(|item| {
if let ResponseItem::Message { role, content, .. } = item {
if let ResponseItem::Message { role, content } = item {
if role == "assistant" {
content.iter().rev().find_map(|ci| {
if let ContentItem::OutputText { text } = ci {

View File

@@ -63,10 +63,7 @@ pub struct Config {
pub disable_response_storage: bool,
/// User-provided instructions from instructions.md.
pub user_instructions: Option<String>,
/// Base instructions override.
pub base_instructions: Option<String>,
pub instructions: Option<String>,
/// Optional external notifier command. When set, Codex will spawn this
/// program after each completed *turn* (i.e. when the agent finishes
@@ -330,9 +327,6 @@ pub struct ConfigToml {
/// Experimental rollout resume path (absolute path to .jsonl; undocumented).
pub experimental_resume: Option<PathBuf>,
/// Experimental path to a file whose contents replace the built-in BASE_INSTRUCTIONS.
pub experimental_instructions_file: Option<PathBuf>,
}
impl ConfigToml {
@@ -365,7 +359,6 @@ pub struct ConfigOverrides {
pub model_provider: Option<String>,
pub config_profile: Option<String>,
pub codex_linux_sandbox_exe: Option<PathBuf>,
pub base_instructions: Option<String>,
}
impl Config {
@@ -376,7 +369,7 @@ impl Config {
overrides: ConfigOverrides,
codex_home: PathBuf,
) -> std::io::Result<Self> {
let user_instructions = Self::load_instructions(Some(&codex_home));
let instructions = Self::load_instructions(Some(&codex_home));
// Destructure ConfigOverrides fully to ensure all overrides are applied.
let ConfigOverrides {
@@ -387,7 +380,6 @@ impl Config {
model_provider,
config_profile: config_profile_key,
codex_linux_sandbox_exe,
base_instructions,
} = overrides;
let config_profile = match config_profile_key.as_ref().or(cfg.profile.as_ref()) {
@@ -465,10 +457,6 @@ impl Config {
let experimental_resume = cfg.experimental_resume;
let base_instructions = base_instructions.or(Self::get_base_instructions(
cfg.experimental_instructions_file.as_ref(),
));
let config = Self {
model,
model_context_window,
@@ -487,8 +475,7 @@ impl Config {
.or(cfg.disable_response_storage)
.unwrap_or(false),
notify: cfg.notify,
user_instructions,
base_instructions,
instructions,
mcp_servers: cfg.mcp_servers,
model_providers,
project_doc_max_bytes: cfg.project_doc_max_bytes.unwrap_or(PROJECT_DOC_MAX_BYTES),
@@ -538,15 +525,6 @@ impl Config {
}
})
}
fn get_base_instructions(path: Option<&PathBuf>) -> Option<String> {
let path = path.as_ref()?;
std::fs::read_to_string(path)
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
}
}
fn default_model() -> String {
@@ -823,7 +801,7 @@ disable_response_storage = true
sandbox_policy: SandboxPolicy::new_read_only_policy(),
shell_environment_policy: ShellEnvironmentPolicy::default(),
disable_response_storage: false,
user_instructions: None,
instructions: None,
notify: None,
cwd: fixture.cwd(),
mcp_servers: HashMap::new(),
@@ -840,7 +818,6 @@ disable_response_storage = true
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_resume: None,
base_instructions: None,
},
o3_profile_config
);
@@ -871,7 +848,7 @@ disable_response_storage = true
sandbox_policy: SandboxPolicy::new_read_only_policy(),
shell_environment_policy: ShellEnvironmentPolicy::default(),
disable_response_storage: false,
user_instructions: None,
instructions: None,
notify: None,
cwd: fixture.cwd(),
mcp_servers: HashMap::new(),
@@ -888,7 +865,6 @@ disable_response_storage = true
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_resume: None,
base_instructions: None,
};
assert_eq!(expected_gpt3_profile_config, gpt3_profile_config);
@@ -934,7 +910,7 @@ disable_response_storage = true
sandbox_policy: SandboxPolicy::new_read_only_policy(),
shell_environment_policy: ShellEnvironmentPolicy::default(),
disable_response_storage: true,
user_instructions: None,
instructions: None,
notify: None,
cwd: fixture.cwd(),
mcp_servers: HashMap::new(),
@@ -951,7 +927,6 @@ disable_response_storage = true
model_supports_reasoning_summaries: false,
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
experimental_resume: None,
base_instructions: None,
};
assert_eq!(expected_zdr_profile_config, zdr_profile_config);

View File

@@ -37,7 +37,6 @@ pub enum ContentItem {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ResponseItem {
Message {
id: Option<String>,
role: String,
content: Vec<ContentItem>,
},
@@ -79,11 +78,7 @@ pub enum ResponseItem {
impl From<ResponseInputItem> for ResponseItem {
fn from(item: ResponseInputItem) -> Self {
match item {
ResponseInputItem::Message { role, content } => Self::Message {
id: None,
role,
content,
},
ResponseInputItem::Message { role, content } => Self::Message { role, content },
ResponseInputItem::FunctionCallOutput { call_id, output } => {
Self::FunctionCallOutput { call_id, output }
}

View File

@@ -27,16 +27,16 @@ const PROJECT_DOC_SEPARATOR: &str = "\n\n--- project-doc ---\n\n";
/// string of instructions.
pub(crate) async fn get_user_instructions(config: &Config) -> Option<String> {
match find_project_doc(config).await {
Ok(Some(project_doc)) => match &config.user_instructions {
Ok(Some(project_doc)) => match &config.instructions {
Some(original_instructions) => Some(format!(
"{original_instructions}{PROJECT_DOC_SEPARATOR}{project_doc}"
)),
None => Some(project_doc),
},
Ok(None) => config.user_instructions.clone(),
Ok(None) => config.instructions.clone(),
Err(e) => {
error!("error trying to find project doc: {e:#}");
config.user_instructions.clone()
config.instructions.clone()
}
}
}
@@ -159,7 +159,7 @@ mod tests {
config.cwd = root.path().to_path_buf();
config.project_doc_max_bytes = limit;
config.user_instructions = instructions.map(ToOwned::to_owned);
config.instructions = instructions.map(ToOwned::to_owned);
config
}

View File

@@ -44,12 +44,8 @@ pub enum Op {
model_reasoning_effort: ReasoningEffortConfig,
model_reasoning_summary: ReasoningSummaryConfig,
/// Model instructions that are appended to the base instructions.
user_instructions: Option<String>,
/// Base instructions override.
base_instructions: Option<String>,
/// Model instructions
instructions: Option<String>,
/// When to escalate for approval for execution
approval_policy: AskForApproval,
/// How to sandbox commands executed in the system
@@ -351,26 +347,22 @@ pub struct TokenUsage {
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageEvent {
pub id: Option<String>,
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentMessageDeltaEvent {
pub delta: String,
pub item_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningEvent {
pub id: String,
pub text: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AgentReasoningDeltaEvent {
pub delta: String,
pub item_id: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]

View File

@@ -1,3 +1,5 @@
use std::time::Duration;
use codex_core::Codex;
use codex_core::ModelProviderInfo;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
@@ -9,6 +11,7 @@ mod test_support;
use tempfile::TempDir;
use test_support::load_default_config_for_test;
use test_support::load_sse_fixture_with_id;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
@@ -83,15 +86,21 @@ async fn includes_session_id_and_model_headers_in_request() {
.await
.unwrap();
let EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, .. }) =
test_support::wait_for_event(&codex, |ev| matches!(ev, EventMsg::SessionConfigured(_)))
let mut current_session_id = None;
// Wait for TaskComplete
loop {
let ev = timeout(Duration::from_secs(1), codex.next_event())
.await
else {
unreachable!()
};
.unwrap()
.unwrap();
let current_session_id = Some(session_id.to_string());
test_support::wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
if let EventMsg::SessionConfigured(SessionConfiguredEvent { session_id, .. }) = ev.msg {
current_session_id = Some(session_id.to_string());
}
if matches!(ev.msg, EventMsg::TaskComplete(_)) {
break;
}
}
// get request from the server
let request = &server.received_requests().await.unwrap()[0];
@@ -99,76 +108,6 @@ async fn includes_session_id_and_model_headers_in_request() {
let originator = request.headers.get("originator").unwrap();
assert!(current_session_id.is_some());
assert_eq!(
request_body.to_str().unwrap(),
current_session_id.as_ref().unwrap()
);
assert_eq!(request_body.to_str().unwrap(), &current_session_id.unwrap());
assert_eq!(originator.to_str().unwrap(), "codex_cli_rs");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn includes_base_instructions_override_in_request() {
#![allow(clippy::unwrap_used)]
// Mock server
let server = MockServer::start().await;
// First request must NOT include `previous_response_id`.
let first = ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse_completed("resp1"), "text/event-stream");
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(first)
.expect(1)
.mount(&server)
.await;
let model_provider = ModelProviderInfo {
name: "openai".into(),
base_url: format!("{}/v1", server.uri()),
// Environment variable that should exist in the test environment.
// ModelClient will return an error if the environment variable for the
// provider is not set.
env_key: Some("PATH".into()),
env_key_instructions: None,
wire_api: codex_core::WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: None,
};
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.base_instructions = Some("test instructions".to_string());
config.model_provider = model_provider;
let ctrl_c = std::sync::Arc::new(tokio::sync::Notify::new());
let (codex, ..) = Codex::spawn(config, ctrl_c.clone()).await.unwrap();
codex
.submit(Op::UserInput {
items: vec![InputItem::Text {
text: "hello".into(),
}],
})
.await
.unwrap();
test_support::wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let request = &server.received_requests().await.unwrap()[0];
let request_body = request.body_json::<serde_json::Value>().unwrap();
assert!(
request_body["instructions"]
.as_str()
.unwrap()
.contains("test instructions")
);
}

View File

@@ -120,7 +120,7 @@ async fn live_streaming_and_prev_id_reset() {
.expect("agent closed");
match &ev.msg {
EventMsg::AgentMessage(AgentMessageEvent { id: _, message })
EventMsg::AgentMessage(AgentMessageEvent { message })
if message.contains("second turn succeeded") =>
{
got_expected = true;

View File

@@ -76,24 +76,3 @@ pub fn load_sse_fixture_with_id(path: impl AsRef<std::path::Path>, id: &str) ->
})
.collect()
}
#[allow(dead_code)]
pub async fn wait_for_event<F>(
codex: &codex_core::Codex,
mut predicate: F,
) -> codex_core::protocol::EventMsg
where
F: FnMut(&codex_core::protocol::EventMsg) -> bool,
{
use tokio::time::Duration;
use tokio::time::timeout;
loop {
let ev = timeout(Duration::from_secs(1), codex.next_event())
.await
.expect("timeout waiting for event")
.expect("stream ended unexpectedly");
if predicate(&ev.msg) {
return ev.msg;
}
}
}

View File

@@ -174,7 +174,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
EventMsg::TokenCount(TokenUsage { total_tokens, .. }) => {
ts_println!(self, "tokens used: {total_tokens}");
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, item_id: _ }) => {
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if !self.answer_started {
ts_println!(self, "{}\n", "codex".style(self.italic).style(self.magenta));
self.answer_started = true;
@@ -183,7 +183,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, item_id: _ }) => {
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if !self.show_agent_reasoning {
return;
}
@@ -199,7 +199,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
#[allow(clippy::expect_used)]
std::io::stdout().flush().expect("could not flush stdout");
}
EventMsg::AgentMessage(AgentMessageEvent { id: _, message }) => {
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// if answer_started is false, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if !self.answer_started {

View File

@@ -110,7 +110,6 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
cwd: cwd.map(|p| p.canonicalize().unwrap_or(p)),
model_provider: None,
codex_linux_sandbox_exe,
base_instructions: None,
};
// Parse `-c` overrides.
let cli_kv_overrides = match config_overrides.parse_overrides() {

View File

@@ -17,9 +17,7 @@ workspace = true
[dependencies]
anyhow = "1"
clap = { version = "4", features = ["derive"] }
codex-common = { path = "../common", features = ["cli"] }
codex-core = { path = "../core" }
dotenvy = "0.15.7"
tokio = { version = "1", features = ["rt-multi-thread"] }
[dev-dependencies]

View File

@@ -43,10 +43,6 @@ where
crate::run_main();
}
// This modifies the environment, which is not thread-safe, so do this
// before creating any threads/the Tokio runtime.
load_dotenv();
// Regular invocation create a Tokio runtime and execute the provided
// async entry-point.
let runtime = tokio::runtime::Runtime::new()?;
@@ -65,11 +61,3 @@ where
pub fn run_main() -> ! {
panic!("codex-linux-sandbox is only supported on Linux");
}
/// Load env vars from ~/.codex/.env and `$(pwd)/.env`.
fn load_dotenv() {
if let Ok(codex_home) = codex_core::config::find_codex_home() {
dotenvy::from_path(codex_home.join(".env")).ok();
}
dotenvy::dotenv().ok();
}

View File

@@ -14,7 +14,7 @@ use std::path::PathBuf;
use crate::json_to_toml::json_to_toml;
/// Client-supplied configuration for a `codex` tool-call.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct CodexToolCallParam {
/// The *initial user prompt* to start the Codex conversation.
@@ -46,10 +46,6 @@ pub struct CodexToolCallParam {
/// CODEX_HOME/config.toml.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config: Option<HashMap<String, serde_json::Value>>,
/// The set of instructions to use instead of the default ones.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub base_instructions: Option<String>,
}
/// Custom enum mirroring [`AskForApproval`], but has an extra dependency on
@@ -139,7 +135,6 @@ impl CodexToolCallParam {
approval_policy,
sandbox,
config: cli_overrides,
base_instructions,
} = self;
// Build the `ConfigOverrides` recognised by codex-core.
@@ -151,7 +146,6 @@ impl CodexToolCallParam {
sandbox_mode: sandbox.map(Into::into),
model_provider: None,
codex_linux_sandbox_exe,
base_instructions,
};
let cli_overrides = cli_overrides
@@ -168,7 +162,7 @@ impl CodexToolCallParam {
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CodexToolCallReplyParam {
pub(crate) struct CodexToolCallReplyParam {
/// The *session id* for this conversation.
pub session_id: String,
@@ -274,10 +268,6 @@ mod tests {
"description": "The *initial user prompt* to start the Codex conversation.",
"type": "string"
},
"base-instructions": {
"description": "The set of instructions to use instead of the default ones.",
"type": "string"
},
},
"required": [
"prompt"

View File

@@ -20,7 +20,6 @@ use mcp_types::CallToolResult;
use mcp_types::ContentBlock;
use mcp_types::RequestId;
use mcp_types::TextContent;
use serde_json::json;
use tokio::sync::Mutex;
use uuid::Uuid;
@@ -40,7 +39,6 @@ pub async fn run_codex_tool_session(
config: CodexConfig,
outgoing: Arc<OutgoingMessageSender>,
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
) {
let (codex, first_event, _ctrl_c, session_id) = match init_codex(config).await {
Ok(res) => res,
@@ -75,10 +73,7 @@ pub async fn run_codex_tool_session(
RequestId::String(s) => s.clone(),
RequestId::Integer(n) => n.to_string(),
};
running_requests_id_to_codex_uuid
.lock()
.await
.insert(id.clone(), session_id);
let submission = Submission {
id: sub_id.clone(),
op: Op::UserInput {
@@ -90,12 +85,9 @@ pub async fn run_codex_tool_session(
if let Err(e) = codex.submit_with_id(submission).await {
tracing::error!("Failed to submit initial prompt: {e}");
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid.lock().await.remove(&id);
return;
}
run_codex_tool_session_inner(codex, outgoing, id, running_requests_id_to_codex_uuid).await;
run_codex_tool_session_inner(codex, outgoing, id).await;
}
pub async fn run_codex_tool_session_reply(
@@ -103,13 +95,7 @@ pub async fn run_codex_tool_session_reply(
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
prompt: String,
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
session_id: Uuid,
) {
running_requests_id_to_codex_uuid
.lock()
.await
.insert(request_id.clone(), session_id);
if let Err(e) = codex
.submit(Op::UserInput {
items: vec![InputItem::Text { text: prompt }],
@@ -117,28 +103,15 @@ pub async fn run_codex_tool_session_reply(
.await
{
tracing::error!("Failed to submit user input: {e}");
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid
.lock()
.await
.remove(&request_id);
return;
}
run_codex_tool_session_inner(
codex,
outgoing,
request_id,
running_requests_id_to_codex_uuid,
)
.await;
run_codex_tool_session_inner(codex, outgoing, request_id).await;
}
async fn run_codex_tool_session_inner(
codex: Arc<Codex>,
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
) {
let request_id_str = match &request_id {
RequestId::String(s) => s.clone(),
@@ -170,14 +143,6 @@ async fn run_codex_tool_session_inner(
.await;
continue;
}
EventMsg::Error(err_event) => {
// Return a response to conclude the tool call when the Codex session reports an error (e.g., interruption).
let result = json!({
"error": err_event.message,
});
outgoing.send_response(request_id.clone(), result).await;
break;
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
reason,
grant_root,
@@ -213,11 +178,6 @@ async fn run_codex_tool_session_inner(
outgoing
.send_response(request_id.clone(), result.into())
.await;
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid
.lock()
.await
.remove(&request_id);
break;
}
EventMsg::SessionConfigured(_) => {
@@ -232,7 +192,8 @@ async fn run_codex_tool_session_inner(
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::TaskStarted
EventMsg::Error(_)
| EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)

View File

@@ -27,7 +27,6 @@ use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
pub use crate::codex_tool_config::CodexToolCallParam;
pub use crate::codex_tool_config::CodexToolCallReplyParam;
pub use crate::exec_approval::ExecApprovalElicitRequestParams;
pub use crate::exec_approval::ExecApprovalResponse;
pub use crate::patch_approval::PatchApprovalElicitRequestParams;
@@ -82,7 +81,7 @@ pub async fn run_main(codex_linux_sandbox_exe: Option<PathBuf>) -> IoResult<()>
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n),
JSONRPCMessage::Error(e) => processor.process_error(e),
}
}

View File

@@ -10,7 +10,6 @@ use crate::outgoing_message::OutgoingMessageSender;
use codex_core::Codex;
use codex_core::config::Config as CodexConfig;
use codex_core::protocol::Submission;
use mcp_types::CallToolRequestParams;
use mcp_types::CallToolResult;
use mcp_types::ClientRequest;
@@ -36,7 +35,6 @@ pub(crate) struct MessageProcessor {
initialized: bool,
codex_linux_sandbox_exe: Option<PathBuf>,
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
}
impl MessageProcessor {
@@ -51,7 +49,6 @@ impl MessageProcessor {
initialized: false,
codex_linux_sandbox_exe,
session_map: Arc::new(Mutex::new(HashMap::new())),
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
}
}
@@ -119,7 +116,7 @@ impl MessageProcessor {
}
/// Handle a fire-and-forget JSON-RPC notification.
pub(crate) async fn process_notification(&mut self, notification: JSONRPCNotification) {
pub(crate) fn process_notification(&mut self, notification: JSONRPCNotification) {
let server_notification = match ServerNotification::try_from(notification) {
Ok(n) => n,
Err(e) => {
@@ -132,7 +129,7 @@ impl MessageProcessor {
// handler so additional logic can be implemented incrementally.
match server_notification {
ServerNotification::CancelledNotification(params) => {
self.handle_cancelled_notification(params).await;
self.handle_cancelled_notification(params);
}
ServerNotification::ProgressNotification(params) => {
self.handle_progress_notification(params);
@@ -382,7 +379,6 @@ impl MessageProcessor {
// Clone outgoing and session map to move into async task.
let outgoing = self.outgoing.clone();
let session_map = self.session_map.clone();
let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone();
// Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop.
@@ -394,7 +390,6 @@ impl MessageProcessor {
config,
outgoing,
session_map,
running_requests_id_to_codex_uuid,
)
.await;
});
@@ -469,12 +464,13 @@ impl MessageProcessor {
// Clone outgoing and session map to move into async task.
let outgoing = self.outgoing.clone();
let running_requests_id_to_codex_uuid = self.running_requests_id_to_codex_uuid.clone();
let codex = {
// Spawn an async task to handle the Codex session so that we do not
// block the synchronous message-processing loop.
task::spawn(async move {
let session_map = session_map_mutex.lock().await;
match session_map.get(&session_id).cloned() {
Some(c) => c,
let codex = match session_map.get(&session_id) {
Some(codex) => codex,
None => {
tracing::warn!("Session not found for session_id: {session_id}");
let result = CallToolResult {
@@ -486,32 +482,21 @@ impl MessageProcessor {
is_error: Some(true),
structured_content: None,
};
// unwrap_or_default is fine here because we know the result is valid JSON
outgoing
.send_response(request_id, serde_json::to_value(result).unwrap_or_default())
.await;
return;
}
}
};
};
// Spawn the long-running reply handler.
tokio::spawn({
let codex = codex.clone();
let outgoing = outgoing.clone();
let prompt = prompt.clone();
let running_requests_id_to_codex_uuid = running_requests_id_to_codex_uuid.clone();
async move {
crate::codex_tool_runner::run_codex_tool_session_reply(
codex,
outgoing,
request_id,
prompt,
running_requests_id_to_codex_uuid,
session_id,
)
.await;
}
crate::codex_tool_runner::run_codex_tool_session_reply(
codex.clone(),
outgoing,
request_id,
prompt.clone(),
)
.await;
});
}
@@ -533,58 +518,11 @@ impl MessageProcessor {
// Notification handlers
// ---------------------------------------------------------------------
async fn handle_cancelled_notification(
fn handle_cancelled_notification(
&self,
params: <mcp_types::CancelledNotification as mcp_types::ModelContextProtocolNotification>::Params,
) {
let request_id = params.request_id;
// Create a stable string form early for logging and submission id.
let request_id_string = match &request_id {
RequestId::String(s) => s.clone(),
RequestId::Integer(i) => i.to_string(),
};
// Obtain the session_id while holding the first lock, then release.
let session_id = {
let map_guard = self.running_requests_id_to_codex_uuid.lock().await;
match map_guard.get(&request_id) {
Some(id) => *id, // Uuid is Copy
None => {
tracing::warn!("Session not found for request_id: {}", request_id_string);
return;
}
}
};
tracing::info!("session_id: {session_id}");
// Obtain the Codex Arc while holding the session_map lock, then release.
let codex_arc = {
let sessions_guard = self.session_map.lock().await;
match sessions_guard.get(&session_id) {
Some(codex) => Arc::clone(codex),
None => {
tracing::warn!("Session not found for session_id: {session_id}");
return;
}
}
};
// Submit interrupt to Codex.
let err = codex_arc
.submit_with_id(Submission {
id: request_id_string,
op: codex_core::protocol::Op::Interrupt,
})
.await;
if let Err(e) = err {
tracing::error!("Failed to submit interrupt to Codex: {e}");
return;
}
// unregister the id so we don't keep it in the map
self.running_requests_id_to_codex_uuid
.lock()
.await
.remove(&request_id);
tracing::info!("notifications/cancelled -> params: {:?}", params);
}
fn handle_progress_notification(

View File

@@ -16,7 +16,6 @@ use serde::Serialize;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::trace;
use tracing::warn;
pub(crate) struct OutgoingMessageSender {
@@ -80,7 +79,6 @@ impl OutgoingMessageSender {
}
pub(crate) async fn send_event_as_notification(&self, event: &Event) {
trace!(?event, "sending event as notification");
#[expect(clippy::expect_used)]
let params = Some(serde_json::to_value(event).expect("Event must serialize"));
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {

View File

@@ -12,7 +12,6 @@ use tokio::process::ChildStdout;
use anyhow::Context;
use assert_cmd::prelude::*;
use codex_mcp_server::CodexToolCallParam;
use codex_mcp_server::CodexToolCallReplyParam;
use mcp_types::CallToolRequestParams;
use mcp_types::ClientCapabilities;
use mcp_types::Implementation;
@@ -142,29 +141,19 @@ impl McpProcess {
/// correlating notifications.
pub async fn send_codex_tool_call(
&mut self,
params: CodexToolCallParam,
) -> anyhow::Result<i64> {
let codex_tool_call_params = CallToolRequestParams {
name: "codex".to_string(),
arguments: Some(serde_json::to_value(params)?),
};
self.send_request(
mcp_types::CallToolRequest::METHOD,
Some(serde_json::to_value(codex_tool_call_params)?),
)
.await
}
pub async fn send_codex_reply_tool_call(
&mut self,
session_id: &str,
cwd: Option<String>,
prompt: &str,
) -> anyhow::Result<i64> {
let codex_tool_call_params = CallToolRequestParams {
name: "codex-reply".to_string(),
arguments: Some(serde_json::to_value(CodexToolCallReplyParam {
name: "codex".to_string(),
arguments: Some(serde_json::to_value(CodexToolCallParam {
cwd,
prompt: prompt.to_string(),
session_id: session_id.to_string(),
model: None,
profile: None,
approval_policy: None,
sandbox: None,
config: None,
})?),
};
self.send_request(
@@ -191,8 +180,6 @@ impl McpProcess {
Ok(request_id)
}
// allow dead code
#[allow(dead_code)]
pub async fn send_response(
&mut self,
id: RequestId,
@@ -220,8 +207,7 @@ impl McpProcess {
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
Ok(message)
}
// allow dead code
#[allow(dead_code)]
pub async fn read_stream_until_request_message(&mut self) -> anyhow::Result<JSONRPCRequest> {
loop {
let message = self.read_jsonrpc_message().await?;
@@ -244,8 +230,6 @@ impl McpProcess {
}
}
// allow dead code
#[allow(dead_code)]
pub async fn read_stream_until_response_message(
&mut self,
request_id: RequestId,
@@ -272,58 +256,4 @@ impl McpProcess {
}
}
}
pub async fn read_stream_until_configured_response_message(
&mut self,
) -> anyhow::Result<String> {
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(notification) => {
if notification.method == "codex/event" {
if let Some(params) = notification.params {
if let Some(msg) = params.get("msg") {
if let Some(msg_type) = msg.get("type") {
if msg_type == "session_configured" {
if let Some(session_id) = msg.get("session_id") {
return Ok(session_id
.to_string()
.trim_matches('"')
.to_string());
}
}
}
}
}
}
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
}
JSONRPCMessage::Error(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
}
JSONRPCMessage::Response(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Response: {message:?}");
}
}
}
}
// allow dead code
#[allow(dead_code)]
pub async fn send_notification(
&mut self,
method: &str,
params: Option<serde_json::Value>,
) -> anyhow::Result<()> {
self.send_jsonrpc_message(JSONRPCMessage::Notification(JSONRPCNotification {
jsonrpc: JSONRPC_VERSION.into(),
method: method.to_string(),
params,
}))
.await
}
}

View File

@@ -4,8 +4,6 @@ mod responses;
pub use mcp_process::McpProcess;
pub use mock_model_server::create_mock_chat_completions_server;
#[allow(unused_imports)]
pub use responses::create_apply_patch_sse_response;
#[allow(unused_imports)]
pub use responses::create_final_assistant_message_sse_response;
pub use responses::create_shell_sse_response;

View File

@@ -39,8 +39,6 @@ pub fn create_shell_sse_response(
Ok(sse)
}
// allow dead code
#[allow(dead_code)]
pub fn create_final_assistant_message_sse_response(message: &str) -> anyhow::Result<String> {
let assistant_message = json!({
"choices": [
@@ -60,8 +58,6 @@ pub fn create_final_assistant_message_sse_response(message: &str) -> anyhow::Res
Ok(sse)
}
// allow dead code
#[allow(dead_code)]
pub fn create_apply_patch_sse_response(
patch_content: &str,
call_id: &str,

View File

@@ -8,7 +8,6 @@ use std::path::PathBuf;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_core::protocol::FileChange;
use codex_core::protocol::ReviewDecision;
use codex_mcp_server::CodexToolCallParam;
use codex_mcp_server::ExecApprovalElicitRequestParams;
use codex_mcp_server::ExecApprovalResponse;
use codex_mcp_server::PatchApprovalElicitRequestParams;
@@ -77,10 +76,7 @@ async fn shell_command_approval_triggers_elicitation() -> anyhow::Result<()> {
// In turn, it should reply with a tool call, which the MCP should forward
// as an elicitation.
let codex_request_id = mcp_process
.send_codex_tool_call(CodexToolCallParam {
prompt: "run `git init`".to_string(),
..Default::default()
})
.send_codex_tool_call(None, "run `git init`")
.await?;
let elicitation_request = timeout(
DEFAULT_READ_TIMEOUT,
@@ -213,11 +209,10 @@ async fn patch_approval_triggers_elicitation() -> anyhow::Result<()> {
// Send a "codex" tool request that will trigger the apply_patch command
let codex_request_id = mcp_process
.send_codex_tool_call(CodexToolCallParam {
cwd: Some(cwd.path().to_string_lossy().to_string()),
prompt: "please modify the test file".to_string(),
..Default::default()
})
.send_codex_tool_call(
Some(cwd.path().to_string_lossy().to_string()),
"please modify the test file",
)
.await?;
let elicitation_request = timeout(
DEFAULT_READ_TIMEOUT,
@@ -284,75 +279,6 @@ async fn patch_approval_triggers_elicitation() -> anyhow::Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_codex_tool_passes_base_instructions() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
// Apparently `#[tokio::test]` must return `()`, so we create a helper
// function that returns `Result` so we can use `?` in favor of `unwrap`.
if let Err(err) = codex_tool_passes_base_instructions().await {
panic!("failure: {err}");
}
}
async fn codex_tool_passes_base_instructions() -> anyhow::Result<()> {
#![allow(clippy::unwrap_used)]
let server =
create_mock_chat_completions_server(vec![create_final_assistant_message_sse_response(
"Enjoy!",
)?])
.await;
// Run `codex mcp` with a specific config.toml.
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp_process = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp_process.initialize()).await??;
// Send a "codex" tool request, which should hit the completions endpoint.
let codex_request_id = mcp_process
.send_codex_tool_call(CodexToolCallParam {
prompt: "How are you?".to_string(),
base_instructions: Some("You are a helpful assistant.".to_string()),
..Default::default()
})
.await?;
let codex_response = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)),
)
.await??;
assert_eq!(
JSONRPCResponse {
jsonrpc: JSONRPC_VERSION.into(),
id: RequestId::Integer(codex_request_id),
result: json!({
"content": [
{
"text": "Enjoy!",
"type": "text"
}
]
}),
},
codex_response
);
let requests = server.received_requests().await.unwrap();
let request = requests[0].body_json::<serde_json::Value>().unwrap();
let instructions = request["messages"][0]["content"].as_str().unwrap();
assert!(instructions.starts_with("You are a helpful assistant."));
Ok(())
}
fn create_expected_patch_approval_elicitation_request(
elicitation_request_id: RequestId,
changes: HashMap<PathBuf, FileChange>,

View File

@@ -1,176 +0,0 @@
#![cfg(unix)]
mod common;
use std::path::Path;
use codex_core::exec::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_mcp_server::CodexToolCallParam;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
use crate::common::McpProcess;
use crate::common::create_mock_chat_completions_server;
use crate::common::create_shell_sse_response;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_shell_command_interruption() {
if std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!(
"Skipping test because it cannot execute when network is disabled in a Codex sandbox."
);
return;
}
if let Err(err) = shell_command_interruption().await {
panic!("failure: {err}");
}
}
async fn shell_command_interruption() -> anyhow::Result<()> {
// Use a cross-platform blocking command. On Windows plain `sleep` is not guaranteed to exist
// (MSYS/GNU coreutils may be absent) and the failure causes the tool call to finish immediately,
// which triggers a second model request before the test sends the explicit follow-up. That
// prematurely consumes the second mocked SSE response and leads to a third POST (panic: no response for 2).
// Powershell Start-Sleep is always available on Windows runners. On Unix we keep using `sleep`.
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 60".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "60".to_string()];
let workdir_for_shell_function_call = TempDir::new()?;
// Create mock server with a single SSE response: the long sleep command
let server = create_mock_chat_completions_server(vec![
create_shell_sse_response(
shell_command.clone(),
Some(workdir_for_shell_function_call.path()),
Some(60_000), // 60 seconds timeout in ms
"call_sleep",
)?,
create_shell_sse_response(
shell_command.clone(),
Some(workdir_for_shell_function_call.path()),
Some(60_000), // 60 seconds timeout in ms
"call_sleep",
)?,
])
.await;
// Create Codex configuration
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), server.uri())?;
let mut mcp_process = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp_process.initialize()).await??;
// Send codex tool call that triggers "sleep 60"
let codex_request_id = mcp_process
.send_codex_tool_call(CodexToolCallParam {
cwd: None,
prompt: "First Run: run `sleep 60`".to_string(),
model: None,
profile: None,
approval_policy: None,
sandbox: None,
config: None,
base_instructions: None,
})
.await?;
let session_id = mcp_process
.read_stream_until_configured_response_message()
.await?;
// Give the command a moment to start
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Send interrupt notification
mcp_process
.send_notification(
"notifications/cancelled",
Some(json!({ "requestId": codex_request_id })),
)
.await?;
// Expect Codex to return an error or interruption response
let codex_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)),
)
.await??;
assert!(
codex_response
.result
.as_object()
.map(|o| o.contains_key("error"))
.unwrap_or(false),
"Expected an interruption or error result, got: {codex_response:?}"
);
let codex_reply_request_id = mcp_process
.send_codex_reply_tool_call(&session_id, "Second Run: run `sleep 60`")
.await?;
// Give the command a moment to start
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Send interrupt notification
mcp_process
.send_notification(
"notifications/cancelled",
Some(json!({ "requestId": codex_reply_request_id })),
)
.await?;
// Expect Codex to return an error or interruption response
let codex_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_reply_request_id)),
)
.await??;
assert!(
codex_response
.result
.as_object()
.map(|o| o.contains_key("error"))
.unwrap_or(false),
"Expected an interruption or error result, got: {codex_response:?}"
);
Ok(())
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn create_config_toml(codex_home: &Path, server_uri: String) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -246,7 +246,7 @@ impl ChatWidget<'_> {
self.request_redraw();
}
EventMsg::AgentMessage(AgentMessageEvent { id: _, message }) => {
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
// if the answer buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new answer.
if self.answer_buffer.is_empty() {
@@ -259,7 +259,7 @@ impl ChatWidget<'_> {
self.answer_buffer.clear();
self.request_redraw();
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta, .. }) => {
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
if self.answer_buffer.is_empty() {
self.conversation_history
.add_agent_message(&self.config, "".to_string());
@@ -269,7 +269,7 @@ impl ChatWidget<'_> {
.replace_prev_agent_message(&self.config, self.answer_buffer.clone());
self.request_redraw();
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta, .. }) => {
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
if self.reasoning_buffer.is_empty() {
self.conversation_history
.add_agent_reasoning(&self.config, "".to_string());
@@ -279,7 +279,7 @@ impl ChatWidget<'_> {
.replace_prev_agent_reasoning(&self.config, self.reasoning_buffer.clone());
self.request_redraw();
}
EventMsg::AgentReasoning(AgentReasoningEvent { id: _, text }) => {
EventMsg::AgentReasoning(AgentReasoningEvent { text }) => {
// if the reasoning buffer is empty, this means we haven't received any
// delta. Thus, we need to print the message as a new reasoning.
if self.reasoning_buffer.is_empty() {

View File

@@ -75,7 +75,6 @@ pub fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> std::io::
model_provider: None,
config_profile: cli.config_profile.clone(),
codex_linux_sandbox_exe,
base_instructions: None,
};
// Parse `-c` overrides from the CLI.
let cli_kv_overrides = match cli.config_overrides.parse_overrides() {