diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 5289f034..e8e82f35 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -28,5 +28,8 @@ jobs:
       - name: Running Kafka
         run: docker-compose -f docker/compose/kafka-schema-registry.yml up -d && sleep 10
 
+      - name: Running PostgreSQL (to test DB SQL Executor)
+        run: docker-compose -f docker/compose/pg_compose.yml up -d
+
       - name: Building and testing the changes
         run: mvn clean test
diff --git a/core/pom.xml b/core/pom.xml
index f59e00aa..7fb25c0b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -175,11 +175,20 @@
             <artifactId>micro-simulator</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-dbutils</groupId>
+            <artifactId>commons-dbutils</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
             <scope>test</scope>
         </dependency>
+		<dependency> 
+		    <groupId>org.postgresql</groupId>
+		    <artifactId>postgresql</artifactId>
+            <!--<scope>test</scope>--> <!-- Make it available to dependant projects. Hence commented -->
+		</dependency>
         <dependency>
             <groupId>com.aventstack</groupId>
             <artifactId>extentreports</artifactId>
diff --git a/core/src/main/java/org/jsmart/zerocode/core/db/DbCsvLoader.java b/core/src/main/java/org/jsmart/zerocode/core/db/DbCsvLoader.java
new file mode 100644
index 00000000..877f4469
--- /dev/null
+++ b/core/src/main/java/org/jsmart/zerocode/core/db/DbCsvLoader.java
@@ -0,0 +1,135 @@
+package org.jsmart.zerocode.core.db;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.univocity.parsers.csv.CsvParser;
+
+/**
+ * Data loading in the database from a CSV external source
+ */
+class DbCsvLoader {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DbCsvLoader.class);
+	private Connection conn;
+	private CsvParser csvParser;
+
+	public DbCsvLoader(Connection conn, CsvParser csvParser) {
+		this.conn = conn;
+		this.csvParser = csvParser;
+	}
+
+	/**
+	 * Loads rows in CSV format (csvLines) into a table in the database
+	 * and returns the total number of rows.
+	 */
+	public int loadCsv(String table, List<String> csvLines, boolean withHeaders, String nullString) throws SQLException {
+		if (csvLines == null || csvLines.isEmpty())
+			return 0;
+		
+		List<String[]> lines = parseLines(table, csvLines);
+
+		String[] headers = buildHeaders(lines.get(0), withHeaders);
+		List<Object[]> paramset = buildParameters(table, headers, lines, withHeaders, nullString);
+		if (paramset.isEmpty()) // can have headers, but no rows
+			return 0;
+
+		String sql = buildSql(table, headers, paramset.get(0).length);
+		LOGGER.info("Loading CSV using this sql: {}", sql);
+
+		QueryRunner runner = new QueryRunner();
+		int insertCount = 0;
+		for (int i = 0 ; i < paramset.size(); i++) {
+			insertRow(runner, i, sql, paramset.get(i));
+			insertCount++;
+		}
+		LOGGER.info("Total of rows inserted: {}", insertCount);
+		return insertCount;
+	}
+	
+	private List<String[]> parseLines(String table, List<String> lines) {
+		int numCol = 0; // will check that every row has same columns than the first
+		List<String[]> parsedLines = new ArrayList<>();
+		for (int i = 0; i<lines.size(); i++) {
+			String[] parsedLine = csvParser.parseLine(lines.get(i));
+			parsedLines.add(parsedLine);
+			if (i == 0) {
+				numCol=parsedLine.length;
+			} else if (numCol != parsedLine.length) {
+				String message = String.format("Error parsing CSV content to load into table %s: "
+						+ "Row %d has %d columns and should have %d", table, i + 1, parsedLine.length, numCol);
+				LOGGER.error(message);
+				throw new RuntimeException(message);
+			}
+		}
+		return parsedLines;
+	}
+
+	private String[] buildHeaders(String[] line, boolean withHeaders) {
+		return withHeaders ? line : new String[] {};
+	}
+
+	private List<Object[]> buildParameters(String table, String[] headers, List<String[]> lines, boolean withHeaders, String nullString) {
+		DbValueConverter converter = new DbValueConverter(conn, table);
+		List<Object[]> paramset = new ArrayList<>();
+		for (int i = withHeaders ? 1 : 0; i < lines.size(); i++) {
+			String[] parsedLine = lines.get(i);
+			parsedLine = processNulls(parsedLine, nullString);
+			Object[] params;
+			try {
+				params = converter.convertColumnValues(headers, parsedLine);
+				LOGGER.info("    row [{}] params: {}", i + 1, Arrays.asList(params).toString());
+			} catch (Exception e) { // Not only SQLException as converter also does parsing
+				String message = String.format("Error matching data type of parameters and table columns at CSV row %d", i + 1);
+				LOGGER.error(message);
+				LOGGER.error("Exception message: {}", e.getMessage());
+				throw new RuntimeException(message, e);
+			}
+			paramset.add(params);
+		}
+		return paramset;
+	}
+
+	private String[] processNulls(String[] line, String nullString) {
+		for (int i = 0; i < line.length; i++) {
+			if (StringUtils.isBlank(nullString) && StringUtils.isBlank(line[i])) {
+				line[i] = null;
+			} else if (!StringUtils.isBlank(nullString)) {
+				if (StringUtils.isBlank(line[i])) // null must be empty string
+					line[i] = "";
+				else if (nullString.trim().equalsIgnoreCase(line[i].trim()))
+					line[i] = null;
+			}
+		}
+		return line;
+	}
+
+	private String buildSql(String table, String[] headers, int columnCount) {
+		String placeholders = IntStream.range(0, columnCount)
+				.mapToObj(i -> "?").collect(Collectors.joining(","));
+		return "INSERT INTO " + table 
+				+ (headers.length > 0 ? " (" + String.join(",", headers) + ")" : "") 
+				+ " VALUES (" + placeholders + ");";
+	}
+	
+	private void insertRow(QueryRunner runner, int rowId, String sql, Object[] params) {
+		try {
+			runner.update(conn, sql, params);
+		} catch (SQLException e) {
+			String message = String.format("Error inserting data at CSV row %d", rowId + 1);
+			LOGGER.error(message);
+			LOGGER.error("Exception message: {}", e.getMessage());
+			throw new RuntimeException(message, e);
+		}
+	}
+
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/jsmart/zerocode/core/db/DbCsvRequest.java b/core/src/main/java/org/jsmart/zerocode/core/db/DbCsvRequest.java
new file mode 100644
index 00000000..cebffe1c
--- /dev/null
+++ b/core/src/main/java/org/jsmart/zerocode/core/db/DbCsvRequest.java
@@ -0,0 +1,101 @@
+package org.jsmart.zerocode.core.db;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class DbCsvRequest {
+    private final String tableName;
+    private final List<String> csvSource;
+    private final Boolean withHeaders;
+    private final String nullString;
+
+    public DbCsvRequest(
+            @JsonProperty(value="tableName", required=true) String tableName,
+            @JsonProperty("csvSource") JsonNode csvSourceJsonNode,
+            @JsonProperty("withHeaders") Boolean withHeaders,
+            @JsonProperty("nullString") String nullString) {
+        this.tableName = tableName;
+        this.withHeaders = Optional.ofNullable(withHeaders).orElse(false);
+        this.nullString = Optional.ofNullable(nullString).orElse("");
+        this.csvSource = Optional.ofNullable(csvSourceJsonNode).map(this::getCsvSourceFrom).orElse(Collections.emptyList());
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public List<String> getCsvSource() {
+        return csvSource;
+    }
+
+    public boolean getWithHeaders() {
+        return withHeaders;
+    }
+
+    public String getNullString() {
+        return nullString;
+    }
+
+    // Code below is duplicated from org.jsmart.zerocode.core.domain.Parametrized.java and not included in tests.
+    // TODO Consider some refactoring later and review error message when file not found
+    
+    private List<String> getCsvSourceFrom(JsonNode csvSourceJsonNode) {
+        try {
+            if (csvSourceJsonNode.isArray()) {
+                return readCsvSourceFromJson(csvSourceJsonNode);
+
+            } else {
+                return readCsvSourceFromExternalCsvFile(csvSourceJsonNode);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error deserializing csvSource", e);
+        }
+    }
+
+    private List<String> readCsvSourceFromJson(JsonNode csvSourceJsonNode) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectReader reader = mapper.readerFor(new TypeReference<List<String>>() {
+        });
+        return reader.readValue(csvSourceJsonNode);
+    }
+
+    private List<String> readCsvSourceFromExternalCsvFile(JsonNode csvSourceJsonNode) throws IOException {
+        String csvSourceFilePath = csvSourceJsonNode.textValue();
+        if (StringUtils.isNotBlank(csvSourceFilePath)) {
+            Path path = Paths.get("./src/test/resources/",csvSourceFilePath);
+            List<String> csvSourceFileLines = Files.lines(path)
+                    .filter(StringUtils::isNotBlank)
+                    .collect(Collectors.toList());
+            //if (this.ignoreHeader) {
+            //    return csvSourceFileLines.stream()
+            //            .skip(1)
+            //            .collect(Collectors.toList());
+            //}
+            return csvSourceFileLines;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String toString() {
+        return "Parameterized{" +
+                "tableName=" + tableName +
+                ", csvSource=" + csvSource +
+                ", withHeaders=" + withHeaders +
+                ", nullString=" + nullString +
+                '}';
+    }
+}
diff --git a/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlExecutor.java b/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlExecutor.java
new file mode 100644
index 00000000..22392d24
--- /dev/null
+++ b/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlExecutor.java
@@ -0,0 +1,119 @@
+package org.jsmart.zerocode.core.db;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+import com.univocity.parsers.csv.CsvParser;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interaction with a database using SQL to read/write
+ * Requires the appropriated connection data in the target environment
+ * properties, see src/test/resources/db_test.properties
+ */
+public class DbSqlExecutor {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DbSqlExecutor.class);
+	public static final String SQL_RESULTS_KEY = "rows";
+	public static final String CSV_RESULTS_KEY = "size";
+	
+	// Optional to log the explanatory error message if the env variables are no defined
+	@Inject(optional = true)
+	@Named("db.driver.url") private String url;
+
+	@Inject(optional = true)
+	@Named("db.driver.user") private String user;
+	
+	@Inject(optional = true)
+	@Named("db.driver.password") private String password;
+	
+    @Inject
+    private CsvParser csvParser;
+	
+	/**
+	 * The LOADCSV operation inserts the content of a CSV file into a table,
+	 * and returns the number of records inserted under the key "size"
+	 */
+	public Map<String, Object> LOADCSV(DbCsvRequest request) { // uppercase for consistency with http api operations
+		return loadcsv(request);
+	}
+
+	public Map<String, Object> loadcsv(DbCsvRequest request) {
+		Connection conn = createAndGetConnection();
+		try {
+			LOGGER.info("Load CSV, request -> {} ", request);
+			DbCsvLoader runner = new DbCsvLoader(conn, csvParser);
+			long result = runner.loadCsv(request.getTableName(), request.getCsvSource(), 
+					request.getWithHeaders(), request.getNullString());
+			Map<String, Object> response = new HashMap<>();
+			response.put(CSV_RESULTS_KEY, result);
+			return response;
+		} catch (Exception e) {
+			String message = "Failed to load CSV";
+			LOGGER.error(message, e);
+			throw new RuntimeException(message, e);
+		} finally {
+			closeConnection(conn);
+		}
+	}
+
+	/**
+	 * The EXECUTE operation returns the records retrieved by the SQL specified in the request 
+	 * under the key "rows" (select), or an empty object (insert, update)
+	 */
+	public Map<String, Object> EXECUTE(DbSqlRequest request) {
+		return execute(request);
+	}
+	
+	public Map<String, Object> execute(DbSqlRequest request) {
+		Connection conn = createAndGetConnection();
+		try {
+			LOGGER.info("Execute SQL, request -> {} ", request);
+			DbSqlRunner runner = new DbSqlRunner(conn);
+			List<Map<String, Object>> results = runner.execute(request.getSql(), request.getSqlParams());
+			Map<String, Object> response = new HashMap<>();
+			if (results == null) { // will return empty node, use "verify":{}
+				response.put(SQL_RESULTS_KEY, new ObjectMapper().createObjectNode());
+			} else {
+				response.put(SQL_RESULTS_KEY, results);
+			}
+			return response;
+		} catch (SQLException e) {
+			String message = "Failed to execute SQL";
+			LOGGER.error(message, e);
+			throw new RuntimeException(message, e);
+		} finally {
+			closeConnection(conn);
+		}
+	}
+
+	/**
+	 * Returns a new JDBC connection using DriverManager.
+	 * Override this method in case you get the connections using another approach
+	 * (e.g. DataSource)
+	 */
+	protected Connection createAndGetConnection() {
+		LOGGER.info("Create and get connection, url: {}, user: {}", url, user);
+		try {
+			return DriverManager.getConnection(url, user, password);
+		} catch (SQLException e) {
+			String message = "Failed to create connection, Please check the target environment properties "
+					+ "to connect the database (db.driver.url, db.driver.user and db.driver.password)";
+			LOGGER.error(message, e);
+			throw new RuntimeException(message, e);
+		}
+	}
+
+	protected void closeConnection(Connection conn) {
+		DbUtils.closeQuietly(conn);
+	}
+
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlRequest.java b/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlRequest.java
new file mode 100644
index 00000000..c89b84c1
--- /dev/null
+++ b/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlRequest.java
@@ -0,0 +1,37 @@
+package org.jsmart.zerocode.core.db;
+
+import java.util.Arrays;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class DbSqlRequest {
+	private final String sql;
+	private final Object[] sqlParams;
+
+	@JsonCreator
+	public DbSqlRequest(
+			@JsonProperty("sql") String sql,
+			@JsonProperty("sqlParams") Object[] sqlParams) {
+		this.sql = sql;
+		this.sqlParams = sqlParams;
+	}
+
+	public String getSql() {
+		return sql;
+	}
+
+	public Object[] getSqlParams() {
+		return sqlParams;
+	}
+
+	@Override
+	public String toString() {
+		return "Request{" 
+				+ "sql=" + sql 
+				+ ", sqlParams=" + (sqlParams == null ? "[]" : Arrays.asList(sqlParams).toString())
+				+ '}';
+	}
+}
diff --git a/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlRunner.java b/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlRunner.java
new file mode 100644
index 00000000..81c58c5d
--- /dev/null
+++ b/core/src/main/java/org/jsmart/zerocode/core/db/DbSqlRunner.java
@@ -0,0 +1,45 @@
+package org.jsmart.zerocode.core.db;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.apache.commons.dbutils.handlers.MapListHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Execution of SQL statements against a database
+ */
+class DbSqlRunner {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DbSqlRunner.class);
+	private Connection conn;
+	
+	public DbSqlRunner(Connection conn) {
+		this.conn = conn;
+	}
+	
+	/**
+	 * Executes a SQL statement with parameters (optional) and returns a list of maps 
+	 * with the ResultSet content (select) or null (insert, update)
+	 */
+	public List<Map<String, Object>> execute(String sql, Object[] params) throws SQLException {
+		// There is only one execute operation instead of separate update and query.
+		// The DbUtils execute method returns a list containing each ResultSet (each is a list of maps):
+		// - Empty (insert and update)
+		// - With one or more ResultSets (select): use the first one
+		// - Note that some drivers never return more than one ResultSet (e.g. H2)
+		QueryRunner runner = new QueryRunner();
+		List<List<Map<String, Object>>> result = runner.execute(conn, sql, new MapListHandler(), params);
+		if (result.isEmpty()) {
+			return null;
+		} else {
+			if (result.size() > 1)
+				LOGGER.warn("The SQL query returned more than one ResultSet, keeping only the first one");
+			return result.get(0);
+		}
+	}
+
+}
\ No newline at end of file
diff --git a/core/src/main/java/org/jsmart/zerocode/core/db/DbValueConverter.java b/core/src/main/java/org/jsmart/zerocode/core/db/DbValueConverter.java
new file mode 100644
index 00000000..36432536
--- /dev/null
+++ b/core/src/main/java/org/jsmart/zerocode/core/db/DbValueConverter.java
@@ -0,0 +1,161 @@
+package org.jsmart.zerocode.core.db;
+
+import static org.apache.commons.lang3.time.DateUtils.parseDate;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.text.ParseException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Conversion of string values to be inserted in the database
+ * into objects compatible with the java sql type of the target columns.
+ */
+public class DbValueConverter {
+	private static final Logger LOGGER = LoggerFactory.getLogger(DbSqlExecutor.class);
+
+	private Connection conn;
+	private String table;
+	private DatabaseMetaData databaseMetaData;
+	public Map<String, Integer> columnTypes; // java.sql.Types
+
+	public DbValueConverter(Connection conn, String table) {
+		this.conn = conn;
+		this.table = table;
+		try {
+			initializeMetadata();
+		} catch (Exception e) {
+			logInitializeError();
+		}
+	}
+
+	private void initializeMetadata() throws SQLException {
+		LOGGER.info("Metadata initialization for table: {}", table);
+		columnTypes = new LinkedHashMap<>(); // must keep column order
+		databaseMetaData = conn.getMetaData();
+
+		table = convertToStoredCase(table); // to locate table name in metadata
+		LOGGER.info("Database storesLowerCaseIdentifiers={}, storesUpperCaseIdentifiers={}",
+				databaseMetaData.storesLowerCaseIdentifiers(), databaseMetaData.storesUpperCaseIdentifiers());
+
+		try (ResultSet rs = databaseMetaData.getColumns(null, null, table, "%")) {
+			while (rs.next()) {
+				String storedName = rs.getString("COLUMN_NAME");
+				int typeValue = rs.getInt("DATA_TYPE");
+				// internally, key is lowercase to allow case insensitive lookups
+				columnTypes.put(storedName.toLowerCase(), typeValue);
+			}
+		}
+		LOGGER.info("Mapping from java columns to sql types: {}", columnTypes.toString());
+		if (columnTypes.isEmpty())
+			logInitializeError();
+	}
+
+	private String convertToStoredCase(String identifier) throws SQLException {
+		if (databaseMetaData.storesLowerCaseIdentifiers()) // e.g. Postgres
+			identifier = identifier.toLowerCase();
+		else if (databaseMetaData.storesUpperCaseIdentifiers()) // e.g. H2
+			identifier = identifier.toUpperCase();
+		return identifier;
+	}
+
+	private void logInitializeError() {
+		LOGGER.error("Initialization of metadata for table {} failed. "
+				+ "Errors may appear when matching query parameters to their data types", table);
+	}
+	
+	/**
+	 * Given an array of column names and other array with their corresponding values (as strings),
+	 * transforms each value to the compatible data type that allow to be inserted in the database.
+	 * If the column names are missing, uses all columns in the current table as the column names.
+	 */
+	Object[] convertColumnValues(String[] columns, String[] values) {
+		if (ArrayUtils.isEmpty(columns)) // if no specified, use all columns in the table
+			columns = columnTypes.keySet().toArray(new String[0]);
+
+		Object[] converted = new Object[values.length];
+		for (int i = 0; i < values.length; i++) {
+			converted[i] = i < columns.length && i < values.length 
+					? convertColumnValue(columns[i], values[i])
+					: values[i];
+		}
+		return converted;
+	}
+
+	private Object convertColumnValue(String column, String value) {
+		try {
+			return convertColumnValueWithThrow(column, value);
+		} catch (ParseException e) {
+			LOGGER.error("Can't convert the data type of value {} at column {}", value, column);
+			return value;
+		}
+	}
+
+	/**
+	 * Converts the string representation of a data type value into the appropriate simple SQL data type.
+	 * If a data type is not handled by this method (or is string), returns the input string value as fallback.
+	 * 
+	 * See table B-1 in JDBC 4.2 Specification
+	 */
+	private Object convertColumnValueWithThrow(String column, String value) throws ParseException {
+		if (value == null)
+			return null;
+		if (!columnTypes.containsKey(column.toLowerCase())) // fallback if no metadata
+			return value;
+		
+		int sqlType = columnTypes.get(column.toLowerCase());
+		return convertColumnValueFromJavaSqlType(sqlType, value);
+	}
+		
+	private Object convertColumnValueFromJavaSqlType(int sqlType, String value) throws ParseException {
+		switch (sqlType) {
+		case java.sql.Types.NUMERIC:
+		case java.sql.Types.DECIMAL: return java.math.BigDecimal.valueOf(Double.parseDouble(value));
+		
+		case java.sql.Types.BIT: //accepts "1" as true (e.g. SqlServer)
+		case java.sql.Types.BOOLEAN: return Boolean.valueOf("1".equals(value) ? "true" : value);
+		
+		case java.sql.Types.TINYINT: return Byte.valueOf(value);
+		case java.sql.Types.SMALLINT: return Short.valueOf(value);
+		case java.sql.Types.INTEGER: return Integer.valueOf(value);
+		case java.sql.Types.BIGINT: return Long.valueOf(value);
+		
+		case java.sql.Types.REAL: return Float.valueOf(value);
+		case java.sql.Types.FLOAT: return Double.valueOf(value);
+		case java.sql.Types.DOUBLE: return Double.valueOf(value);
+		
+		case java.sql.Types.DATE: return new java.sql.Date(parseDate(value, getDateFormats()).getTime());
+		case java.sql.Types.TIME: return new java.sql.Time(parseDate(value, getTimeFormats()).getTime());
+		case java.sql.Types.TIMESTAMP: return new java.sql.Timestamp(parseDate(value, getTimestampFormats()).getTime());
+		default:
+			return value;
+			// Not including: binary and advanced datatypes (e.g. blob, array...)
+		}
+	}
+	
+	// Supported date time formats are the common ISO-8601 formats
+	// defined in org.apache.commons.lang3.time.DateFormatUtils,
+	// as well as their variants that specify milliseconds.
+	// This may be made user configurable later, via properties and/or embedded in the payload
+	
+	private String[] getDateFormats() {
+		return new String[] { "yyyy-MM-dd" };
+	}
+
+	private String[] getTimeFormats() {
+		return new String[] { "HH:mm:ssZ", "HH:mm:ss.SSSZ", "HH:mm:ss", "HH:mm:ss.SSS" };
+	}
+
+	private String[] getTimestampFormats() {
+		return new String[] { "yyyy-MM-dd'T'HH:mm:ssZ", "yyyy-MM-dd'T'HH:mm:ss.SSSZ", 
+				"yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS" };
+	}
+
+}
diff --git a/core/src/test/java/org/jsmart/zerocode/core/db/DbCsvLoaderTest.java b/core/src/test/java/org/jsmart/zerocode/core/db/DbCsvLoaderTest.java
new file mode 100644
index 00000000..80729ad3
--- /dev/null
+++ b/core/src/test/java/org/jsmart/zerocode/core/db/DbCsvLoaderTest.java
@@ -0,0 +1,120 @@
+package org.jsmart.zerocode.core.db;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.jukito.JukitoRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.google.inject.Inject;
+import com.univocity.parsers.csv.CsvParser;
+
+@RunWith(JukitoRunner.class)
+public class DbCsvLoaderTest extends DbTestBase{
+	
+	private DbCsvLoader loader;
+	
+	@Inject
+	CsvParser csvParser;
+	
+	@Override
+	public void setUp() throws SQLException {
+		super.setUp();
+		loader = new DbCsvLoader(conn, csvParser);
+		execute("DROP TABLE IF EXISTS CSVTABLE; "
+				+ "CREATE TABLE CSVTABLE (ID INTEGER, NAME VARCHAR(20), STATUS BOOLEAN)", null);
+	}
+	
+	private int loadCsv(String table, String[] lines, boolean withHeaders, String nullString) throws SQLException {
+		List<String> linesList = Arrays.asList(lines);
+		return loader.loadCsv(table, linesList, withHeaders, nullString);
+	}
+	
+	private void assertLoaded(int count, String expected) throws SQLException {
+		List<Map<String, Object>> rows = execute("SELECT ID,NAME,STATUS FROM CSVTABLE ORDER BY ID NULLS FIRST", null);
+		assertThat(rows.toString(), equalTo(convertDbCase(expected)));
+		assertThat(rows.size(), equalTo(count));
+	}
+	
+	@Test
+	public void testLoadSimpleCsvWithoutHeaders() throws SQLException {
+		int count = loadCsv("CSVTABLE", new String[] { "101,me,false", "102,you,true" }, false, "");
+		assertLoaded(count, "[{ID=101, NAME=me, STATUS=false}, {ID=102, NAME=you, STATUS=true}]");
+	}
+
+	@Test
+	public void testLoadSimpleCsvWithHeaders() throws SQLException {
+		int count = loadCsv("CSVTABLE", new String[] { "ID,NAME,STATUS", "101,me,false", "102,you,true" }, true, "");
+		assertLoaded(count, "[{ID=101, NAME=me, STATUS=false}, {ID=102, NAME=you, STATUS=true}]");
+	}
+
+	@Test
+	public void testLoadCsvIsNotCleanInsert() throws SQLException {
+		loadCsv("CSVTABLE", new String[] { "101,me,false" }, false, "");
+		loadCsv("CSVTABLE", new String[] { "103,other,true" }, false, "");
+		assertLoaded(2, "[{ID=101, NAME=me, STATUS=false}, {ID=103, NAME=other, STATUS=true}]");
+	}
+
+	@Test
+	public void whenNoDataRows_thenReturnZero() throws SQLException {
+		int count = loader.loadCsv("CSVTABLE", null, false, "");
+		assertLoaded(count, "[]");
+		count = loadCsv("CSVTABLE", new String[] { }, false, ""); //noheaders norows
+		assertLoaded(count, "[]");
+		count = loadCsv("CSVTABLE", new String[] {"ID,NAME,STATUS" }, true, ""); //headers norows
+		assertLoaded(count, "[]");
+		count = loadCsv("CSVTABLE", new String[] { }, true, ""); //headers missing
+		assertLoaded(count, "[]");
+	}
+
+	@Test
+	public void whenCsvValuesContainSpaces_thenValuesAreTrimmed() throws SQLException {
+		loadCsv("CSVTABLE", new String[] { "  ID ,  \t  NAME  \r  , STATUS  ", "  101 ,\tmy\t  name\r, false  " }, true, "");
+		assertLoaded(1, "[{ID=101, NAME=my\t  name, STATUS=false}]");
+	}
+
+	@Test
+	public void whenNullStringUnset_thenEmptyIsNull() throws SQLException {
+		loadCsv("CSVTABLE", new String[] { "  \t  , me ,  \t  ", "102,,true" }, false, "");
+		assertLoaded(2, "[{ID=null, NAME=me, STATUS=null}, {ID=102, NAME=null, STATUS=true}]");
+	}
+
+	@Test
+	public void whenNullStringSet_thenEmptyIsNotNull_AndCaseInsensitive() throws SQLException {
+		loadCsv("CSVTABLE", new String[] { "  null  ,me,  NULL  ", "102, ,true" }, false, "null");
+		assertLoaded(2, "[{ID=null, NAME=me, STATUS=null}, {ID=102, NAME=, STATUS=true}]");
+	}
+
+	@Test
+	public void whenRowsHaveDistinctSizes_thenRaiseExceptionWithMessage() throws SQLException {
+		RuntimeException e = assertThrows(RuntimeException.class, () -> {
+			loadCsv("CSVTABLE", new String[] { "ID,NAME,STATUS", "101,me,true,additional" , "102,you,true" }, true, "");
+		});
+		assertThat(e.getMessage(), equalTo(
+				"Error parsing CSV content to load into table CSVTABLE: Row 2 has 4 columns and should have 3"));
+	}
+	
+	@Test
+	public void whenParameterHasWrongType_thenRaiseExceptionWithMessage() throws SQLException {
+		RuntimeException e = assertThrows(RuntimeException.class, () -> {
+			loadCsv("CSVTABLE", new String[] { "ID,NAME,STATUS", "101,me,true" , "XXXX,you,true" }, true, "");
+		});
+		assertThat(e.getMessage(), equalTo("Error matching data type of parameters and table columns at CSV row 3"));
+	}
+	
+	@Test
+	public void whenInsertFails_thenRaiseExceptionWithMessage() throws SQLException {
+		RuntimeException e = assertThrows(RuntimeException.class, () -> {
+			loadCsv("CSVTABLE", new String[] { "101,me,true,extra1" , "102,you,true,extra2" }, false, "");
+		});
+		assertThat(e.getMessage(), equalTo("Error inserting data at CSV row 1"));
+	}
+
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlExecutorScenarioPostgresTest.java b/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlExecutorScenarioPostgresTest.java
new file mode 100644
index 00000000..7dd0c31a
--- /dev/null
+++ b/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlExecutorScenarioPostgresTest.java
@@ -0,0 +1,24 @@
+package org.jsmart.zerocode.core.db;
+import org.jsmart.zerocode.core.domain.Scenario;
+import org.jsmart.zerocode.core.domain.TargetEnv;
+import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@TargetEnv("db_test_postgres.properties")
+@RunWith(ZeroCodeUnitRunner.class)
+public class DbSqlExecutorScenarioPostgresTest {
+
+	// Note: Spin up the DB container before running this test: docker/compose/pg_compose.yml
+	
+    @Test
+	@Scenario("integration_test_files/db/db_csv_load_with_headers_postgres.json")
+	public void testDbCsvLoadWithHeaders() throws Exception {
+	}
+
+    @Test
+	@Scenario("integration_test_files/db/db_sql_execute_postgres.json")
+	public void testDbSqlExecute() throws Exception {
+	}
+
+}
diff --git a/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlExecutorScenarioTest.java b/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlExecutorScenarioTest.java
new file mode 100644
index 00000000..e294777c
--- /dev/null
+++ b/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlExecutorScenarioTest.java
@@ -0,0 +1,40 @@
+package org.jsmart.zerocode.core.db;
+import org.jsmart.zerocode.core.domain.Scenario;
+import org.jsmart.zerocode.core.domain.TargetEnv;
+import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@TargetEnv("db_test.properties")
+@RunWith(ZeroCodeUnitRunner.class)
+public class DbSqlExecutorScenarioTest {
+
+    @Test
+	@Scenario("integration_test_files/db/db_csv_load_with_headers.json")
+	public void testDbCsvLoadWithHeaders() throws Exception {
+	}
+
+    @Test // same scenario and test database
+	@Scenario("integration_test_files/db/db_csv_load_without_headers.json")
+	public void testDbCsvLoadWithoutHeaders() throws Exception {
+	}
+
+    @Test
+	@Scenario("integration_test_files/db/db_sql_execute.json")
+	public void testDbSqlExecute() throws Exception {
+	}
+
+    // Manual test: error handling.
+    // To facilitate the location of the source of possible errors (e.g. a wrong SQL statement),
+    // exceptions that occur in the DbSqlExecutor should show:
+    // - A log entry with the error message
+    // - The stacktrace of the exception to facilitate locating the source
+    // - The usual chain of errors and stacktraces produced by zerocode when an step fails
+    //
+    // Recommended situations for manual test:
+    // - Target environment variables are no defined
+    // - A syntactically wrong SQL statement in a step
+    // - A header that does not correspond with any column when loading data from CSV
+    // - A value with the wrong data type (e.g. string in a numeric column) when loading data from CSV
+    
+}
diff --git a/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlRunnerTest.java b/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlRunnerTest.java
new file mode 100644
index 00000000..79d433f5
--- /dev/null
+++ b/core/src/test/java/org/jsmart/zerocode/core/db/DbSqlRunnerTest.java
@@ -0,0 +1,63 @@
+package org.jsmart.zerocode.core.db;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbutils.QueryRunner;
+import org.jukito.JukitoRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(JukitoRunner.class)
+public class DbSqlRunnerTest extends DbTestBase {
+
+	@Before
+	public void setUp() throws SQLException {
+		super.setUp();
+		new QueryRunner().update(conn, "DROP TABLE IF EXISTS SQLTABLE; "
+				+ "CREATE TABLE SQLTABLE (ID INTEGER, NAME VARCHAR(20)); "
+				+ "INSERT INTO SQLTABLE VALUES (1, 'string 1'); "
+				+ "INSERT INTO SQLTABLE VALUES (2, 'string 2');");
+	}
+
+	@Test
+	public void sqlSelectQueryShouldReturnListOfMap() throws ClassNotFoundException, SQLException {
+		List<Map<String, Object>> rows = execute("SELECT ID, NAME FROM SQLTABLE ORDER BY ID DESC", null);
+		assertThat(rows.toString(), equalTo(convertDbCase("[{ID=2, NAME=string 2}, {ID=1, NAME=string 1}]")));
+	}
+
+	@Test
+	public void sqlSelectWithoutResultsShouldReturnEmptyList() throws ClassNotFoundException, SQLException {
+		List<Map<String, Object>> rows = execute("SELECT ID, NAME FROM SQLTABLE where ID<0", null);
+		assertThat(rows.toString(), equalTo("[]"));
+	}
+
+	@Test
+	public void multipleSqlSelectShouldReturnTheFirstResultSet() throws ClassNotFoundException, SQLException {
+		List<Map<String, Object>> rows = execute("SELECT ID, NAME FROM SQLTABLE where ID=2; SELECT ID, NAME FROM SQLTABLE where ID=1;", null);
+		assertThat(rows.toString(), equalTo(convertDbCase("[{ID=2, NAME=string 2}]")));
+	}
+
+	@Test
+	public void sqlInsertShouldReturnNull() throws ClassNotFoundException, SQLException {
+		Object nullRows = execute("INSERT INTO SQLTABLE VALUES (3, 'string 3')", null);
+		assertThat(nullRows, nullValue());
+		// check rows are inserted
+		List<Map<String, Object>> rows = execute("SELECT ID, NAME FROM SQLTABLE ORDER BY ID", new Object[] {});
+		assertThat(rows.toString(), equalTo(convertDbCase("[{ID=1, NAME=string 1}, {ID=2, NAME=string 2}, {ID=3, NAME=string 3}]")));
+	}
+
+	@Test
+	public void executeWithParametersShouldAllowNulls() throws SQLException {
+		execute("INSERT INTO SQLTABLE VALUES (?, ?)", new Object[] { 4, null });
+		List<Map<String, Object>> rows = execute("SELECT ID, NAME FROM SQLTABLE where ID = ?", new Object[] { 4 });
+		assertThat(rows.toString(), equalTo(convertDbCase("[{ID=4, NAME=null}]")));
+	}
+
+}
diff --git a/core/src/test/java/org/jsmart/zerocode/core/db/DbTestBase.java b/core/src/test/java/org/jsmart/zerocode/core/db/DbTestBase.java
new file mode 100644
index 00000000..86fe5a77
--- /dev/null
+++ b/core/src/test/java/org/jsmart/zerocode/core/db/DbTestBase.java
@@ -0,0 +1,70 @@
+package org.jsmart.zerocode.core.db;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.dbutils.DbUtils;
+import org.jsmart.zerocode.core.di.main.ApplicationMainModule;
+import org.jukito.TestModule;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * Base class for the unit DB test classes: manages connections,
+ * execution of queries and DBMS specific features
+ */
+public abstract class DbTestBase {
+	// Subclasses must use JukitoRunner
+    public static class JukitoModule extends TestModule {
+        @Override
+        protected void configureTest() {
+            ApplicationMainModule applicationMainModule = new ApplicationMainModule("db_test.properties");
+            install(applicationMainModule);
+        }
+    }
+
+	@Inject
+	@Named("db.driver.url") protected String url;
+
+	@Inject(optional = true)
+	@Named("db.driver.user") protected String user;
+	
+	@Inject(optional = true)
+	@Named("db.driver.password") protected String password;
+
+	protected Connection conn; // managed connection for each test
+	protected boolean isPostgres = false; // set by each connection, to allow portable assertions
+
+	@Before
+	public void setUp() throws SQLException {
+		conn = connect();
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		DbUtils.closeQuietly(conn);
+	}
+	
+	protected Connection connect() throws SQLException {
+		isPostgres = url.startsWith("jdbc:postgresql:");
+		return DriverManager.getConnection(url, user, password);
+	}
+
+	protected List<Map<String, Object>> execute(String sql, Object[] params) throws SQLException {
+		DbSqlRunner runner = new DbSqlRunner(conn);
+		return runner.execute(sql, params);
+	}
+	
+	// Table and columns in all tests are uppercase because H2 stores uppercase by default.
+	// But postgres stores lowercase, so some expected strings need case conversion
+	protected String convertDbCase(String value) {
+		return isPostgres ? value.toLowerCase() : value;
+	}
+
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/jsmart/zerocode/core/db/DbValueConverterTest.java b/core/src/test/java/org/jsmart/zerocode/core/db/DbValueConverterTest.java
new file mode 100644
index 00000000..46ebe82a
--- /dev/null
+++ b/core/src/test/java/org/jsmart/zerocode/core/db/DbValueConverterTest.java
@@ -0,0 +1,176 @@
+package org.jsmart.zerocode.core.db;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.sql.SQLException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.jukito.JukitoRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(JukitoRunner.class)
+public class DbValueConverterTest extends DbTestBase {
+
+	@Test
+	public void convertBasicDataTypeValues() throws SQLException {
+		doTestConversion("", "Btable", "VINT INTEGER, VCHAR VARCHAR(20), VBOOL BOOLEAN, EXTRA CHAR(1)",
+				new String[] { "Vint", "Vchar", "Vbool" },
+				new String[] { "101", "astring", "true" },
+				"[{VINT=101, VCHAR=astring, VBOOL=true, EXTRA=null}]");
+	}
+
+	@Test
+	public void convertNullAndBitValues() throws SQLException {
+		doTestConversion("", "NTABLE", "VINT INT, VCHAR VARCHAR(20), VBOOL BOOLEAN",
+				new String[] { "VINT", "VCHAR", "VBOOL" }, 
+				new String[] { null, null, "1" }, // incl. alternate boolean
+				"[{VINT=null, VCHAR=null, VBOOL=true}]");
+	}
+
+	@Test
+	public void convertDecimalAndFloatValues() throws SQLException {
+		doTestConversion("", "FTABLE", "VEXACT NUMERIC(10,0), VDEC DECIMAL(10,2), VFLOAT FLOAT, VREAL REAL",
+				new String[] { "VEXACT", "VDEC", "VFLOAT", "VREAL" },
+				new String[] { "102", "123.45", "234.56", "3.4561E+2" },
+				"[{VEXACT=102, VDEC=123.45, VFLOAT=234.56, VREAL=345.61}]");
+	}
+
+	@Test
+	public void convertDateAndTimeValues() throws SQLException {
+		List<Map<String, Object>> rows = doTestConversion("", "DTABLE", "VTS1 TIMESTAMP, VTS2 TIMESTAMP, VTIME TIME, VDATE DATE",
+				new String[] { "VTS1", "VTS2", "VTIME", "VDATE" }, 
+				new String[] { "2024-09-04T08:01:02.456+0300", "2024-09-04T08:01:02+0300", "08:01:02+0300", "2024-09-04" },
+				null); 
+		// assert individually to allow compare with GMT time (not local)
+		assertThat(gmtTimestamp((Date) rows.get(0).get("VTS1")), equalTo("2024-09-04T05:01:02.456"));
+		assertThat(gmtTimestamp((Date) rows.get(0).get("VTS2")), equalTo("2024-09-04T05:01:02.000"));
+		assertThat(gmtTimestamp((Date) rows.get(0).get("VTIME")), equalTo("1970-01-01T05:01:02.000"));
+		assertThat(rows.get(0).get("VDATE").toString(), "2024-09-04", equalTo(rows.get(0).get("VDATE").toString()));
+	}
+
+	@Test
+	public void convertWithMixedCaseColumnName() throws SQLException {
+		// Uses a date type to ensure that is the converter who tries making the conversion, not the driver
+		// (neither H2 nor Postgres drivers do conversion of dates)
+		List<Map<String, Object>> rows = doTestConversion("", "ITable", "VDATE DATE",
+				new String[] { "VDate" }, 
+				new String[] { "2024-09-04" },
+				"[{VDATE=2024-09-04}]");
+		assertThat(rows.get(0).get("VDATE").toString(), equalTo("2024-09-04"));
+	}
+
+	@Test
+	public void whenNoColumnsSpecified_ThenAllTableColumns_AreIncluded() throws SQLException {
+		doTestConversion("", "OTABLE", "VINT SMALLINT, VCHAR VARCHAR(20)", 
+				null, 
+				new String[] { "101", "astring" },
+				"[{VINT=101, VCHAR=astring}]");
+	}
+
+	@Test
+	public void whenNoColumnsSpecified_AndColumnCountMismatch_ThenConversionFails() throws SQLException {
+		assertThrows(SQLException.class, () -> {
+			doTestConversion("", "FMTABLE", "VINT BIGINT", 
+				null, 
+				new String[] { "101", "999" },
+				"[{VINT=101}]");
+		});
+	}
+
+	@Test
+	public void whenColumnNotFound_ThenConversionFails() throws SQLException {
+		assertThrows(SQLException.class, () -> {
+			doTestConversion("", "FCTABLE", "VINT INTEGER, VCHAR VARCHAR(20)", 
+					new String[] { "VINT", "NOTEXISTS" },
+					new String[] { "101", "astring" }, 
+					"[{VINT=101, VCHAR=notexists}]");
+		});
+	}
+	
+	@Test
+	public void whenValueHasWrongFormat_ThenConversionFails() throws SQLException {
+		assertThrows(SQLException.class, () -> {
+			doTestConversion("", "FVTABLE", "VTS TIMESTAMP", 
+					new String[] { "VTS" },
+					new String[] { "notadate" }, 
+					"[{VTS=notadate}]");
+		});
+	}
+	
+	// Failures due to problems with metadata.
+	// Simulates failures getting metadata so that the conversion is left to
+	// be done by the driver.
+	// - Below tests will pass because the H2 driver converts numeric values
+	// - but fail if driver is changed to Postgres (does not convert numeric), skipped
+	
+	@Test
+	public void whenMetadataNotFound_ThenConversions_AreUpToTheDriver_WithColumns() throws SQLException {
+		if (isPostgres)
+			return;
+		doTestConversion("table", "F1TABLE", "VINT INTEGER, VCHAR VARCHAR(20)", 
+				new String[] { "VINT", "VCHAR" },
+				new String[] { "101", "astring" }, 
+				"[{VINT=101, VCHAR=astring}]");
+	}
+
+	@Test
+	public void whenMetadataNotFound_ThenConversions_AreUpToTheDriver_WithoutColumns() throws SQLException {
+		if (isPostgres)
+			return;
+		doTestConversion("table", "F2CTABLE", "VINT INTEGER, VCHAR VARCHAR(20)", 
+				null, 
+				new String[] { "101", "astring" },
+				"[{VINT=101, VCHAR=astring}]");
+	}
+
+	@Test
+	public void whenMetadataFails_ThenConversions_AreUpToTheDriver() throws SQLException {
+		if (isPostgres)
+			return;
+		doTestConversion("conn", "F3TABLE", "VINT INTEGER, VCHAR VARCHAR(20)", 
+				new String[] { "VINT", "VCHAR" },
+				new String[] { "101", "astring" }, 
+				"[{VINT=101, VCHAR=astring}]");
+	}
+	
+	private List<Map<String, Object>> doTestConversion(String failureToSimulate, String table, 
+			String ddlTypes, String[] columns, String[] params, String expected) throws SQLException {
+		execute("DROP TABLE IF EXISTS " + table + ";" 
+			+ " CREATE TABLE " + table + " (" + ddlTypes + ");", null);
+		String sql = "INSERT INTO " + table 
+				+ (columns != null ? " (" + String.join(",", columns) + ")" : "")
+				+ " VALUES (" + placeholders(params.length) + ")";
+
+		DbValueConverter converter = new DbValueConverter(
+				"conn".equals(failureToSimulate) ? null : conn,
+				"table".equals(failureToSimulate) ? "notexists" : table);
+		Object[] converted = converter.convertColumnValues(columns, params);
+		execute(sql, converted);
+
+		List<Map<String, Object>> rows = execute("SELECT * FROM " + table, null);
+		if (expected != null) // null to check without specified columns
+			assertThat(rows.toString(), equalTo(convertDbCase(expected)));
+		return rows;
+	}
+
+	private String placeholders(int columnCount) {
+		String[] placeholders = new String[columnCount];
+		Arrays.fill(placeholders, "?");
+		return String.join(",", placeholders);
+	}
+
+	private String gmtTimestamp(Date dt) {
+		java.text.DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+		df.setTimeZone(TimeZone.getTimeZone("GMT"));
+		return df.format(dt);
+	}
+	
+}
diff --git a/core/src/test/resources/db_test.properties b/core/src/test/resources/db_test.properties
new file mode 100644
index 00000000..fe99f714
--- /dev/null
+++ b/core/src/test/resources/db_test.properties
@@ -0,0 +1,7 @@
+# Connection info used by the DbSqlExecutor
+
+# JDBC connection string to the test database (H2)
+db.driver.url=jdbc:h2:./target/test_db_sql_executor
+# If connection requires authentication, specify user and password:
+# db.driver.user=
+# db.driver.password=
diff --git a/core/src/test/resources/db_test_postgres.properties b/core/src/test/resources/db_test_postgres.properties
new file mode 100644
index 00000000..ed699717
--- /dev/null
+++ b/core/src/test/resources/db_test_postgres.properties
@@ -0,0 +1,7 @@
+# Connection info used by the DbSqlExecutor
+
+# JDBC connection string to a PostgreSQL test database
+# Spin up the DB container before running the postgres tests: docker/compose/pg_compose.yml
+db.driver.url=jdbc:postgresql://localhost:35432/postgres
+db.driver.user=postgres
+db.driver.password=example
diff --git a/core/src/test/resources/integration_test_files/db/db_csv_load_with_headers.json b/core/src/test/resources/integration_test_files/db/db_csv_load_with_headers.json
new file mode 100644
index 00000000..f2f0f258
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/db_csv_load_with_headers.json
@@ -0,0 +1,43 @@
+{
+    "scenarioName": "DbSqlExecutor: Load a CSV file with headers",
+    "steps": [
+        {
+            "name": "Test database setup",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "DROP TABLE IF EXISTS PLAYERS; CREATE TABLE PLAYERS (ID INTEGER, NAME VARCHAR(20), AGE INTEGER);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert rows from a CSV file with headers",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "LOADCSV",
+            "request": {
+                "tableName": "players",
+                "csvSource": "integration_test_files/db/players_with_headers.csv",
+                "withHeaders" : true
+            },
+            "verify": { 
+                "size" : 3
+            }
+        },
+        {
+            "name": "Check the content of inserted rows",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "SELECT ID, NAME, AGE FROM PLAYERS ORDER BY ID"
+            },
+            "verify": {
+                "rows.SIZE": 3,
+                "rows": [ //<-- to make this pass in postgres, set the keys to lowercase
+                    { "ID": 1001, "NAME": "Ronaldo", "AGE": 23 },
+                    { "ID": 1002, "NAME": "Devaldo", "AGE": null },
+                    { "ID": 1003, "NAME": "Trevaldo", "AGE": 35 }
+                ]
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/core/src/test/resources/integration_test_files/db/db_csv_load_with_headers_postgres.json b/core/src/test/resources/integration_test_files/db/db_csv_load_with_headers_postgres.json
new file mode 100644
index 00000000..544e32c0
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/db_csv_load_with_headers_postgres.json
@@ -0,0 +1,43 @@
+{
+    "scenarioName": "DbSqlExecutor: Load a CSV file with headers - PostgreSQL",
+    "steps": [
+        {
+            "name": "Test database setup",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "DROP TABLE IF EXISTS PLAYERS; CREATE TABLE PLAYERS (ID INTEGER, NAME VARCHAR(20), AGE INTEGER);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert rows from a CSV file with headers",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "LOADCSV",
+            "request": {
+                "tableName": "players",
+                "csvSource": "integration_test_files/db/players_with_headers.csv",
+                "withHeaders" : true
+            },
+            "verify": { 
+                "size" : 3
+            }
+        },
+        {
+            "name": "Check the content of inserted rows",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "SELECT ID, NAME, AGE FROM PLAYERS ORDER BY ID"
+            },
+            "verify": {
+                "rows.SIZE": 3,
+                "rows": [ //<-- same than db_csv_load_with_headers.json, but keys in lowercase (postgres converts to lower)
+                    { "id": 1001, "name": "Ronaldo", "age": 23 },
+                    { "id": 1002, "name": "Devaldo", "age": null },
+                    { "id": 1003, "name": "Trevaldo", "age": 35 }
+                ]
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/core/src/test/resources/integration_test_files/db/db_csv_load_without_headers.json b/core/src/test/resources/integration_test_files/db/db_csv_load_without_headers.json
new file mode 100644
index 00000000..16212042
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/db_csv_load_without_headers.json
@@ -0,0 +1,43 @@
+{
+    "scenarioName": "DbSqlExecutor: Load a CSV file without headers",
+    "steps": [
+        {
+            "name": "Test database setup",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "DROP TABLE IF EXISTS PLAYERS; CREATE TABLE PLAYERS (ID INTEGER, NAME VARCHAR(20), AGE INTEGER);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert rows from a CSV file without headers",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "LOADCSV",
+            "request": {
+                "tableName": "players",
+                "csvSource": "integration_test_files/db/players_without_headers.csv",
+                "nullString": "NULL!!"
+            },
+            "verify": { 
+                "size" : 3
+            }
+        },
+        {
+            "name": "Check the content of inserted rows",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "SELECT ID, NAME, AGE FROM PLAYERS ORDER BY ID"
+            },
+            "verify": {
+                "rows.SIZE": 3,
+                "rows": [ //<-- to make this pass in postgres, set the keys to lowercase
+                    { "ID": 1001, "NAME": "Ronaldo", "AGE": 23 },
+                    { "ID": 1002, "NAME": null, "AGE": 24 },
+                    { "ID": 1003, "NAME": "Trevaldo", "AGE": 35 }
+                ]
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/core/src/test/resources/integration_test_files/db/db_sql_execute.json b/core/src/test/resources/integration_test_files/db/db_sql_execute.json
new file mode 100644
index 00000000..6e0d0daf
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/db_sql_execute.json
@@ -0,0 +1,49 @@
+{
+    "scenarioName": "DbSqlExecutor: Read and write data using SQL",
+    "steps": [
+        {
+            "name": "Test database setup",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "DROP TABLE IF EXISTS PEOPLE; CREATE TABLE PEOPLE (ID INTEGER, NAME VARCHAR(20), START DATE, ACTIVE BOOLEAN);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert rows using SQL",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "INSERT INTO PEOPLE VALUES (1, 'Jeff Bejo', '2024-09-01', true); INSERT INTO PEOPLE VALUES (2, 'John Bajo', '2024-09-02', false);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert with parameters and nulls",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "execute", //<-- Uppercase for consistency, but also allows lowercase
+            "request": {
+                "sql": "INSERT INTO PEOPLE (ID, NAME, START, ACTIVE) VALUES (?, ?, ?, ?);",
+                "sqlParams": [3, null, null, true]
+            },
+            "verify": { }
+        },
+        {
+            "name": "Retrieve rows using SQL",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "SELECT ID, NAME, to_char(START,'yyyy-MM-dd') AS START, ACTIVE FROM PEOPLE WHERE ACTIVE=?",
+                "sqlParams": [true]
+            },
+            "verify": {
+				"rows.SIZE": 2,
+                "rows": [ //<-- to make this pass in postgres, set the keys to lowercase
+                    { "ID": 1, "NAME": "Jeff Bejo", "START": "2024-09-01", "ACTIVE": true },
+                    { "ID": 3, "NAME": null, "START": null, "ACTIVE": true }
+                ]
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/core/src/test/resources/integration_test_files/db/db_sql_execute_postgres.json b/core/src/test/resources/integration_test_files/db/db_sql_execute_postgres.json
new file mode 100644
index 00000000..11d283ef
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/db_sql_execute_postgres.json
@@ -0,0 +1,49 @@
+{
+    "scenarioName": "DbSqlExecutor: Read and write data using SQL - PostgreSQL",
+    "steps": [
+        {
+            "name": "Test database setup",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "DROP TABLE IF EXISTS PEOPLE; CREATE TABLE PEOPLE (ID INTEGER, NAME VARCHAR(20), START DATE, ACTIVE BOOLEAN);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert rows using SQL",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "INSERT INTO PEOPLE VALUES (1, 'Jeff Bejo', '2024-09-01', true); INSERT INTO PEOPLE VALUES (2, 'John Bajo', '2024-09-02', false);"
+            },
+            "verify": { }
+        },
+        {
+            "name": "Insert with parameters and nulls",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "execute", //<-- Uppercase for consistency, but also allows lowercase
+            "request": {
+                "sql": "INSERT INTO PEOPLE (ID, NAME, START, ACTIVE) VALUES (?, ?, ?, ?);",
+                "sqlParams": [3, null, null, true]
+            },
+            "verify": { }
+        },
+        {
+            "name": "Retrieve rows using SQL",
+            "url": "org.jsmart.zerocode.core.db.DbSqlExecutor",
+            "operation": "EXECUTE",
+            "request": {
+                "sql": "SELECT ID, NAME, to_char(START,'yyyy-MM-dd') AS START, ACTIVE FROM PEOPLE WHERE ACTIVE=?",
+                "sqlParams": [true]
+            },
+            "verify": {
+				"rows.SIZE": 2,
+                "rows": [ //<-- same than db_sql_execute.json, but keys in lowercase (postgres converts to lower)
+                    { "id": 1, "name": "Jeff Bejo", "start": "2024-09-01", "active": true },
+                    { "id": 3, "name": null, "start": null, "active": true }
+                ]
+            }
+        }
+    ]
+}
\ No newline at end of file
diff --git a/core/src/test/resources/integration_test_files/db/players_with_headers.csv b/core/src/test/resources/integration_test_files/db/players_with_headers.csv
new file mode 100644
index 00000000..22f8c751
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/players_with_headers.csv
@@ -0,0 +1,4 @@
+ID, AGE, NAME
+1001, 23, Ronaldo
+1002,   , Devaldo
+1003, 35, Trevaldo
\ No newline at end of file
diff --git a/core/src/test/resources/integration_test_files/db/players_without_headers.csv b/core/src/test/resources/integration_test_files/db/players_without_headers.csv
new file mode 100644
index 00000000..d6f049e4
--- /dev/null
+++ b/core/src/test/resources/integration_test_files/db/players_without_headers.csv
@@ -0,0 +1,3 @@
+1001, Ronaldo, 23
+1002, null!!, 24
+1003, Trevaldo, 35
\ No newline at end of file
diff --git a/docker/compose/pg_compose.yml b/docker/compose/pg_compose.yml
index 100398f6..3ad83a2e 100644
--- a/docker/compose/pg_compose.yml
+++ b/docker/compose/pg_compose.yml
@@ -12,9 +12,3 @@ services:
       POSTGRES_PASSWORD: example
     ports:
       - 35432:5432
-
-  adminer:
-    image: adminer:4.7.0
-    restart: always
-    ports:
-      - 8080:8080
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index c9658480..9d354afa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,10 +84,12 @@
         <commons-collections4.version>4.4</commons-collections4.version>
         <commons-lang3.version>3.14.0</commons-lang3.version>
         <commons-text.version>1.11.0</commons-text.version>
+        <commons-dbutils.version>1.8.1</commons-dbutils.version>
         <micro-simulator.version>1.1.10</micro-simulator.version>
         <httpclient.version>4.5.13</httpclient.version>
         <httpmime.version>4.5.12</httpmime.version>
         <h2.db.version>2.2.220</h2.db.version>
+        <postgres.db.version>42.7.4</postgres.db.version>
         <extentreports.version>5.0.9</extentreports.version>
         <version.kafka-clients>3.7.0</version.kafka-clients>
         <version.gson>2.10.1</version.gson>
@@ -287,12 +289,22 @@
                 <version>${micro-simulator.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>commons-dbutils</groupId>
+                <artifactId>commons-dbutils</artifactId>
+                <version>${commons-dbutils.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.h2database</groupId>
                 <artifactId>h2</artifactId>
                 <version>${h2.db.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency> 
+                <groupId>org.postgresql</groupId>
+                <artifactId>postgresql</artifactId>
+                <version>${postgres.db.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.aventstack</groupId>
                 <artifactId>extentreports</artifactId>