Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-680 # Add the DB SQL Executor to import data from CSV and execute SQL statements #686

Merged
merged 9 commits into from
Dec 22, 2024
3 changes: 3 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,8 @@ jobs:
- name: Running Kafka
run: docker-compose -f docker/compose/kafka-schema-registry.yml up -d && sleep 10

- name: Running PostgreSQL (to test DB SQL Executor)
run: docker-compose -f docker/compose/pg_compose.yml up -d

- name: Building and testing the changes
run: mvn clean test
9 changes: 9 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,20 @@
<artifactId>micro-simulator</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<!--<scope>test</scope>--> <!-- Make it available to dependent projects. Hence commented -->
</dependency>
<dependency>
<groupId>com.aventstack</groupId>
<artifactId>extentreports</artifactId>
Expand Down
135 changes: 135 additions & 0 deletions core/src/main/java/org/jsmart/zerocode/core/db/DbCsvLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package org.jsmart.zerocode.core.db;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.univocity.parsers.csv.CsvParser;

/**
 * Data loading in the database from a CSV external source
 */
class DbCsvLoader {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbCsvLoader.class);
    private Connection conn;
    private CsvParser csvParser;

    public DbCsvLoader(Connection conn, CsvParser csvParser) {
        this.conn = conn;
        this.csvParser = csvParser;
    }

    /**
     * Loads rows in CSV format (csvLines) into a table in the database
     * and returns the total number of rows.
     */
    public int loadCsv(String table, List<String> csvLines, boolean withHeaders, String nullString) throws SQLException {
        if (csvLines == null || csvLines.isEmpty()) {
            return 0;
        }

        List<String[]> parsed = parseLines(table, csvLines);

        String[] headers = buildHeaders(parsed.get(0), withHeaders);
        List<Object[]> rowsParams = buildParameters(table, headers, parsed, withHeaders, nullString);
        if (rowsParams.isEmpty()) { // can have headers, but no rows
            return 0;
        }

        String insertSql = buildSql(table, headers, rowsParams.get(0).length);
        LOGGER.info("Loading CSV using this sql: {}", insertSql);

        QueryRunner queryRunner = new QueryRunner();
        int inserted = 0;
        for (int row = 0; row < rowsParams.size(); row++) {
            insertRow(queryRunner, row, insertSql, rowsParams.get(row));
            inserted++;
        }
        LOGGER.info("Total of rows inserted: {}", inserted);
        return inserted;
    }

    /**
     * Parses each raw CSV line into its columns, failing fast when any row
     * does not have the same number of columns as the first one.
     */
    private List<String[]> parseLines(String table, List<String> lines) {
        List<String[]> parsedLines = new ArrayList<>();
        int expectedColumns = 0; // column count of the first row; every other row must match
        for (int row = 0; row < lines.size(); row++) {
            String[] columns = csvParser.parseLine(lines.get(row));
            parsedLines.add(columns);
            if (row == 0) {
                expectedColumns = columns.length;
            } else if (columns.length != expectedColumns) {
                String message = String.format("Error parsing CSV content to load into table %s: "
                        + "Row %d has %d columns and should have %d", table, row + 1, columns.length, expectedColumns);
                LOGGER.error(message);
                throw new RuntimeException(message);
            }
        }
        return parsedLines;
    }

    /** Returns the first line as the header row, or an empty array when the CSV has no headers. */
    private String[] buildHeaders(String[] firstLine, boolean withHeaders) {
        if (withHeaders) {
            return firstLine;
        }
        return new String[] {};
    }

    /**
     * Converts every data row into the typed JDBC parameters expected by the
     * target table columns (skipping the header row when present).
     */
    private List<Object[]> buildParameters(String table, String[] headers, List<String[]> lines, boolean withHeaders, String nullString) {
        DbValueConverter converter = new DbValueConverter(conn, table);
        List<Object[]> allParams = new ArrayList<>();
        int firstDataRow = withHeaders ? 1 : 0;
        for (int row = firstDataRow; row < lines.size(); row++) {
            String[] columns = processNulls(lines.get(row), nullString);
            Object[] rowParams;
            try {
                rowParams = converter.convertColumnValues(headers, columns);
                LOGGER.info(" row [{}] params: {}", row + 1, Arrays.asList(rowParams).toString());
            } catch (Exception e) { // Not only SQLException as converter also does parsing
                String message = String.format("Error matching data type of parameters and table columns at CSV row %d", row + 1);
                LOGGER.error(message);
                LOGGER.error("Exception message: {}", e.getMessage());
                throw new RuntimeException(message, e);
            }
            allParams.add(rowParams);
        }
        return allParams;
    }

    /**
     * Normalizes null representation in place: without a nullString marker,
     * blank cells become SQL NULL; with a marker, blank cells become empty
     * strings and cells matching the marker (case-insensitive) become NULL.
     */
    private String[] processNulls(String[] line, String nullString) {
        boolean noNullMarker = StringUtils.isBlank(nullString);
        for (int col = 0; col < line.length; col++) {
            if (noNullMarker) {
                if (StringUtils.isBlank(line[col])) {
                    line[col] = null;
                }
            } else if (StringUtils.isBlank(line[col])) { // null must be empty string
                line[col] = "";
            } else if (nullString.trim().equalsIgnoreCase(line[col].trim())) {
                line[col] = null;
            }
        }
        return line;
    }

    /** Builds the parameterized INSERT statement, with a column list only when headers exist. */
    private String buildSql(String table, String[] headers, int columnCount) {
        StringBuilder sql = new StringBuilder("INSERT INTO ").append(table);
        if (headers.length > 0) {
            sql.append(" (").append(String.join(",", headers)).append(")");
        }
        String placeholders = IntStream.range(0, columnCount)
                .mapToObj(i -> "?").collect(Collectors.joining(","));
        sql.append(" VALUES (").append(placeholders).append(");");
        return sql.toString();
    }

    /** Executes one parameterized insert, wrapping failures with the offending CSV row number. */
    private void insertRow(QueryRunner runner, int rowId, String sql, Object[] params) {
        try {
            runner.update(conn, sql, params);
        } catch (SQLException e) {
            String message = String.format("Error inserting data at CSV row %d", rowId + 1);
            LOGGER.error(message);
            LOGGER.error("Exception message: {}", e.getMessage());
            throw new RuntimeException(message, e);
        }
    }

}
101 changes: 101 additions & 0 deletions core/src/main/java/org/jsmart/zerocode/core/db/DbCsvRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.jsmart.zerocode.core.db;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Request payload for the DB SQL Executor LOADCSV operation: the target table
 * name plus the CSV content to load, supplied either inline (as a JSON array
 * of CSV lines) or as the path of a CSV file relative to src/test/resources.
 */
public class DbCsvRequest {
    private final String tableName;       // table the CSV rows are inserted into
    private final List<String> csvSource; // raw CSV lines (never null, may be empty)
    private final Boolean withHeaders;    // true when the first CSV line holds column names
    private final String nullString;      // token representing NULL in CSV cells

    /**
     * Builds the request from the parsed JSON step definition.
     *
     * @param tableName         mandatory target table
     * @param csvSourceJsonNode either a JSON array of CSV lines or a file path; may be null
     * @param withHeaders       optional, defaults to false
     * @param nullString        optional, defaults to the empty string
     */
    public DbCsvRequest(
            @JsonProperty(value="tableName", required=true) String tableName,
            @JsonProperty("csvSource") JsonNode csvSourceJsonNode,
            @JsonProperty("withHeaders") Boolean withHeaders,
            @JsonProperty("nullString") String nullString) {
        this.tableName = tableName;
        // Optional attributes are defaulted here so getters never return null
        this.withHeaders = Optional.ofNullable(withHeaders).orElse(false);
        this.nullString = Optional.ofNullable(nullString).orElse("");
        this.csvSource = Optional.ofNullable(csvSourceJsonNode).map(this::getCsvSourceFrom).orElse(Collections.emptyList());
    }

    public String getTableName() {
        return tableName;
    }

    public List<String> getCsvSource() {
        return csvSource;
    }

    public boolean getWithHeaders() {
        return withHeaders;
    }

    public String getNullString() {
        return nullString;
    }

    // Code below is duplicated from org.jsmart.zerocode.core.domain.Parametrized.java and not included in tests.
    // TODO Consider some refactoring later and review error message when file not found

    /** Resolves the csvSource node: an inline JSON array or an external file path. */
    private List<String> getCsvSourceFrom(JsonNode csvSourceJsonNode) {
        try {
            if (csvSourceJsonNode.isArray()) {
                return readCsvSourceFromJson(csvSourceJsonNode);

            } else {
                return readCsvSourceFromExternalCsvFile(csvSourceJsonNode);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error deserializing csvSource", e);
        }
    }

    /** Deserializes an inline JSON array node into a list of CSV lines. */
    private List<String> readCsvSourceFromJson(JsonNode csvSourceJsonNode) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {
        });
        return reader.readValue(csvSourceJsonNode);
    }

    /**
     * Reads the CSV lines from a file under src/test/resources, skipping blank lines.
     * Fix: uses Files.readAllLines instead of the previous unclosed Files.lines
     * stream, which leaked the underlying file handle.
     */
    private List<String> readCsvSourceFromExternalCsvFile(JsonNode csvSourceJsonNode) throws IOException {
        String csvSourceFilePath = csvSourceJsonNode.textValue();
        if (StringUtils.isBlank(csvSourceFilePath)) {
            return Collections.emptyList();
        }
        Path path = Paths.get("./src/test/resources/", csvSourceFilePath);
        return Files.readAllLines(path).stream()
                .filter(StringUtils::isNotBlank)
                .collect(Collectors.toList());
    }

    @Override
    public String toString() {
        // Fix: previously reported the wrong class name ("Parameterized")
        return "DbCsvRequest{" +
                "tableName=" + tableName +
                ", csvSource=" + csvSource +
                ", withHeaders=" + withHeaders +
                ", nullString=" + nullString +
                '}';
    }
}
119 changes: 119 additions & 0 deletions core/src/main/java/org/jsmart/zerocode/core/db/DbSqlExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.jsmart.zerocode.core.db;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.univocity.parsers.csv.CsvParser;

import org.apache.commons.dbutils.DbUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Interaction with a database using SQL to read/write.
 * Requires the appropriate connection data in the target environment
 * properties, see src/test/resources/db_test.properties
 */
public class DbSqlExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbSqlExecutor.class);
    /** Response key under which EXECUTE exposes the retrieved records. */
    public static final String SQL_RESULTS_KEY = "rows";
    /** Response key under which LOADCSV exposes the number of inserted rows. */
    public static final String CSV_RESULTS_KEY = "size";

    // Injection is optional so that, when the env variables are not defined,
    // createAndGetConnection() can still log its explanatory error message
    @Inject(optional = true)
    @Named("db.driver.url") private String url;

    @Inject(optional = true)
    @Named("db.driver.user") private String user;

    @Inject(optional = true)
    @Named("db.driver.password") private String password;

    @Inject
    private CsvParser csvParser;

    /**
     * The LOADCSV operation inserts the content of a CSV file into a table,
     * and returns the number of records inserted under the key "size"
     */
    public Map<String, Object> LOADCSV(DbCsvRequest request) { // uppercase for consistency with http api operations
        return loadcsv(request);
    }

    public Map<String, Object> loadcsv(DbCsvRequest request) {
        Connection conn = createAndGetConnection();
        try {
            LOGGER.info("Load CSV, request -> {} ", request);
            DbCsvLoader loader = new DbCsvLoader(conn, csvParser);
            long insertedRows = loader.loadCsv(request.getTableName(), request.getCsvSource(),
                    request.getWithHeaders(), request.getNullString());
            Map<String, Object> response = new HashMap<>();
            response.put(CSV_RESULTS_KEY, insertedRows);
            return response;
        } catch (Exception e) {
            String message = "Failed to load CSV";
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        } finally {
            closeConnection(conn);
        }
    }

    /**
     * The EXECUTE operation returns the records retrieved by the SQL specified in the request
     * under the key "rows" (select), or an empty object (insert, update)
     */
    public Map<String, Object> EXECUTE(DbSqlRequest request) {
        return execute(request);
    }

    public Map<String, Object> execute(DbSqlRequest request) {
        Connection conn = createAndGetConnection();
        try {
            LOGGER.info("Execute SQL, request -> {} ", request);
            DbSqlRunner sqlRunner = new DbSqlRunner(conn);
            List<Map<String, Object>> rows = sqlRunner.execute(request.getSql(), request.getSqlParams());
            // Null rows mean a non-query statement: return empty node, use "verify":{}
            Object value = (rows == null) ? new ObjectMapper().createObjectNode() : rows;
            Map<String, Object> response = new HashMap<>();
            response.put(SQL_RESULTS_KEY, value);
            return response;
        } catch (SQLException e) {
            String message = "Failed to execute SQL";
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        } finally {
            closeConnection(conn);
        }
    }

    /**
     * Returns a new JDBC connection using DriverManager.
     * Override this method in case you get the connections using another approach
     * (e.g. DataSource)
     */
    protected Connection createAndGetConnection() {
        LOGGER.info("Create and get connection, url: {}, user: {}", url, user);
        try {
            return DriverManager.getConnection(url, user, password);
        } catch (SQLException e) {
            String message = "Failed to create connection, Please check the target environment properties "
                    + "to connect the database (db.driver.url, db.driver.user and db.driver.password)";
            LOGGER.error(message, e);
            throw new RuntimeException(message, e);
        }
    }

    /** Closes the given connection without propagating any close-time exception. */
    protected void closeConnection(Connection conn) {
        DbUtils.closeQuietly(conn);
    }

}
Loading
Loading