Compare commits

...

4 Commits

Author SHA1 Message Date
Ahmed Ibrahim
7a7fdbf22f Accept string input for Python turns 2026-05-17 07:03:40 -07:00
Ahmed Ibrahim
b2becbfa87 Use TurnStatus in retry examples 2026-05-17 06:15:37 -07:00
Ahmed Ibrahim
ac86b0a0e1 Format Python SDK examples 2026-05-17 06:05:09 -07:00
Ahmed Ibrahim
fab7cba2c9 Return TurnResult from Python turn handles 2026-05-17 06:01:12 -07:00
44 changed files with 591 additions and 800 deletions

View File

@@ -34,8 +34,9 @@ with Codex() as codex:
print(len(result.items))
```
`result.final_response` is `None` when the turn completes without a final-answer
or phase-less assistant message item.
`thread.run(...)` and `thread.turn(...).run()` return `TurnResult`. Its
`final_response` is `None` when the turn completes without a final-answer or
phase-less assistant message item.
## Login
@@ -101,6 +102,8 @@ target wheel. The SDK package version and runtime package version must match.
- `Codex()` is eager and performs startup + `initialize` in the constructor.
- Use context managers (`with Codex() as codex:`) to ensure shutdown.
- Plain strings are accepted anywhere a turn input is accepted; they are
shorthand for `TextInput(...)`.
- Prefer `thread.run("...")` for the common case. Use `thread.turn(...)` when
you need streaming, steering, or interrupt control.
- For transient overload, use `retry_on_overload` from the package root.

View File

@@ -3,7 +3,7 @@
Public surface of `openai_codex` for app-server v2.
This SDK surface is experimental. Turn streams are routed by turn ID so one client can consume multiple active turns concurrently.
Thread and turn starts expose `approval_mode`. `ApprovalMode.auto_review` is the default; use `ApprovalMode.deny_all` to deny escalated permissions.
Thread starts default to `ApprovalMode.auto_review`; turn starts accept an optional `approval_mode` override.
## Package Entry
@@ -16,13 +16,14 @@ from openai_codex import (
DeviceCodeLoginHandle,
AsyncChatgptLoginHandle,
AsyncDeviceCodeLoginHandle,
RunResult,
Thread,
AsyncThread,
TurnHandle,
AsyncTurnHandle,
TurnResult,
Input,
InputItem,
RunInput,
TextInput,
ImageInput,
LocalImageInput,
@@ -38,6 +39,7 @@ from openai_codex.types import (
InitializeResponse,
ThreadItem,
ThreadTokenUsage,
TurnError,
TurnStatus,
)
```
@@ -146,16 +148,16 @@ attempt. API-key login completes synchronously and does not return a handle.
### Thread
- `run(input: str | Input, *, approval_mode=ApprovalMode.auto_review, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> RunResult`
- `turn(input: Input, *, approval_mode=ApprovalMode.auto_review, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> TurnHandle`
- `run(input: str | Input, *, approval_mode=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> TurnResult`
- `turn(input: str | Input, *, approval_mode=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> TurnHandle`
- `read(*, include_turns: bool = False) -> ThreadReadResponse`
- `set_name(name: str) -> ThreadSetNameResponse`
- `compact() -> ThreadCompactStartResponse`
### AsyncThread
- `run(input: str | Input, *, approval_mode=ApprovalMode.auto_review, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> Awaitable[RunResult]`
- `turn(input: Input, *, approval_mode=ApprovalMode.auto_review, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, summary=None) -> Awaitable[AsyncTurnHandle]`
- `run(input: str | Input, *, approval_mode=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> Awaitable[TurnResult]`
- `turn(input: str | Input, *, approval_mode=None, cwd=None, effort=None, model=None, output_schema=None, personality=None, sandbox_policy=None, service_tier=None, summary=None) -> Awaitable[AsyncTurnHandle]`
- `read(*, include_turns: bool = False) -> Awaitable[ThreadReadResponse]`
- `set_name(name: str) -> Awaitable[ThreadSetNameResponse]`
- `compact() -> Awaitable[ThreadCompactStartResponse]`
@@ -164,6 +166,12 @@ attempt. API-key login completes synchronously and does not return a handle.
the turn, consumes notifications until completion, and returns a small result
object with:
- `id: str`
- `status: TurnStatus`
- `error: TurnError | None`
- `started_at: int | None`
- `completed_at: int | None`
- `duration_ms: int | None`
- `final_response: str | None`
- `items: list[ThreadItem]`
- `usage: ThreadTokenUsage | None`
@@ -172,16 +180,16 @@ object with:
phase-less assistant message item.
Use `turn(...)` when you need low-level turn control (`stream()`, `steer()`,
`interrupt()`) or the public `Turn` model from `TurnHandle.run()`.
`interrupt()`) before collecting the turn result.
## TurnHandle / AsyncTurnHandle
### TurnHandle
- `steer(input: Input) -> TurnSteerResponse`
- `steer(input: str | Input) -> TurnSteerResponse`
- `interrupt() -> TurnInterruptResponse`
- `stream() -> Iterator[Notification]`
- `run() -> openai_codex.types.Turn`
- `run() -> TurnResult`
Behavior notes:
@@ -190,10 +198,10 @@ Behavior notes:
### AsyncTurnHandle
- `steer(input: Input) -> Awaitable[TurnSteerResponse]`
- `steer(input: str | Input) -> Awaitable[TurnSteerResponse]`
- `interrupt() -> Awaitable[TurnInterruptResponse]`
- `stream() -> AsyncIterator[Notification]`
- `run() -> Awaitable[openai_codex.types.Turn]`
- `run() -> Awaitable[TurnResult]`
Behavior notes:
@@ -211,8 +219,12 @@ Behavior notes:
InputItem = TextInput | ImageInput | LocalImageInput | SkillInput | MentionInput
Input = list[InputItem] | InputItem
RunInput = Input | str
```
Use a plain `str` as shorthand for `TextInput(...)` anywhere a turn input is accepted:
`thread.run("...")`, `thread.turn("...")`, and `turn.steer("...")`.
## Public Types
The SDK wrappers return and accept public app-server models wherever possible:

View File

@@ -8,7 +8,8 @@
## `run()` vs `stream()`
- `TurnHandle.run()` / `AsyncTurnHandle.run()` is the easiest path. It consumes events until completion and returns the public app-server `Turn` model from `openai_codex.types`.
- `Thread.run(...)` starts a turn and returns `TurnResult`.
- `TurnHandle.run()` / `AsyncTurnHandle.run()` consumes events for an existing turn handle and returns the same `TurnResult` shape.
- `TurnHandle.stream()` / `AsyncTurnHandle.stream()` yields raw notifications (`Notification`) so you can react event-by-event.
Choose `run()` for most apps. Choose `stream()` for progress UIs, custom timeout logic, or custom parsing.
@@ -66,7 +67,7 @@ Common causes:
- published runtime package (`openai-codex-cli-bin`) is not installed
- local `codex_bin` override points to a missing file
- incompatible/old app-server
- app-server version older than the SDK schema
## Why does a turn "hang"?
@@ -79,11 +80,11 @@ A turn is complete only when `turn/completed` arrives for that turn ID.
Use `retry_on_overload(...)` for transient overload failures (`ServerBusyError`).
Do not blindly retry all errors. For `InvalidParamsError` or `MethodNotFoundError`, fix inputs/version compatibility instead.
Do not blindly retry all errors. For `InvalidParamsError` or `MethodNotFoundError`, fix inputs or update the runtime/schema version instead.
## Common pitfalls
- Starting a new thread for every prompt when you wanted continuity.
- Forgetting to `close()` (or not using context managers).
- Assuming `run()` returns extra SDK-only fields instead of the public `Turn` model.
- Reading `Turn.items` from live start/completed payloads instead of using `TurnResult.items`.
- Mixing SDK input classes with raw dicts incorrectly.

View File

@@ -70,9 +70,10 @@ What happened:
- `Codex()` started and initialized `codex app-server`.
- `thread_start(...)` created a thread.
- `thread.run("...")` started a turn, consumed events until completion, and returned the final assistant response plus collected items and usage.
- `thread.run("...")` started a turn, consumed events until completion, and returned `TurnResult` with turn metadata, final assistant response, collected items, and usage.
- `result.final_response` is `None` when no final-answer or phase-less assistant message item completes for the turn.
- use `thread.turn(...)` when you need a `TurnHandle` for streaming, steering, interrupting, or turn IDs/status
- plain strings are accepted anywhere a turn input is accepted; typed inputs are still available for multimodal and structured cases
- use `thread.turn(...)` when you need a `TurnHandle` for streaming, steering, or interrupting before collecting `TurnResult`
- one client can consume multiple active turns concurrently; turn streams are routed by turn ID
## 4) Continue the same thread (multi-turn)

View File

@@ -5,18 +5,13 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
import asyncio
from openai_codex import AsyncCodex, TextInput
from openai_codex import AsyncCodex
async def main() -> None:
@@ -24,21 +19,16 @@ async def main() -> None:
thread = await codex.thread_start(
model="gpt-5.4", config={"model_reasoning_effort": "high"}
)
turn = await thread.turn(TextInput("Give 3 bullets about SIMD."))
turn = await thread.turn("Give 3 bullets about SIMD.")
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("thread_id:", thread.id)
print("turn_id:", result.id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", assistant_text_from_turn(persisted_turn))
print(
"persisted.items.count:",
0 if persisted_turn is None else len(persisted_turn.items or []),
)
print("text:", result.final_response)
print("items.count:", len(result.items))
if __name__ == "__main__":

View File

@@ -5,30 +5,20 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
result = thread.turn(TextInput("Give 3 bullets about SIMD.")).run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
result = thread.turn("Give 3 bullets about SIMD.").run()
print("thread_id:", thread.id)
print("turn_id:", result.id)
print("status:", result.status)
if result.error is not None:
print("error:", result.error)
print("text:", assistant_text_from_turn(persisted_turn))
print(
"persisted.items.count:",
0 if persisted_turn is None else len(persisted_turn.items or []),
)
print("text:", result.final_response)
print("items.count:", len(result.items))

View File

@@ -5,18 +5,13 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
import asyncio
from openai_codex import AsyncCodex, TextInput
from openai_codex import AsyncCodex
async def main() -> None:
@@ -24,12 +19,13 @@ async def main() -> None:
thread = await codex.thread_start(
model="gpt-5.4", config={"model_reasoning_effort": "high"}
)
turn = await thread.turn(TextInput("Explain SIMD in 3 short bullets."))
turn = await thread.turn("Explain SIMD in 3 short bullets.")
event_count = 0
saw_started = False
saw_delta = False
completed_status = "unknown"
completed_status = None
completed_texts = []
async for event in turn.stream():
event_count += 1
@@ -38,24 +34,27 @@ async def main() -> None:
print("stream.started")
continue
if event.method == "item/agentMessage/delta":
delta = getattr(event.payload, "delta", "")
delta = event.payload.delta
if delta:
if not saw_delta:
print("assistant> ", end="", flush=True)
print(delta, end="", flush=True)
saw_delta = True
continue
if event.method == "item/completed":
root = event.payload.item.root
if root.type == "agentMessage":
completed_texts.append(root.text)
continue
if event.method == "turn/completed":
completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
completed_status = event.payload.turn.status.value
if completed_status is None:
raise RuntimeError("stream ended without turn/completed")
if saw_delta:
print()
else:
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, turn.id)
final_text = assistant_text_from_turn(persisted_turn).strip() or "[no assistant text]"
final_text = "".join(completed_texts).strip()
print("assistant>", final_text)
print("stream.started.seen:", saw_started)

View File

@@ -5,25 +5,21 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Explain SIMD in 3 short bullets."))
turn = thread.turn("Explain SIMD in 3 short bullets.")
event_count = 0
saw_started = False
saw_delta = False
completed_status = "unknown"
completed_status = None
completed_texts = []
for event in turn.stream():
event_count += 1
@@ -32,24 +28,27 @@ with Codex(config=runtime_config()) as codex:
print("stream.started")
continue
if event.method == "item/agentMessage/delta":
delta = getattr(event.payload, "delta", "")
delta = event.payload.delta
if delta:
if not saw_delta:
print("assistant> ", end="", flush=True)
print(delta, end="", flush=True)
saw_delta = True
continue
if event.method == "item/completed":
root = event.payload.item.root
if root.type == "agentMessage":
completed_texts.append(root.text)
continue
if event.method == "turn/completed":
completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
completed_status = event.payload.turn.status.value
if completed_status is None:
raise RuntimeError("stream ended without turn/completed")
if saw_delta:
print()
else:
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, turn.id)
final_text = assistant_text_from_turn(persisted_turn).strip() or "[no assistant text]"
final_text = "".join(completed_texts).strip()
print("assistant>", final_text)
print("stream.started.seen:", saw_started)

View File

@@ -19,7 +19,7 @@ async def main() -> None:
print("server:", server_label(codex.metadata))
models = await codex.models()
print("models.count:", len(models.data))
print("models:", ", ".join(model.id for model in models.data[:5]) or "[none]")
print("models:", ", ".join(model.id for model in models.data[:5]))
if __name__ == "__main__":

View File

@@ -15,4 +15,4 @@ with Codex(config=runtime_config()) as codex:
print("server:", server_label(codex.metadata))
models = codex.models()
print("models.count:", len(models.data))
print("models:", ", ".join(model.id for model in models.data[:5]) or "[none]")
print("models:", ", ".join(model.id for model in models.data[:5]))

View File

@@ -5,18 +5,13 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
import asyncio
from openai_codex import AsyncCodex, TextInput
from openai_codex import AsyncCodex
async def main() -> None:
@@ -25,16 +20,14 @@ async def main() -> None:
model="gpt-5.4", config={"model_reasoning_effort": "high"}
)
first_turn = await original.turn(TextInput("Tell me one fact about Saturn."))
first_turn = await original.turn("Tell me one fact about Saturn.")
_ = await first_turn.run()
print("Created thread:", original.id)
resumed = await codex.thread_resume(original.id)
second_turn = await resumed.turn(TextInput("Continue with one more fact."))
second_turn = await resumed.turn("Continue with one more fact.")
second = await second_turn.run()
persisted = await resumed.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
print(assistant_text_from_turn(persisted_turn))
print(second.final_response)
if __name__ == "__main__":

View File

@@ -5,26 +5,19 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex(config=runtime_config()) as codex:
# Create an initial thread and turn so we have a real thread to resume.
original = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
first = original.turn(TextInput("Tell me one fact about Saturn.")).run()
first = original.turn("Tell me one fact about Saturn.").run()
print("Created thread:", original.id)
# Resume the existing thread by ID.
resumed = codex.thread_resume(original.id)
second = resumed.turn(TextInput("Continue with one more fact.")).run()
persisted = resumed.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
print(assistant_text_from_turn(persisted_turn))
second = resumed.turn("Continue with one more fact.").run()
print(second.final_response)

View File

@@ -11,7 +11,7 @@ ensure_local_sdk_src()
import asyncio
from openai_codex import AsyncCodex, TextInput
from openai_codex import AsyncCodex
async def main() -> None:
@@ -19,10 +19,8 @@ async def main() -> None:
thread = await codex.thread_start(
model="gpt-5.4", config={"model_reasoning_effort": "high"}
)
first = await (
await thread.turn(TextInput("One sentence about structured planning."))
).run()
second = await (await thread.turn(TextInput("Now restate it for a junior engineer."))).run()
first = await (await thread.turn("One sentence about structured planning.")).run()
second = await (await thread.turn("Now restate it for a junior engineer.")).run()
reopened = await codex.thread_resume(thread.id)
listing_active = await codex.thread_list(limit=20, archived=False)
@@ -33,45 +31,29 @@ async def main() -> None:
listing_archived = await codex.thread_list(limit=20, archived=True)
unarchived = await codex.thread_unarchive(reopened.id)
resumed_info = "n/a"
try:
resumed = await codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = await (
await resumed.turn(TextInput("Continue in one short sentence."))
).run()
resumed_info = f"{resumed_result.id} {resumed_result.status}"
except Exception as exc:
resumed_info = f"skipped({type(exc).__name__})"
resumed = await codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = await (await resumed.turn("Continue in one short sentence.")).run()
forked_info = "n/a"
try:
forked = await codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = await (
await forked.turn(TextInput("Take a different angle in one short sentence."))
).run()
forked_info = f"{forked_result.id} {forked_result.status}"
except Exception as exc:
forked_info = f"skipped({type(exc).__name__})"
forked = await codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = await (
await forked.turn("Take a different angle in one short sentence.")
).run()
compact_info = "sent"
try:
_ = await unarchived.compact()
except Exception as exc:
compact_info = f"skipped({type(exc).__name__})"
compact_result = await unarchived.compact()
print("Lifecycle OK:", thread.id)
print("first:", first.id, first.status)
print("second:", second.id, second.status)
print("read.turns:", len(reading.thread.turns or []))
print("read.turns:", len(reading.thread.turns))
print("list.active:", len(listing_active.data))
print("list.archived:", len(listing_archived.data))
print("resumed:", resumed_info)
print("forked:", forked_info)
print("compact:", compact_info)
print("resumed:", resumed_result.id, resumed_result.status)
print("forked:", forked_result.id, forked_result.status)
print("compact:", compact_result.model_dump(mode="json", by_alias=True))
if __name__ == "__main__":

View File

@@ -9,12 +9,12 @@ from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
first = thread.turn(TextInput("One sentence about structured planning.")).run()
second = thread.turn(TextInput("Now restate it for a junior engineer.")).run()
first = thread.turn("One sentence about structured planning.").run()
second = thread.turn("Now restate it for a junior engineer.").run()
reopened = codex.thread_resume(thread.id)
listing_active = codex.thread_list(limit=20, archived=False)
@@ -25,40 +25,24 @@ with Codex(config=runtime_config()) as codex:
listing_archived = codex.thread_list(limit=20, archived=True)
unarchived = codex.thread_unarchive(reopened.id)
resumed_info = "n/a"
try:
resumed = codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = resumed.turn(TextInput("Continue in one short sentence.")).run()
resumed_info = f"{resumed_result.id} {resumed_result.status}"
except Exception as exc:
resumed_info = f"skipped({type(exc).__name__})"
resumed = codex.thread_resume(
unarchived.id,
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
resumed_result = resumed.turn("Continue in one short sentence.").run()
forked_info = "n/a"
try:
forked = codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = forked.turn(
TextInput("Take a different angle in one short sentence.")
).run()
forked_info = f"{forked_result.id} {forked_result.status}"
except Exception as exc:
forked_info = f"skipped({type(exc).__name__})"
forked = codex.thread_fork(unarchived.id, model="gpt-5.4")
forked_result = forked.turn("Take a different angle in one short sentence.").run()
compact_info = "sent"
try:
_ = unarchived.compact()
except Exception as exc:
compact_info = f"skipped({type(exc).__name__})"
compact_result = unarchived.compact()
print("Lifecycle OK:", thread.id)
print("first:", first.id, first.status)
print("second:", second.id, second.status)
print("read.turns:", len(reading.thread.turns or []))
print("read.turns:", len(reading.thread.turns))
print("list.active:", len(listing_active.data))
print("list.archived:", len(listing_archived.data))
print("resumed:", resumed_info)
print("forked:", forked_info)
print("compact:", compact_info)
print("resumed:", resumed_result.id, resumed_result.status)
print("forked:", forked_result.id, forked_result.status)
print("compact:", compact_result.model_dump(mode="json", by_alias=True))

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -33,11 +28,9 @@ async def main() -> None:
]
)
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)
if __name__ == "__main__":

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -26,8 +21,6 @@ with Codex(config=runtime_config()) as codex:
ImageInput(REMOTE_IMAGE_URL),
]
).run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)

View File

@@ -6,9 +6,7 @@ if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
temporary_sample_image_path,
)
@@ -36,11 +34,9 @@ async def main() -> None:
]
)
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)
if __name__ == "__main__":

View File

@@ -6,9 +6,7 @@ if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
temporary_sample_image_path,
)
@@ -29,8 +27,6 @@ with temporary_sample_image_path() as image_path:
LocalImageInput(str(image_path.resolve())),
]
).run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Status:", result.status)
print(assistant_text_from_turn(persisted_turn))
print(result.final_response)

View File

@@ -5,27 +5,19 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
server_label,
)
from _bootstrap import ensure_local_sdk_src, runtime_config, server_label
ensure_local_sdk_src()
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex(config=runtime_config()) as codex:
print("Server:", server_label(codex.metadata))
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
turn = thread.turn(TextInput("Say hello in one sentence."))
turn = thread.turn("Say hello in one sentence.")
result = turn.run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
print("Thread:", thread.id)
print("Turn:", result.id)
print("Text:", assistant_text_from_turn(persisted_turn).strip())
print("Text:", result.final_response.strip())

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -23,7 +18,6 @@ from openai_codex import (
AsyncCodex,
JsonRpcError,
ServerBusyError,
TextInput,
is_retryable_error,
)
from openai_codex.types import TurnStatus
@@ -73,24 +67,21 @@ async def main() -> None:
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
return
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
return
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
return
print("Text:", assistant_text_from_turn(persisted_turn))
print("Text:", result.final_response)
def _run_turn(thread, prompt: str):
async def _inner():
turn = await thread.turn(TextInput(prompt))
turn = await thread.turn(prompt)
return await turn.run()
return _inner

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -18,7 +13,6 @@ from openai_codex import (
Codex,
JsonRpcError,
ServerBusyError,
TextInput,
retry_on_overload,
)
from openai_codex.types import TurnStatus
@@ -28,20 +22,17 @@ with Codex(config=runtime_config()) as codex:
try:
result = retry_on_overload(
lambda: thread.turn(TextInput("Summarize retry best practices in 3 bullets.")).run(),
lambda: thread.turn("Summarize retry best practices in 3 bullets.").run(),
max_attempts=3,
initial_delay_s=0.25,
max_delay_s=2.0,
)
except ServerBusyError as exc:
print("Server overloaded after retries:", exc.message)
print("Text:")
except JsonRpcError as exc:
print(f"JSON-RPC error {exc.code}: {exc.message}")
print("Text:")
else:
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
if result.status == TurnStatus.failed:
print("Turn failed:", result.error)
print("Text:", assistant_text_from_turn(persisted_turn))
else:
print("Text:", result.final_response)

View File

@@ -13,7 +13,6 @@ import asyncio
from openai_codex import (
AsyncCodex,
TextInput,
)
from openai_codex.types import (
ThreadTokenUsageUpdatedNotification,
@@ -21,19 +20,9 @@ from openai_codex.types import (
)
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
def _format_usage(usage: object) -> str:
last = usage.last
total = usage.total
return (
"usage>\n"
f" last: input={last.input_tokens} output={last.output_tokens} reasoning={last.reasoning_output_tokens} total={last.total_tokens} cached={last.cached_input_tokens}\n"
@@ -61,20 +50,18 @@ async def main() -> None:
if user_input in {"/exit", "/quit"}:
break
turn = await thread.turn(TextInput(user_input))
turn = await thread.turn(user_input)
usage = None
status = None
error = None
printed_delta = False
print("assistant> ", end="", flush=True)
async for event in turn.stream():
payload = event.payload
if event.method == "item/agentMessage/delta":
delta = getattr(payload, "delta", "")
delta = payload.delta
if delta:
print(delta, end="", flush=True)
printed_delta = True
continue
if isinstance(payload, ThreadTokenUsageUpdatedNotification):
usage = payload.token_usage
@@ -83,12 +70,13 @@ async def main() -> None:
status = payload.turn.status
error = payload.turn.error
if printed_delta:
print()
else:
print("[no text]")
print()
if status is None:
raise RuntimeError("stream ended without turn/completed")
if usage is None:
raise RuntimeError("stream ended without token usage")
status_text = _status_value(status)
status_text = status.value
print(f"assistant.status> {status_text}")
if status_text == "failed":
print("assistant.error>", error)

View File

@@ -11,7 +11,6 @@ ensure_local_sdk_src()
from openai_codex import (
Codex,
TextInput,
)
from openai_codex.types import (
ThreadTokenUsageUpdatedNotification,
@@ -21,19 +20,9 @@ from openai_codex.types import (
print("Codex mini CLI. Type /exit to quit.")
def _status_value(status: object | None) -> str:
return str(getattr(status, "value", status))
def _format_usage(usage: object | None) -> str:
if usage is None:
return "usage> (none)"
last = getattr(usage, "last", None)
total = getattr(usage, "total", None)
if last is None or total is None:
return f"usage> {usage}"
def _format_usage(usage: object) -> str:
last = usage.last
total = usage.total
return (
"usage>\n"
f" last: input={last.input_tokens} output={last.output_tokens} reasoning={last.reasoning_output_tokens} total={last.total_tokens} cached={last.cached_input_tokens}\n"
@@ -56,20 +45,18 @@ with Codex(config=runtime_config()) as codex:
if user_input in {"/exit", "/quit"}:
break
turn = thread.turn(TextInput(user_input))
turn = thread.turn(user_input)
usage = None
status = None
error = None
printed_delta = False
print("assistant> ", end="", flush=True)
for event in turn.stream():
payload = event.payload
if event.method == "item/agentMessage/delta":
delta = getattr(payload, "delta", "")
delta = payload.delta
if delta:
print(delta, end="", flush=True)
printed_delta = True
continue
if isinstance(payload, ThreadTokenUsageUpdatedNotification):
usage = payload.token_usage
@@ -78,12 +65,13 @@ with Codex(config=runtime_config()) as codex:
status = payload.turn.status
error = payload.turn.error
if printed_delta:
print()
else:
print("[no text]")
print()
if status is None:
raise RuntimeError("stream ended without turn/completed")
if usage is None:
raise RuntimeError("stream ended without token usage")
status_text = _status_value(status)
status_text = status.value
print(f"assistant.status> {status_text}")
if status_text == "failed":
print("assistant.error>", error)

View File

@@ -6,12 +6,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -19,7 +14,6 @@ import asyncio
from openai_codex import (
AsyncCodex,
TextInput,
)
from openai_codex.types import (
Personality,
@@ -54,15 +48,13 @@ async def main() -> None:
)
turn = await thread.turn(
TextInput(PROMPT),
PROMPT,
output_schema=OUTPUT_SCHEMA,
personality=Personality.pragmatic,
summary=SUMMARY,
)
result = await turn.run()
persisted = await thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
structured_text = assistant_text_from_turn(persisted_turn).strip()
structured_text = result.final_response.strip()
try:
structured = json.loads(structured_text)
except json.JSONDecodeError as exc:
@@ -70,8 +62,8 @@ async def main() -> None:
f"Expected JSON matching OUTPUT_SCHEMA, got: {structured_text!r}"
) from exc
summary = structured.get("summary")
actions = structured.get("actions")
summary = structured["summary"]
actions = structured["actions"]
if (
not isinstance(summary, str)
or not isinstance(actions, list)
@@ -86,7 +78,7 @@ async def main() -> None:
print("actions:")
for action in actions:
print("-", action)
print("Items:", 0 if persisted_turn is None else len(persisted_turn.items or []))
print("Items:", len(result.items))
if __name__ == "__main__":

View File

@@ -6,18 +6,12 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import (
Codex,
TextInput,
)
from openai_codex.types import (
Personality,
@@ -48,15 +42,13 @@ with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
turn = thread.turn(
TextInput(PROMPT),
PROMPT,
output_schema=OUTPUT_SCHEMA,
personality=Personality.pragmatic,
summary=SUMMARY,
)
result = turn.run()
persisted = thread.read(include_turns=True)
persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)
structured_text = assistant_text_from_turn(persisted_turn).strip()
structured_text = result.final_response.strip()
try:
structured = json.loads(structured_text)
except json.JSONDecodeError as exc:
@@ -64,8 +56,8 @@ with Codex(config=runtime_config()) as codex:
f"Expected JSON matching OUTPUT_SCHEMA, got: {structured_text!r}"
) from exc
summary = structured.get("summary")
actions = structured.get("actions")
summary = structured["summary"]
actions = structured["actions"]
if (
not isinstance(summary, str)
or not isinstance(actions, list)
@@ -80,4 +72,4 @@ with Codex(config=runtime_config()) as codex:
print("actions:")
for action in actions:
print("-", action)
print("Items:", 0 if persisted_turn is None else len(persisted_turn.items or []))
print("Items:", len(result.items))

View File

@@ -5,12 +5,7 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
@@ -18,7 +13,6 @@ import asyncio
from openai_codex import (
AsyncCodex,
TextInput,
)
from openai_codex.types import (
Personality,
@@ -35,29 +29,27 @@ REASONING_RANK = {
"high": 4,
"xhigh": 5,
}
PREFERRED_MODEL = "gpt-5.4"
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
preferred = next(
(m for m in visible if m.model == PREFERRED_MODEL or m.id == PREFERRED_MODEL), None
)
if preferred is not None:
return preferred
visible = [m for m in models if not m.hidden]
if not visible:
raise RuntimeError("models response did not include visible models")
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
if not top_candidates:
raise RuntimeError("models response did not include top-level visible models")
return max(top_candidates, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
if not model.supported_reasoning_efforts:
return ReasoningEffort.medium
raise RuntimeError(f"{model.model} did not advertise supported reasoning efforts")
best = max(
model.supported_reasoning_efforts,
key=lambda option: REASONING_RANK.get(option.reasoning_effort.value, -1),
key=lambda option: REASONING_RANK[option.reasoning_effort.value],
)
return ReasoningEffort(best.reasoning_effort.value)
@@ -98,21 +90,17 @@ async def main() -> None:
)
first_turn = await thread.turn(
TextInput("Give one short sentence about reliable production releases."),
"Give one short sentence about reliable production releases.",
model=selected_model.model,
effort=selected_effort,
)
first = await first_turn.run()
persisted = await thread.read(include_turns=True)
first_persisted_turn = find_turn_by_id(persisted.thread.turns, first.id)
print("agent.message:", assistant_text_from_turn(first_persisted_turn))
print(
"items:", 0 if first_persisted_turn is None else len(first_persisted_turn.items or [])
)
print("agent.message:", first.final_response)
print("items:", len(first.items))
second_turn = await thread.turn(
TextInput("Return JSON for a safe feature-flag rollout plan."),
"Return JSON for a safe feature-flag rollout plan.",
cwd=str(Path.cwd()),
effort=selected_effort,
model=selected_model.model,
@@ -122,14 +110,9 @@ async def main() -> None:
summary=ReasoningSummary.model_validate("concise"),
)
second = await second_turn.run()
persisted = await thread.read(include_turns=True)
second_persisted_turn = find_turn_by_id(persisted.thread.turns, second.id)
print("agent.message.params:", assistant_text_from_turn(second_persisted_turn))
print(
"items.params:",
0 if second_persisted_turn is None else len(second_persisted_turn.items or []),
)
print("agent.message.params:", second.final_response)
print("items.params:", len(second.items))
if __name__ == "__main__":

View File

@@ -5,18 +5,12 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
find_turn_by_id,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import (
Codex,
TextInput,
)
from openai_codex.types import (
Personality,
@@ -33,29 +27,27 @@ REASONING_RANK = {
"high": 4,
"xhigh": 5,
}
PREFERRED_MODEL = "gpt-5.4"
def _pick_highest_model(models):
visible = [m for m in models if not m.hidden] or models
preferred = next(
(m for m in visible if m.model == PREFERRED_MODEL or m.id == PREFERRED_MODEL), None
)
if preferred is not None:
return preferred
visible = [m for m in models if not m.hidden]
if not visible:
raise RuntimeError("models response did not include visible models")
known_names = {m.id for m in visible} | {m.model for m in visible}
top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]
pool = top_candidates or visible
return max(pool, key=lambda m: (m.model, m.id))
if not top_candidates:
raise RuntimeError("models response did not include top-level visible models")
return max(top_candidates, key=lambda m: (m.model, m.id))
def _pick_highest_turn_effort(model) -> ReasoningEffort:
if not model.supported_reasoning_efforts:
return ReasoningEffort.medium
raise RuntimeError(f"{model.model} did not advertise supported reasoning efforts")
best = max(
model.supported_reasoning_efforts,
key=lambda option: REASONING_RANK.get(option.reasoning_effort.value, -1),
key=lambda option: REASONING_RANK[option.reasoning_effort.value],
)
return ReasoningEffort(best.reasoning_effort.value)
@@ -95,18 +87,16 @@ with Codex(config=runtime_config()) as codex:
)
first = thread.turn(
TextInput("Give one short sentence about reliable production releases."),
"Give one short sentence about reliable production releases.",
model=selected_model.model,
effort=selected_effort,
).run()
persisted = thread.read(include_turns=True)
first_turn = find_turn_by_id(persisted.thread.turns, first.id)
print("agent.message:", assistant_text_from_turn(first_turn))
print("items:", 0 if first_turn is None else len(first_turn.items or []))
print("agent.message:", first.final_response)
print("items:", len(first.items))
second = thread.turn(
TextInput("Return JSON for a safe feature-flag rollout plan."),
"Return JSON for a safe feature-flag rollout plan.",
cwd=str(Path.cwd()),
effort=selected_effort,
model=selected_model.model,
@@ -115,8 +105,6 @@ with Codex(config=runtime_config()) as codex:
sandbox_policy=SANDBOX_POLICY,
summary=ReasoningSummary.model_validate("concise"),
).run()
persisted = thread.read(include_turns=True)
second_turn = find_turn_by_id(persisted.thread.turns, second.id)
print("agent.message.params:", assistant_text_from_turn(second_turn))
print("items.params:", 0 if second_turn is None else len(second_turn.items or []))
print("agent.message.params:", second.final_response)
print("items.params:", len(second.items))

View File

@@ -5,17 +5,13 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
import asyncio
from openai_codex import AsyncCodex, TextInput
from openai_codex import AsyncCodex
async def main() -> None:
@@ -23,59 +19,49 @@ async def main() -> None:
thread = await codex.thread_start(
model="gpt-5.4", config={"model_reasoning_effort": "high"}
)
steer_turn = await thread.turn(
TextInput("Count from 1 to 40 with commas, then one summary sentence.")
)
steer_result = "sent"
try:
_ = await steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
except Exception as exc:
steer_result = f"skipped {type(exc).__name__}"
steer_turn = await thread.turn("Count from 1 to 40 with commas, then one summary sentence.")
steer_result = await steer_turn.steer("Keep it brief and stop after 10 numbers.")
steer_event_count = 0
steer_completed_status = "unknown"
steer_completed_turn = None
steer_completed_status = None
steer_deltas = []
async for event in steer_turn.stream():
steer_event_count += 1
if event.method == "item/agentMessage/delta":
steer_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
steer_completed_turn = event.payload.turn
steer_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
steer_completed_status = event.payload.turn.status.value
steer_preview = (
assistant_text_from_turn(steer_completed_turn).strip() or "[no assistant text]"
)
if steer_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
steer_preview = "".join(steer_deltas).strip()
interrupt_turn = await thread.turn(
TextInput("Count from 1 to 200 with commas, then one summary sentence.")
"Count from 1 to 200 with commas, then one summary sentence."
)
interrupt_result = "sent"
try:
_ = await interrupt_turn.interrupt()
except Exception as exc:
interrupt_result = f"skipped {type(exc).__name__}"
interrupt_result = await interrupt_turn.interrupt()
interrupt_event_count = 0
interrupt_completed_status = "unknown"
interrupt_completed_turn = None
interrupt_completed_status = None
interrupt_deltas = []
async for event in interrupt_turn.stream():
interrupt_event_count += 1
if event.method == "item/agentMessage/delta":
interrupt_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
interrupt_completed_turn = event.payload.turn
interrupt_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
interrupt_completed_status = event.payload.turn.status.value
interrupt_preview = (
assistant_text_from_turn(interrupt_completed_turn).strip() or "[no assistant text]"
)
if interrupt_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
interrupt_preview = "".join(interrupt_deltas).strip()
print("steer.result:", steer_result)
print("steer.result:", steer_result.model_dump(mode="json", by_alias=True))
print("steer.final.status:", steer_completed_status)
print("steer.events.count:", steer_event_count)
print("steer.assistant.preview:", steer_preview)
print("interrupt.result:", interrupt_result)
print("interrupt.result:", interrupt_result.model_dump(mode="json", by_alias=True))
print("interrupt.final.status:", interrupt_completed_status)
print("interrupt.events.count:", interrupt_event_count)
print("interrupt.assistant.preview:", interrupt_preview)

View File

@@ -5,69 +5,55 @@ _EXAMPLES_ROOT = Path(__file__).resolve().parents[1]
if str(_EXAMPLES_ROOT) not in sys.path:
sys.path.insert(0, str(_EXAMPLES_ROOT))
from _bootstrap import (
assistant_text_from_turn,
ensure_local_sdk_src,
runtime_config,
)
from _bootstrap import ensure_local_sdk_src, runtime_config
ensure_local_sdk_src()
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex(config=runtime_config()) as codex:
thread = codex.thread_start(model="gpt-5.4", config={"model_reasoning_effort": "high"})
steer_turn = thread.turn(
TextInput("Count from 1 to 40 with commas, then one summary sentence.")
)
steer_result = "sent"
try:
_ = steer_turn.steer(TextInput("Keep it brief and stop after 10 numbers."))
except Exception as exc:
steer_result = f"skipped {type(exc).__name__}"
steer_turn = thread.turn("Count from 1 to 40 with commas, then one summary sentence.")
steer_result = steer_turn.steer("Keep it brief and stop after 10 numbers.")
steer_event_count = 0
steer_completed_status = "unknown"
steer_completed_turn = None
steer_completed_status = None
steer_deltas = []
for event in steer_turn.stream():
steer_event_count += 1
if event.method == "item/agentMessage/delta":
steer_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
steer_completed_turn = event.payload.turn
steer_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
steer_completed_status = event.payload.turn.status.value
steer_preview = assistant_text_from_turn(steer_completed_turn).strip() or "[no assistant text]"
if steer_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
steer_preview = "".join(steer_deltas).strip()
interrupt_turn = thread.turn(
TextInput("Count from 1 to 200 with commas, then one summary sentence.")
)
interrupt_result = "sent"
try:
_ = interrupt_turn.interrupt()
except Exception as exc:
interrupt_result = f"skipped {type(exc).__name__}"
interrupt_turn = thread.turn("Count from 1 to 200 with commas, then one summary sentence.")
interrupt_result = interrupt_turn.interrupt()
interrupt_event_count = 0
interrupt_completed_status = "unknown"
interrupt_completed_turn = None
interrupt_completed_status = None
interrupt_deltas = []
for event in interrupt_turn.stream():
interrupt_event_count += 1
if event.method == "item/agentMessage/delta":
interrupt_deltas.append(event.payload.delta)
continue
if event.method == "turn/completed":
interrupt_completed_turn = event.payload.turn
interrupt_completed_status = getattr(
event.payload.turn.status, "value", str(event.payload.turn.status)
)
interrupt_completed_status = event.payload.turn.status.value
interrupt_preview = (
assistant_text_from_turn(interrupt_completed_turn).strip() or "[no assistant text]"
)
if interrupt_completed_status is None:
raise RuntimeError("stream ended without turn/completed")
interrupt_preview = "".join(interrupt_deltas).strip()
print("steer.result:", steer_result)
print("steer.result:", steer_result.model_dump(mode="json", by_alias=True))
print("steer.final.status:", steer_completed_status)
print("steer.events.count:", steer_event_count)
print("steer.assistant.preview:", steer_preview)
print("interrupt.result:", interrupt_result)
print("interrupt.result:", interrupt_result.model_dump(mode="json", by_alias=True))
print("interrupt.final.status:", interrupt_completed_status)
print("interrupt.events.count:", interrupt_event_count)
print("interrupt.assistant.preview:", interrupt_preview)

View File

@@ -8,6 +8,9 @@ Each example folder contains runnable versions:
All examples intentionally use only public SDK exports from `openai_codex`
and `openai_codex.types`.
Examples use plain strings for text-only turns and typed input objects for
multimodal or structured input lists.
## Prerequisites
- Python `>=3.10`
@@ -81,6 +84,6 @@ python examples/01_quickstart_constructor/async.py
- `13_model_select_and_turn_params/`
- list models, pick highest model + highest supported reasoning effort, run turns, print message and usage
- `14_turn_controls/`
- separate best-effort `steer()` and `interrupt()` demos with concise summaries
- separate `steer()` and `interrupt()` demos with concise summaries
- `15_login_and_account/`
- browser-login handle lifecycle, cancellation, and account inspection

View File

@@ -6,7 +6,7 @@ import sys
import tempfile
import zlib
from pathlib import Path
from typing import Iterable, Iterator
from typing import Any, Iterator
_SDK_PYTHON_DIR = Path(__file__).resolve().parents[1]
_SDK_PYTHON_STR = str(_SDK_PYTHON_DIR)
@@ -103,53 +103,5 @@ def temporary_sample_image_path() -> Iterator[Path]:
yield image_path
def server_label(metadata: object) -> str:
server = getattr(metadata, "serverInfo", None)
server_name = ((getattr(server, "name", None) or "") if server is not None else "").strip()
server_version = (
(getattr(server, "version", None) or "") if server is not None else ""
).strip()
if server_name and server_version:
return f"{server_name} {server_version}"
user_agent = (
(getattr(metadata, "userAgent", None) or "") if metadata is not None else ""
).strip()
return user_agent or "unknown"
def find_turn_by_id(turns: Iterable[object] | None, turn_id: str) -> object | None:
for turn in turns or []:
if getattr(turn, "id", None) == turn_id:
return turn
return None
def assistant_text_from_turn(turn: object | None) -> str:
if turn is None:
return ""
chunks: list[str] = []
for item in getattr(turn, "items", []) or []:
raw_item = item.model_dump(mode="json") if hasattr(item, "model_dump") else item
if not isinstance(raw_item, dict):
continue
item_type = raw_item.get("type")
if item_type == "agentMessage":
text = raw_item.get("text")
if isinstance(text, str) and text:
chunks.append(text)
continue
if item_type != "message" or raw_item.get("role") != "assistant":
continue
for content in raw_item.get("content") or []:
if not isinstance(content, dict) or content.get("type") != "output_text":
continue
text = content.get("text")
if isinstance(text, str) and text:
chunks.append(text)
return "".join(chunks)
def server_label(metadata: Any) -> str:
return f"{metadata.serverInfo.name} {metadata.serverInfo.version}"

View File

@@ -11,9 +11,20 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"id": "1b6614a5",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Kernel: /Users/aibrahim/code/codex/.venv/bin/python\n",
"SDK source: /Users/aibrahim/code/codex/sdk/python/src\n",
"Runtime package: 0.131.0a4\n"
]
}
],
"source": [
"# Cell 1: bootstrap local SDK imports + pinned runtime package\n",
"import os\n",
@@ -25,23 +36,10 @@
" f'Notebook requires Python 3.10+; current interpreter is {sys.version.split()[0]}.'\n",
" )\n",
"\n",
"try:\n",
" _ = os.getcwd()\n",
"except FileNotFoundError:\n",
" os.chdir(str(Path.home()))\n",
"\n",
"\n",
"def _is_sdk_python_dir(path: Path) -> bool:\n",
" return (path / 'pyproject.toml').exists() and (path / 'src' / 'openai_codex').exists()\n",
"\n",
"\n",
"def _iter_home_fallback_candidates(home: Path):\n",
" # bounded depth scan under home to support launching notebooks from unrelated cwd values\n",
" patterns = ('sdk/python', '*/sdk/python', '*/*/sdk/python', '*/*/*/sdk/python')\n",
" for pattern in patterns:\n",
" yield from home.glob(pattern)\n",
"\n",
"\n",
"def _find_sdk_python_dir(start: Path) -> Path | None:\n",
" checked = set()\n",
"\n",
@@ -70,21 +68,6 @@
" if found is not None:\n",
" return found\n",
"\n",
" for entry in sys.path:\n",
" if not entry:\n",
" continue\n",
" entry_path = Path(entry).expanduser()\n",
" for candidate in (entry_path, entry_path / 'sdk' / 'python'):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" home = Path.home()\n",
" for candidate in _iter_home_fallback_candidates(home):\n",
" found = _consider(candidate)\n",
" if found is not None:\n",
" return found\n",
"\n",
" return None\n",
"\n",
"\n",
@@ -124,12 +107,13 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"id": "137a6d64",
"metadata": {},
"outputs": [],
"source": [
"# Cell 2: imports (public only)\n",
"from _bootstrap import assistant_text_from_turn, find_turn_by_id, server_label\n",
"from _bootstrap import server_label\n",
"from openai_codex import (\n",
" AsyncCodex,\n",
" Codex,\n",
@@ -143,74 +127,89 @@
{
"cell_type": "code",
"execution_count": null,
"id": "5fae892d",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Please complete login at: https://auth.openai.com/oauth/authorize?response_type=code&client_id=app_EMoamEEZ73f0CkXaXp7hrann&redirect_uri=http%3A%2F%2Flocalhost%3A1455%2Fauth%2Fcallback&scope=openid%20profile%20email%20offline_access%20api.connectors.read%20api.connectors.invoke&code_challenge=Yl9Dc0ExoOBhzb_EhaEoHYEvUDVmqxbsTDG8S6Svj9E&code_challenge_method=S256&id_token_add_organizations=true&codex_cli_simplified_flow=true&state=BGkpiZDe6h7RymsrDCIgdr9f4cPPWNxVpQvo0owT9pg&originator=codex_python_sdk\n",
"login.id: 7f768161-e216-49a2-906c-587853917e75\n",
"login.auth_url: https://auth.openai.com/oauth/authorize?response_type=code&client_id=app_EMoamEEZ73f0CkXaXp7hrann&redirect_uri=http%3A%2F%2Flocalhost%3A1455%2Fauth%2Fcallback&scope=openid%20profile%20email%20offline_access%20api.connectors.read%20api.connectors.invoke&code_challenge=Yl9Dc0ExoOBhzb_EhaEoHYEvUDVmqxbsTDG8S6Svj9E&code_challenge_method=S256&id_token_add_organizations=true&codex_cli_simplified_flow=true&state=BGkpiZDe6h7RymsrDCIgdr9f4cPPWNxVpQvo0owT9pg&originator=codex_python_sdk\n",
"login.completed.success: True\n",
"account: account=Account(root=ChatgptAccount(email='aibrahim@openai.com', plan_type=<PlanType.business: 'business'>, type='chatgpt')) requires_openai_auth=True\n"
]
}
],
"source": [
"# Cell 2b: browser login handle lifecycle\n",
"with Codex() as codex:\n",
" # Open this URL and call `wait()` without canceling when completing login for real.\n",
" login = codex.login_chatgpt()\n",
" canceled = login.cancel()\n",
" print('Please complete login at:', login.auth_url)\n",
" completed = login.wait()\n",
" account = codex.account()\n",
"\n",
" print('login.id:', login.login_id)\n",
" print('login.auth_url:', login.auth_url)\n",
" print('login.cancel.status:', canceled.status)\n",
" print('login.completed.success:', completed.success)\n",
" print('account.requires_openai_auth:', account.requires_openai_auth)\n"
" print('account:', account.email)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 12,
"id": "ebdc04d9",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"- Gradient descent is a method for minimizing a function by repeatedly moving parameters a small step in the direction that most decreases the loss.\n",
"- It uses the gradient, which tells you the slope of the loss surface, so each update is typically `new = old - learning_rate * gradient`.\n",
"- Over many steps, it usually moves toward a local minimum, with speed and stability depending a lot on the learning rate.\n"
]
}
],
"source": [
"# Cell 3: simple sync conversation\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(TextInput('Explain gradient descent in 3 bullets.'))\n",
" result = turn.run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('server:', server_label(codex.metadata))\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
" result = thread.run('Explain gradient descent in 3 bullets.')\n",
" print(result.final_response)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bb4abb96",
"metadata": {},
"outputs": [],
"source": [
"# Cell 4: multi-turn continuity in same thread\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
"\n",
" first = thread.turn(TextInput('Give a short summary of transformers.')).run()\n",
" second = thread.turn(TextInput('Now explain that to a high-school student.')).run()\n",
" persisted = thread.read(include_turns=True)\n",
" second_turn = find_turn_by_id(persisted.thread.turns, second.id)\n",
"\n",
" first = thread.turn('Give a short summary of transformers.').run()\n",
" second = thread.turn('Now explain that to a high-school student.').run()\n",
" print('first status:', first.status)\n",
" print('second status:', second.status)\n",
" print('second text:', assistant_text_from_turn(second_turn))\n"
" print('second text:', second.final_response)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8b0c80fd",
"metadata": {},
"outputs": [],
"source": [
"# Cell 5: full thread lifecycle and branching (sync)\n",
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" first = thread.turn(TextInput('One sentence about structured planning.')).run()\n",
" second = thread.turn(TextInput('Now restate it for a junior engineer.')).run()\n",
" first = thread.turn('One sentence about structured planning.').run()\n",
" second = thread.turn('Now restate it for a junior engineer.').run()\n",
"\n",
" reopened = codex.thread_resume(thread.id)\n",
" listing_active = codex.thread_list(limit=20, archived=False)\n",
@@ -221,46 +220,33 @@
" listing_archived = codex.thread_list(limit=20, archived=True)\n",
" unarchived = codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5.4',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = resumed.turn(TextInput('Continue in one short sentence.')).run()\n",
" resumed_info = f'{resumed_result.id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
" resumed = codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5.4',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = resumed.turn('Continue in one short sentence.').run()\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = codex.thread_fork(unarchived.id, model='gpt-5.4')\n",
" forked_result = forked.turn(TextInput('Take a different angle in one short sentence.')).run()\n",
" forked_info = f'{forked_result.id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
" forked = codex.thread_fork(unarchived.id, model='gpt-5.4')\n",
" forked_result = forked.turn('Take a different angle in one short sentence.').run()\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
" compact_result = unarchived.compact()\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.id, first.status)\n",
" print('second:', second.id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('read.turns:', len(reading.thread.turns))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n"
" print('resumed:', resumed_result.id, resumed_result.status)\n",
" print('forked:', forked_result.id, forked_result.status)\n",
" print('compact:', compact_result.model_dump(mode='json', by_alias=True))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "310db8c0",
"metadata": {},
"outputs": [],
"source": [
@@ -289,7 +275,7 @@
"with Codex() as codex:\n",
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" turn = thread.turn(\n",
" TextInput('Propose a safe production feature-flag rollout. Return JSON matching the schema.'),\n",
" 'Propose a safe production feature-flag rollout. Return JSON matching the schema.',\n",
" cwd=str(Path.cwd()),\n",
" effort=ReasoningEffort.medium,\n",
" model='gpt-5.4',\n",
@@ -299,16 +285,14 @@
" summary=summary,\n",
" )\n",
" result = turn.run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
" print(result.final_response)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7a33c97d",
"metadata": {},
"outputs": [],
"source": [
@@ -332,17 +316,20 @@
"\n",
"\n",
"def pick_highest_model(models):\n",
" visible = [m for m in models if not m.hidden] or models\n",
" visible = [m for m in models if not m.hidden]\n",
" if not visible:\n",
" raise RuntimeError('models response did not include visible models')\n",
" known_names = {m.id for m in visible} | {m.model for m in visible}\n",
" top_candidates = [m for m in visible if not (m.upgrade and m.upgrade in known_names)]\n",
" pool = top_candidates or visible\n",
" return max(pool, key=lambda m: (m.model, m.id))\n",
" if not top_candidates:\n",
" raise RuntimeError('models response did not include top-level visible models')\n",
" return max(top_candidates, key=lambda m: (m.model, m.id))\n",
"\n",
"\n",
"def pick_highest_turn_effort(model) -> ReasoningEffort:\n",
" if not model.supported_reasoning_efforts:\n",
" return ReasoningEffort.medium\n",
" best = max(model.supported_reasoning_efforts, key=lambda opt: reasoning_rank.get(opt.reasoning_effort.value, -1))\n",
" raise RuntimeError(f'{model.model} did not advertise supported reasoning efforts')\n",
" best = max(model.supported_reasoning_efforts, key=lambda opt: reasoning_rank[opt.reasoning_effort.value])\n",
" return ReasoningEffort(best.reasoning_effort.value)\n",
"\n",
"\n",
@@ -368,17 +355,15 @@
" thread = codex.thread_start(model=selected_model.model, config={'model_reasoning_effort': selected_effort.value})\n",
"\n",
" first = thread.turn(\n",
" TextInput('Give one short sentence about reliable production releases.'),\n",
" 'Give one short sentence about reliable production releases.',\n",
" model=selected_model.model,\n",
" effort=selected_effort,\n",
" ).run()\n",
" persisted = thread.read(include_turns=True)\n",
" first_turn = find_turn_by_id(persisted.thread.turns, first.id)\n",
" print('agent.message:', assistant_text_from_turn(first_turn))\n",
" print('items:', 0 if first_turn is None else len(first_turn.items or []))\n",
" print('agent.message:', first.final_response)\n",
" print('items:', len(first.items))\n",
"\n",
" second = thread.turn(\n",
" TextInput('Return JSON for a safe feature-flag rollout plan.'),\n",
" 'Return JSON for a safe feature-flag rollout plan.',\n",
" cwd=str(Path.cwd()),\n",
" effort=selected_effort,\n",
" model=selected_model.model,\n",
@@ -387,15 +372,14 @@
" sandbox_policy=sandbox_policy,\n",
" summary=ReasoningSummary.model_validate('concise'),\n",
" ).run()\n",
" persisted = thread.read(include_turns=True)\n",
" second_turn = find_turn_by_id(persisted.thread.turns, second.id)\n",
" print('agent.message.params:', assistant_text_from_turn(second_turn))\n",
" print('items.params:', 0 if second_turn is None else len(second_turn.items or []))\n"
" print('agent.message.params:', second.final_response)\n",
" print('items.params:', len(second.items))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e9aef26a",
"metadata": {},
"outputs": [],
"source": [
@@ -408,16 +392,14 @@
" TextInput('What do you see in this image? 3 bullets.'),\n",
" ImageInput(remote_image_url),\n",
" ]).run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
" print(result.final_response)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a0cecc6c",
"metadata": {},
"outputs": [],
"source": [
@@ -429,16 +411,14 @@
" TextInput('Describe the colors and layout in this generated local image in 2 bullets.'),\n",
" LocalImageInput(str(local_image_path.resolve())),\n",
" ]).run()\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
" print(result.final_response)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "91afa2b8",
"metadata": {},
"outputs": [],
"source": [
@@ -447,21 +427,19 @@
" thread = codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
"\n",
" result = retry_on_overload(\n",
" lambda: thread.turn(TextInput('List 5 failure modes in distributed systems.')).run(),\n",
" lambda: thread.turn('List 5 failure modes in distributed systems.').run(),\n",
" max_attempts=3,\n",
" initial_delay_s=0.25,\n",
" max_delay_s=2.0,\n",
" )\n",
" persisted = thread.read(include_turns=True)\n",
" persisted_turn = find_turn_by_id(persisted.thread.turns, result.id)\n",
"\n",
" print('status:', result.status)\n",
" print(assistant_text_from_turn(persisted_turn))\n"
" print(result.final_response)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "103be934",
"metadata": {},
"outputs": [],
"source": [
@@ -472,8 +450,8 @@
"async def async_lifecycle_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" first = await (await thread.turn(TextInput('One sentence about structured planning.'))).run()\n",
" second = await (await thread.turn(TextInput('Now restate it for a junior engineer.'))).run()\n",
" first = await (await thread.turn('One sentence about structured planning.')).run()\n",
" second = await (await thread.turn('Now restate it for a junior engineer.')).run()\n",
"\n",
" reopened = await codex.thread_resume(thread.id)\n",
" listing_active = await codex.thread_list(limit=20, archived=False)\n",
@@ -484,41 +462,27 @@
" listing_archived = await codex.thread_list(limit=20, archived=True)\n",
" unarchived = await codex.thread_unarchive(reopened.id)\n",
"\n",
" resumed_info = 'n/a'\n",
" try:\n",
" resumed = await codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5.4',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = await (await resumed.turn(TextInput('Continue in one short sentence.'))).run()\n",
" resumed_info = f'{resumed_result.id} {resumed_result.status}'\n",
" except Exception as e:\n",
" resumed_info = f'skipped({type(e).__name__})'\n",
" resumed = await codex.thread_resume(\n",
" unarchived.id,\n",
" model='gpt-5.4',\n",
" config={'model_reasoning_effort': 'high'},\n",
" )\n",
" resumed_result = await (await resumed.turn('Continue in one short sentence.')).run()\n",
"\n",
" forked_info = 'n/a'\n",
" try:\n",
" forked = await codex.thread_fork(unarchived.id, model='gpt-5.4')\n",
" forked_result = await (await forked.turn(TextInput('Take a different angle in one short sentence.'))).run()\n",
" forked_info = f'{forked_result.id} {forked_result.status}'\n",
" except Exception as e:\n",
" forked_info = f'skipped({type(e).__name__})'\n",
" forked = await codex.thread_fork(unarchived.id, model='gpt-5.4')\n",
" forked_result = await (await forked.turn('Take a different angle in one short sentence.')).run()\n",
"\n",
" compact_info = 'sent'\n",
" try:\n",
" _ = await unarchived.compact()\n",
" except Exception as e:\n",
" compact_info = f'skipped({type(e).__name__})'\n",
" compact_result = await unarchived.compact()\n",
"\n",
" print('Lifecycle OK:', thread.id)\n",
" print('first:', first.id, first.status)\n",
" print('second:', second.id, second.status)\n",
" print('read.turns:', len(reading.thread.turns or []))\n",
" print('read.turns:', len(reading.thread.turns))\n",
" print('list.active:', len(listing_active.data))\n",
" print('list.archived:', len(listing_archived.data))\n",
" print('resumed:', resumed_info)\n",
" print('forked:', forked_info)\n",
" print('compact:', compact_info)\n",
" print('resumed:', resumed_result.id, resumed_result.status)\n",
" print('forked:', forked_result.id, forked_result.status)\n",
" print('compact:', compact_result.model_dump(mode='json', by_alias=True))\n",
"\n",
"\n",
"await async_lifecycle_demo()\n"
@@ -527,58 +491,59 @@
{
"cell_type": "code",
"execution_count": null,
"id": "365aa10c",
"metadata": {},
"outputs": [],
"source": [
"# Cell 10: async turn controls (best effort steer + interrupt)\n",
"# Cell 10: async turn controls (steer + interrupt)\n",
"import asyncio\n",
"\n",
"\n",
"async def async_stream_demo():\n",
" async with AsyncCodex() as codex:\n",
" thread = await codex.thread_start(model='gpt-5.4', config={'model_reasoning_effort': 'high'})\n",
" steer_turn = await thread.turn(TextInput('Count from 1 to 40 with commas, then one summary sentence.'))\n",
" steer_turn = await thread.turn('Count from 1 to 40 with commas, then one summary sentence.')\n",
"\n",
" steer_result = 'sent'\n",
" try:\n",
" _ = await steer_turn.steer(TextInput('Keep it brief and stop after 10 numbers.'))\n",
" except Exception as e:\n",
" steer_result = f'skipped {type(e).__name__}'\n",
" steer_result = await steer_turn.steer('Keep it brief and stop after 10 numbers.')\n",
"\n",
" steer_event_count = 0\n",
" steer_completed_status = 'unknown'\n",
" steer_completed_turn = None\n",
" steer_completed_status = None\n",
" steer_deltas = []\n",
" async for event in steer_turn.stream():\n",
" steer_event_count += 1\n",
" if event.method == 'item/agentMessage/delta':\n",
" steer_deltas.append(event.payload.delta)\n",
" continue\n",
" if event.method == 'turn/completed':\n",
" steer_completed_turn = event.payload.turn\n",
" steer_completed_status = getattr(event.payload.turn.status, 'value', str(event.payload.turn.status))\n",
" steer_completed_status = event.payload.turn.status.value\n",
"\n",
" steer_preview = assistant_text_from_turn(steer_completed_turn).strip() or '[no assistant text]'\n",
" if steer_completed_status is None:\n",
" raise RuntimeError('stream ended without turn/completed')\n",
" steer_preview = ''.join(steer_deltas).strip()\n",
"\n",
" interrupt_turn = await thread.turn(TextInput('Count from 1 to 200 with commas, then one summary sentence.'))\n",
" interrupt_result = 'sent'\n",
" try:\n",
" _ = await interrupt_turn.interrupt()\n",
" except Exception as e:\n",
" interrupt_result = f'skipped {type(e).__name__}'\n",
" interrupt_turn = await thread.turn('Count from 1 to 200 with commas, then one summary sentence.')\n",
" interrupt_result = await interrupt_turn.interrupt()\n",
"\n",
" interrupt_event_count = 0\n",
" interrupt_completed_status = 'unknown'\n",
" interrupt_completed_turn = None\n",
" interrupt_completed_status = None\n",
" interrupt_deltas = []\n",
" async for event in interrupt_turn.stream():\n",
" interrupt_event_count += 1\n",
" if event.method == 'item/agentMessage/delta':\n",
" interrupt_deltas.append(event.payload.delta)\n",
" continue\n",
" if event.method == 'turn/completed':\n",
" interrupt_completed_turn = event.payload.turn\n",
" interrupt_completed_status = getattr(event.payload.turn.status, 'value', str(event.payload.turn.status))\n",
" interrupt_completed_status = event.payload.turn.status.value\n",
"\n",
" interrupt_preview = assistant_text_from_turn(interrupt_completed_turn).strip() or '[no assistant text]'\n",
" if interrupt_completed_status is None:\n",
" raise RuntimeError('stream ended without turn/completed')\n",
" interrupt_preview = ''.join(interrupt_deltas).strip()\n",
"\n",
" print('steer.result:', steer_result)\n",
" print('steer.result:', steer_result.model_dump(mode='json', by_alias=True))\n",
" print('steer.final.status:', steer_completed_status)\n",
" print('steer.events.count:', steer_event_count)\n",
" print('steer.assistant.preview:', steer_preview)\n",
" print('interrupt.result:', interrupt_result)\n",
" print('interrupt.result:', interrupt_result.model_dump(mode='json', by_alias=True))\n",
" print('interrupt.final.status:', interrupt_completed_status)\n",
" print('interrupt.events.count:', interrupt_event_count)\n",
" print('interrupt.assistant.preview:', interrupt_preview)\n",
@@ -590,13 +555,21 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"version": "3.10+"
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.14.3"
}
},
"nbformat": 4,

View File

@@ -1040,12 +1040,12 @@ def _render_thread_block(
lines = [
" def turn(",
" self,",
" input: Input,",
" input: RunInput,",
" *,",
*_approval_mode_override_signature_lines(),
*_kw_signature_lines(turn_fields),
" ) -> TurnHandle:",
" wire_input = _to_wire_input(input)",
" wire_input = _to_wire_input(_normalize_run_input(input))",
_approval_mode_assignment_line("_approval_mode_override_settings"),
" params = TurnStartParams(",
" thread_id=self.id,",
@@ -1065,13 +1065,13 @@ def _render_async_thread_block(
lines = [
" async def turn(",
" self,",
" input: Input,",
" input: RunInput,",
" *,",
*_approval_mode_override_signature_lines(),
*_kw_signature_lines(turn_fields),
" ) -> AsyncTurnHandle:",
" await self._codex._ensure_initialized()",
" wire_input = _to_wire_input(input)",
" wire_input = _to_wire_input(_normalize_run_input(input))",
_approval_mode_assignment_line("_approval_mode_override_settings"),
" params = TurnStartParams(",
" thread_id=self.id,",

View File

@@ -14,11 +14,12 @@ from .api import (
InputItem,
LocalImageInput,
MentionInput,
RunResult,
RunInput,
SkillInput,
TextInput,
Thread,
TurnHandle,
TurnResult,
)
from .client import AppServerConfig
from .errors import (
@@ -51,9 +52,10 @@ __all__ = [
"AsyncThread",
"TurnHandle",
"AsyncTurnHandle",
"RunResult",
"TurnResult",
"Input",
"InputItem",
"RunInput",
"TextInput",
"ImageInput",
"LocalImageInput",

View File

@@ -12,13 +12,20 @@ from .generated.v2_all import (
ThreadTokenUsageUpdatedNotification,
Turn as AppServerTurn,
TurnCompletedNotification,
TurnError,
TurnStatus,
)
from .models import Notification
@dataclass(slots=True)
class RunResult:
class TurnResult:
id: str
status: TurnStatus
error: TurnError | None
started_at: int | None
completed_at: int | None
duration_ms: int | None
final_response: str | None
items: list[ThreadItem]
usage: ThreadTokenUsage | None
@@ -56,7 +63,7 @@ def _raise_for_failed_turn(turn: AppServerTurn) -> None:
raise RuntimeError(f"turn failed with status {turn.status.value}")
def _collect_run_result(stream: Iterator[Notification], *, turn_id: str) -> RunResult:
def _collect_turn_result(stream: Iterator[Notification], *, turn_id: str) -> TurnResult:
completed: TurnCompletedNotification | None = None
items: list[ThreadItem] = []
usage: ThreadTokenUsage | None = None
@@ -76,16 +83,23 @@ def _collect_run_result(stream: Iterator[Notification], *, turn_id: str) -> RunR
raise RuntimeError("turn completed event not received")
_raise_for_failed_turn(completed.turn)
return RunResult(
turn = completed.turn
return TurnResult(
id=turn.id,
status=turn.status,
error=turn.error,
started_at=turn.started_at,
completed_at=turn.completed_at,
duration_ms=turn.duration_ms,
final_response=_final_assistant_response_from_items(items),
items=items,
usage=usage,
)
async def _collect_async_run_result(
async def _collect_async_turn_result(
stream: AsyncIterator[Notification], *, turn_id: str
) -> RunResult:
) -> TurnResult:
completed: TurnCompletedNotification | None = None
items: list[ThreadItem] = []
usage: ThreadTokenUsage | None = None
@@ -105,7 +119,14 @@ async def _collect_async_run_result(
raise RuntimeError("turn completed event not received")
_raise_for_failed_turn(completed.turn)
return RunResult(
turn = completed.turn
return TurnResult(
id=turn.id,
status=turn.status,
error=turn.error,
started_at=turn.started_at,
completed_at=turn.completed_at,
duration_ms=turn.duration_ms,
final_response=_final_assistant_response_from_items(items),
items=items,
usage=usage,

View File

@@ -12,7 +12,7 @@ from ._approval_mode import (
from ._initialize_metadata import validate_initialize_metadata
from ._inputs import (
ImageInput as ImageInput,
Input,
Input as Input,
InputItem as InputItem,
LocalImageInput as LocalImageInput,
MentionInput as MentionInput,
@@ -33,9 +33,9 @@ from ._login import (
start_device_code_login,
)
from ._run import (
RunResult,
_collect_async_run_result,
_collect_run_result,
TurnResult,
_collect_async_turn_result,
_collect_turn_result,
)
from .async_client import AsyncAppServerClient
from .client import AppServerClient, AppServerConfig
@@ -65,7 +65,6 @@ from .generated.v2_all import (
ThreadSourceKind,
ThreadStartParams,
ThreadStartSource,
Turn as AppServerTurn,
TurnCompletedNotification,
TurnInterruptResponse,
TurnStartParams,
@@ -533,9 +532,9 @@ class Thread:
sandbox_policy: SandboxPolicy | None = None,
service_tier: str | None = None,
summary: ReasoningSummary | None = None,
) -> RunResult:
) -> TurnResult:
turn = self.turn(
_normalize_run_input(input),
input,
approval_mode=approval_mode,
cwd=cwd,
effort=effort,
@@ -548,14 +547,14 @@ class Thread:
)
stream = turn.stream()
try:
return _collect_run_result(stream, turn_id=turn.id)
return _collect_turn_result(stream, turn_id=turn.id)
finally:
stream.close()
# BEGIN GENERATED: Thread.flat_methods
def turn(
self,
input: Input,
input: RunInput,
*,
approval_mode: ApprovalMode | None = None,
cwd: str | None = None,
@@ -567,7 +566,7 @@ class Thread:
service_tier: str | None = None,
summary: ReasoningSummary | None = None,
) -> TurnHandle:
wire_input = _to_wire_input(input)
wire_input = _to_wire_input(_normalize_run_input(input))
approval_policy, approvals_reviewer = _approval_mode_override_settings(approval_mode)
params = TurnStartParams(
thread_id=self.id,
@@ -616,9 +615,9 @@ class AsyncThread:
sandbox_policy: SandboxPolicy | None = None,
service_tier: str | None = None,
summary: ReasoningSummary | None = None,
) -> RunResult:
) -> TurnResult:
turn = await self.turn(
_normalize_run_input(input),
input,
approval_mode=approval_mode,
cwd=cwd,
effort=effort,
@@ -631,14 +630,14 @@ class AsyncThread:
)
stream = turn.stream()
try:
return await _collect_async_run_result(stream, turn_id=turn.id)
return await _collect_async_turn_result(stream, turn_id=turn.id)
finally:
await stream.aclose()
# BEGIN GENERATED: AsyncThread.flat_methods
async def turn(
self,
input: Input,
input: RunInput,
*,
approval_mode: ApprovalMode | None = None,
cwd: str | None = None,
@@ -651,7 +650,7 @@ class AsyncThread:
summary: ReasoningSummary | None = None,
) -> AsyncTurnHandle:
await self._codex._ensure_initialized()
wire_input = _to_wire_input(input)
wire_input = _to_wire_input(_normalize_run_input(input))
approval_policy, approvals_reviewer = _approval_mode_override_settings(approval_mode)
params = TurnStartParams(
thread_id=self.id,
@@ -695,8 +694,12 @@ class TurnHandle:
thread_id: str
id: str
def steer(self, input: Input) -> TurnSteerResponse:
return self._client.turn_steer(self.thread_id, self.id, _to_wire_input(input))
def steer(self, input: RunInput) -> TurnSteerResponse:
return self._client.turn_steer(
self.thread_id,
self.id,
_to_wire_input(_normalize_run_input(input)),
)
def interrupt(self) -> TurnInterruptResponse:
return self._client.turn_interrupt(self.thread_id, self.id)
@@ -717,21 +720,13 @@ class TurnHandle:
finally:
self._client.unregister_turn_notifications(self.id)
def run(self) -> AppServerTurn:
completed: TurnCompletedNotification | None = None
def run(self) -> TurnResult:
stream = self.stream()
try:
for event in stream:
payload = event.payload
if isinstance(payload, TurnCompletedNotification) and payload.turn.id == self.id:
completed = payload
return _collect_turn_result(stream, turn_id=self.id)
finally:
stream.close()
if completed is None:
raise RuntimeError("turn completed event not received")
return completed.turn
@dataclass(slots=True)
class AsyncTurnHandle:
@@ -739,12 +734,12 @@ class AsyncTurnHandle:
thread_id: str
id: str
async def steer(self, input: Input) -> TurnSteerResponse:
async def steer(self, input: RunInput) -> TurnSteerResponse:
await self._codex._ensure_initialized()
return await self._codex._client.turn_steer(
self.thread_id,
self.id,
_to_wire_input(input),
_to_wire_input(_normalize_run_input(input)),
)
async def interrupt(self) -> TurnInterruptResponse:
@@ -768,17 +763,9 @@ class AsyncTurnHandle:
finally:
self._codex._client.unregister_turn_notifications(self.id)
async def run(self) -> AppServerTurn:
completed: TurnCompletedNotification | None = None
async def run(self) -> TurnResult:
stream = self.stream()
try:
async for event in stream:
payload = event.payload
if isinstance(payload, TurnCompletedNotification) and payload.turn.id == self.id:
completed = payload
return await _collect_async_turn_result(stream, turn_id=self.id)
finally:
await stream.aclose()
if completed is None:
raise RuntimeError("turn completed event not received")
return completed.turn

View File

@@ -33,6 +33,7 @@ from .generated.v2_all import (
ThreadTokenUsageUpdatedNotification,
Turn,
TurnCompletedNotification,
TurnError,
TurnInterruptResponse,
TurnStatus,
TurnSteerResponse,
@@ -73,6 +74,7 @@ __all__ = [
"ThreadTokenUsageUpdatedNotification",
"Turn",
"TurnCompletedNotification",
"TurnError",
"TurnInterruptResponse",
"TurnStatus",
"TurnSteerResponse",

View File

@@ -111,7 +111,7 @@ def agent_message_texts(events: list[Notification]) -> list[str]:
def agent_message_texts_from_items(items: Iterable[Any]) -> list[str]:
"""Extract agent-message text from completed run result items."""
"""Extract agent-message text from completed turn result items."""
texts: list[str] = []
for item in items:
root = item.root

View File

@@ -157,7 +157,7 @@ def test_async_lifecycle_methods_round_trip(tmp_path) -> None:
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
run_result = await thread.run("materialize async thread")
turn_result = await thread.run("materialize async thread")
await thread.set_name("async lifecycle")
named = await thread.read()
resumed = await codex.thread_resume(thread.id)
@@ -166,14 +166,14 @@ def test_async_lifecycle_methods_round_trip(tmp_path) -> None:
unarchived = await codex.thread_unarchive(thread.id)
assert {
"run_final_response": run_result.final_response,
"turn_final_response": turn_result.final_response,
"named_thread": named.thread.name,
"resumed_id": resumed.id,
"forked_is_distinct": forked.id != thread.id,
"archive_response": archive_response.model_dump(by_alias=True, mode="json"),
"unarchived_id": unarchived.id,
} == {
"run_final_response": "async materialized",
"turn_final_response": "async materialized",
"named_thread": "async lifecycle",
"resumed_id": thread.id,
"forked_is_distinct": True,
@@ -253,19 +253,19 @@ def test_compact_rpc_hits_mock_responses(tmp_path) -> None:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
run_result = thread.run("create history")
turn_result = thread.run("create history")
compact_response = thread.compact()
requests = harness.responses.wait_for_requests(2)
assert {
"run_final_response": run_result.final_response,
"turn_final_response": turn_result.final_response,
"compact_response": compact_response.model_dump(
by_alias=True,
mode="json",
),
"request_kinds": [request_kind(request.path) for request in requests],
} == {
"run_final_response": "history",
"turn_final_response": "history",
"compact_response": {},
"request_kinds": ["responses", "responses"],
}

View File

@@ -145,8 +145,8 @@ def test_async_thread_run_uses_mock_responses(
asyncio.run(scenario())
def test_sync_run_result_uses_last_unknown_phase_message(tmp_path) -> None:
"""RunResult should use the last unknown-phase agent message as final text."""
def test_sync_turn_result_uses_last_unknown_phase_message(tmp_path) -> None:
"""TurnResult should use the last unknown-phase agent message as final text."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
@@ -171,8 +171,8 @@ def test_sync_run_result_uses_last_unknown_phase_message(tmp_path) -> None:
}
def test_sync_run_result_preserves_empty_last_message(tmp_path) -> None:
"""RunResult should preserve an empty final agent message instead of skipping it."""
def test_sync_turn_result_preserves_empty_last_message(tmp_path) -> None:
"""TurnResult should preserve an empty final agent message instead of skipping it."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
@@ -197,8 +197,8 @@ def test_sync_run_result_preserves_empty_last_message(tmp_path) -> None:
}
def test_sync_run_result_does_not_promote_commentary_only_to_final(tmp_path) -> None:
"""RunResult final_response should stay unset when app-server marks only commentary."""
def test_sync_turn_result_does_not_promote_commentary_only_to_final(tmp_path) -> None:
"""TurnResult final_response should stay unset when app-server marks only commentary."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(
@@ -226,8 +226,8 @@ def test_sync_run_result_does_not_promote_commentary_only_to_final(tmp_path) ->
}
def test_async_run_result_uses_last_unknown_phase_message(tmp_path) -> None:
"""Async RunResult should use the last unknown-phase agent message."""
def test_async_turn_result_uses_last_unknown_phase_message(tmp_path) -> None:
"""Async TurnResult should use the last unknown-phase agent message."""
async def scenario() -> None:
"""Run one async result-mapping case against a pinned app-server."""
@@ -263,10 +263,10 @@ def test_async_run_result_uses_last_unknown_phase_message(tmp_path) -> None:
asyncio.run(scenario())
def test_async_run_result_does_not_promote_commentary_only_to_final(
def test_async_turn_result_does_not_promote_commentary_only_to_final(
tmp_path,
) -> None:
"""Async RunResult final_response should stay unset for commentary-only output."""
"""Async TurnResult final_response should stay unset for commentary-only output."""
async def scenario() -> None:
"""Run one async commentary mapping case against a pinned app-server."""
@@ -318,7 +318,7 @@ def test_thread_run_raises_when_real_app_server_reports_failed_turn(tmp_path) ->
def test_final_answer_phase_survives_real_app_server_mapping(tmp_path) -> None:
"""RunResult should use the final-answer item emitted by app-server."""
"""TurnResult should use the final-answer item emitted by app-server."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_sse(
sse(

View File

@@ -5,12 +5,13 @@ import asyncio
from app_server_harness import AppServerHarness
from app_server_helpers import (
agent_message_texts,
agent_message_texts_from_items,
next_async_delta,
next_sync_delta,
streaming_response,
)
from openai_codex import AsyncCodex, Codex, TextInput
from openai_codex import AsyncCodex, Codex
from openai_codex.generated.v2_all import (
AgentMessageDeltaNotification,
TurnCompletedNotification,
@@ -25,8 +26,9 @@ def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
stream = thread.turn(TextInput("stream please")).stream()
stream = thread.turn("stream please").stream()
events = list(stream)
request = harness.responses.single_request()
assert {
"deltas": [
@@ -35,6 +37,7 @@ def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
if isinstance(event.payload, AgentMessageDeltaNotification)
],
"agent_messages": agent_message_texts(events),
"request_user_texts": request.message_input_texts("user")[-1:],
"completed_statuses": [
event.payload.turn.status
for event in events
@@ -43,28 +46,31 @@ def test_sync_stream_routes_text_deltas_and_completion(tmp_path) -> None:
} == {
"deltas": ["he", "llo"],
"agent_messages": ["hello"],
"request_user_texts": ["stream please"],
"completed_statuses": [TurnStatus.completed],
}
def test_turn_run_returns_completed_turn(tmp_path) -> None:
"""TurnHandle.run should wait for the app-server completion notification."""
"""TurnHandle.run should collect output and completion metadata."""
with AppServerHarness(tmp_path) as harness:
harness.responses.enqueue_assistant_message("turn complete", response_id="turn-run-1")
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
turn = thread.turn(TextInput("complete this turn"))
turn = thread.turn("complete this turn")
completed = turn.run()
assert {
"turn_id": completed.id,
"status": completed.status,
"items": completed.items,
"agent_messages": agent_message_texts_from_items(completed.items),
"final_response": completed.final_response,
} == {
"turn_id": turn.id,
"status": TurnStatus.completed,
"items": [],
"agent_messages": ["turn complete"],
"final_response": "turn complete",
}
@@ -80,8 +86,9 @@ def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
async with AsyncCodex(config=harness.app_server_config()) as codex:
thread = await codex.thread_start()
turn = await thread.turn(TextInput("async stream please"))
turn = await thread.turn("async stream please")
events = [event async for event in turn.stream()]
request = harness.responses.single_request()
assert {
"deltas": [
@@ -90,6 +97,7 @@ def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
if isinstance(event.payload, AgentMessageDeltaNotification)
],
"agent_messages": agent_message_texts(events),
"request_user_texts": request.message_input_texts("user")[-1:],
"completed_statuses": [
event.payload.turn.status
for event in events
@@ -98,6 +106,7 @@ def test_async_stream_routes_text_deltas_and_completion(tmp_path) -> None:
} == {
"deltas": ["as", "ync"],
"agent_messages": ["async"],
"request_user_texts": ["async stream please"],
"completed_statuses": [TurnStatus.completed],
}
@@ -175,8 +184,8 @@ def test_interleaved_sync_turn_streams_route_by_turn_id(tmp_path) -> None:
with Codex(config=harness.app_server_config()) as codex:
first_thread = codex.thread_start()
second_thread = codex.thread_start()
first_turn = first_thread.turn(TextInput("first"))
second_turn = second_thread.turn(TextInput("second"))
first_turn = first_thread.turn("first")
second_turn = second_thread.turn("second")
first_stream = first_turn.stream()
second_stream = second_turn.stream()
@@ -228,8 +237,8 @@ def test_interleaved_async_turn_streams_route_by_turn_id(tmp_path) -> None:
async with AsyncCodex(config=harness.app_server_config()) as codex:
first_thread = await codex.thread_start()
second_thread = await codex.thread_start()
first_turn = await first_thread.turn(TextInput("async first"))
second_turn = await second_thread.turn(TextInput("async second"))
first_turn = await first_thread.turn("async first")
second_turn = await second_thread.turn("async second")
first_stream = first_turn.stream()
second_stream = second_turn.stream()

View File

@@ -3,7 +3,7 @@ from __future__ import annotations
from app_server_harness import AppServerHarness
from app_server_helpers import agent_message_texts, streaming_response
from openai_codex import Codex, TextInput
from openai_codex import Codex
from openai_codex.generated.v2_all import TurnStatus
@@ -21,9 +21,9 @@ def test_turn_steer_adds_follow_up_input(tmp_path) -> None:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
turn = thread.turn(TextInput("Start a steerable turn."))
turn = thread.turn("Start a steerable turn.")
harness.responses.wait_for_requests(1)
steer = turn.steer(TextInput("Use this steering input."))
steer = turn.steer("Use this steering input.")
events = list(turn.stream())
requests = harness.responses.wait_for_requests(2)
@@ -61,7 +61,7 @@ def test_turn_interrupt_stops_active_turn_and_follow_up_runs(tmp_path) -> None:
with Codex(config=harness.app_server_config()) as codex:
thread = codex.thread_start()
interrupted_turn = thread.turn(TextInput("Start a long turn."))
interrupted_turn = thread.turn("Start a long turn.")
harness.responses.wait_for_requests(1)
interrupt_response = interrupted_turn.interrupt()
completed = interrupted_turn.run()

View File

@@ -14,9 +14,11 @@ from openai_codex import (
AppServerConfig,
AsyncCodex,
AsyncThread,
AsyncTurnHandle,
Codex,
RunResult,
Thread,
TurnHandle,
TurnResult,
)
from openai_codex._initialize_metadata import validate_initialize_metadata
from openai_codex.types import InitializeResponse
@@ -35,9 +37,10 @@ EXPECTED_ROOT_EXPORTS = [
"AsyncThread",
"TurnHandle",
"AsyncTurnHandle",
"RunResult",
"TurnResult",
"Input",
"InputItem",
"RunInput",
"TextInput",
"ImageInput",
"LocalImageInput",
@@ -92,6 +95,7 @@ EXPECTED_TYPES_EXPORTS = [
"ThreadTokenUsageUpdatedNotification",
"Turn",
"TurnCompletedNotification",
"TurnError",
"TurnInterruptResponse",
"TurnStatus",
"TurnSteerResponse",
@@ -128,9 +132,55 @@ def test_root_exports_app_server_config() -> None:
assert AppServerConfig.__name__ == "AppServerConfig"
def test_root_exports_run_result() -> None:
"""The root package should expose the common-case run result wrapper."""
assert RunResult.__name__ == "RunResult"
def test_root_exports_turn_result() -> None:
"""The root package should expose the collected turn result wrapper."""
assert {
"name": TurnResult.__name__,
"fields": list(TurnResult.__dataclass_fields__),
} == {
"name": "TurnResult",
"fields": [
"id",
"status",
"error",
"started_at",
"completed_at",
"duration_ms",
"final_response",
"items",
"usage",
],
}
def test_turn_run_methods_return_turn_result() -> None:
"""Both convenience and handle-based run APIs return the same result shape."""
funcs = [
Thread.run,
TurnHandle.run,
AsyncThread.run,
AsyncTurnHandle.run,
]
assert {fn: inspect.signature(fn).return_annotation for fn in funcs} == dict.fromkeys(
funcs, "TurnResult"
)
def test_turn_input_methods_accept_string_shortcut() -> None:
"""Every public turn-input method should accept strings and typed inputs."""
funcs = [
Thread.run,
Thread.turn,
AsyncThread.run,
AsyncThread.turn,
TurnHandle.steer,
AsyncTurnHandle.steer,
]
assert {fn: inspect.signature(fn).parameters["input"].annotation for fn in funcs} == (
dict.fromkeys(funcs, "RunInput")
)
def test_root_exports_approval_mode() -> None:

View File

@@ -135,7 +135,7 @@ def _run_python(
)
def _runtime_compatibility_hint(
def _runtime_schema_hint(
runtime_env: PreparedRuntimeEnv,
*,
stdout: str,
@@ -144,7 +144,7 @@ def _runtime_compatibility_hint(
combined = f"{stdout}\n{stderr}"
if "ThreadStartResponse" in combined and "approvalsReviewer" in combined:
return (
"\nCompatibility hint:\n"
"\nSchema hint:\n"
f"Pinned runtime {runtime_env.runtime_version} returned a thread/start payload "
"that is older than the current SDK schema and is missing "
"`approvalsReviewer`. Bump `sdk/python/_runtime_setup.py` to a matching "
@@ -165,7 +165,7 @@ def _run_json_python(
"Python snippet failed.\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
f"{_runtime_compatibility_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
f"{_runtime_schema_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
)
return json.loads(result.stdout)
@@ -234,25 +234,20 @@ def test_real_thread_and_turn_start_smoke(runtime_env: PreparedRuntimeEnv) -> No
textwrap.dedent(
"""
import json
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
result = thread.turn(TextInput("hello")).run()
persisted = thread.read(include_turns=True)
persisted_turn = next(
(turn for turn in persisted.thread.turns or [] if turn.id == result.id),
None,
)
result = thread.turn("hello").run()
print(json.dumps({
"thread_id": thread.id,
"turn_id": result.id,
"status": result.status.value,
"items_count": len(result.items or []),
"persisted_items_count": 0 if persisted_turn is None else len(persisted_turn.items or []),
"items_count": len(result.items),
"final_response_is_text": isinstance(result.final_response, str) and bool(result.final_response.strip()),
}))
"""
),
@@ -261,8 +256,8 @@ def test_real_thread_and_turn_start_smoke(runtime_env: PreparedRuntimeEnv) -> No
assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
assert isinstance(data["turn_id"], str) and data["turn_id"].strip()
assert data["status"] == "completed"
assert isinstance(data["items_count"], int)
assert isinstance(data["persisted_items_count"], int)
assert data["items_count"] > 0
assert data["final_response_is_text"] is True
def test_real_thread_run_convenience_smoke(runtime_env: PreparedRuntimeEnv) -> None:
@@ -336,7 +331,7 @@ def test_real_async_thread_turn_usage_and_ids_smoke(
"""
import asyncio
import json
from openai_codex import AsyncCodex, TextInput
from openai_codex import AsyncCodex
async def main():
async with AsyncCodex() as codex:
@@ -344,18 +339,13 @@ def test_real_async_thread_turn_usage_and_ids_smoke(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
result = await (await thread.turn(TextInput("say ok"))).run()
persisted = await thread.read(include_turns=True)
persisted_turn = next(
(turn for turn in persisted.thread.turns or [] if turn.id == result.id),
None,
)
result = await (await thread.turn("say ok")).run()
print(json.dumps({
"thread_id": thread.id,
"turn_id": result.id,
"status": result.status.value,
"items_count": len(result.items or []),
"persisted_items_count": 0 if persisted_turn is None else len(persisted_turn.items or []),
"items_count": len(result.items),
"final_response_is_text": isinstance(result.final_response, str) and bool(result.final_response.strip()),
}))
asyncio.run(main())
@@ -366,8 +356,8 @@ def test_real_async_thread_turn_usage_and_ids_smoke(
assert isinstance(data["thread_id"], str) and data["thread_id"].strip()
assert isinstance(data["turn_id"], str) and data["turn_id"].strip()
assert data["status"] == "completed"
assert isinstance(data["items_count"], int)
assert isinstance(data["persisted_items_count"], int)
assert data["items_count"] > 0
assert data["final_response_is_text"] is True
def test_real_async_thread_run_convenience_smoke(
@@ -468,14 +458,14 @@ def test_real_streaming_smoke_turn_completed(runtime_env: PreparedRuntimeEnv) ->
textwrap.dedent(
"""
import json
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
turn = thread.turn(TextInput("Reply with one short sentence."))
turn = thread.turn("Reply with one short sentence.")
saw_delta = False
saw_completed = False
for event in turn.stream():
@@ -501,16 +491,16 @@ def test_real_turn_interrupt_smoke(runtime_env: PreparedRuntimeEnv) -> None:
textwrap.dedent(
"""
import json
from openai_codex import Codex, TextInput
from openai_codex import Codex
with Codex() as codex:
thread = codex.thread_start(
model="gpt-5.4",
config={"model_reasoning_effort": "high"},
)
turn = thread.turn(TextInput("Count from 1 to 200 with commas."))
turn = thread.turn("Count from 1 to 200 with commas.")
turn.interrupt()
follow_up = thread.turn(TextInput("Say 'ok' only.")).run()
follow_up = thread.turn("Say 'ok' only.").run()
print(json.dumps({"status": follow_up.status.value}))
"""
),
@@ -531,7 +521,7 @@ def test_real_examples_run_and_assert(
f"Example failed: {folder}/{script}\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}"
f"{_runtime_compatibility_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
f"{_runtime_schema_hint(runtime_env, stdout=result.stdout, stderr=result.stderr)}"
)
out = result.stdout
@@ -541,7 +531,7 @@ def test_real_examples_run_and_assert(
assert "Server: unknown" not in out
elif folder == "02_turn_run":
assert "thread_id:" in out and "turn_id:" in out and "status:" in out
assert "persisted.items.count:" in out
assert "items.count:" in out
elif folder == "03_turn_stream_events":
assert "stream.completed:" in out
assert "assistant>" in out