mirror of
https://github.com/openai/codex.git
synced 2026-04-27 18:01:04 +03:00
feedback
This commit is contained in:
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -831,6 +831,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"toml 0.9.4",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -31,6 +31,7 @@ tokio = { version = "1", features = [
|
||||
"rt-multi-thread",
|
||||
"signal",
|
||||
] }
|
||||
tokio-util = { version = "0.7" }
|
||||
toml = "0.9"
|
||||
tracing = { version = "0.1.41", features = ["log"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
|
||||
@@ -15,7 +15,6 @@ use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::Submission;
|
||||
use codex_core::protocol::TaskCompleteEvent;
|
||||
use mcp_types::CallToolResult;
|
||||
use mcp_types::ContentBlock;
|
||||
@@ -79,27 +78,18 @@ pub async fn run_codex_tool_session(
|
||||
)
|
||||
.await;
|
||||
|
||||
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
||||
// any events emitted for this tool-call can be correlated with the
|
||||
// originating `tools/call` request.
|
||||
let sub_id = match &id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
};
|
||||
running_requests_id_to_codex_uuid
|
||||
.lock()
|
||||
.await
|
||||
.insert(id.clone(), session_id);
|
||||
let submission = Submission {
|
||||
id: sub_id.clone(),
|
||||
op: Op::UserInput {
|
||||
if let Err(e) = codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: initial_prompt.clone(),
|
||||
}],
|
||||
},
|
||||
};
|
||||
|
||||
if let Err(e) = codex.submit_with_id(submission).await {
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to submit initial prompt: {e}");
|
||||
// unregister the id so we don't keep it in the map
|
||||
running_requests_id_to_codex_uuid.lock().await.remove(&id);
|
||||
@@ -151,10 +141,7 @@ async fn run_codex_tool_session_inner(
|
||||
request_id: RequestId,
|
||||
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
|
||||
) {
|
||||
let request_id_str = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
};
|
||||
let request_id_str = crate::request_id::request_id_to_string(&request_id);
|
||||
|
||||
// Stream events until the task needs to pause for user interaction or
|
||||
// completes.
|
||||
|
||||
@@ -3,10 +3,13 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::Codex;
|
||||
use codex_core::error::Result as CodexResult;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::FileChange;
|
||||
use mcp_types::RequestId;
|
||||
@@ -14,6 +17,7 @@ use tokio::sync::Mutex;
|
||||
// no streaming watch channel; streaming is toggled via set_streaming on the struct
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::exec_approval::handle_exec_approval_request;
|
||||
use crate::mcp_protocol::CodexEventNotificationParams;
|
||||
@@ -21,8 +25,10 @@ use crate::mcp_protocol::ConversationId;
|
||||
use crate::mcp_protocol::InitialStateNotificationParams;
|
||||
use crate::mcp_protocol::InitialStatePayload;
|
||||
use crate::mcp_protocol::NotificationMeta;
|
||||
use crate::mcp_protocol::ServerNotification;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::patch_approval::handle_patch_approval_request;
|
||||
use crate::request_id::request_id_to_string;
|
||||
|
||||
/// Conversation struct that owns the Codex session and all per-conversation state.
|
||||
pub(crate) struct Conversation {
|
||||
@@ -31,6 +37,7 @@ pub(crate) struct Conversation {
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: RequestId,
|
||||
state: Mutex<ConversationState>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
struct ConversationState {
|
||||
@@ -56,9 +63,10 @@ impl Conversation {
|
||||
buffered_events: Vec::new(),
|
||||
pending_elicitations: Vec::new(),
|
||||
}),
|
||||
cancel: CancellationToken::new(),
|
||||
});
|
||||
// Detach a background loop tied to this Conversation
|
||||
Conversation::spawn_loop(conv.clone());
|
||||
spawn_conversation_loop(conv.clone());
|
||||
conv
|
||||
}
|
||||
|
||||
@@ -80,22 +88,6 @@ impl Conversation {
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_loop(this: Arc<Self>) {
|
||||
tokio::spawn(async move {
|
||||
// Clone once outside the loop; `Codex` is cheap to clone but we don't need to do it repeatedly.
|
||||
let codex = this.codex.clone();
|
||||
loop {
|
||||
match codex.next_event().await {
|
||||
Ok(event) => this.handle_event(event).await,
|
||||
Err(e) => {
|
||||
error!("Codex next_event error (session {}): {e}", this.session_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn codex(&self) -> Arc<Codex> {
|
||||
self.codex.clone()
|
||||
}
|
||||
@@ -103,23 +95,13 @@ impl Conversation {
|
||||
pub(crate) async fn try_submit_user_input(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
items: Vec<codex_core::protocol::InputItem>,
|
||||
) -> Result<(), String> {
|
||||
let request_id_string = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
};
|
||||
let submit_res = self
|
||||
.codex
|
||||
.submit_with_id(codex_core::protocol::Submission {
|
||||
id: request_id_string,
|
||||
op: codex_core::protocol::Op::UserInput { items },
|
||||
})
|
||||
.await;
|
||||
if let Err(e) = submit_res {
|
||||
return Err(format!("Failed to submit user input: {e}"));
|
||||
}
|
||||
Ok(())
|
||||
items: Vec<InputItem>,
|
||||
) -> CodexResult<()> {
|
||||
let _ = request_id; // request_id is not used to enforce uniqueness; Codex generates ids.
|
||||
self.codex
|
||||
.submit(Op::UserInput { items })
|
||||
.await
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
async fn handle_event(&self, event: Event) {
|
||||
@@ -142,8 +124,8 @@ impl Conversation {
|
||||
self.process_exec_request(command, cwd, call_id, event.id.clone())
|
||||
.await;
|
||||
}
|
||||
EventMsg::Error(_) => {
|
||||
error!("Codex runtime error");
|
||||
EventMsg::Error(err) => {
|
||||
error!("Codex runtime error: {}", err.message);
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id,
|
||||
@@ -151,7 +133,7 @@ impl Conversation {
|
||||
grant_root,
|
||||
changes,
|
||||
}) => {
|
||||
self.process_patch_request(PatchRequest {
|
||||
self.start_patch_approval(PatchRequest {
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
@@ -162,8 +144,8 @@ impl Conversation {
|
||||
}
|
||||
EventMsg::TaskComplete(_) => {}
|
||||
EventMsg::TaskStarted => {}
|
||||
EventMsg::SessionConfigured(_) => {
|
||||
error!("unexpected SessionConfigured event");
|
||||
EventMsg::SessionConfigured(ev) => {
|
||||
error!("unexpected SessionConfigured event: {:?}", ev);
|
||||
}
|
||||
EventMsg::AgentMessageDelta(_) => {}
|
||||
EventMsg::AgentReasoningDelta(_) => {}
|
||||
@@ -195,13 +177,9 @@ impl Conversation {
|
||||
}),
|
||||
initial_state: InitialStatePayload { events },
|
||||
};
|
||||
if let Ok(params_val) = serde_json::to_value(¶ms) {
|
||||
self.outgoing
|
||||
.send_custom_notification("notifications/initial_state", params_val)
|
||||
.await;
|
||||
} else {
|
||||
error!("Failed to serialize InitialState params");
|
||||
}
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::InitialState(params))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn drain_pending_elicitations_from(&self, items: Vec<PendingElicitation>) {
|
||||
@@ -219,10 +197,7 @@ impl Conversation {
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
match &self.request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
},
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
call_id,
|
||||
)
|
||||
@@ -243,10 +218,7 @@ impl Conversation {
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
match &self.request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
},
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
)
|
||||
.await;
|
||||
@@ -262,7 +234,10 @@ impl Conversation {
|
||||
call_id: String,
|
||||
event_id: String,
|
||||
) {
|
||||
let should_stream = { self.state.lock().await.streaming_enabled };
|
||||
let should_stream = {
|
||||
let st = self.state.lock().await;
|
||||
st.streaming_enabled
|
||||
};
|
||||
if should_stream {
|
||||
handle_exec_approval_request(
|
||||
command,
|
||||
@@ -270,10 +245,7 @@ impl Conversation {
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
match &self.request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
},
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
call_id,
|
||||
)
|
||||
@@ -290,7 +262,7 @@ impl Conversation {
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_patch_request(&self, req: PatchRequest) {
|
||||
async fn start_patch_approval(&self, req: PatchRequest) {
|
||||
let PatchRequest {
|
||||
call_id,
|
||||
reason,
|
||||
@@ -298,7 +270,10 @@ impl Conversation {
|
||||
changes,
|
||||
event_id,
|
||||
} = req;
|
||||
let should_stream = { self.state.lock().await.streaming_enabled };
|
||||
let should_stream = {
|
||||
let st = self.state.lock().await;
|
||||
st.streaming_enabled
|
||||
};
|
||||
if should_stream {
|
||||
handle_patch_approval_request(
|
||||
call_id,
|
||||
@@ -308,10 +283,7 @@ impl Conversation {
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
match &self.request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
},
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
)
|
||||
.await;
|
||||
@@ -337,12 +309,15 @@ impl Conversation {
|
||||
meta: None,
|
||||
msg: msg.clone(),
|
||||
};
|
||||
if let Ok(params_val) = serde_json::to_value(¶ms) {
|
||||
self.outgoing
|
||||
.send_custom_notification(&method, params_val)
|
||||
.await;
|
||||
} else {
|
||||
error!("Failed to serialize event params");
|
||||
match serde_json::to_value(¶ms) {
|
||||
Ok(params_val) => {
|
||||
self.outgoing
|
||||
.send_custom_notification(&method, params_val)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to serialize event params: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -366,3 +341,32 @@ struct ExecRequest {
|
||||
event_id: String,
|
||||
call_id: String,
|
||||
}
|
||||
|
||||
impl Drop for Conversation {
|
||||
fn drop(&mut self) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_conversation_loop(this: Arc<Conversation>) {
|
||||
tokio::spawn(async move {
|
||||
let codex = this.codex.clone();
|
||||
let cancel = this.cancel.clone();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
break;
|
||||
}
|
||||
res = codex.next_event() => {
|
||||
match res {
|
||||
Ok(event) => this.handle_event(event).await,
|
||||
Err(e) => {
|
||||
error!("Codex next_event error (session {}): {e}", this.session_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ mod json_to_toml;
|
||||
pub mod mcp_protocol;
|
||||
pub(crate) mod message_processor;
|
||||
mod outgoing_message;
|
||||
mod request_id;
|
||||
mod patch_approval;
|
||||
pub(crate) mod tool_handlers;
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ use crate::tool_handlers::stream_conversation::handle_stream_conversation;
|
||||
|
||||
use codex_core::Codex;
|
||||
use codex_core::config::Config as CodexConfig;
|
||||
use codex_core::protocol::Submission;
|
||||
use mcp_types::CallToolRequest;
|
||||
use mcp_types::CallToolRequestParams;
|
||||
use mcp_types::CallToolResult;
|
||||
@@ -357,8 +356,8 @@ impl MessageProcessor {
|
||||
async fn handle_new_tool_calls(&self, request_id: RequestId, params: ToolCallRequestParams) {
|
||||
// Track the request to allow graceful cancellation routing later.
|
||||
{
|
||||
let mut guard = self.tool_request_map.lock().await;
|
||||
guard.insert(request_id.clone(), params.clone());
|
||||
let mut tool_request_map = self.tool_request_map.lock().await;
|
||||
tool_request_map.insert(request_id.clone(), params.clone());
|
||||
}
|
||||
match params {
|
||||
ToolCallRequestParams::ConversationCreate(args) => {
|
||||
@@ -601,8 +600,8 @@ impl MessageProcessor {
|
||||
let request_id = params.request_id;
|
||||
|
||||
if let Some(orig) = {
|
||||
let mut guard = self.tool_request_map.lock().await;
|
||||
guard.remove(&request_id)
|
||||
let mut tool_request_map = self.tool_request_map.lock().await;
|
||||
tool_request_map.remove(&request_id)
|
||||
} {
|
||||
self.handle_mcp_protocol_cancelled_notification(request_id, orig)
|
||||
.await;
|
||||
@@ -621,12 +620,7 @@ impl MessageProcessor {
|
||||
stream_conversation::handle_cancel(self, &args).await;
|
||||
}
|
||||
ToolCallRequestParams::ConversationSendMessage(args) => {
|
||||
// Cancel in-flight user input for this conversation by interrupting
|
||||
// the submission with the same request id we used when sending.
|
||||
let request_id_string = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
};
|
||||
// Cancel in-flight user input for this conversation by interrupting the session.
|
||||
|
||||
let session_id = args.conversation_id.0;
|
||||
let codex_arc = {
|
||||
@@ -642,13 +636,7 @@ impl MessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = codex_arc
|
||||
.submit_with_id(Submission {
|
||||
id: request_id_string,
|
||||
op: codex_core::protocol::Op::Interrupt,
|
||||
})
|
||||
.await
|
||||
{
|
||||
if let Err(e) = codex_arc.submit(codex_core::protocol::Op::Interrupt).await {
|
||||
tracing::error!("Failed to submit interrupt for send_message cancel: {e}");
|
||||
}
|
||||
}
|
||||
@@ -664,10 +652,8 @@ impl MessageProcessor {
|
||||
}
|
||||
|
||||
async fn handle_legacy_cancelled_notification(&self, request_id: RequestId) {
|
||||
let request_id_string = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
};
|
||||
use crate::request_id::request_id_to_string;
|
||||
let request_id_string = request_id_to_string(&request_id);
|
||||
|
||||
let session_id = {
|
||||
let map_guard = self.running_requests_id_to_codex_uuid.lock().await;
|
||||
@@ -693,10 +679,7 @@ impl MessageProcessor {
|
||||
};
|
||||
|
||||
if let Err(e) = codex_arc
|
||||
.submit_with_id(Submission {
|
||||
id: request_id_string,
|
||||
op: codex_core::protocol::Op::Interrupt,
|
||||
})
|
||||
.submit(codex_core::protocol::Op::Interrupt)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to submit interrupt to Codex: {e}");
|
||||
|
||||
@@ -133,6 +133,28 @@ impl OutgoingMessageSender {
|
||||
});
|
||||
let _ = self.sender.send(outgoing_message).await;
|
||||
}
|
||||
|
||||
/// Send a typed server notification by serializing it into a method/params pair.
|
||||
pub(crate) async fn send_server_notification(
|
||||
&self,
|
||||
notification: crate::mcp_protocol::ServerNotification,
|
||||
) {
|
||||
match serde_json::to_value(notification) {
|
||||
Ok(serde_json::Value::Object(mut map)) => {
|
||||
let method = map
|
||||
.remove("method")
|
||||
.and_then(|v| v.as_str().map(|s| s.to_string()));
|
||||
let params = map.remove("params").unwrap_or(serde_json::Value::Null);
|
||||
if let Some(method) = method {
|
||||
self.send_custom_notification(&method, params).await;
|
||||
} else {
|
||||
warn!("ServerNotification missing method after serialization");
|
||||
}
|
||||
}
|
||||
Ok(_) => warn!("ServerNotification did not serialize to an object"),
|
||||
Err(err) => warn!("Failed to serialize ServerNotification: {err:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outgoing message from the server to the client.
|
||||
|
||||
9
codex-rs/mcp-server/src/request_id.rs
Normal file
9
codex-rs/mcp-server/src/request_id.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use mcp_types::RequestId;
|
||||
|
||||
/// Utility to convert an MCP `RequestId` into a `String`.
|
||||
pub(crate) fn request_id_to_string(id: &RequestId) -> String {
|
||||
match id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
}
|
||||
}
|
||||
@@ -62,7 +62,7 @@ pub(crate) async fn handle_send_message(
|
||||
.send_response_with_optional_error(
|
||||
id,
|
||||
Some(ToolCallResponseResult::ConversationSendMessage(
|
||||
ConversationSendMessageResult::Error { message: e },
|
||||
ConversationSendMessageResult::Error { message: e.to_string() },
|
||||
)),
|
||||
Some(true),
|
||||
)
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_mcp_server::CodexToolCallParam;
|
||||
use mcp_types::JSONRPCResponse;
|
||||
use mcp_types::ModelContextProtocolNotification;
|
||||
use mcp_types::RequestId;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
@@ -94,7 +95,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
// Send interrupt notification
|
||||
mcp_process
|
||||
.send_notification(
|
||||
"notifications/cancelled",
|
||||
mcp_types::CancelledNotification::METHOD,
|
||||
Some(json!({ "requestId": codex_request_id })),
|
||||
)
|
||||
.await?;
|
||||
@@ -125,7 +126,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
// Send interrupt notification
|
||||
mcp_process
|
||||
.send_notification(
|
||||
"notifications/cancelled",
|
||||
mcp_types::CancelledNotification::METHOD,
|
||||
Some(json!({ "requestId": codex_reply_request_id })),
|
||||
)
|
||||
.await?;
|
||||
@@ -148,4 +149,4 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helpers are provided by tests/common
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use mcp_test_support::create_config_toml;
|
||||
use mcp_test_support::create_final_assistant_message_sse_response;
|
||||
use mcp_test_support::create_mock_chat_completions_server;
|
||||
use mcp_types::JSONRPCNotification;
|
||||
use mcp_types::ModelContextProtocolNotification;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
@@ -181,7 +182,7 @@ async fn test_cancel_stream_then_reconnect_catches_up_initial_state() {
|
||||
|
||||
// Cancel stream A
|
||||
mcp.send_notification(
|
||||
"notifications/cancelled",
|
||||
mcp_types::CancelledNotification::METHOD,
|
||||
Some(json!({ "requestId": stream_a_id })),
|
||||
)
|
||||
.await
|
||||
@@ -245,7 +246,6 @@ async fn test_cancel_stream_then_reconnect_catches_up_initial_state() {
|
||||
}),
|
||||
];
|
||||
assert_eq!(*events, expected);
|
||||
drop(server);
|
||||
}
|
||||
|
||||
// create_config_toml is provided by tests/common
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user