mirror of
https://github.com/openai/codex.git
synced 2026-05-17 04:01:08 +03:00
Compare commits
4 Commits
dev/sdk-py
...
owen/bette
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d99152db9 | ||
|
|
2c4563ba28 | ||
|
|
0762963147 | ||
|
|
a37e0df551 |
@@ -1,5 +1,6 @@
|
||||
mod pid;
|
||||
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use serde::Serialize;
|
||||
@@ -31,3 +32,15 @@ pub(crate) fn pid_backend(paths: BackendPaths) -> PidBackend {
|
||||
pub(crate) fn pid_update_loop_backend(paths: BackendPaths) -> PidBackend {
|
||||
PidBackend::new_update_loop(paths.codex_bin, paths.update_pid_file)
|
||||
}
|
||||
|
||||
pub(crate) async fn append_stderr_log_tail_context(pid_file: &Path, context: &mut String) {
|
||||
match pid::read_stderr_log_tail(pid_file).await {
|
||||
Ok(Some(tail)) => tail.append_to_context(context),
|
||||
Ok(None) => {}
|
||||
Err(err) => {
|
||||
context.push_str(&format!(
|
||||
"\n\nFailed to read managed app-server stderr log: {err:#}"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::io::SeekFrom;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
#[cfg(unix)]
|
||||
@@ -10,6 +11,8 @@ use anyhow::bail;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
#[cfg(unix)]
|
||||
use tokio::process::Command;
|
||||
use tokio::time::sleep;
|
||||
@@ -18,6 +21,7 @@ const STOP_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||
const STOP_GRACE_PERIOD: Duration = Duration::from_secs(60);
|
||||
const STOP_TIMEOUT: Duration = Duration::from_secs(70);
|
||||
const START_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const STDERR_LOG_TAIL_BYTES: u64 = 4096;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
@@ -35,6 +39,25 @@ struct PidRecord {
|
||||
process_start_time: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct PidLogTail {
|
||||
pub(crate) path: PathBuf,
|
||||
pub(crate) contents: String,
|
||||
}
|
||||
|
||||
impl PidLogTail {
|
||||
pub(crate) fn append_to_context(&self, context: &mut String) {
|
||||
context.push_str(&format!(
|
||||
"\n\nManaged app-server stderr ({}):",
|
||||
self.path.display()
|
||||
));
|
||||
for line in self.contents.lines() {
|
||||
context.push_str("\n ");
|
||||
context.push_str(line);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
enum PidFileState {
|
||||
Missing,
|
||||
@@ -129,11 +152,18 @@ impl PidBackend {
|
||||
}
|
||||
};
|
||||
let mut command = Command::new(&self.codex_bin);
|
||||
let stderr_log = match self.open_stderr_log().await {
|
||||
Ok(stderr_log) => stderr_log,
|
||||
Err(err) => {
|
||||
let _ = fs::remove_file(&self.pid_file).await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
command
|
||||
.args(self.command_args())
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::null());
|
||||
.stderr(Stdio::from(stderr_log.into_std().await));
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
@@ -169,8 +199,11 @@ impl PidBackend {
|
||||
},
|
||||
Err(err) => {
|
||||
let _ = self.terminate_process(pid);
|
||||
let mut context =
|
||||
format!("failed to record pid-managed app-server process {pid} startup");
|
||||
super::append_stderr_log_tail_context(&self.pid_file, &mut context).await;
|
||||
let _ = fs::remove_file(&self.pid_file).await;
|
||||
return Err(err);
|
||||
return Err(err).context(context);
|
||||
}
|
||||
};
|
||||
let contents = serde_json::to_vec(&record).context("failed to serialize pid record")?;
|
||||
@@ -344,6 +377,23 @@ impl PidBackend {
|
||||
Ok(reservation_lock)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn open_stderr_log(&self) -> Result<fs::File> {
|
||||
let stderr_log_file = stderr_log_file_for_pid_file(&self.pid_file);
|
||||
fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(&stderr_log_file)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to open stderr log for pid-managed app server {}",
|
||||
stderr_log_file.display()
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn command_args(&self) -> Vec<&'static str> {
|
||||
match self.command_kind {
|
||||
@@ -376,6 +426,56 @@ impl PidBackend {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn read_stderr_log_tail(pid_file: &Path) -> Result<Option<PidLogTail>> {
|
||||
let path = stderr_log_file_for_pid_file(pid_file);
|
||||
let Some(contents) = read_log_tail(&path, STDERR_LOG_TAIL_BYTES).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(Some(PidLogTail { path, contents }))
|
||||
}
|
||||
|
||||
fn stderr_log_file_for_pid_file(pid_file: &Path) -> PathBuf {
|
||||
pid_file.with_extension("stderr.log")
|
||||
}
|
||||
|
||||
async fn read_log_tail(path: &Path, byte_limit: u64) -> Result<Option<String>> {
|
||||
let mut file = match fs::File::open(path).await {
|
||||
Ok(file) => file,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
|
||||
Err(err) => {
|
||||
return Err(err)
|
||||
.with_context(|| format!("failed to open stderr log {}", path.display()));
|
||||
}
|
||||
};
|
||||
let len = file
|
||||
.metadata()
|
||||
.await
|
||||
.with_context(|| format!("failed to inspect stderr log {}", path.display()))?
|
||||
.len();
|
||||
if len == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let start = len.saturating_sub(byte_limit);
|
||||
file.seek(SeekFrom::Start(start))
|
||||
.await
|
||||
.with_context(|| format!("failed to seek stderr log {}", path.display()))?;
|
||||
let mut bytes = Vec::new();
|
||||
file.read_to_end(&mut bytes)
|
||||
.await
|
||||
.with_context(|| format!("failed to read stderr log {}", path.display()))?;
|
||||
if start > 0
|
||||
&& let Some(newline_index) = bytes.iter().position(|byte| *byte == b'\n')
|
||||
{
|
||||
bytes.drain(..=newline_index);
|
||||
}
|
||||
let contents = String::from_utf8_lossy(&bytes).trim_end().to_string();
|
||||
if contents.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
Ok(Some(contents))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn process_exists(pid: u32) -> bool {
|
||||
let Ok(pid) = libc::pid_t::try_from(pid) else {
|
||||
|
||||
@@ -6,7 +6,10 @@ use tempfile::TempDir;
|
||||
use super::PidBackend;
|
||||
use super::PidCommandKind;
|
||||
use super::PidFileState;
|
||||
use super::PidLogTail;
|
||||
use super::PidRecord;
|
||||
use super::read_stderr_log_tail;
|
||||
use super::stderr_log_file_for_pid_file;
|
||||
use super::try_lock_file;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -170,3 +173,24 @@ fn app_server_remote_control_uses_runtime_flag() {
|
||||
vec!["app-server", "--remote-control", "--listen", "unix://"]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_stderr_log_tail_returns_recent_complete_lines() {
|
||||
let temp_dir = TempDir::new().expect("temp dir");
|
||||
let pid_file = temp_dir.path().join("app-server.pid");
|
||||
let log_file = stderr_log_file_for_pid_file(&pid_file);
|
||||
let contents = format!("{}\nrecent error\nusage", "x".repeat(4100));
|
||||
tokio::fs::write(&log_file, contents)
|
||||
.await
|
||||
.expect("write stderr log");
|
||||
|
||||
assert_eq!(
|
||||
read_stderr_log_tail(&pid_file)
|
||||
.await
|
||||
.expect("read stderr log"),
|
||||
Some(PidLogTail {
|
||||
path: log_file,
|
||||
contents: "recent error\nusage".to_string(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::InitializeResponse;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
@@ -14,12 +15,16 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_uds::UnixStream;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::client_async;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
const PROBE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
pub(crate) const CONTROL_SOCKET_RESPONSE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const CLIENT_NAME: &str = "codex_app_server_daemon";
|
||||
const INITIALIZE_REQUEST_ID: RequestId = RequestId::Integer(1);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct ProbeInfo {
|
||||
@@ -27,7 +32,7 @@ pub(crate) struct ProbeInfo {
|
||||
}
|
||||
|
||||
pub(crate) async fn probe(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
timeout(PROBE_TIMEOUT, probe_inner(socket_path))
|
||||
timeout(CONTROL_SOCKET_RESPONSE_TIMEOUT, probe_inner(socket_path))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
@@ -38,54 +43,14 @@ pub(crate) async fn probe(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
}
|
||||
|
||||
async fn probe_inner(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
let stream = UnixStream::connect(socket_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to connect to {}", socket_path.display()))?;
|
||||
let (mut websocket, _response) = client_async("ws://localhost/", stream)
|
||||
.await
|
||||
.with_context(|| format!("failed to upgrade {}", socket_path.display()))?;
|
||||
|
||||
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(1),
|
||||
method: "initialize".to_string(),
|
||||
params: Some(serde_json::to_value(InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: CLIENT_NAME.to_string(),
|
||||
title: Some("Codex App Server Daemon".to_string()),
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
})?),
|
||||
trace: None,
|
||||
});
|
||||
websocket
|
||||
.send(Message::Text(serde_json::to_string(&initialize)?.into()))
|
||||
.await
|
||||
.context("failed to send initialize request")?;
|
||||
|
||||
let response = loop {
|
||||
let frame = websocket
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("app-server closed before initialize response"))??;
|
||||
let Message::Text(payload) = frame else {
|
||||
continue;
|
||||
};
|
||||
let message = serde_json::from_str::<JSONRPCMessage>(&payload)?;
|
||||
if let JSONRPCMessage::Response(response) = message
|
||||
&& response.id == RequestId::Integer(1)
|
||||
{
|
||||
break response;
|
||||
}
|
||||
};
|
||||
let initialize_response = serde_json::from_value::<InitializeResponse>(response.result)?;
|
||||
let mut websocket = connect(socket_path).await?;
|
||||
|
||||
let initialize_response = initialize(&mut websocket, /*experimental_api*/ false).await?;
|
||||
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "initialized".to_string(),
|
||||
params: None,
|
||||
});
|
||||
websocket
|
||||
.send(Message::Text(serde_json::to_string(&initialized)?.into()))
|
||||
send_message(&mut websocket, &initialized)
|
||||
.await
|
||||
.context("failed to send initialized notification")?;
|
||||
websocket.close(None).await.ok();
|
||||
@@ -95,6 +60,91 @@ async fn probe_inner(socket_path: &Path) -> Result<ProbeInfo> {
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn connect(socket_path: &Path) -> Result<WebSocketStream<UnixStream>> {
|
||||
let stream = UnixStream::connect(socket_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to connect to {}", socket_path.display()))?;
|
||||
let (websocket, _response) = client_async("ws://localhost/", stream)
|
||||
.await
|
||||
.with_context(|| format!("failed to upgrade {}", socket_path.display()))?;
|
||||
Ok(websocket)
|
||||
}
|
||||
|
||||
pub(crate) async fn initialize<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
experimental_api: bool,
|
||||
) -> Result<InitializeResponse>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let initialize = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: INITIALIZE_REQUEST_ID,
|
||||
method: "initialize".to_string(),
|
||||
params: Some(serde_json::to_value(InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: CLIENT_NAME.to_string(),
|
||||
title: Some("Codex App Server Daemon".to_string()),
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
},
|
||||
capabilities: if experimental_api {
|
||||
Some(InitializeCapabilities {
|
||||
experimental_api: true,
|
||||
..Default::default()
|
||||
})
|
||||
} else {
|
||||
None
|
||||
},
|
||||
})?),
|
||||
trace: None,
|
||||
});
|
||||
send_message(websocket, &initialize)
|
||||
.await
|
||||
.context("failed to send initialize request")?;
|
||||
|
||||
let response = loop {
|
||||
let message = timeout(CONTROL_SOCKET_RESPONSE_TIMEOUT, read_message(websocket))
|
||||
.await
|
||||
.context("timed out waiting for initialize response")??;
|
||||
if let JSONRPCMessage::Response(response) = message
|
||||
&& response.id == INITIALIZE_REQUEST_ID
|
||||
{
|
||||
break response;
|
||||
}
|
||||
};
|
||||
serde_json::from_value::<InitializeResponse>(response.result)
|
||||
.context("failed to parse initialize response")
|
||||
}
|
||||
|
||||
pub(crate) async fn send_message<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
message: &JSONRPCMessage,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
websocket
|
||||
.send(Message::Text(serde_json::to_string(message)?.into()))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn read_message<S>(websocket: &mut WebSocketStream<S>) -> Result<JSONRPCMessage>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
loop {
|
||||
let frame = websocket
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("app-server closed the control socket"))??;
|
||||
let Message::Text(payload) = frame else {
|
||||
continue;
|
||||
};
|
||||
return serde_json::from_str::<JSONRPCMessage>(&payload)
|
||||
.context("failed to parse app-server JSON-RPC message");
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_version_from_user_agent(user_agent: &str) -> Result<String> {
|
||||
let (_originator, rest) = user_agent
|
||||
.split_once('/')
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
mod backend;
|
||||
mod client;
|
||||
mod managed_install;
|
||||
mod remote_control_client;
|
||||
mod settings;
|
||||
mod update_loop;
|
||||
|
||||
@@ -13,6 +14,7 @@ use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
pub use backend::BackendKind;
|
||||
use backend::BackendPaths;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_app_server_transport::app_server_control_socket_path;
|
||||
use codex_utils_home_dir::find_codex_home;
|
||||
use managed_install::managed_codex_bin;
|
||||
@@ -58,6 +60,8 @@ pub struct LifecycleOutput {
|
||||
pub backend: Option<BackendKind>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pid: Option<u32>,
|
||||
pub managed_codex_path: PathBuf,
|
||||
pub managed_codex_version: Option<String>,
|
||||
pub socket_path: PathBuf,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub cli_version: Option<String>,
|
||||
@@ -84,6 +88,7 @@ pub struct BootstrapOutput {
|
||||
pub auto_update_enabled: bool,
|
||||
pub remote_control_enabled: bool,
|
||||
pub managed_codex_path: PathBuf,
|
||||
pub managed_codex_version: Option<String>,
|
||||
pub socket_path: PathBuf,
|
||||
pub cli_version: String,
|
||||
pub app_server_version: String,
|
||||
@@ -96,6 +101,20 @@ pub enum RemoteControlStartOutput {
|
||||
Start(LifecycleOutput),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteControlReadyStatus {
|
||||
pub status: RemoteControlConnectionStatus,
|
||||
pub server_name: String,
|
||||
pub environment_id: Option<String>,
|
||||
pub timed_out: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RemoteControlReadyOutput {
|
||||
pub daemon: RemoteControlStartOutput,
|
||||
pub remote_control: RemoteControlReadyStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RemoteControlMode {
|
||||
Enabled,
|
||||
@@ -179,6 +198,27 @@ pub async fn ensure_remote_control_started() -> Result<RemoteControlStartOutput>
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn ensure_remote_control_ready() -> Result<RemoteControlReadyOutput> {
|
||||
ensure_supported_platform()?;
|
||||
Daemon::from_environment()?
|
||||
.ensure_remote_control_ready()
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn enable_remote_control_on_socket(
|
||||
socket_path: &Path,
|
||||
connect_timeout: Duration,
|
||||
connect_retry_delay: Duration,
|
||||
) -> Result<RemoteControlReadyStatus> {
|
||||
ensure_supported_platform()?;
|
||||
remote_control_client::enable_remote_control_with_connect_retry(
|
||||
socket_path,
|
||||
connect_timeout,
|
||||
connect_retry_delay,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_remote_control(mode: RemoteControlMode) -> Result<RemoteControlOutput> {
|
||||
ensure_supported_platform()?;
|
||||
Daemon::from_environment()?.set_remote_control(mode).await
|
||||
@@ -248,33 +288,39 @@ impl Daemon {
|
||||
async fn start(&self) -> Result<LifecycleOutput> {
|
||||
let settings = self.load_settings().await?;
|
||||
if let Ok(info) = client::probe(&self.socket_path).await {
|
||||
return Ok(self.output(
|
||||
LifecycleStatus::AlreadyRunning,
|
||||
self.running_backend(&settings).await?,
|
||||
/*pid*/ None,
|
||||
Some(info.app_server_version),
|
||||
));
|
||||
return Ok(self
|
||||
.output(
|
||||
LifecycleStatus::AlreadyRunning,
|
||||
self.running_backend(&settings).await?,
|
||||
/*pid*/ None,
|
||||
Some(info.app_server_version),
|
||||
)
|
||||
.await);
|
||||
}
|
||||
|
||||
if self.running_backend_instance(&settings).await?.is_some() {
|
||||
let info = self.wait_until_ready().await?;
|
||||
return Ok(self.output(
|
||||
LifecycleStatus::AlreadyRunning,
|
||||
Some(BackendKind::Pid),
|
||||
/*pid*/ None,
|
||||
Some(info.app_server_version),
|
||||
));
|
||||
return Ok(self
|
||||
.output(
|
||||
LifecycleStatus::AlreadyRunning,
|
||||
Some(BackendKind::Pid),
|
||||
/*pid*/ None,
|
||||
Some(info.app_server_version),
|
||||
)
|
||||
.await);
|
||||
}
|
||||
|
||||
self.ensure_managed_codex_bin()?;
|
||||
let pid = self.start_managed_backend(&settings).await?;
|
||||
let info = self.wait_until_ready().await?;
|
||||
Ok(self.output(
|
||||
LifecycleStatus::Started,
|
||||
Some(BackendKind::Pid),
|
||||
pid,
|
||||
Some(info.app_server_version),
|
||||
))
|
||||
Ok(self
|
||||
.output(
|
||||
LifecycleStatus::Started,
|
||||
Some(BackendKind::Pid),
|
||||
pid,
|
||||
Some(info.app_server_version),
|
||||
)
|
||||
.await)
|
||||
}
|
||||
|
||||
async fn restart(&self) -> Result<LifecycleOutput> {
|
||||
@@ -294,12 +340,14 @@ impl Daemon {
|
||||
|
||||
let pid = self.start_managed_backend(&settings).await?;
|
||||
let info = self.wait_until_ready().await?;
|
||||
Ok(self.output(
|
||||
LifecycleStatus::Restarted,
|
||||
Some(BackendKind::Pid),
|
||||
pid,
|
||||
Some(info.app_server_version),
|
||||
))
|
||||
Ok(self
|
||||
.output(
|
||||
LifecycleStatus::Restarted,
|
||||
Some(BackendKind::Pid),
|
||||
pid,
|
||||
Some(info.app_server_version),
|
||||
)
|
||||
.await)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
@@ -352,12 +400,14 @@ impl Daemon {
|
||||
let settings = self.load_settings().await?;
|
||||
if let Some(backend) = self.running_backend_instance(&settings).await? {
|
||||
backend.stop().await?;
|
||||
return Ok(self.output(
|
||||
LifecycleStatus::Stopped,
|
||||
Some(BackendKind::Pid),
|
||||
/*pid*/ None,
|
||||
/*app_server_version*/ None,
|
||||
));
|
||||
return Ok(self
|
||||
.output(
|
||||
LifecycleStatus::Stopped,
|
||||
Some(BackendKind::Pid),
|
||||
/*pid*/ None,
|
||||
/*app_server_version*/ None,
|
||||
)
|
||||
.await);
|
||||
}
|
||||
|
||||
if client::probe(&self.socket_path).await.is_ok() {
|
||||
@@ -366,23 +416,27 @@ impl Daemon {
|
||||
));
|
||||
}
|
||||
|
||||
Ok(self.output(
|
||||
LifecycleStatus::NotRunning,
|
||||
/*backend*/ None,
|
||||
/*pid*/ None,
|
||||
/*app_server_version*/ None,
|
||||
))
|
||||
Ok(self
|
||||
.output(
|
||||
LifecycleStatus::NotRunning,
|
||||
/*backend*/ None,
|
||||
/*pid*/ None,
|
||||
/*app_server_version*/ None,
|
||||
)
|
||||
.await)
|
||||
}
|
||||
|
||||
async fn version(&self) -> Result<LifecycleOutput> {
|
||||
let settings = self.load_settings().await?;
|
||||
let info = client::probe(&self.socket_path).await?;
|
||||
Ok(self.output(
|
||||
LifecycleStatus::Running,
|
||||
self.running_backend(&settings).await?,
|
||||
/*pid*/ None,
|
||||
Some(info.app_server_version),
|
||||
))
|
||||
Ok(self
|
||||
.output(
|
||||
LifecycleStatus::Running,
|
||||
self.running_backend(&settings).await?,
|
||||
/*pid*/ None,
|
||||
Some(info.app_server_version),
|
||||
)
|
||||
.await)
|
||||
}
|
||||
|
||||
async fn wait_until_ready(&self) -> Result<client::ProbeInfo> {
|
||||
@@ -395,17 +449,34 @@ impl Daemon {
|
||||
sleep(START_POLL_INTERVAL).await;
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(err).with_context(|| {
|
||||
format!(
|
||||
"app server did not become ready on {}",
|
||||
self.socket_path.display()
|
||||
)
|
||||
});
|
||||
let context = self.app_server_not_ready_context().await;
|
||||
return Err(err).context(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn app_server_not_ready_context(&self) -> String {
|
||||
let mut context = format!(
|
||||
"app server did not become ready on {}",
|
||||
self.socket_path.display()
|
||||
);
|
||||
self.append_daemon_app_server_context(&mut context).await;
|
||||
backend::append_stderr_log_tail_context(&self.pid_file, &mut context).await;
|
||||
context
|
||||
}
|
||||
|
||||
async fn append_daemon_app_server_context(&self, context: &mut String) {
|
||||
let managed_codex_version = self
|
||||
.managed_codex_version_best_effort()
|
||||
.await
|
||||
.unwrap_or_else(|| "unknown".to_string());
|
||||
context.push_str(&format!(
|
||||
"\n\nDaemon used app-server:\n path: {}\n version: {managed_codex_version}",
|
||||
self.managed_codex_bin.display()
|
||||
));
|
||||
}
|
||||
|
||||
async fn bootstrap(&self, options: BootstrapOptions) -> Result<BootstrapOutput> {
|
||||
let _operation_lock = self.acquire_operation_lock().await?;
|
||||
self.bootstrap_locked(options).await
|
||||
@@ -430,6 +501,16 @@ impl Daemon {
|
||||
Ok(RemoteControlStartOutput::Bootstrap(output))
|
||||
}
|
||||
|
||||
async fn ensure_remote_control_ready(&self) -> Result<RemoteControlReadyOutput> {
|
||||
let daemon = self.ensure_remote_control_started().await?;
|
||||
let remote_control =
|
||||
remote_control_client::enable_remote_control(&self.socket_path).await?;
|
||||
Ok(RemoteControlReadyOutput {
|
||||
daemon,
|
||||
remote_control,
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_remote_control(&self, mode: RemoteControlMode) -> Result<RemoteControlOutput> {
|
||||
let _operation_lock = self.acquire_operation_lock().await?;
|
||||
self.set_remote_control_locked(mode).await
|
||||
@@ -512,12 +593,14 @@ impl Daemon {
|
||||
updater.start().await?;
|
||||
|
||||
let info = self.wait_until_ready().await?;
|
||||
let managed_codex_version = self.managed_codex_version_best_effort().await;
|
||||
Ok(BootstrapOutput {
|
||||
status: BootstrapStatus::Bootstrapped,
|
||||
backend: BackendKind::Pid,
|
||||
auto_update_enabled: true,
|
||||
remote_control_enabled: settings.remote_control_enabled,
|
||||
managed_codex_path: self.managed_codex_bin.clone(),
|
||||
managed_codex_version,
|
||||
socket_path: self.socket_path.clone(),
|
||||
cli_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
app_server_version: info.app_server_version,
|
||||
@@ -577,6 +660,16 @@ impl Daemon {
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn managed_codex_version_best_effort(&self) -> Option<String> {
|
||||
managed_codex_version(&self.managed_codex_bin).await.ok()
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
async fn managed_codex_version_best_effort(&self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
|
||||
fn backend_paths(&self, settings: &DaemonSettings) -> BackendPaths {
|
||||
self.backend_paths_with_bin(settings, &self.managed_codex_bin)
|
||||
}
|
||||
@@ -636,17 +729,20 @@ impl Daemon {
|
||||
})
|
||||
}
|
||||
|
||||
fn output(
|
||||
async fn output(
|
||||
&self,
|
||||
status: LifecycleStatus,
|
||||
backend: Option<BackendKind>,
|
||||
pid: Option<u32>,
|
||||
app_server_version: Option<String>,
|
||||
) -> LifecycleOutput {
|
||||
let managed_codex_version = self.managed_codex_version_best_effort().await;
|
||||
LifecycleOutput {
|
||||
status,
|
||||
backend,
|
||||
pid,
|
||||
managed_codex_path: self.managed_codex_bin.clone(),
|
||||
managed_codex_version,
|
||||
socket_path: self.socket_path.clone(),
|
||||
cli_version: Some(env!("CARGO_PKG_VERSION").to_string()),
|
||||
app_server_version,
|
||||
@@ -735,10 +831,12 @@ fn try_lock_file(_file: &tokio::fs::File) -> Result<bool> {
|
||||
#[cfg(all(test, unix))]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
use super::BackendKind;
|
||||
use super::BootstrapOutput;
|
||||
use super::BootstrapStatus;
|
||||
use super::Daemon;
|
||||
use super::LifecycleOutput;
|
||||
use super::LifecycleStatus;
|
||||
use super::RemoteControlStartOutput;
|
||||
@@ -751,22 +849,6 @@ mod tests {
|
||||
use super::should_reexec_updater;
|
||||
use crate::client::ProbeInfo;
|
||||
|
||||
#[test]
|
||||
fn lifecycle_status_uses_camel_case_json() {
|
||||
assert_eq!(
|
||||
serde_json::to_string(&LifecycleStatus::AlreadyRunning).expect("serialize"),
|
||||
"\"alreadyRunning\""
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bootstrap_status_uses_camel_case_json() {
|
||||
assert_eq!(
|
||||
serde_json::to_string(&BootstrapStatus::Bootstrapped).expect("serialize"),
|
||||
"\"bootstrapped\""
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_status_uses_camel_case_json() {
|
||||
assert_eq!(
|
||||
@@ -847,12 +929,26 @@ mod tests {
|
||||
status: LifecycleStatus::AlreadyRunning,
|
||||
backend: Some(BackendKind::Pid),
|
||||
pid: None,
|
||||
managed_codex_path: "codex".into(),
|
||||
managed_codex_version: Some("1.2.3".to_string()),
|
||||
socket_path: "codex.sock".into(),
|
||||
cli_version: Some("1.2.3".to_string()),
|
||||
app_server_version: Some("1.2.4".to_string()),
|
||||
};
|
||||
let output = RemoteControlStartOutput::Start(lifecycle_output.clone());
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&lifecycle_output).expect("serialize"),
|
||||
serde_json::json!({
|
||||
"status": "alreadyRunning",
|
||||
"backend": "pid",
|
||||
"managedCodexPath": "codex",
|
||||
"managedCodexVersion": "1.2.3",
|
||||
"socketPath": "codex.sock",
|
||||
"cliVersion": "1.2.3",
|
||||
"appServerVersion": "1.2.4",
|
||||
})
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(output).expect("serialize"),
|
||||
serde_json::to_value(lifecycle_output).expect("serialize")
|
||||
@@ -864,15 +960,59 @@ mod tests {
|
||||
auto_update_enabled: true,
|
||||
remote_control_enabled: true,
|
||||
managed_codex_path: "codex".into(),
|
||||
managed_codex_version: Some("1.2.3".to_string()),
|
||||
socket_path: "codex.sock".into(),
|
||||
cli_version: "1.2.3".to_string(),
|
||||
app_server_version: "1.2.4".to_string(),
|
||||
};
|
||||
let output = RemoteControlStartOutput::Bootstrap(bootstrap_output.clone());
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(&bootstrap_output).expect("serialize"),
|
||||
serde_json::json!({
|
||||
"status": "bootstrapped",
|
||||
"backend": "pid",
|
||||
"autoUpdateEnabled": true,
|
||||
"remoteControlEnabled": true,
|
||||
"managedCodexPath": "codex",
|
||||
"managedCodexVersion": "1.2.3",
|
||||
"socketPath": "codex.sock",
|
||||
"cliVersion": "1.2.3",
|
||||
"appServerVersion": "1.2.4",
|
||||
})
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(output).expect("serialize"),
|
||||
serde_json::to_value(bootstrap_output).expect("serialize")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn not_ready_context_reports_daemon_app_server_before_stderr() {
|
||||
let temp_dir = TempDir::new().expect("temp dir");
|
||||
let daemon = Daemon {
|
||||
socket_path: temp_dir.path().join("app-server-control.sock"),
|
||||
pid_file: temp_dir.path().join("app-server.pid"),
|
||||
update_pid_file: temp_dir.path().join("app-server-updater.pid"),
|
||||
operation_lock_file: temp_dir.path().join("daemon.lock"),
|
||||
settings_file: temp_dir.path().join("settings.json"),
|
||||
managed_codex_bin: temp_dir.path().join("missing-codex"),
|
||||
};
|
||||
let stderr_log = daemon.pid_file.with_extension("stderr.log");
|
||||
tokio::fs::write(&stderr_log, "unexpected argument")
|
||||
.await
|
||||
.expect("write stderr log");
|
||||
|
||||
assert_eq!(
|
||||
daemon.app_server_not_ready_context().await,
|
||||
format!(
|
||||
"app server did not become ready on {}\n\n\
|
||||
Daemon used app-server:\n path: {}\n version: unknown\n\n\
|
||||
Managed app-server stderr ({}):\n unexpected argument",
|
||||
daemon.socket_path.display(),
|
||||
daemon.managed_codex_bin.display(),
|
||||
stderr_log.display()
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
459
codex-rs/app-server-daemon/src/remote_control_client.rs
Normal file
459
codex-rs/app-server-daemon/src/remote_control_client.rs
Normal file
@@ -0,0 +1,459 @@
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_app_server_protocol::RemoteControlEnableResponse;
|
||||
use codex_app_server_protocol::RemoteControlStatusChangedNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
|
||||
use crate::RemoteControlReadyStatus;
|
||||
use crate::client;
|
||||
|
||||
const REMOTE_CONTROL_READY_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_CONTROL_ENABLE_REQUEST_ID: RequestId = RequestId::Integer(2);
|
||||
|
||||
pub(crate) async fn enable_remote_control(socket_path: &Path) -> Result<RemoteControlReadyStatus> {
|
||||
let mut websocket = client::connect(socket_path).await?;
|
||||
enable_remote_control_with_timeout(&mut websocket, REMOTE_CONTROL_READY_TIMEOUT).await
|
||||
}
|
||||
|
||||
pub(crate) async fn enable_remote_control_with_connect_retry(
|
||||
socket_path: &Path,
|
||||
connect_timeout: Duration,
|
||||
connect_retry_delay: Duration,
|
||||
) -> Result<RemoteControlReadyStatus> {
|
||||
let mut websocket =
|
||||
connect_with_retry(socket_path, connect_timeout, connect_retry_delay).await?;
|
||||
enable_remote_control_with_timeout(&mut websocket, REMOTE_CONTROL_READY_TIMEOUT).await
|
||||
}
|
||||
|
||||
async fn enable_remote_control_with_timeout<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
ready_timeout: Duration,
|
||||
) -> Result<RemoteControlReadyStatus>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
client::initialize(websocket, /*experimental_api*/ true).await?;
|
||||
let initialized = JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "initialized".to_string(),
|
||||
params: None,
|
||||
});
|
||||
client::send_message(websocket, &initialized)
|
||||
.await
|
||||
.context("failed to send initialized notification")?;
|
||||
|
||||
let enable = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: REMOTE_CONTROL_ENABLE_REQUEST_ID,
|
||||
method: "remoteControl/enable".to_string(),
|
||||
params: None,
|
||||
trace: None,
|
||||
});
|
||||
client::send_message(websocket, &enable)
|
||||
.await
|
||||
.context("failed to send remoteControl/enable request")?;
|
||||
|
||||
let mut latest = read_enable_response(websocket).await?;
|
||||
if latest.status == RemoteControlConnectionStatus::Connecting {
|
||||
latest = wait_for_remote_control_status(websocket, latest, ready_timeout).await?;
|
||||
}
|
||||
websocket.close(None).await.ok();
|
||||
Ok(latest)
|
||||
}
|
||||
|
||||
async fn connect_with_retry(
|
||||
socket_path: &Path,
|
||||
connect_timeout: Duration,
|
||||
connect_retry_delay: Duration,
|
||||
) -> Result<WebSocketStream<codex_uds::UnixStream>> {
|
||||
let deadline = Instant::now() + connect_timeout;
|
||||
loop {
|
||||
match client::connect(socket_path).await {
|
||||
Ok(websocket) => return Ok(websocket),
|
||||
Err(_) if Instant::now() < deadline => {
|
||||
sleep(connect_retry_delay).await;
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(error).with_context(|| {
|
||||
format!(
|
||||
"app server did not become ready on {}",
|
||||
socket_path.display()
|
||||
)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_enable_response<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
) -> Result<RemoteControlReadyStatus>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
loop {
|
||||
let message = timeout(
|
||||
client::CONTROL_SOCKET_RESPONSE_TIMEOUT,
|
||||
client::read_message(websocket),
|
||||
)
|
||||
.await
|
||||
.context("timed out waiting for remoteControl/enable response")??;
|
||||
match message {
|
||||
JSONRPCMessage::Response(response)
|
||||
if response.id == REMOTE_CONTROL_ENABLE_REQUEST_ID =>
|
||||
{
|
||||
let response =
|
||||
serde_json::from_value::<RemoteControlEnableResponse>(response.result)
|
||||
.context("failed to parse remoteControl/enable response")?;
|
||||
return Ok(RemoteControlReadyStatus::from(response));
|
||||
}
|
||||
JSONRPCMessage::Error(err) if err.id == REMOTE_CONTROL_ENABLE_REQUEST_ID => {
|
||||
return Err(anyhow!(
|
||||
"remoteControl/enable failed: {}",
|
||||
err.error.message
|
||||
));
|
||||
}
|
||||
JSONRPCMessage::Notification(notification)
|
||||
if remote_control_status_notification(¬ification).is_some() =>
|
||||
{
|
||||
continue;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_remote_control_status<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
mut latest: RemoteControlReadyStatus,
|
||||
ready_timeout: Duration,
|
||||
) -> Result<RemoteControlReadyStatus>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
let deadline = tokio::time::Instant::now() + ready_timeout;
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
let message = match timeout(remaining, client::read_message(websocket)).await {
|
||||
Ok(Ok(message)) => message,
|
||||
Ok(Err(err)) => return Err(err),
|
||||
Err(_) => {
|
||||
latest.timed_out = true;
|
||||
return Ok(latest);
|
||||
}
|
||||
};
|
||||
let JSONRPCMessage::Notification(notification) = message else {
|
||||
continue;
|
||||
};
|
||||
let Some(status) = remote_control_status_notification(¬ification) else {
|
||||
continue;
|
||||
};
|
||||
latest = RemoteControlReadyStatus::from(status);
|
||||
if latest.status != RemoteControlConnectionStatus::Connecting {
|
||||
return Ok(latest);
|
||||
}
|
||||
}
|
||||
latest.timed_out = true;
|
||||
Ok(latest)
|
||||
}
|
||||
|
||||
fn remote_control_status_notification(
|
||||
notification: &JSONRPCNotification,
|
||||
) -> Option<RemoteControlStatusChangedNotification> {
|
||||
if notification.method != "remoteControl/status/changed" {
|
||||
return None;
|
||||
}
|
||||
let params = notification.params.clone()?;
|
||||
serde_json::from_value(params).ok()
|
||||
}
|
||||
|
||||
impl From<RemoteControlEnableResponse> for RemoteControlReadyStatus {
|
||||
fn from(response: RemoteControlEnableResponse) -> Self {
|
||||
let RemoteControlEnableResponse {
|
||||
status,
|
||||
server_name,
|
||||
installation_id: _,
|
||||
environment_id,
|
||||
} = response;
|
||||
Self {
|
||||
status,
|
||||
server_name,
|
||||
environment_id,
|
||||
timed_out: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemoteControlStatusChangedNotification> for RemoteControlReadyStatus {
|
||||
fn from(notification: RemoteControlStatusChangedNotification) -> Self {
|
||||
let RemoteControlStatusChangedNotification {
|
||||
status,
|
||||
server_name,
|
||||
installation_id: _,
|
||||
environment_id,
|
||||
} = notification;
|
||||
Self {
|
||||
status,
|
||||
server_name,
|
||||
environment_id,
|
||||
timed_out: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, unix))]
|
||||
mod tests {
|
||||
use anyhow::Result;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_uds::UnixListener;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio_tungstenite::accept_async;
|
||||
|
||||
use super::*;
|
||||
|
||||
const INITIALIZE_REQUEST_ID: RequestId = RequestId::Integer(1);
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
const TEST_SERVER_NAME: &str = "owen-mbp";
|
||||
const TEST_CODEX_HOME: &str = "/tmp/codex-home";
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_uses_connected_enable_response_without_later_notification()
|
||||
-> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: Some(remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected,
|
||||
Some("env_test"),
|
||||
)),
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected,
|
||||
Some("env_test"),
|
||||
),
|
||||
after_enable_notification: None,
|
||||
ready_timeout: Duration::from_millis(20),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Connected,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_waits_for_connected_notification() -> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: None,
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Connecting,
|
||||
/*environment_id*/ None,
|
||||
),
|
||||
after_enable_notification: Some(remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected,
|
||||
Some("env_test"),
|
||||
)),
|
||||
ready_timeout: Duration::from_secs(1),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Connected,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_reports_connecting_after_timeout() -> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: None,
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Connecting,
|
||||
/*environment_id*/ None,
|
||||
),
|
||||
after_enable_notification: None,
|
||||
ready_timeout: Duration::from_millis(20),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Connecting,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: None,
|
||||
timed_out: true,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn enable_remote_control_returns_errored_enable_response() -> Result<()> {
|
||||
let status = run_enable_remote_control_scenario(EnableScenario {
|
||||
initial_notification: None,
|
||||
enable_response: remote_control_status(
|
||||
RemoteControlConnectionStatus::Errored,
|
||||
/*environment_id*/ None,
|
||||
),
|
||||
after_enable_notification: None,
|
||||
ready_timeout: Duration::from_millis(20),
|
||||
})
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
status,
|
||||
RemoteControlReadyStatus {
|
||||
status: RemoteControlConnectionStatus::Errored,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
environment_id: None,
|
||||
timed_out: false,
|
||||
}
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct EnableScenario {
|
||||
initial_notification: Option<RemoteControlStatusChangedNotification>,
|
||||
enable_response: RemoteControlStatusChangedNotification,
|
||||
after_enable_notification: Option<RemoteControlStatusChangedNotification>,
|
||||
ready_timeout: Duration,
|
||||
}
|
||||
|
||||
async fn run_enable_remote_control_scenario(
|
||||
scenario: EnableScenario,
|
||||
) -> Result<RemoteControlReadyStatus> {
|
||||
let dir = TempDir::new()?;
|
||||
let socket_path = dir.path().join("app-server.sock");
|
||||
let listener = UnixListener::bind(&socket_path).await?;
|
||||
let ready_timeout = scenario.ready_timeout;
|
||||
let server_task = tokio::spawn(serve_enable_remote_control_scenario(listener, scenario));
|
||||
|
||||
let mut websocket = client::connect(&socket_path).await?;
|
||||
let status = enable_remote_control_with_timeout(&mut websocket, ready_timeout).await?;
|
||||
server_task.await??;
|
||||
Ok(status)
|
||||
}
|
||||
|
||||
async fn serve_enable_remote_control_scenario(
|
||||
mut listener: UnixListener,
|
||||
scenario: EnableScenario,
|
||||
) -> Result<()> {
|
||||
let stream = listener.accept().await?;
|
||||
let mut websocket = accept_async(stream).await?;
|
||||
|
||||
let initialize = client::read_message(&mut websocket).await?;
|
||||
let JSONRPCMessage::Request(initialize) = initialize else {
|
||||
panic!("expected initialize request");
|
||||
};
|
||||
assert_eq!(initialize.id, INITIALIZE_REQUEST_ID);
|
||||
assert_eq!(initialize.method, "initialize");
|
||||
let Some(initialize_params) = initialize.params else {
|
||||
panic!("expected initialize params");
|
||||
};
|
||||
assert_eq!(
|
||||
initialize_params["capabilities"]["experimentalApi"],
|
||||
serde_json::Value::Bool(true)
|
||||
);
|
||||
client::send_message(
|
||||
&mut websocket,
|
||||
&JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: INITIALIZE_REQUEST_ID,
|
||||
result: serde_json::json!({
|
||||
"userAgent": "codex_app_server/1.2.3",
|
||||
"codexHome": TEST_CODEX_HOME,
|
||||
"platformFamily": "unix",
|
||||
"platformOs": "macos",
|
||||
}),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let initialized = client::read_message(&mut websocket).await?;
|
||||
let JSONRPCMessage::Notification(initialized) = initialized else {
|
||||
panic!("expected initialized notification");
|
||||
};
|
||||
assert_eq!(initialized.method, "initialized");
|
||||
|
||||
if let Some(status) = scenario.initial_notification {
|
||||
send_remote_control_status(&mut websocket, status).await?;
|
||||
}
|
||||
|
||||
let enable = client::read_message(&mut websocket).await?;
|
||||
let JSONRPCMessage::Request(enable) = enable else {
|
||||
panic!("expected remoteControl/enable request");
|
||||
};
|
||||
assert_eq!(enable.id, REMOTE_CONTROL_ENABLE_REQUEST_ID);
|
||||
assert_eq!(enable.method, "remoteControl/enable");
|
||||
client::send_message(
|
||||
&mut websocket,
|
||||
&JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: REMOTE_CONTROL_ENABLE_REQUEST_ID,
|
||||
result: serde_json::to_value(RemoteControlEnableResponse::from(
|
||||
scenario.enable_response,
|
||||
))?,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(status) = scenario.after_enable_notification {
|
||||
send_remote_control_status(&mut websocket, status).await?;
|
||||
} else {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_remote_control_status<S>(
|
||||
websocket: &mut WebSocketStream<S>,
|
||||
status: RemoteControlStatusChangedNotification,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
|
||||
{
|
||||
client::send_message(
|
||||
websocket,
|
||||
&JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: "remoteControl/status/changed".to_string(),
|
||||
params: Some(serde_json::to_value(status)?),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn remote_control_status(
|
||||
status: RemoteControlConnectionStatus,
|
||||
environment_id: Option<&str>,
|
||||
) -> RemoteControlStatusChangedNotification {
|
||||
RemoteControlStatusChangedNotification {
|
||||
status,
|
||||
server_name: TEST_SERVER_NAME.to_string(),
|
||||
installation_id: TEST_INSTALLATION_ID.to_string(),
|
||||
environment_id: environment_id.map(str::to_string),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -400,6 +400,7 @@ pub enum PluginStartupTasks {
|
||||
pub struct AppServerRuntimeOptions {
|
||||
pub plugin_startup_tasks: PluginStartupTasks,
|
||||
pub remote_control_enabled: bool,
|
||||
pub install_shutdown_signal_handler: bool,
|
||||
}
|
||||
|
||||
impl Default for AppServerRuntimeOptions {
|
||||
@@ -407,6 +408,7 @@ impl Default for AppServerRuntimeOptions {
|
||||
Self {
|
||||
plugin_startup_tasks: PluginStartupTasks::Start,
|
||||
remote_control_enabled: false,
|
||||
install_shutdown_signal_handler: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -645,7 +647,8 @@ pub async fn run_main_with_transport_options(
|
||||
|
||||
let single_client_mode = matches!(&transport, AppServerTransport::Stdio);
|
||||
let shutdown_when_no_connections = single_client_mode;
|
||||
let graceful_signal_restart_enabled = !single_client_mode;
|
||||
let graceful_signal_restart_enabled =
|
||||
runtime_options.install_shutdown_signal_handler && !single_client_mode;
|
||||
let mut app_server_client_name_rx = None;
|
||||
|
||||
match &transport {
|
||||
|
||||
@@ -52,6 +52,7 @@ mod doctor;
|
||||
mod marketplace_cmd;
|
||||
mod mcp_cmd;
|
||||
mod plugin_cmd;
|
||||
mod remote_control_cmd;
|
||||
mod state_db_recovery;
|
||||
#[cfg(not(windows))]
|
||||
mod wsl_paths;
|
||||
@@ -59,6 +60,7 @@ mod wsl_paths;
|
||||
use crate::mcp_cmd::McpCli;
|
||||
use crate::plugin_cmd::PluginCli;
|
||||
use crate::plugin_cmd::PluginSubcommand;
|
||||
use crate::remote_control_cmd::RemoteControlCommand;
|
||||
use doctor::DoctorCommand;
|
||||
use state_db_recovery as local_state_db;
|
||||
|
||||
@@ -559,21 +561,6 @@ struct AppServerBootstrapCommand {
|
||||
remote_control: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
struct RemoteControlCommand {
|
||||
#[command(subcommand)]
|
||||
subcommand: Option<RemoteControlSubcommand>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, clap::Subcommand)]
|
||||
enum RemoteControlSubcommand {
|
||||
/// Start the app-server daemon with remote control enabled.
|
||||
Start,
|
||||
|
||||
/// Stop the app-server daemon.
|
||||
Stop,
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
struct GenerateTsCommand {
|
||||
/// Output directory where .ts files will be written
|
||||
@@ -1063,24 +1050,18 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
Some(Subcommand::RemoteControl(remote_control_cli)) => {
|
||||
let subcommand_name = remote_control_subcommand_name(&remote_control_cli);
|
||||
let subcommand_name = remote_control_cli.subcommand_name();
|
||||
reject_remote_mode_for_subcommand(
|
||||
root_remote.as_deref(),
|
||||
root_remote_auth_token_env.as_deref(),
|
||||
subcommand_name,
|
||||
)?;
|
||||
match remote_control_cli
|
||||
.subcommand
|
||||
.unwrap_or(RemoteControlSubcommand::Start)
|
||||
{
|
||||
RemoteControlSubcommand::Start => {
|
||||
let output = codex_app_server_daemon::ensure_remote_control_started().await?;
|
||||
println!("{}", serde_json::to_string(&output)?);
|
||||
}
|
||||
RemoteControlSubcommand::Stop => {
|
||||
print_app_server_daemon_output(AppServerLifecycleCommand::Stop).await?;
|
||||
}
|
||||
}
|
||||
remote_control_cmd::run(
|
||||
remote_control_cli,
|
||||
arg0_paths.clone(),
|
||||
root_config_overrides,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
#[cfg(any(target_os = "macos", target_os = "windows"))]
|
||||
Some(Subcommand::App(app_cli)) => {
|
||||
@@ -1802,9 +1783,7 @@ fn unsupported_subcommand_name_for_strict_config(
|
||||
Some(Subcommand::AppServer(app_server)) => {
|
||||
Some(app_server_subcommand_name(app_server.subcommand.as_ref()))
|
||||
}
|
||||
Some(Subcommand::RemoteControl(remote_control)) => {
|
||||
Some(remote_control_subcommand_name(remote_control))
|
||||
}
|
||||
Some(Subcommand::RemoteControl(remote_control)) => Some(remote_control.subcommand_name()),
|
||||
Some(Subcommand::Mcp(_)) => Some("mcp"),
|
||||
Some(Subcommand::Plugin(_)) => Some("plugin"),
|
||||
#[cfg(any(target_os = "macos", target_os = "windows"))]
|
||||
@@ -1857,14 +1836,6 @@ fn reject_remote_mode_for_app_server_subcommand(
|
||||
reject_remote_mode_for_subcommand(remote, remote_auth_token_env, subcommand_name)
|
||||
}
|
||||
|
||||
fn remote_control_subcommand_name(command: &RemoteControlCommand) -> &'static str {
|
||||
match command.subcommand {
|
||||
None => "remote-control",
|
||||
Some(RemoteControlSubcommand::Start) => "remote-control start",
|
||||
Some(RemoteControlSubcommand::Stop) => "remote-control stop",
|
||||
}
|
||||
}
|
||||
|
||||
fn app_server_subcommand_name(subcommand: Option<&AppServerSubcommand>) -> &'static str {
|
||||
match subcommand {
|
||||
None => "app-server",
|
||||
@@ -2879,12 +2850,10 @@ mod tests {
|
||||
fn reject_remote_flag_for_remote_control() {
|
||||
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "unix://", "remote-control"])
|
||||
.expect("parse");
|
||||
assert_matches!(
|
||||
cli.subcommand,
|
||||
Some(Subcommand::RemoteControl(RemoteControlCommand {
|
||||
subcommand: None
|
||||
}))
|
||||
);
|
||||
let Some(Subcommand::RemoteControl(remote_control)) = &cli.subcommand else {
|
||||
panic!("expected remote-control subcommand");
|
||||
};
|
||||
assert_eq!(remote_control.subcommand_name(), "remote-control");
|
||||
|
||||
let err = reject_remote_mode_for_subcommand(
|
||||
cli.remote.remote.as_deref(),
|
||||
|
||||
695
codex-rs/cli/src/remote_control_cmd.rs
Normal file
695
codex-rs/cli/src/remote_control_cmd.rs
Normal file
@@ -0,0 +1,695 @@
|
||||
use std::io::Write;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use clap::Args;
|
||||
use codex_app_server::AppServerRuntimeOptions;
|
||||
use codex_app_server::AppServerTransport;
|
||||
use codex_app_server::AppServerWebsocketAuthSettings;
|
||||
use codex_app_server_daemon::LifecycleCommand as AppServerLifecycleCommand;
|
||||
use codex_app_server_daemon::LifecycleOutput as AppServerLifecycleOutput;
|
||||
use codex_app_server_daemon::LifecycleStatus as AppServerLifecycleStatus;
|
||||
use codex_app_server_daemon::RemoteControlReadyOutput as AppServerRemoteControlReadyOutput;
|
||||
use codex_app_server_daemon::RemoteControlReadyStatus as AppServerRemoteControlReadyStatus;
|
||||
use codex_app_server_daemon::RemoteControlStartOutput as AppServerRemoteControlStartOutput;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const FOREGROUND_SOCKET_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const FOREGROUND_SOCKET_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(50);
|
||||
const FOREGROUND_APP_SERVER_ABORT_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub(crate) struct RemoteControlCommand {
|
||||
/// Emit machine-readable JSON.
|
||||
#[arg(long = "json", global = true)]
|
||||
json: bool,
|
||||
|
||||
#[command(subcommand)]
|
||||
subcommand: Option<RemoteControlSubcommand>,
|
||||
}
|
||||
|
||||
impl RemoteControlCommand {
|
||||
pub(crate) fn subcommand_name(&self) -> &'static str {
|
||||
match self.subcommand {
|
||||
None => "remote-control",
|
||||
Some(RemoteControlSubcommand::Start) => "remote-control start",
|
||||
Some(RemoteControlSubcommand::Stop) => "remote-control stop",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, clap::Subcommand)]
|
||||
enum RemoteControlSubcommand {
|
||||
/// Start the app-server daemon with remote control enabled.
|
||||
Start,
|
||||
|
||||
/// Stop the app-server daemon.
|
||||
Stop,
|
||||
}
|
||||
|
||||
pub(crate) async fn run(
|
||||
command: RemoteControlCommand,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
root_config_overrides: CliConfigOverrides,
|
||||
) -> anyhow::Result<()> {
|
||||
match command.subcommand {
|
||||
None => {
|
||||
print_remote_control_progress(
|
||||
command.json,
|
||||
"Starting app-server with remote control enabled...",
|
||||
)?;
|
||||
run_foreground_remote_control(command.json, arg0_paths, root_config_overrides).await?;
|
||||
}
|
||||
Some(RemoteControlSubcommand::Start) => {
|
||||
print_remote_control_progress(
|
||||
command.json,
|
||||
"Starting app-server daemon with remote control enabled...",
|
||||
)?;
|
||||
let output = codex_app_server_daemon::ensure_remote_control_ready().await?;
|
||||
print_remote_control_start_output(&output, command.json)?;
|
||||
}
|
||||
Some(RemoteControlSubcommand::Stop) => {
|
||||
print_remote_control_progress(command.json, "Stopping remote control...")?;
|
||||
let output = codex_app_server_daemon::run(AppServerLifecycleCommand::Stop).await?;
|
||||
print_remote_control_stop_output(&output, command.json)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_remote_control_progress(json: bool, message: &str) -> anyhow::Result<()> {
|
||||
if json {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!("{message}");
|
||||
std::io::stdout()
|
||||
.flush()
|
||||
.context("failed to flush remote-control progress message")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_foreground_remote_control(
|
||||
json: bool,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
root_config_overrides: CliConfigOverrides,
|
||||
) -> anyhow::Result<()> {
|
||||
let socket_dir = tempfile::Builder::new()
|
||||
.prefix("codex-rc-")
|
||||
.tempdir_in("/tmp")
|
||||
.or_else(|_| tempfile::tempdir())
|
||||
.context("failed to create private app-server socket directory")?;
|
||||
let socket_path = socket_dir.path().join("rc.sock");
|
||||
let socket_path = AbsolutePathBuf::from_absolute_path(&socket_path)
|
||||
.context("private app-server socket path was not absolute")?;
|
||||
let transport = AppServerTransport::UnixSocket {
|
||||
socket_path: socket_path.clone(),
|
||||
};
|
||||
let runtime_options = AppServerRuntimeOptions {
|
||||
remote_control_enabled: true,
|
||||
install_shutdown_signal_handler: false,
|
||||
..Default::default()
|
||||
};
|
||||
let (stop_rx, stop_signal_task) = foreground_stop_signal();
|
||||
let mut app_server_task = tokio::spawn(codex_app_server::run_main_with_transport_options(
|
||||
arg0_paths,
|
||||
root_config_overrides,
|
||||
LoaderOverrides::default(),
|
||||
/*strict_config*/ false,
|
||||
/*default_analytics_enabled*/ false,
|
||||
transport,
|
||||
SessionSource::VSCode,
|
||||
AppServerWebsocketAuthSettings::default(),
|
||||
runtime_options,
|
||||
));
|
||||
|
||||
let summary = match wait_for_foreground_remote_control_start(
|
||||
&mut app_server_task,
|
||||
wait_for_foreground_remote_control_ready(socket_path),
|
||||
stop_rx.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
ForegroundStartupResult::Ready(summary) => summary,
|
||||
ForegroundStartupResult::Stopped => {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Ok(());
|
||||
}
|
||||
ForegroundStartupResult::ReadyFailed(error) => {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Err(error);
|
||||
}
|
||||
ForegroundStartupResult::AppServerExited(error) => {
|
||||
stop_signal_task.abort();
|
||||
return Err(error);
|
||||
}
|
||||
};
|
||||
|
||||
if *stop_rx.borrow() {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Err(error) = print_foreground_ready_output(&summary, json) {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
stop_signal_task.abort();
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
let result = wait_for_foreground_app_server(app_server_task, stop_rx).await;
|
||||
stop_signal_task.abort();
|
||||
result
|
||||
}
|
||||
|
||||
fn foreground_stop_signal() -> (watch::Receiver<bool>, JoinHandle<()>) {
|
||||
let (stop_tx, stop_rx) = watch::channel(false);
|
||||
let task = tokio::spawn(async move {
|
||||
if let Err(err) = tokio::signal::ctrl_c().await {
|
||||
eprintln!("failed to listen for Ctrl-C: {err}");
|
||||
}
|
||||
let _ = stop_tx.send(true);
|
||||
});
|
||||
(stop_rx, task)
|
||||
}
|
||||
|
||||
enum ForegroundStartupResult {
|
||||
Ready(AppServerRemoteControlReadyStatus),
|
||||
Stopped,
|
||||
ReadyFailed(anyhow::Error),
|
||||
AppServerExited(anyhow::Error),
|
||||
}
|
||||
|
||||
async fn wait_for_foreground_remote_control_start(
|
||||
app_server_task: &mut JoinHandle<std::io::Result<()>>,
|
||||
ready: impl std::future::Future<Output = anyhow::Result<AppServerRemoteControlReadyStatus>>,
|
||||
mut stop_rx: watch::Receiver<bool>,
|
||||
) -> ForegroundStartupResult {
|
||||
tokio::pin!(ready);
|
||||
|
||||
tokio::select! {
|
||||
ready_result = &mut ready => match ready_result {
|
||||
Ok(summary) => ForegroundStartupResult::Ready(summary),
|
||||
Err(error) => ForegroundStartupResult::ReadyFailed(error),
|
||||
},
|
||||
app_server_result = app_server_task => {
|
||||
ForegroundStartupResult::AppServerExited(
|
||||
foreground_app_server_exited_before_ready(app_server_result)
|
||||
)
|
||||
}
|
||||
_ = wait_for_stop_signal(&mut stop_rx) => ForegroundStartupResult::Stopped,
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_foreground_app_server(
|
||||
mut app_server_task: JoinHandle<std::io::Result<()>>,
|
||||
mut stop_rx: watch::Receiver<bool>,
|
||||
) -> anyhow::Result<()> {
|
||||
tokio::select! {
|
||||
app_server_result = &mut app_server_task => {
|
||||
app_server_result
|
||||
.context("foreground app-server task failed to join")?
|
||||
.context("foreground app-server exited with an error")?;
|
||||
}
|
||||
_ = wait_for_stop_signal(&mut stop_rx) => {
|
||||
abort_foreground_app_server(app_server_task).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_stop_signal(stop_rx: &mut watch::Receiver<bool>) {
|
||||
if *stop_rx.borrow() {
|
||||
return;
|
||||
}
|
||||
let _ = stop_rx.wait_for(|stopped| *stopped).await;
|
||||
}
|
||||
|
||||
fn foreground_app_server_exited_before_ready(
|
||||
result: Result<std::io::Result<()>, tokio::task::JoinError>,
|
||||
) -> anyhow::Error {
|
||||
match result {
|
||||
Ok(Ok(())) => {
|
||||
anyhow::anyhow!("foreground app-server exited before remote control became ready")
|
||||
}
|
||||
Ok(Err(error)) => anyhow::Error::new(error)
|
||||
.context("foreground app-server exited before remote control became ready"),
|
||||
Err(error) => anyhow::Error::new(error)
|
||||
.context("foreground app-server task failed before remote control became ready"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn abort_foreground_app_server(app_server_task: JoinHandle<std::io::Result<()>>) {
|
||||
app_server_task.abort();
|
||||
let _ = timeout(FOREGROUND_APP_SERVER_ABORT_TIMEOUT, app_server_task).await;
|
||||
}
|
||||
|
||||
async fn wait_for_foreground_remote_control_ready(
|
||||
socket_path: AbsolutePathBuf,
|
||||
) -> anyhow::Result<AppServerRemoteControlReadyStatus> {
|
||||
codex_app_server_daemon::enable_remote_control_on_socket(
|
||||
socket_path.as_path(),
|
||||
FOREGROUND_SOCKET_CONNECT_TIMEOUT,
|
||||
FOREGROUND_SOCKET_CONNECT_RETRY_DELAY,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn print_remote_control_start_output(
|
||||
output: &AppServerRemoteControlReadyOutput,
|
||||
json: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
ensure_remote_control_startable(&output.remote_control)?;
|
||||
if json {
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&RemoteControlStartJsonOutput::daemon(output))?
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for line in remote_control_start_human_lines(
|
||||
&output.remote_control,
|
||||
RemoteControlHumanOutputMode::Daemon,
|
||||
)? {
|
||||
println!("{line}");
|
||||
}
|
||||
for line in daemon_app_server_human_lines(&output.daemon) {
|
||||
println!("{line}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_foreground_ready_output(
|
||||
summary: &AppServerRemoteControlReadyStatus,
|
||||
json: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if json {
|
||||
ensure_remote_control_startable(summary)?;
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&RemoteControlStartJsonOutput::foreground(summary))?
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for line in remote_control_start_human_lines(summary, RemoteControlHumanOutputMode::Foreground)?
|
||||
{
|
||||
println!("{line}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct RemoteControlStartJsonOutput<'a> {
|
||||
mode: RemoteControlModeJson,
|
||||
status: RemoteControlConnectionStatus,
|
||||
server_name: &'a str,
|
||||
environment_id: Option<&'a str>,
|
||||
timed_out: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
daemon: Option<&'a AppServerRemoteControlStartOutput>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
enum RemoteControlModeJson {
|
||||
Foreground,
|
||||
Daemon,
|
||||
}
|
||||
|
||||
impl<'a> RemoteControlStartJsonOutput<'a> {
|
||||
fn foreground(summary: &'a AppServerRemoteControlReadyStatus) -> Self {
|
||||
Self {
|
||||
mode: RemoteControlModeJson::Foreground,
|
||||
status: summary.status,
|
||||
server_name: &summary.server_name,
|
||||
environment_id: summary.environment_id.as_deref(),
|
||||
timed_out: summary.timed_out,
|
||||
daemon: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn daemon(output: &'a AppServerRemoteControlReadyOutput) -> Self {
|
||||
let remote_control = &output.remote_control;
|
||||
Self {
|
||||
mode: RemoteControlModeJson::Daemon,
|
||||
status: remote_control.status,
|
||||
server_name: &remote_control.server_name,
|
||||
environment_id: remote_control.environment_id.as_deref(),
|
||||
timed_out: remote_control.timed_out,
|
||||
daemon: Some(&output.daemon),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_control_start_human_message(
|
||||
output: &AppServerRemoteControlReadyStatus,
|
||||
) -> anyhow::Result<String> {
|
||||
ensure_remote_control_startable(output)?;
|
||||
match output.status {
|
||||
RemoteControlConnectionStatus::Connected => Ok(format!(
|
||||
"This machine is available for remote control as {}.",
|
||||
output.server_name
|
||||
)),
|
||||
RemoteControlConnectionStatus::Connecting => Ok(format!(
|
||||
"Remote control is enabled on {} and still connecting.",
|
||||
output.server_name
|
||||
)),
|
||||
RemoteControlConnectionStatus::Errored | RemoteControlConnectionStatus::Disabled => {
|
||||
unreachable!("errored and disabled statuses are rejected before formatting")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ensure_remote_control_startable(
|
||||
output: &AppServerRemoteControlReadyStatus,
|
||||
) -> anyhow::Result<()> {
|
||||
match output.status {
|
||||
RemoteControlConnectionStatus::Connected | RemoteControlConnectionStatus::Connecting => {
|
||||
Ok(())
|
||||
}
|
||||
RemoteControlConnectionStatus::Errored => {
|
||||
anyhow::bail!(
|
||||
"Remote control is enabled on {} but the connection is errored.",
|
||||
output.server_name
|
||||
);
|
||||
}
|
||||
RemoteControlConnectionStatus::Disabled => {
|
||||
anyhow::bail!("Remote control is disabled on {}.", output.server_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum RemoteControlHumanOutputMode {
|
||||
Foreground,
|
||||
Daemon,
|
||||
}
|
||||
|
||||
fn remote_control_start_human_lines(
|
||||
summary: &AppServerRemoteControlReadyStatus,
|
||||
mode: RemoteControlHumanOutputMode,
|
||||
) -> anyhow::Result<Vec<String>> {
|
||||
let mut lines = vec![remote_control_start_human_message(summary)?];
|
||||
match mode {
|
||||
RemoteControlHumanOutputMode::Foreground => {
|
||||
lines.push("Press Ctrl-C to stop.".to_string());
|
||||
}
|
||||
RemoteControlHumanOutputMode::Daemon => {}
|
||||
}
|
||||
Ok(lines)
|
||||
}
|
||||
|
||||
fn daemon_app_server_human_lines(output: &AppServerRemoteControlStartOutput) -> Vec<String> {
|
||||
let (managed_codex_path, managed_codex_version) = daemon_app_server_identity(output);
|
||||
vec![
|
||||
"Daemon used app-server:".to_string(),
|
||||
format!(" path: {}", managed_codex_path.display()),
|
||||
format!(" version: {}", managed_codex_version.unwrap_or("unknown")),
|
||||
]
|
||||
}
|
||||
|
||||
fn daemon_app_server_identity(
|
||||
output: &AppServerRemoteControlStartOutput,
|
||||
) -> (&std::path::Path, Option<&str>) {
|
||||
match output {
|
||||
AppServerRemoteControlStartOutput::Bootstrap(output) => (
|
||||
&output.managed_codex_path,
|
||||
output.managed_codex_version.as_deref(),
|
||||
),
|
||||
AppServerRemoteControlStartOutput::Start(output) => (
|
||||
&output.managed_codex_path,
|
||||
output.managed_codex_version.as_deref(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn print_remote_control_stop_output(
|
||||
output: &AppServerLifecycleOutput,
|
||||
json: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(output)?);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
println!("{}", remote_control_stop_human_message(output));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remote_control_stop_human_message(output: &AppServerLifecycleOutput) -> String {
|
||||
match output.status {
|
||||
AppServerLifecycleStatus::Stopped => "Remote control stopped.".to_string(),
|
||||
AppServerLifecycleStatus::NotRunning => "Remote control is not running.".to_string(),
|
||||
AppServerLifecycleStatus::Started
|
||||
| AppServerLifecycleStatus::Restarted
|
||||
| AppServerLifecycleStatus::AlreadyRunning
|
||||
| AppServerLifecycleStatus::Running => {
|
||||
format!(
|
||||
"Remote control stop completed with status {:?}.",
|
||||
output.status
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn remote_control_status(
|
||||
status: RemoteControlConnectionStatus,
|
||||
) -> AppServerRemoteControlReadyStatus {
|
||||
AppServerRemoteControlReadyStatus {
|
||||
status,
|
||||
server_name: "owen-mbp".to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: status == RemoteControlConnectionStatus::Connecting,
|
||||
}
|
||||
}
|
||||
|
||||
fn daemon_ready_output(
|
||||
status: RemoteControlConnectionStatus,
|
||||
) -> AppServerRemoteControlReadyOutput {
|
||||
AppServerRemoteControlReadyOutput {
|
||||
daemon: AppServerRemoteControlStartOutput::Start(AppServerLifecycleOutput {
|
||||
status: AppServerLifecycleStatus::Started,
|
||||
backend: None,
|
||||
pid: Some(42),
|
||||
managed_codex_path: PathBuf::from("/opt/codex/bin/codex"),
|
||||
managed_codex_version: Some("1.0.0".to_string()),
|
||||
socket_path: PathBuf::from("/tmp/app-server-control.sock"),
|
||||
cli_version: Some("1.0.0".to_string()),
|
||||
app_server_version: Some("2.0.0".to_string()),
|
||||
}),
|
||||
remote_control: AppServerRemoteControlReadyStatus {
|
||||
status,
|
||||
server_name: "owen-mbp".to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
timed_out: status == RemoteControlConnectionStatus::Connecting,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_human_start_messages_use_server_name() {
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Connected
|
||||
))
|
||||
.expect("connected message"),
|
||||
"This machine is available for remote control as owen-mbp."
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Connecting
|
||||
))
|
||||
.expect("connecting message"),
|
||||
"Remote control is enabled on owen-mbp and still connecting."
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Errored
|
||||
))
|
||||
.expect_err("errored status should fail")
|
||||
.to_string(),
|
||||
"Remote control is enabled on owen-mbp but the connection is errored."
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_message(&remote_control_status(
|
||||
RemoteControlConnectionStatus::Disabled
|
||||
))
|
||||
.expect_err("disabled status should fail")
|
||||
.to_string(),
|
||||
"Remote control is disabled on owen-mbp."
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_human_lines_include_foreground_stop_hint_only() {
|
||||
let summary = remote_control_status(RemoteControlConnectionStatus::Connected);
|
||||
|
||||
assert_eq!(
|
||||
remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Foreground)
|
||||
.expect("foreground lines"),
|
||||
vec![
|
||||
"This machine is available for remote control as owen-mbp.".to_string(),
|
||||
"Press Ctrl-C to stop.".to_string(),
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
remote_control_start_human_lines(&summary, RemoteControlHumanOutputMode::Daemon)
|
||||
.expect("daemon lines"),
|
||||
vec!["This machine is available for remote control as owen-mbp.".to_string()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn daemon_app_server_human_lines_include_path_and_version() {
|
||||
assert_eq!(
|
||||
daemon_app_server_human_lines(
|
||||
&daemon_ready_output(RemoteControlConnectionStatus::Connected).daemon
|
||||
),
|
||||
vec![
|
||||
"Daemon used app-server:".to_string(),
|
||||
" path: /opt/codex/bin/codex".to_string(),
|
||||
" version: 1.0.0".to_string(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_json_output_marks_foreground_or_daemon() {
|
||||
let foreground_summary = remote_control_status(RemoteControlConnectionStatus::Connected);
|
||||
assert_eq!(
|
||||
serde_json::to_value(RemoteControlStartJsonOutput::foreground(
|
||||
&foreground_summary
|
||||
))
|
||||
.expect("foreground JSON"),
|
||||
json!({
|
||||
"mode": "foreground",
|
||||
"status": "connected",
|
||||
"serverName": "owen-mbp",
|
||||
"environmentId": "env_test",
|
||||
"timedOut": false,
|
||||
})
|
||||
);
|
||||
|
||||
let daemon_output = daemon_ready_output(RemoteControlConnectionStatus::Connected);
|
||||
assert_eq!(
|
||||
serde_json::to_value(RemoteControlStartJsonOutput::daemon(&daemon_output))
|
||||
.expect("daemon JSON"),
|
||||
json!({
|
||||
"mode": "daemon",
|
||||
"status": "connected",
|
||||
"serverName": "owen-mbp",
|
||||
"environmentId": "env_test",
|
||||
"timedOut": false,
|
||||
"daemon": {
|
||||
"status": "started",
|
||||
"pid": 42,
|
||||
"managedCodexPath": "/opt/codex/bin/codex",
|
||||
"managedCodexVersion": "1.0.0",
|
||||
"socketPath": "/tmp/app-server-control.sock",
|
||||
"cliVersion": "1.0.0",
|
||||
"appServerVersion": "2.0.0",
|
||||
},
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_daemon_json_rejects_unstartable_status() {
|
||||
assert_eq!(
|
||||
print_remote_control_start_output(
|
||||
&daemon_ready_output(RemoteControlConnectionStatus::Errored),
|
||||
/*json*/ true
|
||||
)
|
||||
.expect_err("errored daemon status should fail")
|
||||
.to_string(),
|
||||
"Remote control is enabled on owen-mbp but the connection is errored."
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn foreground_wait_aborts_app_server_on_stop_signal() {
|
||||
let app_server_task = tokio::spawn(std::future::pending::<std::io::Result<()>>());
|
||||
let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
|
||||
stop_tx.send(true).expect("send stop signal");
|
||||
|
||||
tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
wait_for_foreground_app_server(app_server_task, stop_rx),
|
||||
)
|
||||
.await
|
||||
.expect("foreground wait should return after stop signal")
|
||||
.expect("stop signal should shut down cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn foreground_start_wait_stops_before_ready() {
|
||||
let mut app_server_task = tokio::spawn(std::future::pending::<std::io::Result<()>>());
|
||||
let (stop_tx, stop_rx) = tokio::sync::watch::channel(false);
|
||||
stop_tx.send(true).expect("send stop signal");
|
||||
|
||||
let startup = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
wait_for_foreground_remote_control_start(
|
||||
&mut app_server_task,
|
||||
std::future::pending::<anyhow::Result<AppServerRemoteControlReadyStatus>>(),
|
||||
stop_rx,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("foreground startup wait should return after stop signal");
|
||||
|
||||
assert!(matches!(startup, ForegroundStartupResult::Stopped));
|
||||
app_server_task.abort();
|
||||
let _ = app_server_task.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn foreground_start_wait_reports_app_server_exit_before_ready() {
|
||||
let mut app_server_task =
|
||||
tokio::spawn(async { Err(std::io::Error::other("startup failed before socket bind")) });
|
||||
let (_stop_tx, stop_rx) = tokio::sync::watch::channel(false);
|
||||
|
||||
let startup = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(1),
|
||||
wait_for_foreground_remote_control_start(
|
||||
&mut app_server_task,
|
||||
std::future::pending::<anyhow::Result<AppServerRemoteControlReadyStatus>>(),
|
||||
stop_rx,
|
||||
),
|
||||
)
|
||||
.await
|
||||
.expect("foreground startup wait should return after app-server exits");
|
||||
|
||||
let ForegroundStartupResult::AppServerExited(error) = startup else {
|
||||
panic!("expected app-server exit before ready");
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"foreground app-server exited before remote control became ready"
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user