mirror of
https://github.com/openai/codex.git
synced 2026-03-20 04:46:31 +03:00
Compare commits
3 Commits
dev/cc/ref
...
bot/update
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d1ea08f71 | ||
|
|
403b397e4e | ||
|
|
6b8175c734 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2011,6 +2011,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"test-case",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
|
||||
@@ -34,7 +34,7 @@ pub(crate) struct FsApi {
|
||||
impl Default for FsApi {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
file_system: Arc::new(Environment::default().get_filesystem()),
|
||||
file_system: Environment::default().get_filesystem(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -3711,7 +3711,11 @@ async fn handle_output_item_done_records_image_save_history_message() {
|
||||
let session = Arc::new(session);
|
||||
let turn_context = Arc::new(turn_context);
|
||||
let call_id = "ig_history_records_message";
|
||||
let expected_saved_path = std::env::temp_dir().join(format!("{call_id}.png"));
|
||||
let expected_saved_path = crate::stream_events_utils::image_generation_artifact_path(
|
||||
turn_context.config.codex_home.as_path(),
|
||||
&session.conversation_id.to_string(),
|
||||
call_id,
|
||||
);
|
||||
let _ = std::fs::remove_file(&expected_saved_path);
|
||||
let item = ResponseItem::ImageGenerationCall {
|
||||
id: call_id.to_string(),
|
||||
@@ -3731,10 +3735,18 @@ async fn handle_output_item_done_records_image_save_history_message() {
|
||||
.expect("image generation item should succeed");
|
||||
|
||||
let history = session.clone_history().await;
|
||||
let image_output_path = crate::stream_events_utils::image_generation_artifact_path(
|
||||
turn_context.config.codex_home.as_path(),
|
||||
&session.conversation_id.to_string(),
|
||||
"<image_id>",
|
||||
);
|
||||
let image_output_dir = image_output_path
|
||||
.parent()
|
||||
.expect("generated image path should have a parent");
|
||||
let save_message: ResponseItem = DeveloperInstructions::new(format!(
|
||||
"Generated images are saved to {} as {} by default.",
|
||||
std::env::temp_dir().display(),
|
||||
std::env::temp_dir().join("<image_id>.png").display(),
|
||||
image_output_dir.display(),
|
||||
image_output_path.display(),
|
||||
))
|
||||
.into();
|
||||
assert_eq!(history.raw_items(), &[save_message, item]);
|
||||
@@ -3751,7 +3763,11 @@ async fn handle_output_item_done_skips_image_save_message_when_save_fails() {
|
||||
let session = Arc::new(session);
|
||||
let turn_context = Arc::new(turn_context);
|
||||
let call_id = "ig_history_no_message";
|
||||
let expected_saved_path = std::env::temp_dir().join(format!("{call_id}.png"));
|
||||
let expected_saved_path = crate::stream_events_utils::image_generation_artifact_path(
|
||||
turn_context.config.codex_home.as_path(),
|
||||
&session.conversation_id.to_string(),
|
||||
call_id,
|
||||
);
|
||||
let _ = std::fs::remove_file(&expected_saved_path);
|
||||
let item = ResponseItem::ImageGenerationCall {
|
||||
id: call_id.to_string(),
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
@@ -30,6 +31,36 @@ use futures::Future;
|
||||
use tracing::debug;
|
||||
use tracing::instrument;
|
||||
|
||||
const GENERATED_IMAGE_ARTIFACTS_DIR: &str = "generated_images";
|
||||
|
||||
pub(crate) fn image_generation_artifact_path(
|
||||
codex_home: &Path,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
) -> PathBuf {
|
||||
let sanitize = |value: &str| {
|
||||
let mut sanitized: String = value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if sanitized.is_empty() {
|
||||
sanitized = "generated_image".to_string();
|
||||
}
|
||||
sanitized
|
||||
};
|
||||
|
||||
codex_home
|
||||
.join(GENERATED_IMAGE_ARTIFACTS_DIR)
|
||||
.join(sanitize(session_id))
|
||||
.join(format!("{}.png", sanitize(call_id)))
|
||||
}
|
||||
|
||||
fn strip_hidden_assistant_markup(text: &str, plan_mode: bool) -> String {
|
||||
let (without_citations, _) = strip_citations(text);
|
||||
if plan_mode {
|
||||
@@ -71,26 +102,21 @@ pub(crate) fn raw_assistant_output_text_from_item(item: &ResponseItem) -> Option
|
||||
None
|
||||
}
|
||||
|
||||
async fn save_image_generation_result(call_id: &str, result: &str) -> Result<PathBuf> {
|
||||
async fn save_image_generation_result(
|
||||
codex_home: &std::path::Path,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
result: &str,
|
||||
) -> Result<PathBuf> {
|
||||
let bytes = BASE64_STANDARD
|
||||
.decode(result.trim().as_bytes())
|
||||
.map_err(|err| {
|
||||
CodexErr::InvalidRequest(format!("invalid image generation payload: {err}"))
|
||||
})?;
|
||||
let mut file_stem: String = call_id
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if file_stem.is_empty() {
|
||||
file_stem = "generated_image".to_string();
|
||||
let path = image_generation_artifact_path(codex_home, session_id, call_id);
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
let path = std::env::temp_dir().join(format!("{file_stem}.png"));
|
||||
tokio::fs::write(&path, bytes).await?;
|
||||
Ok(path)
|
||||
}
|
||||
@@ -321,14 +347,29 @@ pub(crate) async fn handle_non_tool_response_item(
|
||||
agent_message.memory_citation = memory_citation;
|
||||
}
|
||||
if let TurnItem::ImageGeneration(image_item) = &mut turn_item {
|
||||
match save_image_generation_result(&image_item.id, &image_item.result).await {
|
||||
let session_id = sess.conversation_id.to_string();
|
||||
match save_image_generation_result(
|
||||
turn_context.config.codex_home.as_path(),
|
||||
&session_id,
|
||||
&image_item.id,
|
||||
&image_item.result,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => {
|
||||
image_item.saved_path = Some(path.to_string_lossy().into_owned());
|
||||
let image_output_dir = std::env::temp_dir();
|
||||
let image_output_path = image_generation_artifact_path(
|
||||
turn_context.config.codex_home.as_path(),
|
||||
&session_id,
|
||||
"<image_id>",
|
||||
);
|
||||
let image_output_dir = image_output_path
|
||||
.parent()
|
||||
.unwrap_or(turn_context.config.codex_home.as_path());
|
||||
let message: ResponseItem = DeveloperInstructions::new(format!(
|
||||
"Generated images are saved to {} as {} by default.",
|
||||
image_output_dir.display(),
|
||||
image_output_dir.join("<image_id>.png").display(),
|
||||
image_output_path.display(),
|
||||
))
|
||||
.into();
|
||||
sess.record_conversation_items(
|
||||
@@ -338,7 +379,14 @@ pub(crate) async fn handle_non_tool_response_item(
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let output_dir = std::env::temp_dir();
|
||||
let output_path = image_generation_artifact_path(
|
||||
turn_context.config.codex_home.as_path(),
|
||||
&session_id,
|
||||
&image_item.id,
|
||||
);
|
||||
let output_dir = output_path
|
||||
.parent()
|
||||
.unwrap_or(turn_context.config.codex_home.as_path());
|
||||
tracing::warn!(
|
||||
call_id = %image_item.id,
|
||||
output_dir = %output_dir.display(),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::handle_non_tool_response_item;
|
||||
use super::image_generation_artifact_path;
|
||||
use super::last_assistant_message_from_item;
|
||||
use super::save_image_generation_result;
|
||||
use crate::codex::make_session_and_context;
|
||||
@@ -80,13 +81,16 @@ fn last_assistant_message_from_item_returns_none_for_plan_only_hidden_message()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_image_generation_result_saves_base64_to_png_in_temp_dir() {
|
||||
let expected_path = std::env::temp_dir().join("ig_save_base64.png");
|
||||
async fn save_image_generation_result_saves_base64_to_png_in_codex_home() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let expected_path =
|
||||
image_generation_artifact_path(codex_home.path(), "session-1", "ig_save_base64");
|
||||
let _ = std::fs::remove_file(&expected_path);
|
||||
|
||||
let saved_path = save_image_generation_result("ig_save_base64", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path =
|
||||
save_image_generation_result(codex_home.path(), "session-1", "ig_save_base64", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, expected_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -96,8 +100,9 @@ async fn save_image_generation_result_saves_base64_to_png_in_temp_dir() {
|
||||
#[tokio::test]
|
||||
async fn save_image_generation_result_rejects_data_url_payload() {
|
||||
let result = "data:image/jpeg;base64,Zm9v";
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
|
||||
let err = save_image_generation_result("ig_456", result)
|
||||
let err = save_image_generation_result(codex_home.path(), "session-1", "ig_456", result)
|
||||
.await
|
||||
.expect_err("data url payload should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
@@ -105,12 +110,21 @@ async fn save_image_generation_result_rejects_data_url_payload() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_image_generation_result_overwrites_existing_file() {
|
||||
let existing_path = std::env::temp_dir().join("ig_overwrite.png");
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let existing_path =
|
||||
image_generation_artifact_path(codex_home.path(), "session-1", "ig_overwrite");
|
||||
std::fs::create_dir_all(
|
||||
existing_path
|
||||
.parent()
|
||||
.expect("generated image path should have a parent"),
|
||||
)
|
||||
.expect("create image output dir");
|
||||
std::fs::write(&existing_path, b"existing").expect("seed existing image");
|
||||
|
||||
let saved_path = save_image_generation_result("ig_overwrite", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path =
|
||||
save_image_generation_result(codex_home.path(), "session-1", "ig_overwrite", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, existing_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -118,13 +132,15 @@ async fn save_image_generation_result_overwrites_existing_file() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_image_generation_result_sanitizes_call_id_for_temp_dir_output_path() {
|
||||
let expected_path = std::env::temp_dir().join("___ig___.png");
|
||||
async fn save_image_generation_result_sanitizes_call_id_for_codex_home_output_path() {
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let expected_path = image_generation_artifact_path(codex_home.path(), "session-1", "../ig/..");
|
||||
let _ = std::fs::remove_file(&expected_path);
|
||||
|
||||
let saved_path = save_image_generation_result("../ig/..", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
let saved_path =
|
||||
save_image_generation_result(codex_home.path(), "session-1", "../ig/..", "Zm9v")
|
||||
.await
|
||||
.expect("image should be saved");
|
||||
|
||||
assert_eq!(saved_path, expected_path);
|
||||
assert_eq!(std::fs::read(&saved_path).expect("saved file"), b"foo");
|
||||
@@ -133,7 +149,8 @@ async fn save_image_generation_result_sanitizes_call_id_for_temp_dir_output_path
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_image_generation_result_rejects_non_standard_base64() {
|
||||
let err = save_image_generation_result("ig_urlsafe", "_-8")
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let err = save_image_generation_result(codex_home.path(), "session-1", "ig_urlsafe", "_-8")
|
||||
.await
|
||||
.expect_err("non-standard base64 should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
@@ -141,8 +158,14 @@ async fn save_image_generation_result_rejects_non_standard_base64() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn save_image_generation_result_rejects_non_base64_data_urls() {
|
||||
let err = save_image_generation_result("ig_svg", "data:image/svg+xml,<svg/>")
|
||||
.await
|
||||
.expect_err("non-base64 data url should error");
|
||||
let codex_home = tempfile::tempdir().expect("create codex home");
|
||||
let err = save_image_generation_result(
|
||||
codex_home.path(),
|
||||
"session-1",
|
||||
"ig_svg",
|
||||
"data:image/svg+xml,<svg/>",
|
||||
)
|
||||
.await
|
||||
.expect_err("non-base64 data url should error");
|
||||
assert!(matches!(err, CodexErr::InvalidRequest(_)));
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use async_trait::async_trait;
|
||||
use codex_exec_server::ExecutorFileSystem;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
|
||||
@@ -35,6 +35,32 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn image_generation_artifact_path(codex_home: &Path, session_id: &str, call_id: &str) -> PathBuf {
|
||||
fn sanitize(value: &str) -> String {
|
||||
let mut sanitized: String = value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if sanitized.is_empty() {
|
||||
sanitized = "generated_image".to_string();
|
||||
}
|
||||
sanitized
|
||||
}
|
||||
|
||||
codex_home
|
||||
.join("generated_images")
|
||||
.join(sanitize(session_id))
|
||||
.join(format!("{}.png", sanitize(call_id)))
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn user_message_item_is_emitted() -> anyhow::Result<()> {
|
||||
@@ -269,9 +295,18 @@ async fn image_generation_call_event_is_emitted() -> anyhow::Result<()> {
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let TestCodex { codex, .. } = test_codex().build(&server).await?;
|
||||
let TestCodex {
|
||||
codex,
|
||||
config,
|
||||
session_configured,
|
||||
..
|
||||
} = test_codex().build(&server).await?;
|
||||
let call_id = "ig_image_saved_to_temp_dir_default";
|
||||
let expected_saved_path = std::env::temp_dir().join(format!("{call_id}.png"));
|
||||
let expected_saved_path = image_generation_artifact_path(
|
||||
config.codex_home.as_path(),
|
||||
&session_configured.session_id.to_string(),
|
||||
call_id,
|
||||
);
|
||||
let _ = std::fs::remove_file(&expected_saved_path);
|
||||
|
||||
let first_response = sse(vec![
|
||||
@@ -323,8 +358,17 @@ async fn image_generation_call_event_is_emitted_when_image_save_fails() -> anyho
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let TestCodex { codex, .. } = test_codex().build(&server).await?;
|
||||
let expected_saved_path = std::env::temp_dir().join("ig_invalid.png");
|
||||
let TestCodex {
|
||||
codex,
|
||||
config,
|
||||
session_configured,
|
||||
..
|
||||
} = test_codex().build(&server).await?;
|
||||
let expected_saved_path = image_generation_artifact_path(
|
||||
config.codex_home.as_path(),
|
||||
&session_configured.session_id.to_string(),
|
||||
"ig_invalid",
|
||||
);
|
||||
let _ = std::fs::remove_file(&expected_saved_path);
|
||||
|
||||
let first_response = sse(vec![
|
||||
|
||||
@@ -32,8 +32,34 @@ use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use wiremock::MockServer;
|
||||
|
||||
fn image_generation_artifact_path(codex_home: &Path, session_id: &str, call_id: &str) -> PathBuf {
|
||||
fn sanitize(value: &str) -> String {
|
||||
let mut sanitized: String = value
|
||||
.chars()
|
||||
.map(|ch| {
|
||||
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
|
||||
ch
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
if sanitized.is_empty() {
|
||||
sanitized = "generated_image".to_string();
|
||||
}
|
||||
sanitized
|
||||
}
|
||||
|
||||
codex_home
|
||||
.join("generated_images")
|
||||
.join(sanitize(session_id))
|
||||
.join(format!("{}.png", sanitize(call_id)))
|
||||
}
|
||||
|
||||
fn test_model_info(
|
||||
slug: &str,
|
||||
display_name: &str,
|
||||
@@ -444,9 +470,6 @@ async fn model_change_from_image_to_text_strips_prior_image_content() -> Result<
|
||||
async fn generated_image_is_replayed_for_image_capable_models() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let saved_path = std::env::temp_dir().join("ig_123.png");
|
||||
let _ = std::fs::remove_file(&saved_path);
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let image_model_slug = "test-image-model";
|
||||
let image_model = test_model_info(
|
||||
@@ -482,6 +505,12 @@ async fn generated_image_is_replayed_for_image_capable_models() -> Result<()> {
|
||||
config.model = Some(image_model_slug.to_string());
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let saved_path = image_generation_artifact_path(
|
||||
test.codex_home_path(),
|
||||
&test.session_configured.session_id.to_string(),
|
||||
"ig_123",
|
||||
);
|
||||
let _ = std::fs::remove_file(&saved_path);
|
||||
let models_manager = test.thread_manager.get_models_manager();
|
||||
let _ = models_manager
|
||||
.list_models(RefreshStrategy::OnlineIfUncached)
|
||||
@@ -564,9 +593,6 @@ async fn model_change_from_generated_image_to_text_preserves_prior_generated_ima
|
||||
-> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let saved_path = std::env::temp_dir().join("ig_123.png");
|
||||
let _ = std::fs::remove_file(&saved_path);
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let image_model_slug = "test-image-model";
|
||||
let text_model_slug = "test-text-only-model";
|
||||
@@ -609,6 +635,12 @@ async fn model_change_from_generated_image_to_text_preserves_prior_generated_ima
|
||||
config.model = Some(image_model_slug.to_string());
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let saved_path = image_generation_artifact_path(
|
||||
test.codex_home_path(),
|
||||
&test.session_configured.session_id.to_string(),
|
||||
"ig_123",
|
||||
);
|
||||
let _ = std::fs::remove_file(&saved_path);
|
||||
let models_manager = test.thread_manager.get_models_manager();
|
||||
let _ = models_manager
|
||||
.list_models(RefreshStrategy::OnlineIfUncached)
|
||||
@@ -700,9 +732,6 @@ async fn model_change_from_generated_image_to_text_preserves_prior_generated_ima
|
||||
async fn thread_rollback_after_generated_image_drops_entire_image_turn_history() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let saved_path = std::env::temp_dir().join("ig_rollback.png");
|
||||
let _ = std::fs::remove_file(&saved_path);
|
||||
|
||||
let server = MockServer::start().await;
|
||||
let image_model_slug = "test-image-model";
|
||||
let image_model = test_model_info(
|
||||
@@ -738,6 +767,12 @@ async fn thread_rollback_after_generated_image_drops_entire_image_turn_history()
|
||||
config.model = Some(image_model_slug.to_string());
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
let saved_path = image_generation_artifact_path(
|
||||
test.codex_home_path(),
|
||||
&test.session_configured.session_id.to_string(),
|
||||
"ig_rollback",
|
||||
);
|
||||
let _ = std::fs::remove_file(&saved_path);
|
||||
let models_manager = test.thread_manager.get_models_manager();
|
||||
let _ = models_manager
|
||||
.list_models(RefreshStrategy::OnlineIfUncached)
|
||||
|
||||
@@ -44,3 +44,4 @@ anyhow = { workspace = true }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
test-case = "3.3.1"
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::RemoteExecServerConnectArgs;
|
||||
use crate::fs;
|
||||
use crate::fs::ExecutorFileSystem;
|
||||
use crate::file_system::ExecutorFileSystem;
|
||||
use crate::local_file_system::LocalFileSystem;
|
||||
use crate::remote_file_system::RemoteFileSystem;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Environment {
|
||||
@@ -56,8 +58,12 @@ impl Environment {
|
||||
self.remote_exec_server_client.as_ref()
|
||||
}
|
||||
|
||||
pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> {
|
||||
fs::LocalFileSystem
|
||||
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
|
||||
if let Some(client) = self.remote_exec_server_client.clone() {
|
||||
Arc::new(RemoteFileSystem::new(client))
|
||||
} else {
|
||||
Arc::new(LocalFileSystem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
65
codex-rs/exec-server/src/file_system.rs
Normal file
65
codex-rs/exec-server/src/file_system.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
use async_trait::async_trait;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use tokio::io;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct CreateDirectoryOptions {
|
||||
pub recursive: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct RemoveOptions {
|
||||
pub recursive: bool,
|
||||
pub force: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct CopyOptions {
|
||||
pub recursive: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct FileMetadata {
|
||||
pub is_directory: bool,
|
||||
pub is_file: bool,
|
||||
pub created_at_ms: i64,
|
||||
pub modified_at_ms: i64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct ReadDirectoryEntry {
|
||||
pub file_name: String,
|
||||
pub is_directory: bool,
|
||||
pub is_file: bool,
|
||||
}
|
||||
|
||||
pub type FileSystemResult<T> = io::Result<T>;
|
||||
|
||||
#[async_trait]
|
||||
pub trait ExecutorFileSystem: Send + Sync {
|
||||
async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>>;
|
||||
|
||||
async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()>;
|
||||
|
||||
async fn create_directory(
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
options: CreateDirectoryOptions,
|
||||
) -> FileSystemResult<()>;
|
||||
|
||||
async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata>;
|
||||
|
||||
async fn read_directory(
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
) -> FileSystemResult<Vec<ReadDirectoryEntry>>;
|
||||
|
||||
async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>;
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
source_path: &AbsolutePathBuf,
|
||||
destination_path: &AbsolutePathBuf,
|
||||
options: CopyOptions,
|
||||
) -> FileSystemResult<()>;
|
||||
}
|
||||
@@ -2,8 +2,10 @@ mod client;
|
||||
mod client_api;
|
||||
mod connection;
|
||||
mod environment;
|
||||
mod fs;
|
||||
mod file_system;
|
||||
mod local_file_system;
|
||||
mod protocol;
|
||||
mod remote_file_system;
|
||||
mod rpc;
|
||||
mod server;
|
||||
|
||||
@@ -28,13 +30,13 @@ pub use codex_app_server_protocol::FsRemoveResponse;
|
||||
pub use codex_app_server_protocol::FsWriteFileParams;
|
||||
pub use codex_app_server_protocol::FsWriteFileResponse;
|
||||
pub use environment::Environment;
|
||||
pub use fs::CopyOptions;
|
||||
pub use fs::CreateDirectoryOptions;
|
||||
pub use fs::ExecutorFileSystem;
|
||||
pub use fs::FileMetadata;
|
||||
pub use fs::FileSystemResult;
|
||||
pub use fs::ReadDirectoryEntry;
|
||||
pub use fs::RemoveOptions;
|
||||
pub use file_system::CopyOptions;
|
||||
pub use file_system::CreateDirectoryOptions;
|
||||
pub use file_system::ExecutorFileSystem;
|
||||
pub use file_system::FileMetadata;
|
||||
pub use file_system::FileSystemResult;
|
||||
pub use file_system::ReadDirectoryEntry;
|
||||
pub use file_system::RemoveOptions;
|
||||
pub use protocol::ExecExitedNotification;
|
||||
pub use protocol::ExecOutputDeltaNotification;
|
||||
pub use protocol::ExecOutputStream;
|
||||
|
||||
@@ -7,70 +7,16 @@ use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use tokio::io;
|
||||
|
||||
use crate::CopyOptions;
|
||||
use crate::CreateDirectoryOptions;
|
||||
use crate::ExecutorFileSystem;
|
||||
use crate::FileMetadata;
|
||||
use crate::FileSystemResult;
|
||||
use crate::ReadDirectoryEntry;
|
||||
use crate::RemoveOptions;
|
||||
|
||||
const MAX_READ_FILE_BYTES: u64 = 512 * 1024 * 1024;
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct CreateDirectoryOptions {
|
||||
pub recursive: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct RemoveOptions {
|
||||
pub recursive: bool,
|
||||
pub force: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct CopyOptions {
|
||||
pub recursive: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct FileMetadata {
|
||||
pub is_directory: bool,
|
||||
pub is_file: bool,
|
||||
pub created_at_ms: i64,
|
||||
pub modified_at_ms: i64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct ReadDirectoryEntry {
|
||||
pub file_name: String,
|
||||
pub is_directory: bool,
|
||||
pub is_file: bool,
|
||||
}
|
||||
|
||||
pub type FileSystemResult<T> = io::Result<T>;
|
||||
|
||||
#[async_trait]
|
||||
pub trait ExecutorFileSystem: Send + Sync {
|
||||
async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>>;
|
||||
|
||||
async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()>;
|
||||
|
||||
async fn create_directory(
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
options: CreateDirectoryOptions,
|
||||
) -> FileSystemResult<()>;
|
||||
|
||||
async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata>;
|
||||
|
||||
async fn read_directory(
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
) -> FileSystemResult<Vec<ReadDirectoryEntry>>;
|
||||
|
||||
async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()>;
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
source_path: &AbsolutePathBuf,
|
||||
destination_path: &AbsolutePathBuf,
|
||||
options: CopyOptions,
|
||||
) -> FileSystemResult<()>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct LocalFileSystem;
|
||||
|
||||
154
codex-rs/exec-server/src/remote_file_system.rs
Normal file
154
codex-rs/exec-server/src/remote_file_system.rs
Normal file
@@ -0,0 +1,154 @@
|
||||
use async_trait::async_trait;
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
use codex_app_server_protocol::FsCopyParams;
|
||||
use codex_app_server_protocol::FsCreateDirectoryParams;
|
||||
use codex_app_server_protocol::FsGetMetadataParams;
|
||||
use codex_app_server_protocol::FsReadDirectoryParams;
|
||||
use codex_app_server_protocol::FsReadFileParams;
|
||||
use codex_app_server_protocol::FsRemoveParams;
|
||||
use codex_app_server_protocol::FsWriteFileParams;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use tokio::io;
|
||||
|
||||
use crate::CopyOptions;
|
||||
use crate::CreateDirectoryOptions;
|
||||
use crate::ExecServerClient;
|
||||
use crate::ExecServerError;
|
||||
use crate::ExecutorFileSystem;
|
||||
use crate::FileMetadata;
|
||||
use crate::FileSystemResult;
|
||||
use crate::ReadDirectoryEntry;
|
||||
use crate::RemoveOptions;
|
||||
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RemoteFileSystem {
|
||||
client: ExecServerClient,
|
||||
}
|
||||
|
||||
impl RemoteFileSystem {
|
||||
pub(crate) fn new(client: ExecServerClient) -> Self {
|
||||
Self { client }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ExecutorFileSystem for RemoteFileSystem {
|
||||
async fn read_file(&self, path: &AbsolutePathBuf) -> FileSystemResult<Vec<u8>> {
|
||||
let response = self
|
||||
.client
|
||||
.fs_read_file(FsReadFileParams { path: path.clone() })
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
STANDARD.decode(response.data_base64).map_err(|err| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("remote fs/readFile returned invalid base64 dataBase64: {err}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_file(&self, path: &AbsolutePathBuf, contents: Vec<u8>) -> FileSystemResult<()> {
|
||||
self.client
|
||||
.fs_write_file(FsWriteFileParams {
|
||||
path: path.clone(),
|
||||
data_base64: STANDARD.encode(contents),
|
||||
})
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_directory(
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
options: CreateDirectoryOptions,
|
||||
) -> FileSystemResult<()> {
|
||||
self.client
|
||||
.fs_create_directory(FsCreateDirectoryParams {
|
||||
path: path.clone(),
|
||||
recursive: Some(options.recursive),
|
||||
})
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_metadata(&self, path: &AbsolutePathBuf) -> FileSystemResult<FileMetadata> {
|
||||
let response = self
|
||||
.client
|
||||
.fs_get_metadata(FsGetMetadataParams { path: path.clone() })
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
Ok(FileMetadata {
|
||||
is_directory: response.is_directory,
|
||||
is_file: response.is_file,
|
||||
created_at_ms: response.created_at_ms,
|
||||
modified_at_ms: response.modified_at_ms,
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_directory(
|
||||
&self,
|
||||
path: &AbsolutePathBuf,
|
||||
) -> FileSystemResult<Vec<ReadDirectoryEntry>> {
|
||||
let response = self
|
||||
.client
|
||||
.fs_read_directory(FsReadDirectoryParams { path: path.clone() })
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
Ok(response
|
||||
.entries
|
||||
.into_iter()
|
||||
.map(|entry| ReadDirectoryEntry {
|
||||
file_name: entry.file_name,
|
||||
is_directory: entry.is_directory,
|
||||
is_file: entry.is_file,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn remove(&self, path: &AbsolutePathBuf, options: RemoveOptions) -> FileSystemResult<()> {
|
||||
self.client
|
||||
.fs_remove(FsRemoveParams {
|
||||
path: path.clone(),
|
||||
recursive: Some(options.recursive),
|
||||
force: Some(options.force),
|
||||
})
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&self,
|
||||
source_path: &AbsolutePathBuf,
|
||||
destination_path: &AbsolutePathBuf,
|
||||
options: CopyOptions,
|
||||
) -> FileSystemResult<()> {
|
||||
self.client
|
||||
.fs_copy(FsCopyParams {
|
||||
source_path: source_path.clone(),
|
||||
destination_path: destination_path.clone(),
|
||||
recursive: options.recursive,
|
||||
})
|
||||
.await
|
||||
.map_err(map_remote_error)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn map_remote_error(error: ExecServerError) -> io::Error {
|
||||
match error {
|
||||
ExecServerError::Server { code, message } if code == INVALID_REQUEST_ERROR_CODE => {
|
||||
io::Error::new(io::ErrorKind::InvalidInput, message)
|
||||
}
|
||||
ExecServerError::Server { message, .. } => io::Error::other(message),
|
||||
ExecServerError::Closed => {
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, "exec-server transport closed")
|
||||
}
|
||||
_ => io::Error::other(error.to_string()),
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
mod filesystem;
|
||||
mod file_system_handler;
|
||||
mod handler;
|
||||
mod processor;
|
||||
mod registry;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
@@ -22,26 +21,18 @@ use codex_app_server_protocol::JSONRPCErrorError;
|
||||
|
||||
use crate::CopyOptions;
|
||||
use crate::CreateDirectoryOptions;
|
||||
use crate::Environment;
|
||||
use crate::ExecutorFileSystem;
|
||||
use crate::RemoveOptions;
|
||||
use crate::local_file_system::LocalFileSystem;
|
||||
use crate::rpc::internal_error;
|
||||
use crate::rpc::invalid_request;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ExecServerFileSystem {
|
||||
file_system: Arc<dyn ExecutorFileSystem>,
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct FileSystemHandler {
|
||||
file_system: LocalFileSystem,
|
||||
}
|
||||
|
||||
impl Default for ExecServerFileSystem {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
file_system: Arc::new(Environment::default().get_filesystem()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExecServerFileSystem {
|
||||
impl FileSystemHandler {
|
||||
pub(crate) async fn read_file(
|
||||
&self,
|
||||
params: FsReadFileParams,
|
||||
@@ -43,7 +43,7 @@ use crate::rpc::RpcNotificationSender;
|
||||
use crate::rpc::internal_error;
|
||||
use crate::rpc::invalid_params;
|
||||
use crate::rpc::invalid_request;
|
||||
use crate::server::filesystem::ExecServerFileSystem;
|
||||
use crate::server::file_system_handler::FileSystemHandler;
|
||||
|
||||
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
|
||||
#[cfg(test)]
|
||||
@@ -75,7 +75,7 @@ enum ProcessEntry {
|
||||
|
||||
pub(crate) struct ExecServerHandler {
|
||||
notifications: RpcNotificationSender,
|
||||
file_system: ExecServerFileSystem,
|
||||
file_system: FileSystemHandler,
|
||||
processes: Arc<Mutex<HashMap<String, ProcessEntry>>>,
|
||||
initialize_requested: AtomicBool,
|
||||
initialized: AtomicBool,
|
||||
@@ -85,7 +85,7 @@ impl ExecServerHandler {
|
||||
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
|
||||
Self {
|
||||
notifications,
|
||||
file_system: ExecServerFileSystem::default(),
|
||||
file_system: FileSystemHandler::default(),
|
||||
processes: Arc::new(Mutex::new(HashMap::new())),
|
||||
initialize_requested: AtomicBool::new(false),
|
||||
initialized: AtomicBool::new(false),
|
||||
|
||||
@@ -59,6 +59,7 @@ async fn run_websocket_listener(
|
||||
let listener = TcpListener::bind(bind_address).await?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
tracing::info!("codex-exec-server listening on ws://{local_addr}");
|
||||
println!("ws://{local_addr}");
|
||||
|
||||
loop {
|
||||
let (stream, peer_addr) = listener.accept().await?;
|
||||
|
||||
@@ -11,6 +11,8 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_utils_cargo_bin::cargo_bin;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::process::Child;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::Instant;
|
||||
@@ -25,6 +27,7 @@ const EVENT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
pub(crate) struct ExecServerHarness {
|
||||
child: Child,
|
||||
websocket_url: String,
|
||||
websocket: tokio_tungstenite::WebSocketStream<
|
||||
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
||||
>,
|
||||
@@ -39,23 +42,28 @@ impl Drop for ExecServerHarness {
|
||||
|
||||
pub(crate) async fn exec_server() -> anyhow::Result<ExecServerHarness> {
|
||||
let binary = cargo_bin("codex-exec-server")?;
|
||||
let websocket_url = reserve_websocket_url()?;
|
||||
let mut child = Command::new(binary);
|
||||
child.args(["--listen", &websocket_url]);
|
||||
child.args(["--listen", "ws://127.0.0.1:0"]);
|
||||
child.stdin(Stdio::null());
|
||||
child.stdout(Stdio::null());
|
||||
child.stdout(Stdio::piped());
|
||||
child.stderr(Stdio::inherit());
|
||||
let child = child.spawn()?;
|
||||
let mut child = child.spawn()?;
|
||||
|
||||
let websocket_url = read_listen_url_from_stdout(&mut child).await?;
|
||||
let (websocket, _) = connect_websocket_when_ready(&websocket_url).await?;
|
||||
Ok(ExecServerHarness {
|
||||
child,
|
||||
websocket_url,
|
||||
websocket,
|
||||
next_request_id: 1,
|
||||
})
|
||||
}
|
||||
|
||||
impl ExecServerHarness {
|
||||
pub(crate) fn websocket_url(&self) -> &str {
|
||||
&self.websocket_url
|
||||
}
|
||||
|
||||
pub(crate) async fn send_request(
|
||||
&mut self,
|
||||
method: &str,
|
||||
@@ -155,13 +163,6 @@ impl ExecServerHarness {
|
||||
}
|
||||
}
|
||||
|
||||
fn reserve_websocket_url() -> anyhow::Result<String> {
|
||||
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
drop(listener);
|
||||
Ok(format!("ws://{addr}"))
|
||||
}
|
||||
|
||||
async fn connect_websocket_when_ready(
|
||||
websocket_url: &str,
|
||||
) -> anyhow::Result<(
|
||||
@@ -186,3 +187,30 @@ async fn connect_websocket_when_ready(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_listen_url_from_stdout(child: &mut Child) -> anyhow::Result<String> {
|
||||
let stdout = child
|
||||
.stdout
|
||||
.take()
|
||||
.ok_or_else(|| anyhow!("failed to capture exec-server stdout"))?;
|
||||
let mut lines = BufReader::new(stdout).lines();
|
||||
let deadline = Instant::now() + CONNECT_TIMEOUT;
|
||||
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
if now >= deadline {
|
||||
return Err(anyhow!(
|
||||
"timed out waiting for exec-server listen URL on stdout after {CONNECT_TIMEOUT:?}"
|
||||
));
|
||||
}
|
||||
let remaining = deadline.duration_since(now);
|
||||
let line = timeout(remaining, lines.next_line())
|
||||
.await
|
||||
.map_err(|_| anyhow!("timed out waiting for exec-server stdout"))??
|
||||
.ok_or_else(|| anyhow!("exec-server stdout closed before emitting listen URL"))?;
|
||||
let listen_url = line.trim();
|
||||
if listen_url.starts_with("ws://") {
|
||||
return Ok(listen_url.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
361
codex-rs/exec-server/tests/file_system.rs
Normal file
361
codex-rs/exec-server/tests/file_system.rs
Normal file
@@ -0,0 +1,361 @@
|
||||
#![cfg(unix)]
|
||||
|
||||
mod common;
|
||||
|
||||
use std::os::unix::fs::symlink;
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_exec_server::CopyOptions;
|
||||
use codex_exec_server::CreateDirectoryOptions;
|
||||
use codex_exec_server::Environment;
|
||||
use codex_exec_server::ExecutorFileSystem;
|
||||
use codex_exec_server::ReadDirectoryEntry;
|
||||
use codex_exec_server::RemoveOptions;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use test_case::test_case;
|
||||
|
||||
use common::exec_server::ExecServerHarness;
|
||||
use common::exec_server::exec_server;
|
||||
|
||||
struct FileSystemContext {
|
||||
file_system: Arc<dyn ExecutorFileSystem>,
|
||||
_server: Option<ExecServerHarness>,
|
||||
}
|
||||
|
||||
async fn create_file_system_context(use_remote: bool) -> Result<FileSystemContext> {
|
||||
if use_remote {
|
||||
let server = exec_server().await?;
|
||||
let environment = Environment::create(Some(server.websocket_url().to_string())).await?;
|
||||
Ok(FileSystemContext {
|
||||
file_system: environment.get_filesystem(),
|
||||
_server: Some(server),
|
||||
})
|
||||
} else {
|
||||
let environment = Environment::create(None).await?;
|
||||
Ok(FileSystemContext {
|
||||
file_system: environment.get_filesystem(),
|
||||
_server: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn absolute_path(path: std::path::PathBuf) -> AbsolutePathBuf {
|
||||
assert!(
|
||||
path.is_absolute(),
|
||||
"path must be absolute: {}",
|
||||
path.display()
|
||||
);
|
||||
match AbsolutePathBuf::try_from(path) {
|
||||
Ok(path) => path,
|
||||
Err(err) => panic!("path should be absolute: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_get_metadata_returns_expected_fields(use_remote: bool) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let file_path = tmp.path().join("note.txt");
|
||||
std::fs::write(&file_path, "hello")?;
|
||||
|
||||
let metadata = file_system
|
||||
.get_metadata(&absolute_path(file_path))
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
assert_eq!(metadata.is_directory, false);
|
||||
assert_eq!(metadata.is_file, true);
|
||||
assert!(metadata.modified_at_ms > 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_methods_cover_surface_area(use_remote: bool) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let source_dir = tmp.path().join("source");
|
||||
let nested_dir = source_dir.join("nested");
|
||||
let source_file = source_dir.join("root.txt");
|
||||
let nested_file = nested_dir.join("note.txt");
|
||||
let copied_dir = tmp.path().join("copied");
|
||||
let copied_file = tmp.path().join("copy.txt");
|
||||
|
||||
file_system
|
||||
.create_directory(
|
||||
&absolute_path(nested_dir.clone()),
|
||||
CreateDirectoryOptions { recursive: true },
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
|
||||
file_system
|
||||
.write_file(
|
||||
&absolute_path(nested_file.clone()),
|
||||
b"hello from trait".to_vec(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
file_system
|
||||
.write_file(
|
||||
&absolute_path(source_file.clone()),
|
||||
b"hello from source root".to_vec(),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
|
||||
let nested_file_contents = file_system
|
||||
.read_file(&absolute_path(nested_file.clone()))
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
assert_eq!(nested_file_contents, b"hello from trait");
|
||||
|
||||
file_system
|
||||
.copy(
|
||||
&absolute_path(nested_file),
|
||||
&absolute_path(copied_file.clone()),
|
||||
CopyOptions { recursive: false },
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
assert_eq!(std::fs::read_to_string(copied_file)?, "hello from trait");
|
||||
|
||||
file_system
|
||||
.copy(
|
||||
&absolute_path(source_dir.clone()),
|
||||
&absolute_path(copied_dir.clone()),
|
||||
CopyOptions { recursive: true },
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
assert_eq!(
|
||||
std::fs::read_to_string(copied_dir.join("nested").join("note.txt"))?,
|
||||
"hello from trait"
|
||||
);
|
||||
|
||||
let mut entries = file_system
|
||||
.read_directory(&absolute_path(source_dir))
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
entries.sort_by(|left, right| left.file_name.cmp(&right.file_name));
|
||||
assert_eq!(
|
||||
entries,
|
||||
vec![
|
||||
ReadDirectoryEntry {
|
||||
file_name: "nested".to_string(),
|
||||
is_directory: true,
|
||||
is_file: false,
|
||||
},
|
||||
ReadDirectoryEntry {
|
||||
file_name: "root.txt".to_string(),
|
||||
is_directory: false,
|
||||
is_file: true,
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
file_system
|
||||
.remove(
|
||||
&absolute_path(copied_dir.clone()),
|
||||
RemoveOptions {
|
||||
recursive: true,
|
||||
force: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
assert!(!copied_dir.exists());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_copy_rejects_directory_without_recursive(use_remote: bool) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let source_dir = tmp.path().join("source");
|
||||
std::fs::create_dir_all(&source_dir)?;
|
||||
|
||||
let error = file_system
|
||||
.copy(
|
||||
&absolute_path(source_dir),
|
||||
&absolute_path(tmp.path().join("dest")),
|
||||
CopyOptions { recursive: false },
|
||||
)
|
||||
.await;
|
||||
let error = match error {
|
||||
Ok(()) => panic!("copy should fail"),
|
||||
Err(error) => error,
|
||||
};
|
||||
assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput);
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"fs/copy requires recursive: true when sourcePath is a directory"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_copy_rejects_copying_directory_into_descendant(
|
||||
use_remote: bool,
|
||||
) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let source_dir = tmp.path().join("source");
|
||||
std::fs::create_dir_all(source_dir.join("nested"))?;
|
||||
|
||||
let error = file_system
|
||||
.copy(
|
||||
&absolute_path(source_dir.clone()),
|
||||
&absolute_path(source_dir.join("nested").join("copy")),
|
||||
CopyOptions { recursive: true },
|
||||
)
|
||||
.await;
|
||||
let error = match error {
|
||||
Ok(()) => panic!("copy should fail"),
|
||||
Err(error) => error,
|
||||
};
|
||||
assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput);
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"fs/copy cannot copy a directory to itself or one of its descendants"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_copy_preserves_symlinks_in_recursive_copy(use_remote: bool) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let source_dir = tmp.path().join("source");
|
||||
let nested_dir = source_dir.join("nested");
|
||||
let copied_dir = tmp.path().join("copied");
|
||||
std::fs::create_dir_all(&nested_dir)?;
|
||||
symlink("nested", source_dir.join("nested-link"))?;
|
||||
|
||||
file_system
|
||||
.copy(
|
||||
&absolute_path(source_dir),
|
||||
&absolute_path(copied_dir.clone()),
|
||||
CopyOptions { recursive: true },
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
|
||||
let copied_link = copied_dir.join("nested-link");
|
||||
let metadata = std::fs::symlink_metadata(&copied_link)?;
|
||||
assert!(metadata.file_type().is_symlink());
|
||||
assert_eq!(
|
||||
std::fs::read_link(copied_link)?,
|
||||
std::path::PathBuf::from("nested")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_copy_ignores_unknown_special_files_in_recursive_copy(
|
||||
use_remote: bool,
|
||||
) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let source_dir = tmp.path().join("source");
|
||||
let copied_dir = tmp.path().join("copied");
|
||||
std::fs::create_dir_all(&source_dir)?;
|
||||
std::fs::write(source_dir.join("note.txt"), "hello")?;
|
||||
|
||||
let fifo_path = source_dir.join("named-pipe");
|
||||
let output = Command::new("mkfifo").arg(&fifo_path).output()?;
|
||||
if !output.status.success() {
|
||||
anyhow::bail!(
|
||||
"mkfifo failed: stdout={} stderr={}",
|
||||
String::from_utf8_lossy(&output.stdout).trim(),
|
||||
String::from_utf8_lossy(&output.stderr).trim()
|
||||
);
|
||||
}
|
||||
|
||||
file_system
|
||||
.copy(
|
||||
&absolute_path(source_dir),
|
||||
&absolute_path(copied_dir.clone()),
|
||||
CopyOptions { recursive: true },
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("mode={use_remote}"))?;
|
||||
|
||||
assert_eq!(
|
||||
std::fs::read_to_string(copied_dir.join("note.txt"))?,
|
||||
"hello"
|
||||
);
|
||||
assert!(!copied_dir.join("named-pipe").exists());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test_case(false ; "local")]
|
||||
#[test_case(true ; "remote")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn file_system_copy_rejects_standalone_fifo_source(use_remote: bool) -> Result<()> {
|
||||
let context = create_file_system_context(use_remote).await?;
|
||||
let file_system = context.file_system;
|
||||
|
||||
let tmp = TempDir::new()?;
|
||||
let fifo_path = tmp.path().join("named-pipe");
|
||||
let output = Command::new("mkfifo").arg(&fifo_path).output()?;
|
||||
if !output.status.success() {
|
||||
anyhow::bail!(
|
||||
"mkfifo failed: stdout={} stderr={}",
|
||||
String::from_utf8_lossy(&output.stdout).trim(),
|
||||
String::from_utf8_lossy(&output.stderr).trim()
|
||||
);
|
||||
}
|
||||
|
||||
let error = file_system
|
||||
.copy(
|
||||
&absolute_path(fifo_path),
|
||||
&absolute_path(tmp.path().join("copied")),
|
||||
CopyOptions { recursive: false },
|
||||
)
|
||||
.await;
|
||||
let error = match error {
|
||||
Ok(()) => panic!("copy should fail"),
|
||||
Err(error) => error,
|
||||
};
|
||||
assert_eq!(error.kind(), std::io::ErrorKind::InvalidInput);
|
||||
assert_eq!(
|
||||
error.to_string(),
|
||||
"fs/copy only supports regular files, directories, and symlinks"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user