From 09537342f1b8c131651ba24089791d62e8232dda Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Fri, 12 Apr 2024 14:37:44 -0700 Subject: [PATCH] Add builder style remove_object(s) APIs - remove older APIs --- src/s3/args.rs | 90 +------ src/s3/builders.rs | 2 + src/s3/builders/remove_objects.rs | 419 ++++++++++++++++++++++++++++++ src/s3/client.rs | 174 +------------ src/s3/client/remove_objects.rs | 31 +++ src/s3/client_core.rs | 41 +++ src/s3/mod.rs | 1 + src/s3/response.rs | 35 +-- src/s3/response/remove_objects.rs | 146 +++++++++++ src/s3/types.rs | 7 - tests/tests.rs | 151 +++++++---- 11 files changed, 745 insertions(+), 352 deletions(-) create mode 100644 src/s3/builders/remove_objects.rs create mode 100644 src/s3/client/remove_objects.rs create mode 100644 src/s3/client_core.rs create mode 100644 src/s3/response/remove_objects.rs diff --git a/src/s3/args.rs b/src/s3/args.rs index b4733846..f01c54b1 100644 --- a/src/s3/args.rs +++ b/src/s3/args.rs @@ -19,14 +19,14 @@ use crate::s3::error::Error; use crate::s3::signer::post_presign_v4; use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::types::{ - DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, - ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig, + Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, ReplicationConfig, + Retention, RetentionMode, SelectRequest, SseConfig, }; use crate::s3::utils::{ b64encode, check_bucket_name, merge, to_amz_date, to_http_header_value, to_iso8601utc, to_signer_date, urlencode, utc_now, Multimap, UtcTime, }; -use derivative::Derivative; + use hyper::http::Method; use serde_json::json; use serde_json::Value; @@ -881,90 +881,6 @@ pub type StatObjectArgs<'a> = ObjectConditionalReadArgs<'a>; /// Source object information for [copy object argument](CopyObjectArgs) pub type CopySource<'a> = ObjectConditionalReadArgs<'a>; -#[derive(Derivative, Clone, Debug, Default)] -/// Argument for [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API -pub struct RemoveObjectsApiArgs<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub bypass_governance_mode: bool, - #[derivative(Default(value = "true"))] - pub quiet: bool, - pub objects: &'a [DeleteObject<'a>], -} - -impl<'a> RemoveObjectsApiArgs<'a> { - /// Returns argument for [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API with given bucket name and list of delete object information - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// use minio::s3::types::DeleteObject; - /// let mut objects: Vec = Vec::new(); - /// objects.push(DeleteObject{name: "my-object-1", version_id: None}); - /// objects.push(DeleteObject{name: "my-object-2", version_id: Some("0e295d23-10e1-4c39-b134-5b08ad146df6")}); - /// let args = RemoveObjectsApiArgs::new("my-bucket", &objects).unwrap(); - /// ``` - pub fn new( - bucket_name: &'a str, - objects: &'a [DeleteObject], - ) -> Result, Error> { - check_bucket_name(bucket_name, true)?; - - Ok(RemoveObjectsApiArgs { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name, - bypass_governance_mode: false, - quiet: true, - objects, - }) - } -} - -/// Argument for [remove_objects()](crate::s3::client::Client::remove_objects) API -pub struct RemoveObjectsArgs<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub bypass_governance_mode: bool, - pub objects: &'a mut core::slice::Iter<'a, DeleteObject<'a>>, -} - -impl<'a> RemoveObjectsArgs<'a> { - /// Returns argument for [remove_objects()](crate::s3::client::Client::remove_objects) API with given bucket name and iterable delete object information - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// use minio::s3::types::DeleteObject; - /// let mut objects: Vec = Vec::new(); - /// objects.push(DeleteObject{name: "my-object-1", version_id: None}); - /// objects.push(DeleteObject{name: "my-object-2", version_id: Some("0e295d23-10e1-4c39-b134-5b08ad146df6")}); - /// let args = RemoveObjectsArgs::new("my-bucket", &mut objects.iter()).unwrap(); - /// ``` - pub fn new( - bucket_name: &'a str, - objects: &'a mut core::slice::Iter<'a, DeleteObject<'a>>, - ) -> Result, Error> { - check_bucket_name(bucket_name, true)?; - - Ok(RemoveObjectsArgs { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name, - bypass_governance_mode: false, - objects, - }) - } -} - /// Argument for [select_object_content()](crate::s3::client::Client::select_object_content) API pub struct SelectObjectContentArgs<'a> { pub extra_headers: Option<&'a Multimap>, diff --git a/src/s3/builders.rs b/src/s3/builders.rs index cda709be..d114a598 100644 --- a/src/s3/builders.rs +++ b/src/s3/builders.rs @@ -18,6 +18,7 @@ mod list_objects; mod listen_bucket_notification; mod object_content; mod put_object; +mod remove_objects; pub use buckets::*; pub use get_object::*; @@ -25,3 +26,4 @@ pub use list_objects::*; pub use listen_bucket_notification::*; pub use object_content::*; pub use put_object::*; +pub use remove_objects::*; diff --git a/src/s3/builders/remove_objects.rs b/src/s3/builders/remove_objects.rs new file mode 100644 index 00000000..5bde895a --- /dev/null +++ b/src/s3/builders/remove_objects.rs @@ -0,0 +1,419 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2022-2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Builders for RemoveObject APIs. + +use std::pin::Pin; + +use async_trait::async_trait; +use bytes::Bytes; +use futures_util::{stream as futures_stream, Stream, StreamExt}; +use http::Method; +use tokio_stream::iter as stream_iter; + +use crate::s3::{ + client_core::ClientCore, + error::Error, + response::{RemoveObjectResponse2, RemoveObjectsResponse}, + types::{S3Api, S3Request, ToS3Request, ToStream}, + utils::{check_bucket_name, md5sum_hash, merge, Multimap}, + Client, +}; + +/// Specify an object to be deleted. The object can be specified by key or by +/// key and version_id via the From trait. +#[derive(Debug, Clone)] +pub struct ObjectToDelete { + key: String, + version_id: Option, +} + +/// A key can be converted into a DeleteObject. The version_id is set to None. +impl From<&str> for ObjectToDelete { + fn from(key: &str) -> Self { + ObjectToDelete { + key: key.to_string(), + version_id: None, + } + } +} + +/// A tuple of key and version_id can be converted into a DeleteObject. +impl From<(&str, &str)> for ObjectToDelete { + fn from((key, version_id): (&str, &str)) -> Self { + ObjectToDelete { + key: key.to_string(), + version_id: Some(version_id.to_string()), + } + } +} + +/// A tuple of key and option version_id can be converted into a DeleteObject. +impl From<(&str, Option<&str>)> for ObjectToDelete { + fn from((key, version_id): (&str, Option<&str>)) -> Self { + ObjectToDelete { + key: key.to_string(), + version_id: version_id.map(|v| v.to_string()), + } + } +} + +#[derive(Debug, Clone)] +pub struct RemoveObject { + client: Option, + + bucket: String, + object: ObjectToDelete, + + bypass_governance_mode: bool, + + extra_headers: Option, + extra_query_params: Option, + region: Option, +} + +impl RemoveObject { + pub fn new(bucket: &str, object: impl Into) -> Self { + Self { + client: None, + + bucket: bucket.to_string(), + object: object.into(), + + bypass_governance_mode: false, + + extra_headers: None, + extra_query_params: None, + region: None, + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self { + self.bypass_governance_mode = bypass_governance_mode; + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } +} + +impl S3Api for RemoveObject { + type S3Response = RemoveObjectResponse2; +} + +impl ToS3Request for RemoveObject { + fn to_s3request(&self) -> Result { + check_bucket_name(&self.bucket, true)?; + + let mut headers = Multimap::new(); + if let Some(v) = &self.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &self.extra_query_params { + merge(&mut query_params, v); + } + if let Some(v) = &self.object.version_id { + query_params.insert(String::from("versionId"), v.to_string()); + } + + let req = S3Request::new( + self.client.as_ref().ok_or(Error::NoClientProvided)?, + Method::DELETE, + ) + .region(self.region.as_deref()) + .bucket(Some(&self.bucket)) + .object(Some(&self.object.key)) + .query_params(query_params) + .headers(headers); + Ok(req) + } +} + +#[derive(Debug, Clone)] +pub struct RemoveObjectsApi { + client: Option, + + bucket: String, + objects: Vec, + + bypass_governance_mode: bool, + verbose_mode: bool, + + extra_headers: Option, + extra_query_params: Option, + region: Option, +} + +impl RemoveObjectsApi { + pub fn new(bucket: &str, objects: Vec) -> Self { + RemoveObjectsApi { + client: None, + + bucket: bucket.to_string(), + objects, + + bypass_governance_mode: false, + verbose_mode: false, + + extra_headers: None, + extra_query_params: None, + region: None, + } + } + + pub fn client(mut self, client: &ClientCore) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self { + self.bypass_governance_mode = bypass_governance_mode; + self + } + + /// Enable verbose mode (defaults to false). If enabled, the response will + /// include the keys of objects that were successfully deleted. Otherwise + /// only objects that encountered an error are returned. + pub fn verbose_mode(mut self, verbose_mode: bool) -> Self { + self.verbose_mode = verbose_mode; + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } +} + +impl ToS3Request for RemoveObjectsApi { + fn to_s3request(&self) -> Result { + check_bucket_name(&self.bucket, true)?; + + let mut data = String::from(""); + if !self.verbose_mode { + data.push_str("true"); + } + for object in self.objects.iter() { + data.push_str(""); + data.push_str(""); + data.push_str(&object.key); + data.push_str(""); + if let Some(v) = object.version_id.as_ref() { + data.push_str(""); + data.push_str(&v); + data.push_str(""); + } + data.push_str(""); + } + data.push_str(""); + let data: Bytes = data.into(); + + let mut headers = Multimap::new(); + if let Some(v) = &self.extra_headers { + merge(&mut headers, v); + } + if self.bypass_governance_mode { + headers.insert( + String::from("x-amz-bypass-governance-retention"), + String::from("true"), + ); + } + headers.insert( + String::from("Content-Type"), + String::from("application/xml"), + ); + headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref())); + + let mut query_params = Multimap::new(); + if let Some(v) = &self.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("delete"), String::new()); + + let client = self.client.as_ref().ok_or(Error::NoClientProvided)?.inner(); + let req = S3Request::new(client, Method::POST) + .region(self.region.as_deref()) + .bucket(Some(&self.bucket)) + .query_params(query_params) + .headers(headers) + .body(Some(data.into())); + Ok(req) + } +} + +impl S3Api for RemoveObjectsApi { + type S3Response = RemoveObjectsResponse; +} + +pub struct DeleteObjects { + items: Pin + Send + Sync>>, +} + +impl DeleteObjects { + pub fn from_stream(s: impl Stream + Send + Sync + 'static) -> Self { + DeleteObjects { items: Box::pin(s) } + } +} + +impl From for DeleteObjects { + fn from(delete_object: ObjectToDelete) -> Self { + DeleteObjects::from_stream(stream_iter(std::iter::once(delete_object))) + } +} + +impl + Send + Sync + 'static> From for DeleteObjects { + fn from(keys: I) -> Self { + DeleteObjects::from_stream(stream_iter(keys)) + } +} + +pub struct RemoveObjects { + client: Option, + + bucket: String, + objects: DeleteObjects, + + bypass_governance_mode: bool, + verbose_mode: bool, + + extra_headers: Option, + extra_query_params: Option, + region: Option, +} + +impl RemoveObjects { + pub fn new(bucket: &str, objects: impl Into) -> Self { + RemoveObjects { + client: None, + + bucket: bucket.to_string(), + objects: objects.into(), + + bypass_governance_mode: false, + verbose_mode: false, + + extra_headers: None, + extra_query_params: None, + region: None, + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self { + self.bypass_governance_mode = bypass_governance_mode; + self + } + + /// Enable verbose mode (defaults to false). If enabled, the response will + /// include the keys of objects that were successfully deleted. Otherwise + /// only objects that encountered an error are returned. + pub fn verbose_mode(mut self, verbose_mode: bool) -> Self { + self.verbose_mode = verbose_mode; + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } + + async fn next_request(&mut self) -> Result, Error> { + let mut objects = Vec::new(); + while let Some(object) = self.objects.items.next().await { + objects.push(object); + if objects.len() >= 1000 { + break; + } + } + if objects.is_empty() { + return Ok(None); + } + let client_core = ClientCore::new(self.client.as_ref().ok_or(Error::NoClientProvided)?); + let request = RemoveObjectsApi::new(&self.bucket, objects) + .client(&client_core) + .bypass_governance_mode(self.bypass_governance_mode) + .verbose_mode(self.verbose_mode) + .extra_headers(self.extra_headers.clone()) + .extra_query_params(self.extra_query_params.clone()) + .region(self.region.clone()); + Ok(Some(request)) + } +} + +#[async_trait] +impl ToStream for RemoveObjects { + type Item = RemoveObjectsResponse; + + async fn to_stream( + mut self, + ) -> Box> + Unpin + Send> { + Box::new(Box::pin(futures_stream::unfold( + self, + move |mut this| async move { + match this.next_request().await { + Ok(Some(request)) => { + let response = request.send().await; + Some((response, this)) + } + Ok(None) => None, + Err(e) => Some((Err(e), this)), + } + }, + ))) + } +} diff --git a/src/s3/client.rs b/src/s3/client.rs index 97902c6f..e707fb7e 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -29,8 +29,8 @@ use crate::s3::response::*; use crate::s3::signer::{presign_v4, sign_v4_s3}; use crate::s3::sse::SseCustomerKey; use crate::s3::types::{ - DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, - ReplicationConfig, RetentionMode, SseConfig, + Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, ReplicationConfig, + RetentionMode, SseConfig, }; use crate::s3::utils::{ from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, md5sum_hash_sb, @@ -51,6 +51,7 @@ mod get_object; mod list_objects; mod listen_bucket_notification; mod put_object; +mod remove_objects; use super::builders::{GetBucketVersioning, ListBuckets, SegmentedBytes}; @@ -2566,175 +2567,6 @@ impl Client { }) } - pub async fn remove_object( - &self, - args: &RemoveObjectArgs<'_>, - ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - if let Some(v) = args.version_id { - query_params.insert(String::from("versionId"), v.to_string()); - } - - let resp = self - .execute( - Method::DELETE, - ®ion, - &mut headers, - &query_params, - Some(args.bucket), - Some(args.object), - None, - ) - .await?; - - Ok(RemoveObjectResponse { - headers: resp.headers().clone(), - region: region.to_string(), - bucket_name: args.bucket.to_string(), - object_name: args.object.to_string(), - version_id: args.version_id.map(|v| v.to_string()), - }) - } - - /// Executes [DeleteObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) S3 API - pub async fn remove_objects_api( - &self, - args: &RemoveObjectsApiArgs<'_>, - ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; - - let mut data = String::from(""); - if args.quiet { - data.push_str("true"); - } - for object in args.objects.iter() { - data.push_str(""); - data.push_str(""); - data.push_str(object.name); - data.push_str(""); - if let Some(v) = object.version_id { - data.push_str(""); - data.push_str(v); - data.push_str(""); - } - data.push_str(""); - } - data.push_str(""); - let data: Bytes = data.into(); - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - if args.bypass_governance_mode { - headers.insert( - String::from("x-amz-bypass-governance-retention"), - String::from("true"), - ); - } - headers.insert( - String::from("Content-Type"), - String::from("application/xml"), - ); - headers.insert(String::from("Content-MD5"), md5sum_hash(data.as_ref())); - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - query_params.insert(String::from("delete"), String::new()); - - let resp = self - .execute( - Method::POST, - ®ion, - &mut headers, - &query_params, - Some(args.bucket), - None, - Some(data), - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let mut objects: Vec = Vec::new(); - while let Some(v) = root.take_child("Deleted") { - let deleted = v; - objects.push(DeletedObject { - name: get_text(&deleted, "Key")?, - version_id: get_option_text(&deleted, "VersionId"), - delete_marker: get_text(&deleted, "DeleteMarker")?.to_lowercase() == "true", - delete_marker_version_id: get_option_text(&deleted, "DeleteMarkerVersionId"), - }) - } - - let mut errors: Vec = Vec::new(); - while let Some(v) = root.take_child("Error") { - let error = v; - errors.push(DeleteError { - code: get_text(&error, "Code")?, - message: get_text(&error, "Message")?, - object_name: get_text(&error, "Key")?, - version_id: get_option_text(&error, "VersionId"), - }) - } - - Ok(RemoveObjectsApiResponse { - headers: header_map.clone(), - region: region.clone(), - bucket_name: args.bucket.to_string(), - objects, - errors, - }) - } - - pub async fn remove_objects( - &self, - args: &mut RemoveObjectsArgs<'_>, - ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; - - loop { - let mut objects: Vec = Vec::new(); - for object in args.objects.take(1000) { - objects.push(*object); - } - if objects.is_empty() { - break; - } - - let mut roa_args = RemoveObjectsApiArgs::new(args.bucket, &objects)?; - roa_args.extra_headers = args.extra_headers; - roa_args.extra_query_params = args.extra_query_params; - roa_args.region = args.region; - roa_args.bypass_governance_mode = args.bypass_governance_mode; - roa_args.quiet = true; - let resp = self.remove_objects_api(&roa_args).await?; - if !resp.errors.is_empty() { - return Ok(resp); - } - } - - Ok(RemoveObjectsResponse { - headers: HeaderMap::new(), - region: region.to_string(), - bucket_name: args.bucket.to_string(), - objects: vec![], - errors: vec![], - }) - } - pub async fn set_bucket_encryption( &self, args: &SetBucketEncryptionArgs<'_>, diff --git a/src/s3/client/remove_objects.rs b/src/s3/client/remove_objects.rs new file mode 100644 index 00000000..6893c7aa --- /dev/null +++ b/src/s3/client/remove_objects.rs @@ -0,0 +1,31 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2022-2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! APIs to remove objects. + +use crate::s3::{ + builders::{DeleteObjects, ObjectToDelete, RemoveObject, RemoveObjects}, + client::Client, +}; + +impl Client { + pub fn remove_object(&self, bucket: &str, object: impl Into) -> RemoveObject { + RemoveObject::new(bucket, object).client(self) + } + + pub fn remove_objects(&self, bucket: &str, object: impl Into) -> RemoveObjects { + RemoveObjects::new(bucket, object).client(self) + } +} diff --git a/src/s3/client_core.rs b/src/s3/client_core.rs new file mode 100644 index 00000000..08cd5fa5 --- /dev/null +++ b/src/s3/client_core.rs @@ -0,0 +1,41 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2022-2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Module containing lower level APIs. + +use super::{ + builders::{ObjectToDelete, RemoveObjectsApi}, + Client, +}; + +#[derive(Debug, Clone)] +pub struct ClientCore(Client); + +impl ClientCore { + pub fn new(client: &Client) -> Self { + Self(client.clone()) + } + + pub(crate) fn inner(&self) -> &Client { + &self.0 + } + + /// Creates a builder to execute + /// [DeleteObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) + /// S3 API + pub fn delete_objects(&self, bucket: &str, object: Vec) -> RemoveObjectsApi { + RemoveObjectsApi::new(bucket, object).client(self) + } +} diff --git a/src/s3/mod.rs b/src/s3/mod.rs index e899f277..e9b24802 100644 --- a/src/s3/mod.rs +++ b/src/s3/mod.rs @@ -18,6 +18,7 @@ pub mod args; pub mod builders; pub mod client; +pub mod client_core; pub mod creds; pub mod error; pub mod http; diff --git a/src/s3/response.rs b/src/s3/response.rs index 28f6f1d5..2b9cb18e 100644 --- a/src/s3/response.rs +++ b/src/s3/response.rs @@ -36,6 +36,7 @@ mod get_object; mod list_objects; mod listen_bucket_notification; mod put_object; +mod remove_objects; pub use buckets::{GetBucketVersioningResponse, ListBucketsResponse}; pub use get_object::GetObjectResponse2; @@ -48,6 +49,9 @@ pub use put_object::{ CreateMultipartUploadResponse2, PutObjectContentResponse, PutObjectResponse, UploadPartResponse2, }; +pub use remove_objects::{ + DeleteError, DeletedObject, RemoveObjectResponse2, RemoveObjectsResponse, +}; #[derive(Debug)] /// Base response for bucket operation @@ -217,37 +221,6 @@ impl StatObjectResponse { } } -#[derive(Clone, Debug)] -/// Error defintion of [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API -pub struct DeleteError { - pub code: String, - pub message: String, - pub object_name: String, - pub version_id: Option, -} - -#[derive(Clone, Debug)] -/// Deleted object defintion of [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API -pub struct DeletedObject { - pub name: String, - pub version_id: Option, - pub delete_marker: bool, - pub delete_marker_version_id: Option, -} - -#[derive(Clone, Debug)] -/// Response of [remove_objects_api()](crate::s3::client::Client::remove_objects_api) S3 API -pub struct RemoveObjectsApiResponse { - pub headers: HeaderMap, - pub region: String, - pub bucket_name: String, - pub objects: Vec, - pub errors: Vec, -} - -/// Response of [remove_objects()](crate::s3::client::Client::remove_objects) API -pub type RemoveObjectsResponse = RemoveObjectsApiResponse; - /// Response of [select_object_content()](crate::s3::client::Client::select_object_content) API pub struct SelectObjectContentResponse { pub headers: HeaderMap, diff --git a/src/s3/response/remove_objects.rs b/src/s3/response/remove_objects.rs new file mode 100644 index 00000000..d9819a36 --- /dev/null +++ b/src/s3/response/remove_objects.rs @@ -0,0 +1,146 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2022-2024 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Responses for RemoveObject APIs. + +use async_trait::async_trait; +use bytes::Buf; +use http::HeaderMap; +use xmltree::Element; + +use crate::s3::{ + error::Error, + types::{FromS3Response, S3Request}, + utils::{get_default_text, get_option_text, get_text}, +}; + +#[derive(Debug, Clone)] +pub struct RemoveObjectResponse2 { + pub headers: HeaderMap, + /// Value of the `x-amz-delete-marker` header. + pub is_delete_marker: bool, + /// If a delete marker was created, this field will contain the version_id + /// of the delete marker. Value of the `x-amz-version-id` header. + pub version_id: Option, +} + +#[async_trait] +impl FromS3Response for RemoveObjectResponse2 { + async fn from_s3response<'a>( + _req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result { + let headers = resp.headers().clone(); + let is_delete_marker = headers + .get("x-amz-delete-marker") + .map(|v| v == "true") + .unwrap_or(false); + + let version_id = headers + .get("x-amz-version-id") + .map(|v| v.to_str().unwrap().to_string()); + + Ok(RemoveObjectResponse2 { + headers, + is_delete_marker, + version_id, + }) + } +} + +/// Error info returned by the S3 API when an object could not be deleted. +#[derive(Clone, Debug)] +pub struct DeleteError { + pub code: String, + pub message: String, + pub object_name: String, + pub version_id: Option, +} + +/// Information about an object that was deleted. +#[derive(Clone, Debug)] +pub struct DeletedObject { + pub name: String, + pub version_id: Option, + pub delete_marker: bool, + pub delete_marker_version_id: Option, +} + +/// Response of +/// [remove_objects_api()](crate::s3::client_core::ClientCore::delete_objects) +/// S3 API. It is also returned by the +/// [remove_objects()](crate::s3::client::Client::remove_objects) API in the +/// form of a stream. +#[derive(Clone, Debug)] +pub struct RemoveObjectsResponse { + pub headers: HeaderMap, + pub result: Vec, +} + +#[derive(Clone, Debug)] +pub enum DeleteResult { + Deleted(DeletedObject), + Error(DeleteError), +} + +impl DeleteResult { + pub fn is_deleted(&self) -> bool { + matches!(self, DeleteResult::Deleted(_)) + } + + pub fn is_error(&self) -> bool { + matches!(self, DeleteResult::Error(_)) + } +} + +#[async_trait] +impl FromS3Response for RemoveObjectsResponse { + async fn from_s3response<'a>( + _req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result { + let headers = resp.headers().clone(); + + let body = resp.bytes().await?; + + let root = Element::parse(body.reader())?; + let result = root + .children + .iter() + .map(|elem| elem.as_element().unwrap()) + .map(|elem| { + if elem.name == "Deleted" { + Ok(DeleteResult::Deleted(DeletedObject { + name: get_text(&elem, "Key")?, + version_id: get_option_text(&elem, "VersionId"), + delete_marker: get_default_text(&elem, "DeleteMarker").to_lowercase() + == "true", + delete_marker_version_id: get_option_text(&elem, "DeleteMarkerVersionId"), + })) + } else { + assert_eq!(elem.name, "Error"); + Ok(DeleteResult::Error(DeleteError { + code: get_text(&elem, "Code")?, + message: get_text(&elem, "Message")?, + object_name: get_text(&elem, "Key")?, + version_id: get_option_text(&elem, "VersionId"), + })) + } + }) + .collect::, Error>>()?; + + Ok(RemoveObjectsResponse { headers, result }) + } +} diff --git a/src/s3/types.rs b/src/s3/types.rs index c455830c..44b0e7f3 100644 --- a/src/s3/types.rs +++ b/src/s3/types.rs @@ -230,13 +230,6 @@ pub fn parse_legal_hold(s: &str) -> Result { } } -#[derive(Clone, Debug, Copy)] -/// Contains delete object name and optional version ID -pub struct DeleteObject<'a> { - pub name: &'a str, - pub version_id: Option<&'a str>, -} - #[derive(Clone, Debug)] /// Compression types pub enum CompressionType { diff --git a/tests/tests.rs b/tests/tests.rs index 44bd62e8..8f7910fe 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -19,7 +19,7 @@ use chrono::Duration; use futures_util::Stream; use hyper::http::Method; -use minio::s3::builders::ObjectContent; +use minio::s3::builders::{ObjectContent, ObjectToDelete}; use rand::{ distributions::{Alphanumeric, DistString}, rngs::SmallRng, @@ -36,12 +36,13 @@ use tokio_stream::StreamExt; use minio::s3::args::*; use minio::s3::client::Client; use minio::s3::creds::StaticProvider; +use minio::s3::error::Error; use minio::s3::http::BaseUrl; use minio::s3::types::ToStream; use minio::s3::types::{ - CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo, - NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, - RetentionMode, SelectRequest, SuffixFilterRule, + CsvInputSerialization, CsvOutputSerialization, FileHeaderInfo, NotificationConfig, + ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, RetentionMode, SelectRequest, + SuffixFilterRule, }; use minio::s3::types::{NotificationRecords, S3Api}; use minio::s3::utils::{to_iso8601utc, utc_now}; @@ -277,9 +278,21 @@ impl ClientTest { assert_eq!(resp.object_name, object_name); assert_eq!(resp.size, size); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); + // Validate delete succeeded. + let resp = self + .client + .stat_object(&StatObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .await; + match resp.err().unwrap() { + Error::S3Error(er) => { + assert_eq!(er.code, "NoSuchKey") + } + _ => assert!(false), + } } async fn put_object_multipart(&self) { @@ -307,7 +320,8 @@ impl ClientTest { assert_eq!(resp.object_name, object_name); assert_eq!(resp.size, size); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -338,7 +352,8 @@ impl ClientTest { assert_eq!(resp.size, *size as usize); assert_eq!(resp.etag, etag); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -367,7 +382,8 @@ impl ClientTest { assert_eq!(resp.size, *size as usize); assert_eq!(resp.etag, etag); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -416,7 +432,8 @@ impl ClientTest { assert_eq!(resp.size, sizes[idx] as usize); assert_eq!(resp.etag, etag); client - .remove_object(&RemoveObjectArgs::new(&test_bucket, &object_name).unwrap()) + .remove_object(&test_bucket, object_name.as_str()) + .send() .await .unwrap(); @@ -453,7 +470,8 @@ impl ClientTest { let got = resp.text().await.unwrap(); assert_eq!(got, data); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -475,7 +493,8 @@ impl ClientTest { let got = resp.content.to_segmented_bytes().await.unwrap().to_bytes(); assert_eq!(got, data); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -513,12 +532,13 @@ impl ClientTest { fs::remove_file(&filename).unwrap(); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); - self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); @@ -547,12 +567,13 @@ impl ClientTest { fs::remove_file(&filename).unwrap(); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); - self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -583,20 +604,27 @@ impl ClientTest { .unwrap(); names.push(object_name); } - let mut objects: Vec = Vec::new(); - for name in names.iter() { - objects.push(DeleteObject { - name, - version_id: None, - }); - } + let del_items: Vec = names + .iter() + .map(|v| ObjectToDelete::from(v.as_str())) + .collect(); - self.client - .remove_objects( - &mut RemoveObjectsArgs::new(&self.test_bucket, &mut objects.iter()).unwrap(), - ) - .await - .unwrap(); + let mut resp = self + .client + .remove_objects(&self.test_bucket, del_items.into_iter()) + .verbose_mode(true) + .to_stream() + .await; + + let mut del_count = 0; + while let Some(item) = resp.next().await { + let res = item.unwrap(); + for obj in res.result.iter() { + assert!(obj.is_deleted()); + } + del_count += res.result.len(); + } + assert_eq!(del_count, 3); self.client .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) @@ -647,21 +675,23 @@ impl ClientTest { } assert!(count == 3); - let mut objects: Vec = Vec::new(); - for name in names.iter() { - objects.push(DeleteObject { - name, - version_id: None, - }); + let del_items: Vec = names + .iter() + .map(|v| ObjectToDelete::from(v.as_str())) + .collect(); + let mut resp = self + .client + .remove_objects(&self.test_bucket, del_items.into_iter()) + .verbose_mode(true) + .to_stream() + .await; + while let Some(item) = resp.next().await { + let res = item.unwrap(); + for obj in res.result.iter() { + assert!(obj.is_deleted()); + } } - self.client - .remove_objects( - &mut RemoveObjectsArgs::new(&self.test_bucket, &mut objects.iter()).unwrap(), - ) - .await - .unwrap(); - self.client .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) .await @@ -731,7 +761,8 @@ impl ClientTest { } assert_eq!(got, data); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -806,7 +837,8 @@ impl ClientTest { .unwrap(); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); @@ -853,12 +885,13 @@ impl ClientTest { assert_eq!(resp.size, size); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); - self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &src_object_name).unwrap()) + .remove_object(&self.test_bucket, src_object_name.as_str()) + .send() .await .unwrap(); } @@ -904,12 +937,13 @@ impl ClientTest { assert_eq!(resp.size, 5); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); - self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &src_object_name).unwrap()) + .remove_object(&self.test_bucket, src_object_name.as_str()) + .send() .await .unwrap(); } @@ -1204,7 +1238,8 @@ impl ClientTest { assert!(resp.tags.is_empty()); self.client - .remove_object(&RemoveObjectArgs::new(&self.test_bucket, &object_name).unwrap()) + .remove_object(&self.test_bucket, object_name.as_str()) + .send() .await .unwrap(); } @@ -1319,10 +1354,14 @@ impl ClientTest { assert!(resp.retention_mode.is_none()); assert!(resp.retain_until_date.is_none()); - let mut args = RemoveObjectArgs::new(&bucket_name, &object_name).unwrap(); - let version_id = obj_resp.version_id.unwrap().clone(); - args.version_id = Some(version_id.as_str()); - self.client.remove_object(&args).await.unwrap(); + self.client + .remove_object( + &bucket_name, + (object_name.as_str(), obj_resp.version_id.as_deref()), + ) + .send() + .await + .unwrap(); self.client .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap())