diff --git a/Cargo.lock b/Cargo.lock index 1f149ba57..cbf860f3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -782,7 +782,7 @@ dependencies = [ "arroyo-types", "async-trait", "bytes", - "object_store", + "object_store 0.7.1", "regex", "rusoto_core", "thiserror", @@ -831,7 +831,7 @@ dependencies = [ "local-ip-address", "md-5 0.10.5", "memchr", - "object_store", + "object_store 0.7.1", "once_cell", "ordered-float 3.9.1", "parquet", @@ -1764,9 +1764,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", @@ -2456,7 +2456,7 @@ dependencies = [ "lazy_static", "log", "num_cpus", - "object_store", + "object_store 0.6.1", "parking_lot 0.12.1", "parquet", "percent-encoding", @@ -2483,7 +2483,7 @@ dependencies = [ "arrow-array", "chrono", "num_cpus", - "object_store", + "object_store 0.6.1", "parquet", "sqlparser", ] @@ -2499,7 +2499,7 @@ dependencies = [ "datafusion-expr", "hashbrown 0.14.0", "log", - "object_store", + "object_store 0.6.1", "parking_lot 0.12.1", "rand", "tempfile", @@ -5086,7 +5086,28 @@ dependencies = [ [[package]] name = "object_store" version = "0.6.1" -source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=43.0.0/arroyo_patches#1ee79ea0973a28bb7e8112024576e24a90eabe92" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c776db4f332b571958444982ff641d2531417a326ca368995073b639205d58" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools 0.10.5", + "parking_lot 0.12.1", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + +[[package]] +name = "object_store" +version = "0.7.1" +source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=object_store/put_part_api#6062e6b1286297922bbf658eecef293dda19a643" dependencies = [ "async-trait", "base64 0.21.4", @@ -5095,13 +5116,13 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools 0.10.5", + "itertools 0.11.0", "parking_lot 0.12.1", "percent-encoding", "quick-xml", "rand", "reqwest", - "ring 0.16.20", + "ring 0.17.3", "rustls-pemfile", "serde", "serde_json", @@ -5342,7 +5363,7 @@ dependencies = [ "lz4", "num", "num-bigint", - "object_store", + "object_store 0.6.1", "paste", "seq-macro", "snap", @@ -5899,9 +5920,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.28.2" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" dependencies = [ "memchr", "serde", diff --git a/Cargo.toml b/Cargo.toml index 344a2963f..86c5c2bee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ arrow = { version = "43.0.0" } arrow-buffer = { version = "43.0.0" } arrow-array = { version = "43.0.0" } arrow-schema = { version = "43.0.0" } -object_store = { version = "0.6.1" } +object_store = { version = "0.7.1" } parquet = { version = "43.0.0" } [profile.release] @@ -56,4 +56,4 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arr arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} -object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} \ No newline at end of file +object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'} \ No newline at end of file diff --git a/arroyo-controller/src/compiler.rs b/arroyo-controller/src/compiler.rs index 89242167f..143a18da6 100644 --- a/arroyo-controller/src/compiler.rs +++ b/arroyo-controller/src/compiler.rs @@ -134,7 +134,7 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arr arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} -object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'multipart_gcp' } +object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api' } "#; // NOTE: These must be kept in sync with the Cargo configs in docker/build_base and build_dir/Cargo.toml diff --git a/arroyo-storage/Cargo.toml b/arroyo-storage/Cargo.toml index 03bdb2b48..4752eedcf 100644 --- a/arroyo-storage/Cargo.toml +++ b/arroyo-storage/Cargo.toml @@ -14,7 +14,7 @@ bytes = "1.4.0" # better way to do this rusoto_core = "0.48.0" -object_store = {version = "0.6.1", features = ["aws", "gcp"]} +object_store = {workspace = true, features = ["aws", "gcp"]} regex = "1.9.5" thiserror = "1" tokio = { version = "1", features = ["fs"] } diff --git a/arroyo-storage/src/lib.rs b/arroyo-storage/src/lib.rs index c9d047eda..b8a8ddeb6 100644 --- a/arroyo-storage/src/lib.rs +++ b/arroyo-storage/src/lib.rs @@ -10,9 +10,10 @@ use aws::ArroyoCredentialProvider; use bytes::Bytes; use object_store::aws::AmazonS3ConfigKey; use object_store::gcp::GoogleCloudStorageBuilder; +use object_store::multipart::PartId; use object_store::path::Path; +use object_store::MultipartId; use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore}; -use object_store::{MultipartId, UploadPart}; use regex::{Captures, Regex}; use thiserror::Error; @@ -440,9 +441,10 @@ impl StorageProvider { pub async fn start_multipart(&self, path: &Path) -> Result { Ok(self .object_store - .start_multipart(path) + .initiate_multipart_upload(path) .await - .map_err(|e| Into::::into(e))?) + .map_err(|e| Into::::into(e))? + .0) } pub async fn add_multipart( @@ -451,10 +453,12 @@ impl StorageProvider { multipart_id: &MultipartId, part_number: usize, bytes: Bytes, - ) -> Result { + ) -> Result { Ok(self .object_store - .add_multipart(path, multipart_id, part_number, bytes) + .get_put_part(path, multipart_id) + .await? + .put_part(bytes, part_number) .await .map_err(|e| Into::::into(e))?) } @@ -463,11 +467,13 @@ impl StorageProvider { &self, path: &Path, multipart_id: &MultipartId, - parts: Vec, + parts: Vec, ) -> Result<(), StorageError> { Ok(self .object_store - .close_multipart(path, multipart_id, parts) + .get_put_part(path, multipart_id) + .await? + .complete(parts) .await .map_err(|e| Into::::into(e))?) } diff --git a/arroyo-worker/src/connectors/filesystem/mod.rs b/arroyo-worker/src/connectors/filesystem/mod.rs index 611602a62..b1f315a15 100644 --- a/arroyo-worker/src/connectors/filesystem/mod.rs +++ b/arroyo-worker/src/connectors/filesystem/mod.rs @@ -15,7 +15,7 @@ use bincode::{Decode, Encode}; use chrono::{DateTime, Utc}; use futures::{stream::FuturesUnordered, Future}; use futures::{stream::StreamExt, TryStreamExt}; -use object_store::{path::Path, MultipartId, UploadPart}; +use object_store::{multipart::PartId, path::Path, MultipartId}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{Receiver, Sender}; use tracing::warn; @@ -362,7 +362,7 @@ pub trait MultiPartWriter { fn handle_completed_part( &mut self, part_idx: usize, - upload_part: UploadPart, + upload_part: PartId, ) -> Result>; fn get_in_progress_checkpoint(&mut self) -> FileCheckpointData; @@ -421,7 +421,7 @@ async fn from_checkpoint( InFlightPartCheckpoint::FinishedPart { part: _, content_id, - } => parts.push(UploadPart { content_id }), + } => parts.push(PartId { content_id }), InFlightPartCheckpoint::InProgressPart { part, data } => { let upload_part = object_store .add_multipart(path, &multi_part_upload_id, part, data.into()) @@ -453,7 +453,7 @@ async fn from_checkpoint( InFlightPartCheckpoint::FinishedPart { part: _, content_id, - } => parts.push(UploadPart { content_id }), + } => parts.push(PartId { content_id }), InFlightPartCheckpoint::InProgressPart { part: _, data } => { let upload_part = object_store .add_multipart(path, &multi_part_upload_id, part_index, data.into()) @@ -470,7 +470,7 @@ async fn from_checkpoint( completed_parts, } => { for content_id in completed_parts { - parts.push(UploadPart { content_id }) + parts.push(PartId { content_id }) } multi_part_upload_id } @@ -770,7 +770,7 @@ where } let parts: Vec<_> = completed_parts .into_iter() - .map(|content_id| UploadPart { + .map(|content_id| PartId { content_id: content_id.clone(), }) .collect(); @@ -846,7 +846,7 @@ struct MultipartManager { location: Path, partition: Option, multipart_id: Option, - pushed_parts: Vec, + pushed_parts: Vec, uploaded_parts: usize, pushed_size: usize, parts_to_add: Vec, @@ -902,11 +902,10 @@ impl MultipartManager { &mut self, part_to_upload: PartToUpload, ) -> Result> { - self.pushed_parts - .push(UploadPartOrBufferedData::BufferedData { - // TODO: use Bytes to avoid clone - data: part_to_upload.byte_data.clone(), - }); + self.pushed_parts.push(PartIdOrBufferedData::BufferedData { + // TODO: use Bytes to avoid clone + data: part_to_upload.byte_data.clone(), + }); let location = self.location.clone(); let multipart_id = self .multipart_id @@ -961,9 +960,9 @@ impl MultipartManager { fn handle_completed_part( &mut self, part_idx: usize, - upload_part: UploadPart, + upload_part: PartId, ) -> Result> { - self.pushed_parts[part_idx] = UploadPartOrBufferedData::UploadPart(upload_part); + self.pushed_parts[part_idx] = PartIdOrBufferedData::PartId(upload_part); self.uploaded_parts += 1; if !self.all_uploads_finished() { @@ -981,10 +980,10 @@ impl MultipartManager { .pushed_parts .iter() .map(|part| match part { - UploadPartOrBufferedData::UploadPart(upload_part) => { + PartIdOrBufferedData::PartId(upload_part) => { Ok(upload_part.content_id.clone()) } - UploadPartOrBufferedData::BufferedData { .. } => { + PartIdOrBufferedData::BufferedData { .. } => { bail!("unfinished part in get_complete_multipart_future") } }) @@ -1021,10 +1020,8 @@ impl MultipartManager { .pushed_parts .iter() .map(|val| match val { - UploadPartOrBufferedData::UploadPart(upload_part) => { - upload_part.content_id.clone() - } - UploadPartOrBufferedData::BufferedData { .. } => { + PartIdOrBufferedData::PartId(upload_part) => upload_part.content_id.clone(), + PartIdOrBufferedData::BufferedData { .. } => { unreachable!("unfinished part in get_closed_file_checkpoint_data") } }) @@ -1036,13 +1033,13 @@ impl MultipartManager { .iter() .enumerate() .map(|(part_index, part)| match part { - UploadPartOrBufferedData::UploadPart(upload_part) => { + PartIdOrBufferedData::PartId(upload_part) => { InFlightPartCheckpoint::FinishedPart { part: part_index, content_id: upload_part.content_id.clone(), } } - UploadPartOrBufferedData::BufferedData { data } => { + PartIdOrBufferedData::BufferedData { data } => { InFlightPartCheckpoint::InProgressPart { part: part_index, data: data.clone(), @@ -1080,13 +1077,11 @@ impl MultipartManager { .iter() .enumerate() .map(|(part_index, part)| match part { - UploadPartOrBufferedData::UploadPart(upload_part) => { - InFlightPartCheckpoint::FinishedPart { - part: part_index, - content_id: upload_part.content_id.clone(), - } - } - UploadPartOrBufferedData::BufferedData { data } => { + PartIdOrBufferedData::PartId(upload_part) => InFlightPartCheckpoint::FinishedPart { + part: part_index, + content_id: upload_part.content_id.clone(), + }, + PartIdOrBufferedData::BufferedData { data } => { InFlightPartCheckpoint::InProgressPart { part: part_index, data: data.clone(), @@ -1117,10 +1112,8 @@ impl MultipartManager { .pushed_parts .iter() .map(|part| match part { - UploadPartOrBufferedData::UploadPart(upload_part) => { - upload_part.content_id.clone() - } - UploadPartOrBufferedData::BufferedData { .. } => { + PartIdOrBufferedData::PartId(upload_part) => upload_part.content_id.clone(), + PartIdOrBufferedData::BufferedData { .. } => { unreachable!("unfinished part in get_finished_file") } }) @@ -1231,7 +1224,7 @@ impl> Mul fn handle_completed_part( &mut self, part_idx: usize, - upload_part: UploadPart, + upload_part: PartId, ) -> Result> { self.multipart_manager .handle_completed_part(part_idx, upload_part) @@ -1301,8 +1294,8 @@ struct PartToUpload { } #[derive(Debug)] -enum UploadPartOrBufferedData { - UploadPart(UploadPart), +enum PartIdOrBufferedData { + PartId(PartId), BufferedData { data: Vec }, } @@ -1317,7 +1310,7 @@ pub enum MultipartCallback { }, CompletedPart { part_idx: usize, - upload_part: UploadPart, + upload_part: PartId, }, UploadsFinished, } diff --git a/build_dir/Cargo.toml b/build_dir/Cargo.toml index e87071bda..a29e8dd95 100644 --- a/build_dir/Cargo.toml +++ b/build_dir/Cargo.toml @@ -25,7 +25,7 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arr arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} -object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} +object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'} [profile.dev] debug = false diff --git a/docker/build_base/Cargo.toml b/docker/build_base/Cargo.toml index a23e63b5c..41e849060 100644 --- a/docker/build_base/Cargo.toml +++ b/docker/build_base/Cargo.toml @@ -22,4 +22,4 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arr arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} -object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'} +object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'}