Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fs adds copy & link #100

Merged
merged 14 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion fusio-object-store/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use async_stream::stream;
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
};
Expand All @@ -27,6 +27,10 @@ impl<O: ObjectStore> From<O> for S3Store<O> {
impl<O: ObjectStore> Fs for S3Store<O> {
type File = S3File<O>;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::S3
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
if !options.truncate {
return Err(Error::Unsupported {
Expand Down Expand Up @@ -64,4 +68,22 @@ impl<O: ObjectStore> Fs for S3Store<O> {

Ok(())
}

async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = from.clone().into();
let to = to.clone().into();

self.inner
.copy(&from, &to)
.await
.map_err(BoxedError::from)?;

Ok(())
}

async fn link(&self, _: &Path, _: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "s3 does not support link file".to_string(),
})
}
}
19 changes: 18 additions & 1 deletion fusio-opendal/src/fs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use fusio::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
Error,
};
Expand All @@ -25,6 +25,10 @@ impl From<Operator> for OpendalFs {
impl Fs for OpendalFs {
type File = OpendalFile;

fn file_system(&self) -> FileSystemTag {
todo!()
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
OpendalFile::open(self.op.clone(), path.to_string(), options).await
}
Expand Down Expand Up @@ -57,4 +61,17 @@ impl Fs for OpendalFs {
.await
.map_err(parse_opendal_error)
}

async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
self.op
.copy(from.as_ref(), to.as_ref())
.await
.map_err(parse_opendal_error)
}

async fn link(&self, _from: &Path, _to: &Path) -> Result<(), Error> {
Err(Error::Unsupported {
message: "opendal does not support link file".to_string(),
})
}
}
2 changes: 1 addition & 1 deletion fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ hyper = { version = "1", optional = true, default-features = false, features = [
"http2",
] }
itertools = { version = "0.13" }
monoio = { version = "0.2", optional = true }
monoio = { version = "0.2", optional = true, features = ["sync"] }
object_store = { version = "0.11", optional = true, features = ["aws"] }
percent-encoding = { version = "2", default-features = false }
quick-xml = { version = "0.36", features = [
Expand Down
75 changes: 73 additions & 2 deletions fusio/src/dynamic/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::pin::Pin;
use std::{cmp, pin::Pin, sync::Arc};

use futures_core::Stream;

use super::MaybeSendFuture;
use crate::{
buf::IoBufMut,
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::Path,
DynRead, DynWrite, Error, IoBuf, MaybeSend, MaybeSync, Read, Write,
};
Expand Down Expand Up @@ -48,6 +48,8 @@ impl<'write> Write for Box<dyn DynFile + 'write> {
}

pub trait DynFs: MaybeSend + MaybeSync {
fn file_system(&self) -> FileSystemTag;

fn open<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -84,9 +86,25 @@ pub trait DynFs: MaybeSend + MaybeSync {
&'s self,
path: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

fn copy<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;

fn link<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>>;
}

impl<F: Fs> DynFs for F {
fn file_system(&self) -> FileSystemTag {
Fs::file_system(self)
}

fn open_options<'s, 'path: 's>(
&'s self,
path: &'path Path,
Expand Down Expand Up @@ -130,6 +148,59 @@ impl<F: Fs> DynFs for F {
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::remove(self, path))
}

fn copy<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::copy(self, from, to))
}

fn link<'s, 'path: 's>(
&'s self,
from: &'path Path,
to: &'path Path,
) -> Pin<Box<dyn MaybeSendFuture<Output = Result<(), Error>> + 's>> {
Box::pin(F::link(self, from, to))
}
}

pub async fn copy(
from_fs: &Arc<dyn DynFs>,
from: &Path,
to_fs: &Arc<dyn DynFs>,
to: &Path,
) -> Result<(), Error> {
if from_fs.file_system() == to_fs.file_system() {
from_fs.copy(from, to).await?;
return Ok(());
}
let mut from_file = from_fs
.open_options(from, OpenOptions::default().read(true))
.await?;
let from_file_size = DynRead::size(&from_file).await? as usize;

let mut to_file = to_fs
.open_options(to, OpenOptions::default().create(true).write(true))
.await?;
let buf_size = cmp::min(from_file_size, 4 * 1024);
let mut buf = Some(vec![0u8; buf_size]);
let mut read_pos = 0u64;

while (read_pos as usize) < from_file_size - 1 {
let tmp = buf.take().unwrap();
let (result, tmp) = Read::read_exact_at(&mut from_file, tmp, read_pos).await;
result?;
read_pos += tmp.bytes_init() as u64;

let (result, tmp) = Write::write_all(&mut to_file, tmp).await;
result?;
buf = Some(tmp);
}
DynWrite::close(&mut to_file).await?;

Ok(())
}

#[cfg(test)]
Expand Down
97 changes: 97 additions & 0 deletions fusio/src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@ pub struct FileMeta {
pub size: u64,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum FileSystemTag {
Local,
OPFS,
// TODO: Remote needs to check whether endpoint and other remote fs are consistent
S3,
}

pub trait Fs: MaybeSend + MaybeSync {
//! This trait is used to abstract file system operations across different file systems.

type File: Read + Write + MaybeSend + 'static;

fn file_system(&self) -> FileSystemTag;

fn open(&self, path: &Path) -> impl Future<Output = Result<Self::File, Error>> {
self.open_options(path, OpenOptions::default())
}
Expand All @@ -39,4 +49,91 @@ pub trait Fs: MaybeSend + MaybeSync {
) -> impl Future<Output = Result<impl Stream<Item = Result<FileMeta, Error>>, Error>> + MaybeSend;

fn remove(&self, path: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;

fn link(&self, from: &Path, to: &Path) -> impl Future<Output = Result<(), Error>> + MaybeSend;
}

#[cfg(test)]
mod tests {
#[ignore]
#[cfg(all(
feature = "tokio-http",
feature = "tokio",
feature = "aws",
not(feature = "completion-based")
))]
#[tokio::test]
async fn test_diff_fs_copy() -> Result<(), crate::Error> {
use std::sync::Arc;

use tempfile::TempDir;

use crate::{
fs::{Fs, OpenOptions},
impls::disk::tokio::fs::TokioFs,
path::Path,
remotes::{
aws::{credential::AwsCredential, fs::AmazonS3, options::S3Options, s3::S3File},
http::tokio::TokioClient,
},
DynFs, Read, Write,
};

let tmp_dir = TempDir::new()?;
let local_path = Path::from_absolute_path(&tmp_dir.as_ref().join("test.file"))?;
let s3_path: Path = "s3_copy_test.file".into();

let key_id = "user".to_string();
let secret_key = "password".to_string();

let client = TokioClient::new();
let region = "ap-southeast-1";
let options = S3Options {
endpoint: "http://localhost:9000/data".into(),
bucket: "data".to_string(),
credential: Some(AwsCredential {
key_id,
secret_key,
token: None,
}),
region: region.into(),
sign_payload: true,
checksum: false,
};

let s3_fs = Arc::new(AmazonS3::new(Box::new(client), options));
let local_fs = Arc::new(TokioFs);

{
let mut local_file = Fs::open_options(
local_fs.as_ref(),
&local_path,
OpenOptions::default().create(true).write(true),
)
.await?;
local_file
.write_all("🎵never gonna give you up🎵".as_bytes())
KKould marked this conversation as resolved.
Show resolved Hide resolved
.await
.0?;
local_file.close().await.unwrap();
}
{
let s3_fs = s3_fs.clone() as Arc<dyn DynFs>;
let local_fs = local_fs.clone() as Arc<dyn DynFs>;
crate::dynamic::fs::copy(&local_fs, &local_path, &s3_fs, &s3_path).await?;
}

let mut s3 = S3File::new(Arc::into_inner(s3_fs).unwrap(), s3_path.clone());

let size = s3.size().await.unwrap();
assert_eq!(size, 31);
let buf = Vec::new();
let (result, buf) = s3.read_to_end_at(buf, 0).await;
result.unwrap();
assert_eq!(buf, "🎵never gonna give you up🎵".as_bytes());

Ok(())
}
}
28 changes: 25 additions & 3 deletions fusio/src/impls/disk/monoio/fs.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fs::create_dir_all;
use std::{fs, fs::create_dir_all};

use async_stream::stream;
use futures_core::Stream;

use super::MonoioFile;
use crate::{
fs::{FileMeta, Fs, OpenOptions},
fs::{FileMeta, FileSystemTag, Fs, OpenOptions},
path::{path_to_local, Path},
Error,
};
Expand All @@ -15,6 +15,10 @@ pub struct MonoIoFs;
impl Fs for MonoIoFs {
type File = MonoioFile;

fn file_system(&self) -> FileSystemTag {
FileSystemTag::Local
}

async fn open_options(&self, path: &Path, options: OpenOptions) -> Result<Self::File, Error> {
let local_path = path_to_local(path)?;

Expand Down Expand Up @@ -54,6 +58,24 @@ impl Fs for MonoIoFs {
async fn remove(&self, path: &Path) -> Result<(), Error> {
let path = path_to_local(path)?;

Ok(std::fs::remove_file(path)?)
Ok(fs::remove_file(path)?)
}

async fn copy(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

monoio::spawn(async move { fs::copy(&from, &to) }).await?;

Ok(())
}

async fn link(&self, from: &Path, to: &Path) -> Result<(), Error> {
let from = path_to_local(from)?;
let to = path_to_local(to)?;

monoio::spawn(async move { fs::hard_link(&from, &to) }).await?;

Ok(())
}
}
Loading
Loading