Skip to content
This repository has been archived by the owner on Jul 15, 2024. It is now read-only.

Commit

Permalink
Little more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
MalteJanz committed Jun 22, 2024
1 parent 79c6648 commit 88d68cb
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 102 deletions.
6 changes: 5 additions & 1 deletion profiles/manufacturer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@ entity: product_manufacturer
mappings:
- file_column: "identifier"
entity_path: "id"
- file_column: "name (default language)"
- file_column: "default name"
entity_path: "name"
- file_column: "default description"
entity_path: "description"
- file_column: "website"
entity_path: "link"
32 changes: 32 additions & 0 deletions profiles/product_required.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Profile that contains all required fields for a product
entity: product
mappings:
- file_column: "id"
entity_path: "id"
- file_column: "product number"
entity_path: "productNumber"
- file_column: "default name"
entity_path: "name"
- file_column: "default price net"
key: "default_price_net"
- file_column: "default price gross"
key: "default_price_gross"
- file_column: "stock"
entity_path: "stock"
- file_column: "tax id"
entity_path: "taxId"
serialize_script: |
// ToDo: add convenience function to lookup default currencyId
let price = entity.price.find(|p| p.currencyId == "b7d2554b0ce847cd82f3ac9bd1c0dfca");
row.default_price_net = price.net;
row.default_price_gross = price.gross;
deserialize_script: |
entity.price = [
#{
net: row.default_price_net,
gross: row.default_price_gross,
linked: true,
currencyId: "b7d2554b0ce847cd82f3ac9bd1c0dfca",
}
];
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl SwClient {
let entity: String = entity.into();
let start_instant = Instant::now();
println!(
"sync {:?} {} with payload size {}",
"sync {:?} '{}' with payload size {}",
action,
&entity,
payload.len()
Expand Down
2 changes: 1 addition & 1 deletion src/data/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub async fn export(context: Arc<SyncContext>) -> anyhow::Result<()> {
let mut page = 1;
let mut counter = 0;
println!(
"Reading {} of type {} with chunk limit {}",
"Reading {} of entity '{}' with chunk limit {}",
total, context.schema.entity, chunk_limit
);

Expand Down
87 changes: 1 addition & 86 deletions src/data/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {
let row_index = row_indices.remove(entry);
let row = chunk.remove(entry);
println!(
"server validation error on row {}: {} Remaining pointer {} ignored payload:\n{}",
"server validation error on row {}: {} Remaining pointer '{}' ignored payload:\n{}",
row_index + 2,
err.detail,
remaining_pointer,
Expand All @@ -63,88 +63,3 @@ pub async fn import(context: Arc<SyncContext>) -> anyhow::Result<()> {

Ok(())
}

// old implementation below (was in main)
/*
let start_instant = Instant::now();
let payload_size = std::env::args().nth(1).map_or(200usize, |s| {
s.parse()
.expect("invalid argument, provide a number for payload_size")
});
let credentials = tokio::fs::read_to_string("./.credentials.toml")
.await
.context("can't read ./.credentials.toml")?;
let credentials: Credentials = toml::from_str(&credentials)?;
let currency_id = "b7d2554b0ce847cd82f3ac9bd1c0dfca";
let sw_client = SwClient::new(credentials).await?;
let entity_schema = sw_client.entity_schema().await?;
// todo move blocking call to separate thread
let mut csv_reader = csv::ReaderBuilder::new()
.delimiter(b';')
.from_path("./data/10kProducts.csv")?;
let headers = csv_reader.headers()?.clone();
println!("CSV headers: {:?}", headers);
let iter = csv_reader.records().map(|r| {
let result = r.unwrap();
let sync_product = json!({
"id": result[0],
"taxId": result[5],
"price": [
{
"currencyId": currency_id,
"net": result[1].parse::<f64>().unwrap(),
"gross": result[2].parse::<f64>().unwrap(),
"linked": false,
}
],
"name": result[6],
"productNumber": result[3],
"stock": result[4].parse::<i32>().unwrap(),
});
sync_product
});
let mut join_handles = vec![];
for sync_values in &iter.enumerate().chunks(payload_size) {
let (mut row_indices, mut chunk): (Vec<usize>, Vec<serde_json::Value>) =
sync_values.unzip();
let sw_client = sw_client.clone();
join_handles.push(tokio::spawn(async move {
match sw_client.sync("product", SyncAction::Upsert, &chunk).await {
Ok(()) => Ok(()),
Err(SwApiError::Server(_, body)) => {
for err in body.errors.iter().rev() {
const PREFIX: &str = "/write_data/";
let (entry_str , remaining_pointer)= &err.source.pointer[PREFIX.len()..].split_once('/').expect("error pointer");
let entry: usize = entry_str.parse().expect("error pointer should contain usize");
let row_index = row_indices.remove(entry);
let row = chunk.remove(entry);
println!(
"server validation error on row {}: {} Remaining pointer {} ignored payload:\n{}",
row_index + 2,
err.detail,
remaining_pointer,
serde_json::to_string_pretty(&row)?,
);
}
// retry
sw_client.sync("product", SyncAction::Upsert, &chunk).await
},
Err(e) => Err(e),
}
}));
}
for join_handle in join_handles {
join_handle.await??;
}
println!("Finished after {} ms", start_instant.elapsed().as_millis());
Ok(())
*/
8 changes: 4 additions & 4 deletions src/data/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn deserialize_row(
.iter()
.position(|h| h == mapping.file_column)
.context(format!(
"Can't find column {} in CSV headers",
"Can't find column '{}' in CSV headers",
mapping.file_column
))?;

Expand Down Expand Up @@ -67,7 +67,7 @@ pub fn deserialize_row(
.iter()
.position(|header| header == path_mapping.file_column)
.context(format!(
"Can't find column {} in CSV headers",
"Can't find column '{}' in CSV headers",
path_mapping.file_column
))?;

Expand Down Expand Up @@ -130,7 +130,7 @@ pub fn serialize_entity(
let value = match path_mapping.entity_path.as_ref() {
"id" => &serde_json::Value::String(entity.id.to_string()),
path => entity.attributes.get(path).context(format!(
"could not get field path {} specified in mapping, entity attributes:\n{}",
"could not get field path '{}' specified in mapping, entity attributes:\n{}",
path,
serde_json::to_string_pretty(&entity.attributes).unwrap()
))?,
Expand All @@ -147,7 +147,7 @@ pub fn serialize_entity(
let value = script_row
.get(script_mapping.key.as_str())
.context(format!(
"failed to retrieve script key {} of row",
"failed to retrieve script key '{}' of row",
script_mapping.key
))?;
let value_str = serde_json::to_string(value)?;
Expand Down
26 changes: 17 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::api::SwClient;
use crate::config::{Credentials, Schema};
use crate::config::{Credentials, Mapping, Schema};
use crate::data::{export, import, prepare_scripting_environment, ScriptingEnvironment};
use anyhow::Context;
use anyhow::{anyhow, Context};
use clap::{ArgAction, Parser, Subcommand};
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -44,7 +44,7 @@ enum Commands {

/// Path to input data file
#[arg(short, long)]
input: PathBuf,
file: PathBuf,

/// Maximum amount of entities, can be used for debugging
#[arg(short, long)]
Expand All @@ -62,7 +62,7 @@ enum Commands {

/// Path to output file
#[arg(short, long)]
output: PathBuf,
file: PathBuf,

/// Maximum amount of entities, can be used for debugging
#[arg(short, long)]
Expand Down Expand Up @@ -97,21 +97,21 @@ async fn main() -> anyhow::Result<()> {
}
Commands::Import {
schema,
input,
file,
limit,
verbose,
} => {
let context = create_context(schema, input, limit, verbose).await?;
let context = create_context(schema, file, limit, verbose).await?;
import(Arc::new(context)).await?;
println!("Imported successfully");
}
Commands::Export {
schema,
output,
file,
limit,
verbose,
} => {
let context = create_context(schema, output, limit, verbose).await?;
let context = create_context(schema, file, limit, verbose).await?;
export(Arc::new(context)).await?;
println!("Exported successfully");
}
Expand Down Expand Up @@ -159,7 +159,15 @@ async fn create_context(
.await
.context("No provided schema file not found")?;
let schema: Schema = serde_yaml::from_str(&serialized_schema)?;
// ToDo: schema verification
for mapping in &schema.mappings {
if let Mapping::ByPath(by_path) = mapping {
if by_path.entity_path.contains('.') || by_path.entity_path.contains('/') {
return Err(anyhow!("entity_path currently only supports fields of the entity and no associations, but found '{}'", by_path.entity_path));
}
}
}

// ToDo: further schema verification

// ToDo: create lookup table for languages + currencies?

Expand Down

0 comments on commit 88d68cb

Please sign in to comment.