Merge pull request #155 from datastax/feature/no-ttl-and-writetime
CDM-62 handle no TTL & Writetime graciously
pravinbhat authored May 25, 2023
2 parents ec6041c + 6f3a390 commit 031a5d7
Showing 5 changed files with 76 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<revision>3.4.4</revision>
+<revision>3.4.5</revision>
<scala.version>2.12.17</scala.version>
<scala.main.version>2.12</scala.main.version>
<spark.version>3.3.1</spark.version>
20 changes: 18 additions & 2 deletions src/main/java/datastax/astra/migrate/CopyJobSession.java
@@ -23,6 +23,8 @@ public class CopyJobSession extends AbstractJobSession {

protected CopyJobSession(CqlSession sourceSession, CqlSession astraSession, SparkConf sc) {
super(sourceSession, astraSession, sc);
+handleNoTtlWriteTime(Boolean.parseBoolean(Util.getSparkPropOr(sc, "spark.migrate.disableTtlAndWriteTime", "false")));

filterData = Boolean.parseBoolean(sc.get("spark.origin.FilterData", "false"));
filterColName = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumn");
filterColType = Util.getSparkPropOrEmpty(sc, "spark.origin.FilterColumnType");
@@ -42,6 +44,18 @@ public static CopyJobSession getInstance(CqlSession sourceSession, CqlSession as
return copyJobSession;
}

+private void handleNoTtlWriteTime(boolean disableTtlAndWriteTime) {
+if (ttlWTCols.isEmpty()) {
+if (disableTtlAndWriteTime) {
+logger.warn("***** NO NON-FROZEN NON-PRIMARY-KEY COLUMNS FOUND TO DERIVE TTL AND WRITETIME! PROCEEDING WITHOUT MIGRATING TTL AND WRITETIME!! *****");
+} else {
+logger.error("***** NO NON-FROZEN NON-PRIMARY-KEY COLUMNS FOUND TO DERIVE TTL AND WRITETIME! EXITING JOB!! *****");
+logger.error("***** IF YOU WANT TO PROCEED WITH MIGRATION WITHOUT USING TTL AND WRITETIMES FROM ORIGIN, RERUN THE JOB USING OPTION --conf spark.migrate.disableTtlAndWriteTime=true !! *****");
+System.exit(-1);
+}
+}
+}

public void getDataAndInsert(BigInteger min, BigInteger max) {
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
boolean done = false;
@@ -69,7 +83,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
}

if (filterData) {
-String col = (String) getData(new TypeInfo(filterColType), filterColIndex, sourceRow);
+TypeInfo typeInfo = tableInfo.getColumns().get(filterColIndex).getTypeInfo();
+String col = (String) getData(typeInfo, filterColIndex, sourceRow);
if (col.trim().equalsIgnoreCase(filterColValue)) {
logger.warn("Skipping row and filtering out: {}", getKey(sourceRow, tableInfo));
skipCnt++;
@@ -118,7 +133,8 @@ public void getDataAndInsert(BigInteger min, BigInteger max) {
}

if (filterData) {
-String colValue = (String) getData(new TypeInfo(filterColType), filterColIndex, sourceRow);
+TypeInfo typeInfo = tableInfo.getColumns().get(filterColIndex).getTypeInfo();
+String colValue = (String) getData(typeInfo, filterColIndex, sourceRow);
if (colValue.trim().equalsIgnoreCase(filterColValue)) {
logger.warn("Skipping row and filtering out: {}", getKey(sourceRow, tableInfo));
skipCnt++;
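To make the new guard concrete, here is a minimal, self-contained sketch of the behavior handleNoTtlWriteTime adds; the standalone class, println logging, and hard-coded inputs are illustrative stand-ins, not CDM code.

import java.util.Collections;
import java.util.List;

public class NoTtlWriteTimeGuardSketch {
    // Mirrors the guard above: with no eligible columns, the job either warns
    // and proceeds (flag set) or logs an error and exits (flag unset).
    static void handleNoTtlWriteTime(List<String> ttlWTCols, boolean disableTtlAndWriteTime) {
        if (!ttlWTCols.isEmpty()) {
            return; // TTL and Writetime can be derived; nothing to check
        }
        if (disableTtlAndWriteTime) {
            System.out.println("WARN: no non-frozen, non-primary-key columns; migrating without TTL and Writetime");
        } else {
            System.err.println("ERROR: nothing to derive TTL and Writetime from; rerun with "
                    + "--conf spark.migrate.disableTtlAndWriteTime=true to proceed without them");
            System.exit(-1);
        }
    }

    public static void main(String[] args) {
        // A table whose only non-PK columns are frozen or collections yields an empty list:
        handleNoTtlWriteTime(Collections.emptyList(), true);  // warns, returns
        handleNoTtlWriteTime(Collections.emptyList(), false); // errors, exits(-1)
    }
}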
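Both filterData hunks make the same change: the filter column's TypeInfo now comes from the table metadata already resolved in tableInfo instead of being re-parsed from the spark.origin.FilterColumnType string. A rough sketch of that lookup pattern, with ColumnMeta as a hypothetical stand-in for the repo's ColumnInfo/TypeInfo:

import java.util.List;
import java.util.UUID;

public class FilterColumnLookupSketch {
    // Hypothetical stand-in for a column's resolved metadata.
    record ColumnMeta(String name, Class<?> javaType) {}

    // Single source of truth: the type recorded in table metadata, not a
    // second parse of a user-supplied type string that can drift out of sync.
    static Class<?> filterColumnType(List<ColumnMeta> columns, int filterColIndex) {
        return columns.get(filterColIndex).javaType();
    }

    public static void main(String[] args) {
        List<ColumnMeta> columns = List.of(
                new ColumnMeta("id", UUID.class),
                new ColumnMeta("status", String.class));
        System.out.println(filterColumnType(columns, 1)); // class java.lang.String
    }
}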
5 changes: 1 addition & 4 deletions src/main/java/datastax/astra/migrate/DiffJobSession.java
@@ -21,7 +21,7 @@
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

-public class DiffJobSession extends CopyJobSession {
+public class DiffJobSession extends AbstractJobSession {

private static DiffJobSession diffJobSession;
private final AtomicLong mismatchCounter = new AtomicLong(0);
@@ -111,9 +111,6 @@ private void diffAndClear(Map<Row, CompletionStage<AsyncResultSet>> srcToTargetR
} catch (ExecutionException | InterruptedException e) {
logger.error("Could not perform diff for Key: {}", getKey(srcRow, tableInfo), e);
throw new RuntimeException(e);
-} catch (Exception ee) {
-logger.error("Could not perform diff for Key: {}", getKey(srcRow, tableInfo), ee);
-throw new RuntimeException(ee);
}
}
srcToTargetRowMap.clear();
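The deleted catch (Exception ee) block repeated the logging and wrapping done by the specific catch above it, so removing it changes nothing for the declared failure modes while letting anything unexpected propagate with its original stack trace. A small sketch of the resulting behavior; risky() and the harness are hypothetical:

import java.util.concurrent.ExecutionException;

public class CatchSimplificationSketch {
    // Declares the same checked exceptions diffAndClear() handles.
    static void risky() throws ExecutionException, InterruptedException {
        throw new IllegalStateException("unexpected runtime failure");
    }

    public static void main(String[] args) {
        try {
            risky();
        } catch (ExecutionException | InterruptedException e) {
            // Expected failure modes are still logged and wrapped, as in diffAndClear().
            throw new RuntimeException(e);
        }
        // With the broad catch gone, the IllegalStateException above surfaces
        // directly instead of being wrapped a second time.
    }
}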
7 changes: 3 additions & 4 deletions src/main/java/datastax/astra/migrate/schema/ColumnInfo.java
@@ -5,7 +5,6 @@
import lombok.Data;

import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;

@@ -33,8 +32,8 @@ public class ColumnInfo {
if (typeInfo.getTypeClass().equals(UdtValue.class)) {
isUDT = true;
}
-if (cm.getType().toString().toLowerCase(Locale.ROOT).contains(", frozen")) {
-isFrozen = true;
-}

+isFrozen = typeInfo.isFrozen();
}

}
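ColumnInfo no longer substring-matches ", frozen" in the type's toString(); the frozen flag is taken from the driver's type model via the TypeInfo change in the next file. A minimal sketch of that check, using only driver 4.x API this diff itself calls:

import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.ListType;
import com.datastax.oss.driver.api.core.type.MapType;
import com.datastax.oss.driver.api.core.type.SetType;
import com.datastax.oss.driver.api.core.type.UserDefinedType;

public class FrozenCheckSketch {
    // Frozen-ness lives on UDT and collection types; scalar types are never frozen.
    static boolean isFrozen(DataType dataType) {
        if (dataType instanceof UserDefinedType) return ((UserDefinedType) dataType).isFrozen();
        if (dataType instanceof ListType) return ((ListType) dataType).isFrozen();
        if (dataType instanceof SetType) return ((SetType) dataType).isFrozen();
        if (dataType instanceof MapType) return ((MapType) dataType).isFrozen();
        return false;
    }
}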
109 changes: 53 additions & 56 deletions src/main/java/datastax/astra/migrate/schema/TypeInfo.java
@@ -2,7 +2,7 @@

import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.data.UdtValue;
-import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.*;
import lombok.Data;

import java.math.BigDecimal;
@@ -15,75 +15,72 @@

@Data
public class TypeInfo {
-private static Map<String, Class> typeMap = loadTypeMap();
+private static Map<DataType, Class> cqlToJavaTypeMap = loadCqlToJavaTypeMap();
private Class typeClass = Object.class;
private List<Class> subTypes = new ArrayList<Class>();
+private boolean isCounter = false;
+private boolean isFrozen = false;

public TypeInfo(DataType dataType) {
-this(dataType.toString());
-}

-public TypeInfo(String dataTypeStr) {
-int sIdx = dataTypeStr.indexOf('(');
-int eIdx = -1;
-if (sIdx != -1) {
-eIdx = dataTypeStr.substring(sIdx + 1).indexOf('(');
-if (eIdx == -1) {
-eIdx = dataTypeStr.substring(sIdx + 1).indexOf(',');
-}
-eIdx += sIdx + 1;
-}
+typeClass = getDataTypeToType(dataType);

-if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("list")) {
-typeClass = List.class;
-subTypes.add(typeMap.get(dataTypeStr.substring(sIdx + 1, eIdx).toLowerCase(Locale.ROOT)));
-} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("set")) {
-typeClass = Set.class;
-subTypes.add(typeMap.get(dataTypeStr.substring(sIdx + 1, eIdx).toLowerCase(Locale.ROOT)));
-} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("map")) {
-typeClass = Map.class;
-subTypes.add(typeMap.get(dataTypeStr.substring(sIdx + 1, dataTypeStr.indexOf("=>")).trim().toLowerCase(Locale.ROOT)));
-subTypes.add(typeMap.get(dataTypeStr.substring(dataTypeStr.indexOf("=>") + 2, dataTypeStr.indexOf(',')).trim().toLowerCase(Locale.ROOT)));
-} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("udt")) {
-typeClass = UdtValue.class;
-} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("tuple")) {
-typeClass = TupleValue.class;
-} else if (dataTypeStr.toLowerCase(Locale.ROOT).startsWith("counter")) {
-typeClass = Long.class;
+if (dataType instanceof UserDefinedType) {
+isFrozen = ((UserDefinedType) dataType).isFrozen();
+} else if (dataType instanceof ListType) {
+subTypes.add(getDataTypeToType(((ListType) dataType).getElementType()));
+isFrozen = ((ListType) dataType).isFrozen();
+} else if (dataType instanceof SetType) {
+subTypes.add(getDataTypeToType(((SetType) dataType).getElementType()));
+isFrozen = ((SetType) dataType).isFrozen();
+} else if (dataType instanceof MapType) {
+subTypes.add(getDataTypeToType(((MapType) dataType).getKeyType()));
+subTypes.add(getDataTypeToType(((MapType) dataType).getValueType()));
+isFrozen = ((MapType) dataType).isFrozen();
+} else if (DataTypes.COUNTER.equals(dataType)) {
+isCounter = true;
-} else {
-typeClass = typeMap.get(dataTypeStr.toLowerCase(Locale.ROOT));
}
}

-private static Map loadTypeMap() {
-Map typeMap = new HashMap<>();
-typeMap.put("ascii", String.class);
-typeMap.put("bigint", Long.class);
-typeMap.put("blob", ByteBuffer.class);
-typeMap.put("boolean", Boolean.class);
-typeMap.put("counter", Long.class);
-typeMap.put("date", LocalDate.class);
-typeMap.put("decimal", BigDecimal.class);
-typeMap.put("double", Double.class);
-typeMap.put("float", Float.class);
-typeMap.put("int", Integer.class);
-typeMap.put("inet", String.class);
-typeMap.put("smallint", Short.class);
-typeMap.put("text", String.class);
-typeMap.put("time", LocalTime.class);
-typeMap.put("timestamp", Instant.class);
-typeMap.put("timeuuid", UUID.class);
-typeMap.put("tinyint", Byte.class);
-typeMap.put("udt", UdtValue.class);
-typeMap.put("uuid", UUID.class);
-typeMap.put("varchar", String.class);
-typeMap.put("varint", BigInteger.class);
+private static Map loadCqlToJavaTypeMap() {
+Map<DataType, Class> typeMap = new HashMap<>();
+typeMap.put(DataTypes.ASCII, String.class);
+typeMap.put(DataTypes.BIGINT, Long.class);
+typeMap.put(DataTypes.BLOB, ByteBuffer.class);
+typeMap.put(DataTypes.BOOLEAN, Boolean.class);
+typeMap.put(DataTypes.COUNTER, Long.class);
+typeMap.put(DataTypes.DATE, LocalDate.class);
+typeMap.put(DataTypes.DECIMAL, BigDecimal.class);
+typeMap.put(DataTypes.DOUBLE, Double.class);
+typeMap.put(DataTypes.FLOAT, Float.class);
+typeMap.put(DataTypes.INT, Integer.class);
+typeMap.put(DataTypes.INET, String.class);
+typeMap.put(DataTypes.SMALLINT, Short.class);
+typeMap.put(DataTypes.TEXT, String.class);
+typeMap.put(DataTypes.TIME, LocalTime.class);
+typeMap.put(DataTypes.TIMESTAMP, Instant.class);
+typeMap.put(DataTypes.TIMEUUID, UUID.class);
+typeMap.put(DataTypes.TINYINT, Byte.class);
+typeMap.put(DataTypes.UUID, UUID.class);
+typeMap.put(DataTypes.VARINT, BigInteger.class);

return typeMap;
}

+private Class getDataTypeToType(DataType dataType) {
+if (dataType instanceof UserDefinedType) return UdtValue.class;
+else if (dataType instanceof ListType) {
+return List.class;
+} else if (dataType instanceof SetType) {
+return Set.class;
+} else if (dataType instanceof MapType) {
+return Map.class;
+} else if (dataType instanceof TupleType) {
+return TupleValue.class;
+}

+return cqlToJavaTypeMap.get(dataType);
+}

public String toString() {
return "Type: " + typeClass.toString() + " SubTypes: " + subTypes.toString();
}
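For reference, a minimal demonstration of the new DataType-keyed mapping: the keys are the driver's DataTypes constants, so lookups no longer depend on toString() formatting. The harness below is illustrative, not repo code.

import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class CqlToJavaTypeMapSketch {
    public static void main(String[] args) {
        Map<DataType, Class<?>> typeMap = new HashMap<>();
        typeMap.put(DataTypes.TEXT, String.class);
        typeMap.put(DataTypes.TIMESTAMP, Instant.class);

        // Driver type constants are shared singletons, so map lookups are exact:
        System.out.println(typeMap.get(DataTypes.TEXT));      // class java.lang.String
        System.out.println(typeMap.get(DataTypes.TIMESTAMP)); // class java.time.Instant
    }
}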
