Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

treat schema parameter as case sensitive #32

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ the command line. The format is one switch and option per line:
-host 1.2.3.4
```

### Case sensitivity

Keyspace, table, column identifiers are case insensitive in Apache Cassandra. Anyways the
`schema` parameter is treated internally as case sensitive and the queries used to load/unload
data are quoting all identifiers to preserve the case sensitivity.

## Usage Statement:
```
Usage: -f <filename> -host <ipaddress> -schema <schema> [OPTIONS]
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/com/datastax/loader/CqlDelimLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class CqlDelimLoadTask implements Callable<Long> {
private String BADINSERT = ".BADINSERT";
private String LOG = ".LOG";
private Session session;
private String insert;
private PreparedStatement statement;
private ConsistencyLevel consistencyLevel;
private CqlDelimParser cdp;
Expand Down Expand Up @@ -168,8 +167,7 @@ private void setup() throws IOException, ParseException {
cdp = new CqlDelimParser(cqlSchema, delimiter, nullString,
dateFormatString, boolStyle, locale,
skipCols, session, true);
insert = cdp.generateInsert();
statement = session.prepare(insert);
statement = session.prepare(cdp.generateInsert());
statement.setRetryPolicy(new LoaderRetryPolicy(numRetries));
statement.setConsistencyLevel(consistencyLevel);
}
Expand Down
93 changes: 51 additions & 42 deletions src/main/java/com/datastax/loader/CqlDelimParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,43 @@
*/
package com.datastax.loader;

import com.datastax.loader.parser.Parser;
import com.datastax.loader.parser.DelimParser;
import com.datastax.loader.parser.IntegerParser;
import com.datastax.loader.parser.LongParser;
import com.datastax.loader.parser.FloatParser;
import com.datastax.loader.parser.DoubleParser;
import com.datastax.loader.parser.StringParser;
import com.datastax.loader.parser.BooleanParser;
import com.datastax.loader.parser.UUIDParser;

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.loader.parser.BigDecimalParser;
import com.datastax.loader.parser.BigIntegerParser;
import com.datastax.loader.parser.BooleanParser;
import com.datastax.loader.parser.ByteBufferParser;
import com.datastax.loader.parser.InetAddressParser;
import com.datastax.loader.parser.DateParser;
import com.datastax.loader.parser.DelimParser;
import com.datastax.loader.parser.DoubleParser;
import com.datastax.loader.parser.FloatParser;
import com.datastax.loader.parser.InetAddressParser;
import com.datastax.loader.parser.IntegerParser;
import com.datastax.loader.parser.ListParser;
import com.datastax.loader.parser.SetParser;
import com.datastax.loader.parser.LongParser;
import com.datastax.loader.parser.MapParser;
import com.datastax.loader.parser.Parser;
import com.datastax.loader.parser.SetParser;
import com.datastax.loader.parser.StringParser;
import com.datastax.loader.parser.UUIDParser;

import java.lang.String;
import java.lang.IndexOutOfBoundsException;
import java.lang.NumberFormatException;
import java.text.ParseException;
import java.util.Map;
import java.util.List;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.DataType;
import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;

public class CqlDelimParser {
private Map<DataType.Name, Parser> pmap;
Expand Down Expand Up @@ -126,10 +128,15 @@ private void processCqlSchema(String cqlSchema, Session session) throws ParseExc
}

private List<SchemaBits> schemaBits(String in, Session session) throws ParseException {
String query = "SELECT " + in + " FROM " + keyspace + "." + tablename + " LIMIT 1";
ColumnDefinitions cd = session.execute(query).getColumnDefinitions();
String[] inList = in.split(",");
List<SchemaBits> sbl = new ArrayList<SchemaBits>();
Select.Selection columnsSelector = QueryBuilder.select();
for(String col: inList) {
columnsSelector.column(quote(col.trim()));
}
Select columnsMetaStmt = columnsSelector.from(quote(keyspace), quote(tablename)).limit(1);
ColumnDefinitions cd = session.execute(columnsMetaStmt).getColumnDefinitions();

List<SchemaBits> sbl = new ArrayList<SchemaBits>(inList.length);
for (int i = 0; i < inList.length; i++) {
String col = inList[i].trim();
SchemaBits sb = new SchemaBits();
Expand Down Expand Up @@ -200,26 +207,22 @@ private void createDelimParser(String delimiter, String nullString,
}

// Convenience method to return the INSERT statement for a PreparedStatement.
public String generateInsert() {
String insert = "INSERT INTO " + keyspace + "." + tablename + "(" + sbl.get(0).name;
String qmarks = "?";
for (int i = 1; i < sbl.size(); i++) {
insert = insert + ", " + sbl.get(i).name;
qmarks = qmarks + ", ?";
public Insert generateInsert() {
Insert insertStmt = QueryBuilder.insertInto(quote(keyspace), quote(tablename));
for(SchemaBits sb: sbl) {
insertStmt.value(quote(sb.name), bindMarker());
}
insert = insert + ") VALUES (" + qmarks + ")";
return insert;
return insertStmt;
}

public String generateSelect() {
String select = "SELECT " + sbl.get(0).name;
for (int i = 1; i < sbl.size(); i++) {
select = select + ", " + sbl.get(i).name;
public Select generateSelect() {
Select.Selection colSelector = QueryBuilder.select();
for(SchemaBits sb: sbl) {
colSelector.column(quote(sb.name));
}
return colSelector.from(quote(keyspace), quote(tablename));
}
select += " FROM " + keyspace + "." + tablename;
return select;
}


public String getKeyspace() {
return keyspace;
}
Expand All @@ -236,5 +239,11 @@ public List<Object> parse(String line) {
public String format(Row row) throws IndexOutOfBoundsException, InvalidTypeException {
return delimParser.format(row);
}

// when upgrading to Java driver 3.0.0 this method
// can be replaced by Metadata.quote
public static String quote(String identifier) {
return '"' + identifier.replace("\"", "") + '"';
}
}

130 changes: 58 additions & 72 deletions src/main/java/com/datastax/loader/CqlDelimUnload.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,62 @@
*/
package com.datastax.loader;


import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.loader.parser.BooleanParser;

import java.lang.System;
import java.lang.String;
import java.lang.StringBuilder;
import java.lang.Integer;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.HashSet;
import java.util.Deque;
import java.util.ArrayDeque;
import java.util.Locale;
import java.math.BigInteger;
import java.io.FileOutputStream;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedOutputStream;
import java.io.PrintStream;
import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.InputStreamReader;
import java.io.InputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.text.ParseException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.math.BigInteger;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.security.KeyStoreException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.gt;
import static com.datastax.driver.core.querybuilder.QueryBuilder.lte;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.token;


public class CqlDelimUnload {
Expand Down Expand Up @@ -378,15 +377,12 @@ public boolean run(String[] args)
executor.shutdown();
}
else {
BigInteger begin = null;
BigInteger end = null;
BigInteger delta = null;
List<String> beginList = new ArrayList<String>();
List<String> endList = new ArrayList<String>();
if (null != beginToken) {
begin = new BigInteger(beginToken);
end = new BigInteger(endToken);
delta = end.subtract(begin).divide(new BigInteger(String.valueOf(numThreads)));
BigInteger begin = new BigInteger(beginToken);
BigInteger end = new BigInteger(endToken);
BigInteger delta = end.subtract(begin).divide(new BigInteger(String.valueOf(numThreads)));
for (int mype = 0; mype < numThreads; mype++) {
if (mype < numThreads - 1) {
beginList.add(begin.add(delta.multiply(new BigInteger(String.valueOf(mype)))).toString());
Expand Down Expand Up @@ -497,21 +493,12 @@ public Long call() throws IOException, ParseException {
}

private String getPartitionKey(CqlDelimParser cdp, Session tsession) {
String keyspace = cdp.getKeyspace();
String table = cdp.getTable();
if (keyspace.startsWith("\"") && keyspace.endsWith("\""))
keyspace = keyspace.replaceAll("\"", "");
else
keyspace = keyspace.toLowerCase();
if (table.startsWith("\"") && table.endsWith("\""))
table = table.replaceAll("\"", "");
else
table = table.toLowerCase();
String query = "SELECT column_name, component_index, type "
+ "FROM system.schema_columns WHERE keyspace_name = '"
+ keyspace + "' AND columnfamily_name = '"
+ table + "'";
List<Row> rows = tsession.execute(query).all();
String keyspace = cdp.getKeyspace().replace("\"", "");
String table = cdp.getTable().replace("\"", "");
List<Row> rows = tsession.execute(select("column_name", "component_index", "type")
.from("system", "schema_columns")
.where(eq("keyspace_name", keyspace))
.and(eq("columnfamily_name", table))).all();
if (rows.isEmpty()) {
System.err.println("Can't find the keyspace/table");
// error
Expand Down Expand Up @@ -547,14 +534,13 @@ private void setup() throws IOException, ParseException {
cdp = new CqlDelimParser(cqlSchema, delimiter, nullString,
dateFormatString,
boolStyle, locale, null, session, false);
String select = cdp.generateSelect();
String partitionKey = getPartitionKey(cdp, session);
Select selectStmt = cdp.generateSelect();
if (null != beginToken) {
select = select + " WHERE Token(" + partitionKey + ") > "
+ beginToken + " AND Token(" + partitionKey + ") <= "
+ endToken;
String partitionKey = getPartitionKey(cdp, session);
selectStmt.where(gt(token(partitionKey), beginToken))
.and(lte(token(partitionKey), endToken));
}
statement = session.prepare(select);
statement = session.prepare(selectStmt);
statement.setConsistencyLevel(consistencyLevel);
}

Expand Down