Skip to content

Commit

Permalink
Uploader supports multipart/form-data and attaches filename
Browse files Browse the repository at this point in the history
- Use async client to support multipart/form-data
- Expand Uploader to support both PUT and POST method
- Add MIME auto-detection based on file extension

Signed-off-by: Rina Fujino <[email protected]>
  • Loading branch information
rina23q committed Apr 3, 2024
1 parent feaf800 commit 31c25ca
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 20 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ log_manager = { path = "crates/common/log_manager" }
logged_command = { path = "crates/common/logged_command" }
maplit = "1.0"
miette = { version = "5.5.0", features = ["fancy"] }
mime_guess = "2.0.4"
mockall = "0.11"
mockito = "1.1.0"
mqtt_channel = { path = "crates/common/mqtt_channel" }
Expand Down
3 changes: 2 additions & 1 deletion crates/common/upload/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ axum_tls = { workspace = true, features = ["error-matching"] }
backoff = { workspace = true }
camino = { workspace = true }
log = { workspace = true }
reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots"] }
mime_guess = { workspace = true }
reqwest = { workspace = true, features = ["stream", "rustls-tls-native-roots", "multipart"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["codec"] }
Expand Down
3 changes: 3 additions & 0 deletions crates/common/upload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@ mod upload;
pub use crate::error::UploadError;
pub use crate::upload::Auth;
pub use crate::upload::ContentType;
pub use crate::upload::FormData;
pub use crate::upload::Mime;
pub use crate::upload::UploadInfo;
pub use crate::upload::UploadMethod;
pub use crate::upload::Uploader;
128 changes: 117 additions & 11 deletions crates/common/upload/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use camino::Utf8Path;
use camino::Utf8PathBuf;
use log::info;
use log::warn;
use mime_guess::MimeGuess;
use reqwest::header::CONTENT_LENGTH;
use reqwest::header::CONTENT_TYPE;
use reqwest::multipart;
use reqwest::Body;
use reqwest::Identity;
use std::fmt::Display;
Expand All @@ -27,26 +29,69 @@ fn default_backoff() -> ExponentialBackoff {
}
}

/// Auto tries to detect the mime from the given file extension without direct file access.
/// Custom sets a custom Content-Type.
/// If multipart request is needed, choose FormData.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ContentType {
Auto,
Custom(String),
FormData(FormData),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Mime {
TextPlain,
ApplicationOctetStream,
}

impl Display for ContentType {
impl Display for Mime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ContentType::TextPlain => write!(f, "text/plain"),
ContentType::ApplicationOctetStream => write!(f, "application/octet-stream"),
Mime::TextPlain => write!(f, "text/plain"),
Mime::ApplicationOctetStream => write!(f, "application/octet-stream"),
}
}
}

/// Dataset to construct reqwest::multipart::Form.
/// To avoid using reqwest::multipart::Form inside the ContentType enum
/// since reqwest::multipart::Form doesn't support Copy or Clone.
#[derive(Debug, Eq, Clone, PartialEq)]
pub struct FormData {
filename: String,
mime: Option<String>,
}

impl FormData {
pub fn new(filename: String) -> Self {
Self {
filename,
mime: None,
}
}

pub fn set_mime(self, mime: Mime) -> Self {
Self {
mime: Some(mime.to_string()),
..self
}
}
}

/// Switch upload method
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum UploadMethod {
PUT,
POST,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct UploadInfo {
pub url: String,
pub auth: Option<Auth>,
pub content_type: ContentType,
pub method: UploadMethod,
}

impl From<&str> for UploadInfo {
Expand All @@ -60,7 +105,8 @@ impl UploadInfo {
Self {
url: url.into(),
auth: None,
content_type: ContentType::ApplicationOctetStream,
content_type: ContentType::Auto,
method: UploadMethod::PUT,
}
}

Expand All @@ -71,13 +117,17 @@ impl UploadInfo {
}
}

pub fn with_content_type(self, content_type: ContentType) -> Self {
pub fn set_content_type(self, content_type: ContentType) -> Self {
Self {
content_type,
..self
}
}

pub fn set_method(self, method: UploadMethod) -> Self {
Self { method, ..self }
}

pub fn url(&self) -> &str {
self.url.as_str()
}
Expand Down Expand Up @@ -166,18 +216,46 @@ impl Uploader {
info!("Redirecting request from {} to {target_url}", url.url())
}

// Todo: Ideally it detects the appropriate content-type automatically, e.g. UTF-8 => text/plain
let mut client = client
.put(target_url)
.header(CONTENT_TYPE, url.content_type.to_string())
.header(CONTENT_LENGTH, file_length);
let mut client = match url.method {
UploadMethod::PUT => client.put(target_url),
UploadMethod::POST => client.post(target_url),
};

if let Some(Auth::Bearer(token)) = &url.auth {
client = client.bearer_auth(token)
}

client = match url.content_type.clone() {
ContentType::Auto => {
let mime = MimeGuess::from_path(&self.source_filename)
.first_or_octet_stream()
.to_string();
client
.header(CONTENT_TYPE, mime)
.header(CONTENT_LENGTH, file_length)
.body(file_body)
}
ContentType::Custom(mime) => client
.header(CONTENT_TYPE, mime)
.header(CONTENT_LENGTH, file_length)
.body(file_body),
ContentType::FormData(data) => {
let mime = match data.mime {
None => MimeGuess::from_path(&self.source_filename)
.first_or_octet_stream()
.to_string(),
Some(mime) => mime,
};
let file_part = multipart::Part::stream_with_length(file_body, file_length)
.file_name(data.filename)
.mime_str(&mime)
.unwrap(); // safe, ensured that mime is valid
let form = multipart::Form::new().part("file", file_part);
client.multipart(form)
}
};

client
.body(file_body)
.send()
.await
.map_err(|err| {
Expand Down Expand Up @@ -255,6 +333,34 @@ mod tests {
assert!(uploader.upload(&url).await.is_ok())
}

#[tokio::test]
async fn upload_content_no_auth_post() {
let mut server = mockito::Server::new();
let _mock1 = server
.mock("POST", "/some_file.txt")
.with_status(201)
.create();

let mut target_url = server.url();
target_url.push_str("/some_file.txt");

let url = UploadInfo::new(&target_url)
.set_content_type(ContentType::FormData(FormData::new("filename".into())))
.set_method(UploadMethod::POST);

let ttd = TempTedgeDir::new();
ttd.file("file_upload.txt")
.with_raw_content("Hello, world!");

let mut uploader = Uploader::new(ttd.utf8_path().join("file_upload.txt"), None);
uploader.set_backoff(ExponentialBackoff {
current_interval: Duration::ZERO,
..Default::default()
});

assert!(uploader.upload(&url).await.is_ok())
}

#[tokio::test]
async fn upload_content_with_auth() {
let mut server = mockito::Server::new();
Expand Down
17 changes: 14 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/operations/config_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use tedge_downloader_ext::DownloadResult;
use tedge_mqtt_ext::Message;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tedge_uploader_ext::ContentType;
use tedge_uploader_ext::FormData;
use tedge_uploader_ext::UploadRequest;
use time::OffsetDateTime;
use tracing::log::warn;
Expand Down Expand Up @@ -177,6 +179,7 @@ impl CumulocityConverter {
fts_download: FtsDownloadOperationData,
) -> Result<Vec<Message>, ConversionError> {
let target = self.entity_store.try_get(&fts_download.entity_topic_id)?;
let xid = target.external_id.as_ref();
let smartrest_topic =
self.smartrest_publish_topic_for_entity(&fts_download.entity_topic_id)?;
let payload = fts_download.message.payload_str()?;
Expand Down Expand Up @@ -206,20 +209,28 @@ impl CumulocityConverter {
time: OffsetDateTime::now_utc(),
text: response.config_type.clone(),
extras: HashMap::new(),
device_id: target.external_id.as_ref().to_string(),
device_id: xid.to_string(),
};
let event_response_id = self.http_proxy.send_event(create_event).await?;

let binary_upload_event_url = self
.c8y_endpoint
.get_url_for_event_binary_upload_unchecked(&event_response_id);

let file_path = Utf8PathBuf::try_from(download.file_path).map_err(|e| e.into_io_error())?;

// The method must be POST, otherwise file name won't be supported.
let upload_request = UploadRequest::new(
self.auth_proxy
.proxy_url(binary_upload_event_url.clone())
.as_str(),
&Utf8PathBuf::try_from(download.file_path).map_err(|e| e.into_io_error())?,
);
&file_path,
)
.post()
.with_content_type(ContentType::FormData(FormData::new(format!(
"{xid}_{filename}",
filename = file_path.file_name().unwrap_or("filename")
))));

self.pending_upload_operations.insert(
cmd_id.clone(),
Expand Down
21 changes: 18 additions & 3 deletions crates/extensions/c8y_mapper_ext/src/operations/log_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::QoS;
use tedge_mqtt_ext::TopicFilter;
use tedge_uploader_ext::ContentType;
use tedge_uploader_ext::FormData;
use tedge_uploader_ext::Mime;
use tedge_uploader_ext::UploadRequest;
use time::OffsetDateTime;
use tracing::debug;
Expand Down Expand Up @@ -182,6 +184,7 @@ impl CumulocityConverter {
let smartrest_topic = self.smartrest_publish_topic_for_entity(&topic_id)?;
let payload = fts_download.message.payload_str()?;
let response = &LogUploadCmdPayload::from_json(payload)?;
let xid = target.external_id.as_ref();

let download_response = match download_result {
Err(err) => {
Expand All @@ -207,21 +210,33 @@ impl CumulocityConverter {
time: OffsetDateTime::now_utc(),
text: response.log_type.clone(),
extras: HashMap::new(),
device_id: target.external_id.as_ref().to_string(),
device_id: xid.to_string(),
};
let event_response_id = self.http_proxy.send_event(create_event).await?;

let binary_upload_event_url = self
.c8y_endpoint
.get_url_for_event_binary_upload_unchecked(&event_response_id);

let file_path =
Utf8PathBuf::try_from(download_response.file_path).map_err(|e| e.into_io_error())?;

// The method must be POST, otherwise file name won't be supported.
// Mime must be text/*, otherwise c8y UI doesn't give a preview of the content.
let upload_request = UploadRequest::new(
self.auth_proxy
.proxy_url(binary_upload_event_url.clone())
.as_str(),
&Utf8PathBuf::try_from(download_response.file_path).map_err(|e| e.into_io_error())?,
&file_path,
)
.with_content_type(ContentType::TextPlain);
.post()
.with_content_type(ContentType::FormData(
FormData::new(format!(
"{xid}_{filename}",
filename = file_path.file_name().unwrap_or("filename")
))
.set_mime(Mime::TextPlain),
));

self.uploader_sender
.send((cmd_id.clone(), upload_request))
Expand Down
Loading

0 comments on commit 31c25ca

Please sign in to comment.