use std::collections::VecDeque; use std::io::BufRead; use std::io::BufReader; use std::io::Write; use std::process::Child; use std::process::ChildStdin; use std::process::ChildStdout; use std::process::Command; use std::process::Stdio; use std::thread; use std::time::Duration; use anyhow::Context; use anyhow::Result; use anyhow::bail; use clap::Parser; use clap::Subcommand; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; use codex_app_server_protocol::ApprovalDecision; use codex_app_server_protocol::AskForApproval; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::CommandExecutionRequestAcceptSettings; use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; use codex_app_server_protocol::FileChangeRequestApprovalParams; use codex_app_server_protocol::FileChangeRequestApprovalResponse; use codex_app_server_protocol::GetAccountRateLimitsResponse; use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::InitializeResponse; use codex_app_server_protocol::InputItem; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::LoginChatGptCompleteNotification; use codex_app_server_protocol::LoginChatGptResponse; use codex_app_server_protocol::NewConversationParams; use codex_app_server_protocol::NewConversationResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SandboxPolicy; use codex_app_server_protocol::SendUserMessageParams; use codex_app_server_protocol::SendUserMessageResponse; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput as V2UserInput; use codex_protocol::ConversationId; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; use uuid::Uuid; /// Minimal launcher that initializes the Codex app-server and logs the handshake. #[derive(Parser)] #[command(author = "Codex", version, about = "Bootstrap Codex app-server", long_about = None)] struct Cli { /// Path to the `codex` CLI binary. #[arg(long, env = "CODEX_BIN", default_value = "codex")] codex_bin: String, #[command(subcommand)] command: CliCommand, } #[derive(Subcommand)] enum CliCommand { /// Send a user message through the Codex app-server. SendMessage { /// User message to send to Codex. #[arg()] user_message: String, }, /// Send a user message through the app-server V2 thread/turn APIs. SendMessageV2 { /// User message to send to Codex. #[arg()] user_message: String, }, /// Start a V2 turn that elicits an ExecCommand approval. #[command(name = "trigger-cmd-approval")] TriggerCmdApproval { /// Optional prompt; defaults to a simple python command. #[arg()] user_message: Option, }, /// Start a V2 turn that elicits an ApplyPatch approval. #[command(name = "trigger-patch-approval")] TriggerPatchApproval { /// Optional prompt; defaults to creating a file via apply_patch. #[arg()] user_message: Option, }, /// Start a V2 turn that should not elicit an ExecCommand approval. #[command(name = "no-trigger-cmd-approval")] NoTriggerCmdApproval, /// Trigger the ChatGPT login flow and wait for completion. TestLogin, /// Fetch the current account rate limits from the Codex app-server. GetAccountRateLimits, } fn main() -> Result<()> { let Cli { codex_bin, command } = Cli::parse(); match command { CliCommand::SendMessage { user_message } => send_message(codex_bin, user_message), CliCommand::SendMessageV2 { user_message } => send_message_v2(codex_bin, user_message), CliCommand::TriggerCmdApproval { user_message } => { trigger_cmd_approval(codex_bin, user_message) } CliCommand::TriggerPatchApproval { user_message } => { trigger_patch_approval(codex_bin, user_message) } CliCommand::NoTriggerCmdApproval => no_trigger_cmd_approval(codex_bin), CliCommand::TestLogin => test_login(codex_bin), CliCommand::GetAccountRateLimits => get_account_rate_limits(codex_bin), } } fn send_message(codex_bin: String, user_message: String) -> Result<()> { let mut client = CodexClient::spawn(codex_bin)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); let conversation = client.new_conversation()?; println!("< newConversation response: {conversation:?}"); let subscription = client.add_conversation_listener(&conversation.conversation_id)?; println!("< addConversationListener response: {subscription:?}"); let send_response = client.send_user_message(&conversation.conversation_id, &user_message)?; println!("< sendUserMessage response: {send_response:?}"); client.stream_conversation(&conversation.conversation_id)?; client.remove_conversation_listener(subscription.subscription_id)?; Ok(()) } fn send_message_v2(codex_bin: String, user_message: String) -> Result<()> { send_message_v2_with_policies(codex_bin, user_message, None, None) } fn trigger_cmd_approval(codex_bin: String, user_message: Option) -> Result<()> { let default_prompt = "Run `touch /tmp/should-trigger-approval` so I can confirm the file exists."; let message = user_message.unwrap_or_else(|| default_prompt.to_string()); send_message_v2_with_policies( codex_bin, message, Some(AskForApproval::OnRequest), Some(SandboxPolicy::ReadOnly), ) } fn trigger_patch_approval(codex_bin: String, user_message: Option) -> Result<()> { let default_prompt = "Create a file named APPROVAL_DEMO.txt containing a short hello message using apply_patch."; let message = user_message.unwrap_or_else(|| default_prompt.to_string()); send_message_v2_with_policies( codex_bin, message, Some(AskForApproval::OnRequest), Some(SandboxPolicy::ReadOnly), ) } fn no_trigger_cmd_approval(codex_bin: String) -> Result<()> { let prompt = "Run `touch should_not_trigger_approval.txt`"; send_message_v2_with_policies(codex_bin, prompt.to_string(), None, None) } fn send_message_v2_with_policies( codex_bin: String, user_message: String, approval_policy: Option, sandbox_policy: Option, ) -> Result<()> { let mut client = CodexClient::spawn(codex_bin)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); let thread_response = client.thread_start(ThreadStartParams::default())?; println!("< thread/start response: {thread_response:?}"); let mut turn_params = TurnStartParams { thread_id: thread_response.thread.id.clone(), input: vec![V2UserInput::Text { text: user_message }], ..Default::default() }; turn_params.approval_policy = approval_policy; turn_params.sandbox_policy = sandbox_policy; let turn_response = client.turn_start(turn_params)?; println!("< turn/start response: {turn_response:?}"); client.stream_turn(&thread_response.thread.id, &turn_response.turn.id)?; Ok(()) } fn test_login(codex_bin: String) -> Result<()> { let mut client = CodexClient::spawn(codex_bin)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); let login_response = client.login_chat_gpt()?; println!("< loginChatGpt response: {login_response:?}"); println!( "Open the following URL in your browser to continue:\n{}", login_response.auth_url ); let completion = client.wait_for_login_completion(&login_response.login_id)?; println!("< loginChatGptComplete notification: {completion:?}"); if completion.success { println!("Login succeeded."); Ok(()) } else { bail!( "login failed: {}", completion .error .as_deref() .unwrap_or("unknown error from loginChatGptComplete") ); } } fn get_account_rate_limits(codex_bin: String) -> Result<()> { let mut client = CodexClient::spawn(codex_bin)?; let initialize = client.initialize()?; println!("< initialize response: {initialize:?}"); let response = client.get_account_rate_limits()?; println!("< account/rateLimits/read response: {response:?}"); Ok(()) } struct CodexClient { child: Child, stdin: Option, stdout: BufReader, pending_notifications: VecDeque, } impl CodexClient { fn spawn(codex_bin: String) -> Result { let mut codex_app_server = Command::new(&codex_bin) .arg("app-server") .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()) .spawn() .with_context(|| format!("failed to start `{codex_bin}` app-server"))?; let stdin = codex_app_server .stdin .take() .context("codex app-server stdin unavailable")?; let stdout = codex_app_server .stdout .take() .context("codex app-server stdout unavailable")?; Ok(Self { child: codex_app_server, stdin: Some(stdin), stdout: BufReader::new(stdout), pending_notifications: VecDeque::new(), }) } fn initialize(&mut self) -> Result { let request_id = self.request_id(); let request = ClientRequest::Initialize { request_id: request_id.clone(), params: InitializeParams { client_info: ClientInfo { name: "codex-toy-app-server".to_string(), title: Some("Codex Toy App Server".to_string()), version: env!("CARGO_PKG_VERSION").to_string(), }, }, }; self.send_request(request, request_id, "initialize") } fn new_conversation(&mut self) -> Result { let request_id = self.request_id(); let request = ClientRequest::NewConversation { request_id: request_id.clone(), params: NewConversationParams::default(), }; self.send_request(request, request_id, "newConversation") } fn add_conversation_listener( &mut self, conversation_id: &ConversationId, ) -> Result { let request_id = self.request_id(); let request = ClientRequest::AddConversationListener { request_id: request_id.clone(), params: AddConversationListenerParams { conversation_id: *conversation_id, experimental_raw_events: false, }, }; self.send_request(request, request_id, "addConversationListener") } fn remove_conversation_listener(&mut self, subscription_id: Uuid) -> Result<()> { let request_id = self.request_id(); let request = ClientRequest::RemoveConversationListener { request_id: request_id.clone(), params: codex_app_server_protocol::RemoveConversationListenerParams { subscription_id }, }; self.send_request::( request, request_id, "removeConversationListener", )?; Ok(()) } fn send_user_message( &mut self, conversation_id: &ConversationId, message: &str, ) -> Result { let request_id = self.request_id(); let request = ClientRequest::SendUserMessage { request_id: request_id.clone(), params: SendUserMessageParams { conversation_id: *conversation_id, items: vec![InputItem::Text { text: message.to_string(), }], }, }; self.send_request(request, request_id, "sendUserMessage") } fn thread_start(&mut self, params: ThreadStartParams) -> Result { let request_id = self.request_id(); let request = ClientRequest::ThreadStart { request_id: request_id.clone(), params, }; self.send_request(request, request_id, "thread/start") } fn turn_start(&mut self, params: TurnStartParams) -> Result { let request_id = self.request_id(); let request = ClientRequest::TurnStart { request_id: request_id.clone(), params, }; self.send_request(request, request_id, "turn/start") } fn login_chat_gpt(&mut self) -> Result { let request_id = self.request_id(); let request = ClientRequest::LoginChatGpt { request_id: request_id.clone(), params: None, }; self.send_request(request, request_id, "loginChatGpt") } fn get_account_rate_limits(&mut self) -> Result { let request_id = self.request_id(); let request = ClientRequest::GetAccountRateLimits { request_id: request_id.clone(), params: None, }; self.send_request(request, request_id, "account/rateLimits/read") } fn stream_conversation(&mut self, conversation_id: &ConversationId) -> Result<()> { loop { let notification = self.next_notification()?; if !notification.method.starts_with("codex/event/") { continue; } if let Some(event) = self.extract_event(notification, conversation_id)? { match &event.msg { EventMsg::AgentMessage(event) => { println!("{}", event.message); } EventMsg::AgentMessageDelta(event) => { print!("{}", event.delta); std::io::stdout().flush().ok(); } EventMsg::TaskComplete(event) => { println!("\n[task complete: {event:?}]"); break; } EventMsg::TurnAborted(event) => { println!("\n[turn aborted: {:?}]", event.reason); break; } EventMsg::Error(event) => { println!("[error] {event:?}"); } _ => { println!("[UNKNOWN EVENT] {:?}", event.msg); } } } } Ok(()) } fn wait_for_login_completion( &mut self, expected_login_id: &Uuid, ) -> Result { loop { let notification = self.next_notification()?; if let Ok(server_notification) = ServerNotification::try_from(notification) { match server_notification { ServerNotification::LoginChatGptComplete(completion) => { if &completion.login_id == expected_login_id { return Ok(completion); } println!( "[ignoring loginChatGptComplete for unexpected login_id: {}]", completion.login_id ); } ServerNotification::AuthStatusChange(status) => { println!("< authStatusChange notification: {status:?}"); } ServerNotification::AccountRateLimitsUpdated(snapshot) => { println!("< accountRateLimitsUpdated notification: {snapshot:?}"); } ServerNotification::SessionConfigured(_) => { // SessionConfigured notifications are unrelated to login; skip. } _ => {} } } // Not a server notification (likely a conversation event); keep waiting. } } fn stream_turn(&mut self, thread_id: &str, turn_id: &str) -> Result<()> { loop { let notification = self.next_notification()?; let Ok(server_notification) = ServerNotification::try_from(notification) else { continue; }; match server_notification { ServerNotification::ThreadStarted(payload) => { if payload.thread.id == thread_id { println!("< thread/started notification: {:?}", payload.thread); } } ServerNotification::TurnStarted(payload) => { if payload.turn.id == turn_id { println!("< turn/started notification: {:?}", payload.turn.status); } } ServerNotification::AgentMessageDelta(delta) => { print!("{}", delta.delta); std::io::stdout().flush().ok(); } ServerNotification::CommandExecutionOutputDelta(delta) => { print!("{}", delta.delta); std::io::stdout().flush().ok(); } ServerNotification::ItemStarted(payload) => { println!("\n< item started: {:?}", payload.item); } ServerNotification::ItemCompleted(payload) => { println!("< item completed: {:?}", payload.item); } ServerNotification::TurnCompleted(payload) => { if payload.turn.id == turn_id { println!("\n< turn/completed notification: {:?}", payload.turn.status); if let TurnStatus::Failed { error } = &payload.turn.status { println!("[turn error] {}", error.message); } break; } } ServerNotification::McpToolCallProgress(payload) => { println!("< MCP tool progress: {}", payload.message); } _ => { println!("[UNKNOWN SERVER NOTIFICATION] {server_notification:?}"); } } } Ok(()) } fn extract_event( &self, notification: JSONRPCNotification, conversation_id: &ConversationId, ) -> Result> { let params = notification .params .context("event notification missing params")?; let mut map = match params { Value::Object(map) => map, other => bail!("unexpected params shape: {other:?}"), }; let conversation_value = map .remove("conversationId") .context("event missing conversationId")?; let notification_conversation: ConversationId = serde_json::from_value(conversation_value) .context("conversationId was not a valid UUID")?; if ¬ification_conversation != conversation_id { return Ok(None); } let event_value = Value::Object(map); let event: Event = serde_json::from_value(event_value).context("failed to decode event payload")?; Ok(Some(event)) } fn send_request( &mut self, request: ClientRequest, request_id: RequestId, method: &str, ) -> Result where T: DeserializeOwned, { self.write_request(&request)?; self.wait_for_response(request_id, method) } fn write_request(&mut self, request: &ClientRequest) -> Result<()> { let request_json = serde_json::to_string(request)?; let request_pretty = serde_json::to_string_pretty(request)?; print_multiline_with_prefix("> ", &request_pretty); if let Some(stdin) = self.stdin.as_mut() { writeln!(stdin, "{request_json}")?; stdin .flush() .context("failed to flush request to codex app-server")?; } else { bail!("codex app-server stdin closed"); } Ok(()) } fn wait_for_response(&mut self, request_id: RequestId, method: &str) -> Result where T: DeserializeOwned, { loop { let message = self.read_jsonrpc_message()?; match message { JSONRPCMessage::Response(JSONRPCResponse { id, result }) => { if id == request_id { return serde_json::from_value(result) .with_context(|| format!("{method} response missing payload")); } } JSONRPCMessage::Error(err) => { if err.id == request_id { bail!("{method} failed: {err:?}"); } } JSONRPCMessage::Notification(notification) => { self.pending_notifications.push_back(notification); } JSONRPCMessage::Request(request) => { self.handle_server_request(request)?; } } } } fn next_notification(&mut self) -> Result { if let Some(notification) = self.pending_notifications.pop_front() { return Ok(notification); } loop { let message = self.read_jsonrpc_message()?; match message { JSONRPCMessage::Notification(notification) => return Ok(notification), JSONRPCMessage::Response(_) | JSONRPCMessage::Error(_) => { // No outstanding requests, so ignore stray responses/errors for now. continue; } JSONRPCMessage::Request(request) => { self.handle_server_request(request)?; } } } } fn read_jsonrpc_message(&mut self) -> Result { loop { let mut response_line = String::new(); let bytes = self .stdout .read_line(&mut response_line) .context("failed to read from codex app-server")?; if bytes == 0 { bail!("codex app-server closed stdout"); } let trimmed = response_line.trim(); if trimmed.is_empty() { continue; } let parsed: Value = serde_json::from_str(trimmed).context("response was not valid JSON-RPC")?; let pretty = serde_json::to_string_pretty(&parsed)?; print_multiline_with_prefix("< ", &pretty); let message: JSONRPCMessage = serde_json::from_value(parsed) .context("response was not a valid JSON-RPC message")?; return Ok(message); } } fn request_id(&self) -> RequestId { RequestId::String(Uuid::new_v4().to_string()) } fn handle_server_request(&mut self, request: JSONRPCRequest) -> Result<()> { let server_request = ServerRequest::try_from(request) .context("failed to deserialize ServerRequest from JSONRPCRequest")?; match server_request { ServerRequest::CommandExecutionRequestApproval { request_id, params } => { self.handle_command_execution_request_approval(request_id, params)?; } ServerRequest::FileChangeRequestApproval { request_id, params } => { self.approve_file_change_request(request_id, params)?; } other => { bail!("received unsupported server request: {other:?}"); } } Ok(()) } fn handle_command_execution_request_approval( &mut self, request_id: RequestId, params: CommandExecutionRequestApprovalParams, ) -> Result<()> { let CommandExecutionRequestApprovalParams { thread_id, turn_id, item_id, reason, risk, } = params; println!( "\n< commandExecution approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" ); if let Some(reason) = reason.as_deref() { println!("< reason: {reason}"); } if let Some(risk) = risk.as_ref() { println!("< risk assessment: {risk:?}"); } let response = CommandExecutionRequestApprovalResponse { decision: ApprovalDecision::Accept, accept_settings: Some(CommandExecutionRequestAcceptSettings { for_session: false }), }; self.send_server_request_response(request_id, &response)?; println!("< approved commandExecution request for item {item_id}"); Ok(()) } fn approve_file_change_request( &mut self, request_id: RequestId, params: FileChangeRequestApprovalParams, ) -> Result<()> { let FileChangeRequestApprovalParams { thread_id, turn_id, item_id, reason, grant_root, } = params; println!( "\n< fileChange approval requested for thread {thread_id}, turn {turn_id}, item {item_id}" ); if let Some(reason) = reason.as_deref() { println!("< reason: {reason}"); } if let Some(grant_root) = grant_root.as_deref() { println!("< grant root: {}", grant_root.display()); } let response = FileChangeRequestApprovalResponse { decision: ApprovalDecision::Accept, }; self.send_server_request_response(request_id, &response)?; println!("< approved fileChange request for item {item_id}"); Ok(()) } fn send_server_request_response(&mut self, request_id: RequestId, response: &T) -> Result<()> where T: Serialize, { let message = JSONRPCMessage::Response(JSONRPCResponse { id: request_id, result: serde_json::to_value(response)?, }); self.write_jsonrpc_message(message) } fn write_jsonrpc_message(&mut self, message: JSONRPCMessage) -> Result<()> { let payload = serde_json::to_string(&message)?; let pretty = serde_json::to_string_pretty(&message)?; print_multiline_with_prefix("> ", &pretty); if let Some(stdin) = self.stdin.as_mut() { writeln!(stdin, "{payload}")?; stdin .flush() .context("failed to flush response to codex app-server")?; return Ok(()); } bail!("codex app-server stdin closed") } } fn print_multiline_with_prefix(prefix: &str, payload: &str) { for line in payload.lines() { println!("{prefix}{line}"); } } impl Drop for CodexClient { fn drop(&mut self) { let _ = self.stdin.take(); if let Ok(Some(status)) = self.child.try_wait() { println!("[codex app-server exited: {status}]"); return; } thread::sleep(Duration::from_millis(100)); if let Ok(Some(status)) = self.child.try_wait() { println!("[codex app-server exited: {status}]"); return; } let _ = self.child.kill(); let _ = self.child.wait(); } }