Use Android streaming HTTP exchange bridge

Adopt the new framework-owned streaming HTTP exchange APIs for Genie /responses traffic and drop the removed buffered bridge usage.\n\nCo-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Iliyan Malchev
2026-03-23 20:50:39 -07:00
parent 9b0d7a5271
commit bf11074248
3 changed files with 260 additions and 85 deletions

View File

@@ -3,19 +3,32 @@ package com.openai.codex.bridge
import android.app.agent.AgentManager
import android.app.agent.GenieService
import android.os.Bundle
import android.os.ParcelFileDescriptor
import java.io.ByteArrayOutputStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.lang.reflect.Constructor
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.lang.reflect.Modifier
import java.nio.charset.StandardCharsets
object FrameworkSessionTransportCompat {
private const val NETWORK_CONFIG_CLASS_NAME = "android.app.agent.AgentSessionNetworkConfig"
private const val HTTP_BRIDGE_CLASS_NAME = "android.app.agent.FrameworkSessionHttpBridge"
private const val HTTP_REQUEST_CLASS_NAME = "android.app.agent.FrameworkSessionHttpBridge\$HttpRequest"
private const val HTTP_RESPONSE_CLASS_NAME = "android.app.agent.FrameworkSessionHttpBridge\$HttpResponse"
private const val OPEN_FRAMEWORK_SESSION_BRIDGE_METHOD = "openFrameworkSessionBridge"
private const val HTTP_EXCHANGE_CLASS_NAME = "android.app.agent.FrameworkHttpExchange"
private const val HTTP_REQUEST_HEAD_CLASS_NAME = "android.app.agent.FrameworkHttpRequestHead"
private const val HTTP_RESPONSE_HEAD_CLASS_NAME = "android.app.agent.FrameworkHttpResponseHead"
private const val HTTP_RESPONSE_HEAD_RESULT_CLASS_NAME = "android.app.agent.FrameworkHttpResponseHeadResult"
private const val OPEN_EXCHANGE_METHOD = "openExchange"
private const val OPEN_REQUEST_BODY_OUTPUT_STREAM_METHOD = "openRequestBodyOutputStream"
private const val AWAIT_RESPONSE_HEAD_METHOD = "awaitResponseHead"
private const val OPEN_RESPONSE_BODY_INPUT_STREAM_METHOD = "openResponseBodyInputStream"
private const val CANCEL_METHOD = "cancel"
private const val SET_SESSION_NETWORK_CONFIG_METHOD = "setSessionNetworkConfig"
private const val EXECUTE_REQUEST_AND_READ_FULLY_METHOD = "executeRequestAndReadFully"
private const val STATUS_OK_FIELD_NAME = "STATUS_OK"
private const val READ_BUFFER_BYTES = 8192
private const val WRITE_BUFFER_BYTES = 8192
data class SessionNetworkConfig(
val baseUrl: String,
@@ -38,15 +51,38 @@ object FrameworkSessionTransportCompat {
val bodyString: String,
)
private data class HttpExchange(
val runtimeValue: Any,
)
private data class HttpResponseHead(
val statusCode: Int,
val headers: Bundle,
)
private data class HttpResponseHeadResult(
val status: Int,
val statusName: String,
val responseHead: HttpResponseHead?,
val message: String?,
)
private data class AvailableRuntimeApi(
val setSessionNetworkConfigMethod: Method,
val networkConfigConstructor: java.lang.reflect.Constructor<*>,
val executeRequestAndReadFullyMethod: Method,
val httpRequestConstructor: java.lang.reflect.Constructor<*>,
val httpResponseGetStatusCodeMethod: Method,
val httpResponseGetHeadersMethod: Method,
val httpResponseGetBodyMethod: Method,
val httpResponseGetBodyAsStringMethod: Method,
val networkConfigConstructor: Constructor<*>,
val requestHeadConstructor: Constructor<*>,
val openExchangeMethod: Method,
val openRequestBodyOutputStreamMethod: Method,
val awaitResponseHeadMethod: Method,
val openResponseBodyInputStreamMethod: Method,
val cancelMethod: Method,
val responseHeadResultGetStatusMethod: Method,
val responseHeadResultGetResponseHeadMethod: Method,
val responseHeadResultGetMessageMethod: Method?,
val responseHeadGetStatusCodeMethod: Method,
val responseHeadGetHeadersMethod: Method,
val statusNamesByValue: Map<Int, String>,
val okStatus: Int,
)
private val runtimeApi: AvailableRuntimeApi by lazy(LazyThreadSafetyMode.SYNCHRONIZED, ::resolveRuntimeApi)
@@ -69,59 +105,107 @@ object FrameworkSessionTransportCompat {
}
}
fun openFrameworkSessionBridge(
fun executeStreamingRequest(
callback: GenieService.Callback,
sessionId: String,
): ParcelFileDescriptor {
resolveRuntimeApi()
val method = try {
callback.javaClass.getMethod(
OPEN_FRAMEWORK_SESSION_BRIDGE_METHOD,
String::class.java,
request: HttpRequest,
): HttpResponse {
val exchange = openExchange(callback, sessionId, request)
var cancelExchange = true
try {
invokeChecked {
runtimeApi.openRequestBodyOutputStreamMethod.invoke(null, exchange.runtimeValue) as OutputStream
}.use { requestBody ->
writeAll(requestBody, request.body)
}
val responseHeadResult = awaitResponseHead(callback, sessionId, exchange)
if (responseHeadResult.status != runtimeApi.okStatus) {
val details = responseHeadResult.message?.takeIf(String::isNotBlank)
val suffix = if (details == null) "" else ": $details"
throw IOException(
"Framework HTTP exchange failed with ${responseHeadResult.statusName}$suffix",
)
}
val responseHead = responseHeadResult.responseHead
?: throw IOException("Framework HTTP exchange succeeded without a response head")
val responseBody = invokeChecked {
runtimeApi.openResponseBodyInputStreamMethod.invoke(null, exchange.runtimeValue) as InputStream
}.use(::readFully)
cancelExchange = false
return HttpResponse(
statusCode = responseHead.statusCode,
headers = responseHead.headers,
body = responseBody,
bodyString = responseBody.toString(StandardCharsets.UTF_8),
)
} catch (err: NoSuchMethodException) {
throw IllegalStateException(
"Framework session HTTP bridge callback is unavailable. The device runtime and AgentSDK are out of sync.",
err,
)
}
return invokeChecked {
method.invoke(callback, sessionId) as ParcelFileDescriptor
} finally {
if (cancelExchange) {
runCatching {
invokeChecked {
runtimeApi.cancelMethod.invoke(null, callback, sessionId, exchange.runtimeValue)
}
}
}
}
}
fun executeRequestAndReadFully(
bridge: ParcelFileDescriptor,
private fun openExchange(
callback: GenieService.Callback,
sessionId: String,
request: HttpRequest,
): HttpResponse {
val requestObject = invokeChecked {
runtimeApi.httpRequestConstructor.newInstance(
): HttpExchange {
val requestHead = invokeChecked {
runtimeApi.requestHeadConstructor.newInstance(
request.method,
request.path,
Bundle(request.headers),
request.body,
)
}
val responseObject = invokeChecked {
runtimeApi.executeRequestAndReadFullyMethod.invoke(null, bridge, requestObject)
val runtimeExchange = invokeChecked {
runtimeApi.openExchangeMethod.invoke(null, callback, sessionId, requestHead)
?: throw IOException("Framework HTTP exchange opened with no exchange handle")
}
val statusCode = invokeChecked {
runtimeApi.httpResponseGetStatusCodeMethod.invoke(responseObject) as Int
return HttpExchange(runtimeExchange)
}
private fun awaitResponseHead(
callback: GenieService.Callback,
sessionId: String,
exchange: HttpExchange,
): HttpResponseHeadResult {
val resultObject = invokeChecked {
runtimeApi.awaitResponseHeadMethod.invoke(null, callback, sessionId, exchange.runtimeValue)
}
val headers = invokeChecked {
runtimeApi.httpResponseGetHeadersMethod.invoke(responseObject) as? Bundle
} ?: Bundle.EMPTY
val body = invokeChecked {
runtimeApi.httpResponseGetBodyMethod.invoke(responseObject) as? ByteArray
} ?: ByteArray(0)
val bodyString = invokeChecked {
runtimeApi.httpResponseGetBodyAsStringMethod.invoke(responseObject) as? String
} ?: body.toString(StandardCharsets.UTF_8)
return HttpResponse(
statusCode = statusCode,
headers = Bundle(headers),
body = body,
bodyString = bodyString,
val status = invokeChecked {
runtimeApi.responseHeadResultGetStatusMethod.invoke(resultObject) as Int
}
val responseHeadObject = invokeChecked {
runtimeApi.responseHeadResultGetResponseHeadMethod.invoke(resultObject)
}
val responseHead = if (responseHeadObject == null) {
null
} else {
val statusCode = invokeChecked {
runtimeApi.responseHeadGetStatusCodeMethod.invoke(responseHeadObject) as Int
}
val headers = invokeChecked {
runtimeApi.responseHeadGetHeadersMethod.invoke(responseHeadObject) as? Bundle
} ?: Bundle.EMPTY
HttpResponseHead(
statusCode = statusCode,
headers = Bundle(headers),
)
}
val message = runtimeApi.responseHeadResultGetMessageMethod?.let { method ->
invokeChecked {
method.invoke(resultObject) as? String
}
}?.ifBlank { null }
return HttpResponseHeadResult(
status = status,
statusName = runtimeApi.statusNamesByValue[status] ?: "STATUS_$status",
responseHead = responseHead,
message = message,
)
}
@@ -129,8 +213,20 @@ object FrameworkSessionTransportCompat {
return try {
val networkConfigClass = Class.forName(NETWORK_CONFIG_CLASS_NAME)
val httpBridgeClass = Class.forName(HTTP_BRIDGE_CLASS_NAME)
val httpRequestClass = Class.forName(HTTP_REQUEST_CLASS_NAME)
val httpResponseClass = Class.forName(HTTP_RESPONSE_CLASS_NAME)
val exchangeClass = Class.forName(HTTP_EXCHANGE_CLASS_NAME)
val requestHeadClass = Class.forName(HTTP_REQUEST_HEAD_CLASS_NAME)
val responseHeadClass = Class.forName(HTTP_RESPONSE_HEAD_CLASS_NAME)
val responseHeadResultClass = Class.forName(HTTP_RESPONSE_HEAD_RESULT_CLASS_NAME)
val statusNamesByValue = responseHeadResultClass.fields
.filter { field ->
Modifier.isStatic(field.modifiers) &&
field.type == Int::class.javaPrimitiveType &&
field.name.startsWith("STATUS_")
}
.associate { field ->
field.getInt(null) to field.name
}
val okStatus = responseHeadResultClass.getField(STATUS_OK_FIELD_NAME).getInt(null)
AvailableRuntimeApi(
setSessionNetworkConfigMethod = AgentManager::class.java.getMethod(
SET_SESSION_NETWORK_CONFIG_METHOD,
@@ -143,30 +239,111 @@ object FrameworkSessionTransportCompat {
Int::class.javaPrimitiveType,
Int::class.javaPrimitiveType,
),
executeRequestAndReadFullyMethod = httpBridgeClass.getMethod(
EXECUTE_REQUEST_AND_READ_FULLY_METHOD,
ParcelFileDescriptor::class.java,
httpRequestClass,
),
httpRequestConstructor = httpRequestClass.getConstructor(
requestHeadConstructor = requestHeadClass.getConstructor(
String::class.java,
String::class.java,
Bundle::class.java,
ByteArray::class.java,
),
httpResponseGetStatusCodeMethod = httpResponseClass.getMethod("getStatusCode"),
httpResponseGetHeadersMethod = httpResponseClass.getMethod("getHeaders"),
httpResponseGetBodyMethod = httpResponseClass.getMethod("getBody"),
httpResponseGetBodyAsStringMethod = httpResponseClass.getMethod("getBodyAsString"),
openExchangeMethod = requireMethod(
owner = httpBridgeClass,
name = OPEN_EXCHANGE_METHOD,
GenieService.Callback::class.java,
String::class.java,
requestHeadClass,
),
openRequestBodyOutputStreamMethod = requireMethod(
owner = httpBridgeClass,
name = OPEN_REQUEST_BODY_OUTPUT_STREAM_METHOD,
exchangeClass,
),
awaitResponseHeadMethod = requireMethod(
owner = httpBridgeClass,
name = AWAIT_RESPONSE_HEAD_METHOD,
GenieService.Callback::class.java,
String::class.java,
exchangeClass,
),
openResponseBodyInputStreamMethod = requireMethod(
owner = httpBridgeClass,
name = OPEN_RESPONSE_BODY_INPUT_STREAM_METHOD,
exchangeClass,
),
cancelMethod = requireMethod(
owner = httpBridgeClass,
name = CANCEL_METHOD,
GenieService.Callback::class.java,
String::class.java,
exchangeClass,
),
responseHeadResultGetStatusMethod = requireMethod(
owner = responseHeadResultClass,
name = "getStatus",
),
responseHeadResultGetResponseHeadMethod = requireMethod(
owner = responseHeadResultClass,
name = "getResponseHead",
),
responseHeadResultGetMessageMethod = responseHeadResultClass.methods.firstOrNull { method ->
method.name == "getMessage" && method.parameterCount == 0
},
responseHeadGetStatusCodeMethod = requireMethod(
owner = responseHeadClass,
name = "getStatusCode",
),
responseHeadGetHeadersMethod = requireMethod(
owner = responseHeadClass,
name = "getHeaders",
),
statusNamesByValue = statusNamesByValue,
okStatus = okStatus,
)
} catch (err: ReflectiveOperationException) {
throw IllegalStateException(
"Framework-owned HTTP session transport APIs are unavailable. The device runtime and AgentSDK are out of sync.",
"Framework-owned HTTP streaming APIs are unavailable. The device runtime and AgentSDK are out of sync.",
err,
)
}
}
private fun requireMethod(
owner: Class<*>,
name: String,
vararg parameterTypes: Class<*>,
): Method {
return owner.methods.firstOrNull { method ->
method.name == name &&
method.parameterCount == parameterTypes.size &&
method.parameterTypes.contentEquals(parameterTypes)
} ?: throw NoSuchMethodException(
"${owner.name}#$name(${parameterTypes.joinToString { it.name }})",
)
}
private fun writeAll(
output: OutputStream,
bytes: ByteArray,
) {
var offset = 0
while (offset < bytes.size) {
val chunkSize = minOf(WRITE_BUFFER_BYTES, bytes.size - offset)
output.write(bytes, offset, chunkSize)
offset += chunkSize
}
output.flush()
}
private fun readFully(input: InputStream): ByteArray {
val buffer = ByteArray(READ_BUFFER_BYTES)
val bytes = ByteArrayOutputStream()
while (true) {
val read = input.read(buffer)
if (read == -1) {
return bytes.toByteArray()
}
bytes.write(buffer, 0, read)
}
}
private fun <T> invokeChecked(block: () -> T): T {
try {
return block()

View File

@@ -38,9 +38,8 @@ class AgentBridgeClient(
private const val HEADER_VALUE_IDENTITY = "identity"
}
private val frameworkCallback = callback
private val bridgeFd: ParcelFileDescriptor = callback.openSessionBridge(sessionId)
private val frameworkHttpBridgeFd: ParcelFileDescriptor =
FrameworkSessionTransportCompat.openFrameworkSessionBridge(callback, sessionId)
private val input = DataInputStream(BufferedInputStream(FileInputStream(bridgeFd.fileDescriptor)))
private val output = DataOutputStream(BufferedOutputStream(FileOutputStream(bridgeFd.fileDescriptor)))
private val ioLock = Any()
@@ -85,21 +84,20 @@ class AgentBridgeClient(
}
fun sendResponsesRequest(body: String): AgentResponsesHttpResponse {
val response = ParcelFileDescriptor.dup(frameworkHttpBridgeFd.fileDescriptor).use { requestBridge ->
FrameworkSessionTransportCompat.executeRequestAndReadFully(
bridge = requestBridge,
request = FrameworkSessionTransportCompat.HttpRequest(
method = RESPONSES_METHOD,
path = frameworkResponsesPath,
headers = Bundle().apply {
putString(HEADER_CONTENT_TYPE, HEADER_VALUE_APPLICATION_JSON)
putString(HEADER_ACCEPT, HEADER_VALUE_TEXT_EVENT_STREAM)
putString(HEADER_ACCEPT_ENCODING, HEADER_VALUE_IDENTITY)
},
body = body.toByteArray(StandardCharsets.UTF_8),
),
)
}
val response = FrameworkSessionTransportCompat.executeStreamingRequest(
callback = frameworkCallback,
sessionId = sessionId,
request = FrameworkSessionTransportCompat.HttpRequest(
method = RESPONSES_METHOD,
path = frameworkResponsesPath,
headers = Bundle().apply {
putString(HEADER_CONTENT_TYPE, HEADER_VALUE_APPLICATION_JSON)
putString(HEADER_ACCEPT, HEADER_VALUE_TEXT_EVENT_STREAM)
putString(HEADER_ACCEPT_ENCODING, HEADER_VALUE_IDENTITY)
},
body = body.toByteArray(StandardCharsets.UTF_8),
),
)
return AgentResponsesHttpResponse(
statusCode = response.statusCode,
body = response.bodyString,
@@ -111,7 +109,6 @@ class AgentBridgeClient(
runCatching { input.close() }
runCatching { output.close() }
runCatching { bridgeFd.close() }
runCatching { frameworkHttpBridgeFd.close() }
}
}

View File

@@ -177,7 +177,8 @@ the Android Agent/Genie flow.
- Agent-hosted runtime metadata for Genie bootstrap
- Shell-first Genie execution for package inspection, activity launch, input injection, and UI dumping
- Hosted `codex app-server` inside Genie, with model traffic routed through the
app-server request/response channel and then through the framework-owned HTTP bridge
app-server request/response channel and then through the framework-owned
streaming HTTP exchange bridge
- Per-session framework transport provisioning in
`android/bridge/src/main/java/com/openai/codex/bridge/FrameworkSessionTransportCompat.kt`
- Framework-only Android dynamic tools registered on the Genie Codex thread with:
@@ -228,7 +229,7 @@ the Android Agent/Genie flow.
- Agent-owned Responses transport used by the hosted Agent runtime itself
- `android/genie/src/main/java/com/openai/codex/genie/AgentBridgeClient.kt`
- Genie-side client for the framework-managed control bridge plus the
framework-owned HTTP bridge
framework-owned streaming HTTP exchange bridge
- `android/app/src/main/java/com/openai/codex/agent/AgentCodexAppServerClient.kt`
- hosted Agent `codex app-server` client for planning, orchestration, auto-answering, runtime metadata, and narrow Agent tool calls