Commit ec6041c
Merge pull request #152 from datastax/diffjobfromfile

Re-run DiffData job from a file containing partition ranges

mfmaher2 authored May 12, 2023
2 parents bdec1b1 + 34d4381

Showing 5 changed files with 61 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -5,4 +5,4 @@ target/
dependency-reduced-pom.xml
.idea/*
cassandra-data-migrator.iml
-*/DS_Store
+*.DS_Store
22 changes: 19 additions & 3 deletions README.md
@@ -33,7 +33,7 @@ tar -xvzf spark-3.3.1-bin-hadoop3.tgz
./spark-submit --properties-file cdm.properties /
--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--master "local[*]" /
---class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.Migrate cassandra-data-migrator-3.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

Note:
@@ -54,7 +54,7 @@ Note:
./spark-submit --properties-file cdm.properties /
--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--master "local[*]" /
---class datastax.astra.migrate.DiffData cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.DiffData cassandra-data-migrator-3.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

- The validation job will report differences as “ERRORS” in the log file, as shown below
@@ -85,7 +85,7 @@ Note:
./spark-submit --properties-file cdm.properties /
--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--master "local[*]" /
---class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-3.x.x.jar &> logfile_name.txt
+--class datastax.astra.migrate.MigratePartitionsFromFile cassandra-data-migrator-3.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

When running in the above mode, the tool assumes a `partitions.csv` file is present in the current folder in the below format, where each line (`min,max`) represents a partition-range.
@@ -103,7 +103,23 @@ This mode is specifically useful to processes a subset of partition-ranges that
```
grep "ERROR CopyJobSession: Error with PartitionRange" /path/to/logfile_name.txt | awk '{print $13","$15}' > partitions.csv
```
# Data validation for specific partition ranges
- You can also use the tool to validate data for specific partition ranges using the class option `--class datastax.astra.migrate.DiffPartitionsFromFile` as shown below:
```
./spark-submit --properties-file cdm.properties /
--conf spark.origin.keyspaceTable="<keyspace-name>.<table-name>" /
--master "local[*]" /
--class datastax.astra.migrate.DiffPartitionsFromFile cassandra-data-migrator-3.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```

When running in the above mode, the tool assumes a `partitions.csv` file is present in the current folder in the below format, where each line (`min,max`) represents a partition-range:
```
-507900353496146534,-107285462027022883
-506781526266485690,1506166634797362039
2637884402540451982,4638499294009575633
798869613692279889,8699484505161403540
```
This mode is specifically useful to process a subset of partition-ranges that may have failed during a previous run.
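For illustration only — the job itself delegates to `SplitPartitions.getSubPartitionsFromFile`, as seen in `DiffPartitionsFromFile.scala` in this commit — a minimal sketch of parsing this `min,max` file format might look like the following (class and method names here are hypothetical, not CDM code):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class PartitionRanges {
    // Hypothetical stand-in for the tool's partition-file reader:
    // each well-formed "min,max" line becomes a (min, max) token-range pair.
    public static List<Map.Entry<Long, Long>> parse(List<String> lines) {
        List<Map.Entry<Long, Long>> ranges = new ArrayList<>();
        for (String line : lines) {
            String[] bounds = line.trim().split(",");
            ranges.add(new SimpleEntry<>(Long.parseLong(bounds[0]),
                                         Long.parseLong(bounds[1])));
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, Long>> ranges = parse(Arrays.asList(
                "-507900353496146534,-107285462027022883",
                "2637884402540451982,4638499294009575633"));
        System.out.println(ranges.size()); // 2
    }
}
```

Each pair corresponds to one token-range task that Spark can retry independently, which is what makes re-running only the failed ranges cheap.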

# Perform large-field Guardrail violation checks
- The tool can be used to identify large fields in a table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field) using `--class datastax.astra.migrate.Guardrail` as shown below
2 changes: 1 addition & 1 deletion pom.xml
@@ -8,7 +8,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-<revision>3.4.2</revision>
+<revision>3.4.4</revision>
<scala.version>2.12.17</scala.version>
<scala.main.version>2.12</scala.main.version>
<spark.version>3.3.1</spark.version>
7 changes: 6 additions & 1 deletion src/main/java/datastax/astra/migrate/DiffJobSession.java
@@ -16,6 +16,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
@@ -107,8 +108,12 @@ private void diffAndClear(Map<Row, CompletionStage<AsyncResultSet>> srcToTargetR
try {
Row targetRow = srcToTargetRowMap.get(srcRow).toCompletableFuture().get().one();
diff(srcRow, targetRow);
-} catch (Exception e) {
+} catch (ExecutionException | InterruptedException e) {
 logger.error("Could not perform diff for Key: {}", getKey(srcRow, tableInfo), e);
+throw new RuntimeException(e);
+} catch (Exception ee) {
+logger.error("Could not perform diff for Key: {}", getKey(srcRow, tableInfo), ee);
+throw new RuntimeException(ee);
}
}
srcToTargetRowMap.clear();
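The tightened catch clauses above distinguish the checked async failures (`ExecutionException`, `InterruptedException`) from everything else and rethrow rather than swallow them. A minimal, self-contained sketch of the same unwrapping pattern (names here are illustrative, not CDM code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureUnwrap {
    // Illustrative only: block on a future's result, handling the checked
    // async failures explicitly instead of a blanket catch (Exception).
    public static <T> T getOrRethrow(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrRethrow(CompletableFuture.completedFuture("row")));
    }
}
```

Note the commit's version rethrows without restoring the interrupt flag; the `Thread.currentThread().interrupt()` call here is a common extra precaution, not part of this diff.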
34 changes: 34 additions & 0 deletions src/main/scala/datastax/astra/migrate/DiffPartitionsFromFile.scala
@@ -0,0 +1,34 @@
package datastax.astra.migrate

import com.datastax.spark.connector.cql.CassandraConnector
import org.slf4j.LoggerFactory

import org.apache.spark.SparkConf
import scala.collection.JavaConversions._

object DiffPartitionsFromFile extends AbstractJob {

val logger = LoggerFactory.getLogger(this.getClass.getName)
logger.info("Started Data Validation App based on the partitions from partitions.csv file")

diffTable(sourceConnection, destinationConnection, sc)

exitSpark

private def diffTable(sourceConnection: CassandraConnector, destinationConnection: CassandraConnector, config: SparkConf) = {
val partitions = SplitPartitions.getSubPartitionsFromFile(numSplits)
logger.info("PARAM Calculated -- Total Partitions: " + partitions.size())
val parts = sContext.parallelize(partitions.toSeq, partitions.size);
logger.info("Spark parallelize created : " + parts.count() + " parts!");

parts.foreach(part => {
sourceConnection.withSessionDo(sourceSession =>
destinationConnection.withSessionDo(destinationSession =>
DiffJobSession.getInstance(sourceSession, destinationSession, config)
.getDataAndDiff(part.getMin, part.getMax)))
})

DiffJobSession.getInstance(null, null, config).printCounts(true);
}

}
