Compare commits

...

1 Commits

Author SHA1 Message Date
Dylan Hurd
eb3ae71242 fix(app-server) emit logs as json-rpc 2026-03-23 23:50:10 -07:00
16 changed files with 855 additions and 17 deletions

View File

@@ -1440,6 +1440,71 @@
],
"type": "object"
},
"LogEntryLevel": {
"enum": [
"trace",
"debug",
"info",
"warn",
"error"
],
"type": "string"
},
"LogEntryNotification": {
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"level": {
"$ref": "#/definitions/LogEntryLevel"
},
"message": {
"type": "string"
},
"span": {
"anyOf": [
{
"$ref": "#/definitions/LogSpanContext"
},
{
"type": "null"
}
]
},
"target": {
"type": "string"
},
"timestamp": {
"format": "int64",
"type": "integer"
}
},
"required": [
"fields",
"level",
"message",
"target",
"timestamp"
],
"type": "object"
},
"LogSpanContext": {
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"name": {
"type": "string"
}
},
"required": [
"fields",
"name"
],
"type": "object"
},
"McpServerOauthLoginCompletedNotification": {
"properties": {
"error": {
@@ -3772,6 +3837,26 @@
"title": "ErrorNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"log/entry"
],
"title": "Log/entryNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/LogEntryNotification"
}
},
"required": [
"method",
"params"
],
"title": "Log/entryNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -3431,6 +3431,26 @@
"title": "ErrorNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"log/entry"
],
"title": "Log/entryNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/LogEntryNotification"
}
},
"required": [
"method",
"params"
],
"title": "Log/entryNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -8402,6 +8422,73 @@
],
"type": "string"
},
"LogEntryLevel": {
"enum": [
"trace",
"debug",
"info",
"warn",
"error"
],
"type": "string"
},
"LogEntryNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"level": {
"$ref": "#/definitions/v2/LogEntryLevel"
},
"message": {
"type": "string"
},
"span": {
"anyOf": [
{
"$ref": "#/definitions/v2/LogSpanContext"
},
{
"type": "null"
}
]
},
"target": {
"type": "string"
},
"timestamp": {
"format": "int64",
"type": "integer"
}
},
"required": [
"fields",
"level",
"message",
"target",
"timestamp"
],
"title": "LogEntryNotification",
"type": "object"
},
"LogSpanContext": {
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"name": {
"type": "string"
}
},
"required": [
"fields",
"name"
],
"type": "object"
},
"LoginAccountParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"oneOf": [

View File

@@ -5150,6 +5150,73 @@
],
"type": "string"
},
"LogEntryLevel": {
"enum": [
"trace",
"debug",
"info",
"warn",
"error"
],
"type": "string"
},
"LogEntryNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"level": {
"$ref": "#/definitions/LogEntryLevel"
},
"message": {
"type": "string"
},
"span": {
"anyOf": [
{
"$ref": "#/definitions/LogSpanContext"
},
{
"type": "null"
}
]
},
"target": {
"type": "string"
},
"timestamp": {
"format": "int64",
"type": "integer"
}
},
"required": [
"fields",
"level",
"message",
"target",
"timestamp"
],
"title": "LogEntryNotification",
"type": "object"
},
"LogSpanContext": {
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"name": {
"type": "string"
}
},
"required": [
"fields",
"name"
],
"type": "object"
},
"LoginAccountParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"oneOf": [
@@ -7896,6 +7963,26 @@
"title": "ErrorNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"log/entry"
],
"title": "Log/entryNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/LogEntryNotification"
}
},
"required": [
"method",
"params"
],
"title": "Log/entryNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -0,0 +1,69 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"LogEntryLevel": {
"enum": [
"trace",
"debug",
"info",
"warn",
"error"
],
"type": "string"
},
"LogSpanContext": {
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"name": {
"type": "string"
}
},
"required": [
"fields",
"name"
],
"type": "object"
}
},
"properties": {
"fields": {
"additionalProperties": true,
"type": "object"
},
"level": {
"$ref": "#/definitions/LogEntryLevel"
},
"message": {
"type": "string"
},
"span": {
"anyOf": [
{
"$ref": "#/definitions/LogSpanContext"
},
{
"type": "null"
}
]
},
"target": {
"type": "string"
},
"timestamp": {
"format": "int64",
"type": "integer"
}
},
"required": [
"fields",
"level",
"message",
"target",
"timestamp"
],
"title": "LogEntryNotification",
"type": "object"
}

View File

@@ -21,6 +21,7 @@ import type { ItemCompletedNotification } from "./v2/ItemCompletedNotification";
import type { ItemGuardianApprovalReviewCompletedNotification } from "./v2/ItemGuardianApprovalReviewCompletedNotification";
import type { ItemGuardianApprovalReviewStartedNotification } from "./v2/ItemGuardianApprovalReviewStartedNotification";
import type { ItemStartedNotification } from "./v2/ItemStartedNotification";
import type { LogEntryNotification } from "./v2/LogEntryNotification";
import type { McpServerOauthLoginCompletedNotification } from "./v2/McpServerOauthLoginCompletedNotification";
import type { McpServerStatusUpdatedNotification } from "./v2/McpServerStatusUpdatedNotification";
import type { McpToolCallProgressNotification } from "./v2/McpToolCallProgressNotification";
@@ -56,4 +57,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcriptUpdated", "params": ThreadRealtimeTranscriptUpdatedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "log/entry", "params": LogEntryNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "skills/changed", "params": SkillsChangedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "hook/started", "params": HookStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "hook/completed", "params": HookCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/autoApprovalReview/started", "params": ItemGuardianApprovalReviewStartedNotification } | { "method": "item/autoApprovalReview/completed", "params": ItemGuardianApprovalReviewCompletedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "command/exec/outputDelta", "params": CommandExecOutputDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "serverRequest/resolved", "params": ServerRequestResolvedNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "mcpServer/startupStatus/updated", "params": McpServerStatusUpdatedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/transcriptUpdated", "params": ThreadRealtimeTranscriptUpdatedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type LogEntryLevel = "trace" | "debug" | "info" | "warn" | "error";

View File

@@ -0,0 +1,8 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { JsonValue } from "../serde_json/JsonValue";
import type { LogEntryLevel } from "./LogEntryLevel";
import type { LogSpanContext } from "./LogSpanContext";
export type LogEntryNotification = { timestamp: bigint, level: LogEntryLevel, target: string, message: string, fields: { [key in string]?: JsonValue }, span: LogSpanContext | null, };

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { JsonValue } from "../serde_json/JsonValue";
export type LogSpanContext = { name: string, fields: { [key in string]?: JsonValue }, };

View File

@@ -137,6 +137,9 @@ export type { ItemGuardianApprovalReviewStartedNotification } from "./ItemGuardi
export type { ItemStartedNotification } from "./ItemStartedNotification";
export type { ListMcpServerStatusParams } from "./ListMcpServerStatusParams";
export type { ListMcpServerStatusResponse } from "./ListMcpServerStatusResponse";
export type { LogEntryLevel } from "./LogEntryLevel";
export type { LogEntryNotification } from "./LogEntryNotification";
export type { LogSpanContext } from "./LogSpanContext";
export type { LoginAccountParams } from "./LoginAccountParams";
export type { LoginAccountResponse } from "./LoginAccountResponse";
export type { LogoutAccountResponse } from "./LogoutAccountResponse";

View File

@@ -874,6 +874,7 @@ pub struct FuzzyFileSearchSessionCompletedNotification {
server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
LogEntry => "log/entry" (v2::LogEntryNotification),
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
ThreadStatusChanged => "thread/status/changed" (v2::ThreadStatusChangedNotification),
ThreadArchived => "thread/archived" (v2::ThreadArchivedNotification),
@@ -1576,6 +1577,50 @@ mod tests {
Ok(())
}
#[test]
fn serialize_log_entry_notification() -> Result<()> {
let notification = ServerNotification::LogEntry(v2::LogEntryNotification {
timestamp: 1_742_710_400,
level: v2::LogEntryLevel::Warn,
target: "codex_app_server".to_string(),
message: "listener queue is full".to_string(),
fields: std::collections::BTreeMap::from([
("connectionId".to_string(), serde_json::json!(7)),
("retryable".to_string(), serde_json::json!(true)),
]),
span: Some(v2::LogSpanContext {
name: "app_server.request".to_string(),
fields: std::collections::BTreeMap::from([(
"rpc.method".to_string(),
serde_json::json!("thread/start"),
)]),
}),
});
assert_eq!(
json!({
"method": "log/entry",
"params": {
"timestamp": 1742710400,
"level": "warn",
"target": "codex_app_server",
"message": "listener queue is full",
"fields": {
"connectionId": 7,
"retryable": true,
},
"span": {
"name": "app_server.request",
"fields": {
"rpc.method": "thread/start",
}
}
}
}),
serde_json::to_value(&notification)?,
);
Ok(())
}
#[test]
fn serialize_thread_realtime_output_audio_delta_notification() -> Result<()> {
let notification = ServerNotification::ThreadRealtimeOutputAudioDelta(

View File

@@ -3693,6 +3693,37 @@ pub struct ErrorNotification {
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "lowercase")]
#[ts(rename_all = "lowercase", export_to = "v2/")]
pub enum LogEntryLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct LogSpanContext {
pub name: String,
pub fields: BTreeMap<String, JsonValue>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct LogEntryNotification {
pub timestamp: i64,
pub level: LogEntryLevel,
pub target: String,
pub message: String,
pub fields: BTreeMap<String, JsonValue>,
pub span: Option<LogSpanContext>,
}
/// EXPERIMENTAL - thread realtime audio chunk.
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
@@ -6807,6 +6838,49 @@ mod tests {
assert_eq!(decoded, notification);
}
#[test]
fn log_entry_notification_round_trips() {
let notification = LogEntryNotification {
timestamp: 1_742_710_400,
level: LogEntryLevel::Info,
target: "codex_app_server::transport".to_string(),
message: "connection opened".to_string(),
fields: BTreeMap::from([
("connectionId".to_string(), json!(7)),
("interactive".to_string(), json!(true)),
]),
span: Some(LogSpanContext {
name: "app_server.request".to_string(),
fields: BTreeMap::from([("rpc.method".to_string(), json!("initialize"))]),
}),
};
let value = serde_json::to_value(&notification).expect("serialize log/entry notification");
assert_eq!(
value,
json!({
"timestamp": 1742710400,
"level": "info",
"target": "codex_app_server::transport",
"message": "connection opened",
"fields": {
"connectionId": 7,
"interactive": true,
},
"span": {
"name": "app_server.request",
"fields": {
"rpc.method": "initialize",
}
}
})
);
let decoded =
serde_json::from_value::<LogEntryNotification>(value).expect("deserialize round-trip");
assert_eq!(decoded, notification);
}
#[test]
fn sandbox_policy_round_trips_external_sandbox_network_access() {
let v2_policy = SandboxPolicy::ExternalSandbox {

View File

@@ -37,7 +37,12 @@ Websocket transport is currently experimental and unsupported. Do not rely on it
Tracing/log output:
- `RUST_LOG` controls log filtering/verbosity.
- Set `LOG_FORMAT=json` to emit app-server tracing logs to `stderr` as JSON (one event per line).
- `stdio` transport emits operational logs as `log/entry` JSON-RPC notifications after the client finishes `initialize`.
- `log/entry` is on by default for initialized `stdio` clients and can be suppressed with `initialize.params.capabilities.optOutNotificationMethods: ["log/entry"]`.
- `log/entry.params` includes `timestamp`, `level`, `target`, `message`, `fields`, and optional `span` context.
- `stdio` transport no longer emits operational tracing logs to `stderr`.
- `websocket` transport keeps the existing `stderr` logger behavior.
- For `websocket`, set `LOG_FORMAT=json` to emit app-server tracing logs to `stderr` as JSON (one event per line).
Backpressure behavior:
@@ -152,6 +157,7 @@ Example with notification opt-out:
- `command/exec/resize` — resize a running PTY-backed `command/exec` session by `processId`; returns `{}`.
- `command/exec/terminate` — terminate a running `command/exec` session by `processId`; returns `{}`.
- `command/exec/outputDelta` — notification emitted for base64-encoded stdout/stderr chunks from a streaming `command/exec` session.
- `log/entry` — notification emitted for structured app-server tracing logs on `stdio` connections after initialization.
- `fs/readFile` — read an absolute file path and return `{ dataBase64 }`.
- `fs/writeFile` — write an absolute file path from base64-encoded `{ dataBase64 }`; returns `{}`.
- `fs/createDirectory` — create an absolute directory path; `recursive` defaults to `true`.

View File

@@ -68,6 +68,7 @@ mod filters;
mod fs_api;
mod fuzzy_file_search;
pub mod in_process;
mod log_notification_layer;
mod message_processor;
mod models;
mod outgoing_message;
@@ -503,22 +504,27 @@ pub async fn run_main_with_transport(
)
})?;
// Install a simple subscriber so `tracing` output is visible. Users can
// control the log level with `RUST_LOG` and switch to JSON logs with
// `LOG_FORMAT=json`.
let stderr_fmt: StderrLogLayer = match log_format_from_env() {
LogFormat::Json => tracing_subscriber::fmt::layer()
.json()
.with_writer(std::io::stderr)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_filter(EnvFilter::from_default_env())
.boxed(),
LogFormat::Default => tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_filter(EnvFilter::from_default_env())
.boxed(),
let stderr_fmt: Option<StderrLogLayer> = match transport {
AppServerTransport::Stdio => None,
AppServerTransport::WebSocket { .. } => Some(match log_format_from_env() {
LogFormat::Json => tracing_subscriber::fmt::layer()
.json()
.with_writer(std::io::stderr)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_filter(EnvFilter::from_default_env())
.boxed(),
LogFormat::Default => tracing_subscriber::fmt::layer()
.with_writer(std::io::stderr)
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_filter(EnvFilter::from_default_env())
.boxed(),
}),
};
let log_notification_layer = log_notification_layer::LogNotificationLayer::new(
outgoing_tx.clone(),
matches!(transport, AppServerTransport::Stdio),
)
.with_filter(EnvFilter::from_default_env());
let feedback_layer = feedback.logger_layer();
let feedback_metadata_layer = feedback.metadata_layer();
@@ -536,6 +542,7 @@ pub async fn run_main_with_transport(
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(log_notification_layer)
.with(feedback_layer)
.with(feedback_metadata_layer)
.with(log_db_layer)

View File

@@ -0,0 +1,251 @@
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use codex_app_server_protocol::LogEntryLevel;
use codex_app_server_protocol::LogEntryNotification;
use codex_app_server_protocol::LogSpanContext;
use codex_app_server_protocol::ServerNotification;
use serde_json::Value as JsonValue;
use std::collections::BTreeMap;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::sync::mpsc;
use tracing::Event;
use tracing::Id;
use tracing::Subscriber;
use tracing::field::Field;
use tracing::field::Visit;
use tracing::span::Attributes;
use tracing::span::Record;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
#[derive(Clone)]
pub(crate) struct LogNotificationLayer {
sender: mpsc::Sender<OutgoingEnvelope>,
enabled: bool,
}
#[derive(Clone, Debug, Default)]
struct StoredSpanFields {
values: BTreeMap<String, JsonValue>,
}
#[derive(Default)]
struct JsonValueVisitor {
values: BTreeMap<String, JsonValue>,
message: Option<String>,
}
impl LogNotificationLayer {
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>, enabled: bool) -> Self {
Self { sender, enabled }
}
}
impl<S> Layer<S> for LogNotificationLayer
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
if !self.enabled {
return;
}
let mut visitor = JsonValueVisitor::default();
event.record(&mut visitor);
let notification = LogEntryNotification {
timestamp: unix_timestamp_seconds(),
level: log_entry_level(*event.metadata().level()),
target: event.metadata().target().to_string(),
message: visitor
.message
.unwrap_or_else(|| event.metadata().name().to_string()),
fields: visitor.values,
span: current_span_context(&ctx, event),
};
let _ = self.sender.try_send(OutgoingEnvelope::Broadcast {
message: OutgoingMessage::AppServerNotification(ServerNotification::LogEntry(
notification,
)),
});
}
fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let mut visitor = JsonValueVisitor::default();
attrs.record(&mut visitor);
if visitor.values.is_empty() {
return;
}
if let Some(span) = ctx.span(id) {
span.extensions_mut().insert(StoredSpanFields {
values: visitor.values,
});
}
}
fn on_record(&self, id: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
let Some(span) = ctx.span(id) else {
return;
};
let mut visitor = JsonValueVisitor::default();
values.record(&mut visitor);
if visitor.values.is_empty() {
return;
}
let mut extensions = span.extensions_mut();
if let Some(stored) = extensions.get_mut::<StoredSpanFields>() {
stored.values.extend(visitor.values);
} else {
extensions.insert(StoredSpanFields {
values: visitor.values,
});
}
}
}
impl Visit for JsonValueVisitor {
fn record_i64(&mut self, field: &Field, value: i64) {
self.record_value(field, JsonValue::from(value));
}
fn record_u64(&mut self, field: &Field, value: u64) {
self.record_value(field, JsonValue::from(value));
}
fn record_bool(&mut self, field: &Field, value: bool) {
self.record_value(field, JsonValue::from(value));
}
fn record_str(&mut self, field: &Field, value: &str) {
self.record_value(field, JsonValue::from(value));
}
fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
self.record_value(field, JsonValue::from(value.to_string()));
}
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
self.record_value(field, JsonValue::from(format!("{value:?}")));
}
}
impl JsonValueVisitor {
fn record_value(&mut self, field: &Field, value: JsonValue) {
if field.name() == "message" {
self.message = Some(match value {
JsonValue::String(text) => text,
other => other.to_string(),
});
return;
}
self.values.insert(field.name().to_string(), value);
}
}
fn current_span_context<S>(ctx: &Context<'_, S>, event: &Event<'_>) -> Option<LogSpanContext>
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
let scope = ctx.event_scope(event)?;
let span = scope.from_root().last()?;
let fields = span
.extensions()
.get::<StoredSpanFields>()
.map(|stored| stored.values.clone())
.unwrap_or_default();
Some(LogSpanContext {
name: span.metadata().name().to_string(),
fields,
})
}
fn unix_timestamp_seconds() -> i64 {
let Ok(duration) = SystemTime::now().duration_since(UNIX_EPOCH) else {
return 0;
};
i64::try_from(duration.as_secs()).unwrap_or(i64::MAX)
}
fn log_entry_level(level: tracing::Level) -> LogEntryLevel {
match level {
tracing::Level::TRACE => LogEntryLevel::Trace,
tracing::Level::DEBUG => LogEntryLevel::Debug,
tracing::Level::INFO => LogEntryLevel::Info,
tracing::Level::WARN => LogEntryLevel::Warn,
tracing::Level::ERROR => LogEntryLevel::Error,
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json::json;
use tracing_subscriber::layer::SubscriberExt;
#[test]
fn emits_log_entry_notification_with_structured_fields() {
let (sender, mut receiver) = mpsc::channel(4);
let subscriber =
tracing_subscriber::registry().with(LogNotificationLayer::new(sender, true));
tracing::subscriber::with_default(subscriber, || {
let span = tracing::info_span!(
"app_server.request",
rpc.method = "thread/start",
app_server.connection_id = 7_u64
);
let _guard = span.enter();
tracing::warn!(attempt = 2_u64, retryable = true, "listener queue is full");
});
let notification = next_log_entry(&mut receiver);
assert_eq!(notification.level, LogEntryLevel::Warn);
assert_eq!(notification.target, module_path!());
assert_eq!(notification.message, "listener queue is full");
assert_eq!(notification.fields.get("attempt"), Some(&json!(2)));
assert_eq!(notification.fields.get("retryable"), Some(&json!(true)));
assert!(notification.timestamp > 0);
assert_eq!(
notification.span,
Some(LogSpanContext {
name: "app_server.request".to_string(),
fields: BTreeMap::from([
("app_server.connection_id".to_string(), json!(7)),
("rpc.method".to_string(), json!("thread/start")),
]),
})
);
}
#[test]
fn skips_notifications_when_disabled() {
let (sender, mut receiver) = mpsc::channel(1);
let subscriber =
tracing_subscriber::registry().with(LogNotificationLayer::new(sender, false));
tracing::subscriber::with_default(subscriber, || {
tracing::info!("this should not be delivered");
});
assert!(receiver.try_recv().is_err());
}
fn next_log_entry(receiver: &mut mpsc::Receiver<OutgoingEnvelope>) -> LogEntryNotification {
let envelope = receiver.try_recv().expect("missing log notification");
let OutgoingEnvelope::Broadcast { message } = envelope else {
panic!("expected broadcast envelope");
};
let OutgoingMessage::AppServerNotification(ServerNotification::LogEntry(notification)) =
message
else {
panic!("expected log/entry notification");
};
notification
}
}

View File

@@ -93,6 +93,7 @@ pub struct McpProcess {
}
pub const DEFAULT_CLIENT_NAME: &str = "codex-app-server-tests";
const LOG_ENTRY_NOTIFICATION_METHOD: &str = "log/entry";
impl McpProcess {
pub async fn new(codex_home: &Path) -> anyhow::Result<Self> {
@@ -211,6 +212,18 @@ impl McpProcess {
&mut self,
client_info: ClientInfo,
capabilities: Option<InitializeCapabilities>,
) -> anyhow::Result<JSONRPCMessage> {
self.initialize_with_params(InitializeParams {
client_info,
capabilities: capabilities.map(with_default_opted_out_notifications),
})
.await
}
pub async fn initialize_with_capabilities_allow_log_entry(
&mut self,
client_info: ClientInfo,
capabilities: Option<InitializeCapabilities>,
) -> anyhow::Result<JSONRPCMessage> {
self.initialize_with_params(InitializeParams {
client_info,
@@ -1159,6 +1172,21 @@ impl McpProcess {
}
}
fn with_default_opted_out_notifications(
mut capabilities: InitializeCapabilities,
) -> InitializeCapabilities {
let opt_out_notification_methods = capabilities
.opt_out_notification_methods
.get_or_insert_with(Vec::new);
if !opt_out_notification_methods
.iter()
.any(|method| method == LOG_ENTRY_NOTIFICATION_METHOD)
{
opt_out_notification_methods.push(LOG_ENTRY_NOTIFICATION_METHOD.to_string());
}
capabilities
}
impl Drop for McpProcess {
fn drop(&mut self) {
// These tests spawn a `codex-app-server` child process.

View File

@@ -8,6 +8,7 @@ use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeResponse;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LogEntryNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
@@ -196,6 +197,81 @@ async fn initialize_opt_out_notification_methods_filters_notifications() -> Resu
Ok(())
}
#[tokio::test]
async fn initialize_can_opt_in_to_log_entry_notifications() -> Result<()> {
let responses = Vec::new();
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
let message = timeout(
DEFAULT_READ_TIMEOUT,
mcp.initialize_with_capabilities_allow_log_entry(
ClientInfo {
name: "codex_vscode".to_string(),
title: Some("Codex VS Code Extension".to_string()),
version: "0.1.0".to_string(),
},
Some(InitializeCapabilities {
experimental_api: true,
opt_out_notification_methods: None,
}),
),
)
.await??;
let JSONRPCMessage::Response(_) = message else {
anyhow::bail!("expected initialize response, got {message:?}");
};
let request_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let mut saw_log_entry = false;
let mut saw_response = false;
let mut saw_thread_started = false;
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
match mcp.read_next_message().await? {
JSONRPCMessage::Response(response)
if response.id == RequestId::Integer(request_id) =>
{
let _: ThreadStartResponse = to_response(response)?;
saw_response = true;
}
JSONRPCMessage::Notification(notification)
if notification.method == "log/entry" =>
{
let params = notification
.params
.ok_or_else(|| anyhow::format_err!("missing log/entry params"))?;
let log_entry = serde_json::from_value::<LogEntryNotification>(params)?;
if log_entry.target.contains("codex_app_server") || log_entry.target == "log" {
saw_log_entry = true;
}
}
JSONRPCMessage::Notification(notification)
if notification.method == "thread/started" =>
{
saw_thread_started = true;
}
_ => {}
}
if saw_log_entry && saw_response && saw_thread_started {
return Ok::<(), anyhow::Error>(());
}
}
})
.await??;
assert!(
saw_log_entry,
"expected to receive at least one log/entry notification"
);
Ok(())
}
#[tokio::test]
async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];