This commit is contained in:
jimmyfraiture
2025-10-13 14:37:42 +01:00
parent 26f7c46856
commit 17edcefa4f
10 changed files with 1399 additions and 1061 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1076,6 +1076,7 @@ dependencies = [
"thiserror 2.0.16",
"time",
"tokio",
"tokio-stream",
"tokio-test",
"tokio-util",
"toml",

View File

@@ -61,6 +61,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
toml = { workspace = true }
toml_edit = { workspace = true }

View File

@@ -31,6 +31,7 @@ use tracing::debug;
use tracing::trace;
/// Implementation for the classic Chat Completions API.
#[allow(dead_code)]
pub(crate) async fn stream_chat_completions(
prompt: &Prompt,
model_family: &ModelFamily,
@@ -361,6 +362,7 @@ pub(crate) async fn stream_chat_completions(
/// Lightweight SSE processor for the Chat Completions streaming format. The
/// output is mapped onto Codex's internal [`ResponseEvent`] so that the rest
/// of the pipeline can stay agnostic of the underlying wire format.
#[allow(dead_code)]
async fn process_chat_sse<S>(
stream: S,
tx_event: mpsc::Sender<Result<ResponseEvent>>,
@@ -660,6 +662,7 @@ async fn process_chat_sse<S>(
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
/// events.
#[derive(Copy, Clone, Eq, PartialEq)]
#[allow(dead_code)]
enum AggregateMode {
AggregatedOnly,
Streaming,
@@ -885,6 +888,7 @@ impl<S> AggregatedChatStream<S> {
}
}
#[allow(dead_code)]
pub(crate) fn streaming_mode(inner: S) -> Self {
Self::new(inner, AggregateMode::Streaming)
}

File diff suppressed because it is too large Load Diff

View File

@@ -2109,7 +2109,7 @@ async fn try_run_turn(
summary: turn_context.client.get_reasoning_summary(),
});
sess.persist_rollout_items(&[rollout_item]).await;
let mut stream = turn_context.client.clone().stream(&prompt).await?;
let mut stream = turn_context.client.clone().stream(&prompt, ()).await?;
let tool_runtime = ToolCallRuntime::new(
Arc::clone(&router),

View File

@@ -258,7 +258,7 @@ async fn drain_to_completed(
sub_id: &str,
prompt: &Prompt,
) -> CodexResult<()> {
let mut stream = turn_context.client.clone().stream(prompt).await?;
let mut stream = turn_context.client.clone().stream(prompt, ()).await?;
loop {
let maybe_event = stream.next().await;
let Some(event) = maybe_event else {

View File

@@ -93,7 +93,13 @@ pub use codex_protocol::protocol;
// as those in the protocol crate when constructing protocol messages.
pub use codex_protocol::config_types as protocol_config_types;
pub use client::CallOpts;
pub use client::Client;
pub use client::ModelClient;
pub use client::StreamMode;
pub use client::TurnResult;
pub use client::TurnStream;
pub use client::WireDialect;
pub use client_common::Prompt;
pub use client_common::REVIEW_PROMPT;
pub use client_common::ResponseEvent;

View File

@@ -97,7 +97,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
let mut prompt = Prompt::default();
prompt.input = input;
let mut stream = match client.stream(&prompt).await {
let mut stream = match client.stream(&prompt, ()).await {
Ok(s) => s,
Err(e) => panic!("stream chat failed: {e}"),
};

View File

@@ -102,7 +102,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
}],
}];
let mut stream = match client.stream(&prompt).await {
let mut stream = match client.stream(&prompt, ()).await {
Ok(s) => s,
Err(e) => panic!("stream chat failed: {e}"),
};
@@ -115,6 +115,9 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
}
}
events
.into_iter()
.filter(|ev| !matches!(ev, ResponseEvent::RateLimits(_)))
.collect()
}
fn assert_message(item: &ResponseItem, expected: &str) {

View File

@@ -724,7 +724,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
});
let mut stream = client
.stream(&prompt)
.stream(&prompt, ())
.await
.expect("responses stream to start");