Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
Merge pull request #17 from MalteJanz/performance-tuning
Browse files Browse the repository at this point in the history
Performance tuning
  • Loading branch information
MalteJanz authored Jul 8, 2024
2 parents 05faf52 + a6769a1 commit 375dfc4
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 119 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license = "MIT"
[dependencies]
clap = { version = "4.5.8", features = ["derive"] }
tokio = { version = "1.38.0", features = ["full"] }
rayon = "1.10.0"
reqwest = { version = "0.12.5", features = ["json"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.120"
Expand Down
55 changes: 37 additions & 18 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{header, Client, Response, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use thiserror::Error;
Expand All @@ -30,6 +31,9 @@ impl SwClient {
// and that doesn't have the association data as part of the entity object
default_headers.insert(header::ACCEPT, HeaderValue::from_static("application/json"));
let client = Client::builder()
// workaround for long-running requests,
// see https://github.com/hyperium/hyper/issues/2312#issuecomment-1411360500
.pool_max_idle_per_host(0)
.timeout(Duration::from_secs(15))
.default_headers(default_headers)
.build()?;
Expand All @@ -55,17 +59,11 @@ impl SwClient {
payload: &[T],
) -> Result<(), SwApiError> {
let entity: String = entity.into();
println!(
"sync {:?} '{}' with payload size {}",
action,
&entity,
payload.len()
);
// ToDo: implement retry on auth fail
let access_token = self.access_token.lock().unwrap().clone();
let body = SyncBody {
write_data: SyncOperation {
entity,
entity: entity.clone(),
action,
payload,
},
Expand All @@ -74,6 +72,12 @@ impl SwClient {
let response = {
let _lock = self.in_flight_semaphore.acquire().await.unwrap();
let start_instant = Instant::now();
println!(
"sync {:?} '{}' with payload size {}",
action,
&entity,
payload.len()
);
let res = self
.client
.post(format!("{}/api/_action/sync", self.credentials.base_url))
Expand Down Expand Up @@ -190,6 +194,10 @@ impl SwClient {
let response = {
let _lock = self.in_flight_semaphore.acquire().await.unwrap();
let start_instant = Instant::now();
println!(
"fetching page {} of '{}' with limit {}",
criteria.page, entity, criteria.limit
);
let res = self
.client
.post(format!(
Expand Down Expand Up @@ -234,7 +242,7 @@ impl SwClient {

if !response.status().is_success() {
let status = response.status();
let body: serde_json::Value = response.json().await?;
let body: serde_json::Value = Self::deserialize(response).await?;
return Err(anyhow!(
"Failed to authenticate, got {} with body:\n{}",
status,
Expand All @@ -247,18 +255,29 @@ impl SwClient {
Ok(res)
}

async fn deserialize<T: for<'a> Deserialize<'a>>(response: Response) -> Result<T, SwApiError> {
async fn deserialize<T>(response: Response) -> Result<T, SwApiError>
where
T: for<'a> Deserialize<'a> + Debug + Send + 'static,
{
let bytes = response.bytes().await?;

match serde_json::from_slice(&bytes) {
Ok(t) => Ok(t),
Err(_e) => {
let body: serde_json::Value = serde_json::from_slice(&bytes)?;
Err(SwApiError::DeserializeIntoSchema(
serde_json::to_string_pretty(&body)?,
))
}
}
// offload heavy deserialization (shopware json responses can get big) to worker thread
// to not block this thread for too long doing async work
let (worker_tx, worker_rx) = tokio::sync::oneshot::channel::<Result<T, SwApiError>>();
rayon::spawn(move || {
// expensive for lage json objects
let result = match serde_json::from_slice(&bytes) {
Ok(t) => Ok(t),
Err(_e) => {
let body: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
Err(SwApiError::DeserializeIntoSchema(
serde_json::to_string_pretty(&body).unwrap(),
))
}
};
worker_tx.send(result).unwrap();
});
worker_rx.await.unwrap()
}
}

Expand Down
114 changes: 64 additions & 50 deletions src/data/export.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Everything related to exporting data out of shopware

use crate::api::filter::Criteria;
use crate::api::SwListResponse;
use crate::data::transform::serialize_entity;
use crate::SyncContext;
use std::cmp;
use std::sync::Arc;
use tokio::task::JoinHandle;

/// Might block, so should be used with `task::spawn_blocking`
pub async fn export(context: Arc<SyncContext>) -> anyhow::Result<()> {
if !context.associations.is_empty() {
println!("Using associations: {:#?}", context.associations);
Expand All @@ -19,44 +19,91 @@ pub async fn export(context: Arc<SyncContext>) -> anyhow::Result<()> {
println!("Using sort: {:#?}", context.schema.sort);
}

// retrieve total entity count from shopware and calculate chunk count
let mut total = context
.sw_client
.get_total(&context.schema.entity, &context.schema.filter)
.await?;
if let Some(limit) = context.limit {
total = cmp::min(limit, total);
}

let chunk_limit = cmp::min(Criteria::MAX_LIMIT, total);
let mut page = 1;
let mut counter = 0;
let chunk_count = total.div_ceil(chunk_limit);
println!(
"Reading {} of entity '{}' with chunk limit {}",
total, context.schema.entity, chunk_limit
"Reading {} of entity '{}' with chunk limit {}, resulting in {} chunks to be processed",
total, context.schema.entity, chunk_limit, chunk_count
);

// submit request tasks
let mut request_tasks = vec![];
loop {
if counter >= total {
break;
}
#[allow(clippy::type_complexity)]
let mut request_tasks: Vec<JoinHandle<anyhow::Result<(u64, Vec<Vec<String>>)>>> = vec![];
for i in 0..chunk_count {
let page = i + 1;

let context = Arc::clone(&context);
request_tasks.push(tokio::spawn(async move {
process_request(page, chunk_limit, &context).await
let response = send_request(page, chunk_limit, &context).await?;

// move actual response processing / deserialization to worker thread pool
// and wait for it to finish
let (worker_tx, worker_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<(u64, Vec<Vec<String>>)>>();
rayon::spawn(move || {
let result = process_response(page, chunk_limit, response, &context);
worker_tx.send(result).unwrap();
});
worker_rx.await?
}));

page += 1;
counter += chunk_limit;
}

// wait for all request tasks to finish
write_to_file(request_tasks, &context).await?;
// wait for all tasks to finish, one after the other, in order,
// and write them to the target file (blocking IO)
tokio::task::spawn_blocking(|| async move { write_to_file(request_tasks, &context).await })
.await?
.await?;

Ok(())
}

async fn send_request(
page: u64,
chunk_limit: u64,
context: &SyncContext,
) -> anyhow::Result<SwListResponse> {
let mut criteria = Criteria {
page,
limit: chunk_limit,
sort: context.schema.sort.clone(),
filter: context.schema.filter.clone(),
..Default::default()
};
for association in &context.associations {
criteria.add_association(association);
}

let response = context
.sw_client
.list(&context.schema.entity, &criteria)
.await?;

Ok(response)
}

fn process_response(
page: u64,
chunk_limit: u64,
response: SwListResponse,
context: &SyncContext,
) -> anyhow::Result<(u64, Vec<Vec<String>>)> {
let mut rows: Vec<Vec<String>> = Vec::with_capacity(chunk_limit as usize);
for entity in response.data {
let row = serialize_entity(entity, context)?;
rows.push(row);
}

Ok((page, rows))
}

#[allow(clippy::type_complexity)]
async fn write_to_file(
worker_handles: Vec<JoinHandle<anyhow::Result<(u64, Vec<Vec<String>>)>>>,
Expand Down Expand Up @@ -84,39 +131,6 @@ async fn write_to_file(
Ok(())
}

async fn process_request(
page: u64,
chunk_limit: u64,
context: &SyncContext,
) -> anyhow::Result<(u64, Vec<Vec<String>>)> {
println!(
"fetching page {} of {} with limit {}",
page, context.schema.entity, chunk_limit
);
let mut rows: Vec<Vec<String>> = Vec::with_capacity(chunk_limit as usize);
let mut criteria = Criteria {
page,
limit: chunk_limit,
sort: context.schema.sort.clone(),
filter: context.schema.filter.clone(),
..Default::default()
};
for association in &context.associations {
criteria.add_association(association);
}

let response = context
.sw_client
.list(&context.schema.entity, &criteria)
.await?;
for entity in response.data {
let row = serialize_entity(entity, context)?;
rows.push(row);
}

Ok((page, rows))
}

fn get_header_line(context: &SyncContext) -> Vec<String> {
let mut columns = vec![];

Expand Down
Loading

0 comments on commit 375dfc4

Please sign in to comment.