diff --git a/profiles/manufacturer.yaml b/profiles/manufacturer.yaml index 7ab34a1..c4f3247 100644 --- a/profiles/manufacturer.yaml +++ b/profiles/manufacturer.yaml @@ -2,5 +2,9 @@ entity: product_manufacturer mappings: - file_column: "identifier" entity_path: "id" - - file_column: "name (default language)" + - file_column: "default name" entity_path: "name" + - file_column: "default description" + entity_path: "description" + - file_column: "website" + entity_path: "link" diff --git a/profiles/product_required.yaml b/profiles/product_required.yaml new file mode 100644 index 0000000..b3b8c5f --- /dev/null +++ b/profiles/product_required.yaml @@ -0,0 +1,32 @@ +# Profile that contains all required fields for a product +entity: product +mappings: + - file_column: "id" + entity_path: "id" + - file_column: "product number" + entity_path: "productNumber" + - file_column: "default name" + entity_path: "name" + - file_column: "default price net" + key: "default_price_net" + - file_column: "default price gross" + key: "default_price_gross" + - file_column: "stock" + entity_path: "stock" + - file_column: "tax id" + entity_path: "taxId" +serialize_script: | + // ToDo: add convenience function to lookup default currencyId + let price = entity.price.find(|p| p.currencyId == "b7d2554b0ce847cd82f3ac9bd1c0dfca"); + row.default_price_net = price.net; + row.default_price_gross = price.gross; + +deserialize_script: | + entity.price = [ + #{ + net: row.default_price_net, + gross: row.default_price_gross, + linked: true, + currencyId: "b7d2554b0ce847cd82f3ac9bd1c0dfca", + } + ]; diff --git a/src/api.rs b/src/api.rs index 92fc8eb..26a4648 100644 --- a/src/api.rs +++ b/src/api.rs @@ -36,7 +36,7 @@ impl SwClient { let entity: String = entity.into(); let start_instant = Instant::now(); println!( - "sync {:?} {} with payload size {}", + "sync {:?} '{}' with payload size {}", action, &entity, payload.len() diff --git a/src/data/export.rs b/src/data/export.rs index af6a445..6874e8d 100644 --- a/src/data/export.rs +++ b/src/data/export.rs @@ -11,7 +11,7 @@ pub async fn export(context: Arc) -> anyhow::Result<()> { let mut page = 1; let mut counter = 0; println!( - "Reading {} of type {} with chunk limit {}", + "Reading {} of entity '{}' with chunk limit {}", total, context.schema.entity, chunk_limit ); diff --git a/src/data/import.rs b/src/data/import.rs index d2fcab3..83671c0 100644 --- a/src/data/import.rs +++ b/src/data/import.rs @@ -42,7 +42,7 @@ pub async fn import(context: Arc) -> anyhow::Result<()> { let row_index = row_indices.remove(entry); let row = chunk.remove(entry); println!( - "server validation error on row {}: {} Remaining pointer {} ignored payload:\n{}", + "server validation error on row {}: {} Remaining pointer '{}' ignored payload:\n{}", row_index + 2, err.detail, remaining_pointer, @@ -63,88 +63,3 @@ pub async fn import(context: Arc) -> anyhow::Result<()> { Ok(()) } - -// old implementation below (was in main) -/* -let start_instant = Instant::now(); -let payload_size = std::env::args().nth(1).map_or(200usize, |s| { - s.parse() - .expect("invalid argument, provide a number for payload_size") -}); -let credentials = tokio::fs::read_to_string("./.credentials.toml") - .await - .context("can't read ./.credentials.toml")?; -let credentials: Credentials = toml::from_str(&credentials)?; -let currency_id = "b7d2554b0ce847cd82f3ac9bd1c0dfca"; - -let sw_client = SwClient::new(credentials).await?; -let entity_schema = sw_client.entity_schema().await?; - -// todo move blocking call to separate thread -let mut csv_reader = csv::ReaderBuilder::new() - .delimiter(b';') - .from_path("./data/10kProducts.csv")?; -let headers = csv_reader.headers()?.clone(); -println!("CSV headers: {:?}", headers); - -let iter = csv_reader.records().map(|r| { - let result = r.unwrap(); - - let sync_product = json!({ - "id": result[0], - "taxId": result[5], - "price": [ - { - "currencyId": currency_id, - "net": result[1].parse::().unwrap(), - "gross": result[2].parse::().unwrap(), - "linked": false, - } - ], - "name": result[6], - "productNumber": result[3], - "stock": result[4].parse::().unwrap(), - }); - - sync_product -}); - -let mut join_handles = vec![]; -for sync_values in &iter.enumerate().chunks(payload_size) { - let (mut row_indices, mut chunk): (Vec, Vec) = - sync_values.unzip(); - let sw_client = sw_client.clone(); - join_handles.push(tokio::spawn(async move { - match sw_client.sync("product", SyncAction::Upsert, &chunk).await { - Ok(()) => Ok(()), - Err(SwApiError::Server(_, body)) => { - for err in body.errors.iter().rev() { - const PREFIX: &str = "/write_data/"; - let (entry_str , remaining_pointer)= &err.source.pointer[PREFIX.len()..].split_once('/').expect("error pointer"); - let entry: usize = entry_str.parse().expect("error pointer should contain usize"); - - let row_index = row_indices.remove(entry); - let row = chunk.remove(entry); - println!( - "server validation error on row {}: {} Remaining pointer {} ignored payload:\n{}", - row_index + 2, - err.detail, - remaining_pointer, - serde_json::to_string_pretty(&row)?, - ); - } - // retry - sw_client.sync("product", SyncAction::Upsert, &chunk).await - }, - Err(e) => Err(e), - } - })); -} - -for join_handle in join_handles { - join_handle.await??; -} - -println!("Finished after {} ms", start_instant.elapsed().as_millis()); -Ok(()) - */ diff --git a/src/data/transform.rs b/src/data/transform.rs index 7a8a0eb..def1c04 100644 --- a/src/data/transform.rs +++ b/src/data/transform.rs @@ -29,7 +29,7 @@ pub fn deserialize_row( .iter() .position(|h| h == mapping.file_column) .context(format!( - "Can't find column {} in CSV headers", + "Can't find column '{}' in CSV headers", mapping.file_column ))?; @@ -67,7 +67,7 @@ pub fn deserialize_row( .iter() .position(|header| header == path_mapping.file_column) .context(format!( - "Can't find column {} in CSV headers", + "Can't find column '{}' in CSV headers", path_mapping.file_column ))?; @@ -130,7 +130,7 @@ pub fn serialize_entity( let value = match path_mapping.entity_path.as_ref() { "id" => &serde_json::Value::String(entity.id.to_string()), path => entity.attributes.get(path).context(format!( - "could not get field path {} specified in mapping, entity attributes:\n{}", + "could not get field path '{}' specified in mapping, entity attributes:\n{}", path, serde_json::to_string_pretty(&entity.attributes).unwrap() ))?, @@ -147,7 +147,7 @@ pub fn serialize_entity( let value = script_row .get(script_mapping.key.as_str()) .context(format!( - "failed to retrieve script key {} of row", + "failed to retrieve script key '{}' of row", script_mapping.key ))?; let value_str = serde_json::to_string(value)?; diff --git a/src/main.rs b/src/main.rs index b18b79e..43836f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use crate::api::SwClient; -use crate::config::{Credentials, Schema}; +use crate::config::{Credentials, Mapping, Schema}; use crate::data::{export, import, prepare_scripting_environment, ScriptingEnvironment}; -use anyhow::Context; +use anyhow::{anyhow, Context}; use clap::{ArgAction, Parser, Subcommand}; use std::path::PathBuf; use std::sync::Arc; @@ -44,7 +44,7 @@ enum Commands { /// Path to input data file #[arg(short, long)] - input: PathBuf, + file: PathBuf, /// Maximum amount of entities, can be used for debugging #[arg(short, long)] @@ -62,7 +62,7 @@ enum Commands { /// Path to output file #[arg(short, long)] - output: PathBuf, + file: PathBuf, /// Maximum amount of entities, can be used for debugging #[arg(short, long)] @@ -97,21 +97,21 @@ async fn main() -> anyhow::Result<()> { } Commands::Import { schema, - input, + file, limit, verbose, } => { - let context = create_context(schema, input, limit, verbose).await?; + let context = create_context(schema, file, limit, verbose).await?; import(Arc::new(context)).await?; println!("Imported successfully"); } Commands::Export { schema, - output, + file, limit, verbose, } => { - let context = create_context(schema, output, limit, verbose).await?; + let context = create_context(schema, file, limit, verbose).await?; export(Arc::new(context)).await?; println!("Exported successfully"); } @@ -159,7 +159,15 @@ async fn create_context( .await .context("No provided schema file not found")?; let schema: Schema = serde_yaml::from_str(&serialized_schema)?; - // ToDo: schema verification + for mapping in &schema.mappings { + if let Mapping::ByPath(by_path) = mapping { + if by_path.entity_path.contains('.') || by_path.entity_path.contains('/') { + return Err(anyhow!("entity_path currently only supports fields of the entity and no associations, but found '{}'", by_path.entity_path)); + } + } + } + + // ToDo: further schema verification // ToDo: create lookup table for languages + currencies?