Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[postgrenosql] Each thread gets its own connection to the database. #1691

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -40,44 +36,64 @@
* PostgreNoSQL client for YCSB framework.
*/
public class PostgreNoSQLDBClient extends DB {
private static final Logger LOG = LoggerFactory.getLogger(PostgreNoSQLDBClient.class);

/** Count the number of times initialized to teardown on the last. */
private static final AtomicInteger INIT_COUNT = new AtomicInteger(0);

/** Cache for already prepared statements. */
private static ConcurrentMap<StatementType, PreparedStatement> cachedStatements;

/** The driver to get the connection to postgresql. */
private static Driver postgrenosqlDriver;

/** The connection to the database. */
private static Connection connection;
private static final Logger LOG = LoggerFactory.getLogger(PostgreNoSQLDBClient.class);

/** The class to use as the jdbc driver. */
/**
* The class to use as the jdbc driver.
*/
public static final String DRIVER_CLASS = "db.driver";

/** The URL to connect to the database. */
/**
* The URL to connect to the database.
*/
public static final String CONNECTION_URL = "postgrenosql.url";

/** The user name to use to connect to the database. */
/**
* The user name to use to connect to the database.
*/
public static final String CONNECTION_USER = "postgrenosql.user";

/** The password to use for establishing the connection. */
/**
* The password to use for establishing the connection.
*/
public static final String CONNECTION_PASSWD = "postgrenosql.passwd";

/** The JDBC connection auto-commit property for the driver. */
/**
* The JDBC connection auto-commit property for the driver.
*/
public static final String JDBC_AUTO_COMMIT = "postgrenosql.autocommit";

/** The primary key in the user table. */
/**
* The primary key in the user table.
*/
public static final String PRIMARY_KEY = "YCSB_KEY";

/** The field name prefix in the table. */
/**
* The field name prefix in the table.
*/
public static final String COLUMN_NAME = "YCSB_VALUE";

private static final String DEFAULT_PROP = "";

/** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */
/**
* Cache for already prepared statements.
*/
private Map<StatementType, PreparedStatement> cachedStatements;

/**
* The driver to get the connection to postgresql.
*/
private Driver postgrenosqlDriver;

/**
* The connection to the database.
*/
private Connection connection;

/**
* Returns parsed boolean value from the properties if set, otherwise returns defaultVal.
*/
private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) {
String valueStr = props.getProperty(key);
if (valueStr != null) {
Expand All @@ -88,54 +104,50 @@ private static boolean getBoolProperty(Properties props, String key, boolean def

@Override
public void init() throws DBException {
INIT_COUNT.incrementAndGet();
synchronized (PostgreNoSQLDBClient.class) {
if (postgrenosqlDriver != null) {
return;
}
if (postgrenosqlDriver != null) {
return;
}

Properties props = getProperties();
String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
Properties props = getProperties();
String urls = props.getProperty(CONNECTION_URL, DEFAULT_PROP);
String user = props.getProperty(CONNECTION_USER, DEFAULT_PROP);
String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);

try {
Properties tmpProps = new Properties();
tmpProps.setProperty("user", user);
tmpProps.setProperty("password", passwd);
try {
Properties tmpProps = new Properties();
tmpProps.setProperty("user", user);
tmpProps.setProperty("password", passwd);

cachedStatements = new ConcurrentHashMap<>();
cachedStatements = new HashMap<>();

postgrenosqlDriver = new Driver();
connection = postgrenosqlDriver.connect(urls, tmpProps);
connection.setAutoCommit(autoCommit);
postgrenosqlDriver = new Driver();
connection = postgrenosqlDriver.connect(urls, tmpProps);
connection.setAutoCommit(autoCommit);

} catch (Exception e) {
LOG.error("Error during initialization: " + e);
}
} catch (Exception e) {
LOG.error("Error during initialization: " + e);
}
}

@Override
public void cleanup() throws DBException {
if (INIT_COUNT.decrementAndGet() == 0) {
try {
cachedStatements.clear();
try {
cachedStatements.clear();

if (!connection.getAutoCommit()){
connection.commit();
}
connection.close();
} catch (SQLException e) {
System.err.println("Error in cleanup execution. " + e);
if (!connection.getAutoCommit()) {
connection.commit();
}
postgrenosqlDriver = null;
connection.close();
} catch (SQLException e) {
System.err.println("Error in cleanup execution. " + e);
}
postgrenosqlDriver = null;
}

@Override
public Status read(String tableName, String key, Set<String> fields, Map<String, ByteIterator> result) {
public Status read(String tableName, String key, Set<String> fields,
Map<String, ByteIterator> result) {
try {
StatementType type = new StatementType(StatementType.Type.READ, tableName, fields);
PreparedStatement readStatement = cachedStatements.get(type);
Expand All @@ -146,16 +158,16 @@ public Status read(String tableName, String key, Set<String> fields, Map<String,
ResultSet resultSet = readStatement.executeQuery();
if (!resultSet.next()) {
resultSet.close();
return Status.NOT_FOUND;
return Status.NOT_FOUND;
}

if (result != null) {
if (fields == null){
do{
if (fields == null) {
do {
String field = resultSet.getString(2);
String value = resultSet.getString(3);
result.put(field, new StringByteIterator(value));
}while (resultSet.next());
} while (resultSet.next());
} else {
for (String field : fields) {
String value = resultSet.getString(field);
Expand All @@ -174,7 +186,7 @@ public Status read(String tableName, String key, Set<String> fields, Map<String,

@Override
public Status scan(String tableName, String startKey, int recordcount, Set<String> fields,
Vector<HashMap<String, ByteIterator>> result) {
Vector<HashMap<String, ByteIterator>> result) {
try {
StatementType type = new StatementType(StatementType.Type.SCAN, tableName, fields);
PreparedStatement scanStatement = cachedStatements.get(type);
Expand Down Expand Up @@ -206,7 +218,7 @@ public Status scan(String tableName, String startKey, int recordcount, Set<Strin

@Override
public Status update(String tableName, String key, Map<String, ByteIterator> values) {
try{
try {
StatementType type = new StatementType(StatementType.Type.UPDATE, tableName, null);
PreparedStatement updateStatement = cachedStatements.get(type);
if (updateStatement == null) {
Expand Down Expand Up @@ -238,7 +250,7 @@ public Status update(String tableName, String key, Map<String, ByteIterator> val

@Override
public Status insert(String tableName, String key, Map<String, ByteIterator> values) {
try{
try {
StatementType type = new StatementType(StatementType.Type.INSERT, tableName, null);
PreparedStatement insertStatement = cachedStatements.get(type);
if (insertStatement == null) {
Expand Down Expand Up @@ -271,7 +283,7 @@ public Status insert(String tableName, String key, Map<String, ByteIterator> val

@Override
public Status delete(String tableName, String key) {
try{
try {
StatementType type = new StatementType(StatementType.Type.DELETE, tableName, null);
PreparedStatement deleteStatement = cachedStatements.get(type);
if (deleteStatement == null) {
Expand All @@ -280,7 +292,7 @@ public Status delete(String tableName, String key) {
deleteStatement.setString(1, key);

int result = deleteStatement.executeUpdate();
if (result == 1){
if (result == 1) {
return Status.OK;
}

Expand All @@ -292,7 +304,7 @@ public Status delete(String tableName, String key) {
}

private PreparedStatement createAndCacheReadStatement(StatementType readType)
throws SQLException{
throws SQLException {
PreparedStatement readStatement = connection.prepareStatement(createReadStatement(readType));
PreparedStatement statement = cachedStatements.putIfAbsent(readType, readStatement);
if (statement == null) {
Expand All @@ -301,13 +313,13 @@ private PreparedStatement createAndCacheReadStatement(StatementType readType)
return statement;
}

private String createReadStatement(StatementType readType){
private String createReadStatement(StatementType readType) {
StringBuilder read = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY);

if (readType.getFields() == null) {
read.append(", (jsonb_each_text(" + COLUMN_NAME + ")).*");
} else {
for (String field:readType.getFields()){
for (String field : readType.getFields()) {
read.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field);
}
}
Expand All @@ -321,7 +333,7 @@ private String createReadStatement(StatementType readType){
}

private PreparedStatement createAndCacheScanStatement(StatementType scanType)
throws SQLException{
throws SQLException {
PreparedStatement scanStatement = connection.prepareStatement(createScanStatement(scanType));
PreparedStatement statement = cachedStatements.putIfAbsent(scanType, scanStatement);
if (statement == null) {
Expand All @@ -330,10 +342,10 @@ private PreparedStatement createAndCacheScanStatement(StatementType scanType)
return statement;
}

private String createScanStatement(StatementType scanType){
private String createScanStatement(StatementType scanType) {
StringBuilder scan = new StringBuilder("SELECT " + PRIMARY_KEY + " AS " + PRIMARY_KEY);
if (scanType.getFields() != null){
for (String field:scanType.getFields()){
if (scanType.getFields() != null) {
for (String field : scanType.getFields()) {
scan.append(", " + COLUMN_NAME + "->>'" + field + "' AS " + field);
}
}
Expand All @@ -349,16 +361,17 @@ private String createScanStatement(StatementType scanType){
}

public PreparedStatement createAndCacheUpdateStatement(StatementType updateType)
throws SQLException{
PreparedStatement updateStatement = connection.prepareStatement(createUpdateStatement(updateType));
throws SQLException {
PreparedStatement updateStatement = connection.prepareStatement(
createUpdateStatement(updateType));
PreparedStatement statement = cachedStatements.putIfAbsent(updateType, updateStatement);
if (statement == null) {
return updateStatement;
}
return statement;
}

private String createUpdateStatement(StatementType updateType){
private String createUpdateStatement(StatementType updateType) {
StringBuilder update = new StringBuilder("UPDATE ");
update.append(updateType.getTableName());
update.append(" SET ");
Expand All @@ -371,16 +384,17 @@ private String createUpdateStatement(StatementType updateType){
}

private PreparedStatement createAndCacheInsertStatement(StatementType insertType)
throws SQLException{
PreparedStatement insertStatement = connection.prepareStatement(createInsertStatement(insertType));
throws SQLException {
PreparedStatement insertStatement = connection.prepareStatement(
createInsertStatement(insertType));
PreparedStatement statement = cachedStatements.putIfAbsent(insertType, insertStatement);
if (statement == null) {
return insertStatement;
}
return statement;
}

private String createInsertStatement(StatementType insertType){
private String createInsertStatement(StatementType insertType) {
StringBuilder insert = new StringBuilder("INSERT INTO ");
insert.append(insertType.getTableName());
insert.append(" (" + PRIMARY_KEY + "," + COLUMN_NAME + ")");
Expand All @@ -389,16 +403,17 @@ private String createInsertStatement(StatementType insertType){
}

private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType)
throws SQLException{
PreparedStatement deleteStatement = connection.prepareStatement(createDeleteStatement(deleteType));
throws SQLException {
PreparedStatement deleteStatement = connection.prepareStatement(
createDeleteStatement(deleteType));
PreparedStatement statement = cachedStatements.putIfAbsent(deleteType, deleteStatement);
if (statement == null) {
return deleteStatement;
}
return statement;
}

private String createDeleteStatement(StatementType deleteType){
private String createDeleteStatement(StatementType deleteType) {
StringBuilder delete = new StringBuilder("DELETE FROM ");
delete.append(deleteType.getTableName());
delete.append(" WHERE ");
Expand Down