use std::sync::Arc; use async_trait::async_trait; use codex_sandboxing::SandboxType; use tokio::sync::watch; use tracing::trace; use crate::ExecBackend; use crate::ExecProcess; use crate::ExecServerError; use crate::StartedExecProcess; use crate::client::ExecServerClient; use crate::client::Session; use crate::protocol::ExecParams; use crate::protocol::ReadResponse; use crate::protocol::WriteResponse; #[derive(Clone)] pub(crate) struct RemoteProcess { client: ExecServerClient, } struct RemoteExecProcess { session: Session, } impl RemoteProcess { pub(crate) fn new(client: ExecServerClient) -> Self { trace!("remote process new"); Self { client } } } #[async_trait] impl ExecBackend for RemoteProcess { async fn start(&self, params: ExecParams) -> Result { let process_id = params.process_id.clone(); let sandbox_type = params .sandbox .as_ref() .map_or(SandboxType::None, |sandbox| sandbox.sandbox_type()); let session = self.client.register_session(&process_id).await?; match self.client.exec(params).await { Ok(_) => {} Err(err) => { session.unregister().await; return Err(err); } } Ok(StartedExecProcess { process: Arc::new(RemoteExecProcess { session }), sandbox_type, }) } } #[async_trait] impl ExecProcess for RemoteExecProcess { fn process_id(&self) -> &crate::ProcessId { self.session.process_id() } fn subscribe_wake(&self) -> watch::Receiver { self.session.subscribe_wake() } async fn read( &self, after_seq: Option, max_bytes: Option, wait_ms: Option, ) -> Result { self.session.read(after_seq, max_bytes, wait_ms).await } async fn write(&self, chunk: Vec) -> Result { trace!("exec process write"); self.session.write(chunk).await } async fn terminate(&self) -> Result<(), ExecServerError> { trace!("exec process terminate"); self.session.terminate().await } } impl Drop for RemoteExecProcess { fn drop(&mut self) { let session = self.session.clone(); tokio::spawn(async move { session.unregister().await; }); } }