Compare commits

...

3 Commits

Author SHA1 Message Date
Ahmed Ibrahim
433dd643cb Move macOS ObjC link arg to Cargo config
Co-authored-by: Codex <noreply@openai.com>
2026-03-31 23:12:43 -07:00
Ahmed Ibrahim
61709ad6b3 Use WebRTC for both realtime protocol versions
Co-authored-by: Codex <noreply@openai.com>
2026-03-31 22:44:10 -07:00
Ahmed Ibrahim
3c6a3a0fad Add realtime WebRTC external audio bridge
Co-authored-by: Codex <noreply@openai.com>
2026-03-31 22:27:22 -07:00
14 changed files with 1326 additions and 73 deletions

View File

@@ -9,3 +9,6 @@ rustflags = ["-C", "link-arg=/STACK:8388608", "-C", "link-arg=/arm64hazardfree"]
[target.'cfg(all(windows, target_env = "gnu"))']
rustflags = ["-C", "link-arg=-Wl,--stack,8388608"]
[target.'cfg(target_os = "macos")']
rustflags = ["-C", "link-arg=-ObjC"]

515
codex-rs/Cargo.lock generated
View File

@@ -1051,6 +1051,16 @@ dependencies = [
"bytes",
]
[[package]]
name = "bzip2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8"
dependencies = [
"bzip2-sys",
"libc",
]
[[package]]
name = "bzip2"
version = "0.5.2"
@@ -1164,6 +1174,16 @@ dependencies = [
"nom 7.1.3",
]
[[package]]
name = "cfg-expr"
version = "0.20.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c6b04e07d8080154ed4ac03546d9a2b303cc2fe1901ba0b35b301516e289368"
dependencies = [
"smallvec",
"target-lexicon",
]
[[package]]
name = "cfg-if"
version = "1.0.4"
@@ -1297,7 +1317,7 @@ version = "4.5.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.114",
@@ -1333,6 +1353,17 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9b18233253483ce2f65329a24072ec414db782531bdbb7d0bbc4bd2ce6b7e21"
[[package]]
name = "codespan-reporting"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af491d569909a7e4dee0ad7db7f5341fef5c614d5b8ec8cf765732aba3cff681"
dependencies = [
"serde",
"termcolor",
"unicode-width 0.2.1",
]
[[package]]
name = "codex-analytics"
version = "0.0.0"
@@ -1367,6 +1398,7 @@ dependencies = [
"anyhow",
"assert_matches",
"async-trait",
"base64 0.22.1",
"bytes",
"codex-client",
"codex-protocol",
@@ -1374,6 +1406,7 @@ dependencies = [
"eventsource-stream",
"futures",
"http 1.4.0",
"libwebrtc",
"pretty_assertions",
"regex-lite",
"reqwest",
@@ -1425,7 +1458,7 @@ dependencies = [
"codex-utils-cli",
"codex-utils-json-to-toml",
"codex-utils-pty",
"constant_time_eq",
"constant_time_eq 0.3.1",
"core_test_support",
"futures",
"hmac",
@@ -1684,7 +1717,7 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"zstd",
"zstd 0.13.3",
]
[[package]]
@@ -1922,8 +1955,8 @@ dependencies = [
"wildmatch",
"windows-sys 0.52.0",
"wiremock",
"zip",
"zstd",
"zip 2.4.2",
"zstd 0.13.3",
]
[[package]]
@@ -1952,7 +1985,7 @@ dependencies = [
"tokio",
"toml 0.9.11+spec-1.1.0",
"tracing",
"zip",
"zip 2.4.2",
]
[[package]]
@@ -3070,6 +3103,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "constant_time_eq"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
[[package]]
name = "constant_time_eq"
version = "0.3.1"
@@ -3170,7 +3209,7 @@ dependencies = [
"tracing-subscriber",
"walkdir",
"wiremock",
"zstd",
"zstd 0.13.3",
]
[[package]]
@@ -3412,6 +3451,68 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "cxx"
version = "1.0.194"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "747d8437319e3a2f43d93b341c137927ca70c0f5dabeea7a005a73665e247c7e"
dependencies = [
"cc",
"cxx-build",
"cxxbridge-cmd",
"cxxbridge-flags",
"cxxbridge-macro",
"foldhash 0.2.0",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.194"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0f4697d190a142477b16aef7da8a99bfdc41e7e8b1687583c0d23a79c7afc1e"
dependencies = [
"cc",
"codespan-reporting",
"indexmap 2.13.0",
"proc-macro2",
"quote",
"scratch",
"syn 2.0.114",
]
[[package]]
name = "cxxbridge-cmd"
version = "1.0.194"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0956799fa8678d4c50eed028f2de1c0552ae183c76e976cf7ca8c4e36a7c328"
dependencies = [
"clap",
"codespan-reporting",
"indexmap 2.13.0",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.194"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23384a836ab4f0ad98ace7e3955ad2de39de42378ab487dc28d3990392cb283a"
[[package]]
name = "cxxbridge-macro"
version = "1.0.194"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6acc6b5822b9526adfb4fc377b67128fdd60aac757cc4a741a6278603f763cf"
dependencies = [
"indexmap 2.13.0",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "darling"
version = "0.20.11"
@@ -3964,7 +4065,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.114",
@@ -4355,6 +4456,16 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fs2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
@@ -4589,6 +4700,63 @@ version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7"
[[package]]
name = "gio-sys"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0071fe88dba8e40086c8ff9bbb62622999f49628344b1d1bf490a48a29d80f22"
dependencies = [
"glib-sys",
"gobject-sys",
"libc",
"system-deps",
"windows-sys 0.61.2",
]
[[package]]
name = "glib"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16de123c2e6c90ce3b573b7330de19be649080ec612033d397d72da265f1bd8b"
dependencies = [
"bitflags 2.10.0",
"futures-channel",
"futures-core",
"futures-executor",
"futures-task",
"futures-util",
"gio-sys",
"glib-macros",
"glib-sys",
"gobject-sys",
"libc",
"memchr",
"smallvec",
]
[[package]]
name = "glib-macros"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf59b675301228a696fe01c3073974643365080a76cc3ed5bc2cbc466ad87f17"
dependencies = [
"heck 0.5.0",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "glib-sys"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d95e1a3a19ae464a7286e14af9a90683c64d70c02532d88d87ce95056af3e6c"
dependencies = [
"libc",
"system-deps",
]
[[package]]
name = "glob"
version = "0.3.3"
@@ -4608,6 +4776,17 @@ dependencies = [
"regex-syntax 0.8.8",
]
[[package]]
name = "gobject-sys"
version = "0.21.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dca35da0d19a18f4575f3cb99fe1c9e029a2941af5662f326f738a21edaf294"
dependencies = [
"glib-sys",
"libc",
"system-deps",
]
[[package]]
name = "gzip-header"
version = "1.0.0"
@@ -4718,6 +4897,12 @@ dependencies = [
"http 1.4.0",
]
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "heck"
version = "0.5.0"
@@ -5500,6 +5685,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
@@ -5759,6 +5953,31 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libwebrtc"
version = "0.3.26"
source = "git+https://github.com/juberti-oai/rust-sdks.git?rev=e2d1d1d230c6fc9df171ccb181423f957bb3c1f0#e2d1d1d230c6fc9df171ccb181423f957bb3c1f0"
dependencies = [
"cxx",
"glib",
"jni",
"js-sys",
"lazy_static",
"livekit-protocol",
"livekit-runtime",
"log",
"parking_lot",
"rtrb",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webrtc-sys",
]
[[package]]
name = "libz-sys"
version = "1.1.23"
@@ -5770,6 +5989,15 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f78c730aaa7d0b9336a299029ea49f9ee53b0ed06e9202e8cb7db9bae7b8c82"
dependencies = [
"cc",
]
[[package]]
name = "linked-hash-map"
version = "0.5.6"
@@ -5804,6 +6032,31 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
[[package]]
name = "livekit-protocol"
version = "0.7.1"
source = "git+https://github.com/juberti-oai/rust-sdks.git?rev=e2d1d1d230c6fc9df171ccb181423f957bb3c1f0#e2d1d1d230c6fc9df171ccb181423f957bb3c1f0"
dependencies = [
"futures-util",
"livekit-runtime",
"parking_lot",
"pbjson",
"pbjson-types",
"prost 0.12.6",
"serde",
"thiserror 1.0.69",
"tokio",
]
[[package]]
name = "livekit-runtime"
version = "0.4.0"
source = "git+https://github.com/juberti-oai/rust-sdks.git?rev=e2d1d1d230c6fc9df171ccb181423f957bb3c1f0#e2d1d1d230c6fc9df171ccb181423f957bb3c1f0"
dependencies = [
"tokio",
"tokio-stream",
]
[[package]]
name = "local-waker"
version = "0.1.4"
@@ -6810,7 +7063,7 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-proto",
"opentelemetry_sdk",
"prost",
"prost 0.14.3",
"reqwest",
"serde_json",
"thiserror 2.0.18",
@@ -6829,7 +7082,7 @@ dependencies = [
"const-hex",
"opentelemetry",
"opentelemetry_sdk",
"prost",
"prost 0.14.3",
"serde",
"serde_json",
"tonic",
@@ -6940,6 +7193,17 @@ dependencies = [
"windows-link",
]
[[package]]
name = "password-hash"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7676374caaee8a325c9e7a2ae557f216c5563a171d6997b0ef8a65af35147700"
dependencies = [
"base64ct",
"rand_core 0.6.4",
"subtle",
]
[[package]]
name = "paste"
version = "1.0.15"
@@ -6976,6 +7240,55 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3"
[[package]]
name = "pbjson"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1030c719b0ec2a2d25a5df729d6cff1acf3cc230bf766f4f97833591f7577b90"
dependencies = [
"base64 0.21.7",
"serde",
]
[[package]]
name = "pbjson-build"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2580e33f2292d34be285c5bc3dba5259542b083cfad6037b6d70345f24dcb735"
dependencies = [
"heck 0.4.1",
"itertools 0.11.0",
"prost 0.12.6",
"prost-types",
]
[[package]]
name = "pbjson-types"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18f596653ba4ac51bdecbb4ef6773bc7f56042dc13927910de1684ad3d32aa12"
dependencies = [
"bytes",
"chrono",
"pbjson",
"pbjson-build",
"prost 0.12.6",
"prost-build",
"serde",
]
[[package]]
name = "pbkdf2"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83a0692ec44e4cf1ef28ca317f14f8f07da2d95ec3fa01f86e4467b725e60917"
dependencies = [
"digest",
"hmac",
"password-hash",
"sha2",
]
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -7349,6 +7662,16 @@ dependencies = [
"unarray",
]
[[package]]
name = "prost"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29"
dependencies = [
"bytes",
"prost-derive 0.12.6",
]
[[package]]
name = "prost"
version = "0.14.3"
@@ -7356,7 +7679,41 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.14.3",
]
[[package]]
name = "prost-build"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
"heck 0.5.0",
"itertools 0.10.5",
"log",
"multimap",
"once_cell",
"petgraph 0.6.5",
"prettyplease",
"prost 0.12.6",
"prost-types",
"regex",
"syn 2.0.114",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
@@ -7372,6 +7729,15 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "prost-types"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0"
dependencies = [
"prost 0.12.6",
]
[[package]]
name = "psl"
version = "2.1.184"
@@ -8086,6 +8452,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
@@ -8211,6 +8578,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rtrb"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7204ed6420f698836b76d4d5c2ec5dec7585fd5c3a788fd1cde855d1de598239"
[[package]]
name = "runfiles"
version = "0.1.0"
@@ -8558,13 +8931,19 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "scratch"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d68f2ec51b097e4c1a75b681a8bec621909b5e91f15bb7b840c4f2f7b01148b2"
[[package]]
name = "scrypt"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f"
dependencies = [
"pbkdf2",
"pbkdf2 0.12.2",
"salsa20",
"sha2",
]
@@ -9261,7 +9640,7 @@ checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b"
dependencies = [
"dotenvy",
"either",
"heck",
"heck 0.5.0",
"hex",
"once_cell",
"proc-macro2",
@@ -9588,7 +9967,7 @@ version = "0.26.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"rustversion",
@@ -9601,7 +9980,7 @@ version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.114",
@@ -9613,7 +9992,7 @@ version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab85eea0270ee17587ed4156089e10b9e6880ee688791d45a905f5b1ca36f664"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.114",
@@ -9737,12 +10116,31 @@ dependencies = [
"libc",
]
[[package]]
name = "system-deps"
version = "7.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c8f33736f986f16d69b6cb8b03f55ddcad5c41acc4ccc39dd88e84aa805e7f"
dependencies = [
"cfg-expr",
"heck 0.5.0",
"pkg-config",
"toml 0.9.11+spec-1.1.0",
"version-compare",
]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "target-lexicon"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df7f62577c25e07834649fc3b39fafdc597c0a3527dc1c60129201ccfcbaa50c"
[[package]]
name = "tempfile"
version = "3.24.0"
@@ -10277,7 +10675,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6c55a2d6a14174563de34409c9f92ff981d006f56da9c6ecd40d9d4a31500b0"
dependencies = [
"bytes",
"prost",
"prost 0.14.3",
"tonic",
]
@@ -10836,6 +11234,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version-compare"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e"
[[package]]
name = "version_check"
version = "0.9.5"
@@ -11117,6 +11521,34 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "webrtc-sys"
version = "0.3.24"
source = "git+https://github.com/juberti-oai/rust-sdks.git?rev=e2d1d1d230c6fc9df171ccb181423f957bb3c1f0#e2d1d1d230c6fc9df171ccb181423f957bb3c1f0"
dependencies = [
"cc",
"cxx",
"cxx-build",
"glob",
"log",
"pkg-config",
"webrtc-sys-build",
]
[[package]]
name = "webrtc-sys-build"
version = "0.3.13"
source = "git+https://github.com/juberti-oai/rust-sdks.git?rev=e2d1d1d230c6fc9df171ccb181423f957bb3c1f0#e2d1d1d230c6fc9df171ccb181423f957bb3c1f0"
dependencies = [
"anyhow",
"fs2",
"regex",
"reqwest",
"scratch",
"semver",
"zip 0.6.6",
]
[[package]]
name = "weezl"
version = "0.1.12"
@@ -12092,6 +12524,26 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "zip"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261"
dependencies = [
"aes",
"byteorder",
"bzip2 0.4.4",
"constant_time_eq 0.1.5",
"crc32fast",
"crossbeam-utils",
"flate2",
"hmac",
"pbkdf2 0.11.0",
"sha1",
"time",
"zstd 0.11.2+zstd.1.5.2",
]
[[package]]
name = "zip"
version = "2.4.2"
@@ -12100,8 +12552,8 @@ checksum = "fabe6324e908f85a1c52063ce7aa26b68dcb7eb6dbc83a2d148403c9bc3eba50"
dependencies = [
"aes",
"arbitrary",
"bzip2",
"constant_time_eq",
"bzip2 0.5.2",
"constant_time_eq 0.3.1",
"crc32fast",
"crossbeam-utils",
"deflate64",
@@ -12112,14 +12564,14 @@ dependencies = [
"indexmap 2.13.0",
"lzma-rs",
"memchr",
"pbkdf2",
"pbkdf2 0.12.2",
"sha1",
"thiserror 2.0.18",
"time",
"xz2",
"zeroize",
"zopfli",
"zstd",
"zstd 0.13.3",
]
[[package]]
@@ -12153,13 +12605,32 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4"
dependencies = [
"zstd-safe 5.0.2+zstd.1.5.2",
]
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
"zstd-safe 7.2.4",
]
[[package]]
name = "zstd-safe"
version = "5.0.2+zstd.1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]

View File

@@ -415,6 +415,8 @@ crossterm = { git = "https://github.com/nornagon/crossterm", branch = "nornagon/
ratatui = { git = "https://github.com/nornagon/ratatui", branch = "nornagon-v0.29.0-patch" }
tokio-tungstenite = { git = "https://github.com/openai-oss-forks/tokio-tungstenite", rev = "132f5b39c862e3a970f731d709608b3e6276d5f6" }
tungstenite = { git = "https://github.com/openai-oss-forks/tungstenite-rs", rev = "9200079d3b54a1ff51072e24d81fd354f085156f" }
libwebrtc = { git = "https://github.com/juberti-oai/rust-sdks.git", rev = "e2d1d1d230c6fc9df171ccb181423f957bb3c1f0" }
webrtc-sys = { git = "https://github.com/juberti-oai/rust-sdks.git", rev = "e2d1d1d230c6fc9df171ccb181423f957bb3c1f0" }
# Uncomment to debug local changes.
# rmcp = { path = "../../rust-sdk/crates/rmcp" }

View File

@@ -6,12 +6,15 @@ license.workspace = true
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
codex-client = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
libwebrtc = "0.3.26"
reqwest = { workspace = true, features = ["json", "multipart"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
@@ -30,7 +33,6 @@ assert_matches = { workspace = true }
pretty_assertions = { workspace = true }
tokio-test = { workspace = true }
wiremock = { workspace = true }
reqwest = { workspace = true }
[lints]
workspace = true

View File

@@ -1,6 +1,7 @@
pub mod compact;
pub mod memories;
pub mod models;
pub mod realtime_webrtc;
pub mod realtime_websocket;
pub mod responses;
pub mod responses_websocket;

View File

@@ -0,0 +1,603 @@
use crate::endpoint::realtime_websocket::methods_common::conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_common::conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_common::normalized_session_mode;
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
use crate::endpoint::realtime_websocket::protocol::parse_realtime_event;
use crate::error::ApiError;
use crate::provider::Provider;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use futures::StreamExt;
use http::HeaderMap;
use libwebrtc::MediaType;
use libwebrtc::audio_frame::AudioFrame;
use libwebrtc::audio_source::AudioSourceOptions;
use libwebrtc::audio_source::native::NativeAudioSource;
use libwebrtc::audio_stream::native::NativeAudioStream;
use libwebrtc::audio_stream::native::NativeAudioStreamOptions;
use libwebrtc::data_channel::DataChannel;
use libwebrtc::data_channel::DataChannelInit;
use libwebrtc::data_channel::DataChannelState;
use libwebrtc::media_stream_track::MediaStreamTrack;
use libwebrtc::peer_connection::OfferOptions;
use libwebrtc::peer_connection::PeerConnection;
use libwebrtc::peer_connection_factory::PeerConnectionFactory;
use libwebrtc::peer_connection_factory::RtcConfiguration;
use libwebrtc::peer_connection_factory::native::PeerConnectionFactoryExt;
use libwebrtc::rtp_transceiver::RtpTransceiverDirection;
use libwebrtc::rtp_transceiver::RtpTransceiverInit;
use libwebrtc::session_description::SdpType;
use libwebrtc::session_description::SessionDescription;
use reqwest::Client;
use reqwest::multipart::Form;
use serde::Serialize;
use serde_json::Value;
use serde_json::json;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::Duration;
use tracing::debug;
use tracing::info;
use tracing::warn;
use url::Url;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
const REALTIME_CALLS_PATH: &str = "/v1/realtime/calls";
const REALTIME_DATA_CHANNEL_LABEL: &str = "oai-events";
const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const REALTIME_AUDIO_NUM_CHANNELS: u32 = 1;
// Keep one extra 60 ms queue on the native source and roughly 100 ms on the
// sink side so WebRTC can absorb small scheduling spikes without drifting.
const REALTIME_AUDIO_SOURCE_QUEUE_SIZE_MS: u32 = 60;
const REALTIME_AUDIO_STREAM_QUEUE_SIZE_FRAMES: usize = 10;
pub struct RealtimeWebrtcConnection {
writer: RealtimeWebrtcWriter,
events: RealtimeWebrtcEvents,
}
#[derive(Clone)]
pub struct RealtimeWebrtcWriter {
peer_connection: PeerConnection,
data_channel: DataChannel,
local_audio_source: NativeAudioSource,
is_closed: Arc<AtomicBool>,
event_parser: RealtimeEventParser,
}
#[derive(Clone)]
pub struct RealtimeWebrtcEvents {
rx_event: Arc<Mutex<mpsc::UnboundedReceiver<RealtimeEvent>>>,
active_transcript: Arc<Mutex<ActiveTranscriptState>>,
is_closed: Arc<AtomicBool>,
event_parser: RealtimeEventParser,
}
#[derive(Default)]
struct ActiveTranscriptState {
entries: Vec<RealtimeTranscriptEntry>,
}
pub struct RealtimeWebrtcClient {
provider: Provider,
}
impl RealtimeWebrtcConnection {
pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> {
self.writer.send_audio_frame(frame).await
}
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
self.writer.send_conversation_item_create(text).await
}
pub async fn send_conversation_handoff_append(
&self,
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
self.writer
.send_conversation_handoff_append(handoff_id, output_text)
.await
}
pub async fn send_response_create(&self) -> Result<(), ApiError> {
self.writer.send_response_create().await
}
pub async fn close(&self) -> Result<(), ApiError> {
self.writer.close().await
}
pub async fn next_event(&self) -> Result<Option<RealtimeEvent>, ApiError> {
self.events.next_event().await
}
pub fn writer(&self) -> RealtimeWebrtcWriter {
self.writer.clone()
}
pub fn events(&self) -> RealtimeWebrtcEvents {
self.events.clone()
}
fn new(
peer_connection: PeerConnection,
data_channel: DataChannel,
local_audio_source: NativeAudioSource,
rx_event: mpsc::UnboundedReceiver<RealtimeEvent>,
is_closed: Arc<AtomicBool>,
event_parser: RealtimeEventParser,
) -> Self {
Self {
writer: RealtimeWebrtcWriter {
peer_connection,
data_channel,
local_audio_source,
is_closed: Arc::clone(&is_closed),
event_parser,
},
events: RealtimeWebrtcEvents {
rx_event: Arc::new(Mutex::new(rx_event)),
active_transcript: Arc::new(Mutex::new(ActiveTranscriptState::default())),
is_closed,
event_parser,
},
}
}
}
impl RealtimeWebrtcWriter {
pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> {
if self.is_closed.load(Ordering::SeqCst) {
return Err(ApiError::Stream(
"realtime WebRTC connection is closed".to_string(),
));
}
if frame.sample_rate != self.local_audio_source.sample_rate() {
return Err(ApiError::Stream(format!(
"unexpected realtime audio sample rate: got {got}, expected {expected}",
got = frame.sample_rate,
expected = self.local_audio_source.sample_rate()
)));
}
if u32::from(frame.num_channels) != self.local_audio_source.num_channels() {
return Err(ApiError::Stream(format!(
"unexpected realtime audio channel count: got {got}, expected {expected}",
got = frame.num_channels,
expected = self.local_audio_source.num_channels()
)));
}
let decoded = BASE64_STANDARD.decode(&frame.data).map_err(|err| {
ApiError::Stream(format!("failed to decode realtime audio frame: {err}"))
})?;
if decoded.len() % 2 != 0 {
return Err(ApiError::Stream(
"realtime audio frame has an odd byte length".to_string(),
));
}
let samples = decoded
.chunks_exact(2)
.map(|sample| i16::from_le_bytes([sample[0], sample[1]]))
.collect::<Vec<_>>();
let num_channels = self.local_audio_source.num_channels();
let samples_per_channel = samples
.len()
.checked_div(num_channels as usize)
.and_then(|samples_per_channel| u32::try_from(samples_per_channel).ok())
.ok_or_else(|| {
ApiError::Stream("failed to calculate realtime audio frame size".to_string())
})?;
let audio_frame = AudioFrame {
data: Cow::Owned(samples),
sample_rate: self.local_audio_source.sample_rate(),
num_channels,
samples_per_channel,
};
self.local_audio_source
.capture_frame(&audio_frame)
.await
.map_err(|err| {
ApiError::Stream(format!("failed to capture realtime audio frame: {err}"))
})
}
pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> {
self.send_json(&conversation_item_create_message(self.event_parser, text))
.await
}
pub async fn send_conversation_handoff_append(
&self,
handoff_id: String,
output_text: String,
) -> Result<(), ApiError> {
self.send_json(&conversation_handoff_append_message(
self.event_parser,
handoff_id,
output_text,
))
.await
}
pub async fn send_response_create(&self) -> Result<(), ApiError> {
self.send_json(&RealtimeOutboundMessage::ResponseCreate)
.await
}
pub async fn close(&self) -> Result<(), ApiError> {
if self.is_closed.swap(true, Ordering::SeqCst) {
return Ok(());
}
self.local_audio_source.clear_buffer();
self.data_channel.close();
self.peer_connection.close();
Ok(())
}
async fn send_json(&self, message: &impl Serialize) -> Result<(), ApiError> {
let payload = serde_json::to_string(message)
.map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?;
debug!(payload = %payload, "realtime WebRTC data channel request");
self.send_payload(payload).await
}
pub async fn send_payload(&self, payload: String) -> Result<(), ApiError> {
if self.is_closed.load(Ordering::SeqCst) {
return Err(ApiError::Stream(
"realtime WebRTC connection is closed".to_string(),
));
}
self.data_channel
.send(payload.as_bytes(), false)
.map_err(|err| {
ApiError::Stream(format!("failed to send realtime data channel event: {err}"))
})
}
}
impl RealtimeWebrtcEvents {
pub async fn next_event(&self) -> Result<Option<RealtimeEvent>, ApiError> {
if self.is_closed.load(Ordering::SeqCst) {
return Ok(None);
}
match self.rx_event.lock().await.recv().await {
Some(mut event) => {
self.update_active_transcript(&mut event).await;
Ok(Some(event))
}
None => {
self.is_closed.store(true, Ordering::SeqCst);
Ok(None)
}
}
}
async fn update_active_transcript(&self, event: &mut RealtimeEvent) {
let mut active_transcript = self.active_transcript.lock().await;
match event {
RealtimeEvent::InputAudioSpeechStarted(_) => {}
RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
append_transcript_delta(&mut active_transcript.entries, "user", delta);
}
RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta }) => {
append_transcript_delta(&mut active_transcript.entries, "assistant", delta);
}
RealtimeEvent::HandoffRequested(handoff) => {
if self.event_parser == RealtimeEventParser::V1 {
handoff.active_transcript = std::mem::take(&mut active_transcript.entries);
}
}
RealtimeEvent::SessionUpdated { .. }
| RealtimeEvent::AudioOut(_)
| RealtimeEvent::ResponseCancelled(_)
| RealtimeEvent::ConversationItemAdded(_)
| RealtimeEvent::ConversationItemDone { .. }
| RealtimeEvent::Error(_) => {}
}
}
}
impl RealtimeWebrtcClient {
pub fn new(provider: Provider) -> Self {
Self { provider }
}
pub async fn connect(
&self,
config: RealtimeSessionConfig,
extra_headers: HeaderMap,
default_headers: HeaderMap,
) -> Result<RealtimeWebrtcConnection, ApiError> {
info!("initializing realtime WebRTC peer connection");
let factory = PeerConnectionFactory::default();
let peer_connection = factory
.create_peer_connection(RtcConfiguration::default())
.map_err(|err| {
ApiError::Stream(format!("failed to create WebRTC peer connection: {err}"))
})?;
let local_audio_source = NativeAudioSource::new(
AudioSourceOptions::default(),
REALTIME_AUDIO_SAMPLE_RATE,
REALTIME_AUDIO_NUM_CHANNELS,
REALTIME_AUDIO_SOURCE_QUEUE_SIZE_MS,
);
let local_audio_track =
factory.create_audio_track("realtime-mic", local_audio_source.clone());
let audio_transceiver = peer_connection
.add_transceiver_for_media(
MediaType::Audio,
RtpTransceiverInit {
direction: RtpTransceiverDirection::SendRecv,
stream_ids: vec!["realtime".to_string()],
send_encodings: Vec::new(),
},
)
.map_err(|err| ApiError::Stream(format!("failed to add audio transceiver: {err}")))?;
audio_transceiver
.sender()
.set_track(Some(local_audio_track.into()))
.map_err(|err| {
ApiError::Stream(format!("failed to attach realtime audio track: {err}"))
})?;
let data_channel = peer_connection
.create_data_channel(REALTIME_DATA_CHANNEL_LABEL, DataChannelInit::default())
.map_err(|err| {
ApiError::Stream(format!("failed to create realtime data channel: {err}"))
})?;
let (tx_event, rx_event) = mpsc::unbounded_channel();
let is_closed = Arc::new(AtomicBool::new(false));
let (tx_open, rx_open) = oneshot::channel::<()>();
let tx_open = Arc::new(StdMutex::new(Some(tx_open)));
{
let tx_event = tx_event.clone();
let event_parser = config.event_parser;
data_channel.on_message(Some(Box::new(move |buffer| {
if buffer.binary {
debug!(
payload_len = buffer.data.len(),
"ignoring binary realtime data channel message"
);
return;
}
let payload = match std::str::from_utf8(buffer.data) {
Ok(payload) => payload,
Err(err) => {
debug!("received non-utf8 realtime data channel message: {err}");
return;
}
};
if let Some(event) = parse_realtime_event(payload, event_parser)
&& tx_event.send(event).is_err()
{
debug!("dropping realtime event because receiver closed");
}
})));
}
{
let is_closed = Arc::clone(&is_closed);
let tx_open = Arc::clone(&tx_open);
data_channel.on_state_change(Some(Box::new(move |state| match state {
DataChannelState::Connecting => {}
DataChannelState::Open => {
if let Ok(mut tx_open) = tx_open.lock()
&& let Some(tx_open) = tx_open.take()
{
let _ = tx_open.send(());
}
}
DataChannelState::Closing | DataChannelState::Closed => {
is_closed.store(true, Ordering::SeqCst);
}
})));
}
{
let tx_event = tx_event.clone();
peer_connection.on_track(Some(Box::new(move |track_event| {
let MediaStreamTrack::Audio(audio_track) = track_event.track else {
return;
};
let tx_event = tx_event.clone();
tokio::spawn(async move {
let mut audio_stream = NativeAudioStream::with_options(
audio_track,
REALTIME_AUDIO_SAMPLE_RATE as i32,
REALTIME_AUDIO_NUM_CHANNELS as i32,
NativeAudioStreamOptions {
queue_size_frames: Some(REALTIME_AUDIO_STREAM_QUEUE_SIZE_FRAMES),
},
);
while let Some(frame) = audio_stream.next().await {
let event = RealtimeEvent::AudioOut(encode_audio_frame(frame));
if tx_event.send(event).is_err() {
break;
}
}
});
})));
}
let offer = peer_connection
.create_offer(OfferOptions {
ice_restart: false,
offer_to_receive_audio: true,
offer_to_receive_video: false,
})
.await
.map_err(|err| ApiError::Stream(format!("failed to create WebRTC offer: {err}")))?;
peer_connection
.set_local_description(offer.clone())
.await
.map_err(|err| ApiError::Stream(format!("failed to set local description: {err}")))?;
let url = realtime_calls_url(&self.provider.base_url, self.provider.query_params.as_ref())?;
let headers = merge_request_headers(&self.provider.headers, extra_headers, default_headers);
info!(url = %url, "posting realtime WebRTC offer");
let http_client = Client::new();
let mut request = http_client
.post(url)
.multipart(session_form(&config, &offer)?);
for (name, value) in &headers {
request = request.header(name, value);
}
let response = request.send().await.map_err(|err| {
ApiError::Stream(format!("failed to post realtime WebRTC offer: {err}"))
})?;
let status = response.status();
let answer_sdp = response.text().await.map_err(|err| {
ApiError::Stream(format!("failed to read realtime WebRTC answer body: {err}"))
})?;
if !status.is_success() {
return Err(ApiError::Stream(format!(
"realtime WebRTC offer failed with HTTP {status}: {answer_sdp}"
)));
}
let answer = SessionDescription::parse(&answer_sdp, SdpType::Answer)
.map_err(|err| ApiError::Stream(format!("failed to parse WebRTC answer SDP: {err}")))?;
peer_connection
.set_remote_description(answer)
.await
.map_err(|err| ApiError::Stream(format!("failed to set remote description: {err}")))?;
if tokio::time::timeout(Duration::from_secs(10), rx_open)
.await
.is_err()
{
warn!("timed out waiting for realtime data channel to open");
}
Ok(RealtimeWebrtcConnection::new(
peer_connection,
data_channel,
local_audio_source,
rx_event,
is_closed,
config.event_parser,
))
}
}
fn append_transcript_delta(entries: &mut Vec<RealtimeTranscriptEntry>, role: &str, delta: &str) {
if delta.is_empty() {
return;
}
if let Some(last_entry) = entries.last_mut()
&& last_entry.role == role
{
last_entry.text.push_str(delta);
return;
}
entries.push(RealtimeTranscriptEntry {
role: role.to_string(),
text: delta.to_string(),
});
}
fn encode_audio_frame(frame: AudioFrame<'_>) -> RealtimeAudioFrame {
let mut bytes = Vec::with_capacity(frame.data.len() * 2);
for sample in frame.data.iter() {
bytes.extend_from_slice(&sample.to_le_bytes());
}
RealtimeAudioFrame {
data: BASE64_STANDARD.encode(bytes),
sample_rate: frame.sample_rate,
num_channels: frame.num_channels as u16,
samples_per_channel: Some(frame.samples_per_channel),
item_id: None,
}
}
fn realtime_calls_url(
base_url: &str,
query_params: Option<&HashMap<String, String>>,
) -> Result<Url, ApiError> {
let mut url =
Url::parse(base_url).map_err(|err| ApiError::Stream(format!("invalid base URL: {err}")))?;
url.set_path(REALTIME_CALLS_PATH);
if let Some(query_params) = query_params
&& !query_params.is_empty()
{
let mut query = url.query_pairs_mut();
for (key, value) in query_params {
query.append_pair(key, value);
}
}
Ok(url)
}
fn session_form(
config: &RealtimeSessionConfig,
offer: &SessionDescription,
) -> Result<Form, ApiError> {
let session_json = serde_json::to_string(&session_payload(config)?)
.map_err(|err| ApiError::Stream(format!("failed to serialize realtime session: {err}")))?;
Ok(Form::new()
.text("sdp", offer.to_string())
.text("session", session_json))
}
fn session_payload(config: &RealtimeSessionConfig) -> Result<Value, ApiError> {
let session_mode = normalized_session_mode(config.event_parser, config.session_mode);
let mut session = serde_json::to_value(session_update_session(
config.event_parser,
config.instructions.clone(),
session_mode,
))
.map_err(|err| ApiError::Stream(format!("failed to encode realtime session: {err}")))?;
if let Some(model) = &config.model {
session["model"] = json!(model);
}
Ok(session)
}
fn merge_request_headers(
provider_headers: &HeaderMap,
extra_headers: HeaderMap,
default_headers: HeaderMap,
) -> HeaderMap {
let mut headers = provider_headers.clone();
headers.extend(extra_headers);
for (name, value) in &default_headers {
if let http::header::Entry::Vacant(entry) = headers.entry(name) {
entry.insert(value.clone());
}
}
headers
}

View File

@@ -11,10 +11,10 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
pub(crate) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const AGENT_FINAL_MESSAGE_PREFIX: &str = "\"Agent Final Message\":\n\n";
pub(super) fn normalized_session_mode(
pub(crate) fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
@@ -24,7 +24,7 @@ pub(super) fn normalized_session_mode(
}
}
pub(super) fn conversation_item_create_message(
pub(crate) fn conversation_item_create_message(
event_parser: RealtimeEventParser,
text: String,
) -> RealtimeOutboundMessage {
@@ -34,7 +34,7 @@ pub(super) fn conversation_item_create_message(
}
}
pub(super) fn conversation_handoff_append_message(
pub(crate) fn conversation_handoff_append_message(
event_parser: RealtimeEventParser,
handoff_id: String,
output_text: String,
@@ -48,7 +48,7 @@ pub(super) fn conversation_handoff_append_message(
}
}
pub(super) fn session_update_session(
pub(crate) fn session_update_session(
event_parser: RealtimeEventParser,
instructions: String,
session_mode: RealtimeSessionMode,
@@ -60,7 +60,7 @@ pub(super) fn session_update_session(
}
}
pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
pub(crate) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
match event_parser {
RealtimeEventParser::V1 => v1_websocket_intent(),
RealtimeEventParser::RealtimeV2 => v2_websocket_intent(),

View File

@@ -1,5 +1,5 @@
pub mod methods;
mod methods_common;
pub(crate) mod methods_common;
mod methods_v1;
mod methods_v2;
pub mod protocol;

View File

@@ -31,7 +31,7 @@ pub struct RealtimeSessionConfig {
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub(super) enum RealtimeOutboundMessage {
pub(crate) enum RealtimeOutboundMessage {
#[serde(rename = "input_audio_buffer.append")]
InputAudioBufferAppend { audio: String },
#[serde(rename = "conversation.handoff.append")]
@@ -48,7 +48,7 @@ pub(super) enum RealtimeOutboundMessage {
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct SessionUpdateSession {
pub(crate) struct SessionUpdateSession {
#[serde(rename = "type")]
pub(super) r#type: SessionType,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -148,7 +148,7 @@ pub(super) struct SessionAudioOutputFormat {
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationMessageItem {
pub(crate) struct ConversationMessageItem {
#[serde(rename = "type")]
pub(super) r#type: ConversationItemType,
pub(super) role: ConversationRole,
@@ -170,13 +170,13 @@ pub(super) enum ConversationRole {
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub(super) enum ConversationItemPayload {
pub(crate) enum ConversationItemPayload {
Message(ConversationMessageItem),
FunctionCallOutput(ConversationFunctionCallOutputItem),
}
#[derive(Debug, Clone, Serialize)]
pub(super) struct ConversationFunctionCallOutputItem {
pub(crate) struct ConversationFunctionCallOutputItem {
#[serde(rename = "type")]
pub(super) r#type: ConversationItemType,
pub(super) call_id: String,
@@ -212,7 +212,7 @@ pub(super) enum SessionToolType {
Function,
}
pub(super) fn parse_realtime_event(
pub(crate) fn parse_realtime_event(
payload: &str,
event_parser: RealtimeEventParser,
) -> Option<RealtimeEvent> {

View File

@@ -30,6 +30,8 @@ pub use crate::common::response_create_client_metadata;
pub use crate::endpoint::compact::CompactClient;
pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_webrtc::RealtimeWebrtcClient;
pub use crate::endpoint::realtime_webrtc::RealtimeWebrtcConnection;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;

View File

@@ -491,22 +491,22 @@ pub struct Config {
pub realtime_audio: RealtimeAudioConfig,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport base URL (the `Op::RealtimeConversation`
/// transport base URL (the `Op::RealtimeConversation`
/// `/v1/realtime`
/// connection) without changing normal provider HTTP requests.
pub experimental_realtime_ws_base_url: Option<String>,
/// Experimental / do not use. Selects the realtime websocket model/snapshot
/// Experimental / do not use. Selects the realtime transport model/snapshot
/// used for the `Op::RealtimeConversation` connection.
pub experimental_realtime_ws_model: Option<String>,
/// Experimental / do not use. Realtime websocket session selection.
/// Experimental / do not use. Realtime transport session selection.
/// `version` controls v1/v2 and `type` controls conversational/transcription.
pub realtime: RealtimeConfig,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport instructions (the `Op::RealtimeConversation`
/// `/ws` session.update instructions) without changing normal prompts.
/// transport instructions (the `Op::RealtimeConversation`
/// session instructions) without changing normal prompts.
pub experimental_realtime_ws_backend_prompt: Option<String>,
/// Experimental / do not use. Replaces the synthesized realtime startup
/// context appended to websocket session instructions. An empty string
/// context appended to realtime session instructions. An empty string
/// disables startup context injection entirely.
pub experimental_realtime_ws_startup_context: Option<String>,
/// Experimental / do not use. Replaces the built-in realtime start
@@ -1290,23 +1290,23 @@ pub struct ConfigToml {
pub audio: Option<RealtimeAudioToml>,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport base URL (the `Op::RealtimeConversation`
/// transport base URL (the `Op::RealtimeConversation`
/// `/v1/realtime`
/// connection) without changing normal provider HTTP requests.
pub experimental_realtime_ws_base_url: Option<String>,
/// Experimental / do not use. Selects the realtime websocket model/snapshot
/// Experimental / do not use. Selects the realtime transport model/snapshot
/// used for the `Op::RealtimeConversation` connection.
pub experimental_realtime_ws_model: Option<String>,
/// Experimental / do not use. Realtime websocket session selection.
/// Experimental / do not use. Realtime transport session selection.
/// `version` controls v1/v2 and `type` controls conversational/transcription.
#[serde(default)]
pub realtime: Option<RealtimeToml>,
/// Experimental / do not use. Overrides only the realtime conversation
/// websocket transport instructions (the `Op::RealtimeConversation`
/// `/ws` session.update instructions) without changing normal prompts.
/// transport instructions (the `Op::RealtimeConversation`
/// session instructions) without changing normal prompts.
pub experimental_realtime_ws_backend_prompt: Option<String>,
/// Experimental / do not use. Replaces the synthesized realtime startup
/// context appended to websocket session instructions. An empty string
/// context appended to realtime session instructions. An empty string
/// disables startup context injection entirely.
pub experimental_realtime_ws_startup_context: Option<String>,
/// Experimental / do not use. Replaces the built-in realtime start

View File

@@ -14,6 +14,7 @@ mod auth_env_telemetry;
mod client;
mod client_common;
pub mod codex;
mod realtime_audio_bridge;
mod realtime_context;
mod realtime_conversation;
pub use codex::SteerInputError;

View File

@@ -0,0 +1,179 @@
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::endpoint::realtime_webrtc::RealtimeWebrtcWriter;
use std::collections::VecDeque;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::MissedTickBehavior;
use tracing::error;
use tracing::warn;
// Buffer three 20 ms frames before playout, then cap at six frames so remote
// jitter does not turn into unbounded latency.
const TARGET_BUFFER_MS: u64 = 60;
const MAX_BUFFER_MS: u64 = 120;
const PACE_TICK_MS: u64 = 5;
const DEFAULT_FRAME_DURATION_MS: u64 = 20;
struct BufferedAudioFrame {
frame: RealtimeAudioFrame,
duration_ms: u64,
}
#[derive(Clone, Copy)]
struct AudioFrameShape {
sample_rate: u32,
num_channels: u16,
samples_per_channel: u32,
}
pub(crate) fn spawn_realtime_audio_bridge(
writer: RealtimeWebrtcWriter,
audio_rx: async_channel::Receiver<RealtimeAudioFrame>,
events_tx: async_channel::Sender<RealtimeEvent>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut pending_frames = VecDeque::<BufferedAudioFrame>::new();
let mut buffered_ms = 0_u64;
let mut started = false;
let mut next_send_at: Option<Instant> = None;
let mut last_frame_shape: Option<AudioFrameShape> = None;
let mut audio_rx_closed = false;
let mut underrun_reported = false;
let mut pace_tick = tokio::time::interval(Duration::from_millis(PACE_TICK_MS));
pace_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
frame = audio_rx.recv(), if !audio_rx_closed => {
match frame {
Ok(frame) => {
let duration_ms = frame_duration_ms(&frame);
if let Some(frame_shape) = AudioFrameShape::from_frame(&frame) {
last_frame_shape = Some(frame_shape);
}
pending_frames.push_back(BufferedAudioFrame {
frame,
duration_ms,
});
buffered_ms = buffered_ms.saturating_add(duration_ms);
let mut dropped_frames = 0_u64;
while buffered_ms > MAX_BUFFER_MS {
let Some(frame) = pending_frames.pop_front() else {
break;
};
buffered_ms = buffered_ms.saturating_sub(frame.duration_ms);
dropped_frames = dropped_frames.saturating_add(1);
}
if dropped_frames > 0 {
warn!(
dropped_frames,
buffered_ms,
"dropping buffered input audio to keep realtime latency bounded"
);
}
if !started && buffered_ms >= TARGET_BUFFER_MS {
started = true;
next_send_at = Some(Instant::now());
}
}
Err(_) => {
audio_rx_closed = true;
}
}
}
_ = pace_tick.tick() => {
let Some(send_at) = next_send_at else {
if audio_rx_closed {
break;
}
continue;
};
if Instant::now() < send_at {
continue;
}
let frame = if let Some(frame) = pending_frames.pop_front() {
buffered_ms = buffered_ms.saturating_sub(frame.duration_ms);
underrun_reported = false;
frame.frame
} else if let Some(frame_shape) = last_frame_shape {
if !underrun_reported {
warn!("realtime audio bridge underrun; inserting silence");
underrun_reported = true;
}
frame_shape.silence_frame()
} else if audio_rx_closed {
break;
} else {
continue;
};
let duration_ms = frame_duration_ms(&frame);
if let Err(err) = writer.send_audio_frame(frame).await {
error!("failed to send bridged realtime audio: {err}");
let _ = events_tx.send(RealtimeEvent::Error(err.to_string())).await;
break;
}
next_send_at = Some(send_at + Duration::from_millis(duration_ms));
if audio_rx_closed && pending_frames.is_empty() {
break;
}
}
}
}
})
}
impl AudioFrameShape {
fn from_frame(frame: &RealtimeAudioFrame) -> Option<Self> {
Some(Self {
sample_rate: frame.sample_rate,
num_channels: frame.num_channels,
samples_per_channel: frame
.samples_per_channel
.or(decoded_samples_per_channel(frame))?,
})
}
fn silence_frame(self) -> RealtimeAudioFrame {
let byte_len = usize::from(self.num_channels)
.saturating_mul(self.samples_per_channel as usize)
.saturating_mul(2);
RealtimeAudioFrame {
data: BASE64_STANDARD.encode(vec![0_u8; byte_len]),
sample_rate: self.sample_rate,
num_channels: self.num_channels,
samples_per_channel: Some(self.samples_per_channel),
item_id: None,
}
}
}
fn frame_duration_ms(frame: &RealtimeAudioFrame) -> u64 {
u64::from(audio_duration_ms(frame)).max(DEFAULT_FRAME_DURATION_MS)
}
fn audio_duration_ms(frame: &RealtimeAudioFrame) -> u32 {
let Some(samples_per_channel) = frame
.samples_per_channel
.or(decoded_samples_per_channel(frame))
else {
return 0;
};
let sample_rate = u64::from(frame.sample_rate.max(1));
((u64::from(samples_per_channel) * 1_000) / sample_rate) as u32
}
fn decoded_samples_per_channel(frame: &RealtimeAudioFrame) -> Option<u32> {
let bytes = BASE64_STANDARD.decode(&frame.data).ok()?;
let channels = usize::from(frame.num_channels.max(1));
let samples = bytes.len().checked_div(2)?.checked_div(channels)?;
u32::try_from(samples).ok()
}

View File

@@ -7,6 +7,7 @@ use crate::config::RealtimeWsVersion;
use crate::default_client::default_headers;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::realtime_audio_bridge::spawn_realtime_audio_bridge;
use crate::realtime_context::build_realtime_startup_context;
use async_channel::Receiver;
use async_channel::Sender;
@@ -19,9 +20,9 @@ use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebsocketClient;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter;
use codex_api::RealtimeWebrtcClient;
use codex_api::endpoint::realtime_webrtc::RealtimeWebrtcEvents;
use codex_api::endpoint::realtime_webrtc::RealtimeWebrtcWriter;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::ConversationStartParams;
@@ -105,11 +106,10 @@ struct OutputAudioState {
}
struct RealtimeInputTask {
writer: RealtimeWebsocketWriter,
events: RealtimeWebsocketEvents,
writer: RealtimeWebrtcWriter,
events: RealtimeWebrtcEvents,
user_text_rx: Receiver<String>,
handoff_output_rx: Receiver<HandoffOutput>,
audio_rx: Receiver<RealtimeAudioFrame>,
events_tx: Sender<RealtimeEvent>,
handoff_state: RealtimeHandoffState,
session_kind: RealtimeSessionKind,
@@ -130,8 +130,9 @@ impl RealtimeHandoffState {
struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
user_text_tx: Sender<String>,
writer: RealtimeWebsocketWriter,
writer: RealtimeWebrtcWriter,
handoff: RealtimeHandoffState,
audio_bridge_task: JoinHandle<()>,
input_task: JoinHandle<()>,
fanout_task: Option<JoinHandle<()>>,
realtime_active: Arc<AtomicBool>,
@@ -170,7 +171,7 @@ impl RealtimeConversationManager {
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let client = RealtimeWebsocketClient::new(api_provider);
let client = RealtimeWebrtcClient::new(api_provider);
let connection = client
.connect(
session_config,
@@ -193,12 +194,13 @@ impl RealtimeConversationManager {
let realtime_active = Arc::new(AtomicBool::new(true));
let handoff = RealtimeHandoffState::new(handoff_output_tx, session_kind);
let audio_bridge_task =
spawn_realtime_audio_bridge(writer.clone(), audio_rx, events_tx.clone());
let task = spawn_realtime_input_task(RealtimeInputTask {
writer: writer.clone(),
events,
user_text_rx,
handoff_output_rx,
audio_rx,
events_tx,
handoff_state: handoff.clone(),
session_kind,
@@ -210,6 +212,7 @@ impl RealtimeConversationManager {
user_text_tx,
writer,
handoff,
audio_bridge_task,
input_task: task,
fanout_task: None,
realtime_active: Arc::clone(&realtime_active),
@@ -390,6 +393,8 @@ async fn stop_conversation_state(
fanout_task_stop: RealtimeFanoutTaskStop,
) {
state.realtime_active.store(false, Ordering::Relaxed);
state.audio_bridge_task.abort();
let _ = state.audio_bridge_task.await;
state.input_task.abort();
let _ = state.input_task.await;
@@ -706,7 +711,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
events,
user_text_rx,
handoff_output_rx,
audio_rx,
events_tx,
handoff_state,
session_kind,
@@ -944,21 +948,6 @@ fn spawn_realtime_input_task(input: RealtimeInputTask) -> JoinHandle<()> {
}
}
}
frame = audio_rx.recv() => {
match frame {
Ok(frame) => {
if let Err(err) = writer.send_audio_frame(frame).await {
let mapped_error = map_api_error(err);
error!("failed to send input audio: {mapped_error}");
let _ = events_tx
.send(RealtimeEvent::Error(mapped_error.to_string()))
.await;
break;
}
}
Err(_) => break,
}
}
}
}
})