
Merge pull request #132 from datastax/feature/cdm-44-auto-discover-counter-schema

Implemented auto-discovery of counter tables
msmygit authored May 1, 2023
2 parents 1d00d88 + 26b3a7b commit 30d8495
Showing 7 changed files with 41 additions and 65 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -116,8 +116,8 @@ grep "ERROR CopyJobSession: Error with PartitionRange" /path/to/logfile_name.txt
> A sample Guardrail properties file can be [found here](./src/resources/cdmGuardrail.properties)
# Features
- Auto-detects table schema (column names, types, id fields, collections, UDTs, etc.)
- Supports migration/validation of [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
- Auto-detects table schema (column names, types, keys, collections, UDTs, etc.)
- Including [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html)
- Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
- Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
- Filter records from `Origin` using `writetimes` and/or CQL conditions and/or min/max token-range
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<revision>3.4.0</revision>
<revision>3.4.1</revision>
<scala.version>2.12.17</scala.version>
<scala.main.version>2.12</scala.main.version>
<spark.version>3.3.1</spark.version>
58 changes: 25 additions & 33 deletions src/main/java/datastax/astra/migrate/AbstractJobSession.java
@@ -15,8 +15,10 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AbstractJobSession extends BaseJobSession {
Expand Down Expand Up @@ -119,14 +121,8 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
if (null == insertCols || insertCols.trim().isEmpty()) {
insertCols = selectCols;
}
String insertBinds = "";
for (String idCol : tableInfo.getKeyColumns()) {
if (insertBinds.isEmpty()) {
insertBinds = idCol + "= ?";
} else {
insertBinds += " and " + idCol + "= ?";
}
}
String insertBinds = String.join(" and ",
tableInfo.getKeyColumns().stream().map(col -> col + " = ?").collect(Collectors.toList()));

String originSelectQry;
if (!isJobMigrateRowsFromFile) {
@@ -144,32 +140,22 @@ protected AbstractJobSession(CqlSession sourceSession, CqlSession astraSession,
astraSelectStatement = astraSession.prepare(targetSelectQry);

isCounterTable = tableInfo.isCounterTable();
String fullInsertQuery;
if (isCounterTable) {
String updateSelectMappingStr = Util.getSparkPropOr(sc, "spark.counterTable.cql.index", "0");
for (String updateSelectIndex : updateSelectMappingStr.split(",")) {
updateSelectMapping.add(Integer.parseInt(updateSelectIndex));
}

String counterTableUpdate = Util.getSparkProp(sc, "spark.counterTable.cql");
astraInsertStatement = astraSession.prepare(counterTableUpdate);
String fullInsertQuery = "update " + astraKeyspaceTable + " set (" + insertCols + ") VALUES (" + insertBinds + ")";
String updateCols = String.join(" , ",
tableInfo.getOtherColumns().stream().map(s -> s + " += ?").collect(Collectors.toList()));
String updateKeys = String.join(" and ",
tableInfo.getKeyColumns().stream().map(s -> s + " = ?").collect(Collectors.toList()));
fullInsertQuery = "update " + astraKeyspaceTable + " set " + updateCols + " where " + updateKeys;
} else {
insertBinds = "";
for (String str : insertCols.split(",")) {
if (insertBinds.isEmpty()) {
insertBinds += "?";
} else {
insertBinds += ", ?";
}
}

String fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
insertBinds = String.join(" , ", Arrays.stream(insertCols.split(",")).map(col -> " ?").collect(Collectors.toList()));
fullInsertQuery = "insert into " + astraKeyspaceTable + " (" + insertCols + ") VALUES (" + insertBinds + ")";
if (!ttlWTCols.isEmpty()) {
fullInsertQuery += " USING TTL ? AND TIMESTAMP ?";
}
logger.info("PARAM -- Target insert query: {}", fullInsertQuery);
astraInsertStatement = astraSession.prepare(fullInsertQuery);
}
logger.info("PARAM -- Target insert query: {}", fullInsertQuery);
astraInsertStatement = astraSession.prepare(fullInsertQuery);

// Handle rows with blank values for 'timestamp' data-type in primary-key fields
tsReplaceValStr = Util.getSparkPropOr(sc, "spark.target.replace.blankTimestampKeyUsingEpoch", "");
@@ -182,15 +168,21 @@ public BoundStatement bindInsert(PreparedStatement insertStatement, Row sourceRo
BoundStatement boundInsertStatement = insertStatement.bind().setConsistencyLevel(writeConsistencyLevel);

if (isCounterTable) {
for (int index = 0; index < tableInfo.getAllColumns().size(); index++) {
TypeInfo typeInfo = tableInfo.getColumns().get(index).getTypeInfo();
for (int index = 0; index < tableInfo.getNonKeyColumns().size(); index++) {
TypeInfo typeInfo = tableInfo.getNonKeyColumns().get(index).getTypeInfo();
int colIdx = tableInfo.getIdColumns().size() + index;
// compute the counter delta if reading from astra for the difference
if (astraRow != null && index < (tableInfo.getColumns().size() - tableInfo.getIdColumns().size())) {
boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(updateSelectMapping.get(index)) - astraRow.getLong(updateSelectMapping.get(index))), Long.class);
if (astraRow != null) {
boundInsertStatement = boundInsertStatement.set(index, (sourceRow.getLong(colIdx) - astraRow.getLong(colIdx)), Long.class);
} else {
boundInsertStatement = boundInsertStatement.set(index, getData(typeInfo, updateSelectMapping.get(index), sourceRow), typeInfo.getTypeClass());
boundInsertStatement = boundInsertStatement.set(index, sourceRow.getLong(colIdx), Long.class);
}
}
for (int index = 0; index < tableInfo.getIdColumns().size(); index++) {
TypeInfo typeInfo = tableInfo.getIdColumns().get(index).getTypeInfo();
int colIdx = tableInfo.getNonKeyColumns().size() + index;
boundInsertStatement = boundInsertStatement.set(colIdx, getData(typeInfo, index, sourceRow), typeInfo.getTypeClass());
}
} else {
int index = 0;
for (index = 0; index < tableInfo.getAllColumns().size(); index++) {
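For reference, a minimal sketch (not part of the commit) of what the refactored stream-based builders in AbstractJobSession produce, reusing the cycling.cyclist_count schema from the example this commit removes from cdm.properties:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CounterUpdateQuerySketch {
    public static void main(String[] args) {
        // Hypothetical schema, borrowed from the removed cdm.properties example:
        //   CREATE TABLE cycling.cyclist_count (pk1 uuid, pk2 date, cc1 boolean,
        //                                       c1 counter, PRIMARY KEY ((pk1, pk2), cc1));
        List<String> keyColumns = Arrays.asList("pk1", "pk2", "cc1");
        List<String> otherColumns = Arrays.asList("c1");

        // Same pipelines as the new code above:
        String updateCols = String.join(" , ",
                otherColumns.stream().map(s -> s + " += ?").collect(Collectors.toList()));
        String updateKeys = String.join(" and ",
                keyColumns.stream().map(s -> s + " = ?").collect(Collectors.toList()));
        System.out.println("update cycling.cyclist_count set " + updateCols + " where " + updateKeys);
        // -> update cycling.cyclist_count set c1 += ? where pk1 = ? and pk2 = ? and cc1 = ?
        // This is the UPDATE that previously had to be hand-written in spark.counterTable.cql.
        // Counter columns only support "+= ?", which is why bindInsert() binds the
        // origin-minus-target delta when the target row already exists.
    }
}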
4 changes: 0 additions & 4 deletions src/main/java/datastax/astra/migrate/BaseJobSession.java
@@ -10,7 +10,6 @@
import org.apache.spark.SparkConf;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,9 +33,6 @@ public abstract class BaseJobSession {
protected RateLimiter writeLimiter;
protected Integer maxRetries = 10;
protected AtomicLong readCounter = new AtomicLong(0);

protected List<Integer> updateSelectMapping = new ArrayList<Integer>();

protected Integer batchSize = 1;
protected Integer fetchSizeInRows = 1000;
protected Integer printStatsAfter = 100000;
12 changes: 9 additions & 3 deletions src/main/java/datastax/astra/migrate/schema/TableInfo.java
@@ -21,19 +21,19 @@ public class TableInfo {
private List<ColumnInfo> idColumns;
private List<String> partitionKeyColumns;
private List<String> keyColumns;
private List<String> otherColumns;
private List<String> allColumns;
private List<String> ttlAndWriteTimeColumns;
private String desc;

@ToString.Include
private boolean isCounterTable = false;

protected TableInfo(CqlSession session, String keySpace, String table, String selectColsString) {
List<String> selectCols = selectColsString.isEmpty() ? Collections.emptyList() :
Arrays.asList(selectColsString.toLowerCase(Locale.ROOT).split(","));
TableMetadata tm = session.getMetadata().getKeyspace(keySpace).get().getTable(table).get();
desc = tm.describe(false);
if (desc.toLowerCase(Locale.ROOT).contains("counter")) {
isCounterTable = true;
}

partitionColumns = getPartitionKeyColumns(tm);
partitionKeyColumns = colInfoToList(partitionColumns);
@@ -43,9 +43,11 @@ protected TableInfo(CqlSession session, String keySpace, String table, String se
keyColumns = colInfoToList(idColumns);

nonKeyColumns = getNonKeyColumns(tm, keyColumns, selectCols);
otherColumns = colInfoToList(nonKeyColumns);
columns.addAll(idColumns);
columns.addAll(nonKeyColumns);
allColumns = colInfoToList(columns);
isCounterTable = isCounterTable(nonKeyColumns);

ttlAndWriteTimeColumns = loadTtlAndWriteTimeCols();
}
@@ -99,4 +101,8 @@ private List<ColumnInfo> getNonKeyColumns(TableMetadata tm, List keyColumnsNames
private List<String> colInfoToList(List<ColumnInfo> listColInfo) {
return listColInfo.stream().map(ColumnInfo::getColName).collect(Collectors.toList());
}

private boolean isCounterTable(List<ColumnInfo> nonKeyColumns) {
return nonKeyColumns.stream().filter(ci -> ci.getTypeInfo().isCounter()).findAny().isPresent();
}
}
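A note on the detection change above: the old constructor substring-matched the describe() output, which could misfire on unrelated schema text containing "counter". A small illustration with a made-up table (hypothetical, not from the repo):

public class CounterDetectionSketch {
    public static void main(String[] args) {
        // No counter column here, but "encounter_id" embeds the word "counter".
        String desc = "CREATE TABLE ks.visits (id uuid PRIMARY KEY, encounter_id text)";
        System.out.println(desc.toLowerCase().contains("counter")); // true -- a false positive
        // The new isCounterTable(nonKeyColumns) consults column metadata instead:
        //   nonKeyColumns.stream().filter(ci -> ci.getTypeInfo().isCounter())
        //                 .findAny().isPresent()
    }
}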
4 changes: 4 additions & 0 deletions src/main/java/datastax/astra/migrate/schema/TypeInfo.java
@@ -18,6 +18,7 @@ public class TypeInfo {
private static Map<String, Class> typeMap = loadTypeMap();
private Class typeClass = Object.class;
private List<Class> subTypes = new ArrayList<Class>();
private boolean isCounter = false;

public TypeInfo(DataType dataType) {
this(dataType.toString());
@@ -48,6 +49,9 @@ public TypeInfo(String dataTypeStr) {
typeClass = UdtValue.class;
} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("tuple")) {
typeClass = TupleValue.class;
} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("counter")) {
typeClass = Long.class;
isCounter = true;
} else {
typeClass = typeMap.get(dataTypeStr.toLowerCase(Locale.ROOT));
}
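A quick sanity check of the new TypeInfo branch, as a sketch (assumes the project classes on the classpath; expected output inferred from the diff):

import datastax.astra.migrate.schema.TypeInfo;

public class TypeInfoCounterSketch {
    public static void main(String[] args) {
        // A CQL "counter" column now maps to Long and is flagged as a counter,
        // which is what lets TableInfo auto-detect counter tables.
        TypeInfo ti = new TypeInfo("counter");
        System.out.println(ti.getTypeClass()); // class java.lang.Long
        System.out.println(ti.isCounter());    // true
    }
}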
22 changes: 0 additions & 22 deletions src/resources/cdm.properties
@@ -69,28 +69,6 @@ spark.batchSize 10
# ENABLE ONLY IF YOU WANT TO MIGRATE/VALIDATE ROWS BASED ON A VALID CQL FILTER
#spark.query.condition

# ENABLE ONLY IF IT IS A COUNTER TABLE
#spark.counterTable false
#spark.counterTable.cql
#spark.counterTable.cql.index 0

############################### EXAMPLE MAPPING USING A DEMO counter column TABLE ###########################
# CREATE TABLE cycling.cyclist_count (
# pk1 uuid,
# pk2 date,
# cc1 boolean,
# c1 counter,
# PRIMARY KEY((pk1,pk2),cc1)
# );
# then, our counter table mapping would look like below,
# spark.counterTable true
# spark.counterTable.cql UPDATE cycling.cyclist_count SET c1 += ? WHERE pk1 = ? AND pk2 = ? AND cc1 = ?
# spark.counterTable.cql.index 3,0,1,2
#
# Remember the above count index order is based on the below column mapping ordering,
# spark.query.origin pk1,pk2,cc1,c
#############################################################################################################

# ENABLE ONLY IF YOU WANT TO FILTER BASED ON WRITE-TIME (values must be in microseconds)
#spark.origin.writeTimeStampFilter false
#spark.origin.minWriteTimeStampFilter 0
