This commit is contained in:
jif-oai
2025-10-30 13:25:17 +00:00
parent aa76003e28
commit 7c48a0b717
26 changed files with 775 additions and 599 deletions

12
codex-rs/Cargo.lock generated
View File

@@ -829,6 +829,17 @@ dependencies = [
"tracing",
]
[[package]]
name = "codex-api-client"
version = "0.0.0"
dependencies = [
"codex-protocol",
"futures",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "codex-app-server"
version = "0.0.0"
@@ -1059,6 +1070,7 @@ dependencies = [
"base64",
"bytes",
"chrono",
"codex-api-client",
"codex-app-server-protocol",
"codex-apply-patch",
"codex-async-utils",

View File

@@ -38,7 +38,7 @@ members = [
"utils/pty",
"utils/readiness",
"utils/string",
"utils/tokenizer",
"utils/tokenizer", "api-client",
]
resolver = "2"
@@ -87,6 +87,7 @@ codex-utils-pty = { path = "utils/pty" }
codex-utils-readiness = { path = "utils/readiness" }
codex-utils-string = { path = "utils/string" }
codex-utils-tokenizer = { path = "utils/tokenizer" }
codex-api-client = { path = "api-client" }
core_test_support = { path = "core/tests/common" }
mcp-types = { path = "mcp-types" }
mcp_test_support = { path = "mcp-server/tests/common" }

View File

@@ -0,0 +1,14 @@
[package]
name = "codex-api-client"
version.workspace = true
edition.workspace = true
[dependencies]
futures = "0.3.31"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.41", features = ["sync"] }
codex-protocol = { path = "../protocol" }
[lints]
workspace = true

View File

@@ -0,0 +1,8 @@
pub mod prompt;
pub mod stream;
pub use crate::prompt::Prompt;
pub use crate::stream::Reasoning;
pub use crate::stream::TextControls;
pub use crate::stream::TextFormat;
pub use crate::stream::TextFormatType;

View File

@@ -0,0 +1,31 @@
use codex_protocol::models::ResponseItem;
use serde::Serialize;
use serde_json::Value;
#[derive(Debug, Clone, Default, Serialize)]
pub struct Prompt {
#[serde(skip_serializing)]
pub input: Vec<ResponseItem>,
#[serde(skip_serializing)]
pub tools: Vec<Value>,
#[serde(skip_serializing)]
pub parallel_tool_calls: bool,
#[serde(skip_serializing)]
pub output_schema: Option<Value>,
}
impl Prompt {
pub fn new(
input: Vec<ResponseItem>,
tools: Vec<Value>,
parallel_tool_calls: bool,
output_schema: Option<Value>,
) -> Self {
Self {
input,
tools,
parallel_tool_calls,
output_schema,
}
}
}

View File

@@ -0,0 +1,35 @@
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use serde::Serialize;
use serde_json::Value;
#[derive(Debug, Serialize)]
pub struct Reasoning {
#[serde(skip_serializing_if = "Option::is_none")]
pub effort: Option<ReasoningEffortConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub summary: Option<ReasoningSummaryConfig>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TextFormatType {
#[default]
JsonSchema,
}
#[derive(Debug, Serialize, Default, Clone)]
pub struct TextFormat {
pub r#type: TextFormatType,
pub strict: bool,
pub schema: Value,
pub name: String,
}
#[derive(Debug, Serialize, Default, Clone)]
pub struct TextControls {
#[serde(skip_serializing_if = "Option::is_none")]
pub verbosity: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub format: Option<TextFormat>,
}

View File

@@ -22,6 +22,7 @@ chrono = { workspace = true, features = ["serde"] }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
codex-async-utils = { workspace = true }
codex-api-client = { workspace = true }
codex-file-search = { workspace = true }
codex-git = { workspace = true }
codex-keyring-store = { workspace = true }

View File

@@ -1,7 +1,7 @@
use std::time::Duration;
use crate::ModelProviderInfo;
use crate::client_common::Prompt;
use crate::client::StreamPayload;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::default_client::CodexHttpClient;
@@ -38,14 +38,14 @@ use tracing::trace;
/// Implementation for the classic Chat Completions API.
pub(crate) async fn stream_chat_completions(
prompt: &Prompt,
payload: &StreamPayload,
model_family: &ModelFamily,
client: &CodexHttpClient,
provider: &ModelProviderInfo,
otel_event_manager: &OtelEventManager,
session_source: &SessionSource,
) -> Result<ResponseStream> {
if prompt.output_schema.is_some() {
if payload.prompt.output_schema.is_some() {
return Err(CodexErr::UnsupportedOperation(
"output_schema is not supported for Chat Completions API".to_string(),
));
@@ -54,10 +54,9 @@ pub(crate) async fn stream_chat_completions(
// Build messages array
let mut messages = Vec::<serde_json::Value>::new();
let full_instructions = prompt.get_full_instructions(model_family);
messages.push(json!({"role": "system", "content": full_instructions}));
messages.push(json!({"role": "system", "content": payload.instructions}));
let input = prompt.get_formatted_input();
let input = payload.prompt.input.clone();
// Pre-scan: map Reasoning blocks to the adjacent assistant anchor after the last user.
// - If the last emitted message is a user message, drop all reasoning.
@@ -327,7 +326,7 @@ pub(crate) async fn stream_chat_completions(
}
}
let tools_json = create_tools_json_for_chat_completions_api(&prompt.tools)?;
let tools_json = create_tools_json_for_chat_completions_api(&payload.prompt.tools)?;
let payload = json!({
"model": model_family.slug,
"messages": messages,

View File

@@ -36,7 +36,6 @@ use crate::chat_completions::stream_chat_completions;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::client_common::ResponsesApiRequest;
use crate::client_common::create_reasoning_param_for_request;
use crate::client_common::create_text_param_for_request;
use crate::config::Config;
@@ -49,6 +48,7 @@ use crate::error::Result;
use crate::error::RetryLimitReachedError;
use crate::error::UnexpectedResponseError;
use crate::error::UsageLimitReachedError;
use crate::features::Feature;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_family::ModelFamily;
use crate::model_provider_info::ModelProviderInfo;
@@ -58,7 +58,6 @@ use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use crate::protocol::TokenUsage;
use crate::token_data::PlanType;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::backoff;
#[derive(Debug, Deserialize)]
@@ -90,6 +89,14 @@ pub struct ModelClient {
session_source: SessionSource,
}
#[derive(Debug, Clone)]
pub struct StreamPayload {
pub prompt: Prompt,
pub instructions: String,
pub store_response: bool,
pub previous_response_id: Option<String>,
}
#[allow(clippy::too_many_arguments)]
impl ModelClient {
pub fn new(
@@ -139,13 +146,18 @@ impl ModelClient {
&self.provider
}
pub async fn stream(&self, prompt: &Prompt) -> Result<ResponseStream> {
pub fn supports_responses_api_chaining(&self) -> bool {
self.provider.wire_api == WireApi::Responses
&& self.config.features.enabled(Feature::ResponsesApiChaining)
}
pub async fn stream(&self, payload: &StreamPayload) -> Result<ResponseStream> {
match self.provider.wire_api {
WireApi::Responses => self.stream_responses(prompt).await,
WireApi::Responses => self.stream_responses(payload).await,
WireApi::Chat => {
// Create the raw streaming connection first.
let response_stream = stream_chat_completions(
prompt,
payload,
&self.config.model_family,
&self.client,
&self.provider,
@@ -182,8 +194,22 @@ impl ModelClient {
}
}
pub async fn stream_for_test(&self, mut prompt: Prompt) -> Result<ResponseStream> {
crate::conversation_history::format_prompt_items(&mut prompt.input, false);
let instructions =
crate::client_common::compute_full_instructions(None, &self.config.model_family, false)
.into_owned();
let payload = StreamPayload {
prompt,
instructions,
store_response: false,
previous_response_id: None,
};
self.stream(&payload).await
}
/// Implementation for the OpenAI *Responses* experimental API.
async fn stream_responses(&self, prompt: &Prompt) -> Result<ResponseStream> {
async fn stream_responses(&self, payload: &StreamPayload) -> Result<ResponseStream> {
if let Some(path) = &*CODEX_RS_SSE_FIXTURE {
// short circuit for tests
warn!(path, "Streaming from fixture");
@@ -197,8 +223,8 @@ impl ModelClient {
let auth_manager = self.auth_manager.clone();
let full_instructions = prompt.get_full_instructions(&self.config.model_family);
let tools_json = create_tools_json_for_responses_api(&prompt.tools)?;
let prompt = &payload.prompt;
let tools_json = prompt.tools.clone();
let reasoning = create_reasoning_param_for_request(
&self.config.model_family,
self.effort,
@@ -211,7 +237,7 @@ impl ModelClient {
vec![]
};
let input_with_instructions = prompt.get_formatted_input();
let input_with_instructions = prompt.input.clone();
let verbosity = if self.config.model_family.support_verbosity {
self.config.model_verbosity
@@ -235,24 +261,49 @@ impl ModelClient {
// For Azure, we send `store: true` and preserve reasoning item IDs.
let azure_workaround = self.provider.is_azure_responses_endpoint();
let payload = ResponsesApiRequest {
model: &self.config.model,
instructions: &full_instructions,
input: &input_with_instructions,
tools: &tools_json,
tool_choice: "auto",
parallel_tool_calls: prompt.parallel_tool_calls,
reasoning,
store: azure_workaround,
stream: true,
include,
prompt_cache_key: Some(self.conversation_id.to_string()),
text,
};
let mut payload_json = serde_json::json!({
"model": self.config.model,
"instructions": payload.instructions,
"input": input_with_instructions,
"tools": tools_json,
"tool_choice": "auto",
"parallel_tool_calls": prompt.parallel_tool_calls,
"store": azure_workaround || payload.store_response,
"stream": true,
"include": include,
"prompt_cache_key": self.conversation_id.to_string(),
});
if let Some(reasoning) = reasoning {
payload_json
.as_object_mut()
.expect("payload object")
.insert("reasoning".to_string(), serde_json::to_value(reasoning)?);
}
if let Some(text) = text {
payload_json
.as_object_mut()
.expect("payload object")
.insert("text".to_string(), serde_json::to_value(text)?);
}
if let Some(previous) = payload.previous_response_id.as_ref() {
payload_json
.as_object_mut()
.expect("payload object")
.insert(
"previous_response_id".to_string(),
serde_json::Value::String(previous.clone()),
);
}
let mut payload_json = serde_json::to_value(&payload)?;
if azure_workaround {
attach_item_ids(&mut payload_json, &input_with_instructions);
if let Some(input_value) = payload_json.get_mut("input")
&& let Some(array) = input_value.as_array_mut()
{
attach_item_ids_array(array, &prompt.input);
}
}
let max_attempts = self.provider.request_max_retries();
@@ -588,14 +639,7 @@ struct ResponseCompletedOutputTokensDetails {
reasoning_tokens: i64,
}
fn attach_item_ids(payload_json: &mut Value, original_items: &[ResponseItem]) {
let Some(input_value) = payload_json.get_mut("input") else {
return;
};
let serde_json::Value::Array(items) = input_value else {
return;
};
fn attach_item_ids_array(items: &mut [Value], original_items: &[ResponseItem]) {
for (value, item) in items.iter_mut().zip(original_items.iter()) {
if let ResponseItem::Reasoning { id, .. }
| ResponseItem::Message { id: Some(id), .. }

View File

@@ -1,196 +1,88 @@
use crate::client_common::tools::ToolSpec;
use crate::error::Result;
use crate::model_family::ModelFamily;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::Verbosity as VerbosityConfig;
use codex_protocol::models::ResponseItem;
use futures::Stream;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashSet;
use std::ops::Deref;
use futures::Stream;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio::sync::mpsc;
use crate::error::Result;
pub use codex_api_client::Prompt;
pub use codex_api_client::Reasoning;
pub use codex_api_client::TextControls;
pub use codex_api_client::TextFormat;
pub use codex_api_client::TextFormatType;
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
use codex_protocol::config_types::Verbosity as VerbosityConfig;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::TokenUsage;
use serde_json::Value;
use crate::model_family::ModelFamily;
/// Review thread system prompt. Edit `core/src/review_prompt.md` to customize.
pub const REVIEW_PROMPT: &str = include_str!("../review_prompt.md");
// Centralized templates for review-related user messages
pub const REVIEW_EXIT_SUCCESS_TMPL: &str = include_str!("../templates/review/exit_success.xml");
pub const REVIEW_EXIT_INTERRUPTED_TMPL: &str =
include_str!("../templates/review/exit_interrupted.xml");
/// API request payload for a single model turn
#[derive(Default, Debug, Clone)]
pub struct Prompt {
/// Conversation context input items.
pub input: Vec<ResponseItem>,
/// Tools available to the model, including additional tools sourced from
/// external MCP servers.
pub(crate) tools: Vec<ToolSpec>,
/// Whether parallel tool calls are permitted for this prompt.
pub(crate) parallel_tool_calls: bool,
/// Optional override for the built-in BASE_INSTRUCTIONS.
pub base_instructions_override: Option<String>,
/// Optional the output schema for the model's response.
pub output_schema: Option<Value>,
}
impl Prompt {
pub(crate) fn get_full_instructions<'a>(&'a self, model: &'a ModelFamily) -> Cow<'a, str> {
let base = self
.base_instructions_override
.as_deref()
.unwrap_or(model.base_instructions.deref());
// When there are no custom instructions, add apply_patch_tool_instructions if:
// - the model needs special instructions (4.1)
// AND
// - there is no apply_patch tool present
let is_apply_patch_tool_present = self.tools.iter().any(|tool| match tool {
ToolSpec::Function(f) => f.name == "apply_patch",
ToolSpec::Freeform(f) => f.name == "apply_patch",
_ => false,
});
if self.base_instructions_override.is_none()
&& model.needs_special_apply_patch_instructions
&& !is_apply_patch_tool_present
{
Cow::Owned(format!("{base}\n{APPLY_PATCH_TOOL_INSTRUCTIONS}"))
} else {
Cow::Borrowed(base)
}
}
pub(crate) fn get_formatted_input(&self) -> Vec<ResponseItem> {
let mut input = self.input.clone();
// when using the *Freeform* apply_patch tool specifically, tool outputs
// should be structured text, not json. Do NOT reserialize when using
// the Function tool - note that this differs from the check above for
// instructions. We declare the result as a named variable for clarity.
let is_freeform_apply_patch_tool_present = self.tools.iter().any(|tool| match tool {
ToolSpec::Freeform(f) => f.name == "apply_patch",
_ => false,
});
if is_freeform_apply_patch_tool_present {
reserialize_shell_outputs(&mut input);
}
input
pub fn compute_full_instructions<'a>(
base_override: Option<&'a str>,
model: &'a ModelFamily,
is_apply_patch_present: bool,
) -> Cow<'a, str> {
let base = base_override.unwrap_or(model.base_instructions.deref());
if base_override.is_none()
&& model.needs_special_apply_patch_instructions
&& !is_apply_patch_present
{
Cow::Owned(format!("{base}\n{APPLY_PATCH_TOOL_INSTRUCTIONS}"))
} else {
Cow::Borrowed(base)
}
}
fn reserialize_shell_outputs(items: &mut [ResponseItem]) {
let mut shell_call_ids: HashSet<String> = HashSet::new();
pub fn create_reasoning_param_for_request(
model_family: &ModelFamily,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
) -> Option<Reasoning> {
if !model_family.supports_reasoning_summaries {
return None;
}
items.iter_mut().for_each(|item| match item {
ResponseItem::LocalShellCall { call_id, id, .. } => {
if let Some(identifier) = call_id.clone().or_else(|| id.clone()) {
shell_call_ids.insert(identifier);
}
}
ResponseItem::CustomToolCall {
id: _,
status: _,
call_id,
name,
input: _,
} => {
if name == "apply_patch" {
shell_call_ids.insert(call_id.clone());
}
}
ResponseItem::CustomToolCallOutput { call_id, output } => {
if shell_call_ids.remove(call_id)
&& let Some(structured) = parse_structured_shell_output(output)
{
*output = structured
}
}
ResponseItem::FunctionCall { name, call_id, .. }
if is_shell_tool_name(name) || name == "apply_patch" =>
{
shell_call_ids.insert(call_id.clone());
}
ResponseItem::FunctionCallOutput { call_id, output } => {
if shell_call_ids.remove(call_id)
&& let Some(structured) = parse_structured_shell_output(&output.content)
{
output.content = structured
}
}
_ => {}
Some(Reasoning {
effort,
summary: Some(summary),
})
}
fn is_shell_tool_name(name: &str) -> bool {
matches!(name, "shell" | "container.exec")
}
#[derive(Deserialize)]
struct ExecOutputJson {
output: String,
metadata: ExecOutputMetadataJson,
}
#[derive(Deserialize)]
struct ExecOutputMetadataJson {
exit_code: i32,
duration_seconds: f32,
}
fn parse_structured_shell_output(raw: &str) -> Option<String> {
let parsed: ExecOutputJson = serde_json::from_str(raw).ok()?;
Some(build_structured_output(&parsed))
}
fn build_structured_output(parsed: &ExecOutputJson) -> String {
let mut sections = Vec::new();
sections.push(format!("Exit code: {}", parsed.metadata.exit_code));
sections.push(format!(
"Wall time: {} seconds",
parsed.metadata.duration_seconds
));
let mut output = parsed.output.clone();
if let Some(total_lines) = extract_total_output_lines(&parsed.output) {
sections.push(format!("Total output lines: {total_lines}"));
if let Some(stripped) = strip_total_output_header(&output) {
output = stripped.to_string();
}
pub fn create_text_param_for_request(
verbosity: Option<VerbosityConfig>,
output_schema: &Option<Value>,
) -> Option<TextControls> {
if verbosity.is_none() && output_schema.is_none() {
return None;
}
sections.push("Output:".to_string());
sections.push(output);
sections.join("\n")
}
fn extract_total_output_lines(output: &str) -> Option<u32> {
let marker_start = output.find("[... omitted ")?;
let marker = &output[marker_start..];
let (_, after_of) = marker.split_once(" of ")?;
let (total_segment, _) = after_of.split_once(' ')?;
total_segment.parse::<u32>().ok()
}
fn strip_total_output_header(output: &str) -> Option<&str> {
let after_prefix = output.strip_prefix("Total output lines: ")?;
let (_, remainder) = after_prefix.split_once('\n')?;
let remainder = remainder.strip_prefix('\n').unwrap_or(remainder);
Some(remainder)
Some(TextControls {
verbosity: verbosity.map(|v| match v {
VerbosityConfig::Low => "low".to_string(),
VerbosityConfig::Medium => "medium".to_string(),
VerbosityConfig::High => "high".to_string(),
}),
format: output_schema.as_ref().map(|schema| TextFormat {
r#type: TextFormatType::JsonSchema,
strict: true,
schema: schema.clone(),
name: "codex_output_schema".to_string(),
}),
})
}
#[derive(Debug)]
@@ -209,173 +101,6 @@ pub enum ResponseEvent {
RateLimits(RateLimitSnapshot),
}
#[derive(Debug, Serialize)]
pub(crate) struct Reasoning {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) effort: Option<ReasoningEffortConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) summary: Option<ReasoningSummaryConfig>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "snake_case")]
pub(crate) enum TextFormatType {
#[default]
JsonSchema,
}
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextFormat {
pub(crate) r#type: TextFormatType,
pub(crate) strict: bool,
pub(crate) schema: Value,
pub(crate) name: String,
}
/// Controls under the `text` field in the Responses API for GPT-5.
#[derive(Debug, Serialize, Default, Clone)]
pub(crate) struct TextControls {
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) verbosity: Option<OpenAiVerbosity>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) format: Option<TextFormat>,
}
#[derive(Debug, Serialize, Default, Clone)]
#[serde(rename_all = "lowercase")]
pub(crate) enum OpenAiVerbosity {
Low,
#[default]
Medium,
High,
}
impl From<VerbosityConfig> for OpenAiVerbosity {
fn from(v: VerbosityConfig) -> Self {
match v {
VerbosityConfig::Low => OpenAiVerbosity::Low,
VerbosityConfig::Medium => OpenAiVerbosity::Medium,
VerbosityConfig::High => OpenAiVerbosity::High,
}
}
}
/// Request object that is serialized as JSON and POST'ed when using the
/// Responses API.
#[derive(Debug, Serialize)]
pub(crate) struct ResponsesApiRequest<'a> {
pub(crate) model: &'a str,
pub(crate) instructions: &'a str,
// TODO(mbolin): ResponseItem::Other should not be serialized. Currently,
// we code defensively to avoid this case, but perhaps we should use a
// separate enum for serialization.
pub(crate) input: &'a Vec<ResponseItem>,
pub(crate) tools: &'a [serde_json::Value],
pub(crate) tool_choice: &'static str,
pub(crate) parallel_tool_calls: bool,
pub(crate) reasoning: Option<Reasoning>,
pub(crate) store: bool,
pub(crate) stream: bool,
pub(crate) include: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) prompt_cache_key: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) text: Option<TextControls>,
}
pub(crate) mod tools {
use crate::tools::spec::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
/// When serialized as JSON, this produces a valid "Tool" in the OpenAI
/// Responses API.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(tag = "type")]
pub(crate) enum ToolSpec {
#[serde(rename = "function")]
Function(ResponsesApiTool),
#[serde(rename = "local_shell")]
LocalShell {},
// TODO: Understand why we get an error on web_search although the API docs say it's supported.
// https://platform.openai.com/docs/guides/tools-web-search?api-mode=responses#:~:text=%7B%20type%3A%20%22web_search%22%20%7D%2C
#[serde(rename = "web_search")]
WebSearch {},
#[serde(rename = "custom")]
Freeform(FreeformTool),
}
impl ToolSpec {
pub(crate) fn name(&self) -> &str {
match self {
ToolSpec::Function(tool) => tool.name.as_str(),
ToolSpec::LocalShell {} => "local_shell",
ToolSpec::WebSearch {} => "web_search",
ToolSpec::Freeform(tool) => tool.name.as_str(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformTool {
pub(crate) name: String,
pub(crate) description: String,
pub(crate) format: FreeformToolFormat,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformToolFormat {
pub(crate) r#type: String,
pub(crate) syntax: String,
pub(crate) definition: String,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct ResponsesApiTool {
pub(crate) name: String,
pub(crate) description: String,
/// TODO: Validation. When strict is set to true, the JSON schema,
/// `required` and `additional_properties` must be present. All fields in
/// `properties` must be present in `required`.
pub(crate) strict: bool,
pub(crate) parameters: JsonSchema,
}
}
pub(crate) fn create_reasoning_param_for_request(
model_family: &ModelFamily,
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
) -> Option<Reasoning> {
if !model_family.supports_reasoning_summaries {
return None;
}
Some(Reasoning {
effort,
summary: Some(summary),
})
}
pub(crate) fn create_text_param_for_request(
verbosity: Option<VerbosityConfig>,
output_schema: &Option<Value>,
) -> Option<TextControls> {
if verbosity.is_none() && output_schema.is_none() {
return None;
}
Some(TextControls {
verbosity: verbosity.map(std::convert::Into::into),
format: output_schema.as_ref().map(|schema| TextFormat {
r#type: TextFormatType::JsonSchema,
strict: true,
schema: schema.clone(),
name: "codex_output_schema".to_string(),
}),
})
}
pub struct ResponseStream {
pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
}
@@ -390,164 +115,48 @@ impl Stream for ResponseStream {
#[cfg(test)]
mod tests {
use crate::model_family::find_family_for_model;
use pretty_assertions::assert_eq;
use super::*;
struct InstructionsTestCase {
pub slug: &'static str,
pub expects_apply_patch_instructions: bool,
}
#[test]
fn get_full_instructions_no_user_content() {
let prompt = Prompt {
..Default::default()
};
let test_cases = vec![
InstructionsTestCase {
slug: "gpt-3.5",
expects_apply_patch_instructions: true,
},
InstructionsTestCase {
slug: "gpt-4.1",
expects_apply_patch_instructions: true,
},
InstructionsTestCase {
slug: "gpt-4o",
expects_apply_patch_instructions: true,
},
InstructionsTestCase {
slug: "gpt-5",
expects_apply_patch_instructions: true,
},
InstructionsTestCase {
slug: "codex-mini-latest",
expects_apply_patch_instructions: true,
},
InstructionsTestCase {
slug: "gpt-oss:120b",
expects_apply_patch_instructions: false,
},
InstructionsTestCase {
slug: "gpt-5-codex",
expects_apply_patch_instructions: false,
},
];
for test_case in test_cases {
let model_family = find_family_for_model(test_case.slug).expect("known model slug");
let expected = if test_case.expects_apply_patch_instructions {
format!(
"{}\n{}",
model_family.clone().base_instructions,
APPLY_PATCH_TOOL_INSTRUCTIONS
)
} else {
model_family.clone().base_instructions
};
let full = prompt.get_full_instructions(&model_family);
assert_eq!(full, expected);
}
}
use crate::model_family::find_family_for_model;
#[test]
fn serializes_text_verbosity_when_set() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: Some(TextControls {
verbosity: Some(OpenAiVerbosity::Low),
format: None,
}),
};
fn compute_full_instructions_respects_apply_patch_flag() {
let model = find_family_for_model("gpt-4.1").expect("model");
let with_tool = compute_full_instructions(None, &model, true);
assert_eq!(with_tool.as_ref(), model.base_instructions.deref());
let v = serde_json::to_value(&req).expect("json");
assert_eq!(
v.get("text")
.and_then(|t| t.get("verbosity"))
.and_then(|s| s.as_str()),
Some("low")
let without_tool = compute_full_instructions(None, &model, false);
assert!(
without_tool
.as_ref()
.ends_with(APPLY_PATCH_TOOL_INSTRUCTIONS)
);
}
#[test]
fn serializes_text_schema_with_strict_format() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
fn create_text_controls_includes_verbosity() {
let controls = create_text_param_for_request(Some(VerbosityConfig::Low), &None)
.expect("text controls");
assert_eq!(controls.verbosity.as_deref(), Some("low"));
assert!(controls.format.is_none());
}
#[test]
fn create_text_controls_includes_schema() {
let schema = serde_json::json!({
"type": "object",
"properties": {
"answer": {"type": "string"}
},
"properties": {"answer": {"type": "string"}},
"required": ["answer"],
});
let text_controls =
let controls =
create_text_param_for_request(None, &Some(schema.clone())).expect("text controls");
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: Some(text_controls),
};
let v = serde_json::to_value(&req).expect("json");
let text = v.get("text").expect("text field");
assert!(text.get("verbosity").is_none());
let format = text.get("format").expect("format field");
assert_eq!(
format.get("name"),
Some(&serde_json::Value::String("codex_output_schema".into()))
);
assert_eq!(
format.get("type"),
Some(&serde_json::Value::String("json_schema".into()))
);
assert_eq!(format.get("strict"), Some(&serde_json::Value::Bool(true)));
assert_eq!(format.get("schema"), Some(&schema));
let format = controls.format.expect("format");
assert_eq!(format.name, "codex_output_schema");
assert!(format.strict);
assert_eq!(format.schema, schema);
}
#[test]
fn omits_text_when_not_set() {
let input: Vec<ResponseItem> = vec![];
let tools: Vec<serde_json::Value> = vec![];
let req = ResponsesApiRequest {
model: "gpt-5",
instructions: "i",
input: &input,
tools: &tools,
tool_choice: "auto",
parallel_tool_calls: true,
reasoning: None,
store: false,
stream: true,
include: vec![],
prompt_cache_key: None,
text: None,
};
let v = serde_json::to_value(&req).expect("json");
assert!(v.get("text").is_none());
fn create_text_controls_none_when_no_options() {
assert!(create_text_param_for_request(None, &None).is_none());
}
}

View File

@@ -53,6 +53,7 @@ use tracing::warn;
use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client::StreamPayload;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
@@ -66,6 +67,7 @@ use crate::error::Result as CodexResult;
use crate::exec::StreamOutput;
// Removed: legacy executor wiring replaced by ToolOrchestrator flows.
// legacy normalize_exec_result no longer used after orchestrator migration
use crate::conversation_history::ResponsesApiChainState;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::model_family::find_family_for_model;
@@ -921,6 +923,39 @@ impl Session {
self.send_raw_response_items(turn_context, items).await;
}
async fn prepare_prompt_items(
&self,
turn_context: &TurnContext,
) -> (Vec<ResponseItem>, Vec<ResponseItem>, Option<String>, bool) {
let use_chain = turn_context.client.supports_responses_api_chaining();
let mut history = self.clone_history().await;
let full_prompt_items = history.get_history_for_prompt();
if !use_chain {
let mut state = self.state.lock().await;
state.reset_responses_api_chain();
return (full_prompt_items.clone(), full_prompt_items, None, false);
}
let mut state = self.state.lock().await;
let mut previous_response_id = None;
let mut request_items = full_prompt_items.clone();
if let Some(chain) = state.responses_api_chain() {
if let Some(prev_id) = chain.last_response_id {
let prefix = common_prefix_len(&chain.last_prompt_items, &full_prompt_items);
if prefix == 0 && !chain.last_prompt_items.is_empty() {
state.reset_responses_api_chain();
} else {
previous_response_id = Some(prev_id);
request_items = full_prompt_items[prefix..].to_vec();
}
}
}
(request_items, full_prompt_items, previous_response_id, true)
}
fn reconstruct_history_from_rollout(
&self,
turn_context: &TurnContext,
@@ -959,6 +994,29 @@ impl Session {
state.replace_history(items);
}
async fn update_responses_api_chain_state(
&self,
chaining_intent: bool,
response_id: Option<String>,
prompt_items: Vec<ResponseItem>,
) {
let mut state = self.state.lock().await;
if !chaining_intent {
state.reset_responses_api_chain();
return;
}
let Some(response_id) = response_id.filter(|id| !id.is_empty()) else {
state.reset_responses_api_chain();
return;
};
state.set_responses_api_chain(ResponsesApiChainState {
last_response_id: Some(response_id),
last_prompt_items: prompt_items,
});
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
let rollout_items: Vec<RolloutItem> = items
.iter()
@@ -1737,30 +1795,34 @@ pub(crate) async fn run_task(
.collect::<Vec<ResponseItem>>();
// Construct the input that we will send to the model.
let turn_input: Vec<ResponseItem> = {
sess.record_conversation_items(&turn_context, &pending_input)
.await;
sess.clone_history().await.get_history_for_prompt()
};
sess.record_conversation_items(&turn_context, &pending_input)
.await;
let (request_items, full_prompt_items, previous_response_id, store_response) =
sess.prepare_prompt_items(&turn_context).await;
let turn_input_messages: Vec<String> = turn_input
.iter()
.filter_map(|item| match item {
ResponseItem::Message { content, .. } => Some(content),
_ => None,
})
.flat_map(|content| {
content.iter().filter_map(|item| match item {
ContentItem::OutputText { text } => Some(text.clone()),
let turn_input_messages: Vec<String> = {
full_prompt_items
.iter()
.filter_map(|item| match item {
ResponseItem::Message { content, .. } => Some(content),
_ => None,
})
})
.collect();
.flat_map(|content| {
content.iter().filter_map(|item| match item {
ContentItem::OutputText { text } => Some(text.clone()),
_ => None,
})
})
.collect()
};
match run_turn(
Arc::clone(&sess),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
turn_input,
request_items,
full_prompt_items,
previous_response_id,
store_response,
cancellation_token.child_token(),
)
.await
@@ -1841,12 +1903,21 @@ pub(crate) async fn run_task(
last_agent_message
}
fn common_prefix_len(lhs: &[ResponseItem], rhs: &[ResponseItem]) -> usize {
lhs.iter()
.zip(rhs.iter())
.take_while(|(l, r)| l == r)
.count()
}
async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
input: Vec<ResponseItem>,
mut request_items: Vec<ResponseItem>,
mut full_prompt_items: Vec<ResponseItem>,
previous_response_id: Option<String>,
store_response: bool,
cancellation_token: CancellationToken,
) -> CodexResult<TurnRunResult> {
let mcp_tools = sess.services.mcp_connection_manager.list_all_tools();
@@ -1855,27 +1926,54 @@ async fn run_turn(
Some(mcp_tools),
));
let tool_specs = router.specs();
let (tools_json, has_freeform_apply_patch) =
crate::tools::spec::tools_metadata_for_prompt(&tool_specs)?;
crate::conversation_history::format_prompt_items(&mut request_items, has_freeform_apply_patch);
crate::conversation_history::format_prompt_items(
&mut full_prompt_items,
has_freeform_apply_patch,
);
let apply_patch_present = tool_specs.iter().any(|spec| spec.name() == "apply_patch");
let instructions = crate::client_common::compute_full_instructions(
turn_context.base_instructions.as_deref(),
&turn_context.client.get_model_family(),
apply_patch_present,
)
.into_owned();
let model_supports_parallel = turn_context
.client
.get_model_family()
.supports_parallel_tool_calls;
let parallel_tool_calls = model_supports_parallel;
let prompt = Prompt {
input,
tools: router.specs(),
let prompt = Prompt::new(
request_items,
tools_json,
parallel_tool_calls,
base_instructions_override: turn_context.base_instructions.clone(),
output_schema: turn_context.final_output_json_schema.clone(),
turn_context.final_output_json_schema.clone(),
);
let payload = StreamPayload {
prompt,
instructions,
store_response,
previous_response_id,
};
let mut retries = 0;
loop {
let attempt_payload = payload.clone();
let attempt_full_items = full_prompt_items.clone();
match try_run_turn(
Arc::clone(&router),
Arc::clone(&sess),
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
&prompt,
attempt_payload,
attempt_full_items,
cancellation_token.child_token(),
)
.await
@@ -1956,9 +2054,12 @@ async fn try_run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
turn_diff_tracker: SharedTurnDiffTracker,
prompt: &Prompt,
payload: StreamPayload,
full_prompt_items: Vec<ResponseItem>,
cancellation_token: CancellationToken,
) -> CodexResult<TurnRunResult> {
let chaining_intent = payload.store_response;
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
@@ -1972,7 +2073,7 @@ async fn try_run_turn(
let mut stream = turn_context
.client
.clone()
.stream(prompt)
.stream(&payload)
.or_cancel(&cancellation_token)
.await??;
@@ -2105,7 +2206,7 @@ async fn try_run_turn(
sess.update_rate_limits(&turn_context, snapshot).await;
}
ResponseEvent::Completed {
response_id: _,
response_id,
token_usage,
} => {
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
@@ -2115,6 +2216,17 @@ async fn try_run_turn(
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
};
let prompt_items_for_chain = if chaining_intent {
full_prompt_items
} else {
Vec::new()
};
sess.update_responses_api_chain_state(
chaining_intent,
Some(response_id.clone()),
prompt_items_for_chain,
)
.await;
if let Ok(Some(unified_diff)) = unified_diff {
let msg = EventMsg::TurnDiff(TurnDiffEvent { unified_diff });
sess.send_event(&turn_context, msg).await;

View File

@@ -4,6 +4,7 @@ use super::Session;
use super::TurnContext;
use super::get_last_assistant_message_from_turn;
use crate::Prompt;
use crate::client::StreamPayload;
use crate::client_common::ResponseEvent;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
@@ -84,12 +85,23 @@ async fn run_compact_task_inner(
sess.persist_rollout_items(&[rollout_item]).await;
loop {
let turn_input = history.get_history_for_prompt();
let prompt = Prompt {
input: turn_input.clone(),
..Default::default()
let mut turn_input = history.get_history_for_prompt();
let turn_input_len = turn_input.len();
crate::conversation_history::format_prompt_items(&mut turn_input, false);
let prompt = Prompt::new(turn_input, Vec::new(), false, None);
let instructions = crate::client_common::compute_full_instructions(
turn_context.base_instructions.as_deref(),
&turn_context.client.get_model_family(),
false,
)
.into_owned();
let payload = StreamPayload {
prompt,
instructions,
store_response: false,
previous_response_id: None,
};
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), payload).await;
match attempt_result {
Ok(()) => {
@@ -108,7 +120,7 @@ async fn run_compact_task_inner(
return;
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input.len() > 1 {
if turn_input_len > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
@@ -252,9 +264,9 @@ fn build_compacted_history_with_limit(
async fn drain_to_completed(
sess: &Session,
turn_context: &TurnContext,
prompt: &Prompt,
payload: StreamPayload,
) -> CodexResult<()> {
let mut stream = turn_context.client.clone().stream(prompt).await?;
let mut stream = turn_context.client.clone().stream(&payload).await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {

View File

@@ -7,6 +7,7 @@ use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_utils_string::take_bytes_at_char_boundary;
use codex_utils_string::take_last_bytes_at_char_boundary;
use std::collections::HashSet;
use std::ops::Deref;
// Model-formatting limits: clients get full streams; only content sent to the model is truncated.
@@ -22,6 +23,13 @@ pub(crate) struct ConversationHistory {
/// The oldest items are at the beginning of the vector.
items: Vec<ResponseItem>,
token_info: Option<TokenUsageInfo>,
responses_api_chain: Option<ResponsesApiChainState>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct ResponsesApiChainState {
pub last_response_id: Option<String>,
pub last_prompt_items: Vec<ResponseItem>,
}
impl ConversationHistory {
@@ -29,6 +37,7 @@ impl ConversationHistory {
Self {
items: Vec::new(),
token_info: TokenUsageInfo::new_or_append(&None, &None, None),
responses_api_chain: None,
}
}
@@ -91,6 +100,7 @@ impl ConversationHistory {
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
self.items = items;
self.reset_responses_api_chain();
}
pub(crate) fn update_token_info(
@@ -429,6 +439,18 @@ impl ConversationHistory {
| ResponseItem::Other => item.clone(),
}
}
pub(crate) fn responses_api_chain(&self) -> Option<ResponsesApiChainState> {
self.responses_api_chain.clone()
}
pub(crate) fn reset_responses_api_chain(&mut self) {
self.responses_api_chain = None;
}
pub(crate) fn set_responses_api_chain(&mut self, chain: ResponsesApiChainState) {
self.responses_api_chain = Some(chain);
}
}
pub(crate) fn format_output_for_model_body(content: &str) -> String {
@@ -519,6 +541,102 @@ fn is_api_message(message: &ResponseItem) -> bool {
}
}
fn reserialize_shell_outputs(items: &mut [ResponseItem]) {
let mut shell_call_ids: HashSet<String> = HashSet::new();
items.iter_mut().for_each(|item| match item {
ResponseItem::LocalShellCall { call_id, id, .. } => {
if let Some(identifier) = call_id.clone().or_else(|| id.clone()) {
shell_call_ids.insert(identifier);
}
}
ResponseItem::CustomToolCall { call_id, name, .. } => {
if name == "apply_patch" {
shell_call_ids.insert(call_id.clone());
}
}
ResponseItem::CustomToolCallOutput { call_id, output } => {
if shell_call_ids.remove(call_id)
&& let Some(structured) = parse_structured_shell_output(output)
{
*output = structured;
}
}
ResponseItem::FunctionCall { name, call_id, .. }
if name == "shell" || name == "container.exec" || name == "apply_patch" =>
{
shell_call_ids.insert(call_id.clone());
}
ResponseItem::FunctionCallOutput { call_id, output } => {
if shell_call_ids.remove(call_id)
&& let Some(structured) = parse_structured_shell_output(&output.content)
{
output.content = structured;
}
}
_ => {}
});
}
#[derive(serde::Deserialize)]
struct ExecOutputJson {
output: String,
metadata: ExecOutputMetadataJson,
}
#[derive(serde::Deserialize)]
struct ExecOutputMetadataJson {
exit_code: i32,
duration_seconds: f32,
}
fn parse_structured_shell_output(raw: &str) -> Option<String> {
let parsed: ExecOutputJson = serde_json::from_str(raw).ok()?;
Some(build_structured_output(&parsed))
}
fn build_structured_output(parsed: &ExecOutputJson) -> String {
let mut sections = Vec::new();
sections.push(format!("Exit code: {}", parsed.metadata.exit_code));
sections.push(format!(
"Wall time: {} seconds",
parsed.metadata.duration_seconds
));
let mut output = parsed.output.clone();
if let Some(total_lines) = extract_total_output_lines(&parsed.output) {
sections.push(format!("Total output lines: {total_lines}"));
if let Some(stripped) = strip_total_output_header(&output) {
output = stripped.to_string();
}
}
sections.push("Output:".to_string());
sections.push(output);
sections.join("\n")
}
fn extract_total_output_lines(output: &str) -> Option<u32> {
let marker_start = output.find("[... omitted ")?;
let marker = &output[marker_start..];
let (_, after_of) = marker.split_once(" of ")?;
let (total_segment, _) = after_of.split_once(' ')?;
total_segment.parse::<u32>().ok()
}
fn strip_total_output_header(output: &str) -> Option<&str> {
let after_prefix = output.strip_prefix("Total output lines: ")?;
let (_, remainder) = after_prefix.split_once('\n')?;
let remainder = remainder.strip_prefix('\n').unwrap_or(remainder);
Some(remainder)
}
pub(crate) fn format_prompt_items(items: &mut [ResponseItem], has_freeform_apply_patch: bool) {
if has_freeform_apply_patch {
reserialize_shell_outputs(items);
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -43,6 +43,8 @@ pub enum Feature {
SandboxCommandAssessment,
/// Create a ghost commit at each turn.
GhostCommit,
/// Enable chaining Responses API calls via previous response IDs.
ResponsesApiChaining,
}
impl Feature {
@@ -295,4 +297,10 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::ResponsesApiChaining,
key: "responses_api_chaining",
stage: Stage::Experimental,
default_enabled: false,
},
];

View File

@@ -7,6 +7,7 @@ use std::time::Instant;
use crate::AuthManager;
use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client::StreamPayload;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
@@ -120,16 +121,31 @@ pub(crate) async fn assess_command(
.trim()
.to_string();
let prompt = Prompt {
input: vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_prompt }],
}],
tools: Vec::new(),
parallel_tool_calls: false,
base_instructions_override: Some(system_prompt),
output_schema: Some(sandbox_assessment_schema()),
let mut prompt_items = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_prompt }],
}];
crate::conversation_history::format_prompt_items(&mut prompt_items, false);
let prompt = Prompt::new(
prompt_items,
Vec::new(),
false,
Some(sandbox_assessment_schema()),
);
let instructions = crate::client_common::compute_full_instructions(
Some(system_prompt.as_str()),
&config.model_family,
false,
)
.into_owned();
let payload = StreamPayload {
prompt,
instructions,
store_response: false,
previous_response_id: None,
};
let child_otel =
@@ -148,7 +164,7 @@ pub(crate) async fn assess_command(
let start = Instant::now();
let assessment_result = timeout(SANDBOX_ASSESSMENT_TIMEOUT, async move {
let mut stream = client.stream(&prompt).await?;
let mut stream = client.stream(&payload).await?;
let mut last_json: Option<String> = None;
while let Some(event) = stream.next().await {
match event {

View File

@@ -7,3 +7,4 @@ pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::RunningTask;
pub(crate) use turn::TaskKind;
pub(crate) use crate::conversation_history::ResponsesApiChainState;

View File

@@ -4,6 +4,7 @@ use codex_protocol::models::ResponseItem;
use crate::codex::SessionConfiguration;
use crate::conversation_history::ConversationHistory;
use crate::conversation_history::ResponsesApiChainState;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
@@ -42,6 +43,18 @@ impl SessionState {
self.history.replace(items);
}
pub(crate) fn reset_responses_api_chain(&mut self) {
self.history.reset_responses_api_chain();
}
pub(crate) fn set_responses_api_chain(&mut self, chain: ResponsesApiChainState) {
self.history.set_responses_api_chain(chain);
}
pub(crate) fn responses_api_chain(&self) -> Option<ResponsesApiChainState> {
self.history.responses_api_chain()
}
// Token/rate limit helpers
pub(crate) fn update_token_info_from_usage(
&mut self,

View File

@@ -3,10 +3,6 @@ use std::collections::BTreeMap;
use crate::apply_patch;
use crate::apply_patch::InternalApplyPatchInvocation;
use crate::apply_patch::convert_apply_patch_to_protocol;
use crate::client_common::tools::FreeformTool;
use crate::client_common::tools::FreeformToolFormat;
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
@@ -20,7 +16,11 @@ use crate::tools::runtimes::apply_patch::ApplyPatchRequest;
use crate::tools::runtimes::apply_patch::ApplyPatchRuntime;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::spec::ApplyPatchToolArgs;
use crate::tools::spec::FreeformTool;
use crate::tools::spec::FreeformToolFormat;
use crate::tools::spec::JsonSchema;
use crate::tools::spec::ResponsesApiTool;
use crate::tools::spec::ToolSpec;
use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;

View File

@@ -1,5 +1,3 @@
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
@@ -9,6 +7,8 @@ use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use crate::tools::spec::JsonSchema;
use crate::tools::spec::ResponsesApiTool;
use crate::tools::spec::ToolSpec;
use async_trait::async_trait;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::EventMsg;

View File

@@ -6,11 +6,11 @@ use async_trait::async_trait;
use codex_protocol::models::ResponseInputItem;
use tracing::warn;
use crate::client_common::tools::ToolSpec;
use crate::function_tool::FunctionCallError;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::spec::ToolSpec;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ToolKind {

View File

@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use crate::client_common::tools::ToolSpec;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
@@ -10,6 +9,7 @@ use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
use crate::tools::registry::ConfiguredToolSpec;
use crate::tools::registry::ToolRegistry;
use crate::tools::spec::ToolSpec;
use crate::tools::spec::ToolsConfig;
use crate::tools::spec::build_specs;
use codex_protocol::models::LocalShellAction;

View File

@@ -1,5 +1,3 @@
use crate::client_common::tools::ResponsesApiTool;
use crate::client_common::tools::ToolSpec;
use crate::features::Feature;
use crate::features::Features;
use crate::model_family::ModelFamily;
@@ -22,6 +20,52 @@ pub enum ConfigShellToolType {
Streamable,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(tag = "type")]
pub(crate) enum ToolSpec {
#[serde(rename = "function")]
Function(ResponsesApiTool),
#[serde(rename = "local_shell")]
LocalShell {},
#[serde(rename = "web_search")]
WebSearch {},
#[serde(rename = "custom")]
Freeform(FreeformTool),
}
impl ToolSpec {
pub(crate) fn name(&self) -> &str {
match self {
ToolSpec::Function(tool) => tool.name.as_str(),
ToolSpec::LocalShell {} => "local_shell",
ToolSpec::WebSearch {} => "web_search",
ToolSpec::Freeform(tool) => tool.name.as_str(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformTool {
pub(crate) name: String,
pub(crate) description: String,
pub(crate) format: FreeformToolFormat,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FreeformToolFormat {
pub(crate) r#type: String,
pub(crate) syntax: String,
pub(crate) definition: String,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct ResponsesApiTool {
pub(crate) name: String,
pub(crate) description: String,
pub(crate) strict: bool,
pub(crate) parameters: JsonSchema,
}
#[derive(Debug, Clone)]
pub(crate) struct ToolsConfig {
pub shell_type: ConfigShellToolType,
@@ -681,32 +725,38 @@ pub fn create_tools_json_for_responses_api(
Ok(tools_json)
}
pub fn tools_metadata_for_prompt(
tools: &[ToolSpec],
) -> crate::error::Result<(Vec<serde_json::Value>, bool)> {
let tools_json = create_tools_json_for_responses_api(tools)?;
let has_freeform_apply_patch = tools.iter().any(|tool| match tool {
ToolSpec::Freeform(freeform) => freeform.name == "apply_patch",
_ => false,
});
Ok((tools_json, has_freeform_apply_patch))
}
/// Returns JSON values that are compatible with Function Calling in the
/// Chat Completions API:
/// https://platform.openai.com/docs/guides/function-calling?api-mode=chat
pub(crate) fn create_tools_json_for_chat_completions_api(
tools: &[ToolSpec],
tools: &[serde_json::Value],
) -> crate::error::Result<Vec<serde_json::Value>> {
// We start with the JSON for the Responses API and than rewrite it to match
// the chat completions tool call format.
let responses_api_tools_json = create_tools_json_for_responses_api(tools)?;
let tools_json = responses_api_tools_json
.into_iter()
.filter_map(|mut tool| {
let tools_json = tools
.iter()
.filter_map(|tool| {
if tool.get("type") != Some(&serde_json::Value::String("function".to_string())) {
return None;
}
if let Some(map) = tool.as_object_mut() {
// Remove "type" field as it is not needed in chat completions.
map.remove("type");
Some(json!({
tool.as_object().map(|map| {
let mut function = map.clone();
function.remove("type");
json!({
"type": "function",
"function": map,
}))
} else {
None
}
"function": function,
})
})
})
.collect::<Vec<serde_json::Value>>();
Ok(tools_json)
@@ -1002,7 +1052,6 @@ pub(crate) fn build_specs(
#[cfg(test)]
mod tests {
use crate::client_common::tools::FreeformTool;
use crate::model_family::find_family_for_model;
use crate::tools::registry::ConfiguredToolSpec;
use mcp_types::ToolInputSchema;

View File

@@ -100,7 +100,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
let mut prompt = Prompt::default();
prompt.input = input;
let mut stream = match client.stream(&prompt).await {
let mut stream = match client.stream_for_test(prompt).await {
Ok(s) => s,
Err(e) => panic!("stream chat failed: {e}"),
};

View File

@@ -106,7 +106,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
}],
}];
let mut stream = match client.stream(&prompt).await {
let mut stream = match client.stream_for_test(prompt).await {
Ok(s) => s,
Err(e) => panic!("stream chat failed: {e}"),
};

View File

@@ -91,7 +91,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
}],
}];
let mut stream = client.stream(&prompt).await.expect("stream failed");
let mut stream = client.stream_for_test(prompt).await.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
@@ -181,7 +181,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
}],
}];
let mut stream = client.stream(&prompt).await.expect("stream failed");
let mut stream = client.stream_for_test(prompt).await.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;

View File

@@ -15,6 +15,7 @@ use codex_core::WireApi;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::built_in_model_providers;
use codex_core::error::CodexErr;
use codex_core::features::Feature;
use codex_core::model_family::find_family_for_model;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
@@ -608,6 +609,98 @@ async fn includes_user_instructions_message_in_request() {
assert_message_ends_with(&request_body["input"][1], "</environment_context>");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_api_chaining_sets_store_and_previous_id() {
skip_if_no_network!();
let server = MockServer::start().await;
let first_response = responses::sse(vec![
responses::ev_response_created("resp-first"),
responses::ev_assistant_message("m1", "hi there"),
responses::ev_completed("resp-first"),
]);
let second_response = responses::sse(vec![
responses::ev_response_created("resp-second"),
responses::ev_assistant_message("m2", "second reply"),
responses::ev_completed("resp-second"),
]);
let response_mock =
responses::mount_sse_sequence(&server, vec![first_response, second_response]).await;
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = model_provider;
config.features.enable(Feature::ResponsesApiChaining);
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
let codex = conversation_manager
.new_conversation(config)
.await
.expect("create new conversation")
.conversation;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "first turn".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "second turn".into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let requests = response_mock.requests();
assert_eq!(
requests.len(),
2,
"expected two responses API calls for two turns"
);
let first_body = requests[0].body_json();
assert_eq!(first_body["store"], serde_json::Value::Bool(true));
assert!(
first_body.get("previous_response_id").is_none(),
"first request should not set previous_response_id"
);
let second_body = requests[1].body_json();
assert_eq!(second_body["store"], serde_json::Value::Bool(true));
assert_eq!(
second_body["previous_response_id"].as_str(),
Some("resp-first")
);
let second_input = requests[1].input();
assert_eq!(
second_input.len(),
1,
"second request should only send new user input items"
);
let user_item = &second_input[0];
assert_eq!(user_item["type"].as_str(), Some("message"));
assert_eq!(user_item["role"].as_str(), Some("user"));
let content = user_item["content"][0]["text"]
.as_str()
.expect("missing user message text");
assert_eq!(content, "second turn");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn azure_responses_request_includes_store_and_reasoning_ids() {
skip_if_no_network!();
@@ -730,7 +823,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
});
let mut stream = client
.stream(&prompt)
.stream_for_test(prompt)
.await
.expect("responses stream to start");