Add collaboration developer instructions (#9424)

- Add additional instructions when they are available
- Make sure to update them on change either UserInput or UserTurn
This commit is contained in:
Ahmed Ibrahim
2026-01-17 17:31:14 -08:00
committed by GitHub
parent 80d7a5d7fe
commit 1478a88eb0
11 changed files with 875 additions and 43 deletions

View File

@@ -0,0 +1,500 @@
use anyhow::Result;
use codex_core::protocol::COLLABORATION_MODE_CLOSE_TAG;
use codex_core::protocol::COLLABORATION_MODE_OPEN_TAG;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::Settings;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde_json::Value;
fn sse_completed(id: &str) -> String {
sse(vec![ev_response_created(id), ev_completed(id)])
}
fn collab_mode_with_instructions(instructions: Option<&str>) -> CollaborationMode {
CollaborationMode::Custom(Settings {
model: "gpt-5.1".to_string(),
reasoning_effort: None,
developer_instructions: instructions.map(str::to_string),
})
}
fn developer_texts(input: &[Value]) -> Vec<String> {
input
.iter()
.filter_map(|item| {
let role = item.get("role")?.as_str()?;
if role != "developer" {
return None;
}
let text = item
.get("content")?
.as_array()?
.first()?
.get("text")?
.as_str()?;
Some(text.to_string())
})
.collect()
}
fn collab_xml(text: &str) -> String {
format!("{COLLABORATION_MODE_OPEN_TAG}{text}{COLLABORATION_MODE_CLOSE_TAG}")
}
fn count_exact(texts: &[String], target: &str) -> usize {
texts.iter().filter(|text| text.as_str() == target).count()
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn no_collaboration_instructions_by_default() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(&server, sse_completed("resp-1")).await;
let test = test_codex().build(&server).await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
assert_eq!(dev_texts.len(), 1);
assert!(dev_texts[0].contains("`approval_policy`"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_input_includes_collaboration_instructions_after_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(&server, sse_completed("resp-1")).await;
let test = test_codex().build(&server).await?;
let collab_text = "collab instructions";
let collaboration_mode = collab_mode_with_instructions(Some(collab_text));
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collaboration_mode),
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
let collab_text = collab_xml(collab_text);
assert_eq!(count_exact(&dev_texts, &collab_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collaboration_instructions_added_on_user_turn() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(&server, sse_completed("resp-1")).await;
let test = test_codex().build(&server).await?;
let collab_text = "turn instructions";
let collaboration_mode = collab_mode_with_instructions(Some(collab_text));
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
cwd: test.config.cwd.clone(),
approval_policy: test.config.approval_policy.value(),
sandbox_policy: test.config.sandbox_policy.get().clone(),
model: test.session_configured.model.clone(),
effort: None,
summary: test.config.model_reasoning_summary,
collaboration_mode: Some(collaboration_mode),
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
let collab_text = collab_xml(collab_text);
assert_eq!(count_exact(&dev_texts, &collab_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn override_then_user_turn_uses_updated_collaboration_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(&server, sse_completed("resp-1")).await;
let test = test_codex().build(&server).await?;
let collab_text = "override instructions";
let collaboration_mode = collab_mode_with_instructions(Some(collab_text));
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collaboration_mode),
})
.await?;
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
cwd: test.config.cwd.clone(),
approval_policy: test.config.approval_policy.value(),
sandbox_policy: test.config.sandbox_policy.get().clone(),
model: test.session_configured.model.clone(),
effort: None,
summary: test.config.model_reasoning_summary,
collaboration_mode: None,
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
let collab_text = collab_xml(collab_text);
assert_eq!(count_exact(&dev_texts, &collab_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_turn_overrides_collaboration_instructions_after_override() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(&server, sse_completed("resp-1")).await;
let test = test_codex().build(&server).await?;
let base_text = "base instructions";
let base_mode = collab_mode_with_instructions(Some(base_text));
let turn_text = "turn override";
let turn_mode = collab_mode_with_instructions(Some(turn_text));
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(base_mode),
})
.await?;
test.codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
cwd: test.config.cwd.clone(),
approval_policy: test.config.approval_policy.value(),
sandbox_policy: test.config.sandbox_policy.get().clone(),
model: test.session_configured.model.clone(),
effort: None,
summary: test.config.model_reasoning_summary,
collaboration_mode: Some(turn_mode),
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
let base_text = collab_xml(base_text);
let turn_text = collab_xml(turn_text);
assert_eq!(count_exact(&dev_texts, &base_text), 1);
assert_eq!(count_exact(&dev_texts, &turn_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collaboration_mode_update_emits_new_instruction_message() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _req1 = mount_sse_once(&server, sse_completed("resp-1")).await;
let req2 = mount_sse_once(&server, sse_completed("resp-2")).await;
let test = test_codex().build(&server).await?;
let first_text = "first instructions";
let second_text = "second instructions";
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_instructions(Some(first_text))),
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello 1".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_instructions(Some(second_text))),
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello 2".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req2.single_request().input();
let dev_texts = developer_texts(&input);
let first_text = collab_xml(first_text);
let second_text = collab_xml(second_text);
assert_eq!(count_exact(&dev_texts, &first_text), 1);
assert_eq!(count_exact(&dev_texts, &second_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn collaboration_mode_update_noop_does_not_append() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _req1 = mount_sse_once(&server, sse_completed("resp-1")).await;
let req2 = mount_sse_once(&server, sse_completed("resp-2")).await;
let test = test_codex().build(&server).await?;
let collab_text = "same instructions";
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_instructions(Some(collab_text))),
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello 1".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_instructions(Some(collab_text))),
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello 2".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req2.single_request().input();
let dev_texts = developer_texts(&input);
let collab_text = collab_xml(collab_text);
assert_eq!(count_exact(&dev_texts, &collab_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resume_replays_collaboration_instructions() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let _req1 = mount_sse_once(&server, sse_completed("resp-1")).await;
let req2 = mount_sse_once(&server, sse_completed("resp-2")).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
let rollout_path = initial.session_configured.rollout_path.clone();
let home = initial.home.clone();
let collab_text = "resume instructions";
initial
.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_instructions(Some(collab_text))),
})
.await?;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let resumed = builder.resume(&server, home, rollout_path).await?;
resumed
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "after resume".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req2.single_request().input();
let dev_texts = developer_texts(&input);
let collab_text = collab_xml(collab_text);
assert_eq!(count_exact(&dev_texts, &collab_text), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn empty_collaboration_instructions_are_ignored() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let req = mount_sse_once(&server, sse_completed("resp-1")).await;
let test = test_codex().build(&server).await?;
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collab_mode_with_instructions(Some(""))),
})
.await?;
test.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let input = req.single_request().input();
let dev_texts = developer_texts(&input);
assert_eq!(dev_texts.len(), 1);
let collab_text = collab_xml("");
assert_eq!(count_exact(&dev_texts, &collab_text), 0);
Ok(())
}

View File

@@ -24,6 +24,7 @@ mod cli_stream;
mod client;
mod client_websockets;
mod codex_delegate;
mod collaboration_instructions;
mod compact;
mod compact_remote;
mod compact_resume_fork;

View File

@@ -0,0 +1,216 @@
use anyhow::Result;
use codex_core::config::Constrained;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::COLLABORATION_MODE_CLOSE_TAG;
use codex_core::protocol::COLLABORATION_MODE_OPEN_TAG;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::Settings;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use std::collections::HashSet;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
fn collab_mode_with_instructions(instructions: Option<&str>) -> CollaborationMode {
CollaborationMode::Custom(Settings {
model: "gpt-5.1".to_string(),
reasoning_effort: None,
developer_instructions: instructions.map(str::to_string),
})
}
fn collab_xml(text: &str) -> String {
format!("{COLLABORATION_MODE_OPEN_TAG}{text}{COLLABORATION_MODE_CLOSE_TAG}")
}
async fn read_rollout_text(path: &Path) -> anyhow::Result<String> {
for _ in 0..50 {
if path.exists()
&& let Ok(text) = std::fs::read_to_string(path)
&& !text.trim().is_empty()
{
return Ok(text);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
Ok(std::fs::read_to_string(path)?)
}
fn rollout_developer_texts(text: &str) -> Vec<String> {
let mut texts = Vec::new();
for line in text.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let rollout: RolloutLine = match serde_json::from_str(trimmed) {
Ok(rollout) => rollout,
Err(_) => continue,
};
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) =
rollout.item
&& role == "developer"
{
for item in content {
if let ContentItem::InputText { text } = item {
texts.push(text);
}
}
}
}
texts
}
fn rollout_environment_texts(text: &str) -> Vec<String> {
let mut texts = Vec::new();
for line in text.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let rollout: RolloutLine = match serde_json::from_str(trimmed) {
Ok(rollout) => rollout,
Err(_) => continue,
};
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) =
rollout.item
&& role == "user"
{
for item in content {
if let ContentItem::InputText { text } = item
&& text.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG)
{
texts.push(text);
}
}
}
}
texts
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn override_turn_context_records_permissions_update() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
});
let test = builder.build(&server).await?;
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: Some(AskForApproval::Never),
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: None,
})
.await?;
test.codex.submit(Op::Shutdown).await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = test.codex.rollout_path();
let rollout_text = read_rollout_text(&rollout_path).await?;
let developer_texts = rollout_developer_texts(&rollout_text);
let approval_texts: Vec<&String> = developer_texts
.iter()
.filter(|text| text.contains("`approval_policy`"))
.collect();
assert!(
approval_texts
.iter()
.any(|text| text.contains("`approval_policy` is `never`")),
"expected updated approval policy instructions in rollout"
);
let unique: HashSet<&String> = approval_texts.iter().copied().collect();
assert_eq!(unique.len(), 2);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn override_turn_context_records_environment_update() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let test = test_codex().build(&server).await?;
let new_cwd = TempDir::new()?;
test.codex
.submit(Op::OverrideTurnContext {
cwd: Some(new_cwd.path().to_path_buf()),
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: None,
})
.await?;
test.codex.submit(Op::Shutdown).await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = test.codex.rollout_path();
let rollout_text = read_rollout_text(&rollout_path).await?;
let env_texts = rollout_environment_texts(&rollout_text);
let new_cwd_text = new_cwd.path().display().to_string();
assert!(
env_texts.iter().any(|text| text.contains(&new_cwd_text)),
"expected environment update with new cwd in rollout"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn override_turn_context_records_collaboration_update() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let test = test_codex().build(&server).await?;
let collab_text = "override collaboration instructions";
let collaboration_mode = collab_mode_with_instructions(Some(collab_text));
test.codex
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: Some(collaboration_mode),
})
.await?;
test.codex.submit(Op::Shutdown).await?;
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
let rollout_path = test.codex.rollout_path();
let rollout_text = read_rollout_text(&rollout_path).await?;
let developer_texts = rollout_developer_texts(&rollout_text);
let collab_text = collab_xml(collab_text);
let collab_count = developer_texts
.iter()
.filter(|text| text.as_str() == collab_text.as_str())
.count();
assert_eq!(collab_count, 1);
Ok(())
}

View File

@@ -132,7 +132,7 @@ async fn permissions_message_added_on_override_change() -> Result<()> {
let permissions_2 = permissions_texts(input2);
assert_eq!(permissions_1.len(), 1);
assert_eq!(permissions_2.len(), 2);
assert_eq!(permissions_2.len(), 3);
let unique = permissions_2.into_iter().collect::<HashSet<String>>();
assert_eq!(unique.len(), 2);
@@ -257,7 +257,7 @@ async fn resume_replays_permissions_messages() -> Result<()> {
let body3 = req3.single_request().body_json();
let input = body3["input"].as_array().expect("input array");
let permissions = permissions_texts(input);
assert_eq!(permissions.len(), 3);
assert_eq!(permissions.len(), 4);
let unique = permissions.into_iter().collect::<HashSet<String>>();
assert_eq!(unique.len(), 2);
@@ -321,7 +321,7 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> {
let body2 = req2.single_request().body_json();
let input2 = body2["input"].as_array().expect("input array");
let permissions_base = permissions_texts(input2);
assert_eq!(permissions_base.len(), 2);
assert_eq!(permissions_base.len(), 3);
builder = builder.with_config(|config| {
config.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted);

View File

@@ -379,15 +379,18 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() -> an
"content": [ { "type": "input_text", "text": "hello 2" } ]
});
let expected_permissions_msg = body1["input"][0].clone();
// After overriding the turn context, emit a new permissions message.
let body1_input = body1["input"].as_array().expect("input array");
// After overriding the turn context, emit two updated permissions messages.
let expected_permissions_msg_2 = body2["input"][body1_input.len()].clone();
let expected_permissions_msg_3 = body2["input"][body1_input.len() + 1].clone();
assert_ne!(
expected_permissions_msg_2, expected_permissions_msg,
"expected updated permissions message after override"
);
let mut expected_body2 = body1["input"].as_array().expect("input array").to_vec();
assert_eq!(expected_permissions_msg_2, expected_permissions_msg_3);
let mut expected_body2 = body1_input.to_vec();
expected_body2.push(expected_permissions_msg_2);
expected_body2.push(expected_permissions_msg_3);
expected_body2.push(expected_user_message_2);
assert_eq!(body2["input"], serde_json::Value::Array(expected_body2));