diff --git a/Cargo.lock b/Cargo.lock index 97cadbef..fa08c8b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,7 +938,7 @@ dependencies = [ [[package]] name = "dragonfly-client" -version = "0.2.3" +version = "0.2.4" dependencies = [ "anyhow", "blake3", @@ -1011,7 +1011,7 @@ dependencies = [ [[package]] name = "dragonfly-client-backend" -version = "0.2.3" +version = "0.2.4" dependencies = [ "dragonfly-api", "dragonfly-client-core", @@ -1042,7 +1042,7 @@ dependencies = [ [[package]] name = "dragonfly-client-config" -version = "0.2.3" +version = "0.2.4" dependencies = [ "bytesize", "bytesize-serde", @@ -1068,7 +1068,7 @@ dependencies = [ [[package]] name = "dragonfly-client-core" -version = "0.2.3" +version = "0.2.4" dependencies = [ "headers 0.4.0", "hyper 1.5.2", @@ -1087,7 +1087,7 @@ dependencies = [ [[package]] name = "dragonfly-client-init" -version = "0.2.3" +version = "0.2.4" dependencies = [ "anyhow", "clap", @@ -1105,7 +1105,7 @@ dependencies = [ [[package]] name = "dragonfly-client-storage" -version = "0.2.3" +version = "0.2.4" dependencies = [ "base16ct", "bincode", @@ -1130,7 +1130,7 @@ dependencies = [ [[package]] name = "dragonfly-client-util" -version = "0.2.3" +version = "0.2.4" dependencies = [ "base16ct", "base64 0.22.1", @@ -1520,7 +1520,7 @@ dependencies = [ [[package]] name = "hdfs" -version = "0.2.3" +version = "0.2.4" dependencies = [ "dragonfly-client-backend", "dragonfly-client-core", diff --git a/Cargo.toml b/Cargo.toml index b96c197c..bec44c98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ members = [ ] [workspace.package] -version = "0.2.3" +version = "0.2.4" authors = ["The Dragonfly Developers"] homepage = "https://d7y.io/" repository = "https://github.com/dragonflyoss/client.git" @@ -22,13 +22,13 @@ readme = "README.md" edition = "2021" [workspace.dependencies] -dragonfly-client = { path = "dragonfly-client", version = "0.2.3" } -dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.3" } -dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.3" } -dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.3" } -dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.3" } -dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.3" } -dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.3" } +dragonfly-client = { path = "dragonfly-client", version = "0.2.4" } +dragonfly-client-core = { path = "dragonfly-client-core", version = "0.2.4" } +dragonfly-client-config = { path = "dragonfly-client-config", version = "0.2.4" } +dragonfly-client-storage = { path = "dragonfly-client-storage", version = "0.2.4" } +dragonfly-client-backend = { path = "dragonfly-client-backend", version = "0.2.4" } +dragonfly-client-util = { path = "dragonfly-client-util", version = "0.2.4" } +dragonfly-client-init = { path = "dragonfly-client-init", version = "0.2.4" } thiserror = "1.0" dragonfly-api = "=2.1.3" reqwest = { version = "0.12.4", features = [ diff --git a/dragonfly-client-storage/src/content.rs b/dragonfly-client-storage/src/content.rs index 6c948c05..6767fe8d 100644 --- a/dragonfly-client-storage/src/content.rs +++ b/dragonfly-client-storage/src/content.rs @@ -25,6 +25,7 @@ use tokio::fs::{self, File, OpenOptions}; use tokio::io::{ self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter, SeekFrom, }; +use tokio_util::io::InspectReader; use tracing::{error, info, instrument, warn}; /// DEFAULT_CONTENT_DIR is the default directory for store content. @@ -337,24 +338,22 @@ impl Content { })?; // Copy the piece to the file while updating the CRC32 value. + let reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader); let crc = Crc::>::new(&CRC_32_ISCSI); let mut digest = crc.digest(); - let mut length = 0; - let mut buffer = vec![0; self.config.storage.write_buffer_size]; - let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); - let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, reader); - loop { - let n = reader.read(&mut buffer).await?; - if n == 0 { - break; - } + let mut tee = InspectReader::new(reader, |bytes| { + digest.update(bytes); + }); - digest.update(&buffer[..n]); - writer.write_all(&buffer[..n]).await?; - length += n as u64; - } - writer.flush().await?; + let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, f); + let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| { + error!("copy {:?} failed: {}", task_path, err); + })?; + + writer.flush().await.inspect_err(|err| { + error!("flush {:?} failed: {}", task_path, err); + })?; // Calculate the hash of the piece. Ok(WritePieceResponse { @@ -458,24 +457,22 @@ impl Content { })?; // Copy the content to the file while updating the CRC32 value. + let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f); let crc = Crc::>::new(&CRC_32_ISCSI); let mut digest = crc.digest(); - let mut length = 0; - let mut buffer = vec![0; self.config.storage.write_buffer_size]; - let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f); - let mut reader = BufReader::with_capacity(self.config.storage.write_buffer_size, from_f); - loop { - let n = reader.read(&mut buffer).await?; - if n == 0 { - break; - } + let mut tee = InspectReader::new(&mut reader, |bytes| { + digest.update(bytes); + }); - digest.update(&buffer[..n]); - writer.write_all(&buffer[..n]).await?; - length += n as u64; - } - writer.flush().await?; + let mut writer = BufWriter::with_capacity(self.config.storage.write_buffer_size, to_f); + let length = io::copy(&mut tee, &mut writer).await.inspect_err(|err| { + error!("copy {:?} failed: {}", task_path, err); + })?; + + writer.flush().await.inspect_err(|err| { + error!("flush {:?} failed: {}", task_path, err); + })?; Ok(WritePersistentCacheTaskResponse { length,