chore: better session recycling (#7368)

This commit is contained in:
jif-oai
2025-11-30 20:42:26 +00:00
committed by GitHub
parent 6eeaf46ac1
commit 457c9fdb87
2 changed files with 49 additions and 24 deletions

View File

@@ -98,8 +98,26 @@ pub(crate) struct UnifiedExecResponse {
#[derive(Default)]
pub(crate) struct UnifiedExecSessionManager {
sessions: Mutex<HashMap<String, SessionEntry>>,
used_session_ids: Mutex<HashSet<String>>,
session_store: Mutex<SessionStore>,
}
// Required for mutex sharing.
#[derive(Default)]
pub(crate) struct SessionStore {
sessions: HashMap<String, SessionEntry>,
reserved_sessions_id: HashSet<String>,
}
impl SessionStore {
fn remove(&mut self, session_id: &str) -> Option<SessionEntry> {
self.reserved_sessions_id.remove(session_id);
self.sessions.remove(session_id)
}
pub(crate) fn clear(&mut self) {
self.reserved_sessions_id.clear();
self.sessions.clear();
}
}
struct SessionEntry {
@@ -384,9 +402,10 @@ mod tests {
session
.services
.unified_exec_manager
.sessions
.session_store
.lock()
.await
.sessions
.is_empty()
);
@@ -425,9 +444,10 @@ mod tests {
session
.services
.unified_exec_manager
.sessions
.session_store
.lock()
.await
.sessions
.is_empty()
);

View File

@@ -36,6 +36,7 @@ use crate::truncate::formatted_truncate_text;
use super::ExecCommandRequest;
use super::MAX_UNIFIED_EXEC_SESSIONS;
use super::SessionEntry;
use super::SessionStore;
use super::UnifiedExecContext;
use super::UnifiedExecError;
use super::UnifiedExecResponse;
@@ -81,7 +82,7 @@ struct PreparedSessionHandles {
impl UnifiedExecSessionManager {
pub(crate) async fn allocate_process_id(&self) -> String {
loop {
let mut store = self.used_session_ids.lock().await;
let mut store = self.session_store.lock().await;
let process_id = if !cfg!(test) && !cfg!(feature = "deterministic_process_ids") {
// production mode → random
@@ -89,6 +90,7 @@ impl UnifiedExecSessionManager {
} else {
// test or deterministic mode
let next = store
.reserved_sessions_id
.iter()
.filter_map(|s| s.parse::<i32>().ok())
.max()
@@ -98,11 +100,11 @@ impl UnifiedExecSessionManager {
next.to_string()
};
if store.contains(&process_id) {
if store.reserved_sessions_id.contains(&process_id) {
continue;
}
store.insert(process_id.clone());
store.reserved_sessions_id.insert(process_id.clone());
return process_id;
}
}
@@ -331,8 +333,8 @@ impl UnifiedExecSessionManager {
}
async fn refresh_session_state(&self, process_id: &str) -> SessionStatus {
let mut sessions = self.sessions.lock().await;
let Some(entry) = sessions.get(process_id) else {
let mut store = self.session_store.lock().await;
let Some(entry) = store.sessions.get(process_id) else {
return SessionStatus::Unknown;
};
@@ -340,7 +342,7 @@ impl UnifiedExecSessionManager {
let process_id = entry.process_id.clone();
if entry.session.has_exited() {
let Some(entry) = sessions.remove(&process_id) else {
let Some(entry) = store.remove(&process_id) else {
return SessionStatus::Unknown;
};
SessionStatus::Exited {
@@ -360,12 +362,14 @@ impl UnifiedExecSessionManager {
&self,
process_id: &str,
) -> Result<PreparedSessionHandles, UnifiedExecError> {
let mut sessions = self.sessions.lock().await;
let entry = sessions
.get_mut(process_id)
.ok_or(UnifiedExecError::UnknownSessionId {
process_id: process_id.to_string(),
})?;
let mut store = self.session_store.lock().await;
let entry =
store
.sessions
.get_mut(process_id)
.ok_or(UnifiedExecError::UnknownSessionId {
process_id: process_id.to_string(),
})?;
entry.last_used = Instant::now();
let OutputHandles {
output_buffer,
@@ -417,9 +421,9 @@ impl UnifiedExecSessionManager {
started_at,
last_used: started_at,
};
let mut sessions = self.sessions.lock().await;
Self::prune_sessions_if_needed(&mut sessions);
sessions.insert(process_id, entry);
let mut store = self.session_store.lock().await;
Self::prune_sessions_if_needed(&mut store);
store.sessions.insert(process_id, entry);
}
async fn emit_exec_end_from_entry(
@@ -629,18 +633,19 @@ impl UnifiedExecSessionManager {
collected
}
fn prune_sessions_if_needed(sessions: &mut HashMap<String, SessionEntry>) {
if sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
fn prune_sessions_if_needed(store: &mut SessionStore) {
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
return;
}
let meta: Vec<(String, Instant, bool)> = sessions
let meta: Vec<(String, Instant, bool)> = store
.sessions
.iter()
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
.collect();
if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta) {
sessions.remove(&session_id);
store.remove(&session_id);
}
}
@@ -674,7 +679,7 @@ impl UnifiedExecSessionManager {
}
pub(crate) async fn terminate_all_sessions(&self) {
let mut sessions = self.sessions.lock().await;
let mut sessions = self.session_store.lock().await;
sessions.clear();
}
}