Implemented thread-level atomic elicitation counter for stopwatch pausing (#12296)

### Purpose
While trying to build out CLI-Tools for the agent to use under skills we
have found that those tools sometimes need to invoke a user elicitation.
These elicitations are handled out of band of the codex app-server but
need to indicate to the exec manager that the command running is not
going to progress on the usual timeout horizon.

### Example
Model calls universal exec:
`$ download-credit-card-history --start-date 2026-01-19 --end-date
2026-02-19 > credit_history.jsonl`

download-cred-card-history might hit a hosted/preauthenticated service
to fetch data. That service might decide that the request requires an
end user approval the access to the personal data. It should be able to
signal to the running thread that the command in question is blocked on
user elicitation. In that case we want the exec to continue, but the
timeout to not expire on the tool call, essentially freezing time until
the user approves or rejects the command at which point the tool would
signal the app-server to decrement the outstanding elicitation count.
Now timeouts would proceed as normal.

### What's Added

- New v2 RPC methods:
    - thread/increment_elicitation
    - thread/decrement_elicitation
- Protocol updates in:
    - codex-rs/app-server-protocol/src/protocol/common.rs
    - codex-rs/app-server-protocol/src/protocol/v2.rs
- App-server handlers wired in:
    - codex-rs/app-server/src/codex_message_processor.rs

### Behavior

- Counter starts at 0 per thread.
- increment atomically increases the counter.
- decrement atomically decreases the counter; decrement at 0 returns
invalid request.
- Transition rules:
- 0 -> 1: broadcast pause state, pausing all active stopwatches
immediately.
    - \>0 -> >0: remain paused.
    - 1 -> 0: broadcast unpause state, resuming stopwatches.
- Core thread/session logic:
    - codex-rs/core/src/codex_thread.rs
    - codex-rs/core/src/codex.rs
    - codex-rs/core/src/mcp_connection_manager.rs

### Exec-server stopwatch integration

- Added centralized stopwatch tracking/controller:
    - codex-rs/exec-server/src/posix/stopwatch_controller.rs
- Hooked pause/unpause broadcast handling + stopwatch registration:
    - codex-rs/exec-server/src/posix/mcp.rs
    - codex-rs/exec-server/src/posix/stopwatch.rs
    - codex-rs/exec-server/src/posix.rs
This commit is contained in:
Channing Conger
2026-03-09 22:29:26 -07:00
committed by GitHub
parent 79307b7933
commit c6343e0649
12 changed files with 773 additions and 15 deletions

View File

@@ -5,6 +5,7 @@ use std::fs::OpenOptions;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Write;
use std::net::TcpListener;
use std::net::TcpStream;
use std::path::Path;
use std::path::PathBuf;
@@ -15,6 +16,7 @@ use std::process::Command;
use std::process::Stdio;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use anyhow::Context;
@@ -51,6 +53,10 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadDecrementElicitationParams;
use codex_app_server_protocol::ThreadDecrementElicitationResponse;
use codex_app_server_protocol::ThreadIncrementElicitationParams;
use codex_app_server_protocol::ThreadIncrementElicitationResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -65,6 +71,7 @@ use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::config::Config;
use codex_otel::current_span_w3c_trace_context;
use codex_otel::otel_provider::OtelProvider;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::W3cTraceContext;
use codex_utils_cli::CliConfigOverrides;
use serde::Serialize;
@@ -99,7 +106,6 @@ const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[
"command/exec/outputDelta",
"item/agentMessage/delta",
"item/plan/delta",
"item/commandExecution/outputDelta",
"item/fileChange/outputDelta",
"item/reasoning/summaryTextDelta",
"item/reasoning/textDelta",
@@ -246,6 +252,36 @@ enum CliCommand {
#[arg(long, default_value_t = 20)]
limit: u32,
},
/// Increment the out-of-band elicitation pause counter for a thread.
#[command(name = "thread-increment-elicitation")]
ThreadIncrementElicitation {
/// Existing thread id to update.
thread_id: String,
},
/// Decrement the out-of-band elicitation pause counter for a thread.
#[command(name = "thread-decrement-elicitation")]
ThreadDecrementElicitation {
/// Existing thread id to update.
thread_id: String,
},
/// Run the live websocket harness that proves elicitation pause prevents a
/// 10s unified exec timeout from killing a 15s helper script.
#[command(name = "live-elicitation-timeout-pause")]
LiveElicitationTimeoutPause {
/// Model passed to `thread/start`.
#[arg(long, env = "CODEX_E2E_MODEL", default_value = "gpt-5")]
model: String,
/// Existing workspace path used as the turn cwd.
#[arg(long, value_name = "path", default_value = ".")]
workspace: PathBuf,
/// Helper script to run from the model; defaults to the repo-local
/// live elicitation hold script.
#[arg(long, value_name = "path")]
script: Option<PathBuf>,
/// Seconds the helper script should sleep while the timeout is paused.
#[arg(long, default_value_t = 15)]
hold_seconds: u64,
},
}
pub async fn run() -> Result<()> {
@@ -370,6 +406,33 @@ pub async fn run() -> Result<()> {
let endpoint = resolve_endpoint(codex_bin, url)?;
thread_list(&endpoint, &config_overrides, limit).await
}
CliCommand::ThreadIncrementElicitation { thread_id } => {
ensure_dynamic_tools_unused(&dynamic_tools, "thread-increment-elicitation")?;
let url = resolve_shared_websocket_url(codex_bin, url, "thread-increment-elicitation")?;
thread_increment_elicitation(&url, thread_id)
}
CliCommand::ThreadDecrementElicitation { thread_id } => {
ensure_dynamic_tools_unused(&dynamic_tools, "thread-decrement-elicitation")?;
let url = resolve_shared_websocket_url(codex_bin, url, "thread-decrement-elicitation")?;
thread_decrement_elicitation(&url, thread_id)
}
CliCommand::LiveElicitationTimeoutPause {
model,
workspace,
script,
hold_seconds,
} => {
ensure_dynamic_tools_unused(&dynamic_tools, "live-elicitation-timeout-pause")?;
live_elicitation_timeout_pause(
codex_bin,
url,
&config_overrides,
model,
workspace,
script,
hold_seconds,
)
}
}
}
@@ -378,6 +441,11 @@ enum Endpoint {
ConnectWs(String),
}
struct BackgroundAppServer {
process: Child,
url: String,
}
fn resolve_endpoint(codex_bin: Option<PathBuf>, url: Option<String>) -> Result<Endpoint> {
if codex_bin.is_some() && url.is_some() {
bail!("--codex-bin and --url are mutually exclusive");
@@ -391,6 +459,66 @@ fn resolve_endpoint(codex_bin: Option<PathBuf>, url: Option<String>) -> Result<E
Ok(Endpoint::ConnectWs("ws://127.0.0.1:4222".to_string()))
}
fn resolve_shared_websocket_url(
codex_bin: Option<PathBuf>,
url: Option<String>,
command: &str,
) -> Result<String> {
if codex_bin.is_some() {
bail!(
"{command} requires --url or an already-running websocket app-server; --codex-bin would spawn a private stdio app-server instead"
);
}
Ok(url.unwrap_or_else(|| "ws://127.0.0.1:4222".to_string()))
}
impl BackgroundAppServer {
fn spawn(codex_bin: &Path, config_overrides: &[String]) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")
.context("failed to reserve a local port for websocket app-server")?;
let addr = listener.local_addr()?;
drop(listener);
let url = format!("ws://{addr}");
let mut cmd = Command::new(codex_bin);
if let Some(codex_bin_parent) = codex_bin.parent() {
let mut path = OsString::from(codex_bin_parent.as_os_str());
if let Some(existing_path) = std::env::var_os("PATH") {
path.push(":");
path.push(existing_path);
}
cmd.env("PATH", path);
}
for override_kv in config_overrides {
cmd.arg("--config").arg(override_kv);
}
let process = cmd
.arg("app-server")
.arg("--listen")
.arg(&url)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::inherit())
.spawn()
.with_context(|| format!("failed to start `{}` app-server", codex_bin.display()))?;
Ok(Self { process, url })
}
}
impl Drop for BackgroundAppServer {
fn drop(&mut self) {
if let Ok(Some(status)) = self.process.try_wait() {
println!("[background app-server exited: {status}]");
return;
}
let _ = self.process.kill();
let _ = self.process.wait();
}
}
fn serve(codex_bin: &Path, config_overrides: &[String], listen: &str, kill: bool) -> Result<()> {
let runtime_dir = PathBuf::from("/tmp/codex-app-server-test-client");
fs::create_dir_all(&runtime_dir)
@@ -1020,6 +1148,190 @@ async fn with_client<T>(
result
}
fn thread_increment_elicitation(url: &str, thread_id: String) -> Result<()> {
let endpoint = Endpoint::ConnectWs(url.to_string());
let mut client = CodexClient::connect(&endpoint, &[])?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response =
client.thread_increment_elicitation(ThreadIncrementElicitationParams { thread_id })?;
println!("< thread/increment_elicitation response: {response:?}");
Ok(())
}
fn thread_decrement_elicitation(url: &str, thread_id: String) -> Result<()> {
let endpoint = Endpoint::ConnectWs(url.to_string());
let mut client = CodexClient::connect(&endpoint, &[])?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let response =
client.thread_decrement_elicitation(ThreadDecrementElicitationParams { thread_id })?;
println!("< thread/decrement_elicitation response: {response:?}");
Ok(())
}
fn live_elicitation_timeout_pause(
codex_bin: Option<PathBuf>,
url: Option<String>,
config_overrides: &[String],
model: String,
workspace: PathBuf,
script: Option<PathBuf>,
hold_seconds: u64,
) -> Result<()> {
if cfg!(windows) {
bail!("live-elicitation-timeout-pause currently requires a POSIX shell");
}
if hold_seconds <= 10 {
bail!("--hold-seconds must be greater than 10 to exceed the unified exec timeout");
}
let mut _background_server = None;
let websocket_url = match (codex_bin, url) {
(Some(_), Some(_)) => bail!("--codex-bin and --url are mutually exclusive"),
(Some(codex_bin), None) => {
let server = BackgroundAppServer::spawn(&codex_bin, config_overrides)?;
let websocket_url = server.url.clone();
_background_server = Some(server);
websocket_url
}
(None, Some(url)) => url,
(None, None) => "ws://127.0.0.1:4222".to_string(),
};
let script_path = script.unwrap_or_else(|| {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("scripts")
.join("live_elicitation_hold.sh")
});
if !script_path.is_file() {
bail!("helper script not found: {}", script_path.display());
}
let workspace = workspace
.canonicalize()
.with_context(|| format!("failed to resolve workspace `{}`", workspace.display()))?;
let app_server_test_client_bin = std::env::current_exe()
.context("failed to resolve codex-app-server-test-client binary path")?;
let endpoint = Endpoint::ConnectWs(websocket_url.clone());
let mut client = CodexClient::connect(&endpoint, &[])?;
let initialize = client.initialize()?;
println!("< initialize response: {initialize:?}");
let thread_response = client.thread_start(ThreadStartParams {
model: Some(model),
..Default::default()
})?;
println!("< thread/start response: {thread_response:?}");
let thread_id = thread_response.thread.id;
let command = format!(
"APP_SERVER_URL={} APP_SERVER_TEST_CLIENT_BIN={} ELICITATION_HOLD_SECONDS={} sh {}",
shell_quote(&websocket_url),
shell_quote(&app_server_test_client_bin.display().to_string()),
hold_seconds,
shell_quote(&script_path.display().to_string()),
);
let prompt = format!(
"Use the `exec_command` tool exactly once. Set its `cmd` field to the exact shell command below. Do not rewrite it, do not split it, do not call any other tool, do not set `yield_time_ms`, and wait for the command to finish before replying.\n\n{command}\n\nAfter the command finishes, reply with exactly `DONE`."
);
let started_at = Instant::now();
let turn_response = client.turn_start(TurnStartParams {
thread_id: thread_id.clone(),
input: vec![V2UserInput::Text {
text: prompt,
text_elements: Vec::new(),
}],
approval_policy: Some(AskForApproval::Never),
sandbox_policy: Some(SandboxPolicy::DangerFullAccess),
effort: Some(ReasoningEffort::High),
cwd: Some(workspace),
..Default::default()
})?;
println!("< turn/start response: {turn_response:?}");
let stream_result = client.stream_turn(&thread_id, &turn_response.turn.id);
let elapsed = started_at.elapsed();
let validation_result = (|| -> Result<()> {
stream_result?;
let helper_output = client
.command_execution_outputs
.iter()
.find(|output| output.contains("[elicitation-hold]"))
.cloned()
.ok_or_else(|| anyhow::anyhow!("expected helper script markers in command output"))?;
let minimum_elapsed = Duration::from_secs(hold_seconds.saturating_sub(1));
if client.last_turn_status != Some(TurnStatus::Completed) {
bail!(
"expected completed turn, got {:?} (last error: {:?})",
client.last_turn_status,
client.last_turn_error_message
);
}
if !client
.command_execution_statuses
.contains(&CommandExecutionStatus::Completed)
{
bail!(
"expected a completed command execution, got {:?}",
client.command_execution_statuses
);
}
if !client.helper_done_seen || !helper_output.contains("[elicitation-hold] done") {
bail!(
"expected helper script completion marker in command output, got: {helper_output:?}"
);
}
if !client.unexpected_items_before_helper_done.is_empty() {
bail!(
"turn started new items before helper completion: {:?}",
client.unexpected_items_before_helper_done
);
}
if client.turn_completed_before_helper_done {
bail!("turn completed before helper script finished");
}
if elapsed < minimum_elapsed {
bail!(
"turn completed too quickly to prove timeout pause worked: elapsed={elapsed:?}, expected at least {minimum_elapsed:?}"
);
}
Ok(())
})();
match client.thread_decrement_elicitation(ThreadDecrementElicitationParams {
thread_id: thread_id.clone(),
}) {
Ok(response) => {
println!("[cleanup] thread/decrement_elicitation response after harness: {response:?}");
}
Err(err) => {
eprintln!("[cleanup] thread/decrement_elicitation ignored: {err:#}");
}
}
validation_result?;
println!(
"[live elicitation timeout pause summary] thread_id={thread_id}, turn_id={}, elapsed={elapsed:?}, command_statuses={:?}",
turn_response.turn.id, client.command_execution_statuses
);
Ok(())
}
fn ensure_dynamic_tools_unused(
dynamic_tools: &Option<Vec<DynamicToolSpec>>,
command: &str,
@@ -1073,7 +1385,14 @@ struct CodexClient {
command_approval_count: usize,
command_approval_item_ids: Vec<String>,
command_execution_statuses: Vec<CommandExecutionStatus>,
command_execution_outputs: Vec<String>,
command_output_stream: String,
command_item_started: bool,
helper_done_seen: bool,
turn_completed_before_helper_done: bool,
unexpected_items_before_helper_done: Vec<ThreadItem>,
last_turn_status: Option<TurnStatus>,
last_turn_error_message: Option<String>,
}
#[derive(Debug, Clone, Copy)]
@@ -1082,6 +1401,18 @@ enum CommandApprovalBehavior {
AbortOn(usize),
}
fn item_started_before_helper_done_is_unexpected(
item: &ThreadItem,
command_item_started: bool,
helper_done_seen: bool,
) -> bool {
if !command_item_started || helper_done_seen {
return false;
}
!matches!(item, ThreadItem::UserMessage { .. })
}
impl CodexClient {
fn connect(endpoint: &Endpoint, config_overrides: &[String]) -> Result<Self> {
match endpoint {
@@ -1132,17 +1463,35 @@ impl CodexClient {
command_approval_count: 0,
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
command_execution_outputs: Vec::new(),
command_output_stream: String::new(),
command_item_started: false,
helper_done_seen: false,
turn_completed_before_helper_done: false,
unexpected_items_before_helper_done: Vec::new(),
last_turn_status: None,
last_turn_error_message: None,
})
}
fn connect_websocket(url: &str) -> Result<Self> {
let parsed = Url::parse(url).with_context(|| format!("invalid websocket URL `{url}`"))?;
let (socket, _response) = connect(parsed.as_str()).with_context(|| {
format!(
"failed to connect to websocket app-server at `{url}`; if no server is running, start one with `codex-app-server-test-client serve --listen {url}`"
)
})?;
let deadline = Instant::now() + Duration::from_secs(10);
let (socket, _response) = loop {
match connect(parsed.as_str()) {
Ok(result) => break result,
Err(err) => {
if Instant::now() >= deadline {
return Err(err).with_context(|| {
format!(
"failed to connect to websocket app-server at `{url}`; if no server is running, start one with `codex-app-server-test-client serve --listen {url}`"
)
});
}
thread::sleep(Duration::from_millis(50));
}
}
};
Ok(Self {
transport: ClientTransport::WebSocket {
url: url.to_string(),
@@ -1153,10 +1502,27 @@ impl CodexClient {
command_approval_count: 0,
command_approval_item_ids: Vec::new(),
command_execution_statuses: Vec::new(),
command_execution_outputs: Vec::new(),
command_output_stream: String::new(),
command_item_started: false,
helper_done_seen: false,
turn_completed_before_helper_done: false,
unexpected_items_before_helper_done: Vec::new(),
last_turn_status: None,
last_turn_error_message: None,
})
}
fn note_helper_output(&mut self, output: &str) {
self.command_output_stream.push_str(output);
if self
.command_output_stream
.contains("[elicitation-hold] done")
{
self.helper_done_seen = true;
}
}
fn initialize(&mut self) -> Result<InitializeResponse> {
self.initialize_with_experimental_api(true)
}
@@ -1268,6 +1634,32 @@ impl CodexClient {
self.send_request(request, request_id, "thread/list")
}
fn thread_increment_elicitation(
&mut self,
params: ThreadIncrementElicitationParams,
) -> Result<ThreadIncrementElicitationResponse> {
let request_id = self.request_id();
let request = ClientRequest::ThreadIncrementElicitation {
request_id: request_id.clone(),
params,
};
self.send_request(request, request_id, "thread/increment_elicitation")
}
fn thread_decrement_elicitation(
&mut self,
params: ThreadDecrementElicitationParams,
) -> Result<ThreadDecrementElicitationResponse> {
let request_id = self.request_id();
let request = ClientRequest::ThreadDecrementElicitation {
request_id: request_id.clone(),
params,
};
self.send_request(request, request_id, "thread/decrement_elicitation")
}
fn wait_for_account_login_completion(
&mut self,
expected_login_id: &str,
@@ -1320,6 +1712,7 @@ impl CodexClient {
std::io::stdout().flush().ok();
}
ServerNotification::CommandExecutionOutputDelta(delta) => {
self.note_helper_output(&delta.delta);
print!("{}", delta.delta);
std::io::stdout().flush().ok();
}
@@ -1328,17 +1721,48 @@ impl CodexClient {
std::io::stdout().flush().ok();
}
ServerNotification::ItemStarted(payload) => {
if matches!(payload.item, ThreadItem::CommandExecution { .. }) {
if self.command_item_started && !self.helper_done_seen {
self.unexpected_items_before_helper_done
.push(payload.item.clone());
}
self.command_item_started = true;
} else if item_started_before_helper_done_is_unexpected(
&payload.item,
self.command_item_started,
self.helper_done_seen,
) {
self.unexpected_items_before_helper_done
.push(payload.item.clone());
}
println!("\n< item started: {:?}", payload.item);
}
ServerNotification::ItemCompleted(payload) => {
if let ThreadItem::CommandExecution { status, .. } = payload.item.clone() {
if let ThreadItem::CommandExecution {
status,
aggregated_output,
..
} = payload.item.clone()
{
self.command_execution_statuses.push(status);
if let Some(aggregated_output) = aggregated_output {
self.note_helper_output(&aggregated_output);
self.command_execution_outputs.push(aggregated_output);
}
}
println!("< item completed: {:?}", payload.item);
}
ServerNotification::TurnCompleted(payload) => {
if payload.turn.id == turn_id {
self.last_turn_status = Some(payload.turn.status.clone());
if self.command_item_started && !self.helper_done_seen {
self.turn_completed_before_helper_done = true;
}
self.last_turn_error_message = payload
.turn
.error
.as_ref()
.map(|error| error.message.clone());
println!("\n< turn/completed notification: {:?}", payload.turn.status);
if payload.turn.status == TurnStatus::Failed
&& let Some(error) = payload.turn.error