mirror of
https://github.com/openai/codex.git
synced 2026-05-05 22:01:37 +03:00
## Why The `notify` hook payload did not identify which Codex client started the turn. That meant downstream notification hooks could not distinguish between completions coming from the TUI and completions coming from app-server clients such as VS Code or Xcode. Now that the Codex App provides its own desktop notifications, it would be nice to be able to filter those out. This change adds that context without changing the existing payload shape for callers that do not know the client name, and keeps the new end-to-end test cross-platform. ## What changed - added an optional top-level `client` field to the legacy `notify` JSON payload - threaded that value through `core` and `hooks`; the internal session and turn state now carries it as `app_server_client_name` - set the field to `codex-tui` for TUI turns - captured `initialize.clientInfo.name` in the app server and applied it to subsequent turns before dispatching hooks - replaced the notify integration test hook with a `python3` script so the test does not rely on Unix shell permissions or `bash` - documented the new field in `docs/config.md` ## Testing - `cargo test -p codex-hooks` - `cargo test -p codex-tui` - `cargo test -p codex-app-server suite::v2::initialize::turn_start_notify_payload_includes_initialize_client_name -- --exact --nocapture` - `cargo test -p codex-core` (`src/lib.rs` passed; `core/tests/all.rs` still has unrelated existing failures in this environment) ## Docs The public config reference on `developers.openai.com/codex` should mention that the legacy `notify` payload may include a top-level `client` field. The TUI reports `codex-tui`, and the app server reports `initialize.clientInfo.name` when it is available.
150 lines
5.4 KiB
Rust
150 lines
5.4 KiB
Rust
use std::sync::Arc;
|
|
|
|
use codex_core::CodexThread;
|
|
use codex_core::NewThread;
|
|
use codex_core::ThreadManager;
|
|
use codex_core::config::Config;
|
|
use codex_protocol::protocol::Event;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::Op;
|
|
use tokio::sync::mpsc::UnboundedSender;
|
|
use tokio::sync::mpsc::unbounded_channel;
|
|
|
|
use crate::app_event::AppEvent;
|
|
use crate::app_event_sender::AppEventSender;
|
|
|
|
const TUI_NOTIFY_CLIENT: &str = "codex-tui";
|
|
|
|
async fn initialize_app_server_client_name(thread: &CodexThread) {
|
|
if let Err(err) = thread
|
|
.set_app_server_client_name(Some(TUI_NOTIFY_CLIENT.to_string()))
|
|
.await
|
|
{
|
|
tracing::error!("failed to set app server client name: {err}");
|
|
}
|
|
}
|
|
|
|
/// Spawn the agent bootstrapper and op forwarding loop, returning the
|
|
/// `UnboundedSender<Op>` used by the UI to submit operations.
|
|
pub(crate) fn spawn_agent(
|
|
config: Config,
|
|
app_event_tx: AppEventSender,
|
|
server: Arc<ThreadManager>,
|
|
) -> UnboundedSender<Op> {
|
|
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
|
|
|
let app_event_tx_clone = app_event_tx;
|
|
tokio::spawn(async move {
|
|
let NewThread {
|
|
thread,
|
|
session_configured,
|
|
..
|
|
} = match server.start_thread(config).await {
|
|
Ok(v) => v,
|
|
Err(err) => {
|
|
let message = format!("Failed to initialize codex: {err}");
|
|
tracing::error!("{message}");
|
|
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
|
|
id: "".to_string(),
|
|
msg: EventMsg::Error(err.to_error_event(None)),
|
|
}));
|
|
app_event_tx_clone.send(AppEvent::FatalExitRequest(message));
|
|
tracing::error!("failed to initialize codex: {err}");
|
|
return;
|
|
}
|
|
};
|
|
initialize_app_server_client_name(thread.as_ref()).await;
|
|
|
|
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
|
|
let ev = codex_protocol::protocol::Event {
|
|
// The `id` does not matter for rendering, so we can use a fake value.
|
|
id: "".to_string(),
|
|
msg: codex_protocol::protocol::EventMsg::SessionConfigured(session_configured),
|
|
};
|
|
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
|
|
|
let thread_clone = thread.clone();
|
|
tokio::spawn(async move {
|
|
while let Some(op) = codex_op_rx.recv().await {
|
|
let id = thread_clone.submit(op).await;
|
|
if let Err(e) = id {
|
|
tracing::error!("failed to submit op: {e}");
|
|
}
|
|
}
|
|
});
|
|
|
|
while let Ok(event) = thread.next_event().await {
|
|
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
|
app_event_tx_clone.send(AppEvent::CodexEvent(event));
|
|
if is_shutdown_complete {
|
|
// ShutdownComplete is terminal for a thread; drop this receiver task so
|
|
// the Arc<CodexThread> can be released and thread resources can clean up.
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
codex_op_tx
|
|
}
|
|
|
|
/// Spawn agent loops for an existing thread (e.g., a forked thread).
|
|
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
|
|
/// events and accepts Ops for submission.
|
|
pub(crate) fn spawn_agent_from_existing(
|
|
thread: std::sync::Arc<CodexThread>,
|
|
session_configured: codex_protocol::protocol::SessionConfiguredEvent,
|
|
app_event_tx: AppEventSender,
|
|
) -> UnboundedSender<Op> {
|
|
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
|
|
|
let app_event_tx_clone = app_event_tx;
|
|
tokio::spawn(async move {
|
|
initialize_app_server_client_name(thread.as_ref()).await;
|
|
|
|
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
|
|
let ev = codex_protocol::protocol::Event {
|
|
id: "".to_string(),
|
|
msg: codex_protocol::protocol::EventMsg::SessionConfigured(session_configured),
|
|
};
|
|
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
|
|
|
let thread_clone = thread.clone();
|
|
tokio::spawn(async move {
|
|
while let Some(op) = codex_op_rx.recv().await {
|
|
let id = thread_clone.submit(op).await;
|
|
if let Err(e) = id {
|
|
tracing::error!("failed to submit op: {e}");
|
|
}
|
|
}
|
|
});
|
|
|
|
while let Ok(event) = thread.next_event().await {
|
|
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
|
app_event_tx_clone.send(AppEvent::CodexEvent(event));
|
|
if is_shutdown_complete {
|
|
// ShutdownComplete is terminal for a thread; drop this receiver task so
|
|
// the Arc<CodexThread> can be released and thread resources can clean up.
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
codex_op_tx
|
|
}
|
|
|
|
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
|
|
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> UnboundedSender<Op> {
|
|
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
|
|
|
tokio::spawn(async move {
|
|
initialize_app_server_client_name(thread.as_ref()).await;
|
|
while let Some(op) = codex_op_rx.recv().await {
|
|
if let Err(e) = thread.submit(op).await {
|
|
tracing::error!("failed to submit op: {e}");
|
|
}
|
|
}
|
|
});
|
|
|
|
codex_op_tx
|
|
}
|