Skip to content

Commit

Permalink
refactor: add support for extending file storage to other schemes and…
Browse files Browse the repository at this point in the history
… provide a runtime flag for the same (#3348)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
  • Loading branch information
Chethan-rao and hyperswitch-bot[bot] authored Jan 30, 2024
1 parent 937aea9 commit a9638d1
Show file tree
Hide file tree
Showing 18 changed files with 461 additions and 258 deletions.
31 changes: 15 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,11 @@ refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events

# File storage configuration
[file_storage]
file_storage_backend = "aws_s3" # File storage backend to be used

[file_storage.aws_s3]
region = "us-east-1" # The AWS region used by the AWS S3 for file storage
bucket_name = "bucket1" # The AWS S3 bucket name for file storage
3 changes: 3 additions & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -544,3 +544,6 @@ client_id = ""
client_secret = ""
partner_id = ""
enabled = true

[file_storage]
file_storage_backend = "file_system"
3 changes: 3 additions & 0 deletions config/docker_compose.toml
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,6 @@ enabled = true

[events]
source = "logs"

[file_storage]
file_storage_backend = "file_system"
2 changes: 2 additions & 0 deletions crates/external_services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ license.workspace = true
[features]
kms = ["dep:aws-config", "dep:aws-sdk-kms"]
email = ["dep:aws-config"]
aws_s3 = ["dep:aws-config", "dep:aws-sdk-s3"]
hashicorp-vault = [ "dep:vaultrs" ]

[dependencies]
Expand All @@ -18,6 +19,7 @@ aws-config = { version = "0.55.3", optional = true }
aws-sdk-kms = { version = "0.28.0", optional = true }
aws-sdk-sesv2 = "0.28.0"
aws-sdk-sts = "0.28.0"
aws-sdk-s3 = { version = "0.28.0", optional = true }
aws-smithy-client = "0.55.3"
base64 = "0.21.2"
dyn-clone = "1.0.11"
Expand Down
96 changes: 96 additions & 0 deletions crates/external_services/src/file_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
//!
//! Module for managing file storage operations with support for multiple storage schemes.
//!

use std::fmt::{Display, Formatter};

use common_utils::errors::CustomResult;

/// Includes functionality for AWS S3 storage operations.
#[cfg(feature = "aws_s3")]
mod aws_s3;

mod file_system;

/// Enum representing different file storage configurations, allowing for multiple storage schemes.
#[derive(Debug, Clone, Default, serde::Deserialize)]
#[serde(tag = "file_storage_backend")]
#[serde(rename_all = "snake_case")]
pub enum FileStorageConfig {
/// AWS S3 storage configuration.
#[cfg(feature = "aws_s3")]
AwsS3 {
/// Configuration for AWS S3 file storage.
aws_s3: aws_s3::AwsFileStorageConfig,
},
/// Local file system storage configuration.
#[default]
FileSystem,
}

impl FileStorageConfig {
/// Validates the file storage configuration.
pub fn validate(&self) -> Result<(), InvalidFileStorageConfig> {
match self {
#[cfg(feature = "aws_s3")]
Self::AwsS3 { aws_s3 } => aws_s3.validate(),
Self::FileSystem => Ok(()),
}
}

/// Retrieves the appropriate file storage client based on the file storage configuration.
pub async fn get_file_storage_client(&self) -> Box<dyn FileStorageInterface> {
match self {
#[cfg(feature = "aws_s3")]
Self::AwsS3 { aws_s3 } => Box::new(aws_s3::AwsFileStorageClient::new(aws_s3).await),
Self::FileSystem => Box::new(file_system::FileSystem),
}
}
}

/// Trait for file storage operations
#[async_trait::async_trait]
pub trait FileStorageInterface: dyn_clone::DynClone + Sync + Send {
/// Uploads a file to the selected storage scheme.
async fn upload_file(
&self,
file_key: &str,
file: Vec<u8>,
) -> CustomResult<(), FileStorageError>;

/// Deletes a file from the selected storage scheme.
async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError>;

/// Retrieves a file from the selected storage scheme.
async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, FileStorageError>;
}

dyn_clone::clone_trait_object!(FileStorageInterface);

/// Error thrown when the file storage config is invalid
#[derive(Debug, Clone)]
pub struct InvalidFileStorageConfig(&'static str);

impl std::error::Error for InvalidFileStorageConfig {}

impl Display for InvalidFileStorageConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "file_storage: {}", self.0)
}
}

/// Represents errors that can occur during file storage operations.
#[derive(Debug, thiserror::Error, PartialEq)]
pub enum FileStorageError {
/// Indicates that the file upload operation failed.
#[error("Failed to upload file")]
UploadFailed,

/// Indicates that the file retrieval operation failed.
#[error("Failed to retrieve file")]
RetrieveFailed,

/// Indicates that the file deletion operation failed.
#[error("Failed to delete file")]
DeleteFailed,
}
158 changes: 158 additions & 0 deletions crates/external_services/src/file_storage/aws_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{
operation::{
delete_object::DeleteObjectError, get_object::GetObjectError, put_object::PutObjectError,
},
Client,
};
use aws_sdk_sts::config::Region;
use common_utils::{errors::CustomResult, ext_traits::ConfigExt};
use error_stack::ResultExt;

use super::InvalidFileStorageConfig;
use crate::file_storage::{FileStorageError, FileStorageInterface};

/// Configuration for AWS S3 file storage.
#[derive(Debug, serde::Deserialize, Clone, Default)]
#[serde(default)]
pub struct AwsFileStorageConfig {
/// The AWS region to send file uploads
region: String,
/// The AWS s3 bucket to send file uploads
bucket_name: String,
}

impl AwsFileStorageConfig {
/// Validates the AWS S3 file storage configuration.
pub(super) fn validate(&self) -> Result<(), InvalidFileStorageConfig> {
use common_utils::fp_utils::when;

when(self.region.is_default_or_empty(), || {
Err(InvalidFileStorageConfig("aws s3 region must not be empty"))
})?;

when(self.bucket_name.is_default_or_empty(), || {
Err(InvalidFileStorageConfig(
"aws s3 bucket name must not be empty",
))
})
}
}

/// AWS S3 file storage client.
#[derive(Debug, Clone)]
pub(super) struct AwsFileStorageClient {
/// AWS S3 client
inner_client: Client,
/// The name of the AWS S3 bucket.
bucket_name: String,
}

impl AwsFileStorageClient {
/// Creates a new AWS S3 file storage client.
pub(super) async fn new(config: &AwsFileStorageConfig) -> Self {
let region_provider = RegionProviderChain::first_try(Region::new(config.region.clone()));
let sdk_config = aws_config::from_env().region(region_provider).load().await;
Self {
inner_client: Client::new(&sdk_config),
bucket_name: config.bucket_name.clone(),
}
}

/// Uploads a file to AWS S3.
async fn upload_file(
&self,
file_key: &str,
file: Vec<u8>,
) -> CustomResult<(), AwsS3StorageError> {
self.inner_client
.put_object()
.bucket(&self.bucket_name)
.key(file_key)
.body(file.into())
.send()
.await
.map_err(AwsS3StorageError::UploadFailure)?;
Ok(())
}

/// Deletes a file from AWS S3.
async fn delete_file(&self, file_key: &str) -> CustomResult<(), AwsS3StorageError> {
self.inner_client
.delete_object()
.bucket(&self.bucket_name)
.key(file_key)
.send()
.await
.map_err(AwsS3StorageError::DeleteFailure)?;
Ok(())
}

/// Retrieves a file from AWS S3.
async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, AwsS3StorageError> {
Ok(self
.inner_client
.get_object()
.bucket(&self.bucket_name)
.key(file_key)
.send()
.await
.map_err(AwsS3StorageError::RetrieveFailure)?
.body
.collect()
.await
.map_err(AwsS3StorageError::UnknownError)?
.to_vec())
}
}

#[async_trait::async_trait]
impl FileStorageInterface for AwsFileStorageClient {
/// Uploads a file to AWS S3.
async fn upload_file(
&self,
file_key: &str,
file: Vec<u8>,
) -> CustomResult<(), FileStorageError> {
self.upload_file(file_key, file)
.await
.change_context(FileStorageError::UploadFailed)?;
Ok(())
}

/// Deletes a file from AWS S3.
async fn delete_file(&self, file_key: &str) -> CustomResult<(), FileStorageError> {
self.delete_file(file_key)
.await
.change_context(FileStorageError::DeleteFailed)?;
Ok(())
}

/// Retrieves a file from AWS S3.
async fn retrieve_file(&self, file_key: &str) -> CustomResult<Vec<u8>, FileStorageError> {
Ok(self
.retrieve_file(file_key)
.await
.change_context(FileStorageError::RetrieveFailed)?)
}
}

/// Enum representing errors that can occur during AWS S3 file storage operations.
#[derive(Debug, thiserror::Error)]
enum AwsS3StorageError {
/// Error indicating that file upload to S3 failed.
#[error("File upload to S3 failed: {0:?}")]
UploadFailure(aws_smithy_client::SdkError<PutObjectError>),

/// Error indicating that file retrieval from S3 failed.
#[error("File retrieve from S3 failed: {0:?}")]
RetrieveFailure(aws_smithy_client::SdkError<GetObjectError>),

/// Error indicating that file deletion from S3 failed.
#[error("File delete from S3 failed: {0:?}")]
DeleteFailure(aws_smithy_client::SdkError<DeleteObjectError>),

/// Unknown error occurred.
#[error("Unknown error occurred: {0:?}")]
UnknownError(aws_sdk_s3::primitives::ByteStreamError),
}
Loading

0 comments on commit a9638d1

Please sign in to comment.