Compare commits

..

1 Commits

Author SHA1 Message Date
Dave Aitel
b9034c8c24 exec: handle typed turn completion for fanout jobs 2026-03-16 20:48:55 -04:00
53 changed files with 527 additions and 5147 deletions

View File

@@ -27,7 +27,6 @@ Object.defineProperty(globalThis, '__codexContentItems', {
}
defineGlobal('ALL_TOOLS', __codexRuntime.ALL_TOOLS);
defineGlobal('exit', __codexRuntime.exit);
defineGlobal('image', __codexRuntime.image);
defineGlobal('load', __codexRuntime.load);
defineGlobal('store', __codexRuntime.store);

View File

@@ -9,7 +9,6 @@
- They return either a structured value or a string based on the description above.
- Global helpers:
- `exit()`: Immediately ends the current script successfully (like an early return from the top level).
- `text(value: string | number | boolean | undefined | null)`: Appends a text item and returns it. Non-string values are stringified with `JSON.stringify(...)` when possible.
- `image(imageUrl: string)`: Appends an image item and returns it. `imageUrl` can be an HTTPS URL or a base64-encoded `data:` URL.
- `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session.

View File

@@ -55,17 +55,6 @@ function codeModeWorkerMain() {
return JSON.parse(JSON.stringify(value));
}
// Sentinel error thrown by the `exit()` helper so the worker can unwind the
// user script immediately. A top-level handler recognizes it via
// isCodeModeExitSignal and treats it as a successful early return, not a failure.
class CodeModeExitSignal extends Error {
  constructor() {
    super('code mode exit');
    this.name = 'CodeModeExitSignal';
  }
}
// Returns true when `error` is the sentinel thrown by the exit() helper.
function isCodeModeExitSignal(error) {
  return error instanceof CodeModeExitSignal;
}
function createToolCaller() {
let nextId = 0;
const pending = new Map();
@@ -268,12 +257,8 @@ function codeModeWorkerMain() {
const yieldControl = () => {
parentPort.postMessage({ type: 'yield' });
};
const exit = () => {
throw new CodeModeExitSignal();
};
return Object.freeze({
exit,
image,
load,
output_image: image,
@@ -286,18 +271,8 @@ function codeModeWorkerMain() {
function createCodeModeModule(context, helpers) {
return new SyntheticModule(
[
'exit',
'image',
'load',
'output_text',
'output_image',
'store',
'text',
'yield_control',
],
['image', 'load', 'output_text', 'output_image', 'store', 'text', 'yield_control'],
function initCodeModeModule() {
this.setExport('exit', helpers.exit);
this.setExport('image', helpers.image);
this.setExport('load', helpers.load);
this.setExport('output_text', helpers.output_text);
@@ -313,7 +288,6 @@ function codeModeWorkerMain() {
function createBridgeRuntime(callTool, enabledTools, helpers) {
return Object.freeze({
ALL_TOOLS: createAllToolsMetadata(enabledTools),
exit: helpers.exit,
image: helpers.image,
load: helpers.load,
store: helpers.store,
@@ -472,13 +446,6 @@ function codeModeWorkerMain() {
stored_values: state.storedValues,
});
} catch (error) {
if (isCodeModeExitSignal(error)) {
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
});
return;
}
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,

View File

@@ -1551,47 +1551,6 @@ text({ json: true });
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_exit_stops_script_immediately() -> Result<()> {
    // Network-backed test; skipped when the environment has no network access.
    skip_if_no_network!(Ok(()));
    let server = responses::start_mock_server().await;
    // Run a code-mode script that emits "before", calls exit(), and would
    // emit "after" only if execution continued past the exit helper.
    let (_test, second_mock) = run_code_mode_turn(
        &server,
        "use exec to stop script early with exit helper",
        r#"
import { exit, text } from "@openai/code_mode";
text("before");
exit();
text("after");
"#,
        false,
    )
    .await?;
    let req = second_mock.single_request();
    let items = custom_tool_output_items(&req, "call-1");
    let (output, success) = custom_tool_output_body_and_success(&req, "call-1");
    // exit() must read as a successful early return, not a failed tool call.
    assert_ne!(
        success,
        Some(false),
        "exec exit helper call failed unexpectedly: {output}"
    );
    // Exactly two items: the completion banner and the single text output.
    assert_eq!(items.len(), 2);
    assert_regex_match(
        concat!(
            r"(?s)\A",
            r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
        ),
        text_item(&items, 0),
    );
    // Only the pre-exit text survives; "after" was never appended.
    assert_eq!(text_item(&items, 1), "before");
    assert_eq!(output, "before");
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn code_mode_surfaces_text_stringify_errors() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -1950,7 +1909,6 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort()));
"encodeURI",
"encodeURIComponent",
"escape",
"exit",
"eval",
"globalThis",
"image",

View File

@@ -36,10 +36,12 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_arg0::Arg0DispatchPaths;
use codex_cloud_requirements::cloud_requirements_loader;
use codex_core::AuthManager;
@@ -70,6 +72,9 @@ use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::user_input::UserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_oss::ensure_oss_provider_ready;
@@ -782,6 +787,24 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
.await;
}
InProcessServerEvent::ServerNotification(notification) => {
if let Some((event, terminal_error)) = decode_turn_completed_notification_for_exec(
&notification,
primary_thread_id_for_requests.as_str(),
task_id.as_str(),
) {
error_seen |= terminal_error;
if handle_exec_status(
event_processor.process_event(event),
&client,
&mut request_ids,
&primary_thread_id_for_requests,
)
.await
{
break;
}
continue;
}
if let ServerNotification::Error(payload) = &notification
&& payload.thread_id == primary_thread_id_for_requests
&& payload.turn_id == task_id
@@ -850,25 +873,15 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
_ => {}
}
match event_processor.process_event(event) {
CodexStatus::Running => {}
CodexStatus::InitiateShutdown => {
if let Err(err) = request_shutdown(
&client,
&mut request_ids,
&primary_thread_id_for_requests,
)
.await
{
warn!("thread/unsubscribe failed during shutdown: {err}");
}
break;
}
CodexStatus::Shutdown => {
// `ShutdownComplete` does not identify which attached
// thread emitted it, so subagent shutdowns must not end
// the primary exec loop early.
}
if handle_exec_status(
event_processor.process_event(event),
&client,
&mut request_ids,
&primary_thread_id_for_requests,
)
.await
{
break;
}
}
InProcessServerEvent::Lagged { skipped } => {
@@ -1131,6 +1144,80 @@ fn canceled_mcp_server_elicitation_response() -> Result<Value, String> {
.map_err(|err| format!("failed to encode mcp elicitation response: {err}"))
}
/// Applies the event processor's verdict to the primary exec event loop.
///
/// Returns `true` when the caller should break out of the loop (shutdown was
/// initiated and `thread/unsubscribe` has been requested), `false` otherwise.
async fn handle_exec_status(
    status: CodexStatus,
    client: &InProcessAppServerClient,
    request_ids: &mut RequestIdSequencer,
    primary_thread_id_for_requests: &str,
) -> bool {
    match status {
        CodexStatus::InitiateShutdown => {
            let unsubscribe =
                request_shutdown(client, request_ids, primary_thread_id_for_requests).await;
            if let Err(err) = unsubscribe {
                warn!("thread/unsubscribe failed during shutdown: {err}");
            }
            true
        }
        // `ShutdownComplete` does not identify which attached thread emitted
        // it, so subagent shutdowns must not end the primary exec loop early;
        // `Running` likewise keeps the loop alive.
        CodexStatus::Shutdown | CodexStatus::Running => false,
    }
}
/// Translates a `turn/completed` app-server notification into the legacy
/// `Event` shape the exec event processor consumes.
///
/// Returns `None` for notifications about other threads/turns, or for turns
/// that are still in progress. The boolean in the returned pair flags a
/// terminal error (`TurnStatus::Failed`).
fn decode_turn_completed_notification_for_exec(
    notification: &ServerNotification,
    primary_thread_id_for_requests: &str,
    task_id: &str,
) -> Option<(Event, bool)> {
    let ServerNotification::TurnCompleted(TurnCompletedNotification { thread_id, turn }) =
        notification
    else {
        return None;
    };
    // Ignore completions that belong to other threads or turns.
    if thread_id != primary_thread_id_for_requests || turn.id != task_id {
        return None;
    }
    let msg = match turn.status {
        // Completed and Failed both surface as `TurnComplete`; the boolean
        // below is how the caller remembers that a failure occurred.
        TurnStatus::Completed | TurnStatus::Failed => EventMsg::TurnComplete(TurnCompleteEvent {
            turn_id: turn.id.clone(),
            last_agent_message: None,
        }),
        TurnStatus::Interrupted => EventMsg::TurnAborted(TurnAbortedEvent {
            turn_id: Some(turn.id.clone()),
            reason: TurnAbortReason::Interrupted,
        }),
        // Not a completion yet; keep waiting.
        TurnStatus::InProgress => return None,
    };
    let terminal_error = matches!(turn.status, TurnStatus::Failed);
    Some((
        Event {
            id: String::new(),
            msg,
        },
        terminal_error,
    ))
}
async fn request_shutdown(
client: &InProcessAppServerClient,
request_ids: &mut RequestIdSequencer,
@@ -1922,4 +2009,118 @@ mod tests {
ApprovalsReviewer::GuardianSubagent
);
}
#[test]
fn decode_turn_completed_notification_ignores_other_threads_and_turns() {
    // Builds a completed-turn notification for the given thread/turn ids.
    fn notification(thread_id: &str, turn_id: &str) -> ServerNotification {
        ServerNotification::TurnCompleted(TurnCompletedNotification {
            thread_id: thread_id.to_string(),
            turn: codex_app_server_protocol::Turn {
                id: turn_id.to_string(),
                items: Vec::new(),
                status: TurnStatus::Completed,
                error: None,
            },
        })
    }
    // Wrong thread id: must be ignored.
    assert!(
        decode_turn_completed_notification_for_exec(
            &notification("thread-a", "turn-a"),
            "thread-b",
            "turn-a",
        )
        .is_none()
    );
    // Wrong turn id: must be ignored.
    assert!(
        decode_turn_completed_notification_for_exec(
            &notification("thread-a", "turn-a"),
            "thread-a",
            "turn-b",
        )
        .is_none()
    );
}
#[test]
fn decode_turn_completed_notification_maps_completed_and_failed_turns() {
    // Builds a matching notification with the requested terminal status.
    fn notification(
        status: TurnStatus,
        error: Option<codex_app_server_protocol::TurnError>,
    ) -> ServerNotification {
        ServerNotification::TurnCompleted(TurnCompletedNotification {
            thread_id: "thread-a".to_string(),
            turn: codex_app_server_protocol::Turn {
                id: "turn-a".to_string(),
                items: Vec::new(),
                status,
                error,
            },
        })
    }
    // Asserts the decoded event is a `TurnComplete` for `turn-a`.
    fn assert_turn_complete(event: Event) {
        match event.msg {
            EventMsg::TurnComplete(event) => {
                assert_eq!(event.turn_id, "turn-a");
                assert_eq!(event.last_agent_message, None);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }
    let (completed, completed_error) = decode_turn_completed_notification_for_exec(
        &notification(TurnStatus::Completed, None),
        "thread-a",
        "turn-a",
    )
    .expect("completed turn should decode");
    assert!(!completed_error);
    assert_turn_complete(completed);
    let failure = codex_app_server_protocol::TurnError {
        message: "synthetic".to_string(),
        codex_error_info: None,
        additional_details: None,
    };
    let (failed, failed_error) = decode_turn_completed_notification_for_exec(
        &notification(TurnStatus::Failed, Some(failure)),
        "thread-a",
        "turn-a",
    )
    .expect("failed turn should decode");
    // A failed turn still decodes to TurnComplete, but flags a terminal error.
    assert!(failed_error);
    assert_turn_complete(failed);
}
#[test]
fn decode_turn_completed_notification_maps_interrupted_turns() {
    let notification = ServerNotification::TurnCompleted(TurnCompletedNotification {
        thread_id: "thread-a".to_string(),
        turn: codex_app_server_protocol::Turn {
            id: "turn-a".to_string(),
            items: Vec::new(),
            status: TurnStatus::Interrupted,
            error: None,
        },
    });
    let (event, terminal_error) =
        decode_turn_completed_notification_for_exec(&notification, "thread-a", "turn-a")
            .expect("interrupted turn should decode");
    // An interruption is not a terminal error.
    assert!(!terminal_error);
    // It maps to TurnAborted with the Interrupted reason.
    match event.msg {
        EventMsg::TurnAborted(aborted) => {
            assert_eq!(aborted.turn_id.as_deref(), Some("turn-a"));
            assert_eq!(aborted.reason, TurnAbortReason::Interrupted);
        }
        other => panic!("unexpected event: {other:?}"),
    }
}
}

View File

@@ -0,0 +1,237 @@
#![cfg(not(target_os = "windows"))]
#![allow(clippy::expect_used, clippy::unwrap_used)]
use anyhow::Result;
use core_test_support::test_codex_exec::test_codex_exec;
use serde_json::Value;
use serde_json::json;
use std::fs;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use wiremock::Mock;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path_regex;
/// Scripted mock of the model endpoint for the fan-out (agent jobs) flow.
///
/// Inspects each `/responses` request body and replies with: a bare
/// completion for tool-output follow-ups, a `report_agent_job_result` call
/// for worker turns, the `spawn_agents_on_csv` call for the first main turn,
/// and an empty completion otherwise (see the `Respond` impl).
struct AgentJobsResponder {
    /// JSON-encoded arguments for the one-time `spawn_agents_on_csv` call.
    spawn_args_json: String,
    /// Set once the first main-agent request has been answered with the spawn call.
    seen_main: AtomicBool,
    /// Monotonic counter used to mint unique worker call ids.
    call_counter: AtomicUsize,
}
impl AgentJobsResponder {
    /// Creates a responder that will issue `spawn_agents_on_csv` with the
    /// given serialized arguments exactly once.
    fn new(spawn_args_json: String) -> Self {
        Self {
            spawn_args_json,
            seen_main: AtomicBool::new(false),
            call_counter: AtomicUsize::new(0),
        }
    }
}
impl Respond for AgentJobsResponder {
    /// Routes each mocked `/responses` request to one of four canned SSE
    /// replies, checked in priority order.
    fn respond(&self, request: &wiremock::Request) -> ResponseTemplate {
        let body_bytes = decode_body_bytes(request);
        // Unparsable bodies become Null and fall through to the default reply.
        let body: Value = serde_json::from_slice(&body_bytes).unwrap_or(Value::Null);
        // 1) Follow-up request carrying a tool output: just complete the turn.
        if has_function_call_output(&body) {
            return sse_response(sse(vec![
                ev_response_created("resp-tool"),
                ev_completed("resp-tool"),
            ]));
        }
        // 2) A spawned worker turn: reply with one report_agent_job_result
        //    call echoing the job/item ids parsed from the prompt.
        if let Some((job_id, item_id)) = extract_job_and_item(&body) {
            // Unique call id per worker so outputs do not collide.
            let call_id = format!(
                "call-worker-{}",
                self.call_counter.fetch_add(1, Ordering::SeqCst)
            );
            let args = json!({
                "job_id": job_id,
                "item_id": item_id,
                "result": { "item_id": item_id }
            });
            let args_json = serde_json::to_string(&args).unwrap_or_else(|err| {
                panic!("worker args serialize: {err}");
            });
            return sse_response(sse(vec![
                ev_response_created("resp-worker"),
                ev_function_call(&call_id, "report_agent_job_result", &args_json),
                ev_completed("resp-worker"),
            ]));
        }
        // 3) First main-agent turn: issue the spawn_agents_on_csv call once
        //    (swap returns the previous flag value).
        if !self.seen_main.swap(true, Ordering::SeqCst) {
            return sse_response(sse(vec![
                ev_response_created("resp-main"),
                ev_function_call("call-spawn", "spawn_agents_on_csv", &self.spawn_args_json),
                ev_completed("resp-main"),
            ]));
        }
        // 4) Anything else: empty completed response.
        sse_response(sse(vec![
            ev_response_created("resp-default"),
            ev_completed("resp-default"),
        ]))
    }
}
/// Returns a copy of the raw request body bytes.
/// NOTE(review): currently a plain clone — presumably a seam for adding
/// content-encoding handling later; confirm before removing.
fn decode_body_bytes(request: &wiremock::Request) -> Vec<u8> {
    request.body.clone()
}
/// Reports whether any `input` item in the request body is a
/// `function_call_output`, i.e. the request is a tool-result follow-up turn.
fn has_function_call_output(body: &Value) -> bool {
    let Some(items) = body.get("input").and_then(Value::as_array) else {
        return false;
    };
    items
        .iter()
        .filter_map(|item| item.get("type").and_then(Value::as_str))
        .any(|kind| kind == "function_call_output")
}
/// Extracts `(job_id, item_id)` from a worker-turn prompt.
///
/// Worker prompts embed `Job ID: …` and `Item ID: …` lines; returns `None`
/// unless the combined message/instruction text carries the worker marker
/// sentence and both ids are present. When a field repeats, the last
/// occurrence wins (matching the original scan order).
fn extract_job_and_item(body: &Value) -> Option<(String, String)> {
    // Search both the message texts and the top-level `instructions` field.
    let mut combined = message_input_texts(body).join("\n");
    if let Some(instructions) = body.get("instructions").and_then(Value::as_str) {
        combined.push('\n');
        combined.push_str(instructions);
    }
    // Only worker prompts carry this marker sentence.
    if !combined.contains("You are processing one item for a generic agent job.") {
        return None;
    }
    let find_field = |prefix: &str| {
        combined
            .lines()
            .rev()
            .find_map(|line| line.strip_prefix(prefix))
            .map(|value| value.trim().to_string())
    };
    Some((find_field("Job ID: ")?, find_field("Item ID: ")?))
}
/// Collects every `input_text` span from the `message` items in the body.
fn message_input_texts(body: &Value) -> Vec<String> {
    let mut texts = Vec::new();
    let Some(items) = body.get("input").and_then(Value::as_array) else {
        return texts;
    };
    for item in items {
        if item.get("type").and_then(Value::as_str) != Some("message") {
            continue;
        }
        let Some(content) = item.get("content").and_then(Value::as_array) else {
            continue;
        };
        for span in content {
            if span.get("type").and_then(Value::as_str) != Some("input_text") {
                continue;
            }
            if let Some(text) = span.get("text").and_then(Value::as_str) {
                texts.push(text.to_string());
            }
        }
    }
    texts
}
/// Serializes a sequence of JSON events as an SSE stream body, terminated by
/// the `data: [DONE]` sentinel.
fn sse(events: Vec<serde_json::Value>) -> String {
    events
        .iter()
        .map(|event| format!("data: {event}\n\n"))
        .chain(std::iter::once("data: [DONE]\n\n".to_string()))
        .collect()
}
/// Wraps an SSE body in a 200 response with the event-stream content type.
fn sse_response(body: String) -> ResponseTemplate {
    ResponseTemplate::new(200)
        .insert_header("content-type", "text/event-stream")
        .set_body_string(body)
}
/// SSE `response.created` event for the given response id.
fn ev_response_created(response_id: &str) -> serde_json::Value {
    json!({
        "type": "response.created",
        "response": {
            "id": response_id,
            "model": "gpt-5",
            "output": []
        }
    })
}
/// SSE `response.output_item.done` event carrying a completed function call.
/// `arguments` must already be a JSON-encoded string, mirroring how the
/// Responses API delivers tool arguments.
fn ev_function_call(call_id: &str, name: &str, arguments: &str) -> serde_json::Value {
    json!({
        "type": "response.output_item.done",
        "output_index": 0,
        "item": {
            "type": "function_call",
            "id": format!("item-{call_id}"),
            "call_id": call_id,
            "name": name,
            "arguments": arguments,
            "status": "completed"
        }
    })
}
/// SSE `response.completed` event with minimal fixed token-usage numbers.
fn ev_completed(response_id: &str) -> serde_json::Value {
    json!({
        "type": "response.completed",
        "response": {
            "id": response_id,
            "usage": {
                "input_tokens": 1,
                "input_tokens_details": {"cached_tokens": 0},
                "output_tokens": 1,
                "output_tokens_details": {"reasoning_tokens": 0},
                "total_tokens": 2
            }
        }
    })
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_spawn_agents_on_csv_exits_after_mock_job_completion() -> Result<()> {
    let test = test_codex_exec();
    let server = wiremock::MockServer::start().await;
    // Input CSV: a `name` header plus 100 rows for the fan-out job to process.
    let input_path = test.cwd_path().join("agent_jobs_input.csv");
    let output_path = test.cwd_path().join("agent_jobs_output.csv");
    let mut csv = String::from("name\n");
    for index in 1..=100 {
        csv.push_str(&format!("cat_{index}\n"));
    }
    fs::write(&input_path, csv)?;
    // Arguments the mocked model will send in its spawn_agents_on_csv call.
    let args = json!({
        "csv_path": input_path.display().to_string(),
        "instruction": "Write a playful 2-line poem about the cat named {name}. Return a JSON object with keys name and poem. Call report_agent_job_result exactly once and then stop.",
        "output_csv_path": output_path.display().to_string(),
        "max_concurrency": 64,
    });
    let args_json = serde_json::to_string(&args)?;
    // All model traffic goes through the scripted responder above.
    Mock::given(method("POST"))
        .and(path_regex(".*/responses$"))
        .respond_with(AgentJobsResponder::new(args_json))
        .mount(&server)
        .await;
    let mut cmd = test.cmd_with_server(&server);
    // Guard against the regression under test: exec must exit once the fan-out
    // job completes, rather than hanging until the harness kills it.
    cmd.timeout(Duration::from_secs(60));
    cmd.arg("-c")
        .arg("features.enable_fanout=true")
        .arg("-c")
        .arg("agents.max_threads=64")
        .arg("--skip-git-repo-check")
        .arg("Use spawn_agents_on_csv on the provided CSV and do not do work yourself.")
        .assert()
        .success();
    let output = fs::read_to_string(&output_path)?;
    // Output CSV should contain the header plus one row per input item.
    assert_eq!(output.lines().count(), 101);
    Ok(())
}

View File

@@ -1,5 +1,6 @@
// Aggregates all former standalone integration tests as modules.
mod add_dir;
mod agent_jobs;
mod apply_patch;
mod auth_env;
mod ephemeral;

View File

@@ -12,9 +12,8 @@ python -m pip install -e .
```
Published SDK builds pin an exact `codex-cli-bin` runtime dependency. For local
repo development, either pass `AppServerConfig(codex_bin=...)` to point at a
local build explicitly, or use the repo examples/notebook bootstrap which
installs the pinned runtime package automatically.
repo development, pass `AppServerConfig(codex_bin=...)` to point at a local
build explicitly.
## Quickstart
@@ -23,9 +22,8 @@ from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5")
completed_turn = thread.turn(TextInput("Say hello in one sentence.")).run()
print(completed_turn.status)
print(completed_turn.id)
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print(result.text)
```
## Docs map
@@ -56,8 +54,7 @@ wheel.
For local repo development, the checked-in `sdk/python-runtime` package is only
a template for staged release artifacts. Editable installs should use an
explicit `codex_bin` override for manual SDK usage; the repo examples and
notebook bootstrap the pinned runtime package automatically.
explicit `codex_bin` override instead.
## Maintainer workflow

View File

@@ -1,359 +0,0 @@
from __future__ import annotations
import importlib
import importlib.util
import json
import os
import platform
import shutil
import subprocess
import sys
import tarfile
import tempfile
import urllib.error
import urllib.request
import zipfile
from pathlib import Path
# Distribution name of the runtime wheel that bundles the codex binary.
PACKAGE_NAME = "codex-cli-bin"
# Exact runtime release this SDK checkout is pinned to (matches a
# `rust-v{version}` GitHub release tag).
PINNED_RUNTIME_VERSION = "0.116.0-alpha.1"
# GitHub repository the release artifacts are downloaded from.
REPO_SLUG = "openai/codex"


class RuntimeSetupError(RuntimeError):
    """Raised when the pinned runtime package cannot be resolved or installed."""

    pass


def pinned_runtime_version() -> str:
    """Return the runtime version this SDK checkout is pinned to."""
    return PINNED_RUNTIME_VERSION
def ensure_runtime_package_installed(
    python_executable: str | Path,
    sdk_python_dir: Path,
    install_target: Path | None = None,
) -> str:
    """Ensure the pinned `codex-cli-bin` runtime is installed; return its version.

    When ``install_target`` is None the package is installed into
    ``python_executable``'s environment (skipped if the pinned version is
    already present and working, verified again afterwards); otherwise it is
    installed into the target directory via ``pip install --target`` with no
    version probing.

    Raises:
        RuntimeSetupError: if download, staging, install, or the post-install
            version check fails.
    """
    requested_version = pinned_runtime_version()
    installed_version = None
    if install_target is None:
        # Probe the interpreter only for in-environment installs; --target
        # installs always reinstall.
        installed_version = _installed_runtime_version(python_executable)
    normalized_requested = _normalized_package_version(requested_version)
    if installed_version is not None and _normalized_package_version(installed_version) == normalized_requested:
        # Already at the pinned version — nothing to do.
        return requested_version
    with tempfile.TemporaryDirectory(prefix="codex-python-runtime-") as temp_root_str:
        temp_root = Path(temp_root_str)
        archive_path = _download_release_archive(requested_version, temp_root)
        runtime_binary = _extract_runtime_binary(archive_path, temp_root)
        staged_runtime_dir = _stage_runtime_package(
            sdk_python_dir,
            requested_version,
            runtime_binary,
            temp_root / "runtime-stage",
        )
        _install_runtime_package(python_executable, staged_runtime_dir, install_target)
    if install_target is not None:
        return requested_version
    if Path(python_executable).resolve() == Path(sys.executable).resolve():
        # Installed into the running interpreter: drop stale import caches so
        # the verification probe below sees the new package.
        importlib.invalidate_caches()
    installed_version = _installed_runtime_version(python_executable)
    if installed_version is None or _normalized_package_version(installed_version) != normalized_requested:
        raise RuntimeSetupError(
            f"Expected {PACKAGE_NAME} {requested_version} in {python_executable}, "
            f"but found {installed_version!r} after installation."
        )
    return requested_version
def platform_asset_name() -> str:
    """Return the release-asset filename for the current OS/architecture.

    Raises:
        RuntimeSetupError: if the host platform has no published artifact.
    """
    system = platform.system().lower()
    machine = platform.machine().lower()
    # Normalize the architecture to the target-triple spelling.
    if machine in {"arm64", "aarch64"}:
        arch = "aarch64"
    elif machine in {"x86_64", "amd64"}:
        arch = "x86_64"
    else:
        arch = None
    # OS-specific tail of the asset name (target triple + archive extension).
    suffixes = {
        "darwin": "apple-darwin.tar.gz",
        "linux": "unknown-linux-musl.tar.gz",
        "windows": "pc-windows-msvc.exe.zip",
    }
    suffix = suffixes.get(system)
    if arch is not None and suffix is not None:
        return f"codex-{arch}-{suffix}"
    raise RuntimeSetupError(
        f"Unsupported runtime artifact platform: system={platform.system()!r}, "
        f"machine={platform.machine()!r}"
    )
def runtime_binary_name() -> str:
    """Return the codex executable's filename for the current OS."""
    suffix = ".exe" if platform.system().lower() == "windows" else ""
    return f"codex{suffix}"
def _installed_runtime_version(python_executable: str | Path) -> str | None:
    """Return the working codex-cli-bin version in the given interpreter, or None.

    Runs a probe subprocess that both imports the package and resolves the
    bundled binary path, so a broken install is reported as absent.
    """
    snippet = (
        "import importlib.metadata, json, sys\n"
        "try:\n"
        "    from codex_cli_bin import bundled_codex_path\n"
        "    bundled_codex_path()\n"
        "    print(json.dumps({'version': importlib.metadata.version('codex-cli-bin')}))\n"
        "except Exception:\n"
        "    sys.exit(1)\n"
    )
    result = subprocess.run(
        [str(python_executable), "-c", snippet],
        text=True,
        capture_output=True,
        check=False,
    )
    if result.returncode != 0:
        # Import or path resolution failed: treat as not installed.
        return None
    return json.loads(result.stdout)["version"]
def _release_metadata(version: str) -> dict[str, object]:
    """Fetch GitHub release metadata for the ``rust-v{version}`` tag.

    When a token is configured, tries an authenticated request first and
    retries unauthenticated only on a 401 (e.g. an expired token); any other
    HTTP error aborts the loop.

    Raises:
        RuntimeSetupError: if every attempt fails.
    """
    url = f"https://api.github.com/repos/{REPO_SLUG}/releases/tags/rust-v{version}"
    token = _github_token()
    attempts = [True, False] if token is not None else [False]
    last_error: urllib.error.HTTPError | None = None
    for include_auth in attempts:
        headers = {
            "Accept": "application/vnd.github+json",
            "User-Agent": "codex-python-runtime-setup",
        }
        if include_auth and token is not None:
            headers["Authorization"] = f"Bearer {token}"
        request = urllib.request.Request(url, headers=headers)
        try:
            with urllib.request.urlopen(request) as response:
                return json.load(response)
        except urllib.error.HTTPError as exc:
            last_error = exc
            if include_auth and exc.code == 401:
                # Bad credentials: fall back to the unauthenticated attempt.
                continue
            break
    # The loop always runs at least once, so an HTTPError must be recorded.
    assert last_error is not None
    raise RuntimeSetupError(
        f"Failed to resolve release metadata for rust-v{version} from {REPO_SLUG}: "
        f"{last_error.code} {last_error.reason}"
    ) from last_error
def _download_release_archive(version: str, temp_root: Path) -> Path:
    """Download this platform's release archive into ``temp_root``.

    Tries, in order: the public browser download URL, the GitHub API asset
    URL (when a token is configured), and finally the ``gh`` CLI.

    Raises:
        RuntimeSetupError: if the release lacks an asset for this platform or
            no download path succeeds.
    """
    asset_name = platform_asset_name()
    archive_path = temp_root / asset_name
    # 1) Public download URL: works for public releases without credentials.
    browser_download_url = (
        f"https://github.com/{REPO_SLUG}/releases/download/rust-v{version}/{asset_name}"
    )
    request = urllib.request.Request(
        browser_download_url,
        headers={"User-Agent": "codex-python-runtime-setup"},
    )
    try:
        with urllib.request.urlopen(request) as response, archive_path.open("wb") as fh:
            shutil.copyfileobj(response, fh)
            return archive_path
    except urllib.error.HTTPError:
        # Fall through to the authenticated paths below.
        pass
    metadata = _release_metadata(version)
    assets = metadata.get("assets")
    if not isinstance(assets, list):
        raise RuntimeSetupError(f"Release rust-v{version} returned malformed assets metadata.")
    asset = next(
        (
            item
            for item in assets
            if isinstance(item, dict) and item.get("name") == asset_name
        ),
        None,
    )
    if asset is None:
        raise RuntimeSetupError(
            f"Release rust-v{version} does not contain asset {asset_name} for this platform."
        )
    api_url = asset.get("url")
    if not isinstance(api_url, str):
        api_url = None
    # 2) Authenticated GitHub API download (needed for non-public releases).
    if api_url is not None:
        token = _github_token()
        if token is not None:
            request = urllib.request.Request(
                api_url,
                headers=_github_api_headers("application/octet-stream"),
            )
            try:
                with urllib.request.urlopen(request) as response, archive_path.open("wb") as fh:
                    shutil.copyfileobj(response, fh)
                    return archive_path
            except urllib.error.HTTPError:
                # Fall through to the gh CLI as a last resort.
                pass
    # 3) gh CLI fallback: uses whatever auth `gh auth login` configured.
    if shutil.which("gh") is None:
        raise RuntimeSetupError(
            f"Unable to download {asset_name} for rust-v{version}. "
            "Provide GH_TOKEN/GITHUB_TOKEN or install/authenticate GitHub CLI."
        )
    try:
        subprocess.run(
            [
                "gh",
                "release",
                "download",
                f"rust-v{version}",
                "--repo",
                REPO_SLUG,
                "--pattern",
                asset_name,
                "--dir",
                str(temp_root),
            ],
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as exc:
        raise RuntimeSetupError(
            f"gh release download failed for rust-v{version} asset {asset_name}.\n"
            f"STDOUT:\n{exc.stdout}\nSTDERR:\n{exc.stderr}"
        ) from exc
    return archive_path
def _extract_runtime_binary(archive_path: Path, temp_root: Path) -> Path:
    """Extract the release archive and locate the codex binary inside it.

    Accepts a file named exactly like the binary, like the archive stem, or
    any ``codex-`` prefixed file. Selection is deterministic: an exact
    binary-name match wins, then the archive stem, then the lexicographically
    first remaining candidate (the original code returned whichever file
    ``rglob`` happened to yield first, which is filesystem-order dependent).

    Raises:
        RuntimeSetupError: on an unknown archive format or when no candidate
            binary is found.
    """
    extract_dir = temp_root / "extracted"
    extract_dir.mkdir(parents=True, exist_ok=True)
    if archive_path.name.endswith(".tar.gz"):
        with tarfile.open(archive_path, "r:gz") as tar:
            try:
                # `filter="data"` (Python 3.12+) rejects unsafe members such
                # as path-traversal entries.
                tar.extractall(extract_dir, filter="data")
            except TypeError:
                # Older Pythons do not accept the `filter` argument.
                tar.extractall(extract_dir)
    elif archive_path.suffix == ".zip":
        with zipfile.ZipFile(archive_path) as zip_file:
            zip_file.extractall(extract_dir)
    else:
        raise RuntimeSetupError(f"Unsupported release archive format: {archive_path.name}")
    binary_name = runtime_binary_name()
    archive_stem = archive_path.name.removesuffix(".tar.gz").removesuffix(".zip")
    candidates = sorted(
        path
        for path in extract_dir.rglob("*")
        if path.is_file()
        and (
            path.name == binary_name
            or path.name == archive_stem
            or path.name.startswith("codex-")
        )
    )
    if not candidates:
        raise RuntimeSetupError(
            f"Failed to find {binary_name} in extracted runtime archive {archive_path.name}."
        )
    # Prefer an exact binary-name match, then the archive-stem name; the sort
    # above makes the final fallback deterministic across filesystems.
    for preferred in (binary_name, archive_stem):
        for candidate in candidates:
            if candidate.name == preferred:
                return candidate
    return candidates[0]
def _stage_runtime_package(
    sdk_python_dir: Path,
    runtime_version: Path,
    runtime_binary: Path,
    staging_dir: Path,
) -> Path:
    """Stage an installable codex-cli-bin package directory around the binary.

    Delegates to ``stage_python_runtime_package`` from the repo's
    ``update_sdk_artifacts`` script so staging logic lives in one place.
    """
    script_module = _load_update_script_module(sdk_python_dir)
    return script_module.stage_python_runtime_package(  # type: ignore[no-any-return]
        staging_dir,
        runtime_version,
        runtime_binary.resolve(),
    )
def _install_runtime_package(
    python_executable: str | Path,
    staged_runtime_dir: Path,
    install_target: Path | None,
) -> None:
    """pip-install the staged runtime package.

    Installs into the interpreter's environment, or into ``install_target``
    via ``pip install --target`` when one is given.

    Raises:
        RuntimeSetupError: if pip exits non-zero (pip's output is included).
    """
    args = [
        str(python_executable),
        "-m",
        "pip",
        "install",
        # Force reinstall so a broken or stale copy is always replaced.
        "--force-reinstall",
        "--no-deps",
    ]
    if install_target is not None:
        install_target.mkdir(parents=True, exist_ok=True)
        args.extend(["--target", str(install_target)])
    args.append(str(staged_runtime_dir))
    try:
        subprocess.run(
            args,
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as exc:
        raise RuntimeSetupError(
            f"Failed to install {PACKAGE_NAME} into {python_executable} from {staged_runtime_dir}.\n"
            f"STDOUT:\n{exc.stdout}\nSTDERR:\n{exc.stderr}"
        ) from exc
def _load_update_script_module(sdk_python_dir: Path):
    """Load ``scripts/update_sdk_artifacts.py`` as a module by file path.

    The script is not an installed package, so it is imported with
    importlib.util and registered in ``sys.modules`` under its spec name.
    """
    script_path = sdk_python_dir / "scripts" / "update_sdk_artifacts.py"
    spec = importlib.util.spec_from_file_location("update_sdk_artifacts", script_path)
    if spec is None or spec.loader is None:
        raise RuntimeSetupError(f"Failed to load {script_path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
def _github_api_headers(accept: str) -> dict[str, str]:
    """Build GitHub API headers, adding auth only when a token is configured."""
    token = _github_token()
    auth = {} if token is None else {"Authorization": f"Bearer {token}"}
    return {
        "Accept": accept,
        "User-Agent": "codex-python-runtime-setup",
        **auth,
    }
def _github_token() -> str | None:
    """Return the first non-empty token from GH_TOKEN/GITHUB_TOKEN, else None."""
    return next(
        (
            token
            for env_name in ("GH_TOKEN", "GITHUB_TOKEN")
            if (token := os.environ.get(env_name))
        ),
        None,
    )
def _normalized_package_version(version: str) -> str:
    # Map release-tag pre-release markers onto PEP 440 spellings so the pinned
    # version compares equal to what pip/importlib.metadata report
    # (e.g. "0.116.0-alpha.1" -> "0.116.0a1").
    return version.strip().replace("-alpha.", "a").replace("-beta.", "b")


# Public surface of this helper module; underscore helpers stay internal.
__all__ = [
    "PACKAGE_NAME",
    "PINNED_RUNTIME_VERSION",
    "RuntimeSetupError",
    "ensure_runtime_package_installed",
    "pinned_runtime_version",
    "platform_asset_name",
]

View File

@@ -1,190 +0,0 @@
# Codex App Server SDK — API Reference
Public surface of `codex_app_server` for app-server v2.
This SDK surface is experimental. The current implementation intentionally allows only one active `TurnHandle.stream()` or `TurnHandle.run()` consumer per client instance at a time.
## Package Entry
```python
from codex_app_server import (
Codex,
AsyncCodex,
Thread,
AsyncThread,
TurnHandle,
AsyncTurnHandle,
InitializeResponse,
Input,
InputItem,
TextInput,
ImageInput,
LocalImageInput,
SkillInput,
MentionInput,
TurnStatus,
)
from codex_app_server.generated.v2_all import ThreadItem
```
- Version: `codex_app_server.__version__`
- Requires Python >= 3.10
- Canonical generated app-server models live in `codex_app_server.generated.v2_all`
## Codex (sync)
```python
Codex(config: AppServerConfig | None = None)
```
Properties/methods:
- `metadata -> InitializeResponse`
- `close() -> None`
- `thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Thread`
- `thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> ThreadListResponse`
- `thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Thread`
- `thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, sandbox=None) -> Thread`
- `thread_archive(thread_id: str) -> ThreadArchiveResponse`
- `thread_unarchive(thread_id: str) -> Thread`
- `models(*, include_hidden: bool = False) -> ModelListResponse`
Context manager:
```python
with Codex() as codex:
...
```
## AsyncCodex (async parity)
```python
AsyncCodex(config: AppServerConfig | None = None)
```
Preferred usage:
```python
async with AsyncCodex() as codex:
...
```
`AsyncCodex` initializes lazily. Context entry is the standard path because it
ensures startup and shutdown are paired explicitly.
Properties/methods:
- `metadata -> InitializeResponse`
- `close() -> Awaitable[None]`
- `thread_start(*, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_list(*, archived=None, cursor=None, cwd=None, limit=None, model_providers=None, sort_key=None, source_kinds=None) -> Awaitable[ThreadListResponse]`
- `thread_resume(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, model=None, model_provider=None, personality=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_fork(thread_id: str, *, approval_policy=None, base_instructions=None, config=None, cwd=None, developer_instructions=None, ephemeral=None, model=None, model_provider=None, sandbox=None) -> Awaitable[AsyncThread]`
- `thread_archive(thread_id: str) -> Awaitable[ThreadArchiveResponse]`
- `thread_unarchive(thread_id: str) -> Awaitable[AsyncThread]`
- `models(*, include_hidden: bool = False) -> Awaitable[ModelListResponse]`
Async context manager:
```python
async with AsyncCodex() as codex:
...
```
## Thread / AsyncThread
`Thread` and `AsyncThread` share the same shape and intent.
### Thread
- `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> TurnHandle`
- `read(*, include_turns: bool = False) -> ThreadReadResponse`
- `set_name(name: str) -> ThreadSetNameResponse`
- `compact() -> ThreadCompactStartResponse`
### AsyncThread
- `turn(input: Input, *, approval_policy=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Awaitable[AsyncTurnHandle]`
- `read(*, include_turns: bool = False) -> Awaitable[ThreadReadResponse]`
- `set_name(name: str) -> Awaitable[ThreadSetNameResponse]`
- `compact() -> Awaitable[ThreadCompactStartResponse]`
## TurnHandle / AsyncTurnHandle
### TurnHandle
- `steer(input: Input) -> TurnSteerResponse`
- `interrupt() -> TurnInterruptResponse`
- `stream() -> Iterator[Notification]`
- `run() -> codex_app_server.generated.v2_all.Turn`
Behavior notes:
- `stream()` and `run()` are exclusive per client instance in the current experimental build
- starting a second turn consumer on the same `Codex` instance raises `RuntimeError`
### AsyncTurnHandle
- `steer(input: Input) -> Awaitable[TurnSteerResponse]`
- `interrupt() -> Awaitable[TurnInterruptResponse]`
- `stream() -> AsyncIterator[Notification]`
- `run() -> Awaitable[codex_app_server.generated.v2_all.Turn]`
Behavior notes:
- `stream()` and `run()` are exclusive per client instance in the current experimental build
- starting a second turn consumer on the same `AsyncCodex` instance raises `RuntimeError`
## Inputs
```python
@dataclass class TextInput: text: str
@dataclass class ImageInput: url: str
@dataclass class LocalImageInput: path: str
@dataclass class SkillInput: name: str; path: str
@dataclass class MentionInput: name: str; path: str
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
```
## Generated Models
The SDK wrappers return and accept canonical generated app-server models wherever possible:
```python
from codex_app_server.generated.v2_all import (
AskForApproval,
ThreadReadResponse,
Turn,
TurnStartParams,
TurnStatus,
)
```
## Retry + errors
```python
from codex_app_server import (
retry_on_overload,
JsonRpcError,
MethodNotFoundError,
InvalidParamsError,
ServerBusyError,
is_retryable_error,
)
```
- `retry_on_overload(...)` retries transient overload errors with exponential backoff + jitter.
- `is_retryable_error(exc)` checks if an exception is transient/overload-like.
## Example
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
completed_turn = thread.turn(TextInput("Say hello in one sentence.")).run()
print(completed_turn.id, completed_turn.status)
```

View File

@@ -8,45 +8,24 @@
## `run()` vs `stream()`
- `TurnHandle.run()` / `AsyncTurnHandle.run()` is the easiest path. It consumes events until completion and returns the canonical generated app-server `Turn` model.
- `TurnHandle.stream()` / `AsyncTurnHandle.stream()` yields raw notifications (`Notification`) so you can react event-by-event.
- `Turn.run()` is the easiest path. It consumes events until completion and returns `TurnResult`.
- `Turn.stream()` yields raw notifications (`Notification`) so you can react event-by-event.
Choose `run()` for most apps. Choose `stream()` for progress UIs, custom timeout logic, or custom parsing.
## Sync vs async clients
- `Codex` is the sync public API.
- `AsyncCodex` is an async replica of the same public API shape.
- Prefer `async with AsyncCodex()` for async code. It is the standard path for
explicit startup/shutdown, and `AsyncCodex` initializes lazily on context
entry or first awaited API use.
- `Codex` is the minimal sync SDK and best default.
- `AsyncAppServerClient` wraps the sync transport with `asyncio.to_thread(...)` for async-friendly call sites.
If your app is not already async, stay with `Codex`.
## Public kwargs are snake_case
## `thread(...)` vs `thread_resume(...)`
Public API keyword names are snake_case. The SDK still maps them to wire camelCase under the hood.
- `codex.thread(thread_id)` only binds a local helper to an existing thread ID.
- `codex.thread_resume(thread_id, ...)` performs a `thread/resume` RPC and can apply overrides (model, instructions, sandbox, etc.).
If you are migrating older code, update these names:
- `approvalPolicy` -> `approval_policy`
- `baseInstructions` -> `base_instructions`
- `developerInstructions` -> `developer_instructions`
- `modelProvider` -> `model_provider`
- `modelProviders` -> `model_providers`
- `sortKey` -> `sort_key`
- `sourceKinds` -> `source_kinds`
- `outputSchema` -> `output_schema`
- `sandboxPolicy` -> `sandbox_policy`
## Why only `thread_start(...)` and `thread_resume(...)`?
The public API keeps only explicit lifecycle calls:
- `thread_start(...)` to create new threads
- `thread_resume(thread_id, ...)` to continue existing threads
This avoids duplicate ways to do the same operation and keeps behavior explicit.
Use `thread(...)` for simple continuation. Use `thread_resume(...)` when you need explicit resume semantics or override fields.
## Why does constructor fail?
@@ -82,7 +61,7 @@ python scripts/update_sdk_artifacts.py \
A turn is complete only when `turn/completed` arrives for that turn ID.
- `run()` waits for this automatically.
- With `stream()`, keep consuming notifications until completion.
- With `stream()`, make sure you keep consuming notifications until completion.
## How do I retry safely?
@@ -93,6 +72,6 @@ Do not blindly retry all errors. For `InvalidParamsError` or `MethodNotFoundErro
## Common pitfalls
- Starting a new thread for every prompt when you wanted continuity.
- Forgetting to `close()` (or not using context managers).
- Assuming `run()` returns extra SDK-only fields instead of the generated `Turn` model.
- Mixing SDK input classes with raw dicts incorrectly.
- Forgetting to `close()` (or not using `with Codex() as codex:`).
- Ignoring `TurnResult.status` and `TurnResult.error`.
- Mixing SDK input classes with raw dicts incorrectly in minimal API paths.

View File

@@ -1,8 +1,6 @@
# Getting Started
This is the fastest path from install to a multi-turn thread using the public SDK surface.
The SDK is experimental. Treat the API, bundled runtime strategy, and packaging details as unstable until the first public release.
This is the fastest path from install to a multi-turn thread using the minimal SDK surface.
## 1) Install
@@ -17,32 +15,30 @@ Requirements:
- Python `>=3.10`
- installed `codex-cli-bin` runtime package, or an explicit `codex_bin` override
- local Codex auth/session configured
- Local Codex auth/session configured
## 2) Run your first turn (sync)
## 2) Run your first turn
```python
from codex_app_server import Codex, TextInput
with Codex() as codex:
server = codex.metadata.serverInfo
print("Server:", None if server is None else server.name, None if server is None else server.version)
print("Server:", codex.metadata.server_name, codex.metadata.server_version)
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
completed_turn = thread.turn(TextInput("Say hello in one sentence.")).run()
thread = codex.thread_start(model="gpt-5")
result = thread.turn(TextInput("Say hello in one sentence.")).run()
print("Thread:", thread.id)
print("Turn:", completed_turn.id)
print("Status:", completed_turn.status)
print("Items:", len(completed_turn.items or []))
print("Thread:", result.thread_id)
print("Turn:", result.turn_id)
print("Status:", result.status)
print("Text:", result.text)
```
What happened:
- `Codex()` started and initialized `codex app-server`.
- `thread_start(...)` created a thread.
- `turn(...).run()` consumed events until `turn/completed` and returned the canonical generated app-server `Turn` model.
- one client can have only one active `TurnHandle.stream()` / `TurnHandle.run()` consumer at a time in the current experimental build
- `turn(...).run()` consumed events until `turn/completed` and returned a `TurnResult`.
## 3) Continue the same thread (multi-turn)
@@ -50,37 +46,16 @@ What happened:
from codex_app_server import Codex, TextInput
with Codex() as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
thread = codex.thread_start(model="gpt-5")
first = thread.turn(TextInput("Summarize Rust ownership in 2 bullets.")).run()
second = thread.turn(TextInput("Now explain it to a Python developer.")).run()
print("first:", first.id, first.status)
print("second:", second.id, second.status)
print("first:", first.text)
print("second:", second.text)
```
## 4) Async parity
Use `async with AsyncCodex()` as the normal async entrypoint. `AsyncCodex`
initializes lazily, and context entry makes startup/shutdown explicit.
```python
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
async with AsyncCodex() as codex:
thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
turn = await thread.turn(TextInput("Continue where we left off."))
completed_turn = await turn.run()
print(completed_turn.id, completed_turn.status)
asyncio.run(main())
```
## 5) Resume an existing thread
## 4) Resume an existing thread
```python
from codex_app_server import Codex, TextInput
@@ -88,20 +63,12 @@ from codex_app_server import Codex, TextInput
THREAD_ID = "thr_123" # replace with a real id
with Codex() as codex:
thread = codex.thread_resume(THREAD_ID)
completed_turn = thread.turn(TextInput("Continue where we left off.")).run()
print(completed_turn.id, completed_turn.status)
thread = codex.thread(THREAD_ID)
result = thread.turn(TextInput("Continue where we left off.")).run()
print(result.text)
```
## 6) Generated models
The convenience wrappers live at the package root, but the canonical app-server models live under:
```python
from codex_app_server.generated.v2_all import Turn, TurnStatus, ThreadReadResponse
```
## 7) Next stops
## 5) Next stops
- API surface and signatures: `docs/api-reference.md`
- Common decisions/pitfalls: `docs/faq.md`

View File

@@ -1,38 +0,0 @@
"""Hello-world example for the async SDK client (AsyncCodex)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
    server_label,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    """Start a thread, run one turn, then read back the persisted transcript."""
    async with AsyncCodex(config=runtime_config()) as codex:
        print("Server:", server_label(codex.metadata))
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(TextInput("Say hello in one sentence."))
        result = await turn.run()
        # Fetch the persisted thread to extract the assistant's final text.
        persisted = await thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
        print("Status:", result.status)
        print("Text:", assistant_text_from_turn(persisted_turn))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,28 +0,0 @@
"""Hello-world example for the sync SDK client (Codex)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
    server_label,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex(config=runtime_config()) as codex:
    print("Server:", server_label(codex.metadata))
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    result = thread.turn(TextInput("Say hello in one sentence.")).run()
    # Fetch the persisted thread to extract the assistant's final text.
    persisted = thread.read(include_turns=True)
    persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
    print("Status:", result.status)
    print("Text:", assistant_text_from_turn(persisted_turn))

View File

@@ -1,43 +0,0 @@
"""Run a single turn (async) and inspect status, error, text, and item count."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    """Start a thread, complete one turn, then report diagnostics."""
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(TextInput("Give 3 bullets about SIMD."))
        result = await turn.run()
        persisted = await thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
        print("thread_id:", thread.id)
        print("turn_id:", result.id)
        print("status:", result.status)
        if result.error is not None:
            print("error:", result.error)
        print("text:", assistant_text_from_turn(persisted_turn))
        print(
            "persisted.items.count:",
            0 if persisted_turn is None else len(persisted_turn.items or []),
        )


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,34 +0,0 @@
"""Run a single turn (sync) and inspect status, error, text, and item count."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    result = thread.turn(TextInput("Give 3 bullets about SIMD.")).run()
    persisted = thread.read(include_turns=True)
    persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
    print("thread_id:", thread.id)
    print("turn_id:", result.id)
    print("status:", result.status)
    if result.error is not None:
        print("error:", result.error)
    print("text:", assistant_text_from_turn(persisted_turn))
    print(
        "persisted.items.count:",
        0 if persisted_turn is None else len(persisted_turn.items or []),
    )

View File

@@ -1,63 +0,0 @@
"""Stream turn events (async) and print assistant deltas as they arrive."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    """Consume the raw notification stream for one turn and summarize it."""
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(TextInput("Explain SIMD in 3 short bullets."))
        event_count = 0
        saw_started = False
        saw_delta = False
        completed_status = "unknown"
        async for event in turn.stream():
            event_count += 1
            if event.method == "turn/started":
                saw_started = True
                print("stream.started")
                continue
            if event.method == "item/agentMessage/delta":
                delta = getattr(event.payload, "delta", "")
                if delta:
                    if not saw_delta:
                        # Prefix once before the first streamed delta.
                        print("assistant> ", end="", flush=True)
                    print(delta, end="", flush=True)
                    saw_delta = True
                continue
            if event.method == "turn/completed":
                completed_status = getattr(event.payload.turn.status, "value", str(event.payload.turn.status))
        # After the stream ends: close the streamed line, or fall back to the
        # persisted transcript when no deltas were streamed.
        # NOTE(review): indentation was lost in this rendering; this block may
        # originally have lived inside the turn/completed branch — behavior is
        # equivalent if the stream ends at turn/completed. Confirm upstream.
        if saw_delta:
            print()
        else:
            persisted = await thread.read(include_turns=True)
            persisted_turn = find_turn_by_id(persisted.thread.turns, turn.id)
            final_text = assistant_text_from_turn(persisted_turn).strip() or "[no assistant text]"
            print("assistant>", final_text)
        print("stream.started.seen:", saw_started)
        print("stream.completed:", completed_status)
        print("events.count:", event_count)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,55 +0,0 @@
"""Stream turn events (sync) and print assistant deltas as they arrive."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    turn = thread.turn(TextInput("Explain SIMD in 3 short bullets."))
    event_count = 0
    saw_started = False
    saw_delta = False
    completed_status = "unknown"
    for event in turn.stream():
        event_count += 1
        if event.method == "turn/started":
            saw_started = True
            print("stream.started")
            continue
        if event.method == "item/agentMessage/delta":
            delta = getattr(event.payload, "delta", "")
            if delta:
                if not saw_delta:
                    # Prefix once before the first streamed delta.
                    print("assistant> ", end="", flush=True)
                print(delta, end="", flush=True)
                saw_delta = True
            continue
        if event.method == "turn/completed":
            completed_status = getattr(event.payload.turn.status, "value", str(event.payload.turn.status))
    # After the stream ends: close the streamed line, or fall back to the
    # persisted transcript when no deltas were streamed.
    # NOTE(review): indentation was lost in this rendering; this block may
    # originally have lived inside the turn/completed branch — behavior is
    # equivalent if the stream ends at turn/completed. Confirm upstream.
    if saw_delta:
        print()
    else:
        persisted = thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, turn.id)
        final_text = assistant_text_from_turn(persisted_turn).strip() or "[no assistant text]"
        print("assistant>", final_text)
    print("stream.started.seen:", saw_started)
    print("stream.completed:", completed_status)
    print("events.count:", event_count)

View File

@@ -1,28 +0,0 @@
"""List available models (async), including hidden ones."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src, runtime_config

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex


async def main() -> None:
    """Print server metadata and a summary of the model list."""
    async with AsyncCodex(config=runtime_config()) as codex:
        print("metadata:", codex.metadata)
        models = await codex.models(include_hidden=True)
        print("models.count:", len(models.data))
        if models.data:
            print("first model id:", models.data[0].id)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,20 +0,0 @@
"""List available models (sync)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src, runtime_config

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex

with Codex(config=runtime_config()) as codex:
    print("metadata:", codex.metadata)
    models = codex.models()
    print("models.count:", len(models.data))
    if models.data:
        print("first model id:", models.data[0].id)

View File

@@ -1,34 +0,0 @@
"""Create a thread, then resume it by ID and continue the conversation (async)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import assistant_text_from_turn, ensure_local_sdk_src, find_turn_by_id, runtime_config

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    """Run one turn, resume the same thread, run a second turn, print its text."""
    async with AsyncCodex(config=runtime_config()) as codex:
        original = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        first_turn = await original.turn(TextInput("Tell me one fact about Saturn."))
        _ = await first_turn.run()
        print("Created thread:", original.id)
        # Resume the existing thread by ID via the thread/resume RPC.
        resumed = await codex.thread_resume(original.id)
        second_turn = await resumed.turn(TextInput("Continue with one more fact."))
        second = await second_turn.run()
        persisted = await resumed.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
        print(assistant_text_from_turn(persisted_turn))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,25 +0,0 @@
"""Create a thread, then resume it by ID and continue the conversation (sync)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import assistant_text_from_turn, ensure_local_sdk_src, find_turn_by_id, runtime_config

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex(config=runtime_config()) as codex:
    # Create an initial thread and turn so we have a real thread to resume.
    original = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    first = original.turn(TextInput("Tell me one fact about Saturn.")).run()
    print("Created thread:", original.id)
    # Resume the existing thread by ID.
    resumed = codex.thread_resume(original.id)
    second = resumed.turn(TextInput("Continue with one more fact.")).run()
    persisted = resumed.read(include_turns=True)
    persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
    print(assistant_text_from_turn(persisted_turn))

View File

@@ -1,70 +0,0 @@
"""Exercise the full thread lifecycle (async): start, list, read, rename,
archive, unarchive, resume, fork, and compact."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src, runtime_config

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, TextInput


async def main() -> None:
    """Walk every lifecycle RPC once and print a summary of each step."""
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        first = await (await thread.turn(TextInput("One sentence about structured planning."))).run()
        second = await (await thread.turn(TextInput("Now restate it for a junior engineer."))).run()
        reopened = await codex.thread_resume(thread.id)
        listing_active = await codex.thread_list(limit=20, archived=False)
        reading = await reopened.read(include_turns=True)
        _ = await reopened.set_name("sdk-lifecycle-demo")
        _ = await codex.thread_archive(reopened.id)
        listing_archived = await codex.thread_list(limit=20, archived=True)
        unarchived = await codex.thread_unarchive(reopened.id)
        # Resume/fork/compact are best-effort: servers may reject them, so each
        # step records a skipped(<ExceptionName>) marker instead of failing.
        resumed_info = "n/a"
        try:
            resumed = await codex.thread_resume(
                unarchived.id,
                model="gpt-5.4",
                config={"model_reasoning_effort": "high"},
            )
            resumed_result = await (await resumed.turn(TextInput("Continue in one short sentence."))).run()
            resumed_info = f"{resumed_result.id} {resumed_result.status}"
        except Exception as exc:
            resumed_info = f"skipped({type(exc).__name__})"
        forked_info = "n/a"
        try:
            forked = await codex.thread_fork(unarchived.id, model="gpt-5.4")
            forked_result = await (await forked.turn(TextInput("Take a different angle in one short sentence."))).run()
            forked_info = f"{forked_result.id} {forked_result.status}"
        except Exception as exc:
            forked_info = f"skipped({type(exc).__name__})"
        compact_info = "sent"
        try:
            _ = await unarchived.compact()
        except Exception as exc:
            compact_info = f"skipped({type(exc).__name__})"
        print("Lifecycle OK:", thread.id)
        print("first:", first.id, first.status)
        print("second:", second.id, second.status)
        print("read.turns:", len(reading.thread.turns or []))
        print("list.active:", len(listing_active.data))
        print("list.archived:", len(listing_archived.data))
        print("resumed:", resumed_info)
        print("forked:", forked_info)
        print("compact:", compact_info)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,63 +0,0 @@
"""Exercise the full thread lifecycle (sync): start, list, read, rename,
archive, unarchive, resume, fork, and compact."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import ensure_local_sdk_src, runtime_config

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    first = thread.turn(TextInput("One sentence about structured planning.")).run()
    second = thread.turn(TextInput("Now restate it for a junior engineer.")).run()
    reopened = codex.thread_resume(thread.id)
    listing_active = codex.thread_list(limit=20, archived=False)
    reading = reopened.read(include_turns=True)
    _ = reopened.set_name("sdk-lifecycle-demo")
    _ = codex.thread_archive(reopened.id)
    listing_archived = codex.thread_list(limit=20, archived=True)
    unarchived = codex.thread_unarchive(reopened.id)
    # Resume/fork/compact are best-effort: servers may reject them, so each
    # step records a skipped(<ExceptionName>) marker instead of failing.
    resumed_info = "n/a"
    try:
        resumed = codex.thread_resume(
            unarchived.id,
            model="gpt-5.4",
            config={"model_reasoning_effort": "high"},
        )
        resumed_result = resumed.turn(TextInput("Continue in one short sentence.")).run()
        resumed_info = f"{resumed_result.id} {resumed_result.status}"
    except Exception as exc:
        resumed_info = f"skipped({type(exc).__name__})"
    forked_info = "n/a"
    try:
        forked = codex.thread_fork(unarchived.id, model="gpt-5.4")
        forked_result = forked.turn(TextInput("Take a different angle in one short sentence.")).run()
        forked_info = f"{forked_result.id} {forked_result.status}"
    except Exception as exc:
        forked_info = f"skipped({type(exc).__name__})"
    compact_info = "sent"
    try:
        _ = unarchived.compact()
    except Exception as exc:
        compact_info = f"skipped({type(exc).__name__})"
    print("Lifecycle OK:", thread.id)
    print("first:", first.id, first.status)
    print("second:", second.id, second.status)
    print("read.turns:", len(reading.thread.turns or []))
    print("list.active:", len(listing_active.data))
    print("list.archived:", len(listing_archived.data))
    print("resumed:", resumed_info)
    print("forked:", forked_info)
    print("compact:", compact_info)

View File

@@ -1,42 +0,0 @@
"""Send a remote image URL alongside text input (async)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, ImageInput, TextInput

# Publicly hosted sample image used as the vision input.
REMOTE_IMAGE_URL = "https://raw.githubusercontent.com/github/explore/main/topics/python/python.png"


async def main() -> None:
    """Run a multimodal turn (text + image URL) and print the reply."""
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(
            [
                TextInput("What is in this image? Give 3 bullets."),
                ImageInput(REMOTE_IMAGE_URL),
            ]
        )
        result = await turn.run()
        persisted = await thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
        print("Status:", result.status)
        print(assistant_text_from_turn(persisted_turn))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,33 +0,0 @@
"""Send a remote image URL alongside text input (sync)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, ImageInput, TextInput

# Publicly hosted sample image used as the vision input.
REMOTE_IMAGE_URL = "https://raw.githubusercontent.com/github/explore/main/topics/python/python.png"

with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    result = thread.turn(
        [
            TextInput("What is in this image? Give 3 bullets."),
            ImageInput(REMOTE_IMAGE_URL),
        ]
    ).run()
    persisted = thread.read(include_turns=True)
    persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
    print("Status:", result.status)
    print(assistant_text_from_turn(persisted_turn))

View File

@@ -1,43 +0,0 @@
"""Send a locally generated image file alongside text input (async)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
    temporary_sample_image_path,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

import asyncio

from codex_app_server import AsyncCodex, LocalImageInput, TextInput


async def main() -> None:
    """Generate a temp image, run a multimodal turn against it, print the reply."""
    # temporary_sample_image_path creates the file and cleans it up on exit.
    with temporary_sample_image_path() as image_path:
        async with AsyncCodex(config=runtime_config()) as codex:
            thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
            turn = await thread.turn(
                [
                    TextInput("Read this generated local image and summarize the colors/layout in 2 bullets."),
                    LocalImageInput(str(image_path.resolve())),
                ]
            )
            result = await turn.run()
            persisted = await thread.read(include_turns=True)
            persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
            print("Status:", result.status)
            print(assistant_text_from_turn(persisted_turn))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,34 +0,0 @@
"""Send a locally generated image file alongside text input (sync)."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
    temporary_sample_image_path,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, LocalImageInput, TextInput

# temporary_sample_image_path creates the file and cleans it up on exit.
with temporary_sample_image_path() as image_path:
    with Codex(config=runtime_config()) as codex:
        thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        result = thread.turn(
            [
                TextInput("Read this generated local image and summarize the colors/layout in 2 bullets."),
                LocalImageInput(str(image_path.resolve())),
            ]
        ).run()
        persisted = thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
        print("Status:", result.status)
        print(assistant_text_from_turn(persisted_turn))

View File

@@ -1,31 +0,0 @@
"""Sync hello example that also prints the thread and turn identifiers."""
import sys
from pathlib import Path

# Make the shared examples helpers (_bootstrap) importable when run directly.
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
    sys.path.insert(0, str(_EXAMPLES_ROOT))

from _bootstrap import (
    assistant_text_from_turn,
    ensure_local_sdk_src,
    find_turn_by_id,
    runtime_config,
    server_label,
)

# Must run before importing codex_app_server so the local SDK source wins.
ensure_local_sdk_src()

from codex_app_server import Codex, TextInput

with Codex(config=runtime_config()) as codex:
    print("Server:", server_label(codex.metadata))
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    turn = thread.turn(TextInput("Say hello in one sentence."))
    result = turn.run()
    persisted = thread.read(include_turns=True)
    persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
    print("Thread:", thread.id)
    print("Turn:", result.id)
    print("Text:", assistant_text_from_turn(persisted_turn).strip())

View File

@@ -1,98 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
ensure_local_sdk_src()
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar
from codex_app_server import (
AsyncCodex,
JsonRpcError,
ServerBusyError,
TextInput,
TurnStatus,
is_retryable_error,
)
ResultT = TypeVar("ResultT")
async def retry_on_overload_async(
op: Callable[[], Awaitable[ResultT]],
*,
max_attempts: int = 3,
initial_delay_s: float = 0.25,
max_delay_s: float = 2.0,
jitter_ratio: float = 0.2,
) -> ResultT:
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
delay = initial_delay_s
attempt = 0
while True:
attempt += 1
try:
return await op()
except Exception as exc: # noqa: BLE001
if attempt >= max_attempts or not is_retryable_error(exc):
raise
jitter = delay * jitter_ratio
sleep_for = min(max_delay_s, delay) + random.uniform(-jitter, jitter)
if sleep_for > 0:
await asyncio.sleep(sleep_for)
delay = min(max_delay_s, delay * 2)
async def main() -> None:
async with AsyncCodex(config=runtime_config()) as codex:
thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
try:
result = await retry_on_overload_async(
_run_turn(thread, "Summarize retry best practices in 3 bullets."),
max_attempts=3,
initial_delay_s=0.25,
max_delay_s=2.0,
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
return
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
return
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
print("Text:", assistant_text_from_turn(persisted_turn))
def _run_turn(thread, prompt: str):
    """Build a zero-arg async op (for the retry helper) that runs one turn on ``thread``."""

    async def _execute():
        started = await thread.turn(TextInput(prompt))
        return await started.run()

    return _execute


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,47 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
ensure_local_sdk_src()
from codex_app_server import (
Codex,
JsonRpcError,
ServerBusyError,
TextInput,
TurnStatus,
retry_on_overload,
)
# Run one turn with overload-aware retries via the public sync client surface.
with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    try:
        # retry_on_overload re-invokes the lambda with backoff on busy errors.
        result = retry_on_overload(
            lambda: thread.turn(TextInput("Summarize retry best practices in 3 bullets.")).run(),
            max_attempts=3,
            initial_delay_s=0.25,
            max_delay_s=2.0,
        )
    except ServerBusyError as exc:
        # Exhausted retries: surface the overload; no assistant text to show.
        print("Server overloaded after retries:", exc.message)
        print("Text:")
    except JsonRpcError as exc:
        # Non-retryable protocol error.
        print(f"JSON-RPC error {exc.code}: {exc.message}")
        print("Text:")
    else:
        # Success path: show the turn as persisted server-side.
        persisted = thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
        if result.status == TurnStatus.failed:
            print("Turn failed:", result.error)
        print("Text:", assistant_text_from_turn(persisted_turn))

View File

@@ -1,96 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
import asyncio
from codex_app_server import (
AsyncCodex,
TextInput,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
)
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
return (
"usage>\n"
f" last: input={last.inputTokens} output={last.outputTokens} reasoning={last.reasoningOutputTokens} total={last.totalTokens} cached={last.cachedInputTokens}\n"
f" total: input={total.inputTokens} output={total.outputTokens} reasoning={total.reasoningOutputTokens} total={total.totalTokens} cached={total.cachedInputTokens}"
)
async def main() -> None:
    """Interactive async mini CLI: stream each turn's deltas, then print status + usage."""
    print("Codex async mini CLI. Type /exit to quit.")
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        print("Thread:", thread.id)
        while True:
            try:
                # input() blocks, so run it in a worker thread to keep the event loop free.
                user_input = (await asyncio.to_thread(input, "you> ")).strip()
            except EOFError:
                break
            if not user_input:
                continue
            if user_input in {"/exit", "/quit"}:
                break
            turn = await thread.turn(TextInput(user_input))
            usage = None
            status = None
            error = None
            printed_delta = False
            print("assistant> ", end="", flush=True)
            async for event in turn.stream():
                payload = event.payload
                if event.method == "item/agentMessage/delta":
                    # Incremental assistant text: echo it as it arrives.
                    delta = getattr(payload, "delta", "")
                    if delta:
                        print(delta, end="", flush=True)
                        printed_delta = True
                    continue
                if isinstance(payload, ThreadTokenUsageUpdatedNotification):
                    usage = payload.token_usage
                    continue
                if isinstance(payload, TurnCompletedNotification):
                    status = payload.turn.status
                    error = payload.turn.error
            # Per-turn epilogue: close the streamed line, then status and usage.
            if printed_delta:
                print()
            else:
                print("[no text]")
            status_text = _status_value(status)
            print(f"assistant.status> {status_text}")
            if status_text == "failed":
                print("assistant.error>", error)
            print(_format_usage(usage))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,89 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from codex_app_server import (
Codex,
TextInput,
ThreadTokenUsageUpdatedNotification,
TurnCompletedNotification,
)
# Banner shown once at startup, before the client context is opened.
print("Codex mini CLI. Type /exit to quit.")
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
return (
"usage>\n"
f" last: input={last.inputTokens} output={last.outputTokens} reasoning={last.reasoningOutputTokens} total={last.totalTokens} cached={last.cachedInputTokens}\n"
f" total: input={total.inputTokens} output={total.outputTokens} reasoning={total.reasoningOutputTokens} total={total.totalTokens} cached={total.cachedInputTokens}"
)
# Interactive sync mini CLI: stream each turn's deltas, then print status + usage.
with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    print("Thread:", thread.id)
    while True:
        try:
            user_input = input("you> ").strip()
        except EOFError:
            break
        if not user_input:
            continue
        if user_input in {"/exit", "/quit"}:
            break
        turn = thread.turn(TextInput(user_input))
        usage = None
        status = None
        error = None
        printed_delta = False
        print("assistant> ", end="", flush=True)
        for event in turn.stream():
            payload = event.payload
            if event.method == "item/agentMessage/delta":
                # Incremental assistant text: echo it as it arrives.
                delta = getattr(payload, "delta", "")
                if delta:
                    print(delta, end="", flush=True)
                    printed_delta = True
                continue
            if isinstance(payload, ThreadTokenUsageUpdatedNotification):
                usage = payload.token_usage
                continue
            if isinstance(payload, TurnCompletedNotification):
                status = payload.turn.status
                error = payload.turn.error
        # Per-turn epilogue: close the streamed line, then status and usage.
        if printed_delta:
            print()
        else:
            print("[no text]")
        status_text = _status_value(status)
        print(f"assistant.status> {status_text}")
        if status_text == "failed":
            print("assistant.error>", error)
        print(_format_usage(usage))

View File

@@ -1,82 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
ensure_local_sdk_src()
import asyncio
from codex_app_server import (
AskForApproval,
AsyncCodex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
# JSON Schema the model's structured answer must match.
OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}
# readOnly sandbox policy with a fullAccess scope (see the dict literal below).
SANDBOX_POLICY = SandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)
# Concise reasoning summaries for the event stream.
SUMMARY = ReasoningSummary.model_validate("concise")
PROMPT = (
    "Analyze a safe rollout plan for enabling a feature flag in production. "
    "Return JSON matching the requested schema."
)
# Never pause for interactive approvals (non-interactive demo run).
APPROVAL_POLICY = AskForApproval.model_validate("never")
async def main() -> None:
    """Run a single turn exercising most optional ``turn(...)`` parameters."""
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        turn = await thread.turn(
            TextInput(PROMPT),
            approval_policy=APPROVAL_POLICY,
            cwd=str(Path.cwd()),
            effort=ReasoningEffort.medium,
            model="gpt-5.4",
            output_schema=OUTPUT_SCHEMA,
            personality=Personality.pragmatic,
            sandbox_policy=SANDBOX_POLICY,
            summary=SUMMARY,
        )
        result = await turn.run()
        # Re-read the thread to show the server-persisted view of the turn.
        persisted = await thread.read(include_turns=True)
        persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
        print("Status:", result.status)
        print("Text:", assistant_text_from_turn(persisted_turn))
        print("Items:", 0 if persisted_turn is None else len(persisted_turn.items or []))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,74 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
ensure_local_sdk_src()
from codex_app_server import (
AskForApproval,
Codex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
# JSON Schema the model's structured answer must match.
OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}
# readOnly sandbox policy with a fullAccess scope (see the dict literal below).
SANDBOX_POLICY = SandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)
# Concise reasoning summaries for the event stream.
SUMMARY = ReasoningSummary.model_validate("concise")
PROMPT = (
    "Analyze a safe rollout plan for enabling a feature flag in production. "
    "Return JSON matching the requested schema."
)
# Never pause for interactive approvals (non-interactive demo run).
APPROVAL_POLICY = AskForApproval.model_validate("never")
# Sync variant: one turn exercising most optional turn(...) parameters.
with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    turn = thread.turn(
        TextInput(PROMPT),
        approval_policy=APPROVAL_POLICY,
        cwd=str(Path.cwd()),
        effort=ReasoningEffort.medium,
        model="gpt-5.4",
        output_schema=OUTPUT_SCHEMA,
        personality=Personality.pragmatic,
        sandbox_policy=SANDBOX_POLICY,
        summary=SUMMARY,
    )
    result = turn.run()
    # Re-read the thread to show the server-persisted view of the turn.
    persisted = thread.read(include_turns=True)
    persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
    print("Status:", result.status)
    print("Text:", assistant_text_from_turn(persisted_turn))
    print("Items:", 0 if persisted_turn is None else len(persisted_turn.items or []))

View File

@@ -1,125 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import assistant_text_from_turn, ensure_local_sdk_src, find_turn_by_id, runtime_config
ensure_local_sdk_src()
import asyncio
from codex_app_server import (
AskForApproval,
AsyncCodex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
# Ordering of reasoning-effort names, weakest (0) to strongest (5); used to pick the max.
REASONING_RANK = {
    "none": 0,
    "minimal": 1,
    "low": 2,
    "medium": 3,
    "high": 4,
    "xhigh": 5,
}
# Model to use whenever the server advertises it, regardless of ranking.
PREFERRED_MODEL = "gpt-5.4"
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
preferred = next((m for m in visible if m.model == PREFERRED_MODEL or m.id == PREFERRED_MODEL), None)
if preferred is not None:
return preferred
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
    """Return the strongest reasoning effort the model advertises (default: medium)."""
    options = model.supported_reasoning_efforts
    if not options:
        return ReasoningEffort.medium

    def _rank(option):
        return REASONING_RANK.get(option.reasoning_effort.value, -1)

    best = max(options, key=_rank)
    return ReasoningEffort(best.reasoning_effort.value)
# JSON Schema the model's structured answer must match.
OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}
# readOnly sandbox policy with a fullAccess scope (see the dict literal below).
SANDBOX_POLICY = SandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)
# Never pause for interactive approvals (non-interactive demo run).
APPROVAL_POLICY = AskForApproval.model_validate("never")
async def main() -> None:
    """Pick the best advertised model + effort, then run two turns and print results."""
    async with AsyncCodex(config=runtime_config()) as codex:
        # include_hidden=True so selection can still fall back to hidden-only listings.
        models = await codex.models(include_hidden=True)
        selected_model = _pick_highest_model(models.data)
        selected_effort = _pick_highest_turn_effort(selected_model)
        print("selected.model:", selected_model.model)
        print("selected.effort:", selected_effort.value)
        thread = await codex.thread_start(
            model=selected_model.model,
            config={"model_reasoning_effort": selected_effort.value},
        )
        # Turn 1: plain text prompt with the selected model/effort.
        first_turn = await thread.turn(
            TextInput("Give one short sentence about reliable production releases."),
            model=selected_model.model,
            effort=selected_effort,
        )
        first = await first_turn.run()
        persisted = await thread.read(include_turns=True)
        first_persisted_turn = find_turn_by_id(persisted.thread.turns, first.id)
        print("agent.message:", assistant_text_from_turn(first_persisted_turn))
        print("items:", 0 if first_persisted_turn is None else len(first_persisted_turn.items or []))
        # Turn 2: same thread, with the full set of optional turn parameters.
        second_turn = await thread.turn(
            TextInput("Return JSON for a safe feature-flag rollout plan."),
            approval_policy=APPROVAL_POLICY,
            cwd=str(Path.cwd()),
            effort=selected_effort,
            model=selected_model.model,
            output_schema=OUTPUT_SCHEMA,
            personality=Personality.pragmatic,
            sandbox_policy=SANDBOX_POLICY,
            summary=ReasoningSummary.model_validate("concise"),
        )
        second = await second_turn.run()
        persisted = await thread.read(include_turns=True)
        second_persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
        print("agent.message.params:", assistant_text_from_turn(second_persisted_turn))
        print("items.params:", 0 if second_persisted_turn is None else len(second_persisted_turn.items or []))


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,116 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import assistant_text_from_turn, ensure_local_sdk_src, find_turn_by_id, runtime_config
ensure_local_sdk_src()
from codex_app_server import (
AskForApproval,
Codex,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxPolicy,
TextInput,
)
# Ordering of reasoning-effort names, weakest (0) to strongest (5); used to pick the max.
REASONING_RANK = {
    "none": 0,
    "minimal": 1,
    "low": 2,
    "medium": 3,
    "high": 4,
    "xhigh": 5,
}
# Model to use whenever the server advertises it, regardless of ranking.
PREFERRED_MODEL = "gpt-5.4"
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
preferred = next((m for m in visible if m.model == PREFERRED_MODEL or m.id == PREFERRED_MODEL), None)
if preferred is not None:
return preferred
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
    """Pick the highest-ranked reasoning effort the model supports; medium if none listed."""
    supported = model.supported_reasoning_efforts
    if not supported:
        return ReasoningEffort.medium

    def _rank(option):
        return REASONING_RANK.get(option.reasoning_effort.value, -1)

    winner = max(supported, key=_rank)
    return ReasoningEffort(winner.reasoning_effort.value)
# JSON Schema the model's structured answer must match.
OUTPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "actions": {
            "type": "array",
            "items": {"type": "string"},
        },
    },
    "required": ["summary", "actions"],
    "additionalProperties": False,
}
# readOnly sandbox policy with a fullAccess scope (see the dict literal below).
SANDBOX_POLICY = SandboxPolicy.model_validate(
    {
        "type": "readOnly",
        "access": {"type": "fullAccess"},
    }
)
# Never pause for interactive approvals (non-interactive demo run).
APPROVAL_POLICY = AskForApproval.model_validate("never")
# Sync variant: pick the best model/effort, then run two turns and print results.
with Codex(config=runtime_config()) as codex:
    # include_hidden=True so selection can still fall back to hidden-only listings.
    models = codex.models(include_hidden=True)
    selected_model = _pick_highest_model(models.data)
    selected_effort = _pick_highest_turn_effort(selected_model)
    print("selected.model:", selected_model.model)
    print("selected.effort:", selected_effort.value)
    thread = codex.thread_start(
        model=selected_model.model,
        config={"model_reasoning_effort": selected_effort.value},
    )
    # Turn 1: plain text prompt with the selected model/effort.
    first = thread.turn(
        TextInput("Give one short sentence about reliable production releases."),
        model=selected_model.model,
        effort=selected_effort,
    ).run()
    persisted = thread.read(include_turns=True)
    first_turn = find_turn_by_id(persisted.thread.turns, first.id)
    print("agent.message:", assistant_text_from_turn(first_turn))
    print("items:", 0 if first_turn is None else len(first_turn.items or []))
    # Turn 2: same thread, with the full set of optional turn parameters.
    second = thread.turn(
        TextInput("Return JSON for a safe feature-flag rollout plan."),
        approval_policy=APPROVAL_POLICY,
        cwd=str(Path.cwd()),
        effort=selected_effort,
        model=selected_model.model,
        output_schema=OUTPUT_SCHEMA,
        personality=Personality.pragmatic,
        sandbox_policy=SANDBOX_POLICY,
        summary=ReasoningSummary.model_validate("concise"),
    ).run()
    persisted = thread.read(include_turns=True)
    second_turn = find_turn_by_id(persisted.thread.turns, second.id)
    print("agent.message.params:", assistant_text_from_turn(second_turn))
    print("items.params:", 0 if second_turn is None else len(second_turn.items or []))

View File

@@ -1,71 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
runtime_config,
)
ensure_local_sdk_src()
import asyncio
from codex_app_server import AsyncCodex, TextInput
async def main() -> None:
    """Demonstrate best-effort steer() and interrupt() on in-flight turns, then summarize."""
    async with AsyncCodex(config=runtime_config()) as codex:
        thread = await codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
        # --- steer demo: redirect a long-running turn mid-flight ---
        steer_turn = await thread.turn(TextInput("Count from 1 to 40 with commas, then one summary sentence."))
        steer_result = "sent"
        try:
            _ = await steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
        except Exception as exc:
            # steer is best-effort: record why it was skipped instead of failing the demo.
            steer_result = f"skipped {type(exc).__name__}"
        steer_event_count = 0
        steer_completed_status = "unknown"
        steer_completed_turn = None
        async for event in steer_turn.stream():
            steer_event_count += 1
            if event.method == "turn/completed":
                steer_completed_turn = event.payload.turn
                steer_completed_status = getattr(event.payload.turn.status, "value", str(event.payload.turn.status))
        steer_preview = assistant_text_from_turn(steer_completed_turn).strip() or "[no assistant text]"
        # --- interrupt demo: cancel a long-running turn ---
        interrupt_turn = await thread.turn(TextInput("Count from 1 to 200 with commas, then one summary sentence."))
        interrupt_result = "sent"
        try:
            _ = await interrupt_turn.interrupt()
        except Exception as exc:
            # interrupt is also best-effort.
            interrupt_result = f"skipped {type(exc).__name__}"
        interrupt_event_count = 0
        interrupt_completed_status = "unknown"
        interrupt_completed_turn = None
        async for event in interrupt_turn.stream():
            interrupt_event_count += 1
            if event.method == "turn/completed":
                interrupt_completed_turn = event.payload.turn
                interrupt_completed_status = getattr(event.payload.turn.status, "value", str(event.payload.turn.status))
        interrupt_preview = assistant_text_from_turn(interrupt_completed_turn).strip() or "[no assistant text]"
        print("steer.result:", steer_result)
        print("steer.final.status:", steer_completed_status)
        print("steer.events.count:", steer_event_count)
        print("steer.assistant.preview:", steer_preview)
        print("interrupt.result:", interrupt_result)
        print("interrupt.final.status:", interrupt_completed_status)
        print("interrupt.events.count:", interrupt_event_count)
        print("interrupt.assistant.preview:", interrupt_preview)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,63 +0,0 @@
import sys
from pathlib import Path
_EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
runtime_config,
)
ensure_local_sdk_src()
from codex_app_server import Codex, TextInput
# Sync demo of best-effort steer() and interrupt() on in-flight turns.
with Codex(config=runtime_config()) as codex:
    thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
    # --- steer demo: redirect a long-running turn mid-flight ---
    steer_turn = thread.turn(TextInput("Count from 1 to 40 with commas, then one summary sentence."))
    steer_result = "sent"
    try:
        _ = steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
    except Exception as exc:
        # steer is best-effort: record why it was skipped instead of failing the demo.
        steer_result = f"skipped {type(exc).__name__}"
    steer_event_count = 0
    steer_completed_status = "unknown"
    steer_completed_turn = None
    for event in steer_turn.stream():
        steer_event_count += 1
        if event.method == "turn/completed":
            steer_completed_turn = event.payload.turn
            steer_completed_status = getattr(event.payload.turn.status, "value", str(event.payload.turn.status))
    steer_preview = assistant_text_from_turn(steer_completed_turn).strip() or "[no assistant text]"
    # --- interrupt demo: cancel a long-running turn ---
    interrupt_turn = thread.turn(TextInput("Count from 1 to 200 with commas, then one summary sentence."))
    interrupt_result = "sent"
    try:
        _ = interrupt_turn.interrupt()
    except Exception as exc:
        # interrupt is also best-effort.
        interrupt_result = f"skipped {type(exc).__name__}"
    interrupt_event_count = 0
    interrupt_completed_status = "unknown"
    interrupt_completed_turn = None
    for event in interrupt_turn.stream():
        interrupt_event_count += 1
        if event.method == "turn/completed":
            interrupt_completed_turn = event.payload.turn
            interrupt_completed_status = getattr(event.payload.turn.status, "value", str(event.payload.turn.status))
    interrupt_preview = assistant_text_from_turn(interrupt_completed_turn).strip() or "[no assistant text]"
    print("steer.result:", steer_result)
    print("steer.final.status:", steer_completed_status)
    print("steer.events.count:", steer_event_count)
    print("steer.assistant.preview:", steer_preview)
    print("interrupt.result:", interrupt_result)
    print("interrupt.final.status:", interrupt_completed_status)
    print("interrupt.events.count:", interrupt_event_count)
    print("interrupt.assistant.preview:", interrupt_preview)

View File

@@ -1,85 +0,0 @@
# Python SDK Examples
Each example folder contains runnable versions:
- `sync.py` (public sync surface: `Codex`)
- `async.py` (public async surface: `AsyncCodex`)
All examples intentionally use only public SDK exports from `codex_app_server`.
## Prerequisites
- Python `>=3.10`
- Install SDK dependencies for the same Python interpreter you will use to run examples
Recommended setup (from `sdk/python`):
```bash
python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
python -m pip install -e .
```
When you run the examples from this repository checkout, the SDK is imported
from the local source tree, which does not bundle a runtime binary. The helper
in `examples/_bootstrap.py` therefore relies on the installed `codex-cli-bin`
runtime package.
If the pinned `codex-cli-bin` runtime is not already installed, the bootstrap
will download the matching GitHub release artifact, stage a temporary local
`codex-cli-bin` package, install it into your active interpreter, and clean up
the temporary files afterward.
Current pinned runtime version: `0.116.0-alpha.1`
## Run examples
From `sdk/python`:
```bash
python examples/<example-folder>/sync.py
python examples/<example-folder>/async.py
```
The examples bootstrap local imports from `sdk/python/src` automatically, so no
SDK wheel install is required. You only need the Python dependencies for your
active interpreter and an installed `codex-cli-bin` runtime package (either
already present or automatically provisioned by the bootstrap).
## Recommended first run
```bash
python examples/01_quickstart_constructor/sync.py
python examples/01_quickstart_constructor/async.py
```
## Index
- `01_quickstart_constructor/`
- first run / sanity check
- `02_turn_run/`
- inspect full turn output fields
- `03_turn_stream_events/`
- stream a turn with a small curated event view
- `04_models_and_metadata/`
- read server metadata and model list
- `05_existing_thread/`
- resume a real existing thread (created in-script)
- `06_thread_lifecycle_and_controls/`
- thread lifecycle + control calls
- `07_image_and_text/`
- remote image URL + text multimodal turn
- `08_local_image_and_text/`
- local image + text multimodal turn using a generated temporary sample image
- `09_async_parity/`
- parity-style sync flow (see async parity in other examples)
- `10_error_handling_and_retry/`
- overload retry pattern + typed error handling structure
- `11_cli_mini_app/`
- interactive chat loop
- `12_turn_params_kitchen_sink/`
- one turn using most optional `turn(...)` params (sync + async)
- `13_model_select_and_turn_params/`
- list models, pick highest model + highest supported reasoning effort, run turns, print message and usage
- `14_turn_controls/`
- separate best-effort `steer()` and `interrupt()` demos with concise summaries

View File

@@ -1,152 +0,0 @@
from __future__ import annotations
import contextlib
import importlib.util
import os
import sys
import tempfile
import zlib
from pathlib import Path
from typing import Iterable, Iterator
_SDK_PYTHON_DIR = Path(__file__).resolve().parents[1]
_SDK_PYTHON_STR = str(_SDK_PYTHON_DIR)
if _SDK_PYTHON_STR not in sys.path:
sys.path.insert(0, _SDK_PYTHON_STR)
from _runtime_setup import ensure_runtime_package_installed
def _ensure_runtime_dependencies(sdk_python_dir: Path) -> None:
if importlib.util.find_spec("pydantic") is not None:
return
python = sys.executable
raise RuntimeError(
"Missing required dependency: pydantic.\n"
f"Interpreter: {python}\n"
"Install dependencies with the same interpreter used to run this example:\n"
f" {python} -m pip install -e {sdk_python_dir}\n"
"If you installed with `pip` from another Python, reinstall using the command above."
)
def ensure_local_sdk_src() -> Path:
    """Add sdk/python/src to sys.path so examples run without installing the package."""
    src_dir = _SDK_PYTHON_DIR / "src"
    package_dir = src_dir / "codex_app_server"
    if not package_dir.exists():
        raise RuntimeError(f"Could not locate local SDK package at {package_dir}")
    _ensure_runtime_dependencies(_SDK_PYTHON_DIR)
    entry = str(src_dir)
    if entry not in sys.path:
        sys.path.insert(0, entry)
    return src_dir
def runtime_config():
    """Return an example-friendly AppServerConfig for repo-source SDK usage."""
    # Imported lazily so the sys.path setup in ensure_local_sdk_src() can run first.
    from codex_app_server import AppServerConfig
    # Ensure the pinned codex-cli-bin runtime is installed for this interpreter.
    ensure_runtime_package_installed(sys.executable, _SDK_PYTHON_DIR)
    return AppServerConfig()
def _png_chunk(chunk_type: bytes, data: bytes) -> bytes:
import struct
payload = chunk_type + data
checksum = zlib.crc32(payload) & 0xFFFFFFFF
return struct.pack(">I", len(data)) + payload + struct.pack(">I", checksum)
def _generated_sample_png_bytes() -> bytes:
    """Build a 96x96 four-quadrant RGB PNG entirely in memory (no imaging deps)."""
    import struct

    width, height = 96, 96
    # Quadrant colors keyed by (in_top_half, in_left_half).
    quadrants = {
        (True, True): (120, 180, 255),    # top-left
        (True, False): (255, 220, 90),    # top-right
        (False, True): (90, 180, 95),     # bottom-left
        (False, False): (180, 85, 85),    # bottom-right
    }
    raw = bytearray()
    for y in range(height):
        raw.append(0)  # PNG filter type 0 (None) for this scanline
        for x in range(width):
            raw.extend(quadrants[(y < height // 2, x < width // 2)])
    # IHDR: 8-bit depth, color type 2 (truecolor RGB), default compression/filter/interlace.
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
    return (
        b"\x89PNG\r\n\x1a\n"
        + _png_chunk(b"IHDR", ihdr)
        + _png_chunk(b"IDAT", zlib.compress(bytes(raw)))
        + _png_chunk(b"IEND", b"")
    )
@contextlib.contextmanager
def temporary_sample_image_path() -> Iterator[Path]:
    """Yield a path to a freshly generated sample PNG; the file is removed on exit."""
    with tempfile.TemporaryDirectory(prefix="codex-python-example-image-") as temp_root:
        target = Path(temp_root) / "generated_sample.png"
        target.write_bytes(_generated_sample_png_bytes())
        yield target
def server_label(metadata: object) -> str:
    """Best-effort human label for a server: 'name version', else userAgent, else 'unknown'."""
    server = getattr(metadata, "serverInfo", None)
    name = ""
    version = ""
    if server is not None:
        name = (getattr(server, "name", None) or "").strip()
        version = (getattr(server, "version", None) or "").strip()
    if name and version:
        return f"{name} {version}"
    agent = ""
    if metadata is not None:
        agent = (getattr(metadata, "userAgent", None) or "").strip()
    return agent or "unknown"
def find_turn_by_id(turns: Iterable[object] | None, turn_id: str) -> object | None:
for turn in turns or []:
if getattr(turn, "id", None) == turn_id:
return turn
return None
def assistant_text_from_turn(turn: object | None) -> str:
if turn is None:
return ""
chunks: list[str] = []
for item in getattr(turn, "items", []) or []:
raw_item = item.model_dump(mode="json") if hasattr(item, "model_dump") else item
if not isinstance(raw_item, dict):
continue
item_type = raw_item.get("type")
if item_type == "agentMessage":
text = raw_item.get("text")
if isinstance(text, str) and text:
chunks.append(text)
continue
if item_type != "message" or raw_item.get("role") != "assistant":
continue
for content in raw_item.get("content") or []:
if not isinstance(content, dict) or content.get("type") != "output_text":
continue
text = content.get("text")
if isinstance(text, str) and text:
chunks.append(text)
return "".join(chunks)

View File

@@ -1,583 +0,0 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Codex Python SDK Walkthrough\n",
"\n",
"Public SDK surface only (`codex_app_server` root exports)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 1: bootstrap local SDK imports + pinned runtime package\n",
"import os\n",
"import sys\n",
"from pathlib import Path\n",
"\n",
"if sys.version_info < (3, 10):\n",
" raise RuntimeError(\n",
" f'Notebook requires Python 3.10+; current interpreter is {sys.version.split()[0]}.'\n",
" )\n",
"\n",
"try:\n",
" _ = os.getcwd()\n",
"except FileNotFoundError:\n",
" os.chdir(str(Path.home()))\n",
"\n",
"\n",
"def _is_sdk_python_dir(path: Path) -> bool:\n",
" return (path / 'pyproject.toml').exists() and (path / 'src' / 'codex_app_server').exists()\n",
"\n",
"\n",
"def _iter_home_fallback_candidates(home: Path):\n",
" # bounded depth scan under home to support launching notebooks from unrelated cwd values\n",
" patterns = ('sdk/python', '*/sdk/python', '*/*/sdk/python', '*/*/*/sdk/python')\n",
" for pattern in patterns:\n",
" yield from home.glob(pattern)\n",
"\n",
"\n",
"def _find_sdk_python_dir(start: Path) -> Path | None:\n",
" checked = set()\n",
"\n",
" def _consider(candidate: Path) -> Path | None:\n",
" resolved = candidate.resolve()\n",
" if resolved in checked:\n",
" return None\n",
" checked.add(resolved)\n",
" if _is_sdk_python_dir(resolved):\n",
" return resolved\n",
" return None\n",
"\n",
" for candidate in [start, *start.parents]:\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" for candidate in [start / 'sdk' / 'python', *(parent / 'sdk' / 'python' for parent in start.parents)]:\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" env_dir = os.environ.get('CODEX_PYTHON_SDK_DIR')\n",
" if env_dir:\n",
" found = _consider(Path(env_dir).expanduser())\n",
" if found is not None:\n",
" return found\n",
"\n",
" for entry in sys.path:\n",
" if not entry:\n",
" continue\n",
" entry_path = Path(entry).expanduser()\n",
" for candidate in (entry_path, entry_path / 'sdk' / 'python'):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" home = Path.home()\n",
" for candidate in _iter_home_fallback_candidates(home):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" return None\n",
"\n",
"\n",
"repo_python_dir = _find_sdk_python_dir(Path.cwd())\n",
"if repo_python_dir is None:\n",
" raise RuntimeError('Could not locate sdk/python. Set CODEX_PYTHON_SDK_DIR to your sdk/python path.')\n",
"\n",
"repo_python_str = str(repo_python_dir)\n",
"if repo_python_str not in sys.path:\n",
" sys.path.insert(0, repo_python_str)\n",
"\n",
"from _runtime_setup import ensure_runtime_package_installed\n",
"\n",
"runtime_version = ensure_runtime_package_installed(\n",
" sys.executable,\n",
" repo_python_dir,\n",
")\n",
"\n",
"src_dir = repo_python_dir / 'src'\n",
"examples_dir = repo_python_dir / 'examples'\n",
"src_str = str(src_dir)\n",
"examples_str = str(examples_dir)\n",
"if src_str not in sys.path:\n",
" sys.path.insert(0, src_str)\n",
"if examples_str not in sys.path:\n",
" sys.path.insert(0, examples_str)\n",
"\n",
"# Force fresh imports after SDK upgrades in the same notebook kernel.\n",
"for module_name in list(sys.modules):\n",
" if module_name == 'codex_app_server' or module_name.startswith('codex_app_server.'):\n",
" sys.modules.pop(module_name, None)\n",
"\n",
"print('Kernel:', sys.executable)\n",
"print('SDK source:', src_dir)\n",
"print('Runtime package:', runtime_version)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 2: imports (public only)\n",
"from _bootstrap import assistant_text_from_turn, find_turn_by_id, server_label, temporary_sample_image_path\n",
"from codex_app_server import (\n",
" AsyncCodex,\n",
" Codex,\n",
" ImageInput,\n",
" LocalImageInput,\n",
" TextInput,\n",
" retry_on_overload,\n",
")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 3: simple sync conversation\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(TextInput('Explain gradient descent in 3 bullets.'))\n",
" result = turn.run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('server:', server_label(codex.metadata))\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 4: multi-turn continuity in same thread\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
"\n",
" first = thread.turn(TextInput('Give a short summary of transformers.')).run()\n",
" second = thread.turn(TextInput('Now explain that to a high-school student.')).run()\n",
" persisted = thread.read(include_turns=True)\n",
" second_turn = find_turn_by_id(persisted.thread.turns, second.id)\n",
"\n",
" print('first status:', first.status)\n",
" print('second status:', second.status)\n",
" print('second text:', assistant_text_from_turn(second_turn))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5: full thread lifecycle and branching (sync)\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" first = thread.turn(TextInput('One sentence about structured planning.')).run()\n",
" second = thread.turn(TextInput('Now restate it for a junior engineer.')).run()\n",
"\n",
" reopened = codex.thread_resume(thread.id)\n",
" listing_active = codex.thread_list(limit=20, archived=False)\n",
" reading = reopened.read(include_turns=True)\n",
"\n",
" _ = reopened.set_name('sdk-lifecycle-demo')\n",
" _ = codex.thread_archive(reopened.id)\n",
" listing_archived = codex.thread_list(limit=20, archived=True)\n",
" unarchived = codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5.4',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = resumed.turn(TextInput('Continue in one short sentence.')).run()\n",
" resumed_info = f'{resumed_result.id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = codex.thread_fork(unarchived.id, model='gpt-5.4')\n",
" forked_result = forked.turn(TextInput('Take a different angle in one short sentence.')).run()\n",
" forked_info = f'{forked_result.id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.id, first.status)\n",
" print('second:', second.id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5b: one turn with most optional turn params\n",
"from pathlib import Path\n",
"from codex_app_server import (\n",
" AskForApproval,\n",
" Personality,\n",
" ReasoningEffort,\n",
" ReasoningSummary,\n",
" SandboxPolicy,\n",
")\n",
"\n",
"output_schema = {\n",
" 'type': 'object',\n",
" 'properties': {\n",
" 'summary': {'type': 'string'},\n",
" 'actions': {'type': 'array', 'items': {'type': 'string'}},\n",
" },\n",
" 'required': ['summary', 'actions'],\n",
" 'additionalProperties': False,\n",
"}\n",
"\n",
"sandbox_policy = SandboxPolicy.model_validate({'type': 'readOnly', 'access': {'type': 'fullAccess'}})\n",
"summary = ReasoningSummary.model_validate('concise')\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(\n",
" TextInput('Propose a safe production feature-flag rollout. Return JSON matching the schema.'),\n",
" approval_policy=AskForApproval.never,\n",
" cwd=str(Path.cwd()),\n",
" effort=ReasoningEffort.medium,\n",
" model='gpt-5.4',\n",
" output_schema=output_schema,\n",
" personality=Personality.pragmatic,\n",
" sandbox_policy=sandbox_policy,\n",
" summary=summary,\n",
" )\n",
" result = turn.run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 5c: choose highest model + highest supported reasoning, then run turns\n",
"from pathlib import Path\n",
"from codex_app_server import (\n",
" AskForApproval,\n",
" Personality,\n",
" ReasoningEffort,\n",
" ReasoningSummary,\n",
" SandboxPolicy,\n",
")\n",
"\n",
"reasoning_rank = {\n",
" 'none': 0,\n",
" 'minimal': 1,\n",
" 'low': 2,\n",
" 'medium': 3,\n",
" 'high': 4,\n",
" 'xhigh': 5,\n",
"}\n",
"\n",
"\n",
"def pick_highest_model(models):\n",
" visible = [m for m in models if not m.hidden] or models\n",
" known_names = {m.id for m in visible} | {m.model for m in visible}\n",
" top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]\n",
" pool = top_candidates or visible\n",
" return max(pool, key=lambda m: (m.model, m.id))\n",
"\n",
"\n",
"def pick_highest_turn_effort(model) -> ReasoningEffort:\n",
" if not model.supported_reasoning_efforts:\n",
" return ReasoningEffort.medium\n",
" best = max(model.supported_reasoning_efforts, key=lambda opt: reasoning_rank.get(opt.reasoning_effort.value, -1))\n",
" return ReasoningEffort(best.reasoning_effort.value)\n",
"\n",
"\n",
"output_schema = {\n",
" 'type': 'object',\n",
" 'properties': {\n",
" 'summary': {'type': 'string'},\n",
" 'actions': {'type': 'array', 'items': {'type': 'string'}},\n",
" },\n",
" 'required': ['summary', 'actions'],\n",
" 'additionalProperties': False,\n",
"}\n",
"sandbox_policy = SandboxPolicy.model_validate({'type': 'readOnly', 'access': {'type': 'fullAccess'}})\n",
"\n",
"with Codex() as codex:\n",
" models = codex.models(include_hidden=True)\n",
" selected_model = pick_highest_model(models.data)\n",
" selected_effort = pick_highest_turn_effort(selected_model)\n",
"\n",
" print('selected.model:', selected_model.model)\n",
" print('selected.effort:', selected_effort.value)\n",
"\n",
" thread = codex.thread_start(model=selected_model.model, config={'model_reasoning_effort': selected_effort.value})\n",
"\n",
" first = thread.turn(\n",
" TextInput('Give one short sentence about reliable production releases.'),\n",
" model=selected_model.model,\n",
" effort=selected_effort,\n",
" ).run()\n",
" print('agent.message:', first.text)\n",
" print('usage:', first.usage)\n",
"\n",
" second = thread.turn(\n",
" TextInput('Return JSON for a safe feature-flag rollout plan.'),\n",
" approval_policy=AskForApproval.never,\n",
" cwd=str(Path.cwd()),\n",
" effort=selected_effort,\n",
" model=selected_model.model,\n",
" output_schema=output_schema,\n",
" personality=Personality.pragmatic,\n",
" sandbox_policy=sandbox_policy,\n",
" summary=ReasoningSummary.model_validate('concise'),\n",
" ).run()\n",
" print('agent.message.params:', second.text)\n",
" print('usage.params:', second.usage)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 6: multimodal with remote image\n",
"remote_image_url = 'https://raw.githubusercontent.com/github/explore/main/topics/python/python.png'\n",
"\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" result = thread.turn([\n",
" TextInput('What do you see in this image? 3 bullets.'),\n",
" ImageInput(remote_image_url),\n",
" ]).run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 7: multimodal with local image (generated temporary file)\n",
"with temporary_sample_image_path() as local_image_path:\n",
" with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" result = thread.turn([\n",
" TextInput('Describe the colors and layout in this generated local image in 2 bullets.'),\n",
" LocalImageInput(str(local_image_path.resolve())),\n",
" ]).run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 8: retry-on-overload pattern\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
"\n",
" result = retry_on_overload(\n",
" lambda: thread.turn(TextInput('List 5 failure modes in distributed systems.')).run(),\n",
" max_attempts=3,\n",
" initial_delay_s=0.25,\n",
" max_delay_s=2.0,\n",
" )\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 9: full thread lifecycle and branching (async)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_lifecycle_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" first = await (await thread.turn(TextInput('One sentence about structured planning.'))).run()\n",
" second = await (await thread.turn(TextInput('Now restate it for a junior engineer.'))).run()\n",
"\n",
" reopened = await codex.thread_resume(thread.id)\n",
" listing_active = await codex.thread_list(limit=20, archived=False)\n",
" reading = await reopened.read(include_turns=True)\n",
"\n",
" _ = await reopened.set_name('sdk-lifecycle-demo')\n",
" _ = await codex.thread_archive(reopened.id)\n",
" listing_archived = await codex.thread_list(limit=20, archived=True)\n",
" unarchived = await codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = await codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5.4',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = await (await resumed.turn(TextInput('Continue in one short sentence.'))).run()\n",
" resumed_info = f'{resumed_result.id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = await codex.thread_fork(unarchived.id, model='gpt-5.4')\n",
" forked_result = await (await forked.turn(TextInput('Take a different angle in one short sentence.'))).run()\n",
" forked_info = f'{forked_result.id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = await unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.id, first.status)\n",
" print('second:', second.id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n",
"\n",
"\n",
"await async_lifecycle_demo()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cell 10: async turn controls (best effort steer + interrupt)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_stream_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" steer_turn = await thread.turn(TextInput('Count from 1 to 40 with commas, then one summary sentence.'))\n",
"\n",
" steer_result = 'sent'\n",
" try:\n",
" _ = await steer_turn.steer(TextInput('Keep it brief and stop after 10 numbers.'))\n",
" except Exception as e:\n",
" steer_result = f'skipped {type(e).__name__}'\n",
"\n",
" steer_event_count = 0\n",
" steer_completed_status = 'unknown'\n",
" steer_completed_turn = None\n",
" async for event in steer_turn.stream():\n",
" steer_event_count += 1\n",
" if event.method == 'turn/completed':\n",
" steer_completed_turn = event.payload.turn\n",
" steer_completed_status = getattr(event.payload.turn.status, 'value', str(event.payload.turn.status))\n",
"\n",
" steer_preview = assistant_text_from_turn(steer_completed_turn).strip() or '[no assistant text]'\n",
"\n",
" interrupt_turn = await thread.turn(TextInput('Count from 1 to 200 with commas, then one summary sentence.'))\n",
" interrupt_result = 'sent'\n",
" try:\n",
" _ = await interrupt_turn.interrupt()\n",
" except Exception as e:\n",
" interrupt_result = f'skipped {type(e).__name__}'\n",
"\n",
" interrupt_event_count = 0\n",
" interrupt_completed_status = 'unknown'\n",
" interrupt_completed_turn = None\n",
" async for event in interrupt_turn.stream():\n",
" interrupt_event_count += 1\n",
" if event.method == 'turn/completed':\n",
" interrupt_completed_turn = event.payload.turn\n",
" interrupt_completed_status = getattr(event.payload.turn.status, 'value', str(event.payload.turn.status))\n",
"\n",
" interrupt_preview = assistant_text_from_turn(interrupt_completed_turn).strip() or '[no assistant text]'\n",
"\n",
" print('steer.result:', steer_result)\n",
" print('steer.final.status:', steer_completed_status)\n",
" print('steer.events.count:', steer_event_count)\n",
" print('steer.assistant.preview:', steer_preview)\n",
" print('interrupt.result:', interrupt_result)\n",
" print('interrupt.final.status:', interrupt_completed_status)\n",
" print('interrupt.events.count:', interrupt_event_count)\n",
" print('interrupt.assistant.preview:', interrupt_preview)\n",
"\n",
"\n",
"await async_stream_demo()\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.10+"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@@ -793,7 +793,7 @@ def _render_thread_block(
" input: Input,",
" *,",
*_kw_signature_lines(turn_fields),
" ) -> TurnHandle:",
" ) -> Turn:",
" wire_input = _to_wire_input(input)",
" params = TurnStartParams(",
" thread_id=self.id,",
@@ -801,7 +801,7 @@ def _render_thread_block(
*_model_arg_lines(turn_fields),
" )",
" turn = self._client.turn_start(self.id, wire_input, params=params)",
" return TurnHandle(self._client, self.id, turn.turn.id)",
" return Turn(self._client, self.id, turn.turn.id)",
]
return "\n".join(lines)
@@ -815,7 +815,7 @@ def _render_async_thread_block(
" input: Input,",
" *,",
*_kw_signature_lines(turn_fields),
" ) -> AsyncTurnHandle:",
" ) -> AsyncTurn:",
" await self._codex._ensure_initialized()",
" wire_input = _to_wire_input(input)",
" params = TurnStartParams(",
@@ -828,7 +828,7 @@ def _render_async_thread_block(
" wire_input,",
" params=params,",
" )",
" return AsyncTurnHandle(self._codex, self.id, turn.turn.id)",
" return AsyncTurn(self._codex, self.id, turn.turn.id)",
]
return "\n".join(lines)

View File

@@ -1,111 +1,10 @@
# Public package surface for codex_app_server: everything callers should
# import lives here, re-exported from the submodules below.
from .async_client import AsyncAppServerClient
from .client import AppServerClient, AppServerConfig
from .errors import (
    AppServerError,
    AppServerRpcError,
    InternalRpcError,
    InvalidParamsError,
    InvalidRequestError,
    JsonRpcError,
    MethodNotFoundError,
    ParseError,
    RetryLimitExceededError,
    ServerBusyError,
    TransportClosedError,
    is_retryable_error,
)
from .generated.v2_all import (
    AskForApproval,
    Personality,
    PlanType,
    ReasoningEffort,
    ReasoningSummary,
    SandboxMode,
    SandboxPolicy,
    ServiceTier,
    ThreadForkParams,
    ThreadItem,
    ThreadListParams,
    ThreadResumeParams,
    ThreadSortKey,
    ThreadSourceKind,
    ThreadStartParams,
    ThreadTokenUsageUpdatedNotification,
    TurnCompletedNotification,
    TurnStartParams,
    TurnStatus,
    TurnSteerParams,
)
from .models import InitializeResponse
from .public_api import (
    AsyncCodex,
    AsyncThread,
    AsyncTurnHandle,
    Codex,
    ImageInput,
    Input,
    InputItem,
    LocalImageInput,
    MentionInput,
    SkillInput,
    TextInput,
    Thread,
    TurnHandle,
)
from .retry import retry_on_overload

__version__ = "0.2.0"

# NOTE: a redundant second `from .errors import ...` line and a duplicated
# "TransportClosedError" __all__ entry were removed; every name below is
# listed exactly once.
__all__ = [
    "__version__",
    "AppServerClient",
    "AsyncAppServerClient",
    "AppServerConfig",
    "Codex",
    "AsyncCodex",
    "Thread",
    "AsyncThread",
    "TurnHandle",
    "AsyncTurnHandle",
    "InitializeResponse",
    "Input",
    "InputItem",
    "TextInput",
    "ImageInput",
    "LocalImageInput",
    "SkillInput",
    "MentionInput",
    "ThreadItem",
    "ThreadTokenUsageUpdatedNotification",
    "TurnCompletedNotification",
    "AskForApproval",
    "Personality",
    "PlanType",
    "ReasoningEffort",
    "ReasoningSummary",
    "SandboxMode",
    "SandboxPolicy",
    "ServiceTier",
    "ThreadStartParams",
    "ThreadResumeParams",
    "ThreadListParams",
    "ThreadSortKey",
    "ThreadSourceKind",
    "ThreadForkParams",
    "TurnStatus",
    "TurnStartParams",
    "TurnSteerParams",
    "retry_on_overload",
    "AppServerError",
    "TransportClosedError",
    "JsonRpcError",
    "AppServerRpcError",
    "ParseError",
    "InvalidRequestError",
    "MethodNotFoundError",
    "InvalidParamsError",
    "InternalRpcError",
    "ServerBusyError",
    "RetryLimitExceededError",
    "is_retryable_error",
]

View File

@@ -1,208 +0,0 @@
from __future__ import annotations
import asyncio
from collections.abc import Iterator
from typing import AsyncIterator, Callable, Iterable, ParamSpec, TypeVar
from pydantic import BaseModel
from .client import AppServerClient, AppServerConfig
from .generated.v2_all import (
AgentMessageDeltaNotification,
ModelListResponse,
ThreadArchiveResponse,
ThreadCompactStartResponse,
ThreadForkParams as V2ThreadForkParams,
ThreadForkResponse,
ThreadListParams as V2ThreadListParams,
ThreadListResponse,
ThreadReadResponse,
ThreadResumeParams as V2ThreadResumeParams,
ThreadResumeResponse,
ThreadSetNameResponse,
ThreadStartParams as V2ThreadStartParams,
ThreadStartResponse,
ThreadUnarchiveResponse,
TurnCompletedNotification,
TurnInterruptResponse,
TurnStartParams as V2TurnStartParams,
TurnStartResponse,
TurnSteerResponse,
)
from .models import InitializeResponse, JsonObject, Notification
# Typed helpers so arbitrary sync-client signatures can be forwarded through
# asyncio.to_thread without losing parameter/return types.
ModelT = TypeVar("ModelT", bound=BaseModel)
ParamsT = ParamSpec("ParamsT")
ReturnT = TypeVar("ReturnT")


class AsyncAppServerClient:
    """Async wrapper around AppServerClient using thread offloading."""

    def __init__(self, config: AppServerConfig | None = None) -> None:
        # Every operation delegates to the synchronous client, executed on a
        # worker thread via asyncio.to_thread.
        self._sync = AppServerClient(config=config)
        # Single stdio transport cannot be read safely from multiple threads.
        self._transport_lock = asyncio.Lock()

    async def __aenter__(self) -> "AsyncAppServerClient":
        await self.start()
        return self

    async def __aexit__(self, _exc_type, _exc, _tb) -> None:
        # Close unconditionally; exceptions from the body still propagate.
        await self.close()

    async def _call_sync(
        self,
        fn: Callable[ParamsT, ReturnT],
        /,
        *args: ParamsT.args,
        **kwargs: ParamsT.kwargs,
    ) -> ReturnT:
        # Serialize access to the shared stdio transport, then run the
        # blocking call off the event loop.
        async with self._transport_lock:
            return await asyncio.to_thread(fn, *args, **kwargs)

    @staticmethod
    def _next_from_iterator(
        iterator: Iterator[AgentMessageDeltaNotification],
    ) -> tuple[bool, AgentMessageDeltaNotification | None]:
        # next() adapter safe to run inside asyncio.to_thread: StopIteration
        # must not escape into a Future, so exhaustion becomes (False, None).
        try:
            return True, next(iterator)
        except StopIteration:
            return False, None

    async def start(self) -> None:
        await self._call_sync(self._sync.start)

    async def close(self) -> None:
        await self._call_sync(self._sync.close)

    async def initialize(self) -> InitializeResponse:
        return await self._call_sync(self._sync.initialize)

    # NOTE(review): these two delegate synchronously, without the transport
    # lock — presumably they only touch in-process bookkeeping; confirm in
    # AppServerClient before relying on that.
    def acquire_turn_consumer(self, turn_id: str) -> None:
        self._sync.acquire_turn_consumer(turn_id)

    def release_turn_consumer(self, turn_id: str) -> None:
        self._sync.release_turn_consumer(turn_id)

    async def request(
        self,
        method: str,
        params: JsonObject | None,
        *,
        response_model: type[ModelT],
    ) -> ModelT:
        return await self._call_sync(
            self._sync.request,
            method,
            params,
            response_model=response_model,
        )

    # --- thread lifecycle: thin async delegates to the sync client ---

    async def thread_start(self, params: V2ThreadStartParams | JsonObject | None = None) -> ThreadStartResponse:
        return await self._call_sync(self._sync.thread_start, params)

    async def thread_resume(
        self,
        thread_id: str,
        params: V2ThreadResumeParams | JsonObject | None = None,
    ) -> ThreadResumeResponse:
        return await self._call_sync(self._sync.thread_resume, thread_id, params)

    async def thread_list(self, params: V2ThreadListParams | JsonObject | None = None) -> ThreadListResponse:
        return await self._call_sync(self._sync.thread_list, params)

    async def thread_read(self, thread_id: str, include_turns: bool = False) -> ThreadReadResponse:
        return await self._call_sync(self._sync.thread_read, thread_id, include_turns)

    async def thread_fork(
        self,
        thread_id: str,
        params: V2ThreadForkParams | JsonObject | None = None,
    ) -> ThreadForkResponse:
        return await self._call_sync(self._sync.thread_fork, thread_id, params)

    async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
        return await self._call_sync(self._sync.thread_archive, thread_id)

    async def thread_unarchive(self, thread_id: str) -> ThreadUnarchiveResponse:
        return await self._call_sync(self._sync.thread_unarchive, thread_id)

    async def thread_set_name(self, thread_id: str, name: str) -> ThreadSetNameResponse:
        return await self._call_sync(self._sync.thread_set_name, thread_id, name)

    async def thread_compact(self, thread_id: str) -> ThreadCompactStartResponse:
        return await self._call_sync(self._sync.thread_compact, thread_id)

    # --- turn operations ---

    async def turn_start(
        self,
        thread_id: str,
        input_items: list[JsonObject] | JsonObject | str,
        params: V2TurnStartParams | JsonObject | None = None,
    ) -> TurnStartResponse:
        return await self._call_sync(self._sync.turn_start, thread_id, input_items, params)

    async def turn_interrupt(self, thread_id: str, turn_id: str) -> TurnInterruptResponse:
        return await self._call_sync(self._sync.turn_interrupt, thread_id, turn_id)

    async def turn_steer(
        self,
        thread_id: str,
        expected_turn_id: str,
        input_items: list[JsonObject] | JsonObject | str,
    ) -> TurnSteerResponse:
        return await self._call_sync(
            self._sync.turn_steer,
            thread_id,
            expected_turn_id,
            input_items,
        )

    async def model_list(self, include_hidden: bool = False) -> ModelListResponse:
        return await self._call_sync(self._sync.model_list, include_hidden)

    async def request_with_retry_on_overload(
        self,
        method: str,
        params: JsonObject | None,
        *,
        response_model: type[ModelT],
        max_attempts: int = 3,
        initial_delay_s: float = 0.25,
        max_delay_s: float = 2.0,
    ) -> ModelT:
        # Retry/backoff policy lives in the sync client; this only offloads it.
        return await self._call_sync(
            self._sync.request_with_retry_on_overload,
            method,
            params,
            response_model=response_model,
            max_attempts=max_attempts,
            initial_delay_s=initial_delay_s,
            max_delay_s=max_delay_s,
        )

    # --- notification consumption ---

    async def next_notification(self) -> Notification:
        return await self._call_sync(self._sync.next_notification)

    async def wait_for_turn_completed(self, turn_id: str) -> TurnCompletedNotification:
        return await self._call_sync(self._sync.wait_for_turn_completed, turn_id)

    async def stream_until_methods(self, methods: Iterable[str] | str) -> list[Notification]:
        return await self._call_sync(self._sync.stream_until_methods, methods)

    async def stream_text(
        self,
        thread_id: str,
        text: str,
        params: V2TurnStartParams | JsonObject | None = None,
    ) -> AsyncIterator[AgentMessageDeltaNotification]:
        # The transport lock is held for the entire stream: each step of the
        # underlying iterator reads from the shared stdio transport, so other
        # calls must wait until the stream is exhausted.
        async with self._transport_lock:
            iterator = self._sync.stream_text(thread_id, text, params)
            while True:
                has_value, chunk = await asyncio.to_thread(
                    self._next_from_iterator,
                    iterator,
                )
                if not has_value:
                    break
                yield chunk

View File

@@ -339,7 +339,6 @@ class CodexErrorInfo(
class CollabAgentStatus(Enum):
pending_init = "pendingInit"
running = "running"
interrupted = "interrupted"
completed = "completed"
errored = "errored"
shutdown = "shutdown"
@@ -747,7 +746,6 @@ class DynamicToolSpec(BaseModel):
model_config = ConfigDict(
populate_by_name=True,
)
defer_loading: Annotated[bool | None, Field(alias="deferLoading")] = None
description: str
input_schema: Annotated[Any, Field(alias="inputSchema")]
name: str
@@ -1659,13 +1657,7 @@ class PluginInterface(BaseModel):
capabilities: list[str]
category: str | None = None
composer_icon: Annotated[AbsolutePathBuf | None, Field(alias="composerIcon")] = None
default_prompt: Annotated[
list[str] | None,
Field(
alias="defaultPrompt",
description="Starter prompts for the plugin. Capped at 3 entries with a maximum of 128 characters per entry.",
),
] = None
default_prompt: Annotated[str | None, Field(alias="defaultPrompt")] = None
developer_name: Annotated[str | None, Field(alias="developerName")] = None
display_name: Annotated[str | None, Field(alias="displayName")] = None
logo: AbsolutePathBuf | None = None

View File

@@ -0,0 +1,25 @@
"""Stable aliases over full v2 autogenerated models (datamodel-code-generator)."""
from .v2_all.ModelListResponse import ModelListResponse
from .v2_all.ThreadCompactStartResponse import ThreadCompactStartResponse
from .v2_all.ThreadListResponse import ThreadListResponse
from .v2_all.ThreadReadResponse import ThreadReadResponse
from .v2_all.ThreadTokenUsageUpdatedNotification import (
    ThreadTokenUsageUpdatedNotification,
)
# The generator emits numbered duplicates for repeated schemas; pin the
# variant used by TurnCompletedNotification under a stable public name.
# NOTE(review): "ThreadItem153" is generator output — regenerate-safe only if
# the alias is re-checked after each codegen run.
from .v2_all.TurnCompletedNotification import ThreadItem153 as ThreadItem
from .v2_all.TurnCompletedNotification import (
    TurnCompletedNotification as TurnCompletedNotificationPayload,
)
from .v2_all.TurnSteerResponse import TurnSteerResponse

# Names re-exported to consumers of this aliases module.
__all__ = [
    "ModelListResponse",
    "ThreadCompactStartResponse",
    "ThreadListResponse",
    "ThreadReadResponse",
    "ThreadTokenUsageUpdatedNotification",
    "TurnCompletedNotificationPayload",
    "TurnSteerResponse",
    "ThreadItem",
]

View File

@@ -1,701 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Iterator
from .async_client import AsyncAppServerClient
from .client import AppServerClient, AppServerConfig
from .generated.v2_all import (
AskForApproval,
ModelListResponse,
Personality,
ReasoningEffort,
ReasoningSummary,
SandboxMode,
SandboxPolicy,
ServiceTier,
ThreadArchiveResponse,
ThreadCompactStartResponse,
ThreadForkParams,
ThreadItem,
ThreadListParams,
ThreadListResponse,
ThreadReadResponse,
ThreadResumeParams,
ThreadSetNameResponse,
ThreadSortKey,
ThreadSourceKind,
ThreadStartParams,
Turn as AppServerTurn,
TurnCompletedNotification,
TurnInterruptResponse,
TurnStartParams,
TurnSteerResponse,
)
from .models import InitializeResponse, JsonObject, Notification, ServerInfo
@dataclass(slots=True)
class TextInput:
text: str
@dataclass(slots=True)
class ImageInput:
url: str
@dataclass(slots=True)
class LocalImageInput:
path: str
@dataclass(slots=True)
class SkillInput:
name: str
path: str
@dataclass(slots=True)
class MentionInput:
name: str
path: str
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
def _to_wire_item(item: InputItem) -> JsonObject:
if isinstance(item, TextInput):
return {"type": "text", "text": item.text}
if isinstance(item, ImageInput):
return {"type": "image", "url": item.url}
if isinstance(item, LocalImageInput):
return {"type": "localImage", "path": item.path}
if isinstance(item, SkillInput):
return {"type": "skill", "name": item.name, "path": item.path}
if isinstance(item, MentionInput):
return {"type": "mention", "name": item.name, "path": item.path}
raise TypeError(f"unsupported input item: {type(item)!r}")
def _to_wire_input(input: Input) -> list[JsonObject]:
if isinstance(input, list):
return [_to_wire_item(i) for i in input]
return [_to_wire_item(input)]
def _split_user_agent(user_agent: str) -> tuple[str | None, str | None]:
raw = user_agent.strip()
if not raw:
return None, None
if "/" in raw:
name, version = raw.split("/", 1)
return (name or None), (version or None)
parts = raw.split(maxsplit=1)
if len(parts) == 2:
return parts[0], parts[1]
return raw, None
class Codex:
    """Minimal typed SDK surface for app-server v2."""

    def __init__(self, config: AppServerConfig | None = None) -> None:
        # Eagerly start the transport and run the initialize handshake; on
        # any failure the client is closed before re-raising so no transport
        # is leaked.
        self._client = AppServerClient(config=config)
        try:
            self._client.start()
            self._init = self._validate_initialize(self._client.initialize())
        except Exception:
            self._client.close()
            raise

    def __enter__(self) -> "Codex":
        return self

    def __exit__(self, _exc_type, _exc, _tb) -> None:
        self.close()

    @staticmethod
    def _validate_initialize(payload: InitializeResponse) -> InitializeResponse:
        # Normalize server identity in-place: prefer explicit serverInfo
        # fields, fall back to parsing userAgent, and fail fast when any of
        # user agent / server name / server version is still empty.
        user_agent = (payload.userAgent or "").strip()
        server = payload.serverInfo
        server_name: str | None = None
        server_version: str | None = None
        if server is not None:
            server_name = (server.name or "").strip() or None
            server_version = (server.version or "").strip() or None
        if (server_name is None or server_version is None) and user_agent:
            parsed_name, parsed_version = _split_user_agent(user_agent)
            if server_name is None:
                server_name = parsed_name
            if server_version is None:
                server_version = parsed_version
        normalized_server_name = (server_name or "").strip()
        normalized_server_version = (server_version or "").strip()
        if not user_agent or not normalized_server_name or not normalized_server_version:
            raise RuntimeError(
                "initialize response missing required metadata "
                f"(user_agent={user_agent!r}, server_name={normalized_server_name!r}, server_version={normalized_server_version!r})"
            )
        if server is None:
            payload.serverInfo = ServerInfo(
                name=normalized_server_name,
                version=normalized_server_version,
            )
        else:
            server.name = normalized_server_name
            server.version = normalized_server_version
        return payload

    @property
    def metadata(self) -> InitializeResponse:
        # Validated initialize response captured during construction.
        return self._init

    def close(self) -> None:
        self._client.close()

    # The flat thread_* methods below are produced by the SDK generator
    # (see the BEGIN/END markers); edit the generator, not this file.
    # BEGIN GENERATED: Codex.flat_methods
    def thread_start(
        self,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        base_instructions: str | None = None,
        config: JsonObject | None = None,
        cwd: str | None = None,
        developer_instructions: str | None = None,
        ephemeral: bool | None = None,
        model: str | None = None,
        model_provider: str | None = None,
        personality: Personality | None = None,
        sandbox: SandboxMode | None = None,
        service_name: str | None = None,
        service_tier: ServiceTier | None = None,
    ) -> Thread:
        params = ThreadStartParams(
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            base_instructions=base_instructions,
            config=config,
            cwd=cwd,
            developer_instructions=developer_instructions,
            ephemeral=ephemeral,
            model=model,
            model_provider=model_provider,
            personality=personality,
            sandbox=sandbox,
            service_name=service_name,
            service_tier=service_tier,
        )
        started = self._client.thread_start(params)
        return Thread(self._client, started.thread.id)

    def thread_list(
        self,
        *,
        archived: bool | None = None,
        cursor: str | None = None,
        cwd: str | None = None,
        limit: int | None = None,
        model_providers: list[str] | None = None,
        search_term: str | None = None,
        sort_key: ThreadSortKey | None = None,
        source_kinds: list[ThreadSourceKind] | None = None,
    ) -> ThreadListResponse:
        params = ThreadListParams(
            archived=archived,
            cursor=cursor,
            cwd=cwd,
            limit=limit,
            model_providers=model_providers,
            search_term=search_term,
            sort_key=sort_key,
            source_kinds=source_kinds,
        )
        return self._client.thread_list(params)

    def thread_resume(
        self,
        thread_id: str,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        base_instructions: str | None = None,
        config: JsonObject | None = None,
        cwd: str | None = None,
        developer_instructions: str | None = None,
        model: str | None = None,
        model_provider: str | None = None,
        personality: Personality | None = None,
        sandbox: SandboxMode | None = None,
        service_tier: ServiceTier | None = None,
    ) -> Thread:
        params = ThreadResumeParams(
            thread_id=thread_id,
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            base_instructions=base_instructions,
            config=config,
            cwd=cwd,
            developer_instructions=developer_instructions,
            model=model,
            model_provider=model_provider,
            personality=personality,
            sandbox=sandbox,
            service_tier=service_tier,
        )
        resumed = self._client.thread_resume(thread_id, params)
        return Thread(self._client, resumed.thread.id)

    def thread_fork(
        self,
        thread_id: str,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        base_instructions: str | None = None,
        config: JsonObject | None = None,
        cwd: str | None = None,
        developer_instructions: str | None = None,
        ephemeral: bool | None = None,
        model: str | None = None,
        model_provider: str | None = None,
        sandbox: SandboxMode | None = None,
        service_tier: ServiceTier | None = None,
    ) -> Thread:
        params = ThreadForkParams(
            thread_id=thread_id,
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            base_instructions=base_instructions,
            config=config,
            cwd=cwd,
            developer_instructions=developer_instructions,
            ephemeral=ephemeral,
            model=model,
            model_provider=model_provider,
            sandbox=sandbox,
            service_tier=service_tier,
        )
        forked = self._client.thread_fork(thread_id, params)
        return Thread(self._client, forked.thread.id)

    def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
        return self._client.thread_archive(thread_id)

    def thread_unarchive(self, thread_id: str) -> Thread:
        unarchived = self._client.thread_unarchive(thread_id)
        return Thread(self._client, unarchived.thread.id)
    # END GENERATED: Codex.flat_methods

    def models(self, *, include_hidden: bool = False) -> ModelListResponse:
        return self._client.model_list(include_hidden=include_hidden)
class AsyncCodex:
    """Async mirror of :class:`Codex`.

    Prefer ``async with AsyncCodex()`` so initialization and shutdown are
    explicit and paired. The async client initializes lazily on context entry
    or first awaited API use.
    """

    def __init__(self, config: AppServerConfig | None = None) -> None:
        # No I/O here: the transport starts lazily in _ensure_initialized.
        self._client = AsyncAppServerClient(config=config)
        self._init: InitializeResponse | None = None
        self._initialized = False
        self._init_lock = asyncio.Lock()

    async def __aenter__(self) -> "AsyncCodex":
        await self._ensure_initialized()
        return self

    async def __aexit__(self, _exc_type, _exc, _tb) -> None:
        await self.close()

    async def _ensure_initialized(self) -> None:
        # Double-checked under the lock so concurrent first calls perform a
        # single start/initialize handshake; on failure the client is closed
        # and the flags are reset so a later call can retry.
        if self._initialized:
            return
        async with self._init_lock:
            if self._initialized:
                return
            try:
                await self._client.start()
                payload = await self._client.initialize()
                self._init = Codex._validate_initialize(payload)
                self._initialized = True
            except Exception:
                await self._client.close()
                self._init = None
                self._initialized = False
                raise

    @property
    def metadata(self) -> InitializeResponse:
        if self._init is None:
            raise RuntimeError(
                "AsyncCodex is not initialized yet. Prefer `async with AsyncCodex()`; "
                "initialization also happens on first awaited API use."
            )
        return self._init

    async def close(self) -> None:
        await self._client.close()
        self._init = None
        self._initialized = False

    # The flat thread_* methods below are produced by the SDK generator
    # (see the BEGIN/END markers); edit the generator, not this file.
    # BEGIN GENERATED: AsyncCodex.flat_methods
    async def thread_start(
        self,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        base_instructions: str | None = None,
        config: JsonObject | None = None,
        cwd: str | None = None,
        developer_instructions: str | None = None,
        ephemeral: bool | None = None,
        model: str | None = None,
        model_provider: str | None = None,
        personality: Personality | None = None,
        sandbox: SandboxMode | None = None,
        service_name: str | None = None,
        service_tier: ServiceTier | None = None,
    ) -> AsyncThread:
        await self._ensure_initialized()
        params = ThreadStartParams(
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            base_instructions=base_instructions,
            config=config,
            cwd=cwd,
            developer_instructions=developer_instructions,
            ephemeral=ephemeral,
            model=model,
            model_provider=model_provider,
            personality=personality,
            sandbox=sandbox,
            service_name=service_name,
            service_tier=service_tier,
        )
        started = await self._client.thread_start(params)
        return AsyncThread(self, started.thread.id)

    async def thread_list(
        self,
        *,
        archived: bool | None = None,
        cursor: str | None = None,
        cwd: str | None = None,
        limit: int | None = None,
        model_providers: list[str] | None = None,
        search_term: str | None = None,
        sort_key: ThreadSortKey | None = None,
        source_kinds: list[ThreadSourceKind] | None = None,
    ) -> ThreadListResponse:
        await self._ensure_initialized()
        params = ThreadListParams(
            archived=archived,
            cursor=cursor,
            cwd=cwd,
            limit=limit,
            model_providers=model_providers,
            search_term=search_term,
            sort_key=sort_key,
            source_kinds=source_kinds,
        )
        return await self._client.thread_list(params)

    async def thread_resume(
        self,
        thread_id: str,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        base_instructions: str | None = None,
        config: JsonObject | None = None,
        cwd: str | None = None,
        developer_instructions: str | None = None,
        model: str | None = None,
        model_provider: str | None = None,
        personality: Personality | None = None,
        sandbox: SandboxMode | None = None,
        service_tier: ServiceTier | None = None,
    ) -> AsyncThread:
        await self._ensure_initialized()
        params = ThreadResumeParams(
            thread_id=thread_id,
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            base_instructions=base_instructions,
            config=config,
            cwd=cwd,
            developer_instructions=developer_instructions,
            model=model,
            model_provider=model_provider,
            personality=personality,
            sandbox=sandbox,
            service_tier=service_tier,
        )
        resumed = await self._client.thread_resume(thread_id, params)
        return AsyncThread(self, resumed.thread.id)

    async def thread_fork(
        self,
        thread_id: str,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        base_instructions: str | None = None,
        config: JsonObject | None = None,
        cwd: str | None = None,
        developer_instructions: str | None = None,
        ephemeral: bool | None = None,
        model: str | None = None,
        model_provider: str | None = None,
        sandbox: SandboxMode | None = None,
        service_tier: ServiceTier | None = None,
    ) -> AsyncThread:
        await self._ensure_initialized()
        params = ThreadForkParams(
            thread_id=thread_id,
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            base_instructions=base_instructions,
            config=config,
            cwd=cwd,
            developer_instructions=developer_instructions,
            ephemeral=ephemeral,
            model=model,
            model_provider=model_provider,
            sandbox=sandbox,
            service_tier=service_tier,
        )
        forked = await self._client.thread_fork(thread_id, params)
        return AsyncThread(self, forked.thread.id)

    async def thread_archive(self, thread_id: str) -> ThreadArchiveResponse:
        await self._ensure_initialized()
        return await self._client.thread_archive(thread_id)

    async def thread_unarchive(self, thread_id: str) -> AsyncThread:
        await self._ensure_initialized()
        unarchived = await self._client.thread_unarchive(thread_id)
        return AsyncThread(self, unarchived.thread.id)
    # END GENERATED: AsyncCodex.flat_methods

    async def models(self, *, include_hidden: bool = False) -> ModelListResponse:
        await self._ensure_initialized()
        return await self._client.model_list(include_hidden=include_hidden)
@dataclass(slots=True)
class Thread:
    """Synchronous handle to a thread, addressed by its ``id``."""

    _client: AppServerClient
    id: str

    # The turn method below is produced by the SDK generator; edit the
    # generator, not this file.
    # BEGIN GENERATED: Thread.flat_methods
    def turn(
        self,
        input: Input,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        cwd: str | None = None,
        effort: ReasoningEffort | None = None,
        model: str | None = None,
        output_schema: JsonObject | None = None,
        personality: Personality | None = None,
        sandbox_policy: SandboxPolicy | None = None,
        service_tier: ServiceTier | None = None,
        summary: ReasoningSummary | None = None,
    ) -> TurnHandle:
        wire_input = _to_wire_input(input)
        params = TurnStartParams(
            thread_id=self.id,
            input=wire_input,
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            cwd=cwd,
            effort=effort,
            model=model,
            output_schema=output_schema,
            personality=personality,
            sandbox_policy=sandbox_policy,
            service_tier=service_tier,
            summary=summary,
        )
        turn = self._client.turn_start(self.id, wire_input, params=params)
        return TurnHandle(self._client, self.id, turn.turn.id)
    # END GENERATED: Thread.flat_methods

    def read(self, *, include_turns: bool = False) -> ThreadReadResponse:
        """Fetch the thread, optionally including its turns."""
        return self._client.thread_read(self.id, include_turns=include_turns)

    def set_name(self, name: str) -> ThreadSetNameResponse:
        """Rename the thread."""
        return self._client.thread_set_name(self.id, name)

    def compact(self) -> ThreadCompactStartResponse:
        """Kick off compaction for the thread."""
        return self._client.thread_compact(self.id)
@dataclass(slots=True)
class AsyncThread:
    """Async handle to a thread, addressed by its ``id``."""

    _codex: AsyncCodex
    id: str

    # The turn method below is produced by the SDK generator; edit the
    # generator, not this file.
    # BEGIN GENERATED: AsyncThread.flat_methods
    async def turn(
        self,
        input: Input,
        *,
        approval_policy: AskForApproval | None = None,
        approvals_reviewer: ApprovalsReviewer | None = None,
        cwd: str | None = None,
        effort: ReasoningEffort | None = None,
        model: str | None = None,
        output_schema: JsonObject | None = None,
        personality: Personality | None = None,
        sandbox_policy: SandboxPolicy | None = None,
        service_tier: ServiceTier | None = None,
        summary: ReasoningSummary | None = None,
    ) -> AsyncTurnHandle:
        await self._codex._ensure_initialized()
        wire_input = _to_wire_input(input)
        params = TurnStartParams(
            thread_id=self.id,
            input=wire_input,
            approval_policy=approval_policy,
            approvals_reviewer=approvals_reviewer,
            cwd=cwd,
            effort=effort,
            model=model,
            output_schema=output_schema,
            personality=personality,
            sandbox_policy=sandbox_policy,
            service_tier=service_tier,
            summary=summary,
        )
        turn = await self._codex._client.turn_start(
            self.id,
            wire_input,
            params=params,
        )
        return AsyncTurnHandle(self._codex, self.id, turn.turn.id)
    # END GENERATED: AsyncThread.flat_methods

    async def read(self, *, include_turns: bool = False) -> ThreadReadResponse:
        """Fetch the thread, optionally including its turns."""
        await self._codex._ensure_initialized()
        return await self._codex._client.thread_read(self.id, include_turns=include_turns)

    async def set_name(self, name: str) -> ThreadSetNameResponse:
        """Rename the thread."""
        await self._codex._ensure_initialized()
        return await self._codex._client.thread_set_name(self.id, name)

    async def compact(self) -> ThreadCompactStartResponse:
        """Kick off compaction for the thread."""
        await self._codex._ensure_initialized()
        return await self._codex._client.thread_compact(self.id)
@dataclass(slots=True)
class TurnHandle:
    """Synchronous handle to an in-flight turn on a thread."""

    _client: AppServerClient
    thread_id: str
    id: str

    def steer(self, input: Input) -> TurnSteerResponse:
        """Send additional input into the running turn."""
        return self._client.turn_steer(self.thread_id, self.id, _to_wire_input(input))

    def interrupt(self) -> TurnInterruptResponse:
        """Ask the server to cancel the running turn."""
        return self._client.turn_interrupt(self.thread_id, self.id)

    def stream(self) -> Iterator[Notification]:
        """Yield notifications until this turn's typed turn/completed arrives.

        The completion event itself is yielded before iteration stops.
        """
        # TODO: replace this client-wide experimental guard with per-turn event demux.
        self._client.acquire_turn_consumer(self.id)
        try:
            while True:
                event = self._client.next_notification()
                # Yield first so callers observe the completion event too.
                yield event
                if (
                    event.method == "turn/completed"
                    and isinstance(event.payload, TurnCompletedNotification)
                    and event.payload.turn.id == self.id
                ):
                    break
        finally:
            # Runs on normal exit and on close()/throw from the consumer.
            self._client.release_turn_consumer(self.id)

    def run(self) -> AppServerTurn:
        """Drain the stream and return the completed turn payload.

        Raises:
            RuntimeError: if the stream ends without a matching
                turn/completed notification for this turn.
        """
        completed: TurnCompletedNotification | None = None
        stream = self.stream()
        try:
            for event in stream:
                payload = event.payload
                if isinstance(payload, TurnCompletedNotification) and payload.turn.id == self.id:
                    completed = payload
        finally:
            stream.close()
        if completed is None:
            raise RuntimeError("turn completed event not received")
        return completed.turn
@dataclass(slots=True)
class AsyncTurnHandle:
    """Async handle to an in-flight turn on a thread."""

    _codex: AsyncCodex
    thread_id: str
    id: str

    async def steer(self, input: Input) -> TurnSteerResponse:
        """Send additional input into the running turn."""
        await self._codex._ensure_initialized()
        return await self._codex._client.turn_steer(
            self.thread_id,
            self.id,
            _to_wire_input(input),
        )

    async def interrupt(self) -> TurnInterruptResponse:
        """Ask the server to cancel the running turn."""
        await self._codex._ensure_initialized()
        return await self._codex._client.turn_interrupt(self.thread_id, self.id)

    async def stream(self) -> AsyncIterator[Notification]:
        """Yield notifications until this turn's typed turn/completed arrives.

        The completion event itself is yielded before iteration stops.
        """
        await self._codex._ensure_initialized()
        # TODO: replace this client-wide experimental guard with per-turn event demux.
        self._codex._client.acquire_turn_consumer(self.id)
        try:
            while True:
                event = await self._codex._client.next_notification()
                # Yield first so callers observe the completion event too.
                yield event
                if (
                    event.method == "turn/completed"
                    and isinstance(event.payload, TurnCompletedNotification)
                    and event.payload.turn.id == self.id
                ):
                    break
        finally:
            # Runs on normal exit and on aclose()/throw from the consumer.
            self._codex._client.release_turn_consumer(self.id)

    async def run(self) -> AppServerTurn:
        """Drain the stream and return the completed turn payload.

        Raises:
            RuntimeError: if the stream ends without a matching
                turn/completed notification for this turn.
        """
        completed: TurnCompletedNotification | None = None
        stream = self.stream()
        try:
            async for event in stream:
                payload = event.payload
                if isinstance(payload, TurnCompletedNotification) and payload.turn.id == self.id:
                    completed = payload
        finally:
            await stream.aclose()
        if completed is None:
            raise RuntimeError("turn completed event not received")
        return completed.turn

View File

@@ -2,11 +2,9 @@ from __future__ import annotations
import ast
import importlib.util
import io
import json
import sys
import tomllib
import urllib.error
from pathlib import Path
import pytest
@@ -25,17 +23,6 @@ def _load_update_script_module():
return module
def _load_runtime_setup_module():
    """Load ``_runtime_setup.py`` from the repo root as a fresh module."""
    runtime_setup_path = ROOT / "_runtime_setup.py"
    spec = importlib.util.spec_from_file_location("_runtime_setup", runtime_setup_path)
    if spec is None or spec.loader is None:
        raise AssertionError(f"Failed to load runtime setup module: {runtime_setup_path}")
    loaded = importlib.util.module_from_spec(spec)
    # Register before exec so intra-module imports resolve during execution.
    sys.modules[spec.name] = loaded
    spec.loader.exec_module(loaded)
    return loaded
def test_generation_has_single_maintenance_entrypoint_script() -> None:
    """The scripts directory must hold exactly one maintenance entrypoint."""
    found = [path.name for path in (ROOT / "scripts").glob("*.py")]
    assert sorted(found) == ["update_sdk_artifacts.py"]
@@ -159,39 +146,6 @@ def test_runtime_package_template_has_no_checked_in_binaries() -> None:
) == ["__init__.py"]
def test_examples_readme_matches_pinned_runtime_version() -> None:
    """examples/README.md must advertise the pinned runtime version."""
    runtime_setup = _load_runtime_setup_module()
    expected_line = (
        f"Current pinned runtime version: `{runtime_setup.pinned_runtime_version()}`"
    )
    readme_text = (ROOT / "examples" / "README.md").read_text()
    assert expected_line in readme_text
def test_release_metadata_retries_without_invalid_auth(monkeypatch: pytest.MonkeyPatch) -> None:
    """_release_metadata retries unauthenticated after a 401 on a bad token."""
    runtime_setup = _load_runtime_setup_module()
    # Records the Authorization header of every attempted request, in order.
    authorizations: list[str | None] = []

    def fake_urlopen(request):
        authorization = request.headers.get("Authorization")
        authorizations.append(authorization)
        # Reject any authenticated request to force the retry-without-auth path.
        if authorization is not None:
            raise urllib.error.HTTPError(
                request.full_url,
                401,
                "Unauthorized",
                hdrs=None,
                fp=None,
            )
        return io.StringIO('{"assets": []}')

    monkeypatch.setenv("GH_TOKEN", "invalid-token")
    monkeypatch.setattr(runtime_setup.urllib.request, "urlopen", fake_urlopen)
    assert runtime_setup._release_metadata("1.2.3") == {"assets": []}
    # First attempt carries the bad token; the retry drops auth entirely.
    assert authorizations == ["Bearer invalid-token", None]
def test_runtime_package_is_wheel_only_and_builds_platform_specific_wheels() -> None:
pyproject = tomllib.loads(
(ROOT.parent / "python-runtime" / "pyproject.toml").read_text()

View File

@@ -1,64 +0,0 @@
from __future__ import annotations
import asyncio
import time
from codex_app_server.async_client import AsyncAppServerClient
def test_async_client_serializes_transport_calls() -> None:
    """Concurrent awaited calls must be serialized onto the sync transport."""

    async def scenario() -> int:
        client = AsyncAppServerClient()
        active = 0
        max_active = 0

        def fake_model_list(include_hidden: bool = False) -> bool:
            nonlocal active, max_active
            active += 1
            max_active = max(max_active, active)
            # Block long enough that overlapping calls would be observable.
            time.sleep(0.05)
            active -= 1
            return include_hidden

        client._sync.model_list = fake_model_list  # type: ignore[method-assign]
        await asyncio.gather(client.model_list(), client.model_list())
        return max_active

    # max_active == 1 proves the two gathered calls never ran concurrently.
    assert asyncio.run(scenario()) == 1
def test_async_stream_text_is_incremental_and_blocks_parallel_calls() -> None:
    """Streaming yields items incrementally while holding the transport."""

    async def scenario() -> tuple[str, list[str], bool]:
        client = AsyncAppServerClient()

        def fake_stream_text(thread_id: str, text: str, params=None):  # type: ignore[no-untyped-def]
            yield "first"
            # Simulate transport latency between chunks.
            time.sleep(0.03)
            yield "second"
            yield "third"

        def fake_model_list(include_hidden: bool = False) -> str:
            return "done"

        client._sync.stream_text = fake_stream_text  # type: ignore[method-assign]
        client._sync.model_list = fake_model_list  # type: ignore[method-assign]
        stream = client.stream_text("thread-1", "hello")
        first = await anext(stream)
        blocked_before_stream_done = False
        # Start a competing call while the stream is still open; it must not
        # finish until the stream has been fully drained.
        competing_call = asyncio.create_task(client.model_list())
        await asyncio.sleep(0.01)
        blocked_before_stream_done = not competing_call.done()
        remaining: list[str] = []
        async for item in stream:
            remaining.append(item)
        await competing_call
        return first, remaining, blocked_before_stream_done

    first, remaining, blocked = asyncio.run(scenario())
    assert first == "first"
    assert remaining == ["second", "third"]
    assert blocked

View File

@@ -1,235 +0,0 @@
from __future__ import annotations
import asyncio
from collections import deque
from pathlib import Path
import pytest
import codex_app_server.public_api as public_api_module
from codex_app_server.client import AppServerClient
from codex_app_server.generated.v2_all import (
AgentMessageDeltaNotification,
TurnCompletedNotification,
TurnStatus,
)
from codex_app_server.models import InitializeResponse, Notification
from codex_app_server.public_api import (
AsyncCodex,
AsyncTurnHandle,
Codex,
TurnHandle,
)
ROOT = Path(__file__).resolve().parents[1]
def _delta_notification(
    *,
    thread_id: str = "thread-1",
    turn_id: str = "turn-1",
    text: str = "delta-text",
) -> Notification:
    """Build an agent-message delta notification for stream tests."""
    wire_payload = {
        "delta": text,
        "itemId": "item-1",
        "threadId": thread_id,
        "turnId": turn_id,
    }
    return Notification(
        method="item/agentMessage/delta",
        payload=AgentMessageDeltaNotification.model_validate(wire_payload),
    )
def _completed_notification(
    *,
    thread_id: str = "thread-1",
    turn_id: str = "turn-1",
    status: str = "completed",
) -> Notification:
    """Build a turn/completed notification for stream tests."""
    wire_payload = {
        "threadId": thread_id,
        "turn": {
            "id": turn_id,
            "items": [],
            "status": status,
        },
    }
    return Notification(
        method="turn/completed",
        payload=TurnCompletedNotification.model_validate(wire_payload),
    )
def test_codex_init_failure_closes_client(monkeypatch: pytest.MonkeyPatch) -> None:
    """If initialize metadata validation fails, the client must be closed."""
    closed: list[bool] = []

    class FakeClient:
        def __init__(self, config=None) -> None:  # noqa: ANN001,ARG002
            self._closed = False

        def start(self) -> None:
            return None

        def initialize(self) -> InitializeResponse:
            # Empty payload fails Codex._validate_initialize.
            return InitializeResponse.model_validate({})

        def close(self) -> None:
            self._closed = True
            closed.append(True)

    monkeypatch.setattr(public_api_module, "AppServerClient", FakeClient)
    with pytest.raises(RuntimeError, match="missing required metadata"):
        Codex()
    # The constructor's except-path must close the transport exactly once.
    assert closed == [True]
def test_async_codex_init_failure_closes_client() -> None:
    """Failed lazy initialization closes the client and resets init state."""

    async def scenario() -> None:
        codex = AsyncCodex()
        close_calls = 0

        async def fake_start() -> None:
            return None

        async def fake_initialize() -> InitializeResponse:
            # Empty payload fails Codex._validate_initialize.
            return InitializeResponse.model_validate({})

        async def fake_close() -> None:
            nonlocal close_calls
            close_calls += 1

        codex._client.start = fake_start  # type: ignore[method-assign]
        codex._client.initialize = fake_initialize  # type: ignore[method-assign]
        codex._client.close = fake_close  # type: ignore[method-assign]
        with pytest.raises(RuntimeError, match="missing required metadata"):
            await codex.models()
        assert close_calls == 1
        # Flags must be reset so a later call can retry initialization.
        assert codex._initialized is False
        assert codex._init is None

    asyncio.run(scenario())
def test_async_codex_initializes_only_once_under_concurrency() -> None:
    """Concurrent first calls must share one start/initialize handshake."""

    async def scenario() -> None:
        codex = AsyncCodex()
        start_calls = 0
        initialize_calls = 0
        ready = asyncio.Event()

        async def fake_start() -> None:
            nonlocal start_calls
            start_calls += 1

        async def fake_initialize() -> InitializeResponse:
            nonlocal initialize_calls
            initialize_calls += 1
            ready.set()
            # Yield control so the second models() call can race in while
            # initialization is still in flight.
            await asyncio.sleep(0.02)
            return InitializeResponse.model_validate(
                {
                    "userAgent": "codex-cli/1.2.3",
                    "serverInfo": {"name": "codex-cli", "version": "1.2.3"},
                }
            )

        async def fake_model_list(include_hidden: bool = False):  # noqa: ANN202,ARG001
            await ready.wait()
            return object()

        codex._client.start = fake_start  # type: ignore[method-assign]
        codex._client.initialize = fake_initialize  # type: ignore[method-assign]
        codex._client.model_list = fake_model_list  # type: ignore[method-assign]
        await asyncio.gather(codex.models(), codex.models())
        # The init lock must collapse both racers into a single handshake.
        assert start_calls == 1
        assert initialize_calls == 1

    asyncio.run(scenario())
def test_turn_stream_rejects_second_active_consumer() -> None:
    """Only one turn stream may consume client notifications at a time."""
    client = AppServerClient()
    notifications: deque[Notification] = deque(
        [
            _delta_notification(turn_id="turn-1"),
            _completed_notification(turn_id="turn-1"),
        ]
    )
    client.next_notification = notifications.popleft  # type: ignore[method-assign]
    first_stream = TurnHandle(client, "thread-1", "turn-1").stream()
    # Pulling the first event acquires the client-wide consumer guard.
    assert next(first_stream).method == "item/agentMessage/delta"
    second_stream = TurnHandle(client, "thread-1", "turn-2").stream()
    with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
        next(second_stream)
    first_stream.close()
def test_async_turn_stream_rejects_second_active_consumer() -> None:
    """The async stream enforces the same single-consumer guard."""

    async def scenario() -> None:
        codex = AsyncCodex()

        async def fake_ensure_initialized() -> None:
            return None

        notifications: deque[Notification] = deque(
            [
                _delta_notification(turn_id="turn-1"),
                _completed_notification(turn_id="turn-1"),
            ]
        )

        async def fake_next_notification() -> Notification:
            return notifications.popleft()

        codex._ensure_initialized = fake_ensure_initialized  # type: ignore[method-assign]
        codex._client.next_notification = fake_next_notification  # type: ignore[method-assign]
        first_stream = AsyncTurnHandle(codex, "thread-1", "turn-1").stream()
        # Pulling the first event acquires the client-wide consumer guard.
        assert (await anext(first_stream)).method == "item/agentMessage/delta"
        second_stream = AsyncTurnHandle(codex, "thread-1", "turn-2").stream()
        with pytest.raises(RuntimeError, match="Concurrent turn consumers are not yet supported"):
            await anext(second_stream)
        await first_stream.aclose()

    asyncio.run(scenario())
def test_turn_run_returns_completed_turn_payload() -> None:
    """TurnHandle.run() surfaces the payload from turn/completed."""
    client = AppServerClient()
    pending: deque[Notification] = deque([_completed_notification()])
    client.next_notification = pending.popleft  # type: ignore[method-assign]
    turn = TurnHandle(client, "thread-1", "turn-1").run()
    assert turn.id == "turn-1"
    assert turn.status == TurnStatus.completed
    assert turn.items == []
def test_retry_examples_compare_status_with_enum() -> None:
    """Retry examples must compare turn status via TurnStatus, not strings."""
    example_dir = ROOT / "examples" / "10_error_handling_and_retry"
    for script in ("sync.py", "async.py"):
        source = (example_dir / script).read_text()
        assert '== "failed"' not in source
        assert "TurnStatus.failed" in source

View File

@@ -1,222 +0,0 @@
from __future__ import annotations
import importlib.resources as resources
import inspect
from typing import Any
from codex_app_server import AppServerConfig
from codex_app_server.models import InitializeResponse
from codex_app_server.public_api import AsyncCodex, AsyncThread, Codex, Thread
def _keyword_only_names(fn: object) -> list[str]:
signature = inspect.signature(fn)
return [
param.name
for param in signature.parameters.values()
if param.kind == inspect.Parameter.KEYWORD_ONLY
]
def _assert_no_any_annotations(fn: object) -> None:
signature = inspect.signature(fn)
for param in signature.parameters.values():
if param.annotation is Any:
raise AssertionError(f"{fn} has public parameter typed as Any: {param.name}")
if signature.return_annotation is Any:
raise AssertionError(f"{fn} has public return annotation typed as Any")
def test_root_exports_app_server_config() -> None:
    """AppServerConfig is importable from the package root."""
    assert AppServerConfig.__name__ == "AppServerConfig"
def test_package_includes_py_typed_marker() -> None:
    """The wheel must ship a PEP 561 py.typed marker for type checkers."""
    marker = resources.files("codex_app_server").joinpath("py.typed")
    assert marker.is_file()
def test_generated_public_signatures_are_snake_case_and_typed() -> None:
    """Generated flat methods expose snake_case keyword-only params, no Any.

    The sync and async surfaces must expose identical keyword sets, so each
    expected list is declared once and shared between the two (the original
    duplicated every list verbatim, letting the pairs drift apart silently).
    """
    thread_start_kwargs = [
        "approval_policy",
        "approvals_reviewer",
        "base_instructions",
        "config",
        "cwd",
        "developer_instructions",
        "ephemeral",
        "model",
        "model_provider",
        "personality",
        "sandbox",
        "service_name",
        "service_tier",
    ]
    thread_list_kwargs = [
        "archived",
        "cursor",
        "cwd",
        "limit",
        "model_providers",
        "search_term",
        "sort_key",
        "source_kinds",
    ]
    thread_resume_kwargs = [
        "approval_policy",
        "approvals_reviewer",
        "base_instructions",
        "config",
        "cwd",
        "developer_instructions",
        "model",
        "model_provider",
        "personality",
        "sandbox",
        "service_tier",
    ]
    thread_fork_kwargs = [
        "approval_policy",
        "approvals_reviewer",
        "base_instructions",
        "config",
        "cwd",
        "developer_instructions",
        "ephemeral",
        "model",
        "model_provider",
        "sandbox",
        "service_tier",
    ]
    turn_kwargs = [
        "approval_policy",
        "approvals_reviewer",
        "cwd",
        "effort",
        "model",
        "output_schema",
        "personality",
        "sandbox_policy",
        "service_tier",
        "summary",
    ]
    expected = {
        Codex.thread_start: thread_start_kwargs,
        Codex.thread_list: thread_list_kwargs,
        Codex.thread_resume: thread_resume_kwargs,
        Codex.thread_fork: thread_fork_kwargs,
        Thread.turn: turn_kwargs,
        AsyncCodex.thread_start: thread_start_kwargs,
        AsyncCodex.thread_list: thread_list_kwargs,
        AsyncCodex.thread_resume: thread_resume_kwargs,
        AsyncCodex.thread_fork: thread_fork_kwargs,
        AsyncThread.turn: turn_kwargs,
    }
    for fn, expected_kwargs in expected.items():
        actual = _keyword_only_names(fn)
        assert actual == expected_kwargs, f"unexpected kwargs for {fn}: {actual}"
        assert all(name == name.lower() for name in actual), f"non snake_case kwargs in {fn}: {actual}"
        _assert_no_any_annotations(fn)
def test_lifecycle_methods_are_codex_scoped() -> None:
    """Lifecycle operations live on Codex/AsyncCodex, not on thread handles."""
    lifecycle_methods = ("thread_resume", "thread_fork", "thread_archive", "thread_unarchive")
    for owner in (Codex, AsyncCodex):
        for method_name in lifecycle_methods:
            assert hasattr(owner, method_name)
    assert not hasattr(Codex, "thread")
    assert not hasattr(AsyncCodex, "thread")
    for handle in (Thread, AsyncThread):
        for method_name in ("resume", "fork", "archive", "unarchive"):
            assert not hasattr(handle, method_name)
    for fn in (
        Codex.thread_archive,
        Codex.thread_unarchive,
        AsyncCodex.thread_archive,
        AsyncCodex.thread_unarchive,
    ):
        _assert_no_any_annotations(fn)
def test_initialize_metadata_parses_user_agent_shape() -> None:
    """A bare user-agent string is parsed into serverInfo name/version."""
    payload = InitializeResponse.model_validate({"userAgent": "codex-cli/1.2.3"})
    validated = Codex._validate_initialize(payload)
    assert validated is payload
    assert validated.userAgent == "codex-cli/1.2.3"
    server = validated.serverInfo
    assert server is not None
    assert server.name == "codex-cli"
    assert server.version == "1.2.3"
def test_initialize_metadata_requires_non_empty_information() -> None:
    """An initialize payload with no metadata must be rejected with a clear error."""
    # Local import: the module header is outside this view, so we do not
    # assume pytest is already imported at the top of the file.
    import pytest

    # pytest.raises asserts both the exception type and the message in one
    # idiomatic step, replacing the manual try/except/else bookkeeping.
    with pytest.raises(RuntimeError, match="missing required metadata"):
        Codex._validate_initialize(InitializeResponse.model_validate({}))

View File

@@ -1,451 +0,0 @@
from __future__ import annotations
import json
import os
import subprocess
import sys
import tempfile
import textwrap
from dataclasses import dataclass
from pathlib import Path
import pytest
# Resolve the repository layout relative to this test file.
ROOT = Path(__file__).resolve().parents[1]
EXAMPLES_DIR = ROOT / "examples"
NOTEBOOK_PATH = ROOT / "notebooks" / "sdk_walkthrough.ipynb"
# Make the SDK root importable so `_runtime_setup` resolves no matter how
# pytest was invoked.
root_str = str(ROOT)
if root_str not in sys.path:
    sys.path.insert(0, root_str)
from _runtime_setup import ensure_runtime_package_installed, pinned_runtime_version
# These tests drive a real Codex runtime (subprocesses, network), so the
# whole module is opt-in via an environment flag.
RUN_REAL_CODEX_TESTS = os.environ.get("RUN_REAL_CODEX_TESTS") == "1"
pytestmark = pytest.mark.skipif(
    not RUN_REAL_CODEX_TESTS,
    reason="set RUN_REAL_CODEX_TESTS=1 to run real Codex integration coverage",
)
# 11_cli_mini_app is interactive; we still run it by feeding '/exit'.
# Every example folder ships sync.py and async.py variants, except
# 09_async_parity whose async path is represented by 01 async plus the
# dedicated async-based cases above. Expanding the pairs from the folder
# list keeps the two variants in lockstep per folder.
_EXAMPLE_FOLDERS: list[str] = [
    "01_quickstart_constructor",
    "02_turn_run",
    "03_turn_stream_events",
    "04_models_and_metadata",
    "05_existing_thread",
    "06_thread_lifecycle_and_controls",
    "07_image_and_text",
    "08_local_image_and_text",
    "09_async_parity",
    "10_error_handling_and_retry",
    "11_cli_mini_app",
    "12_turn_params_kitchen_sink",
    "13_model_select_and_turn_params",
    "14_turn_controls",
]
EXAMPLE_CASES: list[tuple[str, str]] = [
    (folder, script)
    for folder in _EXAMPLE_FOLDERS
    for script in (
        ("sync.py",) if folder == "09_async_parity" else ("sync.py", "async.py")
    )
]
@dataclass(frozen=True)
class PreparedRuntimeEnv:
    # Immutable bundle describing the interpreter and environment used to run
    # example scripts and snippets against a real Codex runtime.
    python: str  # path to the Python interpreter to invoke
    env: dict[str, str]  # child-process environment (PYTHONPATH points at the isolated site)
    runtime_version: str  # pinned runtime package version being exercised
@pytest.fixture(scope="session")
def runtime_env(tmp_path_factory: pytest.TempPathFactory) -> PreparedRuntimeEnv:
    """Build a session-scoped, isolated runtime environment for real Codex tests.

    Installs pydantic and the pinned runtime package into a throwaway
    site-packages directory, then returns the interpreter path, a patched
    child environment, and the pinned runtime version.
    """
    version = pinned_runtime_version()
    isolated_site = tmp_path_factory.mktemp("python-runtime-env") / "site-packages"
    interpreter = sys.executable
    pip_install = [
        interpreter,
        "-m",
        "pip",
        "install",
        "--target",
        str(isolated_site),
        "pydantic>=2.12",
    ]
    _run_command(pip_install, cwd=ROOT, env=os.environ.copy(), timeout_s=240)
    ensure_runtime_package_installed(
        interpreter,
        ROOT,
        install_target=isolated_site,
    )
    # Child processes see only the isolated site plus the SDK sources.
    child_env = os.environ.copy()
    child_env["PYTHONPATH"] = os.pathsep.join([str(isolated_site), str(ROOT / "src")])
    child_env["CODEX_PYTHON_SDK_DIR"] = str(ROOT)
    return PreparedRuntimeEnv(python=interpreter, env=child_env, runtime_version=version)
def _run_command(
args: list[str],
*,
cwd: Path,
env: dict[str, str],
timeout_s: int,
stdin: str | None = None,
) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
cwd=str(cwd),
env=env,
input=stdin,
text=True,
capture_output=True,
timeout=timeout_s,
check=False,
)
def _run_python(
    runtime_env: PreparedRuntimeEnv,
    source: str,
    *,
    cwd: Path | None = None,
    timeout_s: int = 180,
) -> subprocess.CompletedProcess[str]:
    """Execute *source* via `python -c` using the prepared interpreter/env."""
    working_dir = ROOT if cwd is None else cwd
    command = [str(runtime_env.python), "-c", source]
    return _run_command(
        command,
        cwd=working_dir,
        env=runtime_env.env,
        timeout_s=timeout_s,
    )
def _runtime_compatibility_hint(
runtime_env: PreparedRuntimeEnv,
*,
stdout: str,
stderr: str,
) -> str:
combined = f"{stdout}\n{stderr}"
if "ThreadStartResponse" in combined and "approvalsReviewer" in combined:
return (
"\nCompatibility hint:\n"
f"Pinned runtime {runtime_env.runtime_version} returned a thread/start payload "
"that is older than the current SDK schema and is missing "
"`approvalsReviewer`. Bump `sdk/python/_runtime_setup.py` to a matching "
"released runtime version.\n"
)
return ""
def _run_json_python(
    runtime_env: PreparedRuntimeEnv,
    source: str,
    *,
    cwd: Path | None = None,
    timeout_s: int = 180,
) -> dict[str, object]:
    """Run *source* and parse its stdout as a single JSON object.

    Fails the calling test with full stdout/stderr — plus a runtime
    compatibility hint when applicable — if the snippet exits non-zero.
    """
    completed = _run_python(runtime_env, source, cwd=cwd, timeout_s=timeout_s)
    failure_detail = (
        "Python snippet failed.\n"
        f"STDOUT:\n{completed.stdout}\n"
        f"STDERR:\n{completed.stderr}"
        f"{_runtime_compatibility_hint(runtime_env, stdout=completed.stdout, stderr=completed.stderr)}"
    )
    assert completed.returncode == 0, failure_detail
    return json.loads(completed.stdout)
def _run_example(
    runtime_env: PreparedRuntimeEnv,
    folder: str,
    script: str,
    *,
    timeout_s: int = 180,
) -> subprocess.CompletedProcess[str]:
    """Run one example script from EXAMPLES_DIR and return the finished process.

    The interactive CLI mini-app is driven to completion by piping `/exit`
    on stdin; every other example runs without input.
    """
    script_path = EXAMPLES_DIR / folder / script
    assert script_path.exists(), f"Missing example script: {script_path}"
    piped_input = "/exit\n" if folder == "11_cli_mini_app" else None
    return _run_command(
        [str(runtime_env.python), str(script_path)],
        cwd=ROOT,
        env=runtime_env.env,
        timeout_s=timeout_s,
        stdin=piped_input,
    )
def _notebook_cell_source(cell_index: int) -> str:
    """Return the concatenated source text of one walkthrough-notebook cell."""
    cells = json.loads(NOTEBOOK_PATH.read_text())["cells"]
    return "".join(cells[cell_index]["source"])
def test_real_initialize_and_model_list(runtime_env: PreparedRuntimeEnv) -> None:
    # Initialization smoke test against a real runtime: the snippet opens a
    # Codex session, lists models (including hidden ones), and echoes the
    # negotiated metadata back as JSON for the host test to assert on.
    data = _run_json_python(
        runtime_env,
        textwrap.dedent(
            """
            import json
            from codex_app_server import Codex
            with Codex() as codex:
                models = codex.models(include_hidden=True)
                server = codex.metadata.serverInfo
                print(json.dumps({
                    "user_agent": codex.metadata.userAgent,
                    "server_name": None if server is None else server.name,
                    "server_version": None if server is None else server.version,
                    "model_count": len(models.data),
                }))
            """
        ),
    )
    # userAgent must always be present and non-blank; serverInfo fields are
    # optional, but when present they must be non-blank strings.
    assert isinstance(data["user_agent"], str) and data["user_agent"].strip()
    if data["server_name"] is not None:
        assert isinstance(data["server_name"], str) and data["server_name"].strip()
    if data["server_version"] is not None:
        assert isinstance(data["server_version"], str) and data["server_version"].strip()
    assert isinstance(data["model_count"], int)
def test_real_thread_and_turn_start_smoke(runtime_env: PreparedRuntimeEnv) -> None:
    # End-to-end smoke: start a thread, run one turn to completion, then
    # re-read the thread to confirm the turn and its items were persisted.
    data = _run_json_python(
        runtime_env,
        textwrap.dedent(
            """
            import json
            from codex_app_server import Codex, TextInput
            with Codex() as codex:
                thread = codex.thread_start(
                    model="gpt-5.4",
                    config={"model_reasoning_effort": "high"},
                )
                result = thread.turn(TextInput("hello")).run()
                persisted = thread.read(include_turns=True)
                persisted_turn = next(
                    (turn for turn in persisted.thread.turns or [] if turn.id == result.id),
                    None,
                )
                print(json.dumps({
                    "thread_id": thread.id,
                    "turn_id": result.id,
                    "status": result.status.value,
                    "items_count": len(result.items or []),
                    "persisted_items_count": 0 if persisted_turn is None else len(persisted_turn.items or []),
                }))
            """
        ),
    )
    assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
    assert isinstance(data["turn_id"], str) and data["turn_id"].strip()
    # The turn must finish successfully; item counts are only type-checked
    # because their exact values depend on the model's output.
    assert data["status"] == "completed"
    assert isinstance(data["items_count"], int)
    assert isinstance(data["persisted_items_count"], int)
def test_real_async_thread_turn_usage_and_ids_smoke(
    runtime_env: PreparedRuntimeEnv,
) -> None:
    # Async-client mirror of the sync thread/turn smoke test: start a thread,
    # run a turn, then re-read the thread to verify persistence.
    data = _run_json_python(
        runtime_env,
        textwrap.dedent(
            """
            import asyncio
            import json
            from codex_app_server import AsyncCodex, TextInput
            async def main():
                async with AsyncCodex() as codex:
                    thread = await codex.thread_start(
                        model="gpt-5.4",
                        config={"model_reasoning_effort": "high"},
                    )
                    result = await (await thread.turn(TextInput("say ok"))).run()
                    persisted = await thread.read(include_turns=True)
                    persisted_turn = next(
                        (turn for turn in persisted.thread.turns or [] if turn.id == result.id),
                        None,
                    )
                    print(json.dumps({
                        "thread_id": thread.id,
                        "turn_id": result.id,
                        "status": result.status.value,
                        "items_count": len(result.items or []),
                        "persisted_items_count": 0 if persisted_turn is None else len(persisted_turn.items or []),
                    }))
            asyncio.run(main())
            """
        ),
    )
    assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
    assert isinstance(data["turn_id"], str) and data["turn_id"].strip()
    assert data["status"] == "completed"
    # Item counts are type-checked only; exact values depend on model output.
    assert isinstance(data["items_count"], int)
    assert isinstance(data["persisted_items_count"], int)
def test_notebook_bootstrap_resolves_sdk_and_runtime_from_unrelated_cwd(
    runtime_env: PreparedRuntimeEnv,
) -> None:
    """Notebook cell 1 must locate the SDK and runtime from any working directory."""
    bootstrap_source = _notebook_cell_source(1)
    # Run the bootstrap cell from a throwaway directory unrelated to the repo.
    with tempfile.TemporaryDirectory() as temp_cwd:
        result = _run_command(
            [str(runtime_env.python), "-c", bootstrap_source],
            cwd=Path(temp_cwd),
            env=runtime_env.env.copy(),
            timeout_s=180,
        )
    assert result.returncode == 0, (
        f"Notebook bootstrap failed from unrelated cwd.\n"
        f"STDOUT:\n{result.stdout}\n"
        f"STDERR:\n{result.stderr}"
    )
    assert "SDK source:" in result.stdout
    assert f"Runtime package: {runtime_env.runtime_version}" in result.stdout
def test_notebook_sync_cell_smoke(runtime_env: PreparedRuntimeEnv) -> None:
    """Notebook cells 1-3 run end-to-end against the real runtime."""
    combined_source = "\n\n".join(
        _notebook_cell_source(index) for index in (1, 2, 3)
    )
    result = _run_python(runtime_env, combined_source, timeout_s=240)
    assert result.returncode == 0, (
        f"Notebook sync smoke failed.\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
    )
    # The walkthrough prints these markers on success.
    assert "status:" in result.stdout
    assert "server:" in result.stdout
def test_real_streaming_smoke_turn_completed(runtime_env: PreparedRuntimeEnv) -> None:
    # Streaming smoke: iterate a turn's event stream and record which event
    # methods were observed, then assert on them from the host test.
    data = _run_json_python(
        runtime_env,
        textwrap.dedent(
            """
            import json
            from codex_app_server import Codex, TextInput
            with Codex() as codex:
                thread = codex.thread_start(
                    model="gpt-5.4",
                    config={"model_reasoning_effort": "high"},
                )
                turn = thread.turn(TextInput("Reply with one short sentence."))
                saw_delta = False
                saw_completed = False
                for event in turn.stream():
                    if event.method == "item/agentMessage/delta":
                        saw_delta = True
                    if event.method == "turn/completed":
                        saw_completed = True
                print(json.dumps({
                    "saw_delta": saw_delta,
                    "saw_completed": saw_completed,
                }))
            """
        ),
    )
    # A completed event is mandatory; deltas depend on how the model streams,
    # so only their boolean type is checked.
    assert data["saw_completed"] is True
    assert isinstance(data["saw_delta"], bool)
def test_real_turn_interrupt_smoke(runtime_env: PreparedRuntimeEnv) -> None:
    # Interrupt smoke: start a long-running turn, interrupt it immediately,
    # then verify the thread still accepts and finishes a follow-up turn.
    data = _run_json_python(
        runtime_env,
        textwrap.dedent(
            """
            import json
            from codex_app_server import Codex, TextInput
            with Codex() as codex:
                thread = codex.thread_start(
                    model="gpt-5.4",
                    config={"model_reasoning_effort": "high"},
                )
                turn = thread.turn(TextInput("Count from 1 to 200 with commas."))
                turn.interrupt()
                follow_up = thread.turn(TextInput("Say 'ok' only.")).run()
                print(json.dumps({"status": follow_up.status.value}))
            """
        ),
    )
    # The follow-up may legitimately fail depending on interrupt timing; the
    # point is that the thread remains usable and the turn reaches a terminal
    # status.
    assert data["status"] in {"completed", "failed"}
@pytest.mark.parametrize(("folder", "script"), EXAMPLE_CASES)
def test_real_examples_run_and_assert(
    runtime_env: PreparedRuntimeEnv,
    folder: str,
    script: str,
) -> None:
    # Runs every shipped example end-to-end against the real runtime, then
    # asserts on folder-specific output markers so a silently-broken example
    # cannot pass just by exiting 0.
    result = _run_example(runtime_env, folder, script)
    assert result.returncode == 0, (
        f"Example failed: {folder}/{script}\n"
        f"STDOUT:\n{result.stdout}\n"
        f"STDERR:\n{result.stderr}"
        f"{_runtime_compatibility_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
    )
    out = result.stdout
    # Each branch checks the markers that particular example prints on success.
    if folder == "01_quickstart_constructor":
        assert "Status:" in out and "Text:" in out
        # Server metadata must have resolved (not the "unknown" fallback).
        assert "Server: unknown" not in out
    elif folder == "02_turn_run":
        assert "thread_id:" in out and "turn_id:" in out and "status:" in out
        assert "persisted.items.count:" in out
    elif folder == "03_turn_stream_events":
        assert "stream.completed:" in out
        assert "assistant>" in out
    elif folder == "04_models_and_metadata":
        assert "models.count:" in out
        # Server metadata fields must not fall back to None.
        assert "server_name=None" not in out
        assert "server_version=None" not in out
    elif folder == "05_existing_thread":
        assert "Created thread:" in out
    elif folder == "06_thread_lifecycle_and_controls":
        assert "Lifecycle OK:" in out
    elif folder in {"07_image_and_text", "08_local_image_and_text"}:
        assert "completed" in out.lower() or "Status:" in out
    elif folder == "09_async_parity":
        assert "Thread:" in out and "Turn:" in out
    elif folder == "10_error_handling_and_retry":
        assert "Text:" in out
    elif folder == "11_cli_mini_app":
        assert "Thread:" in out
    elif folder == "12_turn_params_kitchen_sink":
        assert "Status:" in out and "Items:" in out
    elif folder == "13_model_select_and_turn_params":
        assert "selected.model:" in out and "agent.message.params:" in out and "items.params:" in out
    elif folder == "14_turn_controls":
        assert "steer.result:" in out and "steer.final.status:" in out
        assert "interrupt.result:" in out and "interrupt.final.status:" in out