From 019b30f99e22edc662aba33fe3e828426db5fa82 Mon Sep 17 00:00:00 2001 From: Kould Date: Mon, 11 Nov 2024 17:49:37 +0800 Subject: [PATCH 01/14] feat: `Fs` adds `copy` & `link`, `TokioFs` & `TokioUringFs` & `Monoio` have been implemented --- fusio-object-store/src/fs.rs | 28 ++++- fusio-opendal/src/fs.rs | 28 ++++- fusio/src/fs/mod.rs | 24 ++++ fusio/src/impls/disk/monoio/fs.rs | 37 +++++- fusio/src/impls/disk/opfs/fs.rs | 32 ++++- fusio/src/impls/disk/tokio/fs.rs | 33 ++++- fusio/src/impls/disk/tokio_uring/fs.rs | 37 +++++- fusio/src/impls/remotes/aws/fs.rs | 14 ++- fusio/src/lib.rs | 162 ++++++++++++++++++++++++- 9 files changed, 377 insertions(+), 18 deletions(-) diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index c53870f..c54481d 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -1,10 +1,10 @@ -use std::sync::Arc; +use std::{future::Future, sync::Arc}; use async_stream::stream; use fusio::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; use futures_core::Stream; use futures_util::stream::StreamExt; @@ -27,6 +27,10 @@ impl From for S3Store { impl Fs for S3Store { type File = S3File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::S3 + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { if !options.truncate { return Err(Error::Unsupported { @@ -64,4 +68,22 @@ impl Fs for S3Store { Ok(()) } + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } } diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index c03af9e..b696630 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -1,7 +1,9 @@ +use std::future::Future; + use fusio::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; use futures_core::Stream; use futures_util::TryStreamExt; @@ -25,6 +27,10 @@ impl From for OpendalFs { impl Fs for OpendalFs { type File = OpendalFile; + fn file_system(&self) -> FileSystemTag { + todo!() + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { OpendalFile::open(self.op.clone(), path.to_string(), options).await } @@ -57,4 +63,22 @@ impl Fs for OpendalFs { .await .map_err(parse_opendal_error) } + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + todo!() + } } diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 8105bab..2336c46 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -16,11 +16,21 @@ pub struct FileMeta { pub size: u64, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum FileSystemTag { + Local, + OPFS, + // TODO: Remote needs to check whether endpoint and other remote fs are consistent + S3, +} + pub trait Fs: MaybeSend + MaybeSync { //! This trait is used to abstract file system operations across different file systems. type File: Read + Write + MaybeSend + 'static; + fn file_system(&self) -> FileSystemTag; + fn open(&self, path: &Path) -> impl Future> { self.open_options(path, OpenOptions::default()) } @@ -39,4 +49,18 @@ pub trait Fs: MaybeSend + MaybeSync { ) -> impl Future>, Error>> + MaybeSend; fn remove(&self, path: &Path) -> impl Future> + MaybeSend; + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend; + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend; } diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index c18c587..ccc3067 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -1,11 +1,11 @@ -use std::fs::create_dir_all; +use std::{fs, fs::create_dir_all}; use async_stream::stream; use futures_core::Stream; use super::MonoioFile; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, }; @@ -15,6 +15,10 @@ pub struct MonoIoFs; impl Fs for MonoIoFs { type File = MonoioFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -54,6 +58,33 @@ impl Fs for MonoIoFs { async fn remove(&self, path: &Path) -> Result<(), Error> { let path = path_to_local(path)?; - Ok(std::fs::remove_file(path)?) + Ok(fs::remove_file(path)?) + } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() == to_fs.file_system() { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; + } else { + todo!() + } + + Ok(()) + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() != to_fs.file_system() { + return Err(Error::Unsupported { + message: "file system is inconsistent".to_string(), + }); + } + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::hard_link(&from, &to)?; + + Ok(()) } } diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index fa3901d..17d86fc 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use async_stream::stream; use futures_core::Stream; use futures_util::StreamExt; @@ -15,9 +17,9 @@ use super::OPFSFile; use crate::{ disk::opfs::{promise, storage}, error::wasm_err, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, + Error, MaybeSend, }; pub struct OPFS; @@ -25,6 +27,10 @@ pub struct OPFS; impl Fs for OPFS { type File = OPFSFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let segments: Vec<&str> = path.as_ref().trim_matches('/').split("/").collect(); @@ -97,6 +103,28 @@ impl Fs for OPFS { .map_err(wasm_err)?; Ok(()) } + + fn copy( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + Err(Error::Unsupported { + message: "opfs does not support copy file".to_string(), + }) + } + + fn link( + &self, + from: &Path, + to_fs: &F, + to: &Path, + ) -> impl Future> + MaybeSend { + Err(Error::Unsupported { + message: "opfs does not support link file".to_string(), + }) + } } impl OPFS { diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index 8fc17e9..b5a2e60 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -8,7 +8,7 @@ use tokio::{ }; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, }; @@ -18,6 +18,10 @@ pub struct TokioFs; impl Fs for TokioFs { type File = File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -67,4 +71,31 @@ impl Fs for TokioFs { remove_file(&path).await?; Ok(()) } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() == to_fs.file_system() { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::copy(&from, &to).await?; + } else { + todo!() + } + + Ok(()) + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() != to_fs.file_system() { + return Err(Error::Unsupported { + message: "file system is inconsistent".to_string(), + }); + } + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::hard_link(&from, &to).await?; + + Ok(()) + } } diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index b9ea928..9783188 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -1,12 +1,14 @@ +use std::{fs, future::Future}; + use async_stream::stream; use futures_core::Stream; use tokio_uring::fs::{create_dir_all, remove_file}; use crate::{ disk::tokio_uring::TokioUringFile, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, - Error, + Error, MaybeSend, }; pub struct TokioUringFs; @@ -14,6 +16,10 @@ pub struct TokioUringFs; impl Fs for TokioUringFs { type File = TokioUringFile; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::Local + } + async fn open_options(&self, path: &Path, options: OpenOptions) -> Result { let local_path = path_to_local(path)?; @@ -58,4 +64,31 @@ impl Fs for TokioUringFs { Ok(remove_file(path).await?) } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() == to_fs.file_system() { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; + } else { + todo!() + } + + Ok(()) + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + if self.file_system() != to_fs.file_system() { + return Err(Error::Unsupported { + message: "file system is inconsistent".to_string(), + }); + } + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::hard_link(&from, &to)?; + + Ok(()) + } } diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 0546aea..34755b5 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -11,7 +11,7 @@ use url::Url; use super::{credential::AwsCredential, options::S3Options, S3Error, S3File}; use crate::{ - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, remotes::{ aws::sign::Sign, @@ -125,6 +125,10 @@ pub(super) struct AmazonS3Inner { impl Fs for AmazonS3 { type File = S3File; + fn file_system(&self) -> FileSystemTag { + FileSystemTag::S3 + } + async fn open_options(&self, path: &Path, _: OpenOptions) -> Result { Ok(S3File::new(self.clone(), path.clone())) } @@ -234,6 +238,14 @@ impl Fs for AmazonS3 { Ok(()) } + + async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + todo!() + } + + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + todo!() + } } #[derive(Debug, Deserialize)] diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 47863d2..866e792 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -327,7 +327,7 @@ mod tests { #[allow(unused)] #[cfg(not(target_arch = "wasm32"))] - async fn test_local_fs(fs: S) -> Result<(), Error> + async fn test_local_fs_read_write(fs: S) -> Result<(), Error> where S: crate::fs::Fs, { @@ -388,7 +388,140 @@ mod tests { Ok(()) } - #[cfg(feature = "tokio")] + #[allow(unused)] + async fn test_local_fs_copy_link(src_fs: S, dst_fs: D) -> Result<(), Error> + where + S: crate::fs::Fs, + D: crate::fs::Fs, + { + use std::collections::HashSet; + + use futures_util::StreamExt; + use tempfile::TempDir; + + use crate::{fs::OpenOptions, path::Path, DynFs}; + + let tmp_dir = TempDir::new()?; + + let work_dir_path = tmp_dir.path().join("work_dir"); + let src_file_path = work_dir_path.join("src_test.file"); + let dst_file_path = work_dir_path.join("dst_test.file"); + + src_fs + .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) + .await?; + dst_fs + .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) + .await?; + + // create files + let _ = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().create(true), + ) + .await?; + let _ = dst_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().create(true), + ) + .await?; + // copy + { + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true), + ) + .await?; + src_file.write_all("Hello! fusio".as_bytes()).await.0?; + src_file.close().await?; + + src_fs + .copy( + &Path::from_absolute_path(&src_file_path)?, + &dst_fs, + &Path::from_absolute_path(&dst_file_path)?, + ) + .await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.close().await?; + + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + + let mut dst_file = dst_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = dst_file.read_exact_at(vec![0u8; 12], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! fusio"); + } + + dst_fs + .remove(&Path::from_absolute_path(&dst_file_path)?) + .await?; + // link + { + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true), + ) + .await?; + src_file.write_all("Hello! fusio".as_bytes()).await.0?; + src_file.close().await?; + + src_fs + .link( + &Path::from_absolute_path(&src_file_path)?, + &dst_fs, + &Path::from_absolute_path(&dst_file_path)?, + ) + .await?; + + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.close().await?; + + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! world"); + + let mut dst_file = dst_fs + .open_options( + &Path::from_absolute_path(&dst_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = dst_file.read_exact_at(vec![0u8; 24], 0).await; + result.unwrap(); + assert_eq!(buf.as_slice(), b"Hello! fusioHello! world"); + } + + Ok(()) + } + + #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] #[tokio::test] async fn test_tokio() { use tempfile::tempfile; @@ -405,10 +538,31 @@ mod tests { async fn test_tokio_fs() { use crate::disk::TokioFs; - test_local_fs(TokioFs).await.unwrap(); + test_local_fs_read_write(TokioFs).await.unwrap(); + test_local_fs_copy_link(TokioFs, TokioFs).await.unwrap(); } - #[cfg(feature = "tokio")] + #[cfg(all(feature = "tokio-uring", target_os = "linux"))] + #[tokio::test] + async fn test_tokio_uring_fs() { + use crate::disk::tokio_uring::fs::TokioUringFs; + + test_local_fs_read_write(TokioUringFs).await.unwrap(); + test_local_fs_copy_link(TokioUringFs, TokioUringFs) + .await + .unwrap(); + } + + #[cfg(all(feature = "monoio", not(target_arch = "wasm32")))] + #[tokio::test] + async fn test_monoio_fs() { + use crate::disk::monoio::fs::MonoIoFs; + + test_local_fs_read_write(MonoIoFs).await.unwrap(); + test_local_fs_copy_link(MonoIoFs, MonoIoFs).await.unwrap(); + } + + #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] #[tokio::test] async fn test_read_exact() { use tempfile::tempfile; From f5cf3a6a70a52c98d5dbe17c1ec714242729e03a Mon Sep 17 00:00:00 2001 From: Kould Date: Tue, 12 Nov 2024 15:50:18 +0800 Subject: [PATCH 02/14] feat: impl `copy` for S3 & add `fs::copy` for different filesystems --- fusio-object-store/src/fs.rs | 28 ++++++------ fusio-opendal/src/fs.rs | 9 +--- fusio/src/fs/mod.rs | 45 +++++++++++++++---- fusio/src/impls/disk/monoio/fs.rs | 14 +++--- fusio/src/impls/disk/opfs/fs.rs | 14 +----- fusio/src/impls/disk/tokio/fs.rs | 14 +++--- fusio/src/impls/disk/tokio_uring/fs.rs | 14 +++--- fusio/src/impls/remotes/aws/fs.rs | 32 ++++++++++--- .../src/impls/remotes/aws/multipart_upload.rs | 40 ++++++++++++++--- fusio/src/impls/remotes/aws/writer.rs | 9 ++-- fusio/src/lib.rs | 1 - 11 files changed, 137 insertions(+), 83 deletions(-) diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index c54481d..e5340ac 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -69,21 +69,21 @@ impl Fs for S3Store { Ok(()) } - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { - todo!() + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = from.clone().into(); + let to = to.clone().into(); + + self.inner + .copy(&from, &to) + .await + .map_err(BoxedError::from)?; + + Ok(()) } - fn link( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { - todo!() + async fn link(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "s3 does not support link file".to_string(), + }) } } diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index b696630..8f28b41 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -64,13 +64,8 @@ impl Fs for OpendalFs { .map_err(parse_opendal_error) } - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { - todo!() + fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend { + self.op.copy(from.as_ref(), to.as_ref()) } fn link( diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 2336c46..34d54ae 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -3,12 +3,12 @@ mod options; -use std::future::Future; +use std::{cmp, future::Future}; use futures_core::Stream; pub use options::*; -use crate::{path::Path, Error, MaybeSend, MaybeSync, Read, Write}; +use crate::{path::Path, Error, IoBufMut, MaybeSend, MaybeSync, Read, Write}; #[derive(Debug)] pub struct FileMeta { @@ -50,12 +50,7 @@ pub trait Fs: MaybeSend + MaybeSync { fn remove(&self, path: &Path) -> impl Future> + MaybeSend; - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend; + fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; fn link( &self, @@ -64,3 +59,37 @@ pub trait Fs: MaybeSend + MaybeSync { to: &Path, ) -> impl Future> + MaybeSend; } + +pub async fn copy(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error> +where + F: Fs, + T: Fs, +{ + if from_fs.file_system() == to_fs.file_system() { + from_fs.copy(from, to).await?; + return Ok(()); + } + let mut from_file = from_fs + .open_options(from, OpenOptions::default().read(true)) + .await?; + let from_file_size = from_file.size().await? as usize; + + let mut to_file = to_fs + .open_options(to, OpenOptions::default().create(true).write(true)) + .await?; + let buf_size = cmp::min(from_file_size, 4 * 1024); + let mut buf = vec![0u8; buf_size]; + let mut read_pos = 0u64; + + while (read_pos as usize) < from_file_size - 1 { + let (result, _) = from_file.read_exact_at(buf.as_slice_mut(), read_pos).await; + result?; + read_pos += buf.len() as u64; + + let (result, _) = to_file.write_all(buf.as_slice()).await; + result?; + buf.resize(buf_size, 0); + } + + Ok(()) +} diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index ccc3067..919c1a7 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -61,15 +61,11 @@ impl Fs for MonoIoFs { Ok(fs::remove_file(path)?) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() == to_fs.file_system() { - let from = path_to_local(from)?; - let to = path_to_local(to)?; - - fs::copy(&from, &to)?; - } else { - todo!() - } + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; Ok(()) } diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index 17d86fc..d7be0ac 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -104,23 +104,13 @@ impl Fs for OPFS { Ok(()) } - fn copy( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support copy file".to_string(), }) } - fn link( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support link file".to_string(), }) diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index b5a2e60..c8848a9 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -72,15 +72,11 @@ impl Fs for TokioFs { Ok(()) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() == to_fs.file_system() { - let from = path_to_local(from)?; - let to = path_to_local(to)?; - - tokio::fs::copy(&from, &to).await?; - } else { - todo!() - } + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + tokio::fs::copy(&from, &to).await?; Ok(()) } diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index 9783188..02c3ddb 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -65,15 +65,11 @@ impl Fs for TokioUringFs { Ok(remove_file(path).await?) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() == to_fs.file_system() { - let from = path_to_local(from)?; - let to = path_to_local(to)?; - - fs::copy(&from, &to)?; - } else { - todo!() - } + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from = path_to_local(from)?; + let to = path_to_local(to)?; + + fs::copy(&from, &to)?; Ok(()) } diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 34755b5..ac24110 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -14,10 +14,13 @@ use crate::{ fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, remotes::{ - aws::sign::Sign, + aws::{ + multipart_upload::{MultipartUpload, UploadType}, + sign::Sign, + }, http::{DynHttpClient, HttpClient, HttpError}, }, - Error, + Error, Read, }; pub struct AmazonS3Builder { @@ -239,12 +242,29 @@ impl Fs for AmazonS3 { Ok(()) } - async fn copy(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - todo!() + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + let from_file = S3File::new(self.clone(), from.clone()); + let from_file_size = from_file.size().await?; + + let upload = MultipartUpload::new(self.clone(), to.clone()); + upload + .upload_once( + from_file_size as usize, + UploadType::Copy { + endpoint: self.inner.options.endpoint.clone(), + from: from.clone(), + body: Empty::::new(), + }, + ) + .await?; + + Ok(()) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - todo!() + async fn link(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> { + Err(Error::Unsupported { + message: "s3 does not support link file".to_string(), + }) } } diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index 112d0f7..6aabf01 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -27,6 +27,16 @@ pub(crate) struct MultipartUpload { path: Path, } +pub enum UploadType { + Write(B), + Copy { + endpoint: String, + from: Path, + // FIXME: for Empty + body: B, + }, +} + impl MultipartUpload { pub fn new(fs: AmazonS3, path: Path) -> Self { Self { fs, path } @@ -67,22 +77,42 @@ impl MultipartUpload { Self::check_response(response).await } - pub(crate) async fn upload_once(&self, size: usize, body: B) -> Result<(), Error> + pub(crate) async fn upload_once( + &self, + size: usize, + upload_type: UploadType, + ) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { + let (body, copy_from) = match upload_type { + UploadType::Write(body) => (body, None), + UploadType::Copy { + endpoint, + from: file_path, + body, + } => { + let from_url = format!( + "{endpoint}/{}", + utf8_percent_encode(file_path.as_ref(), &STRICT_PATH_ENCODE_SET) + ); + (body, Some(from_url)) + } + }; let url = format!( "{}/{}", self.fs.as_ref().options.endpoint, utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET) ); - let request = Request::builder() + let mut builder = Request::builder() .uri(url) .method(Method::PUT) - .header(CONTENT_LENGTH, size) - .body(body) - .map_err(|e| Error::Other(e.into()))?; + .header(CONTENT_LENGTH, size); + if let Some(from_url) = copy_from { + builder = builder.header("x-amz-copy-source", from_url); + } + let request = builder.body(body).map_err(|e| Error::Other(e.into()))?; let _ = self.send_request(request).await?; Ok(()) diff --git a/fusio/src/impls/remotes/aws/writer.rs b/fusio/src/impls/remotes/aws/writer.rs index 520a18d..f4b24c1 100644 --- a/fusio/src/impls/remotes/aws/writer.rs +++ b/fusio/src/impls/remotes/aws/writer.rs @@ -2,11 +2,14 @@ use std::{mem, pin::Pin, sync::Arc}; use bytes::{BufMut, BytesMut}; use futures_util::{stream::FuturesOrdered, StreamExt}; -use http_body_util::Full; +use http_body_util::{Empty, Full}; use crate::{ dynamic::MaybeSendFuture, - remotes::{aws::multipart_upload::MultipartUpload, serde::MultipartPart}, + remotes::{ + aws::multipart_upload::{MultipartUpload, UploadType}, + serde::MultipartPart, + }, Error, IoBuf, Write, }; @@ -90,7 +93,7 @@ impl Write for S3Writer { let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze(); self.inner - .upload_once(bytes.len(), Full::new(bytes)) + .upload_once(bytes.len(), UploadType::Write(Full::new(bytes))) .await?; } return Ok(()); diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 866e792..1ded622 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -441,7 +441,6 @@ mod tests { src_fs .copy( &Path::from_absolute_path(&src_file_path)?, - &dst_fs, &Path::from_absolute_path(&dst_file_path)?, ) .await?; From 515b834082df9bc334dc0e85f120426edb370adc Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Tue, 12 Nov 2024 08:58:45 +0000 Subject: [PATCH 03/14] fixe: `Fs::copy` on S3 --- fusio/src/impls/disk/opfs/fs.rs | 2 +- fusio/src/impls/remotes/aws/fs.rs | 76 +++++++++++++++++-- .../src/impls/remotes/aws/multipart_upload.rs | 32 +++++--- fusio/src/impls/remotes/aws/options.rs | 1 + fusio/src/impls/remotes/aws/s3.rs | 18 +++-- fusio/src/impls/remotes/aws/writer.rs | 6 +- 6 files changed, 110 insertions(+), 25 deletions(-) diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index d7be0ac..a1bc0d3 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -104,7 +104,7 @@ impl Fs for OPFS { Ok(()) } - async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support copy file".to_string(), }) diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index ac24110..630448a 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -98,6 +98,7 @@ impl AmazonS3Builder { inner: Arc::new(AmazonS3Inner { options: S3Options { endpoint, + bucket: self.bucket, region: self.region, credential: self.credential, sign_payload: self.sign_payload, @@ -243,15 +244,11 @@ impl Fs for AmazonS3 { } async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { - let from_file = S3File::new(self.clone(), from.clone()); - let from_file_size = from_file.size().await?; - let upload = MultipartUpload::new(self.clone(), to.clone()); upload .upload_once( - from_file_size as usize, UploadType::Copy { - endpoint: self.inner.options.endpoint.clone(), + bucket: self.inner.options.bucket.clone(), from: from.clone(), body: Empty::::new(), }, @@ -297,6 +294,9 @@ pub struct ListResponse { #[cfg(test)] mod tests { + use crate::fs::Fs; + use crate::path::Path; + #[cfg(feature = "tokio-http")] #[tokio::test] async fn list_and_remove() { @@ -330,4 +330,70 @@ mod tests { s3.remove(&meta.path).await.unwrap(); } } + + #[ignore] + #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] + #[tokio::test] + async fn copy() { + use std::sync::Arc; + + use crate::{ + remotes::{ + aws::{ + credential::AwsCredential, + fs::{AmazonS3, AmazonS3Inner}, + options::S3Options, + s3::S3File, + }, + http::{tokio::TokioClient, DynHttpClient, HttpClient}, + }, + Read, Write, + }; + + let key_id = "user".to_string(); + let secret_key = "password".to_string(); + + let client = TokioClient::new(); + let region = "ap-southeast-1"; + let options = S3Options { + endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), + credential: Some(AwsCredential { + key_id, + secret_key, + token: None, + }), + region: region.into(), + sign_payload: true, + checksum: false, + }; + + let s3 = AmazonS3 { + inner: Arc::new(AmazonS3Inner { + options, + client: Box::new(client) as Box, + }), + }; + + let from_path: Path = "read-write.txt".into(); + let to_path: Path = "read-write-copy.txt".into(); + { + let mut s3 = S3File::new(s3.clone(), from_path.clone()); + + let (result, _) = s3 + .write_all(&b"The answer of life, universe and everthing"[..]) + .await; + result.unwrap(); + s3.close().await.unwrap(); + } + s3.copy(&from_path, &to_path).await.unwrap(); + let mut s3 = S3File::new(s3, to_path.clone()); + + let size = s3.size().await.unwrap(); + assert_eq!(size, 42); + let buf = Vec::new(); + let (result, buf) = s3.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf, b"The answer of life, universe and everthing"); + } } diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index 6aabf01..c3a5bab 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -28,9 +28,12 @@ pub(crate) struct MultipartUpload { } pub enum UploadType { - Write(B), + Write { + size: usize, + body: B, + }, Copy { - endpoint: String, + bucket: String, from: Path, // FIXME: for Empty body: B, @@ -79,25 +82,27 @@ impl MultipartUpload { pub(crate) async fn upload_once( &self, - size: usize, upload_type: UploadType, ) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { - let (body, copy_from) = match upload_type { - UploadType::Write(body) => (body, None), + let (size, body, copy_from) = match upload_type { + UploadType::Write { + size, + body + } => (Some(size), body, None), UploadType::Copy { - endpoint, - from: file_path, + bucket, + from, body, } => { let from_url = format!( - "{endpoint}/{}", - utf8_percent_encode(file_path.as_ref(), &STRICT_PATH_ENCODE_SET) + "/{bucket}/{}", + utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET) ); - (body, Some(from_url)) + (None, body, Some(from_url)) } }; let url = format!( @@ -107,11 +112,14 @@ impl MultipartUpload { ); let mut builder = Request::builder() .uri(url) - .method(Method::PUT) - .header(CONTENT_LENGTH, size); + .method(Method::PUT); if let Some(from_url) = copy_from { builder = builder.header("x-amz-copy-source", from_url); } + // Tips: When the body is empty or the length is less than CONTENT_LENGTH, it may block + if let Some(size) = size { + builder = builder.header(CONTENT_LENGTH, size) + } let request = builder.body(body).map_err(|e| Error::Other(e.into()))?; let _ = self.send_request(request).await?; diff --git a/fusio/src/impls/remotes/aws/options.rs b/fusio/src/impls/remotes/aws/options.rs index 3e64355..89dd339 100644 --- a/fusio/src/impls/remotes/aws/options.rs +++ b/fusio/src/impls/remotes/aws/options.rs @@ -2,6 +2,7 @@ use super::credential::AwsCredential; pub(crate) struct S3Options { pub(crate) endpoint: String, + pub(crate) bucket: String, pub(crate) region: String, pub(crate) credential: Option, pub(crate) sign_payload: bool, diff --git a/fusio/src/impls/remotes/aws/s3.rs b/fusio/src/impls/remotes/aws/s3.rs index 79636b6..51c66da 100644 --- a/fusio/src/impls/remotes/aws/s3.rs +++ b/fusio/src/impls/remotes/aws/s3.rs @@ -254,7 +254,8 @@ impl Write for S3File { #[cfg(test)] mod tests { - #[ignore] + use crate::Write; + #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] #[tokio::test] async fn write_and_read_s3_file() { @@ -280,6 +281,7 @@ mod tests { let region = "ap-southeast-1"; let options = S3Options { endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), credential: Some(AwsCredential { key_id, secret_key, @@ -297,12 +299,16 @@ mod tests { }), }; - let mut s3 = S3File::new(s3, "read-write.txt".into()); + { + let mut s3 = S3File::new(s3.clone(), "read-write.txt".into()); - let (result, _) = s3 - .write_all(&b"The answer of life, universe and everthing"[..]) - .await; - result.unwrap(); + let (result, _) = s3 + .write_all(&b"The answer of life, universe and everthing"[..]) + .await; + result.unwrap(); + s3.close().await.unwrap(); + } + let mut s3 = S3File::new(s3, "read-write.txt".into()); let size = s3.size().await.unwrap(); assert_eq!(size, 42); diff --git a/fusio/src/impls/remotes/aws/writer.rs b/fusio/src/impls/remotes/aws/writer.rs index f4b24c1..9d12dc0 100644 --- a/fusio/src/impls/remotes/aws/writer.rs +++ b/fusio/src/impls/remotes/aws/writer.rs @@ -93,7 +93,10 @@ impl Write for S3Writer { let bytes = mem::replace(&mut self.buf, BytesMut::new()).freeze(); self.inner - .upload_once(bytes.len(), UploadType::Write(Full::new(bytes))) + .upload_once(UploadType::Write { + size: bytes.len(), + body: Full::new(bytes), + }) .await?; } return Ok(()); @@ -143,6 +146,7 @@ mod tests { let region = "ap-southeast-2"; let options = S3Options { endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), credential: Some(AwsCredential { key_id: "user".to_string(), secret_key: "password".to_string(), From 8782289550adcf4e9ede2394ae14946c73b18dcf Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Tue, 12 Nov 2024 09:31:53 +0000 Subject: [PATCH 04/14] test: diff filesystem copy --- fusio/src/fs/mod.rs | 90 +++++++++++++++++-- fusio/src/impls/remotes/aws/fs.rs | 23 +++-- fusio/src/impls/remotes/aws/mod.rs | 2 +- .../src/impls/remotes/aws/multipart_upload.rs | 20 +---- 4 files changed, 104 insertions(+), 31 deletions(-) diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 34d54ae..74bfcbe 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -78,18 +78,98 @@ where .open_options(to, OpenOptions::default().create(true).write(true)) .await?; let buf_size = cmp::min(from_file_size, 4 * 1024); - let mut buf = vec![0u8; buf_size]; + let mut buf = Some(vec![0u8; buf_size]); let mut read_pos = 0u64; while (read_pos as usize) < from_file_size - 1 { - let (result, _) = from_file.read_exact_at(buf.as_slice_mut(), read_pos).await; + let tmp = buf.take().unwrap(); + let (result, tmp) = from_file.read_exact_at(tmp, read_pos).await; result?; - read_pos += buf.len() as u64; + read_pos += tmp.len() as u64; - let (result, _) = to_file.write_all(buf.as_slice()).await; + let (result, tmp) = to_file.write_all(tmp).await; result?; - buf.resize(buf_size, 0); + buf = Some(tmp); } + to_file.close().await?; Ok(()) } + +#[cfg(test)] +mod tests { + + #[ignore] + #[cfg(all( + feature = "tokio-http", + feature = "tokio", + feature = "aws", + not(feature = "completion-based") + ))] + #[tokio::test] + async fn test_diff_fs_copy() -> Result<(), crate::Error> { + use std::sync::Arc; + + use tempfile::TempDir; + + use crate::{ + fs, + fs::{Fs, OpenOptions}, + impls::disk::tokio::fs::TokioFs, + path::Path, + remotes::{ + aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File}, + http::{tokio::TokioClient, DynHttpClient, HttpClient}, + }, + Read, Write, + }; + + let tmp_dir = TempDir::new()?; + let local_path = Path::from_absolute_path(&tmp_dir.as_ref().join("test.file"))?; + let s3_path: Path = "s3_copy_test.file".into(); + + let key_id = "user".to_string(); + let secret_key = "password".to_string(); + + let client = TokioClient::new(); + let region = "ap-southeast-1"; + let options = S3Options { + endpoint: "http://localhost:9000/data".into(), + bucket: "data".to_string(), + credential: Some(AwsCredential { + key_id, + secret_key, + token: None, + }), + region: region.into(), + sign_payload: true, + checksum: false, + }; + + let s3_fs = AmazonS3::new(Box::new(client), options); + let local_fs = TokioFs; + + { + let mut local_file = local_fs + .open_options(&local_path, OpenOptions::default().create(true).write(true)) + .await?; + local_file + .write_all("🎵never gonna give you up🎵".as_bytes()) + .await + .0?; + local_file.close().await.unwrap(); + } + fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?; + + let mut s3 = S3File::new(s3_fs, s3_path.clone()); + + let size = s3.size().await.unwrap(); + assert_eq!(size, 31); + let buf = Vec::new(); + let (result, buf) = s3.read_to_end_at(buf, 0).await; + result.unwrap(); + assert_eq!(buf, "🎵never gonna give you up🎵".as_bytes()); + + Ok(()) + } +} diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 630448a..7c7c3a0 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -126,6 +126,14 @@ pub(super) struct AmazonS3Inner { pub(super) client: Box, } +impl AmazonS3 { + pub fn new(client: Box, options: S3Options) -> Self { + AmazonS3 { + inner: Arc::new(AmazonS3Inner { options, client }), + } + } +} + impl Fs for AmazonS3 { type File = S3File; @@ -246,13 +254,11 @@ impl Fs for AmazonS3 { async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { let upload = MultipartUpload::new(self.clone(), to.clone()); upload - .upload_once( - UploadType::Copy { - bucket: self.inner.options.bucket.clone(), - from: from.clone(), - body: Empty::::new(), - }, - ) + .upload_once(UploadType::Copy { + bucket: self.inner.options.bucket.clone(), + from: from.clone(), + body: Empty::::new(), + }) .await?; Ok(()) @@ -294,8 +300,7 @@ pub struct ListResponse { #[cfg(test)] mod tests { - use crate::fs::Fs; - use crate::path::Path; + use crate::{fs::Fs, path::Path}; #[cfg(feature = "tokio-http")] #[tokio::test] diff --git a/fusio/src/impls/remotes/aws/mod.rs b/fusio/src/impls/remotes/aws/mod.rs index a3b842c..3215d38 100644 --- a/fusio/src/impls/remotes/aws/mod.rs +++ b/fusio/src/impls/remotes/aws/mod.rs @@ -4,7 +4,7 @@ mod error; pub mod fs; pub(crate) mod multipart_upload; pub(crate) mod options; -mod s3; +pub(crate) mod s3; pub(crate) mod sign; pub(crate) mod writer; diff --git a/fusio/src/impls/remotes/aws/multipart_upload.rs b/fusio/src/impls/remotes/aws/multipart_upload.rs index c3a5bab..0b1b7eb 100644 --- a/fusio/src/impls/remotes/aws/multipart_upload.rs +++ b/fusio/src/impls/remotes/aws/multipart_upload.rs @@ -80,24 +80,14 @@ impl MultipartUpload { Self::check_response(response).await } - pub(crate) async fn upload_once( - &self, - upload_type: UploadType, - ) -> Result<(), Error> + pub(crate) async fn upload_once(&self, upload_type: UploadType) -> Result<(), Error> where B: Body + Clone + Unpin + Send + Sync + 'static, B::Error: std::error::Error + Send + Sync + 'static, { let (size, body, copy_from) = match upload_type { - UploadType::Write { - size, - body - } => (Some(size), body, None), - UploadType::Copy { - bucket, - from, - body, - } => { + UploadType::Write { size, body } => (Some(size), body, None), + UploadType::Copy { bucket, from, body } => { let from_url = format!( "/{bucket}/{}", utf8_percent_encode(from.as_ref(), &STRICT_PATH_ENCODE_SET) @@ -110,9 +100,7 @@ impl MultipartUpload { self.fs.as_ref().options.endpoint, utf8_percent_encode(self.path.as_ref(), &STRICT_PATH_ENCODE_SET) ); - let mut builder = Request::builder() - .uri(url) - .method(Method::PUT); + let mut builder = Request::builder().uri(url).method(Method::PUT); if let Some(from_url) = copy_from { builder = builder.header("x-amz-copy-source", from_url); } From be630d074cbf0c2c44058760ee88f4bba816ab3d Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Tue, 12 Nov 2024 09:51:33 +0000 Subject: [PATCH 05/14] fix: ci fail on opendal --- fusio-opendal/src/fs.rs | 14 ++++++-------- fusio/src/impls/remotes/aws/s3.rs | 2 +- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index 8f28b41..4b39aba 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -64,16 +64,14 @@ impl Fs for OpendalFs { .map_err(parse_opendal_error) } - fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend { - self.op.copy(from.as_ref(), to.as_ref()) + async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + self.op + .copy(from.as_ref(), to.as_ref()) + .await + .map_err(parse_opendal_error) } - fn link( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend { + async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { todo!() } } diff --git a/fusio/src/impls/remotes/aws/s3.rs b/fusio/src/impls/remotes/aws/s3.rs index 51c66da..ded2141 100644 --- a/fusio/src/impls/remotes/aws/s3.rs +++ b/fusio/src/impls/remotes/aws/s3.rs @@ -254,8 +254,8 @@ impl Write for S3File { #[cfg(test)] mod tests { - use crate::Write; + #[ignore] #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] #[tokio::test] async fn write_and_read_s3_file() { From 54f8cc88b15c14abc2c2f20f29d3eb49f3061848 Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Tue, 12 Nov 2024 10:22:54 +0000 Subject: [PATCH 06/14] test: add fs test for monoio & tokio uring --- fusio/src/lib.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 1ded622..5ed0786 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -542,18 +542,20 @@ mod tests { } #[cfg(all(feature = "tokio-uring", target_os = "linux"))] - #[tokio::test] - async fn test_tokio_uring_fs() { + #[test] + fn test_tokio_uring_fs() { use crate::disk::tokio_uring::fs::TokioUringFs; - test_local_fs_read_write(TokioUringFs).await.unwrap(); - test_local_fs_copy_link(TokioUringFs, TokioUringFs) - .await - .unwrap(); + tokio_uring::start(async { + test_local_fs_read_write(TokioUringFs).await.unwrap(); + test_local_fs_copy_link(TokioUringFs, TokioUringFs) + .await + .unwrap(); + }) } #[cfg(all(feature = "monoio", not(target_arch = "wasm32")))] - #[tokio::test] + #[monoio::test] async fn test_monoio_fs() { use crate::disk::monoio::fs::MonoIoFs; From d0463e39893ccc51f3d1185399af58f13e208d62 Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Tue, 12 Nov 2024 10:53:48 +0000 Subject: [PATCH 07/14] test: add fs test for monoio & tokio uring --- fusio/src/lib.rs | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 5ed0786..083dfeb 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -377,10 +377,17 @@ mod tests { ) .await?; file.write_all("Hello! world".as_bytes()).await.0?; - + file.flush().await.unwrap(); file.close().await.unwrap(); - let (result, buf) = file.read_exact_at(vec![0u8; 12], 12).await; + let mut file = fs + .open_options( + &Path::from_absolute_path(&work_file_path)?, + OpenOptions::default().read(true), + ) + .await?; + + let (result, buf) = file.read_exact_at(vec![0u8; 12], 0).await; result.unwrap(); assert_eq!(buf.as_slice(), b"Hello! world"); } @@ -452,9 +459,17 @@ mod tests { ) .await?; src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.flush().await?; src_file.close().await?; - let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await; + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 0).await; result.unwrap(); assert_eq!(buf.as_slice(), b"Hello! world"); @@ -499,9 +514,16 @@ mod tests { ) .await?; src_file.write_all("Hello! world".as_bytes()).await.0?; + src_file.flush().await?; src_file.close().await?; - let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 12).await; + let mut src_file = src_fs + .open_options( + &Path::from_absolute_path(&src_file_path)?, + OpenOptions::default().write(true).read(true), + ) + .await?; + let (result, buf) = src_file.read_exact_at(vec![0u8; 12], 0).await; result.unwrap(); assert_eq!(buf.as_slice(), b"Hello! world"); @@ -512,9 +534,9 @@ mod tests { ) .await?; - let (result, buf) = dst_file.read_exact_at(vec![0u8; 24], 0).await; + let (result, buf) = dst_file.read_exact_at(vec![0u8; 12], 0).await; result.unwrap(); - assert_eq!(buf.as_slice(), b"Hello! fusioHello! world"); + assert_eq!(buf.as_slice(), b"Hello! world"); } Ok(()) From 0fa5192f03c1008af6e76c2d40c22302098cac1e Mon Sep 17 00:00:00 2001 From: Kould Date: Wed, 13 Nov 2024 12:12:37 +0800 Subject: [PATCH 08/14] chore: add `copy` & `link` for `DynFs` --- fusio/src/dynamic/fs.rs | 34 +++++++++++++++++++++++++++ fusio/src/fs/mod.rs | 2 +- fusio/src/impls/remotes/aws/fs.rs | 4 ++-- fusio/src/impls/remotes/aws/s3.rs | 2 +- fusio/src/impls/remotes/aws/writer.rs | 2 +- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index 1df009f..c5100c8 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -84,6 +84,19 @@ pub trait DynFs: MaybeSend + MaybeSync { &'s self, path: &'path Path, ) -> Pin> + 's>>; + + fn copy<'s, 'path: 's>( + &'s self, + from: &'path Path, + to: &'path Path, + ) -> Pin> + 's>>; + + fn link<'s, 'path: 's>( + &'s self, + from: &'path Path, + to_fs: &'s Self, + to: &'path Path, + ) -> Pin> + 's>>; } impl DynFs for F { @@ -130,6 +143,27 @@ impl DynFs for F { ) -> Pin> + 's>> { Box::pin(F::remove(self, path)) } + + fn copy<'s, 'path: 's>( + &'s self, + from: &'path Path, + to: &'path Path, + ) -> Pin> + 's>> { + Box::pin(F::copy(self, from, to)) + } + + fn link<'s, 'path: 's>( + &'s self, + from: &'path Path, + to_fs: &'s Self, + to: &'path Path, + ) -> Pin> + 's>> { + Box::pin(async move { + self.link(from, to_fs, to).await?; + + Ok(()) + }) + } } #[cfg(test)] diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 74bfcbe..db42ff9 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -8,7 +8,7 @@ use std::{cmp, future::Future}; use futures_core::Stream; pub use options::*; -use crate::{path::Path, Error, IoBufMut, MaybeSend, MaybeSync, Read, Write}; +use crate::{path::Path, Error, MaybeSend, MaybeSync, Read, Write}; #[derive(Debug)] pub struct FileMeta { diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 7c7c3a0..9ae7e08 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -20,7 +20,7 @@ use crate::{ }, http::{DynHttpClient, HttpClient, HttpError}, }, - Error, Read, + Error, }; pub struct AmazonS3Builder { @@ -350,7 +350,7 @@ mod tests { options::S3Options, s3::S3File, }, - http::{tokio::TokioClient, DynHttpClient, HttpClient}, + http::{tokio::TokioClient, DynHttpClient}, }, Read, Write, }; diff --git a/fusio/src/impls/remotes/aws/s3.rs b/fusio/src/impls/remotes/aws/s3.rs index ded2141..54f1cc9 100644 --- a/fusio/src/impls/remotes/aws/s3.rs +++ b/fusio/src/impls/remotes/aws/s3.rs @@ -269,7 +269,7 @@ mod tests { options::S3Options, s3::S3File, }, - http::{tokio::TokioClient, DynHttpClient, HttpClient}, + http::{tokio::TokioClient, DynHttpClient}, }, Read, Write, }; diff --git a/fusio/src/impls/remotes/aws/writer.rs b/fusio/src/impls/remotes/aws/writer.rs index 9d12dc0..6b96d5b 100644 --- a/fusio/src/impls/remotes/aws/writer.rs +++ b/fusio/src/impls/remotes/aws/writer.rs @@ -2,7 +2,7 @@ use std::{mem, pin::Pin, sync::Arc}; use bytes::{BufMut, BytesMut}; use futures_util::{stream::FuturesOrdered, StreamExt}; -use http_body_util::{Empty, Full}; +use http_body_util::Full; use crate::{ dynamic::MaybeSendFuture, From fdd73488492366a35f6d5bc2edd50b25d6ba2a3b Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Wed, 13 Nov 2024 07:08:29 +0000 Subject: [PATCH 09/14] chore: remove `Fs::link` arg `to_fs` --- fusio-object-store/src/fs.rs | 2 +- fusio-opendal/src/fs.rs | 2 +- fusio/src/dynamic/fs.rs | 8 +------- fusio/src/fs/mod.rs | 7 +------ fusio/src/impls/disk/monoio/fs.rs | 7 +------ fusio/src/impls/disk/opfs/fs.rs | 4 ++-- fusio/src/impls/disk/tokio/fs.rs | 7 +------ fusio/src/impls/disk/tokio_uring/fs.rs | 7 +------ fusio/src/impls/remotes/aws/fs.rs | 2 +- fusio/src/lib.rs | 26 ++++++++------------------ 10 files changed, 18 insertions(+), 54 deletions(-) diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index e5340ac..f550307 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -81,7 +81,7 @@ impl Fs for S3Store { Ok(()) } - async fn link(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> { + async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "s3 does not support link file".to_string(), }) diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index 4b39aba..0e04361 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -71,7 +71,7 @@ impl Fs for OpendalFs { .map_err(parse_opendal_error) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { todo!() } } diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index c5100c8..939ff79 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -94,7 +94,6 @@ pub trait DynFs: MaybeSend + MaybeSync { fn link<'s, 'path: 's>( &'s self, from: &'path Path, - to_fs: &'s Self, to: &'path Path, ) -> Pin> + 's>>; } @@ -155,14 +154,9 @@ impl DynFs for F { fn link<'s, 'path: 's>( &'s self, from: &'path Path, - to_fs: &'s Self, to: &'path Path, ) -> Pin> + 's>> { - Box::pin(async move { - self.link(from, to_fs, to).await?; - - Ok(()) - }) + Box::pin(F::link(self, from, to)) } } diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index db42ff9..9792629 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -52,12 +52,7 @@ pub trait Fs: MaybeSend + MaybeSync { fn copy(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; - fn link( - &self, - from: &Path, - to_fs: &F, - to: &Path, - ) -> impl Future> + MaybeSend; + fn link(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; } pub async fn copy(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error> diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index 919c1a7..229010a 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -70,12 +70,7 @@ impl Fs for MonoIoFs { Ok(()) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() != to_fs.file_system() { - return Err(Error::Unsupported { - message: "file system is inconsistent".to_string(), - }); - } + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { let from = path_to_local(from)?; let to = path_to_local(to)?; diff --git a/fusio/src/impls/disk/opfs/fs.rs b/fusio/src/impls/disk/opfs/fs.rs index a1bc0d3..3044153 100644 --- a/fusio/src/impls/disk/opfs/fs.rs +++ b/fusio/src/impls/disk/opfs/fs.rs @@ -104,13 +104,13 @@ impl Fs for OPFS { Ok(()) } - async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> { + async fn copy(&self, _: &Path, _: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support copy file".to_string(), }) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { + async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opfs does not support link file".to_string(), }) diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index c8848a9..e5f0b73 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -81,12 +81,7 @@ impl Fs for TokioFs { Ok(()) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() != to_fs.file_system() { - return Err(Error::Unsupported { - message: "file system is inconsistent".to_string(), - }); - } + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { let from = path_to_local(from)?; let to = path_to_local(to)?; diff --git a/fusio/src/impls/disk/tokio_uring/fs.rs b/fusio/src/impls/disk/tokio_uring/fs.rs index 02c3ddb..e28277f 100644 --- a/fusio/src/impls/disk/tokio_uring/fs.rs +++ b/fusio/src/impls/disk/tokio_uring/fs.rs @@ -74,12 +74,7 @@ impl Fs for TokioUringFs { Ok(()) } - async fn link(&self, from: &Path, to_fs: &F, to: &Path) -> Result<(), Error> { - if self.file_system() != to_fs.file_system() { - return Err(Error::Unsupported { - message: "file system is inconsistent".to_string(), - }); - } + async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { let from = path_to_local(from)?; let to = path_to_local(to)?; diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index 9ae7e08..ff16080 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -264,7 +264,7 @@ impl Fs for AmazonS3 { Ok(()) } - async fn link(&self, _: &Path, _: &F, _: &Path) -> Result<(), Error> { + async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "s3 does not support link file".to_string(), }) diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 083dfeb..902785f 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -396,11 +396,7 @@ mod tests { } #[allow(unused)] - async fn test_local_fs_copy_link(src_fs: S, dst_fs: D) -> Result<(), Error> - where - S: crate::fs::Fs, - D: crate::fs::Fs, - { + async fn test_local_fs_copy_link(src_fs: F) -> Result<(), Error> { use std::collections::HashSet; use futures_util::StreamExt; @@ -417,9 +413,6 @@ mod tests { src_fs .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) .await?; - dst_fs - .create_dir_all(&Path::from_absolute_path(&work_dir_path)?) - .await?; // create files let _ = src_fs @@ -428,7 +421,7 @@ mod tests { OpenOptions::default().create(true), ) .await?; - let _ = dst_fs + let _ = src_fs .open_options( &Path::from_absolute_path(&dst_file_path)?, OpenOptions::default().create(true), @@ -473,7 +466,7 @@ mod tests { result.unwrap(); assert_eq!(buf.as_slice(), b"Hello! world"); - let mut dst_file = dst_fs + let mut dst_file = src_fs .open_options( &Path::from_absolute_path(&dst_file_path)?, OpenOptions::default().read(true), @@ -485,7 +478,7 @@ mod tests { assert_eq!(buf.as_slice(), b"Hello! fusio"); } - dst_fs + src_fs .remove(&Path::from_absolute_path(&dst_file_path)?) .await?; // link @@ -502,7 +495,6 @@ mod tests { src_fs .link( &Path::from_absolute_path(&src_file_path)?, - &dst_fs, &Path::from_absolute_path(&dst_file_path)?, ) .await?; @@ -527,7 +519,7 @@ mod tests { result.unwrap(); assert_eq!(buf.as_slice(), b"Hello! world"); - let mut dst_file = dst_fs + let mut dst_file = src_fs .open_options( &Path::from_absolute_path(&dst_file_path)?, OpenOptions::default().read(true), @@ -560,7 +552,7 @@ mod tests { use crate::disk::TokioFs; test_local_fs_read_write(TokioFs).await.unwrap(); - test_local_fs_copy_link(TokioFs, TokioFs).await.unwrap(); + test_local_fs_copy_link(TokioFs).await.unwrap(); } #[cfg(all(feature = "tokio-uring", target_os = "linux"))] @@ -570,9 +562,7 @@ mod tests { tokio_uring::start(async { test_local_fs_read_write(TokioUringFs).await.unwrap(); - test_local_fs_copy_link(TokioUringFs, TokioUringFs) - .await - .unwrap(); + test_local_fs_copy_link(TokioUringFs).await.unwrap(); }) } @@ -582,7 +572,7 @@ mod tests { use crate::disk::monoio::fs::MonoIoFs; test_local_fs_read_write(MonoIoFs).await.unwrap(); - test_local_fs_copy_link(MonoIoFs, MonoIoFs).await.unwrap(); + test_local_fs_copy_link(MonoIoFs).await.unwrap(); } #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))] From ea56dbad06217f4cfb77b2c3f7e18493b78a7f10 Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Fri, 15 Nov 2024 03:23:37 +0000 Subject: [PATCH 10/14] fix: fix the semantics of append writing when TokioFs::open_options (write from scratch after reopening) --- fusio/Cargo.toml | 2 +- fusio/src/error.rs | 8 ++++++++ fusio/src/impls/disk/monoio/fs.rs | 10 ++++++++-- fusio/src/impls/disk/tokio/fs.rs | 2 +- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/fusio/Cargo.toml b/fusio/Cargo.toml index bdc47de..700b312 100644 --- a/fusio/Cargo.toml +++ b/fusio/Cargo.toml @@ -80,7 +80,7 @@ hyper = { version = "1", optional = true, default-features = false, features = [ "http2", ] } itertools = { version = "0.13" } -monoio = { version = "0.2", optional = true } +monoio = { version = "0.2", optional = true, features = ["sync"] } object_store = { version = "0.11", optional = true, features = ["aws"] } percent-encoding = { version = "2", default-features = false } quick-xml = { version = "0.36", features = [ diff --git a/fusio/src/error.rs b/fusio/src/error.rs index 550709a..973f4b2 100644 --- a/fusio/src/error.rs +++ b/fusio/src/error.rs @@ -22,6 +22,9 @@ pub enum Error { Wasm { message: String, }, + #[cfg(feature = "monoio")] + #[error("monoio JoinHandle was canceled")] + MonoIOJoinCancel, #[error(transparent)] Other(#[from] BoxedError), } @@ -34,3 +37,8 @@ pub(crate) fn wasm_err(js_val: js_sys::wasm_bindgen::JsValue) -> Error { message: format!("{js_val:?}"), } } + +#[cfg(feature = "monoio")] +pub(crate) fn monoio_join_err(_: monoio::blocking::JoinError) -> Error { + Error::MonoIOJoinCancel +} diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index 229010a..3bfacdd 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -2,9 +2,11 @@ use std::{fs, fs::create_dir_all}; use async_stream::stream; use futures_core::Stream; +use monoio::blocking::spawn_blocking; use super::MonoioFile; use crate::{ + error::monoio_join_err, fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, @@ -65,7 +67,9 @@ impl Fs for MonoIoFs { let from = path_to_local(from)?; let to = path_to_local(to)?; - fs::copy(&from, &to)?; + spawn_blocking(move || fs::copy(&from, &to)) + .await + .map_err(monoio_join_err)??; Ok(()) } @@ -74,7 +78,9 @@ impl Fs for MonoIoFs { let from = path_to_local(from)?; let to = path_to_local(to)?; - fs::hard_link(&from, &to)?; + spawn_blocking(move || fs::hard_link(&from, &to)) + .await + .map_err(monoio_join_err)??; Ok(()) } diff --git a/fusio/src/impls/disk/tokio/fs.rs b/fusio/src/impls/disk/tokio/fs.rs index e5f0b73..2f470d4 100644 --- a/fusio/src/impls/disk/tokio/fs.rs +++ b/fusio/src/impls/disk/tokio/fs.rs @@ -27,7 +27,7 @@ impl Fs for TokioFs { let file = tokio::fs::OpenOptions::new() .read(options.read) - .append(options.write) + .write(options.write) .create(options.create) .open(&local_path) .await?; From b9ee7921b14c9f05c66699209142e4270255a748 Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Fri, 15 Nov 2024 03:35:24 +0000 Subject: [PATCH 11/14] fix: MonoioFS execute blocking task without thread pool attached --- fusio-opendal/src/fs.rs | 4 +++- fusio/src/error.rs | 8 -------- fusio/src/impls/disk/monoio/fs.rs | 10 ++-------- 3 files changed, 5 insertions(+), 17 deletions(-) diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index 0e04361..f579036 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -72,6 +72,8 @@ impl Fs for OpendalFs { } async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { - todo!() + Err(Error::Unsupported { + message: "opendal does not support link file".to_string(), + }) } } diff --git a/fusio/src/error.rs b/fusio/src/error.rs index 973f4b2..550709a 100644 --- a/fusio/src/error.rs +++ b/fusio/src/error.rs @@ -22,9 +22,6 @@ pub enum Error { Wasm { message: String, }, - #[cfg(feature = "monoio")] - #[error("monoio JoinHandle was canceled")] - MonoIOJoinCancel, #[error(transparent)] Other(#[from] BoxedError), } @@ -37,8 +34,3 @@ pub(crate) fn wasm_err(js_val: js_sys::wasm_bindgen::JsValue) -> Error { message: format!("{js_val:?}"), } } - -#[cfg(feature = "monoio")] -pub(crate) fn monoio_join_err(_: monoio::blocking::JoinError) -> Error { - Error::MonoIOJoinCancel -} diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index 3bfacdd..58de060 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -2,11 +2,9 @@ use std::{fs, fs::create_dir_all}; use async_stream::stream; use futures_core::Stream; -use monoio::blocking::spawn_blocking; use super::MonoioFile; use crate::{ - error::monoio_join_err, fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::{path_to_local, Path}, Error, @@ -67,9 +65,7 @@ impl Fs for MonoIoFs { let from = path_to_local(from)?; let to = path_to_local(to)?; - spawn_blocking(move || fs::copy(&from, &to)) - .await - .map_err(monoio_join_err)??; + let _ = monoio::spawn(async move { fs::copy(&from, &to) }).await?; Ok(()) } @@ -78,9 +74,7 @@ impl Fs for MonoIoFs { let from = path_to_local(from)?; let to = path_to_local(to)?; - spawn_blocking(move || fs::hard_link(&from, &to)) - .await - .map_err(monoio_join_err)??; + let _ = monoio::spawn(async move { fs::hard_link(&from, &to) }).await?; Ok(()) } From 91818fe20c36270316035943f398c83de77f99b6 Mon Sep 17 00:00:00 2001 From: kkould <2435992353@qq.com> Date: Fri, 15 Nov 2024 03:59:27 +0000 Subject: [PATCH 12/14] fix: test_local_fs_copy_link ci fail --- fusio/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/fusio/src/lib.rs b/fusio/src/lib.rs index 902785f..a4fd45d 100644 --- a/fusio/src/lib.rs +++ b/fusio/src/lib.rs @@ -395,6 +395,7 @@ mod tests { Ok(()) } + #[cfg(not(target_arch = "wasm32"))] #[allow(unused)] async fn test_local_fs_copy_link(src_fs: F) -> Result<(), Error> { use std::collections::HashSet; From a86969d0564f010f4f1ab136deae4cbe638d8f37 Mon Sep 17 00:00:00 2001 From: Kould Date: Fri, 15 Nov 2024 19:43:19 +0800 Subject: [PATCH 13/14] chore: move copy to dynamic --- fusio/src/dynamic/fs.rs | 47 ++++++++++++++++++++++- fusio/src/fs/mod.rs | 63 +++++++++---------------------- fusio/src/impls/disk/monoio/fs.rs | 4 +- 3 files changed, 64 insertions(+), 50 deletions(-) diff --git a/fusio/src/dynamic/fs.rs b/fusio/src/dynamic/fs.rs index 939ff79..5d96bf2 100644 --- a/fusio/src/dynamic/fs.rs +++ b/fusio/src/dynamic/fs.rs @@ -1,11 +1,11 @@ -use std::pin::Pin; +use std::{cmp, pin::Pin, sync::Arc}; use futures_core::Stream; use super::MaybeSendFuture; use crate::{ buf::IoBufMut, - fs::{FileMeta, Fs, OpenOptions}, + fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Write, }; @@ -48,6 +48,8 @@ impl<'write> Write for Box { } pub trait DynFs: MaybeSend + MaybeSync { + fn file_system(&self) -> FileSystemTag; + fn open<'s, 'path: 's>( &'s self, path: &'path Path, @@ -99,6 +101,10 @@ pub trait DynFs: MaybeSend + MaybeSync { } impl DynFs for F { + fn file_system(&self) -> FileSystemTag { + Fs::file_system(self) + } + fn open_options<'s, 'path: 's>( &'s self, path: &'path Path, @@ -160,6 +166,43 @@ impl DynFs for F { } } +pub async fn copy( + from_fs: &Arc, + from: &Path, + to_fs: &Arc, + to: &Path, +) -> Result<(), Error> { + if from_fs.file_system() == to_fs.file_system() { + from_fs.copy(from, to).await?; + return Ok(()); + } + let mut from_file = from_fs + .open_options(from, OpenOptions::default().read(true)) + .await?; + let from_file_size = DynRead::size(&from_file).await? as usize; + + let mut to_file = to_fs + .open_options(to, OpenOptions::default().create(true).write(true)) + .await?; + let buf_size = cmp::min(from_file_size, 4 * 1024); + let mut buf = Some(vec![0u8; buf_size]); + let mut read_pos = 0u64; + + while (read_pos as usize) < from_file_size - 1 { + let tmp = buf.take().unwrap(); + let (result, tmp) = Read::read_exact_at(&mut from_file, tmp, read_pos).await; + result?; + read_pos += tmp.bytes_init() as u64; + + let (result, tmp) = Write::write_all(&mut to_file, tmp).await; + result?; + buf = Some(tmp); + } + DynWrite::close(&mut to_file).await?; + + Ok(()) +} + #[cfg(test)] mod tests { diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index 9792629..be9293c 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -3,7 +3,7 @@ mod options; -use std::{cmp, future::Future}; +use std::future::Future; use futures_core::Stream; pub use options::*; @@ -55,44 +55,9 @@ pub trait Fs: MaybeSend + MaybeSync { fn link(&self, from: &Path, to: &Path) -> impl Future> + MaybeSend; } -pub async fn copy(from_fs: &F, from: &Path, to_fs: &T, to: &Path) -> Result<(), Error> -where - F: Fs, - T: Fs, -{ - if from_fs.file_system() == to_fs.file_system() { - from_fs.copy(from, to).await?; - return Ok(()); - } - let mut from_file = from_fs - .open_options(from, OpenOptions::default().read(true)) - .await?; - let from_file_size = from_file.size().await? as usize; - - let mut to_file = to_fs - .open_options(to, OpenOptions::default().create(true).write(true)) - .await?; - let buf_size = cmp::min(from_file_size, 4 * 1024); - let mut buf = Some(vec![0u8; buf_size]); - let mut read_pos = 0u64; - - while (read_pos as usize) < from_file_size - 1 { - let tmp = buf.take().unwrap(); - let (result, tmp) = from_file.read_exact_at(tmp, read_pos).await; - result?; - read_pos += tmp.len() as u64; - - let (result, tmp) = to_file.write_all(tmp).await; - result?; - buf = Some(tmp); - } - to_file.close().await?; - - Ok(()) -} - #[cfg(test)] mod tests { + use crate::DynFs; #[ignore] #[cfg(all( @@ -108,13 +73,12 @@ mod tests { use tempfile::TempDir; use crate::{ - fs, fs::{Fs, OpenOptions}, impls::disk::tokio::fs::TokioFs, path::Path, remotes::{ aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File}, - http::{tokio::TokioClient, DynHttpClient, HttpClient}, + http::tokio::TokioClient, }, Read, Write, }; @@ -141,22 +105,29 @@ mod tests { checksum: false, }; - let s3_fs = AmazonS3::new(Box::new(client), options); - let local_fs = TokioFs; + let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options)); + let local_fs = Arc::new(TokioFs); { - let mut local_file = local_fs - .open_options(&local_path, OpenOptions::default().create(true).write(true)) - .await?; + let mut local_file = Fs::open_options( + local_fs.as_ref(), + &local_path, + OpenOptions::default().create(true).write(true), + ) + .await?; local_file .write_all("🎵never gonna give you up🎵".as_bytes()) .await .0?; local_file.close().await.unwrap(); } - fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?; + { + let s3_fs = s3_fs.clone() as Arc; + let local_fs = local_fs.clone() as Arc; + crate::dynamic::fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?; + } - let mut s3 = S3File::new(s3_fs, s3_path.clone()); + let mut s3 = S3File::new(Arc::into_inner(s3_fs).unwrap(), s3_path.clone()); let size = s3.size().await.unwrap(); assert_eq!(size, 31); diff --git a/fusio/src/impls/disk/monoio/fs.rs b/fusio/src/impls/disk/monoio/fs.rs index 58de060..9a7e1b1 100644 --- a/fusio/src/impls/disk/monoio/fs.rs +++ b/fusio/src/impls/disk/monoio/fs.rs @@ -65,7 +65,7 @@ impl Fs for MonoIoFs { let from = path_to_local(from)?; let to = path_to_local(to)?; - let _ = monoio::spawn(async move { fs::copy(&from, &to) }).await?; + monoio::spawn(async move { fs::copy(&from, &to) }).await?; Ok(()) } @@ -74,7 +74,7 @@ impl Fs for MonoIoFs { let from = path_to_local(from)?; let to = path_to_local(to)?; - let _ = monoio::spawn(async move { fs::hard_link(&from, &to) }).await?; + monoio::spawn(async move { fs::hard_link(&from, &to) }).await?; Ok(()) } From db7c1dcb9b606a1bed2cd031b416623b03048cee Mon Sep 17 00:00:00 2001 From: Gwo Tzu-Hsing Date: Tue, 19 Nov 2024 15:53:04 +0800 Subject: [PATCH 14/14] clippy --- fusio-object-store/src/fs.rs | 4 ++-- fusio-opendal/src/fs.rs | 6 ++---- fusio/src/fs/mod.rs | 4 +--- fusio/src/impls/remotes/aws/fs.rs | 5 ++++- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/fusio-object-store/src/fs.rs b/fusio-object-store/src/fs.rs index f550307..3834d24 100644 --- a/fusio-object-store/src/fs.rs +++ b/fusio-object-store/src/fs.rs @@ -1,10 +1,10 @@ -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use async_stream::stream; use fusio::{ fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, MaybeSend, + Error, }; use futures_core::Stream; use futures_util::stream::StreamExt; diff --git a/fusio-opendal/src/fs.rs b/fusio-opendal/src/fs.rs index f579036..7e7ef12 100644 --- a/fusio-opendal/src/fs.rs +++ b/fusio-opendal/src/fs.rs @@ -1,9 +1,7 @@ -use std::future::Future; - use fusio::{ fs::{FileMeta, FileSystemTag, Fs, OpenOptions}, path::Path, - Error, MaybeSend, + Error, }; use futures_core::Stream; use futures_util::TryStreamExt; @@ -71,7 +69,7 @@ impl Fs for OpendalFs { .map_err(parse_opendal_error) } - async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> { + async fn link(&self, _from: &Path, _to: &Path) -> Result<(), Error> { Err(Error::Unsupported { message: "opendal does not support link file".to_string(), }) diff --git a/fusio/src/fs/mod.rs b/fusio/src/fs/mod.rs index be9293c..d0589d8 100644 --- a/fusio/src/fs/mod.rs +++ b/fusio/src/fs/mod.rs @@ -57,8 +57,6 @@ pub trait Fs: MaybeSend + MaybeSync { #[cfg(test)] mod tests { - use crate::DynFs; - #[ignore] #[cfg(all( feature = "tokio-http", @@ -80,7 +78,7 @@ mod tests { aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File}, http::tokio::TokioClient, }, - Read, Write, + DynFs, Read, Write, }; let tmp_dir = TempDir::new()?; diff --git a/fusio/src/impls/remotes/aws/fs.rs b/fusio/src/impls/remotes/aws/fs.rs index ff16080..fad6aaa 100644 --- a/fusio/src/impls/remotes/aws/fs.rs +++ b/fusio/src/impls/remotes/aws/fs.rs @@ -127,8 +127,10 @@ pub(super) struct AmazonS3Inner { } impl AmazonS3 { - pub fn new(client: Box, options: S3Options) -> Self { + #[allow(dead_code)] + pub(crate) fn new(client: Box, options: S3Options) -> Self { AmazonS3 { + #[allow(clippy::arc_with_non_send_sync)] inner: Arc::new(AmazonS3Inner { options, client }), } } @@ -300,6 +302,7 @@ pub struct ListResponse { #[cfg(test)] mod tests { + #[cfg(feature = "tokio-http")] use crate::{fs::Fs, path::Path}; #[cfg(feature = "tokio-http")]