diff --git a/README.md b/README.md index 915670c..f7a0cea 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,12 @@ the command line. The format is one switch and option per line: -host 1.2.3.4 ``` +### Case sensitivity + +Keyspace, table, column identifiers are case insensitive in Apache Cassandra. Anyways the +`schema` parameter is treated internally as case sensitive and the queries used to load/unload +data are quoting all identifiers to preserve the case sensitivity. + ## Usage Statement: ``` Usage: -f -host -schema [OPTIONS] diff --git a/src/main/java/com/datastax/loader/CqlDelimLoadTask.java b/src/main/java/com/datastax/loader/CqlDelimLoadTask.java index 31bbbf5..7f2ac31 100644 --- a/src/main/java/com/datastax/loader/CqlDelimLoadTask.java +++ b/src/main/java/com/datastax/loader/CqlDelimLoadTask.java @@ -71,7 +71,6 @@ class CqlDelimLoadTask implements Callable { private String BADINSERT = ".BADINSERT"; private String LOG = ".LOG"; private Session session; - private String insert; private PreparedStatement statement; private ConsistencyLevel consistencyLevel; private CqlDelimParser cdp; @@ -168,8 +167,7 @@ private void setup() throws IOException, ParseException { cdp = new CqlDelimParser(cqlSchema, delimiter, nullString, dateFormatString, boolStyle, locale, skipCols, session, true); - insert = cdp.generateInsert(); - statement = session.prepare(insert); + statement = session.prepare(cdp.generateInsert()); statement.setRetryPolicy(new LoaderRetryPolicy(numRetries)); statement.setConsistencyLevel(consistencyLevel); } diff --git a/src/main/java/com/datastax/loader/CqlDelimParser.java b/src/main/java/com/datastax/loader/CqlDelimParser.java index 0ecd850..40ae9ee 100644 --- a/src/main/java/com/datastax/loader/CqlDelimParser.java +++ b/src/main/java/com/datastax/loader/CqlDelimParser.java @@ -15,41 +15,43 @@ */ package com.datastax.loader; -import com.datastax.loader.parser.Parser; -import com.datastax.loader.parser.DelimParser; -import com.datastax.loader.parser.IntegerParser; -import com.datastax.loader.parser.LongParser; -import com.datastax.loader.parser.FloatParser; -import com.datastax.loader.parser.DoubleParser; -import com.datastax.loader.parser.StringParser; -import com.datastax.loader.parser.BooleanParser; -import com.datastax.loader.parser.UUIDParser; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; import com.datastax.loader.parser.BigDecimalParser; import com.datastax.loader.parser.BigIntegerParser; +import com.datastax.loader.parser.BooleanParser; import com.datastax.loader.parser.ByteBufferParser; -import com.datastax.loader.parser.InetAddressParser; import com.datastax.loader.parser.DateParser; +import com.datastax.loader.parser.DelimParser; +import com.datastax.loader.parser.DoubleParser; +import com.datastax.loader.parser.FloatParser; +import com.datastax.loader.parser.InetAddressParser; +import com.datastax.loader.parser.IntegerParser; import com.datastax.loader.parser.ListParser; -import com.datastax.loader.parser.SetParser; +import com.datastax.loader.parser.LongParser; import com.datastax.loader.parser.MapParser; +import com.datastax.loader.parser.Parser; +import com.datastax.loader.parser.SetParser; +import com.datastax.loader.parser.StringParser; +import com.datastax.loader.parser.UUIDParser; -import java.lang.String; -import java.lang.IndexOutOfBoundsException; -import java.lang.NumberFormatException; import java.text.ParseException; -import java.util.Map; -import java.util.List; -import java.util.HashMap; import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Locale; -import java.util.regex.Pattern; +import java.util.Map; import java.util.regex.Matcher; +import java.util.regex.Pattern; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.exceptions.InvalidTypeException; -import com.datastax.driver.core.DataType; +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; public class CqlDelimParser { private Map pmap; @@ -126,10 +128,15 @@ private void processCqlSchema(String cqlSchema, Session session) throws ParseExc } private List schemaBits(String in, Session session) throws ParseException { - String query = "SELECT " + in + " FROM " + keyspace + "." + tablename + " LIMIT 1"; - ColumnDefinitions cd = session.execute(query).getColumnDefinitions(); String[] inList = in.split(","); - List sbl = new ArrayList(); + Select.Selection columnsSelector = QueryBuilder.select(); + for(String col: inList) { + columnsSelector.column(quote(col.trim())); + } + Select columnsMetaStmt = columnsSelector.from(quote(keyspace), quote(tablename)).limit(1); + ColumnDefinitions cd = session.execute(columnsMetaStmt).getColumnDefinitions(); + + List sbl = new ArrayList(inList.length); for (int i = 0; i < inList.length; i++) { String col = inList[i].trim(); SchemaBits sb = new SchemaBits(); @@ -200,26 +207,22 @@ private void createDelimParser(String delimiter, String nullString, } // Convenience method to return the INSERT statement for a PreparedStatement. - public String generateInsert() { - String insert = "INSERT INTO " + keyspace + "." + tablename + "(" + sbl.get(0).name; - String qmarks = "?"; - for (int i = 1; i < sbl.size(); i++) { - insert = insert + ", " + sbl.get(i).name; - qmarks = qmarks + ", ?"; + public Insert generateInsert() { + Insert insertStmt = QueryBuilder.insertInto(quote(keyspace), quote(tablename)); + for(SchemaBits sb: sbl) { + insertStmt.value(quote(sb.name), bindMarker()); } - insert = insert + ") VALUES (" + qmarks + ")"; - return insert; + return insertStmt; } - public String generateSelect() { - String select = "SELECT " + sbl.get(0).name; - for (int i = 1; i < sbl.size(); i++) { - select = select + ", " + sbl.get(i).name; + public Select generateSelect() { + Select.Selection colSelector = QueryBuilder.select(); + for(SchemaBits sb: sbl) { + colSelector.column(quote(sb.name)); + } + return colSelector.from(quote(keyspace), quote(tablename)); } - select += " FROM " + keyspace + "." + tablename; - return select; - } - + public String getKeyspace() { return keyspace; } @@ -236,5 +239,11 @@ public List parse(String line) { public String format(Row row) throws IndexOutOfBoundsException, InvalidTypeException { return delimParser.format(row); } + + // when upgrading to Java driver 3.0.0 this method + // can be replaced by Metadata.quote + public static String quote(String identifier) { + return '"' + identifier.replace("\"", "") + '"'; + } } diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java b/src/main/java/com/datastax/loader/CqlDelimUnload.java index 897c937..60c889a 100644 --- a/src/main/java/com/datastax/loader/CqlDelimUnload.java +++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java @@ -15,63 +15,62 @@ */ package com.datastax.loader; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import com.datastax.driver.core.querybuilder.Select; import com.datastax.loader.parser.BooleanParser; -import java.lang.System; -import java.lang.String; -import java.lang.StringBuilder; -import java.lang.Integer; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.Set; -import java.util.HashSet; -import java.util.Deque; -import java.util.ArrayDeque; -import java.util.Locale; -import java.math.BigInteger; -import java.io.FileOutputStream; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; import java.io.BufferedOutputStream; -import java.io.PrintStream; -import java.io.File; import java.io.BufferedReader; -import java.io.FileReader; -import java.io.InputStreamReader; -import java.io.InputStream; +import java.io.File; import java.io.FileInputStream; -import java.io.IOException; import java.io.FileNotFoundException; -import java.text.ParseException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.math.BigInteger; +import java.security.KeyManagementException; import java.security.KeyStore; -import java.security.SecureRandom; import java.security.KeyStoreException; -import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.SSLOptions; -import com.datastax.driver.core.policies.TokenAwarePolicy; -import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.gt; +import static com.datastax.driver.core.querybuilder.QueryBuilder.lte; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static com.datastax.driver.core.querybuilder.QueryBuilder.token; public class CqlDelimUnload { @@ -378,15 +377,12 @@ public boolean run(String[] args) executor.shutdown(); } else { - BigInteger begin = null; - BigInteger end = null; - BigInteger delta = null; List beginList = new ArrayList(); List endList = new ArrayList(); if (null != beginToken) { - begin = new BigInteger(beginToken); - end = new BigInteger(endToken); - delta = end.subtract(begin).divide(new BigInteger(String.valueOf(numThreads))); + BigInteger begin = new BigInteger(beginToken); + BigInteger end = new BigInteger(endToken); + BigInteger delta = end.subtract(begin).divide(new BigInteger(String.valueOf(numThreads))); for (int mype = 0; mype < numThreads; mype++) { if (mype < numThreads - 1) { beginList.add(begin.add(delta.multiply(new BigInteger(String.valueOf(mype)))).toString()); @@ -497,21 +493,12 @@ public Long call() throws IOException, ParseException { } private String getPartitionKey(CqlDelimParser cdp, Session tsession) { - String keyspace = cdp.getKeyspace(); - String table = cdp.getTable(); - if (keyspace.startsWith("\"") && keyspace.endsWith("\"")) - keyspace = keyspace.replaceAll("\"", ""); - else - keyspace = keyspace.toLowerCase(); - if (table.startsWith("\"") && table.endsWith("\"")) - table = table.replaceAll("\"", ""); - else - table = table.toLowerCase(); - String query = "SELECT column_name, component_index, type " - + "FROM system.schema_columns WHERE keyspace_name = '" - + keyspace + "' AND columnfamily_name = '" - + table + "'"; - List rows = tsession.execute(query).all(); + String keyspace = cdp.getKeyspace().replace("\"", ""); + String table = cdp.getTable().replace("\"", ""); + List rows = tsession.execute(select("column_name", "component_index", "type") + .from("system", "schema_columns") + .where(eq("keyspace_name", keyspace)) + .and(eq("columnfamily_name", table))).all(); if (rows.isEmpty()) { System.err.println("Can't find the keyspace/table"); // error @@ -547,14 +534,13 @@ private void setup() throws IOException, ParseException { cdp = new CqlDelimParser(cqlSchema, delimiter, nullString, dateFormatString, boolStyle, locale, null, session, false); - String select = cdp.generateSelect(); - String partitionKey = getPartitionKey(cdp, session); + Select selectStmt = cdp.generateSelect(); if (null != beginToken) { - select = select + " WHERE Token(" + partitionKey + ") > " - + beginToken + " AND Token(" + partitionKey + ") <= " - + endToken; + String partitionKey = getPartitionKey(cdp, session); + selectStmt.where(gt(token(partitionKey), beginToken)) + .and(lte(token(partitionKey), endToken)); } - statement = session.prepare(select); + statement = session.prepare(selectStmt); statement.setConsistencyLevel(consistencyLevel); }