Skip to content

Commit

Permalink
gridfs: do not error when contentType field is not found, allow m…
Browse files Browse the repository at this point in the history
…etadata to be included
  • Loading branch information
auguwu committed Feb 19, 2024
1 parent 0ef9a93 commit e486bff
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ resolver = "2"
members = ["crates/*", "remi"]

[workspace.package]
version = "0.6.2"
version = "0.6.3"
repository = "https://github.com/Noelware/remi-rs"
license = "MIT"
edition = "2021"
Expand Down
2 changes: 1 addition & 1 deletion crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ azure_storage_blobs = "0.19.0"
bytes = "1.5.0"
futures-util = "0.3.30"
log = { version = "0.4.20", optional = true }
remi = { path = "../../remi", version = "0.6.2" }
remi = { path = "../../remi", version = "0.6.3" }
serde = { version = "1.0.196", features = ["derive"], optional = true }
tracing = { version = "0.1.40", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ file-format = { version = "0.24.0", optional = true }
futures = "0.3.30"
infer = { version = "0.15.0", default-features = false, optional = true }
log = { version = "0.4.20", optional = true }
remi = { path = "../../remi", version = "0.6.2" }
remi = { path = "../../remi", version = "0.6.3" }
serde = { version = "1.0.196", features = ["derive"], optional = true }
serde_json = { version = "1.0.113", optional = true }
serde_yaml = { version = "0.9.31", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/gridfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bytes = "1.5.0"
futures-util = "0.3.30"
log = { version = "0.4.20", optional = true }
mongodb = "2.8.1"
remi = { path = "../../remi", version = "0.6.2" }
remi = { path = "../../remi", version = "0.6.3" }
serde = { version = "1.0.196", features = ["derive"], optional = true }
tokio-util = "0.7.10"
tracing = { version = "0.1.40", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/gridfs/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use mongodb::options::{ClientOptions, GridFsBucketOptions, ReadConcern, Selectio
)]
pub type GridfsStorageConfig = StorageConfig;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StorageConfig {
/// Specifies the [`SelectionCriteria`].
Expand All @@ -48,7 +48,7 @@ pub struct StorageConfig {
pub write_concern: Option<WriteConcern>,

/// Configure the [`ClientOptions`] that allows to connect to a MongoDB server.
#[cfg_attr(feature = "serde", serde(skip_serializing))]
#[cfg_attr(feature = "serde", serde(default, skip_serializing))]
pub client_options: ClientOptions,

/// Specifies the [`ReadConcern`] for isolation for when reading documents from the GridFS store. Read the
Expand Down
58 changes: 43 additions & 15 deletions crates/gridfs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bytes::{Bytes, BytesMut};
use futures_util::{AsyncWriteExt, StreamExt};
use mongodb::{
bson::{doc, raw::ValueAccessErrorKind, Bson, RawDocument},
options::GridFsFindOptions,
options::{GridFsFindOptions, GridFsUploadOptions},
Client, Database, GridFsBucket,
};
use remi::{Blob, File, ListBlobsRequest, UploadRequest};
Expand Down Expand Up @@ -53,12 +53,18 @@ fn value_access_err_to_error(error: mongodb::bson::raw::ValueAccessError) -> mon
fn document_to_blob(bytes: Bytes, doc: &RawDocument) -> Result<File, mongodb::error::Error> {
let filename = doc.get_str("filename").map_err(value_access_err_to_error)?;
let length = doc.get_i64("length").map_err(value_access_err_to_error)?;
let content_type = doc.get_str("contentType").map_err(value_access_err_to_error)?;
let created_at = doc.get_datetime("uploadDate").map_err(value_access_err_to_error)?;
let content_type = match doc.get_str("contentType") {
Ok(res) => Some(res),
Err(e) => match e.kind {
ValueAccessErrorKind::NotPresent => None,
_ => return Err(value_access_err_to_error(e)),
},
};

Ok(File {
last_modified_at: None,
content_type: Some(content_type.to_owned()),
content_type: content_type.map(String::from),
created_at: if created_at.timestamp_millis() < 0 {
#[cfg(feature = "tracing")]
::tracing::warn!(remi.service = "gridfs", %filename, "`created_at` timestamp was negative");
Expand Down Expand Up @@ -95,14 +101,20 @@ fn document_to_blob(bytes: Bytes, doc: &RawDocument) -> Result<File, mongodb::er
pub type GridfsStorageService = StorageService;

#[derive(Debug, Clone)]
pub struct StorageService(GridFsBucket);
pub struct StorageService {
config: Option<StorageConfig>,
bucket: GridFsBucket,
}

impl StorageService {
/// Creates a new [`StorageService`] which uses the [`StorageConfig`] as a way to create
/// the inner [`GridFsBucket`].
pub fn new(db: Database, config: StorageConfig) -> StorageService {
let bucket = db.gridfs_bucket(Some(config.into()));
StorageService::with_bucket(bucket)
let bucket = db.gridfs_bucket(Some(config.clone().into()));
StorageService {
config: Some(config),
bucket,
}
}

/// Return a new [`StorageService`] from a constructed [`Client`].
Expand All @@ -124,7 +136,7 @@ impl StorageService {

/// Uses a preconfigured [`GridFsBucket`] as the underlying bucket.
pub fn with_bucket(bucket: GridFsBucket) -> StorageService {
StorageService(bucket)
StorageService { config: None, bucket }
}

fn resolve_path<P: AsRef<Path>>(&self, path: P) -> Result<String, mongodb::error::Error> {
Expand Down Expand Up @@ -168,7 +180,7 @@ impl remi::StorageService for StorageService {
::log::info!("opening file [{}]", path);

let mut cursor = self
.0
.bucket
.find(doc! { "filename": &path }, GridFsFindOptions::default())
.await?;

Expand All @@ -189,7 +201,7 @@ impl remi::StorageService for StorageService {

let doc = cursor.current();
let stream = self
.0
.bucket
.open_download_stream(Bson::ObjectId(
doc.get_object_id("_id").map_err(value_access_err_to_error)?,
))
Expand Down Expand Up @@ -235,7 +247,7 @@ impl remi::StorageService for StorageService {
::log::info!("getting file metadata for file [{}]", path);

let mut cursor = self
.0
.bucket
.find(
doc! {
"filename": &path,
Expand Down Expand Up @@ -295,12 +307,12 @@ impl remi::StorageService for StorageService {
return Ok(vec![]);
}

let mut cursor = self.0.find(doc!(), GridFsFindOptions::default()).await?;
let mut cursor = self.bucket.find(doc!(), GridFsFindOptions::default()).await?;
let mut blobs = vec![];
while cursor.advance().await? {
let doc = cursor.current();
let stream = self
.0
.bucket
.open_download_stream(Bson::ObjectId(
doc.get_object_id("_id").map_err(value_access_err_to_error)?,
))
Expand Down Expand Up @@ -356,7 +368,7 @@ impl remi::StorageService for StorageService {
::log::info!("deleting file [{}]", path);

let mut cursor = self
.0
.bucket
.find(
doc! {
"filename": &path,
Expand All @@ -380,7 +392,7 @@ impl remi::StorageService for StorageService {
let doc = cursor.current();
let oid = doc.get_object_id("_id").map_err(value_access_err_to_error)?;

self.0.delete(Bson::ObjectId(oid)).await
self.bucket.delete(Bson::ObjectId(oid)).await
}

#[cfg_attr(
Expand Down Expand Up @@ -426,7 +438,23 @@ impl remi::StorageService for StorageService {
#[cfg(feature = "log")]
::log::info!("uploading file [{}] to GridFS", path);

let mut stream = self.0.open_upload_stream(path, None);
let opts = GridFsUploadOptions::builder()
.chunk_size_bytes(Some(
self.config.clone().unwrap_or_default().chunk_size.unwrap_or(255 * 1024),
))
.metadata(match options.metadata.is_empty() {
true => None,
false => Some(
options
.metadata
.into_iter()
.map(|(k, v)| (k, Bson::String(v)))
.collect(),
),
})
.build();

let mut stream = self.bucket.open_upload_stream(path, Some(opts));
stream.write_all(&options.data[..]).await?;
stream.close().await.map_err(From::from)

Expand Down
2 changes: 1 addition & 1 deletion crates/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ aws-sdk-s3 = { version = "1.15.0", features = ["behavior-version-latest"] }
#aws-smithy-runtime-api = "1.1.6"
bytes = "1.5.0"
log = { version = "0.4.20", optional = true }
remi = { path = "../../remi", version = "0.6.2" }
remi = { path = "../../remi", version = "0.6.3" }
serde = { version = "1.0.196", features = ["derive"], optional = true }
tracing = { version = "0.1.40", optional = true }

Expand Down

0 comments on commit e486bff

Please sign in to comment.