mirror of
https://github.com/openai/codex.git
synced 2026-04-29 02:41:12 +03:00
[feat] persist thread_dynamic_tools in db (#10252)
Persist thread_dynamic_tools in sqlite and read first from it. Fall back
to rollout files if it's not found. Persist dynamic tools to both sqlite
and rollout files.
Saw that new sessions get populated to db correctly & old sessions get
backfilled correctly at startup:
```
celia@com-92114 codex-rs % sqlite3 ~/.codex/state.sqlite \ "select thread_id, position,name,description,input_schema from thread_dynamic_tools;"
019c0cad-ec0d-74b2-a787-e8b33a349117|0|geo_lookup|lookup a city|{"properties":{"city":{"type":"string"}},"required":["city"],"type":"object"}
....
019c10ca-aa4b-7620-ae40-c0919fbd7ea7|0|geo_lookup|lookup a city|{"properties":{"city":{"type":"string"}},"required":["city"],"type":"object"}
```
This commit is contained in:
@@ -16,8 +16,10 @@ use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::dynamic_tools::DynamicToolSpec;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use log::LevelFilter;
|
||||
use serde_json::Value;
|
||||
use sqlx::ConnectOptions;
|
||||
use sqlx::QueryBuilder;
|
||||
use sqlx::Row;
|
||||
@@ -117,6 +119,38 @@ WHERE id = ?
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Get dynamic tools for a thread, if present.
|
||||
pub async fn get_dynamic_tools(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> anyhow::Result<Option<Vec<DynamicToolSpec>>> {
|
||||
let rows = sqlx::query(
|
||||
r#"
|
||||
SELECT name, description, input_schema
|
||||
FROM thread_dynamic_tools
|
||||
WHERE thread_id = ?
|
||||
ORDER BY position ASC
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_all(self.pool.as_ref())
|
||||
.await?;
|
||||
if rows.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let mut tools = Vec::with_capacity(rows.len());
|
||||
for row in rows {
|
||||
let input_schema: String = row.try_get("input_schema")?;
|
||||
let input_schema = serde_json::from_str::<Value>(input_schema.as_str())?;
|
||||
tools.push(DynamicToolSpec {
|
||||
name: row.try_get("name")?,
|
||||
description: row.try_get("description")?,
|
||||
input_schema,
|
||||
});
|
||||
}
|
||||
Ok(Some(tools))
|
||||
}
|
||||
|
||||
/// Find a rollout path by thread id using the underlying database.
|
||||
pub async fn find_rollout_path_by_id(
|
||||
&self,
|
||||
@@ -369,6 +403,58 @@ ON CONFLICT(id) DO UPDATE SET
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persist dynamic tools for a thread if none have been stored yet.
|
||||
///
|
||||
/// Dynamic tools are defined at thread start and should not change afterward.
|
||||
/// This only writes the first time we see tools for a given thread.
|
||||
pub async fn persist_dynamic_tools(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
tools: Option<&[DynamicToolSpec]>,
|
||||
) -> anyhow::Result<()> {
|
||||
let Some(tools) = tools else {
|
||||
return Ok(());
|
||||
};
|
||||
if tools.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let thread_id = thread_id.to_string();
|
||||
let existing: Option<i64> =
|
||||
sqlx::query_scalar("SELECT 1 FROM thread_dynamic_tools WHERE thread_id = ? LIMIT 1")
|
||||
.bind(thread_id.as_str())
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
if existing.is_some() {
|
||||
tx.commit().await?;
|
||||
return Ok(());
|
||||
}
|
||||
for (idx, tool) in tools.iter().enumerate() {
|
||||
let position = i64::try_from(idx).unwrap_or(i64::MAX);
|
||||
let input_schema = serde_json::to_string(&tool.input_schema)?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO thread_dynamic_tools (
|
||||
thread_id,
|
||||
position,
|
||||
name,
|
||||
description,
|
||||
input_schema
|
||||
) VALUES (?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.as_str())
|
||||
.bind(position)
|
||||
.bind(tool.name.as_str())
|
||||
.bind(tool.description.as_str())
|
||||
.bind(input_schema)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply rollout items incrementally using the underlying database.
|
||||
pub async fn apply_rollout_items(
|
||||
&self,
|
||||
@@ -390,12 +476,25 @@ ON CONFLICT(id) DO UPDATE SET
|
||||
if let Some(updated_at) = file_modified_time_utc(builder.rollout_path.as_path()).await {
|
||||
metadata.updated_at = updated_at;
|
||||
}
|
||||
// Keep the thread upsert before dynamic tools to satisfy the foreign key constraint:
|
||||
// thread_dynamic_tools.thread_id -> threads.id.
|
||||
if let Err(err) = self.upsert_thread(&metadata).await {
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "apply_rollout_items")]);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
let dynamic_tools = extract_dynamic_tools(items);
|
||||
if let Some(dynamic_tools) = dynamic_tools
|
||||
&& let Err(err) = self
|
||||
.persist_dynamic_tools(builder.id, dynamic_tools.as_deref())
|
||||
.await
|
||||
{
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(DB_ERROR_METRIC, 1, &[("stage", "persist_dynamic_tools")]);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -507,6 +606,16 @@ fn push_like_filters<'a>(
|
||||
builder.push(")");
|
||||
}
|
||||
|
||||
fn extract_dynamic_tools(items: &[RolloutItem]) -> Option<Option<Vec<DynamicToolSpec>>> {
|
||||
items.iter().find_map(|item| match item {
|
||||
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()),
|
||||
RolloutItem::ResponseItem(_)
|
||||
| RolloutItem::Compacted(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
| RolloutItem::EventMsg(_) => None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
|
||||
Reference in New Issue
Block a user