Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Some refactoring for readability #15

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/api/filter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Data structures to build criteria objects for the shopware API

use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

Expand Down
8 changes: 4 additions & 4 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! Everything needed for communicating with the Shopware API

pub mod filter;

use crate::api::filter::{Criteria, CriteriaFilter};
use crate::config::Credentials;
use crate::config_file::Credentials;
use anyhow::anyhow;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{header, Client, Response, StatusCode};
Expand Down Expand Up @@ -98,9 +100,7 @@ impl SwClient {
Ok(())
}

pub async fn entity_schema(
&self,
) -> Result<serde_json::Map<String, serde_json::Value>, SwApiError> {
pub async fn entity_schema(&self) -> Result<Entity, SwApiError> {
// ToDo: implement retry on auth fail
let access_token = self.access_token.lock().unwrap().clone();
let response = {
Expand Down
64 changes: 64 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Definitions for the CLI commands, arguments and help texts
//!
//! Makes heavy use of https://docs.rs/clap/latest/clap/

use clap::{Parser, Subcommand};
use std::path::PathBuf;

#[derive(Parser)]
#[command(version, about, long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
}

#[derive(Subcommand)]
pub enum Commands {
/// Authenticate with a given shopware shop via integration admin API.
/// Credentials are stored in .credentials.toml in the current working directory.
Auth {
/// base URL of the shop
#[arg(short, long)]
domain: String,

/// access_key_id
#[arg(short, long)]
id: String,

/// access_key_secret
#[arg(short, long)]
secret: String,
},

/// Import data into shopware or export data to a file
Sync {
/// Mode (import or export)
#[arg(value_enum, short, long)]
mode: SyncMode,

/// Path to profile schema.yaml
#[arg(short, long)]
schema: PathBuf,

/// Path to data file
#[arg(short, long)]
file: PathBuf,

/// Maximum amount of entities, can be used for debugging
#[arg(short, long)]
limit: Option<u64>,

// Verbose output, used for debugging
// #[arg(short, long, action = ArgAction::SetTrue)]
// verbose: bool,
/// How many requests can be "in-flight" at the same time
#[arg(short, long, default_value = "8")]
in_flight_limit: usize,
},
}

#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum SyncMode {
Import,
Export,
}
14 changes: 14 additions & 0 deletions src/config.rs → src/config_file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! Definitions for the `schema.yaml` and `.credentials.toml` files
//!
//! Allows deserialization into a proper typed structure from these files
//! or also write these typed structures to a file (in case of `.credentials.toml`)
//!
//! Utilizes https://serde.rs/

use crate::api::filter::{CriteriaFilter, CriteriaSorting};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
Expand All @@ -12,15 +19,22 @@ pub struct Credentials {
#[derive(Debug, Deserialize)]
pub struct Schema {
pub entity: String,

#[serde(default = "Vec::new")]
pub filter: Vec<CriteriaFilter>,

#[serde(default = "Vec::new")]
pub sort: Vec<CriteriaSorting>,

/// Are unique thanks to `HashSet`
#[serde(default = "HashSet::new")]
pub associations: HashSet<String>,

pub mappings: Vec<Mapping>,

#[serde(default = "String::new")]
pub serialize_script: String,

#[serde(default = "String::new")]
pub deserialize_script: String,
}
Expand Down
3 changes: 3 additions & 0 deletions src/data/export.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Everything related to exporting data out of shopware

use crate::api::filter::Criteria;
use crate::data::transform::serialize_entity;
use crate::SyncContext;
Expand Down Expand Up @@ -68,6 +70,7 @@ async fn write_to_file(
csv_writer.write_record(get_header_line(context))?;

for handle in worker_handles {
// ToDo: we might want to handle the errors more gracefully here and don't stop on first error
let (page, rows) = handle.await??;
println!("writing page {}", page);

Expand Down
35 changes: 22 additions & 13 deletions src/data/import.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::api::{SwApiError, SyncAction};
//! Everything related to import data into shopware

use crate::api::{Entity, SwApiError, SyncAction};
use crate::data::transform::deserialize_row;
use crate::SyncContext;
use anyhow::anyhow;
use itertools::Itertools;
use std::sync::Arc;

Expand All @@ -11,27 +14,32 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
.from_path(&context.file)?;
let headers = csv_reader.headers()?.clone();

// create an iterator, that processes (CSV) rows (StringRecord) into (usize, anyhow::Result<Entity>)
// where the former is the row index
let iter = csv_reader
.into_records()
.map(|r| {
let result = r.expect("failed reading CSV row");

deserialize_row(&headers, result, &context).expect("deserialize failed")
// ToDo improve error handling
.map(|r| match r {
Ok(row) => deserialize_row(&headers, row, &context),
Err(e) => Err(anyhow!(e)),
})
.enumerate()
.take(context.limit.unwrap_or(u64::MAX) as usize);

// iterate in chunks of 500 or less
let mut join_handles = vec![];
for sync_values in &iter.chunks(500) {
let (mut row_indices, mut chunk): (
Vec<usize>,
Vec<serde_json::Map<String, serde_json::Value>>,
) = sync_values.unzip();
let (mut row_indices, chunk): (Vec<usize>, Vec<anyhow::Result<Entity>>) =
sync_values.unzip();

// for now fail on first invalid row
// currently the most likely deserialization failure is not finding the column / CSV header
// ToDo: we might want to handle the errors more gracefully here and don't stop on first error
let mut valid_chunk = chunk.into_iter().collect::<anyhow::Result<Vec<Entity>>>()?;

// submit sync task
let context = Arc::clone(&context);
join_handles.push(tokio::spawn(async move {
match context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &chunk).await {
match context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &valid_chunk).await {
Ok(()) => Ok(()),
Err(SwApiError::Server(_, body)) => {
for err in body.errors.iter().rev() {
Expand All @@ -40,7 +48,7 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
let entry: usize = entry_str.parse().expect("error pointer should contain usize");

let row_index = row_indices.remove(entry);
let row = chunk.remove(entry);
let row = valid_chunk.remove(entry);
println!(
"server validation error on row {}: {} Remaining pointer '{}' ignored payload:\n{}",
row_index + 2,
Expand All @@ -50,13 +58,14 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
);
}
// retry
context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &chunk).await
context.sw_client.sync(&context.schema.entity, SyncAction::Upsert, &valid_chunk).await
},
Err(e) => Err(e),
}
}));
}

// wait for all the sync tasks to finish
for join_handle in join_handles {
join_handle.await??;
}
Expand Down
5 changes: 3 additions & 2 deletions src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ mod import;
mod transform;
mod validate;

// reexport the important functions / structs as part of this module
pub use export::export;
pub use import::import;
pub use transform::prepare_scripting_environment;
pub use transform::ScriptingEnvironment;
pub use transform::script::prepare_scripting_environment;
pub use transform::script::ScriptingEnvironment;
pub use validate::validate_paths_for_entity;
Loading