Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rename and fetch_data progress report #12

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 38 additions & 96 deletions examples/sftp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use wincs::{
filter::{info, ticket, SyncFilter},
placeholder_file::{Metadata, PlaceholderFile},
request::Request,
CloudErrorKind, PopulationType, Registration, SecurityId, SyncRootIdBuilder,
CloudErrorKind, HydrationType, PopulationType, Registration, SecurityId, SyncRootIdBuilder,
};

// max should be 65536, this is done both in term-scp and sshfs because it's the
// max packet size for a tcp connection
const DOWNLOAD_CHUNK_SIZE_BYTES: usize = 4096;
// doesn't have to be 4KiB aligned
const UPLOAD_CHUNK_SIZE_BYTES: usize = 4096;
// const UPLOAD_CHUNK_SIZE_BYTES: usize = 4096;

const PROVIDER_NAME: &str = "wincs";
const DISPLAY_NAME: &str = "Sftp";
Expand Down Expand Up @@ -59,7 +59,7 @@ fn main() {
let u16_display_name = U16String::from_str(DISPLAY_NAME);
Registration::from_sync_root_id(&sync_root_id)
.display_name(&u16_display_name)
.hydration_type(wincs::HydrationType::Full)
.hydration_type(HydrationType::Full)
.population_type(PopulationType::Full)
.icon(
U16String::from_str("%SystemRoot%\\system32\\charmap.exe"),
Expand Down Expand Up @@ -119,48 +119,6 @@ pub struct Filter {
}

impl Filter {
pub fn create_file(&self, src: &Path, dest: &Path) -> Result<(), SftpError> {
let mut client_file = File::open(src)?;
// TODO: This will overwrite the file if it exists on the server
let mut server_file = self.sftp.create(dest)?;

let mut buffer = [0; UPLOAD_CHUNK_SIZE_BYTES];
let mut bytes_written = 0;

// TODO: I could do the little offset trick and moving the old bytes to the
// beginning of the buffer, I just don't know if it's worth it
loop {
client_file.seek(SeekFrom::Start(bytes_written))?;
match client_file.read(&mut buffer) {
Ok(0) => break,
Ok(bytes_read) => {
bytes_written += server_file.write(&buffer[0..bytes_read])? as u64;
}
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
Err(err) => return Err(SftpError::Io(err)),
}
}

Ok(())
}

// TODO: src is full, dest is relative
pub fn create_dir_all(&self, src: &Path, dest: &Path) -> Result<(), SftpError> {
// TODO: what does the "o" mean in 0o775
self.sftp.mkdir(dest, 0o775)?;

for entry in fs::read_dir(src)? {
let src = entry?.path();
let dest = dest.join(src.file_name().unwrap());
match src.is_dir() {
true => self.create_dir_all(&src, &dest)?,
false => self.create_file(&src, &dest)?,
}
}

Ok(())
}

pub fn remove_dir_all(&self, dest: &Path) -> Result<(), ssh2::Error> {
for entry in self.sftp.readdir(dest)? {
match entry.0.is_dir() {
Expand All @@ -173,28 +131,27 @@ impl Filter {
}
}

// TODO: handle unwraps
// TODO: everything is just forwarded to external functions... This should be
// changed in the wrapper api
impl SyncFilter for Filter {
// TODO: handle unwraps
fn fetch_data(&self, request: Request, ticket: ticket::FetchData, info: info::FetchData) {
println!("fetch_data {:?}", request.file_blob());
// TODO: handle unwrap
let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) });

let range = info.required_file_range();
let end = range.end;
let mut position = range.start;

// TODO: allow callback to return Result in SyncFilter
println!(
"fetch_data {:?} {:?} {}",
path,
range,
info.interrupted_hydration()
);

let res = || -> Result<(), _> {
let mut server_file = self
.sftp
.open(path)
.map_err(|_| CloudErrorKind::InvalidRequest)?;
let mut client_file = BufWriter::with_capacity(4096, request.placeholder());

server_file
.seek(SeekFrom::Start(position))
.map_err(|_| CloudErrorKind::InvalidRequest)?;
Expand All @@ -208,12 +165,10 @@ impl SyncFilter for Filter {
// into segments done on separate threads
// transfer the data in chunks
loop {
client_file.get_ref().set_progress(end, position).unwrap();

// TODO: read directly to the BufWriters buffer
// TODO: ignore if the error was just interrupted
let bytes_read = server_file
.read(&mut buffer[0..DOWNLOAD_CHUNK_SIZE_BYTES])
.read(&mut buffer)
.map_err(|_| CloudErrorKind::InvalidRequest)?;
let bytes_written = client_file
.write(&buffer[0..bytes_read])
Expand All @@ -223,6 +178,8 @@ impl SyncFilter for Filter {
if position >= end {
break;
}

client_file.get_ref().set_progress(end, position).unwrap();
}

client_file
Expand All @@ -241,7 +198,6 @@ impl SyncFilter for Filter {
println!("deleted");
}

// TODO: I probably also have to delete the file from the disk
fn delete(&self, request: Request, ticket: ticket::Delete, info: info::Delete) {
println!("delete {:?}", request.path());
let path = Path::new(unsafe { OsStr::from_encoded_bytes_unchecked(request.file_blob()) });
Expand All @@ -264,44 +220,32 @@ impl SyncFilter for Filter {
}
}

// TODO: Do I have to move the file and set the file progress? or does the OS
// handle that? (I think I do)
fn rename(&self, request: Request, ticket: ticket::Rename, info: info::Rename) {
let res = || -> Result<(), _> {
match info.target_in_scope() {
true => {
// TODO: path should auto include the drive letter
let src = request.path();
// TODO: should be relative
let dest = info.target_path();

match info.source_in_scope() {
// TODO: use fs::copy or fs::rename, whatever it is to move the local files,
// then use ConvertToPlaceholder. I'm not sure if I have to do this recursively
// for each file or only the top-level folder TODO: which
// rename flags do I use? how do I know if I should be overwriting?
true => self
.sftp
.rename(&src, &dest, None)
.map_err(|_| CloudErrorKind::InvalidRequest)?,
false => match info.is_directory() {
true => self
.create_dir_all(&src, &dest)
.map_err(|_| CloudErrorKind::InvalidRequest)?,
false => self
.create_file(&src, &dest)
.map_err(|_| CloudErrorKind::InvalidRequest)?,
},
}
let src = request.path();
let dest = info.target_path();
let base = get_client_path();

match (info.source_in_scope(), info.target_in_scope()) {
ok-nick marked this conversation as resolved.
Show resolved Hide resolved
(true, true) => {
self.sftp
.rename(
&src.strip_prefix(&base).unwrap(),
&dest.strip_prefix(&base).unwrap(),
None,
)
.map_err(|_| CloudErrorKind::InvalidRequest)?;
}
// TODO: do I need to delete it locally?
false => self
.sftp
.unlink(Path::new(unsafe {
OsStr::from_encoded_bytes_unchecked(request.file_blob())
}))
.map_err(|_| CloudErrorKind::InvalidRequest)?,
(true, false) => {
fs::copy(&src, dest).map_err(|_| CloudErrorKind::InvalidRequest)?;
ok-nick marked this conversation as resolved.
Show resolved Hide resolved
self.sftp
.unlink(&src.strip_prefix(&base).unwrap())
.map_err(|_| CloudErrorKind::InvalidRequest)?;
}
(false, true) => Err(CloudErrorKind::NotSupported)?, // TODO
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's wrong with the old impl w/ self.create_dir_all and self.create_file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's simpler way.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, a lot of the impl can be greatly simplified and probably replaced with a single call to fs::copy, but this functionality shouldn't be removed from the example.

(false, false) => Err(CloudErrorKind::InvalidRequest)?,
}

ticket.pass().unwrap();
Ok(())
}();
Expand All @@ -327,7 +271,7 @@ impl SyncFilter for Filter {
let parent = absolute.strip_prefix(&client_path).unwrap();

let dirs = self.sftp.readdir(parent).unwrap();
let placeholders = dirs
let mut placeholders = dirs
.into_iter()
.filter(|(path, _)| !Path::new(&client_path).join(path).exists())
.map(|(path, stat)| {
Expand All @@ -348,13 +292,13 @@ impl SyncFilter for Filter {
.last_write_time(stat.mtime.unwrap_or_default())
.change_time(stat.mtime.unwrap_or_default()),
)
.mark_sync()
.overwrite()
// .mark_sync() // need this?
.blob(path.into_os_string().into_encoded_bytes())
})
.collect::<Vec<_>>();

ticket.pass_with_placeholder(placeholders).unwrap();
ticket.pass_with_placeholder(&mut placeholders).unwrap();
}

fn closed(&self, request: Request, info: info::Closed) {
Expand Down Expand Up @@ -401,8 +345,6 @@ impl SyncFilter for Filter {
fn renamed(&self, _request: Request, _info: info::Renamed) {
println!("renamed");
}

// TODO: acknowledgement callbacks
}

#[derive(Error, Debug)]
Expand Down
14 changes: 6 additions & 8 deletions src/filter/info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, ops::Range, path::PathBuf};
use std::{ffi::OsString, fmt::Debug, ops::Range, path::PathBuf};

use widestring::U16CStr;
use windows::Win32::Storage::CloudFilters::{
Expand Down Expand Up @@ -266,10 +266,10 @@ pub struct Deleted(pub(crate) CF_CALLBACK_PARAMETERS_0_4);

/// Information for the [SyncFilter::rename][crate::SyncFilter::rename] callback.
#[derive(Debug)]
pub struct Rename(pub(crate) CF_CALLBACK_PARAMETERS_0_10);
pub struct Rename(pub(crate) CF_CALLBACK_PARAMETERS_0_10, pub(crate) OsString);

impl Rename {
/// Whether or not the placeholder being deleted is a directory.
/// Whether or not the placeholder being renamed is a directory.
pub fn is_directory(&self) -> bool {
(self.0.Flags & CloudFilters::CF_CALLBACK_RENAME_FLAG_IS_DIRECTORY).0 != 0
}
Expand All @@ -286,11 +286,9 @@ impl Rename {

/// The full path the placeholder is being moved to.
pub fn target_path(&self) -> PathBuf {
unsafe {
U16CStr::from_ptr_str(self.0.TargetPath.0)
.to_os_string()
.into()
}
let mut path = PathBuf::from(&self.1);
path.push(unsafe { U16CStr::from_ptr_str(self.0.TargetPath.0) }.to_os_string());
path
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/filter/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,12 @@ pub unsafe extern "system" fn notify_rename<T: SyncFilter + 'static>(
if let Some(filter) = filter_from_info::<T>(info) {
let request = Request::new(*info);
let ticket = ticket::Rename::new(request.connection_key(), request.transfer_key());
let info = info::Rename(
(*params).Anonymous.Rename,
request.volume_letter().to_os_string(),
);
ok-nick marked this conversation as resolved.
Show resolved Hide resolved

filter.rename(request, ticket, info::Rename((*params).Anonymous.Rename));
filter.rename(request, ticket, info);
}
}

Expand Down