This commit is contained in:
Ahmed Ibrahim
2025-11-25 15:17:14 -08:00
parent 70b613be81
commit 087e571198
5 changed files with 171 additions and 10 deletions

View File

@@ -2182,14 +2182,28 @@ async fn try_run_turn(
});
sess.persist_rollout_items(&[rollout_item]).await;
let mut stream = turn_context
.client
.clone()
.stream(prompt)
.or_cancel(&cancellation_token)
.await??;
let mut idle_warning = IdleWarning::default();
let client = turn_context.client.clone();
let mut stream_future = Box::pin(client.stream(prompt).or_cancel(&cancellation_token));
let mut stream = loop {
tokio::select! {
biased;
result = &mut stream_future => break result??,
_ = sleep_until(idle_warning.deadline()) => {
if let Some(message) = idle_warning.maybe_warning_message().await {
sess.send_event(
&turn_context,
EventMsg::Warning(WarningEvent { message }),
)
.await;
}
continue;
}
}
};
idle_warning.mark_event();
let tool_runtime = ToolCallRuntime::new(
Arc::clone(&router),

View File

@@ -1,3 +1,4 @@
use std::sync::OnceLock;
use std::time::Duration;
use anyhow::Context;
@@ -18,6 +19,9 @@ const STATUS_WIDGET_URL: &str = "https://status.openai.com/proxy/status.openai.c
const CODEX_COMPONENT_NAME: &str = "Codex";
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
static TEST_STATUS_WIDGET_URL: OnceLock<String> = OnceLock::new();
static TEST_IDLE_TIMEOUT: OnceLock<Duration> = OnceLock::new();
#[derive(Debug, Clone, Display, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ComponentHealth {
@@ -83,11 +87,13 @@ impl IdleWarning {
impl Default for IdleWarning {
fn default() -> Self {
Self::new(DEFAULT_IDLE_TIMEOUT)
Self::new(idle_timeout())
}
}
async fn fetch_codex_health() -> Result<ComponentHealth> {
let status_widget_url = status_widget_url();
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(10))
@@ -95,7 +101,7 @@ async fn fetch_codex_health() -> Result<ComponentHealth> {
.context("building HTTP client")?;
let response = ReqwestTransport::new(client)
.execute(Request::new(Method::GET, STATUS_WIDGET_URL.to_string()))
.execute(Request::new(Method::GET, status_widget_url.clone()))
.await
.context("requesting status widget")?;
@@ -113,7 +119,7 @@ async fn fetch_codex_health() -> Result<ComponentHealth> {
.collect::<String>();
bail!(
"Expected JSON from {STATUS_WIDGET_URL}: Content-Type={content_type}. Body starts with: {snippet:?}"
"Expected JSON from {status_widget_url}: Content-Type={content_type}. Body starts with: {snippet:?}"
);
}
@@ -172,6 +178,32 @@ fn derive_component_health(
Ok(status)
}
fn idle_timeout() -> Duration {
TEST_IDLE_TIMEOUT
.get()
.copied()
.unwrap_or(DEFAULT_IDLE_TIMEOUT)
}
fn status_widget_url() -> String {
TEST_STATUS_WIDGET_URL
.get()
.cloned()
.unwrap_or_else(|| STATUS_WIDGET_URL.to_string())
}
#[doc(hidden)]
#[cfg_attr(not(test), allow(dead_code))]
pub fn set_test_status_widget_url(url: impl Into<String>) {
let _ = TEST_STATUS_WIDGET_URL.set(url.into());
}
#[doc(hidden)]
#[cfg_attr(not(test), allow(dead_code))]
pub fn set_test_idle_timeout(duration: Duration) {
let _ = TEST_IDLE_TIMEOUT.set(duration);
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use anyhow::Result;
use base64::Engine;
@@ -546,6 +547,19 @@ pub async fn mount_sse_once(server: &MockServer, body: String) -> ResponseMock {
response_mock
}
pub async fn mount_sse_once_with_delay(
server: &MockServer,
body: String,
delay: Duration,
) -> ResponseMock {
let (mock, response_mock) = base_mock();
mock.respond_with(sse_response(body).set_delay(delay))
.up_to_n_times(1)
.mount(server)
.await;
response_mock
}
pub async fn mount_compact_json_once_match<M>(
server: &MockServer,
matcher: M,

View File

@@ -0,0 +1,100 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::time::Duration;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::WarningEvent;
use codex_core::status::set_test_idle_timeout;
use codex_core::status::set_test_status_widget_url;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once_with_delay;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use wiremock::Mock;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn emits_warning_when_stream_is_idle_and_status_is_degraded() {
let status_server = start_mock_server().await;
let status_path = "/proxy/status.openai.com";
Mock::given(method("GET"))
.and(path(status_path))
.respond_with(status_payload())
.mount(&status_server)
.await;
set_test_status_widget_url(format!("{}{}", status_server.uri(), status_path));
set_test_idle_timeout(Duration::from_millis(30));
let responses_server = start_mock_server().await;
let stalled_response = sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "finally"),
ev_completed("resp-1"),
]);
let _responses_mock = mount_sse_once_with_delay(
&responses_server,
stalled_response,
Duration::from_millis(40),
)
.await;
let test_codex = test_codex().build(&responses_server).await.unwrap();
let codex = test_codex.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text { text: "hi".into() }],
})
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(35)).await;
let status_requests = status_server
.received_requests()
.await
.expect("status server running");
assert!(
!status_requests.is_empty(),
"status widget was not queried before idle warning"
);
let warning = wait_for_event(&codex, |event| matches!(event, EventMsg::Warning(_))).await;
let EventMsg::Warning(WarningEvent { message }) = warning else {
panic!("expected warning event");
};
assert!(
message.contains("OpenAI status: MajorOutage"),
"unexpected warning message: {message}"
);
tokio::time::sleep(Duration::from_millis(25)).await;
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
}
fn status_payload() -> ResponseTemplate {
ResponseTemplate::new(200)
.insert_header("content-type", "application/json")
.set_body_json(serde_json::json!({
"summary": {
"components": [
{"id": "cmp-1", "name": "Codex", "status_page_id": "page-1"}
],
"affected_components": [
{"component_id": "cmp-1", "status": "major_outage"}
]
}
}))
}

View File

@@ -31,6 +31,7 @@ mod exec;
mod exec_policy;
mod fork_conversation;
mod grep_files;
mod idle_warning;
mod items;
mod json_result;
mod list_dir;