Skip to content

Commit

Permalink
bump object_store dependency to 0.7.1
Browse files Browse the repository at this point in the history
Also points to a new branch for direct multi-part upload that uses more existing code.
  • Loading branch information
jacksonrnewhouse committed Oct 23, 2023
1 parent d0e6e63 commit 1115613
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 63 deletions.
47 changes: 34 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ arrow = { version = "43.0.0" }
arrow-buffer = { version = "43.0.0" }
arrow-array = { version = "43.0.0" }
arrow-schema = { version = "43.0.0" }
object_store = { version = "0.6.1" }
object_store = { version = "0.7.1" }
parquet = { version = "43.0.0" }

[profile.release]
Expand All @@ -56,4 +56,4 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arr
arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api'}
2 changes: 1 addition & 1 deletion arroyo-controller/src/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arr
arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '43.0.0/arroyo_patches'}
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'multipart_gcp' }
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = 'object_store/put_part_api' }
"#;

// NOTE: These must be kept in sync with the Cargo configs in docker/build_base and build_dir/Cargo.toml
Expand Down
2 changes: 1 addition & 1 deletion arroyo-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bytes = "1.4.0"
# better way to do this
rusoto_core = "0.48.0"

object_store = {version = "0.6.1", features = ["aws", "gcp"]}
object_store = {workspace = true, features = ["aws", "gcp"]}
regex = "1.9.5"
thiserror = "1"
tokio = { version = "1", features = ["fs"] }
Expand Down
20 changes: 13 additions & 7 deletions arroyo-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use aws::ArroyoCredentialProvider;
use bytes::Bytes;
use object_store::aws::AmazonS3ConfigKey;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::multipart::PartId;
use object_store::path::Path;
use object_store::MultipartId;
use object_store::{aws::AmazonS3Builder, local::LocalFileSystem, ObjectStore};
use object_store::{MultipartId, UploadPart};
use regex::{Captures, Regex};
use thiserror::Error;

Expand Down Expand Up @@ -440,9 +441,10 @@ impl StorageProvider {
pub async fn start_multipart(&self, path: &Path) -> Result<MultipartId, StorageError> {
Ok(self
.object_store
.start_multipart(path)
.initiate_multipart_upload(path)
.await
.map_err(|e| Into::<StorageError>::into(e))?)
.map_err(|e| Into::<StorageError>::into(e))?
.0)
}

pub async fn add_multipart(
Expand All @@ -451,10 +453,12 @@ impl StorageProvider {
multipart_id: &MultipartId,
part_number: usize,
bytes: Bytes,
) -> Result<UploadPart, StorageError> {
) -> Result<PartId, StorageError> {
Ok(self
.object_store
.add_multipart(path, multipart_id, part_number, bytes)
.get_put_part(path, multipart_id)
.await?
.put_part(bytes, part_number)
.await
.map_err(|e| Into::<StorageError>::into(e))?)
}
Expand All @@ -463,11 +467,13 @@ impl StorageProvider {
&self,
path: &Path,
multipart_id: &MultipartId,
parts: Vec<UploadPart>,
parts: Vec<PartId>,
) -> Result<(), StorageError> {
Ok(self
.object_store
.close_multipart(path, multipart_id, parts)
.get_put_part(path, multipart_id)
.await?
.complete(parts)
.await
.map_err(|e| Into::<StorageError>::into(e))?)
}
Expand Down
Loading

0 comments on commit 1115613

Please sign in to comment.