mirror of
https://github.com/openai/codex.git
synced 2026-04-06 23:51:39 +03:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1525bbdb9a | ||
|
|
46b7e4fb2c | ||
|
|
9bb813353e | ||
|
|
bd30bad96f |
25
MODULE.bazel
25
MODULE.bazel
@@ -228,10 +228,18 @@ inject_repo(crate, "zstd")
|
||||
use_repo(crate, "argument_comment_lint_crates")
|
||||
|
||||
bazel_dep(name = "bzip2", version = "1.0.8.bcr.3")
|
||||
single_version_override(
|
||||
module_name = "bzip2",
|
||||
patch_strip = 1,
|
||||
patches = [
|
||||
"//patches:bzip2_windows_stack_args.patch",
|
||||
],
|
||||
)
|
||||
|
||||
crate.annotation(
|
||||
crate = "bzip2-sys",
|
||||
gen_build_script = "on",
|
||||
gen_build_script = "off",
|
||||
deps = ["@bzip2//:bz2"],
|
||||
)
|
||||
|
||||
inject_repo(crate, "bzip2")
|
||||
@@ -245,14 +253,25 @@ crate.annotation(
|
||||
|
||||
inject_repo(crate, "zlib")
|
||||
|
||||
# TODO(zbarsky): Enable annotation after fixing windows arm64 builds.
|
||||
bazel_dep(name = "xz", version = "5.4.5.bcr.8")
|
||||
single_version_override(
|
||||
module_name = "xz",
|
||||
patch_strip = 1,
|
||||
patches = [
|
||||
"//patches:xz_windows_stack_args.patch",
|
||||
],
|
||||
)
|
||||
|
||||
crate.annotation(
|
||||
crate = "lzma-sys",
|
||||
gen_build_script = "on",
|
||||
gen_build_script = "off",
|
||||
deps = ["@xz//:lzma"],
|
||||
)
|
||||
|
||||
bazel_dep(name = "openssl", version = "3.5.4.bcr.0")
|
||||
|
||||
inject_repo(crate, "xz")
|
||||
|
||||
crate.annotation(
|
||||
build_script_data = [
|
||||
"@openssl//:gen_dir",
|
||||
|
||||
2
MODULE.bazel.lock
generated
2
MODULE.bazel.lock
generated
@@ -228,6 +228,8 @@
|
||||
"https://bcr.bazel.build/modules/upb/0.0.0-20220923-a547704/MODULE.bazel": "7298990c00040a0e2f121f6c32544bab27d4452f80d9ce51349b1a28f3005c43",
|
||||
"https://bcr.bazel.build/modules/with_cfg.bzl/0.12.0/MODULE.bazel": "b573395fe63aef4299ba095173e2f62ccfee5ad9bbf7acaa95dba73af9fc2b38",
|
||||
"https://bcr.bazel.build/modules/with_cfg.bzl/0.12.0/source.json": "3f3fbaeafecaf629877ad152a2c9def21f8d330d91aa94c5dc75bbb98c10b8b8",
|
||||
"https://bcr.bazel.build/modules/xz/5.4.5.bcr.8/MODULE.bazel": "e48a69bd54053c2ec5fffc2a29fb70122afd3e83ab6c07068f63bc6553fa57cc",
|
||||
"https://bcr.bazel.build/modules/xz/5.4.5.bcr.8/source.json": "bd7e928ccd63505b44f4784f7bbf12cc11f9ff23bf3ca12ff2c91cd74846099e",
|
||||
"https://bcr.bazel.build/modules/zlib/1.2.11/MODULE.bazel": "07b389abc85fdbca459b69e2ec656ae5622873af3f845e1c9d80fe179f3effa0",
|
||||
"https://bcr.bazel.build/modules/zlib/1.3.1.bcr.5/MODULE.bazel": "eec517b5bbe5492629466e11dae908d043364302283de25581e3eb944326c4ca",
|
||||
"https://bcr.bazel.build/modules/zlib/1.3.1.bcr.8/MODULE.bazel": "772c674bb78a0342b8caf32ab5c25085c493ca4ff08398208dcbe4375fe9f776",
|
||||
|
||||
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -1492,6 +1492,7 @@ dependencies = [
|
||||
"codex-experimental-api-macros",
|
||||
"codex-git-utils",
|
||||
"codex-protocol",
|
||||
"codex-shell-command",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-cargo-bin",
|
||||
"inventory",
|
||||
@@ -1501,7 +1502,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"shlex",
|
||||
"similar",
|
||||
"strum_macros 0.28.0",
|
||||
"tempfile",
|
||||
|
||||
@@ -17,12 +17,12 @@ clap = { workspace = true, features = ["derive"] }
|
||||
codex-experimental-api-macros = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-shell-command = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
schemars = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
serde_with = { workspace = true }
|
||||
shlex = { workspace = true }
|
||||
strum_macros = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
rmcp = { workspace = true, default-features = false, features = [
|
||||
|
||||
@@ -15,6 +15,7 @@ pub use export::generate_ts_with_options;
|
||||
pub use export::generate_types;
|
||||
pub use jsonrpc_lite::*;
|
||||
pub use protocol::common::*;
|
||||
pub use protocol::item_builders::*;
|
||||
pub use protocol::thread_history::*;
|
||||
pub use protocol::v1::ApplyPatchApprovalParams;
|
||||
pub use protocol::v1::ApplyPatchApprovalResponse;
|
||||
|
||||
299
codex-rs/app-server-protocol/src/protocol/item_builders.rs
Normal file
299
codex-rs/app-server-protocol/src/protocol/item_builders.rs
Normal file
@@ -0,0 +1,299 @@
|
||||
//! Shared builders for synthetic [`ThreadItem`] values emitted by the app-server layer.
|
||||
//!
|
||||
//! These items do not come from first-class core `ItemStarted` / `ItemCompleted` events.
|
||||
//! Instead, the app-server synthesizes them so clients can render a coherent lifecycle for
|
||||
//! approvals and other pre-execution flows before the underlying tool has started or when the
|
||||
//! tool never starts at all.
|
||||
//!
|
||||
//! Keeping these builders in one place is useful for two reasons:
|
||||
//! - Live notifications and rebuilt `thread/read` history both need to construct the same
|
||||
//! synthetic items, so sharing the logic avoids drift between those paths.
|
||||
//! - The projection is presentation-specific. Core protocol events stay generic, while the
|
||||
//! app-server protocol decides how to surface those events as `ThreadItem`s for clients.
|
||||
use crate::protocol::common::ServerNotification;
|
||||
use crate::protocol::v2::CommandAction;
|
||||
use crate::protocol::v2::CommandExecutionSource;
|
||||
use crate::protocol::v2::CommandExecutionStatus;
|
||||
use crate::protocol::v2::FileUpdateChange;
|
||||
use crate::protocol::v2::GuardianApprovalReview;
|
||||
use crate::protocol::v2::GuardianApprovalReviewStatus;
|
||||
use crate::protocol::v2::ItemGuardianApprovalReviewCompletedNotification;
|
||||
use crate::protocol::v2::ItemGuardianApprovalReviewStartedNotification;
|
||||
use crate::protocol::v2::PatchApplyStatus;
|
||||
use crate::protocol::v2::PatchChangeKind;
|
||||
use crate::protocol::v2::ThreadItem;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::ExecCommandBeginEvent;
|
||||
use codex_protocol::protocol::ExecCommandEndEvent;
|
||||
use codex_protocol::protocol::FileChange;
|
||||
use codex_protocol::protocol::GuardianAssessmentAction;
|
||||
use codex_protocol::protocol::GuardianAssessmentEvent;
|
||||
use codex_protocol::protocol::PatchApplyBeginEvent;
|
||||
use codex_protocol::protocol::PatchApplyEndEvent;
|
||||
use codex_shell_command::parse_command::parse_command;
|
||||
use codex_shell_command::parse_command::shlex_join;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
pub fn build_file_change_approval_request_item(
|
||||
payload: &ApplyPatchApprovalRequestEvent,
|
||||
) -> ThreadItem {
|
||||
ThreadItem::FileChange {
|
||||
id: payload.call_id.clone(),
|
||||
changes: convert_patch_changes(&payload.changes),
|
||||
status: PatchApplyStatus::InProgress,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_file_change_begin_item(payload: &PatchApplyBeginEvent) -> ThreadItem {
|
||||
ThreadItem::FileChange {
|
||||
id: payload.call_id.clone(),
|
||||
changes: convert_patch_changes(&payload.changes),
|
||||
status: PatchApplyStatus::InProgress,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_file_change_end_item(payload: &PatchApplyEndEvent) -> ThreadItem {
|
||||
ThreadItem::FileChange {
|
||||
id: payload.call_id.clone(),
|
||||
changes: convert_patch_changes(&payload.changes),
|
||||
status: (&payload.status).into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_command_execution_approval_request_item(
|
||||
payload: &ExecApprovalRequestEvent,
|
||||
) -> ThreadItem {
|
||||
ThreadItem::CommandExecution {
|
||||
id: payload.call_id.clone(),
|
||||
command: shlex_join(&payload.command),
|
||||
cwd: payload.cwd.clone(),
|
||||
process_id: None,
|
||||
source: CommandExecutionSource::Agent,
|
||||
status: CommandExecutionStatus::InProgress,
|
||||
command_actions: payload
|
||||
.parsed_cmd
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(CommandAction::from)
|
||||
.collect(),
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_command_execution_begin_item(payload: &ExecCommandBeginEvent) -> ThreadItem {
|
||||
ThreadItem::CommandExecution {
|
||||
id: payload.call_id.clone(),
|
||||
command: shlex_join(&payload.command),
|
||||
cwd: payload.cwd.clone(),
|
||||
process_id: payload.process_id.clone(),
|
||||
source: payload.source.into(),
|
||||
status: CommandExecutionStatus::InProgress,
|
||||
command_actions: payload
|
||||
.parsed_cmd
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(CommandAction::from)
|
||||
.collect(),
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_command_execution_end_item(payload: &ExecCommandEndEvent) -> ThreadItem {
|
||||
let aggregated_output = if payload.aggregated_output.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(payload.aggregated_output.clone())
|
||||
};
|
||||
let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX);
|
||||
|
||||
ThreadItem::CommandExecution {
|
||||
id: payload.call_id.clone(),
|
||||
command: shlex_join(&payload.command),
|
||||
cwd: payload.cwd.clone(),
|
||||
process_id: payload.process_id.clone(),
|
||||
source: payload.source.into(),
|
||||
status: (&payload.status).into(),
|
||||
command_actions: payload
|
||||
.parsed_cmd
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(CommandAction::from)
|
||||
.collect(),
|
||||
aggregated_output,
|
||||
exit_code: Some(payload.exit_code),
|
||||
duration_ms: Some(duration_ms),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a guardian-derived [`ThreadItem`].
|
||||
///
|
||||
/// Currently this only synthesizes [`ThreadItem::CommandExecution`] for
|
||||
/// [`GuardianAssessmentAction::Command`] and [`GuardianAssessmentAction::Execve`].
|
||||
pub fn build_item_from_guardian_event(
|
||||
assessment: &GuardianAssessmentEvent,
|
||||
status: CommandExecutionStatus,
|
||||
) -> Option<ThreadItem> {
|
||||
match &assessment.action {
|
||||
GuardianAssessmentAction::Command { command, cwd, .. } => {
|
||||
let command = command.clone();
|
||||
let command_actions = vec![CommandAction::Unknown {
|
||||
command: command.clone(),
|
||||
}];
|
||||
Some(ThreadItem::CommandExecution {
|
||||
id: assessment.id.clone(),
|
||||
command,
|
||||
cwd: cwd.clone(),
|
||||
process_id: None,
|
||||
source: CommandExecutionSource::Agent,
|
||||
status,
|
||||
command_actions,
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
})
|
||||
}
|
||||
GuardianAssessmentAction::Execve {
|
||||
program, argv, cwd, ..
|
||||
} => {
|
||||
let argv = if argv.is_empty() {
|
||||
vec![program.clone()]
|
||||
} else {
|
||||
std::iter::once(program.clone())
|
||||
.chain(argv.iter().skip(1).cloned())
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
let command = shlex_join(&argv);
|
||||
let parsed_cmd = parse_command(&argv);
|
||||
let command_actions = if parsed_cmd.is_empty() {
|
||||
vec![CommandAction::Unknown {
|
||||
command: command.clone(),
|
||||
}]
|
||||
} else {
|
||||
parsed_cmd.into_iter().map(CommandAction::from).collect()
|
||||
};
|
||||
Some(ThreadItem::CommandExecution {
|
||||
id: assessment.id.clone(),
|
||||
command,
|
||||
cwd: cwd.clone(),
|
||||
process_id: None,
|
||||
source: CommandExecutionSource::Agent,
|
||||
status,
|
||||
command_actions,
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
})
|
||||
}
|
||||
GuardianAssessmentAction::ApplyPatch { .. }
|
||||
| GuardianAssessmentAction::NetworkAccess { .. }
|
||||
| GuardianAssessmentAction::McpToolCall { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn guardian_auto_approval_review_notification(
|
||||
conversation_id: &ThreadId,
|
||||
event_turn_id: &str,
|
||||
assessment: &GuardianAssessmentEvent,
|
||||
) -> ServerNotification {
|
||||
// TODO(ccunningham): Attach guardian review state to the reviewed tool
|
||||
// item's lifecycle instead of sending standalone review notifications so
|
||||
// the app-server API can persist and replay review state via `thread/read`.
|
||||
let turn_id = if assessment.turn_id.is_empty() {
|
||||
event_turn_id.to_string()
|
||||
} else {
|
||||
assessment.turn_id.clone()
|
||||
};
|
||||
let review = GuardianApprovalReview {
|
||||
status: match assessment.status {
|
||||
codex_protocol::protocol::GuardianAssessmentStatus::InProgress => {
|
||||
GuardianApprovalReviewStatus::InProgress
|
||||
}
|
||||
codex_protocol::protocol::GuardianAssessmentStatus::Approved => {
|
||||
GuardianApprovalReviewStatus::Approved
|
||||
}
|
||||
codex_protocol::protocol::GuardianAssessmentStatus::Denied => {
|
||||
GuardianApprovalReviewStatus::Denied
|
||||
}
|
||||
codex_protocol::protocol::GuardianAssessmentStatus::Aborted => {
|
||||
GuardianApprovalReviewStatus::Aborted
|
||||
}
|
||||
},
|
||||
risk_score: assessment.risk_score,
|
||||
risk_level: assessment.risk_level.map(Into::into),
|
||||
rationale: assessment.rationale.clone(),
|
||||
};
|
||||
let action = assessment.action.clone().into();
|
||||
match assessment.status {
|
||||
codex_protocol::protocol::GuardianAssessmentStatus::InProgress => {
|
||||
ServerNotification::ItemGuardianApprovalReviewStarted(
|
||||
ItemGuardianApprovalReviewStartedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id,
|
||||
target_item_id: assessment.id.clone(),
|
||||
review,
|
||||
action,
|
||||
},
|
||||
)
|
||||
}
|
||||
codex_protocol::protocol::GuardianAssessmentStatus::Approved
|
||||
| codex_protocol::protocol::GuardianAssessmentStatus::Denied
|
||||
| codex_protocol::protocol::GuardianAssessmentStatus::Aborted => {
|
||||
ServerNotification::ItemGuardianApprovalReviewCompleted(
|
||||
ItemGuardianApprovalReviewCompletedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
turn_id,
|
||||
target_item_id: assessment.id.clone(),
|
||||
review,
|
||||
action,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn convert_patch_changes(changes: &HashMap<PathBuf, FileChange>) -> Vec<FileUpdateChange> {
|
||||
let mut converted: Vec<FileUpdateChange> = changes
|
||||
.iter()
|
||||
.map(|(path, change)| FileUpdateChange {
|
||||
path: path.to_string_lossy().into_owned(),
|
||||
kind: map_patch_change_kind(change),
|
||||
diff: format_file_change_diff(change),
|
||||
})
|
||||
.collect();
|
||||
converted.sort_by(|a, b| a.path.cmp(&b.path));
|
||||
converted
|
||||
}
|
||||
|
||||
fn map_patch_change_kind(change: &FileChange) -> PatchChangeKind {
|
||||
match change {
|
||||
FileChange::Add { .. } => PatchChangeKind::Add,
|
||||
FileChange::Delete { .. } => PatchChangeKind::Delete,
|
||||
FileChange::Update { move_path, .. } => PatchChangeKind::Update {
|
||||
move_path: move_path.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn format_file_change_diff(change: &FileChange) -> String {
|
||||
match change {
|
||||
FileChange::Add { content } => content.clone(),
|
||||
FileChange::Delete { content } => content.clone(),
|
||||
FileChange::Update {
|
||||
unified_diff,
|
||||
move_path,
|
||||
} => {
|
||||
if let Some(path) = move_path {
|
||||
format!("{unified_diff}\n\nMoved to: {}", path.display())
|
||||
} else {
|
||||
unified_diff.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@
|
||||
// Exposes protocol pieces used by `lib.rs` via `pub use protocol::common::*;`.
|
||||
|
||||
pub mod common;
|
||||
pub mod item_builders;
|
||||
mod mappers;
|
||||
mod serde_helpers;
|
||||
pub mod thread_history;
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
use crate::protocol::item_builders::build_command_execution_begin_item;
|
||||
use crate::protocol::item_builders::build_command_execution_end_item;
|
||||
use crate::protocol::item_builders::build_file_change_approval_request_item;
|
||||
use crate::protocol::item_builders::build_file_change_begin_item;
|
||||
use crate::protocol::item_builders::build_file_change_end_item;
|
||||
use crate::protocol::item_builders::build_item_from_guardian_event;
|
||||
use crate::protocol::v2::CollabAgentState;
|
||||
use crate::protocol::v2::CollabAgentTool;
|
||||
use crate::protocol::v2::CollabAgentToolCallStatus;
|
||||
use crate::protocol::v2::CommandAction;
|
||||
use crate::protocol::v2::CommandExecutionStatus;
|
||||
use crate::protocol::v2::DynamicToolCallOutputContentItem;
|
||||
use crate::protocol::v2::DynamicToolCallStatus;
|
||||
use crate::protocol::v2::FileUpdateChange;
|
||||
use crate::protocol::v2::McpToolCallError;
|
||||
use crate::protocol::v2::McpToolCallResult;
|
||||
use crate::protocol::v2::McpToolCallStatus;
|
||||
use crate::protocol::v2::PatchApplyStatus;
|
||||
use crate::protocol::v2::PatchChangeKind;
|
||||
use crate::protocol::v2::ThreadItem;
|
||||
use crate::protocol::v2::Turn;
|
||||
use crate::protocol::v2::TurnError as V2TurnError;
|
||||
@@ -31,6 +33,8 @@ use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExecCommandBeginEvent;
|
||||
use codex_protocol::protocol::ExecCommandEndEvent;
|
||||
use codex_protocol::protocol::GuardianAssessmentEvent;
|
||||
use codex_protocol::protocol::GuardianAssessmentStatus;
|
||||
use codex_protocol::protocol::ImageGenerationBeginEvent;
|
||||
use codex_protocol::protocol::ImageGenerationEndEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
@@ -53,6 +57,14 @@ use std::collections::HashMap;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[cfg(test)]
|
||||
use crate::protocol::v2::CommandAction;
|
||||
#[cfg(test)]
|
||||
use crate::protocol::v2::FileUpdateChange;
|
||||
#[cfg(test)]
|
||||
use crate::protocol::v2::PatchApplyStatus;
|
||||
#[cfg(test)]
|
||||
use crate::protocol::v2::PatchChangeKind;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
|
||||
#[cfg(test)]
|
||||
@@ -149,6 +161,7 @@ impl ThreadHistoryBuilder {
|
||||
EventMsg::WebSearchEnd(payload) => self.handle_web_search_end(payload),
|
||||
EventMsg::ExecCommandBegin(payload) => self.handle_exec_command_begin(payload),
|
||||
EventMsg::ExecCommandEnd(payload) => self.handle_exec_command_end(payload),
|
||||
EventMsg::GuardianAssessment(payload) => self.handle_guardian_assessment(payload),
|
||||
EventMsg::ApplyPatchApprovalRequest(payload) => {
|
||||
self.handle_apply_patch_approval_request(payload)
|
||||
}
|
||||
@@ -375,57 +388,12 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn handle_exec_command_begin(&mut self, payload: &ExecCommandBeginEvent) {
|
||||
let command = shlex::try_join(payload.command.iter().map(String::as_str))
|
||||
.unwrap_or_else(|_| payload.command.join(" "));
|
||||
let command_actions = payload
|
||||
.parsed_cmd
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(CommandAction::from)
|
||||
.collect();
|
||||
let item = ThreadItem::CommandExecution {
|
||||
id: payload.call_id.clone(),
|
||||
command,
|
||||
cwd: payload.cwd.clone(),
|
||||
process_id: payload.process_id.clone(),
|
||||
source: payload.source.into(),
|
||||
status: CommandExecutionStatus::InProgress,
|
||||
command_actions,
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
};
|
||||
let item = build_command_execution_begin_item(payload);
|
||||
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
||||
}
|
||||
|
||||
fn handle_exec_command_end(&mut self, payload: &ExecCommandEndEvent) {
|
||||
let status: CommandExecutionStatus = (&payload.status).into();
|
||||
let duration_ms = i64::try_from(payload.duration.as_millis()).unwrap_or(i64::MAX);
|
||||
let aggregated_output = if payload.aggregated_output.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(payload.aggregated_output.clone())
|
||||
};
|
||||
let command = shlex::try_join(payload.command.iter().map(String::as_str))
|
||||
.unwrap_or_else(|_| payload.command.join(" "));
|
||||
let command_actions = payload
|
||||
.parsed_cmd
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(CommandAction::from)
|
||||
.collect();
|
||||
let item = ThreadItem::CommandExecution {
|
||||
id: payload.call_id.clone(),
|
||||
command,
|
||||
cwd: payload.cwd.clone(),
|
||||
process_id: payload.process_id.clone(),
|
||||
source: payload.source.into(),
|
||||
status,
|
||||
command_actions,
|
||||
aggregated_output,
|
||||
exit_code: Some(payload.exit_code),
|
||||
duration_ms: Some(duration_ms),
|
||||
};
|
||||
let item = build_command_execution_end_item(payload);
|
||||
// Command completions can arrive out of order. Unified exec may return
|
||||
// while a PTY is still running, then emit ExecCommandEnd later from a
|
||||
// background exit watcher when that process finally exits. By then, a
|
||||
@@ -434,12 +402,26 @@ impl ThreadHistoryBuilder {
|
||||
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
||||
}
|
||||
|
||||
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
|
||||
let item = ThreadItem::FileChange {
|
||||
id: payload.call_id.clone(),
|
||||
changes: convert_patch_changes(&payload.changes),
|
||||
status: PatchApplyStatus::InProgress,
|
||||
fn handle_guardian_assessment(&mut self, payload: &GuardianAssessmentEvent) {
|
||||
let status = match payload.status {
|
||||
GuardianAssessmentStatus::InProgress => CommandExecutionStatus::InProgress,
|
||||
GuardianAssessmentStatus::Denied | GuardianAssessmentStatus::Aborted => {
|
||||
CommandExecutionStatus::Declined
|
||||
}
|
||||
GuardianAssessmentStatus::Approved => return,
|
||||
};
|
||||
let Some(item) = build_item_from_guardian_event(payload, status) else {
|
||||
return;
|
||||
};
|
||||
if payload.turn_id.is_empty() {
|
||||
self.upsert_item_in_current_turn(item);
|
||||
} else {
|
||||
self.upsert_item_in_turn_id(&payload.turn_id, item);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_apply_patch_approval_request(&mut self, payload: &ApplyPatchApprovalRequestEvent) {
|
||||
let item = build_file_change_approval_request_item(payload);
|
||||
if payload.turn_id.is_empty() {
|
||||
self.upsert_item_in_current_turn(item);
|
||||
} else {
|
||||
@@ -448,11 +430,7 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn handle_patch_apply_begin(&mut self, payload: &PatchApplyBeginEvent) {
|
||||
let item = ThreadItem::FileChange {
|
||||
id: payload.call_id.clone(),
|
||||
changes: convert_patch_changes(&payload.changes),
|
||||
status: PatchApplyStatus::InProgress,
|
||||
};
|
||||
let item = build_file_change_begin_item(payload);
|
||||
if payload.turn_id.is_empty() {
|
||||
self.upsert_item_in_current_turn(item);
|
||||
} else {
|
||||
@@ -461,12 +439,7 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn handle_patch_apply_end(&mut self, payload: &PatchApplyEndEvent) {
|
||||
let status: PatchApplyStatus = (&payload.status).into();
|
||||
let item = ThreadItem::FileChange {
|
||||
id: payload.call_id.clone(),
|
||||
changes: convert_patch_changes(&payload.changes),
|
||||
status,
|
||||
};
|
||||
let item = build_file_change_end_item(payload);
|
||||
if payload.turn_id.is_empty() {
|
||||
self.upsert_item_in_current_turn(item);
|
||||
} else {
|
||||
@@ -1076,21 +1049,6 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn convert_patch_changes(
|
||||
changes: &HashMap<std::path::PathBuf, codex_protocol::protocol::FileChange>,
|
||||
) -> Vec<FileUpdateChange> {
|
||||
let mut converted: Vec<FileUpdateChange> = changes
|
||||
.iter()
|
||||
.map(|(path, change)| FileUpdateChange {
|
||||
path: path.to_string_lossy().into_owned(),
|
||||
kind: map_patch_change_kind(change),
|
||||
diff: format_file_change_diff(change),
|
||||
})
|
||||
.collect();
|
||||
converted.sort_by(|a, b| a.path.cmp(&b.path));
|
||||
converted
|
||||
}
|
||||
|
||||
fn convert_dynamic_tool_content_items(
|
||||
items: &[codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem],
|
||||
) -> Vec<DynamicToolCallOutputContentItem> {
|
||||
@@ -1108,33 +1066,6 @@ fn convert_dynamic_tool_content_items(
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn map_patch_change_kind(change: &codex_protocol::protocol::FileChange) -> PatchChangeKind {
|
||||
match change {
|
||||
codex_protocol::protocol::FileChange::Add { .. } => PatchChangeKind::Add,
|
||||
codex_protocol::protocol::FileChange::Delete { .. } => PatchChangeKind::Delete,
|
||||
codex_protocol::protocol::FileChange::Update { move_path, .. } => PatchChangeKind::Update {
|
||||
move_path: move_path.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn format_file_change_diff(change: &codex_protocol::protocol::FileChange) -> String {
|
||||
match change {
|
||||
codex_protocol::protocol::FileChange::Add { content } => content.clone(),
|
||||
codex_protocol::protocol::FileChange::Delete { content } => content.clone(),
|
||||
codex_protocol::protocol::FileChange::Update {
|
||||
unified_diff,
|
||||
move_path,
|
||||
} => {
|
||||
if let Some(path) = move_path {
|
||||
format!("{unified_diff}\n\nMoved to: {}", path.display())
|
||||
} else {
|
||||
unified_diff.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn upsert_turn_item(items: &mut Vec<ThreadItem>, item: ThreadItem) {
|
||||
if let Some(existing_item) = items
|
||||
.iter_mut()
|
||||
@@ -2030,6 +1961,136 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_declined_guardian_command_item() {
|
||||
let events = vec![
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-1".into(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
}),
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "review this command".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::GuardianAssessment(GuardianAssessmentEvent {
|
||||
id: "guardian-exec".into(),
|
||||
turn_id: "turn-1".into(),
|
||||
status: GuardianAssessmentStatus::InProgress,
|
||||
risk_score: None,
|
||||
risk_level: None,
|
||||
rationale: None,
|
||||
action: serde_json::from_value(serde_json::json!({
|
||||
"type": "command",
|
||||
"source": "shell",
|
||||
"command": "rm -rf /tmp/guardian",
|
||||
"cwd": "/tmp",
|
||||
}))
|
||||
.expect("guardian action"),
|
||||
}),
|
||||
EventMsg::GuardianAssessment(GuardianAssessmentEvent {
|
||||
id: "guardian-exec".into(),
|
||||
turn_id: "turn-1".into(),
|
||||
status: GuardianAssessmentStatus::Denied,
|
||||
risk_score: Some(97),
|
||||
risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::High),
|
||||
rationale: Some("Would delete user data.".into()),
|
||||
action: serde_json::from_value(serde_json::json!({
|
||||
"type": "command",
|
||||
"source": "shell",
|
||||
"command": "rm -rf /tmp/guardian",
|
||||
"cwd": "/tmp",
|
||||
}))
|
||||
.expect("guardian action"),
|
||||
}),
|
||||
];
|
||||
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].items.len(), 2);
|
||||
assert_eq!(
|
||||
turns[0].items[1],
|
||||
ThreadItem::CommandExecution {
|
||||
id: "guardian-exec".into(),
|
||||
command: "rm -rf /tmp/guardian".into(),
|
||||
cwd: PathBuf::from("/tmp"),
|
||||
process_id: None,
|
||||
source: CommandExecutionSource::Agent,
|
||||
status: CommandExecutionStatus::Declined,
|
||||
command_actions: vec![CommandAction::Unknown {
|
||||
command: "rm -rf /tmp/guardian".into(),
|
||||
}],
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_in_progress_guardian_execve_item() {
|
||||
let events = vec![
|
||||
EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: "turn-1".into(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
}),
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "run a subcommand".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::GuardianAssessment(GuardianAssessmentEvent {
|
||||
id: "guardian-execve".into(),
|
||||
turn_id: "turn-1".into(),
|
||||
status: GuardianAssessmentStatus::InProgress,
|
||||
risk_score: None,
|
||||
risk_level: None,
|
||||
rationale: None,
|
||||
action: serde_json::from_value(serde_json::json!({
|
||||
"type": "execve",
|
||||
"source": "shell",
|
||||
"program": "/bin/rm",
|
||||
"argv": ["/usr/bin/rm", "-f", "/tmp/file.sqlite"],
|
||||
"cwd": "/tmp",
|
||||
}))
|
||||
.expect("guardian action"),
|
||||
}),
|
||||
];
|
||||
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].items.len(), 2);
|
||||
assert_eq!(
|
||||
turns[0].items[1],
|
||||
ThreadItem::CommandExecution {
|
||||
id: "guardian-execve".into(),
|
||||
command: "/bin/rm -f /tmp/file.sqlite".into(),
|
||||
cwd: PathBuf::from("/tmp"),
|
||||
process_id: None,
|
||||
source: CommandExecutionSource::Agent,
|
||||
status: CommandExecutionStatus::InProgress,
|
||||
command_actions: vec![CommandAction::Unknown {
|
||||
command: "/bin/rm -f /tmp/file.sqlite".into(),
|
||||
}],
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn assigns_late_exec_completion_to_original_turn() {
|
||||
let events = vec![
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -80,6 +80,7 @@ use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
@@ -379,6 +380,8 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env);
|
||||
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
@@ -393,7 +396,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
log_db: None,
|
||||
config_warnings: args.config_warnings,
|
||||
session_source: args.session_source,
|
||||
enable_codex_api_key_env: args.enable_codex_api_key_env,
|
||||
auth_manager,
|
||||
rpc_transport: AppServerRpcTransport::InProcess,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
|
||||
@@ -396,11 +396,8 @@ pub async fn run_main_with_transport(
|
||||
}
|
||||
}
|
||||
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
cloud_requirements_loader(
|
||||
auth_manager,
|
||||
config.chatgpt_base_url,
|
||||
@@ -611,6 +608,8 @@ pub async fn run_main_with_transport(
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let outbound_control_tx = outbound_control_tx;
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
|
||||
let loader_overrides = loader_overrides_for_config_api;
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
@@ -625,7 +624,7 @@ pub async fn run_main_with_transport(
|
||||
log_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env: false,
|
||||
auth_manager,
|
||||
rpc_transport: analytics_rpc_transport(transport),
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
|
||||
@@ -193,7 +193,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
pub(crate) session_source: SessionSource,
|
||||
pub(crate) enable_codex_api_key_env: bool,
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
}
|
||||
|
||||
@@ -213,17 +213,12 @@ impl MessageProcessor {
|
||||
log_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env,
|
||||
auth_manager,
|
||||
rpc_transport,
|
||||
} = args;
|
||||
let auth_manager = AuthManager::shared_with_external_auth(
|
||||
config.codex_home.clone(),
|
||||
enable_codex_api_key_env,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
}),
|
||||
);
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
}));
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
@@ -235,7 +230,6 @@ impl MessageProcessor {
|
||||
},
|
||||
environment_manager,
|
||||
));
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
let analytics_events_client = AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
config.chatgpt_base_url.trim_end_matches('/').to_string(),
|
||||
|
||||
@@ -27,6 +27,7 @@ use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::AuthManager;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use opentelemetry::global;
|
||||
@@ -234,6 +235,8 @@ fn build_test_processor(
|
||||
) {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false);
|
||||
let processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing,
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
@@ -246,7 +249,7 @@ fn build_test_processor(
|
||||
log_db: None,
|
||||
config_warnings: Vec::new(),
|
||||
session_source: SessionSource::VSCode,
|
||||
enable_codex_api_key_env: false,
|
||||
auth_manager,
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
});
|
||||
(processor, outgoing_rx)
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::sync::Weak;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
|
||||
type PendingInterruptQueue = Vec<(
|
||||
ConnectionRequestId,
|
||||
@@ -116,6 +117,38 @@ impl ThreadState {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn resolve_server_request_on_thread_listener(
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
request_id: RequestId,
|
||||
) {
|
||||
let (completion_tx, completion_rx) = oneshot::channel();
|
||||
let listener_command_tx = {
|
||||
let state = thread_state.lock().await;
|
||||
state.listener_command_tx()
|
||||
};
|
||||
let Some(listener_command_tx) = listener_command_tx else {
|
||||
error!("failed to remove pending client request: thread listener is not running");
|
||||
return;
|
||||
};
|
||||
|
||||
if listener_command_tx
|
||||
.send(ThreadListenerCommand::ResolveServerRequest {
|
||||
request_id,
|
||||
completion_tx,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
error!(
|
||||
"failed to remove pending client request: thread listener command channel is closed"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(err) = completion_rx.await {
|
||||
error!("failed to remove pending client request: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
struct ThreadEntry {
|
||||
state: Arc<Mutex<ThreadState>>,
|
||||
connection_ids: HashSet<ConnectionId>,
|
||||
|
||||
@@ -58,6 +58,7 @@ use codex_features::Features;
|
||||
use codex_features::FeaturesToml;
|
||||
use codex_git_utils::resolve_root_git_project_for_trust;
|
||||
use codex_login::AuthCredentialsStoreMode;
|
||||
use codex_login::AuthManagerConfig;
|
||||
use codex_mcp::mcp::McpConfig;
|
||||
use codex_model_provider_info::LEGACY_OLLAMA_CHAT_PROVIDER_ID;
|
||||
use codex_model_provider_info::LMSTUDIO_OSS_PROVIDER_ID;
|
||||
@@ -593,6 +594,20 @@ pub struct Config {
|
||||
pub otel: codex_config::types::OtelConfig,
|
||||
}
|
||||
|
||||
impl AuthManagerConfig for Config {
|
||||
fn codex_home(&self) -> PathBuf {
|
||||
self.codex_home.clone()
|
||||
}
|
||||
|
||||
fn cli_auth_credentials_store_mode(&self) -> AuthCredentialsStoreMode {
|
||||
self.cli_auth_credentials_store_mode
|
||||
}
|
||||
|
||||
fn forced_chatgpt_workspace_id(&self) -> Option<String> {
|
||||
self.forced_chatgpt_workspace_id.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ConfigBuilder {
|
||||
codex_home: Option<PathBuf>,
|
||||
|
||||
@@ -143,7 +143,8 @@ pub(crate) async fn list_tool_suggest_discoverable_tools_with_auth(
|
||||
pub async fn list_cached_accessible_connectors_from_mcp_tools(
|
||||
config: &Config,
|
||||
) -> Option<Vec<AppInfo>> {
|
||||
let auth_manager = auth_manager_from_config(config);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config, /*enable_codex_api_key_env*/ false);
|
||||
let auth = auth_manager.auth().await;
|
||||
if !config.features.apps_enabled_for_auth(auth.as_ref()) {
|
||||
return Some(Vec::new());
|
||||
@@ -182,7 +183,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
|
||||
config: &Config,
|
||||
force_refetch: bool,
|
||||
) -> anyhow::Result<AccessibleConnectorsStatus> {
|
||||
let auth_manager = auth_manager_from_config(config);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config, /*enable_codex_api_key_env*/ false);
|
||||
let auth = auth_manager.auth().await;
|
||||
if !config.features.apps_enabled_for_auth(auth.as_ref()) {
|
||||
return Ok(AccessibleConnectorsStatus {
|
||||
@@ -417,7 +419,8 @@ async fn list_directory_connectors_for_tool_suggest_with_auth(
|
||||
let token_data = if let Some(auth) = auth {
|
||||
auth.get_token_data().ok()
|
||||
} else {
|
||||
let auth_manager = auth_manager_from_config(config);
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(config, /*enable_codex_api_key_env*/ false);
|
||||
auth_manager
|
||||
.auth()
|
||||
.await
|
||||
@@ -492,14 +495,6 @@ async fn chatgpt_get_request_with_token<T: DeserializeOwned>(
|
||||
}
|
||||
}
|
||||
|
||||
fn auth_manager_from_config(config: &Config) -> std::sync::Arc<AuthManager> {
|
||||
AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn connector_display_label(connector: &AppInfo) -> String {
|
||||
format_connector_label(&connector.name, &connector.id)
|
||||
}
|
||||
|
||||
@@ -26,12 +26,8 @@ pub async fn build_prompt_input(
|
||||
) -> CodexResult<Vec<ResponseItem>> {
|
||||
config.ephemeral = true;
|
||||
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false);
|
||||
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
|
||||
@@ -1108,6 +1108,23 @@ pub struct AuthManager {
|
||||
external_auth: RwLock<Option<Arc<dyn ExternalAuth>>>,
|
||||
}
|
||||
|
||||
/// Configuration view required to construct a shared [`AuthManager`].
|
||||
///
|
||||
/// Implementations should return the auth-related config values for the
|
||||
/// already-resolved runtime configuration. The primary implementation is
|
||||
/// `codex_core::config::Config`, but this trait keeps `codex-login` independent
|
||||
/// from `codex-core`.
|
||||
pub trait AuthManagerConfig {
|
||||
/// Returns the Codex home directory used for auth storage.
|
||||
fn codex_home(&self) -> PathBuf;
|
||||
|
||||
/// Returns the CLI auth credential storage mode for auth loading.
|
||||
fn cli_auth_credentials_store_mode(&self) -> AuthCredentialsStoreMode;
|
||||
|
||||
/// Returns the workspace ID that ChatGPT auth should be restricted to, if any.
|
||||
fn forced_chatgpt_workspace_id(&self) -> Option<String>;
|
||||
}
|
||||
|
||||
impl Debug for AuthManager {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AuthManager")
|
||||
@@ -1404,19 +1421,18 @@ impl AuthManager {
|
||||
))
|
||||
}
|
||||
|
||||
pub fn shared_with_external_auth(
|
||||
codex_home: PathBuf,
|
||||
/// Convenience constructor returning an `Arc` wrapper from resolved config.
|
||||
pub fn shared_from_config(
|
||||
config: &impl AuthManagerConfig,
|
||||
enable_codex_api_key_env: bool,
|
||||
auth_credentials_store_mode: AuthCredentialsStoreMode,
|
||||
external_auth: Arc<dyn ExternalAuth>,
|
||||
) -> Arc<Self> {
|
||||
let manager = Self::shared(
|
||||
codex_home,
|
||||
let auth_manager = Self::shared(
|
||||
config.codex_home(),
|
||||
enable_codex_api_key_env,
|
||||
auth_credentials_store_mode,
|
||||
config.cli_auth_credentials_store_mode(),
|
||||
);
|
||||
manager.set_external_auth(external_auth);
|
||||
manager
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id());
|
||||
auth_manager
|
||||
}
|
||||
|
||||
pub fn unauthorized_recovery(self: &Arc<Self>) -> UnauthorizedRecovery {
|
||||
|
||||
@@ -23,6 +23,7 @@ pub use auth::AuthConfig;
|
||||
pub use auth::AuthCredentialsStoreMode;
|
||||
pub use auth::AuthDotJson;
|
||||
pub use auth::AuthManager;
|
||||
pub use auth::AuthManagerConfig;
|
||||
pub use auth::CLIENT_ID;
|
||||
pub use auth::CODEX_API_KEY_ENV_VAR;
|
||||
pub use auth::CodexAuth;
|
||||
|
||||
@@ -56,10 +56,9 @@ impl MessageProcessor {
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
) -> Self {
|
||||
let outgoing = Arc::new(outgoing);
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
let auth_manager = AuthManager::shared_from_config(
|
||||
config.as_ref(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
|
||||
@@ -1,4 +1,29 @@
|
||||
use std::borrow::Cow;
|
||||
|
||||
use sqlx::migrate::Migrator;
|
||||
|
||||
pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations");
|
||||
|
||||
/// Allow an older Codex binary to open a database that has already been
|
||||
/// migrated by a newer binary running in parallel.
|
||||
///
|
||||
/// We intentionally ignore applied migration versions that are newer than the
|
||||
/// embedded migration set. Known migration versions are still validated by
|
||||
/// checksum, so this only relaxes the "database is ahead of me" case.
|
||||
fn runtime_migrator(base: &'static Migrator) -> Migrator {
|
||||
Migrator {
|
||||
migrations: Cow::Borrowed(base.migrations.as_ref()),
|
||||
ignore_missing: true,
|
||||
locking: base.locking,
|
||||
no_tx: base.no_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn runtime_state_migrator() -> Migrator {
|
||||
runtime_migrator(&STATE_MIGRATOR)
|
||||
}
|
||||
|
||||
pub(crate) fn runtime_logs_migrator() -> Migrator {
|
||||
runtime_migrator(&LOGS_MIGRATOR)
|
||||
}
|
||||
|
||||
@@ -17,8 +17,8 @@ use crate::ThreadMetadata;
|
||||
use crate::ThreadMetadataBuilder;
|
||||
use crate::ThreadsPage;
|
||||
use crate::apply_rollout_item;
|
||||
use crate::migrations::LOGS_MIGRATOR;
|
||||
use crate::migrations::STATE_MIGRATOR;
|
||||
use crate::migrations::runtime_logs_migrator;
|
||||
use crate::migrations::runtime_state_migrator;
|
||||
use crate::model::AgentJobRow;
|
||||
use crate::model::ThreadRow;
|
||||
use crate::model::anchor_from_item;
|
||||
@@ -83,6 +83,8 @@ impl StateRuntime {
|
||||
/// rest of the state store.
|
||||
pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result<Arc<Self>> {
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
let state_migrator = runtime_state_migrator();
|
||||
let logs_migrator = runtime_logs_migrator();
|
||||
let current_state_name = state_db_filename();
|
||||
let current_logs_name = logs_db_filename();
|
||||
remove_legacy_db_files(
|
||||
@@ -101,14 +103,14 @@ impl StateRuntime {
|
||||
.await;
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let logs_path = logs_db_path(codex_home.as_path());
|
||||
let pool = match open_state_sqlite(&state_path, &STATE_MIGRATOR).await {
|
||||
let pool = match open_state_sqlite(&state_path, &state_migrator).await {
|
||||
Ok(db) => Arc::new(db),
|
||||
Err(err) => {
|
||||
warn!("failed to open state db at {}: {err}", state_path.display());
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let logs_pool = match open_logs_sqlite(&logs_path, &LOGS_MIGRATOR).await {
|
||||
let logs_pool = match open_logs_sqlite(&logs_path, &logs_migrator).await {
|
||||
Ok(db) => Arc::new(db),
|
||||
Err(err) => {
|
||||
warn!("failed to open logs db at {}: {err}", logs_path.display());
|
||||
@@ -146,7 +148,7 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
|
||||
.log_statements(LevelFilter::Off)
|
||||
}
|
||||
|
||||
async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
|
||||
async fn open_state_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
|
||||
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(5)
|
||||
@@ -172,7 +174,7 @@ async fn open_state_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
async fn open_logs_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
|
||||
async fn open_logs_sqlite(path: &Path, migrator: &Migrator) -> anyhow::Result<SqlitePool> {
|
||||
let options = base_sqlite_options(path).auto_vacuum(SqliteAutoVacuum::Incremental);
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(5)
|
||||
@@ -268,3 +270,74 @@ fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -
|
||||
};
|
||||
!version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::open_state_sqlite;
|
||||
use super::runtime_state_migrator;
|
||||
use super::state_db_path;
|
||||
use super::test_support::unique_temp_dir;
|
||||
use crate::migrations::STATE_MIGRATOR;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::migrate::MigrateError;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use std::path::Path;
|
||||
|
||||
async fn open_db_pool(path: &Path) -> SqlitePool {
|
||||
SqlitePool::connect_with(
|
||||
SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
.create_if_missing(false),
|
||||
)
|
||||
.await
|
||||
.expect("open sqlite pool")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn open_state_sqlite_tolerates_newer_applied_migrations() {
|
||||
let codex_home = unique_temp_dir();
|
||||
tokio::fs::create_dir_all(&codex_home)
|
||||
.await
|
||||
.expect("create codex home");
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let pool = SqlitePool::connect_with(
|
||||
SqliteConnectOptions::new()
|
||||
.filename(&state_path)
|
||||
.create_if_missing(true),
|
||||
)
|
||||
.await
|
||||
.expect("open state db");
|
||||
STATE_MIGRATOR
|
||||
.run(&pool)
|
||||
.await
|
||||
.expect("apply current state schema");
|
||||
sqlx::query(
|
||||
"INSERT INTO _sqlx_migrations (version, description, success, checksum, execution_time) VALUES (?, ?, ?, ?, ?)",
|
||||
)
|
||||
.bind(9_999_i64)
|
||||
.bind("future migration")
|
||||
.bind(true)
|
||||
.bind(vec![1_u8, 2, 3, 4])
|
||||
.bind(1_i64)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.expect("insert future migration record");
|
||||
pool.close().await;
|
||||
|
||||
let strict_pool = open_db_pool(state_path.as_path()).await;
|
||||
let strict_err = STATE_MIGRATOR
|
||||
.run(&strict_pool)
|
||||
.await
|
||||
.expect_err("strict migrator should reject newer applied migrations");
|
||||
assert!(matches!(strict_err, MigrateError::VersionMissing(9_999)));
|
||||
strict_pool.close().await;
|
||||
|
||||
let tolerant_migrator = runtime_state_migrator();
|
||||
let tolerant_pool = open_state_sqlite(state_path.as_path(), &tolerant_migrator)
|
||||
.await
|
||||
.expect("runtime migrator should tolerate newer applied migrations");
|
||||
tolerant_pool.close().await;
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ exports_files([
|
||||
"aws-lc-sys_memcmp_check.patch",
|
||||
"aws-lc-sys_windows_msvc_prebuilt_nasm.patch",
|
||||
"aws-lc-sys_windows_msvc_memcmp_probe.patch",
|
||||
"bzip2_windows_stack_args.patch",
|
||||
"llvm_windows_symlink_extract.patch",
|
||||
"rules_rust_windows_bootstrap_process_wrapper_linker.patch",
|
||||
"rules_rust_windows_build_script_runner_paths.patch",
|
||||
@@ -20,5 +21,6 @@ exports_files([
|
||||
"v8_module_deps.patch",
|
||||
"v8_source_portability.patch",
|
||||
"windows-link.patch",
|
||||
"xz_windows_stack_args.patch",
|
||||
"zstd-sys_windows_msvc_include_dirs.patch",
|
||||
])
|
||||
|
||||
23
patches/bzip2_windows_stack_args.patch
Normal file
23
patches/bzip2_windows_stack_args.patch
Normal file
@@ -0,0 +1,23 @@
|
||||
diff --git a/BUILD.bazel b/BUILD.bazel
|
||||
--- a/BUILD.bazel
|
||||
+++ b/BUILD.bazel
|
||||
@@ -28,4 +28,11 @@ cc_library(
|
||||
defines = [
|
||||
"_FILE_OFFSET_BITS=64",
|
||||
],
|
||||
+ copts = select({
|
||||
+ "@platforms//os:windows": [
|
||||
+ "-fno-stack-protector",
|
||||
+ "-mno-stack-arg-probe",
|
||||
+ ],
|
||||
+ "//conditions:default": [],
|
||||
+ }),
|
||||
includes = ["."],
|
||||
diff --git a/MODULE.bazel b/MODULE.bazel
|
||||
--- a/MODULE.bazel
|
||||
+++ b/MODULE.bazel
|
||||
@@ -4,3 +4,4 @@ module(
|
||||
)
|
||||
|
||||
bazel_dep(name = "rules_cc", version = "0.0.10")
|
||||
+bazel_dep(name = "platforms", version = "1.0.0")
|
||||
14
patches/xz_windows_stack_args.patch
Normal file
14
patches/xz_windows_stack_args.patch
Normal file
@@ -0,0 +1,14 @@
|
||||
diff --git a/BUILD.bazel b/BUILD.bazel
|
||||
--- a/BUILD.bazel
|
||||
+++ b/BUILD.bazel
|
||||
@@ -154,6 +154,9 @@ cc_library(
|
||||
],
|
||||
copts = select({
|
||||
- "@platforms//os:windows": [],
|
||||
+ "@platforms//os:windows": [
|
||||
+ "-fno-stack-protector",
|
||||
+ "-mno-stack-arg-probe",
|
||||
+ ],
|
||||
"//conditions:default": ["-std=c99"],
|
||||
}),
|
||||
defines = select({
|
||||
Reference in New Issue
Block a user