mirror of
https://github.com/openai/codex.git
synced 2026-04-18 21:41:53 +03:00
Compare commits
10 Commits
latest-alp
...
goal-mode-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
77ece672f3 | ||
|
|
846d5203a2 | ||
|
|
7288c8061a | ||
|
|
59540e0a0a | ||
|
|
cd7929f796 | ||
|
|
e7ad22b86c | ||
|
|
38e7367af9 | ||
|
|
a7897034ee | ||
|
|
00ac0b1b3e | ||
|
|
6ae8cf4c4c |
@@ -398,6 +398,9 @@
|
||||
"general_analytics": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"goal_mode": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"guardian_approval": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -2294,6 +2297,9 @@
|
||||
"general_analytics": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"goal_mode": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"guardian_approval": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
@@ -169,6 +169,8 @@ pub enum Feature {
|
||||
DefaultModeRequestUserInput,
|
||||
/// Enable automatic review for approval prompts.
|
||||
GuardianApproval,
|
||||
/// Enable persisted thread goals and automatic goal continuation.
|
||||
GoalMode,
|
||||
/// Enable collaboration modes (Plan, Default).
|
||||
/// Kept for config backward compatibility; behavior is always collaboration-modes-enabled.
|
||||
CollaborationModes,
|
||||
@@ -884,6 +886,12 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
},
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::GoalMode,
|
||||
key: "goal_mode",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::CollaborationModes,
|
||||
key: "collaboration_modes",
|
||||
|
||||
@@ -121,6 +121,13 @@ fn request_permissions_tool_is_under_development() {
|
||||
assert_eq!(Feature::RequestPermissionsTool.default_enabled(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn goal_mode_is_under_development() {
|
||||
assert_eq!(Feature::GoalMode.stage(), Stage::UnderDevelopment);
|
||||
assert_eq!(Feature::GoalMode.default_enabled(), false);
|
||||
assert_eq!(feature_for_key("goal_mode"), Some(Feature::GoalMode));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tool_suggest_is_stable_and_enabled_by_default() {
|
||||
assert_eq!(Feature::ToolSuggest.stage(), Stage::Stable);
|
||||
|
||||
10
codex-rs/state/migrations/0026_thread_goals.sql
Normal file
10
codex-rs/state/migrations/0026_thread_goals.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
CREATE TABLE thread_goals (
|
||||
thread_id TEXT PRIMARY KEY NOT NULL REFERENCES threads(id) ON DELETE CASCADE,
|
||||
objective TEXT NOT NULL,
|
||||
status TEXT NOT NULL CHECK(status IN ('active', 'paused', 'budget_limited', 'complete')),
|
||||
token_budget INTEGER,
|
||||
tokens_used INTEGER NOT NULL DEFAULT 0,
|
||||
time_used_seconds INTEGER NOT NULL DEFAULT 0,
|
||||
created_at_ms INTEGER NOT NULL,
|
||||
updated_at_ms INTEGER NOT NULL
|
||||
);
|
||||
@@ -44,11 +44,16 @@ pub use model::Stage1JobClaimOutcome;
|
||||
pub use model::Stage1Output;
|
||||
pub use model::Stage1OutputRef;
|
||||
pub use model::Stage1StartupClaimParams;
|
||||
pub use model::ThreadGoal;
|
||||
pub use model::ThreadGoalStatus;
|
||||
pub use model::ThreadMetadata;
|
||||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadsPage;
|
||||
pub use runtime::RemoteControlEnrollmentRecord;
|
||||
pub use runtime::ThreadFilterOptions;
|
||||
pub use runtime::ThreadGoalAccountingMode;
|
||||
pub use runtime::ThreadGoalAccountingOutcome;
|
||||
pub use runtime::ThreadGoalUpdate;
|
||||
pub use runtime::logs_db_filename;
|
||||
pub use runtime::logs_db_path;
|
||||
pub use runtime::state_db_filename;
|
||||
|
||||
@@ -3,6 +3,7 @@ mod backfill_state;
|
||||
mod graph;
|
||||
mod log;
|
||||
mod memories;
|
||||
mod thread_goal;
|
||||
mod thread_metadata;
|
||||
|
||||
pub use agent_job::AgentJob;
|
||||
@@ -25,6 +26,8 @@ pub use memories::Stage1JobClaimOutcome;
|
||||
pub use memories::Stage1Output;
|
||||
pub use memories::Stage1OutputRef;
|
||||
pub use memories::Stage1StartupClaimParams;
|
||||
pub use thread_goal::ThreadGoal;
|
||||
pub use thread_goal::ThreadGoalStatus;
|
||||
pub use thread_metadata::Anchor;
|
||||
pub use thread_metadata::BackfillStats;
|
||||
pub use thread_metadata::ExtractionOutcome;
|
||||
@@ -38,6 +41,7 @@ pub(crate) use agent_job::AgentJobItemRow;
|
||||
pub(crate) use agent_job::AgentJobRow;
|
||||
pub(crate) use memories::Stage1OutputRow;
|
||||
pub(crate) use memories::stage1_output_ref_from_parts;
|
||||
pub(crate) use thread_goal::ThreadGoalRow;
|
||||
pub(crate) use thread_metadata::ThreadRow;
|
||||
pub(crate) use thread_metadata::anchor_from_item;
|
||||
pub(crate) use thread_metadata::datetime_to_epoch_millis;
|
||||
|
||||
105
codex-rs/state/src/model/thread_goal.rs
Normal file
105
codex-rs/state/src/model/thread_goal.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
use sqlx::Row;
|
||||
use sqlx::sqlite::SqliteRow;
|
||||
|
||||
use super::epoch_millis_to_datetime;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ThreadGoalStatus {
|
||||
Active,
|
||||
Paused,
|
||||
BudgetLimited,
|
||||
Complete,
|
||||
}
|
||||
|
||||
impl ThreadGoalStatus {
|
||||
pub fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Active => "active",
|
||||
Self::Paused => "paused",
|
||||
Self::BudgetLimited => "budget_limited",
|
||||
Self::Complete => "complete",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_active(self) -> bool {
|
||||
self == Self::Active
|
||||
}
|
||||
|
||||
pub fn is_terminal(self) -> bool {
|
||||
matches!(self, Self::BudgetLimited | Self::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for ThreadGoalStatus {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self> {
|
||||
match value {
|
||||
"active" => Ok(Self::Active),
|
||||
"paused" => Ok(Self::Paused),
|
||||
"budget_limited" => Ok(Self::BudgetLimited),
|
||||
"complete" => Ok(Self::Complete),
|
||||
other => Err(anyhow!("unknown thread goal status `{other}`")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadGoal {
|
||||
pub thread_id: ThreadId,
|
||||
pub objective: String,
|
||||
pub status: ThreadGoalStatus,
|
||||
pub token_budget: Option<i64>,
|
||||
pub tokens_used: i64,
|
||||
pub time_used_seconds: i64,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
pub(crate) struct ThreadGoalRow {
|
||||
pub thread_id: String,
|
||||
pub objective: String,
|
||||
pub status: String,
|
||||
pub token_budget: Option<i64>,
|
||||
pub tokens_used: i64,
|
||||
pub time_used_seconds: i64,
|
||||
pub created_at_ms: i64,
|
||||
pub updated_at_ms: i64,
|
||||
}
|
||||
|
||||
impl ThreadGoalRow {
|
||||
pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
|
||||
Ok(Self {
|
||||
thread_id: row.try_get("thread_id")?,
|
||||
objective: row.try_get("objective")?,
|
||||
status: row.try_get("status")?,
|
||||
token_budget: row.try_get("token_budget")?,
|
||||
tokens_used: row.try_get("tokens_used")?,
|
||||
time_used_seconds: row.try_get("time_used_seconds")?,
|
||||
created_at_ms: row.try_get("created_at_ms")?,
|
||||
updated_at_ms: row.try_get("updated_at_ms")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ThreadGoalRow> for ThreadGoal {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(row: ThreadGoalRow) -> Result<Self> {
|
||||
Ok(Self {
|
||||
thread_id: ThreadId::try_from(row.thread_id)?,
|
||||
objective: row.objective,
|
||||
status: ThreadGoalStatus::try_from(row.status.as_str())?,
|
||||
token_budget: row.token_budget,
|
||||
tokens_used: row.tokens_used,
|
||||
time_used_seconds: row.time_used_seconds,
|
||||
created_at: epoch_millis_to_datetime(row.created_at_ms)?,
|
||||
updated_at: epoch_millis_to_datetime(row.updated_at_ms)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,7 @@ use crate::apply_rollout_item;
|
||||
use crate::migrations::runtime_logs_migrator;
|
||||
use crate::migrations::runtime_state_migrator;
|
||||
use crate::model::AgentJobRow;
|
||||
use crate::model::ThreadGoalRow;
|
||||
use crate::model::ThreadRow;
|
||||
use crate::model::anchor_from_item;
|
||||
use crate::model::datetime_to_epoch_millis;
|
||||
@@ -55,6 +56,7 @@ use tracing::warn;
|
||||
|
||||
mod agent_jobs;
|
||||
mod backfill;
|
||||
mod goals;
|
||||
mod logs;
|
||||
mod memories;
|
||||
mod remote_control;
|
||||
@@ -62,6 +64,9 @@ mod remote_control;
|
||||
mod test_support;
|
||||
mod threads;
|
||||
|
||||
pub use goals::ThreadGoalAccountingMode;
|
||||
pub use goals::ThreadGoalAccountingOutcome;
|
||||
pub use goals::ThreadGoalUpdate;
|
||||
pub use remote_control::RemoteControlEnrollmentRecord;
|
||||
pub use threads::ThreadFilterOptions;
|
||||
|
||||
|
||||
626
codex-rs/state/src/runtime/goals.rs
Normal file
626
codex-rs/state/src/runtime/goals.rs
Normal file
@@ -0,0 +1,626 @@
|
||||
use super::*;
|
||||
|
||||
pub struct ThreadGoalUpdate {
|
||||
pub status: Option<crate::ThreadGoalStatus>,
|
||||
pub token_budget: Option<Option<i64>>,
|
||||
}
|
||||
|
||||
pub enum ThreadGoalAccountingOutcome {
|
||||
Unchanged(Option<crate::ThreadGoal>),
|
||||
Updated(crate::ThreadGoal),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum ThreadGoalAccountingMode {
|
||||
ActiveOnly,
|
||||
ActiveOrComplete,
|
||||
}
|
||||
|
||||
impl StateRuntime {
|
||||
pub async fn get_thread_goal(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> anyhow::Result<Option<crate::ThreadGoal>> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT
|
||||
thread_id,
|
||||
objective,
|
||||
status,
|
||||
token_budget,
|
||||
tokens_used,
|
||||
time_used_seconds,
|
||||
created_at_ms,
|
||||
updated_at_ms
|
||||
FROM thread_goals
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.to_string())
|
||||
.fetch_optional(self.pool.as_ref())
|
||||
.await?;
|
||||
|
||||
row.map(|row| ThreadGoalRow::try_from_row(&row).and_then(crate::ThreadGoal::try_from))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub async fn replace_thread_goal(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
objective: &str,
|
||||
status: crate::ThreadGoalStatus,
|
||||
token_budget: Option<i64>,
|
||||
) -> anyhow::Result<crate::ThreadGoal> {
|
||||
let now_ms = datetime_to_epoch_millis(Utc::now());
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO thread_goals (
|
||||
thread_id,
|
||||
objective,
|
||||
status,
|
||||
token_budget,
|
||||
tokens_used,
|
||||
time_used_seconds,
|
||||
created_at_ms,
|
||||
updated_at_ms
|
||||
) VALUES (?, ?, ?, ?, 0, 0, ?, ?)
|
||||
ON CONFLICT(thread_id) DO UPDATE SET
|
||||
objective = excluded.objective,
|
||||
status = excluded.status,
|
||||
token_budget = excluded.token_budget,
|
||||
tokens_used = 0,
|
||||
time_used_seconds = 0,
|
||||
created_at_ms = excluded.created_at_ms,
|
||||
updated_at_ms = excluded.updated_at_ms
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id.to_string())
|
||||
.bind(objective)
|
||||
.bind(status.as_str())
|
||||
.bind(token_budget)
|
||||
.bind(now_ms)
|
||||
.bind(now_ms)
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
|
||||
self.get_thread_goal(thread_id)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("thread goal disappeared after replacement"))
|
||||
}
|
||||
|
||||
pub async fn update_thread_goal(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
update: ThreadGoalUpdate,
|
||||
) -> anyhow::Result<Option<crate::ThreadGoal>> {
|
||||
let now_ms = datetime_to_epoch_millis(Utc::now());
|
||||
let result = match (update.status, update.token_budget) {
|
||||
(Some(status), Some(token_budget)) => {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE thread_goals
|
||||
SET
|
||||
status = CASE
|
||||
WHEN ? = 'active' AND ? IS NOT NULL AND tokens_used >= ? THEN ?
|
||||
ELSE ?
|
||||
END,
|
||||
token_budget = ?,
|
||||
updated_at_ms = ?
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(status.as_str())
|
||||
.bind(token_budget)
|
||||
.bind(token_budget)
|
||||
.bind(crate::ThreadGoalStatus::BudgetLimited.as_str())
|
||||
.bind(status.as_str())
|
||||
.bind(token_budget)
|
||||
.bind(now_ms)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?
|
||||
}
|
||||
(Some(status), None) => {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE thread_goals
|
||||
SET
|
||||
status = CASE
|
||||
WHEN ? = 'active' AND token_budget IS NOT NULL AND tokens_used >= token_budget THEN ?
|
||||
ELSE ?
|
||||
END,
|
||||
updated_at_ms = ?
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(status.as_str())
|
||||
.bind(crate::ThreadGoalStatus::BudgetLimited.as_str())
|
||||
.bind(status.as_str())
|
||||
.bind(now_ms)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?
|
||||
}
|
||||
(None, Some(token_budget)) => {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE thread_goals
|
||||
SET
|
||||
token_budget = ?,
|
||||
status = CASE
|
||||
WHEN status = 'active' AND ? IS NOT NULL AND tokens_used >= ? THEN ?
|
||||
ELSE status
|
||||
END,
|
||||
updated_at_ms = ?
|
||||
WHERE thread_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(token_budget)
|
||||
.bind(token_budget)
|
||||
.bind(token_budget)
|
||||
.bind(crate::ThreadGoalStatus::BudgetLimited.as_str())
|
||||
.bind(now_ms)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?
|
||||
}
|
||||
(None, None) => return self.get_thread_goal(thread_id).await,
|
||||
};
|
||||
|
||||
if result.rows_affected() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.get_thread_goal(thread_id).await
|
||||
}
|
||||
|
||||
pub async fn account_thread_goal_usage(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
time_delta_seconds: i64,
|
||||
token_delta: i64,
|
||||
mode: ThreadGoalAccountingMode,
|
||||
) -> anyhow::Result<ThreadGoalAccountingOutcome> {
|
||||
let time_delta_seconds = time_delta_seconds.max(0);
|
||||
let token_delta = token_delta.max(0);
|
||||
if time_delta_seconds == 0 && token_delta == 0 {
|
||||
return Ok(ThreadGoalAccountingOutcome::Unchanged(
|
||||
self.get_thread_goal(thread_id).await?,
|
||||
));
|
||||
}
|
||||
|
||||
let now_ms = datetime_to_epoch_millis(Utc::now());
|
||||
let status_filter = match mode {
|
||||
ThreadGoalAccountingMode::ActiveOnly => "status = 'active'",
|
||||
ThreadGoalAccountingMode::ActiveOrComplete => "status IN ('active', 'complete')",
|
||||
};
|
||||
let query = format!(
|
||||
r#"
|
||||
UPDATE thread_goals
|
||||
SET
|
||||
time_used_seconds = time_used_seconds + ?,
|
||||
tokens_used = tokens_used + ?,
|
||||
status = CASE
|
||||
WHEN status = 'active' AND token_budget IS NOT NULL AND tokens_used + ? >= token_budget
|
||||
THEN ?
|
||||
ELSE status
|
||||
END,
|
||||
updated_at_ms = ?
|
||||
WHERE thread_id = ?
|
||||
AND {status_filter}
|
||||
"#,
|
||||
);
|
||||
|
||||
let result = sqlx::query(&query)
|
||||
.bind(time_delta_seconds)
|
||||
.bind(token_delta)
|
||||
.bind(token_delta)
|
||||
.bind(crate::ThreadGoalStatus::BudgetLimited.as_str())
|
||||
.bind(now_ms)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
|
||||
if result.rows_affected() == 0 {
|
||||
return Ok(ThreadGoalAccountingOutcome::Unchanged(
|
||||
self.get_thread_goal(thread_id).await?,
|
||||
));
|
||||
}
|
||||
|
||||
let updated = self
|
||||
.get_thread_goal(thread_id)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("thread goal disappeared after usage accounting"))?;
|
||||
Ok(ThreadGoalAccountingOutcome::Updated(updated))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::runtime::test_support::test_thread_metadata;
|
||||
use crate::runtime::test_support::unique_temp_dir;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
async fn test_runtime() -> std::sync::Arc<StateRuntime> {
|
||||
StateRuntime::init(unique_temp_dir(), "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize")
|
||||
}
|
||||
|
||||
fn test_thread_id() -> ThreadId {
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000123").expect("valid thread id")
|
||||
}
|
||||
|
||||
async fn upsert_test_thread(runtime: &StateRuntime, thread_id: ThreadId) {
|
||||
let metadata = test_thread_metadata(
|
||||
runtime.codex_home(),
|
||||
thread_id,
|
||||
runtime.codex_home().join("workspace"),
|
||||
);
|
||||
runtime
|
||||
.upsert_thread(&metadata)
|
||||
.await
|
||||
.expect("test thread should be upserted");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replace_update_and_get_thread_goal() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
|
||||
let goal = runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"optimize the benchmark",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ Some(100_000),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
assert_eq!(
|
||||
Some(goal.clone()),
|
||||
runtime.get_thread_goal(thread_id).await.unwrap()
|
||||
);
|
||||
|
||||
let updated = runtime
|
||||
.update_thread_goal(
|
||||
thread_id,
|
||||
ThreadGoalUpdate {
|
||||
status: Some(crate::ThreadGoalStatus::Paused),
|
||||
token_budget: Some(Some(200_000)),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("goal update should succeed")
|
||||
.expect("goal should exist");
|
||||
let expected = crate::ThreadGoal {
|
||||
status: crate::ThreadGoalStatus::Paused,
|
||||
token_budget: Some(200_000),
|
||||
updated_at: updated.updated_at,
|
||||
..goal.clone()
|
||||
};
|
||||
assert_eq!(expected, updated);
|
||||
|
||||
let replaced = runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"ship the new result",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
assert_eq!("ship the new result", replaced.objective);
|
||||
assert_eq!(crate::ThreadGoalStatus::Active, replaced.status);
|
||||
assert_eq!(None, replaced.token_budget);
|
||||
assert_eq!(0, replaced.tokens_used);
|
||||
assert_eq!(0, replaced.time_used_seconds);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn concurrent_partial_updates_preserve_independent_fields() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"optimize the benchmark",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ Some(100_000),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
|
||||
let status_update = runtime.update_thread_goal(
|
||||
thread_id,
|
||||
ThreadGoalUpdate {
|
||||
status: Some(crate::ThreadGoalStatus::Paused),
|
||||
token_budget: None,
|
||||
},
|
||||
);
|
||||
let budget_update = runtime.update_thread_goal(
|
||||
thread_id,
|
||||
ThreadGoalUpdate {
|
||||
status: None,
|
||||
token_budget: Some(Some(200_000)),
|
||||
},
|
||||
);
|
||||
let (status_update, budget_update) = tokio::join!(status_update, budget_update);
|
||||
status_update.expect("status update should succeed");
|
||||
budget_update.expect("budget update should succeed");
|
||||
|
||||
let goal = runtime
|
||||
.get_thread_goal(thread_id)
|
||||
.await
|
||||
.expect("goal read should succeed")
|
||||
.expect("goal should exist");
|
||||
assert_eq!(crate::ThreadGoalStatus::Paused, goal.status);
|
||||
assert_eq!(Some(200_000), goal.token_budget);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn usage_accounting_updates_active_goals_and_stops_on_budget() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"stay within budget",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ Some(20),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
|
||||
let outcome = runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 7,
|
||||
/*token_delta*/ 5,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
let ThreadGoalAccountingOutcome::Updated(goal) = outcome else {
|
||||
panic!("active goal should be updated");
|
||||
};
|
||||
assert_eq!(crate::ThreadGoalStatus::Active, goal.status);
|
||||
assert_eq!(5, goal.tokens_used);
|
||||
assert_eq!(7, goal.time_used_seconds);
|
||||
|
||||
let outcome = runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 3,
|
||||
/*token_delta*/ 15,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
let ThreadGoalAccountingOutcome::Updated(goal) = outcome else {
|
||||
panic!("budget crossing should update the goal");
|
||||
};
|
||||
assert_eq!(crate::ThreadGoalStatus::BudgetLimited, goal.status);
|
||||
assert_eq!(20, goal.tokens_used);
|
||||
assert_eq!(10, goal.time_used_seconds);
|
||||
|
||||
let outcome = runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 5,
|
||||
/*token_delta*/ 5,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
let ThreadGoalAccountingOutcome::Unchanged(Some(goal)) = outcome else {
|
||||
panic!("terminal goal should not continue accounting");
|
||||
};
|
||||
assert_eq!(crate::ThreadGoalStatus::BudgetLimited, goal.status);
|
||||
assert_eq!(20, goal.tokens_used);
|
||||
assert_eq!(10, goal.time_used_seconds);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn budget_updates_immediately_stop_active_goals_already_over_budget() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"stay within budget",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ Some(100),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 1,
|
||||
/*token_delta*/ 50,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
|
||||
let lowered = runtime
|
||||
.update_thread_goal(
|
||||
thread_id,
|
||||
ThreadGoalUpdate {
|
||||
status: None,
|
||||
token_budget: Some(Some(40)),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("goal update should succeed")
|
||||
.expect("goal should exist");
|
||||
|
||||
assert_eq!(crate::ThreadGoalStatus::BudgetLimited, lowered.status);
|
||||
assert_eq!(Some(40), lowered.token_budget);
|
||||
assert_eq!(50, lowered.tokens_used);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn activating_goal_already_over_budget_keeps_it_budget_limited() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"stay within budget",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ Some(40),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 1,
|
||||
/*token_delta*/ 50,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
|
||||
let reactivated = runtime
|
||||
.update_thread_goal(
|
||||
thread_id,
|
||||
ThreadGoalUpdate {
|
||||
status: Some(crate::ThreadGoalStatus::Active),
|
||||
token_budget: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("goal update should succeed")
|
||||
.expect("goal should exist");
|
||||
|
||||
assert_eq!(crate::ThreadGoalStatus::BudgetLimited, reactivated.status);
|
||||
assert_eq!(Some(40), reactivated.token_budget);
|
||||
assert_eq!(50, reactivated.tokens_used);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn usage_accounting_can_finalize_completed_goal_for_completing_turn() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"finish the report",
|
||||
crate::ThreadGoalStatus::Complete,
|
||||
/*token_budget*/ Some(1_000),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
|
||||
let active_only = runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 30,
|
||||
/*token_delta*/ 200,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
let ThreadGoalAccountingOutcome::Unchanged(Some(goal)) = active_only else {
|
||||
panic!("completed goal should not be updated by active-only accounting");
|
||||
};
|
||||
assert_eq!(crate::ThreadGoalStatus::Complete, goal.status);
|
||||
assert_eq!(0, goal.tokens_used);
|
||||
assert_eq!(0, goal.time_used_seconds);
|
||||
|
||||
let completing_turn = runtime
|
||||
.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 30,
|
||||
/*token_delta*/ 200,
|
||||
ThreadGoalAccountingMode::ActiveOrComplete,
|
||||
)
|
||||
.await
|
||||
.expect("usage accounting should succeed");
|
||||
let ThreadGoalAccountingOutcome::Updated(goal) = completing_turn else {
|
||||
panic!("completed goal should be updated for final accounting");
|
||||
};
|
||||
assert_eq!(crate::ThreadGoalStatus::Complete, goal.status);
|
||||
assert_eq!(200, goal.tokens_used);
|
||||
assert_eq!(30, goal.time_used_seconds);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn usage_accounting_adds_concurrent_token_deltas() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"count every token",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ Some(1_000),
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
|
||||
let first = runtime.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 4,
|
||||
/*token_delta*/ 40,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
);
|
||||
let second = runtime.account_thread_goal_usage(
|
||||
thread_id,
|
||||
/*time_delta_seconds*/ 6,
|
||||
/*token_delta*/ 60,
|
||||
ThreadGoalAccountingMode::ActiveOnly,
|
||||
);
|
||||
let (first, second) = tokio::join!(first, second);
|
||||
first.expect("first usage accounting should succeed");
|
||||
second.expect("second usage accounting should succeed");
|
||||
|
||||
let goal = runtime
|
||||
.get_thread_goal(thread_id)
|
||||
.await
|
||||
.expect("goal read should succeed")
|
||||
.expect("goal should exist");
|
||||
assert_eq!(100, goal.tokens_used);
|
||||
assert_eq!(10, goal.time_used_seconds);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deleting_thread_deletes_goal() {
|
||||
let runtime = test_runtime().await;
|
||||
let thread_id = test_thread_id();
|
||||
upsert_test_thread(&runtime, thread_id).await;
|
||||
runtime
|
||||
.replace_thread_goal(
|
||||
thread_id,
|
||||
"clean up with the thread",
|
||||
crate::ThreadGoalStatus::Active,
|
||||
/*token_budget*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("goal replacement should succeed");
|
||||
|
||||
runtime
|
||||
.delete_thread(thread_id)
|
||||
.await
|
||||
.expect("thread deletion should succeed");
|
||||
|
||||
assert_eq!(
|
||||
None,
|
||||
runtime
|
||||
.get_thread_goal(thread_id)
|
||||
.await
|
||||
.expect("goal read should succeed")
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user