From 45998e9af9a97943b6649ba2de9cc85a6aa06684 Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Mon, 21 Oct 2024 16:51:15 -0400
Subject: [PATCH 1/2] Make trackRun feature work on all versions of C*,
 improve docs & optimize imports.

---
 README.md                                     | 18 +++++++-
 RELEASE.md                                    |  4 ++
 .../TargetUpsertRunDetailsStatement.java      | 25 ++++++++---
 .../cql/statement/TargetUpsertStatement.java  |  6 ++-
 .../com/datastax/cdm/data/CqlConversion.java  |  9 +++-
 .../java/com/datastax/cdm/data/CqlData.java   | 15 ++++++-
 .../com/datastax/cdm/data/DataUtility.java    |  7 ++-
 .../com/datastax/cdm/data/EnhancedPK.java     |  6 ++-
 .../java/com/datastax/cdm/data/PKFactory.java | 11 ++++-
 .../com/datastax/cdm/feature/ExplodeMap.java  |  7 ++-
 .../datastax/cdm/feature/WritetimeTTL.java    |  7 ++-
 .../cdm/properties/PropertyHelper.java        |  5 ++-
 .../TargetUpsertRunDetailsStatementTest.java  | 45 +++++++++++++++----
 13 files changed, 136 insertions(+), 29 deletions(-)

diff --git a/README.md b/README.md
index 6020bfa0..b0949f2d 100644
--- a/README.md
+++ b/README.md
@@ -130,7 +130,7 @@ spark-submit --properties-file cdm.properties \
 - If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run
 - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p)
 - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt))
-- Filter records from `Origin` using `writetimes` and/or CQL conditions and/or a list of token-ranges
+- Filter records from `Origin` using `writetime` and/or CQL conditions and/or a list of token-ranges
 - Perform guardrail checks (identify large fields)
 - Supports adding `constants` as new columns on `Target`
 - Supports expanding `Map` columns on `Origin` into multiple records on `Target`
@@ -140,7 +140,7 @@
 - Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
 - Supports migration/validation from and to [Azure Cosmos Cassandra](https://learn.microsoft.com/en-us/azure/cosmos-db/cassandra)
 - Validate migration accuracy and performance using a smaller randomized data-set
-- Supports adding custom fixed `writetime`
+- Supports adding custom fixed `writetime` and/or `ttl` (see the example below)
 - Track run information (start-time, end-time, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace
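+
+As an illustration, a fixed `writetime` and/or `ttl` can be applied to all migrated rows via the transform params sketched below. The key names and values here are assumptions for illustration — confirm the exact keys in the [cdm-detailed.properties file](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties):
+
+```
+# Assumed keys, shown for illustration only — verify before use
+spark.cdm.transform.custom.writetime  1729000000000000
+spark.cdm.transform.custom.ttl        7776000
+```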
 
 # Things to know
@@ -152,6 +152,20 @@ spark-submit --properties-file cdm.properties \
 - When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param, which was added specifically to address this issue.
 - When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will only be for the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from the previous run as well.
 
+# Performance FAQ
+- The below recommendations are usually only needed when migrating large tables where the default settings do not perform well enough.
+- Performance bottlenecks are usually the result of
+  - Low resource availability on `Origin` OR `Target` cluster
+  - Low resource availability on CDM VMs, [see recommendations here](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines)
+  - Bad schema design which could be cause by Out of balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count
+- Incorrect configuration of the below properties
+  - `numParts`: Default is 5K, but the ideal value is usually around table-size/10MB.
+  - `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key=partition-key OR where the average row-size is > 20 KB. Similarly, this should be set to a value > 5 if row-size is small (< 1KB) and most partitions have several rows (100+).
+  - `fetchSizeInRows`: Default is 1K and this is usually fine. However, you can reduce it if your table has many large rows (over 100KB).
+  - `ratelimit`: Default is 20K. Once you have set all the other properties appropriately, set this to the highest value that your clusters (origin & target) are able to handle.
+- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance
+- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on the CDM params described above and the observed load and throughput on the `Origin` and `Target` clusters.
+- Note: For additional performance tuning, refer to the details in the [cdm-detailed.properties file here](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties); a starter sketch of the main tuning params follows below.
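+
+As a starting point, the four tuning params above can be set in your `cdm.properties`. A minimal sketch is shown below — the `spark.cdm.perfops.*` key names are assumptions here, so confirm them in the [cdm-detailed.properties file](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties):
+
+```
+# Illustrative defaults — tune per the guidance above
+spark.cdm.perfops.numParts          5000
+spark.cdm.perfops.batchSize         5
+spark.cdm.perfops.fetchSizeInRows   1000
+spark.cdm.perfops.ratelimit.origin  20000
+spark.cdm.perfops.ratelimit.target  20000
+```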
 
 # Building Jar for local development
 1. Clone this repo
 
diff --git a/RELEASE.md b/RELEASE.md
index d4caca1b..596efd99 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,4 +1,8 @@
 # Release Notes
+## [4.6.1] - 2024-10-22
+- Make `trackRun` feature work on all versions of Cassandra/DSE by replacing the `IN` clause on the `cdm_run_details` table.
+- Updated `README` docs.
+
 ## [4.6.0] - 2024-10-18
 - Allow using Collections and/or UDTs for `ttl` & `writetime` calculations. This is specifically helpful in scenarios where the only non-key columns are Collections and/or UDTs.
 
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
index 639a1333..54fa44ab 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java
@@ -19,6 +19,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,20 +85,19 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)
         boundSelectInfoStatement = bindStatement(
                 "SELECT status FROM " + cdmKsTabInfo + " WHERE table_name = ? AND run_id = ?");
         boundSelectStatement = bindStatement("SELECT token_min, token_max FROM " + cdmKsTabDetails
-                + " WHERE table_name = ? AND run_id = ? and status in ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING");
+                + " WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING");
     }
 
     public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
-        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
         if (prevRunId == 0) {
-            return pendingParts;
+            return Collections.emptyList();
         }
 
         ResultSet rsInfo = session
                 .execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
         Row cdmRunStatus = rsInfo.one();
         if (cdmRunStatus == null) {
-            return pendingParts;
+            return Collections.emptyList();
         } else {
             String status = cdmRunStatus.getString("status");
             if (TrackRun.RUN_STATUS.NOT_STARTED.toString().equals(status)) {
@@ -105,14 +105,25 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
             }
         }
 
-        ResultSet rs = session
-                .execute(boundSelectStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
+        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.NOT_STARTED.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.STARTED.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.FAIL.toString()));
+        pendingParts.addAll(getPartitionsByStatus(prevRunId, TrackRun.RUN_STATUS.DIFF.toString()));
+
+        return pendingParts;
+    }
+
+    protected Collection<SplitPartitions.Partition> getPartitionsByStatus(long prevRunId, String status) {
+        ResultSet rs = session.execute(boundSelectStatement.setString("table_name", tableName)
+                .setLong("run_id", prevRunId).setString("status", status));
+
+        final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
         rs.forEach(row -> {
             Partition part = new Partition(BigInteger.valueOf(row.getLong("token_min")),
                     BigInteger.valueOf(row.getLong("token_max")));
             pendingParts.add(part);
         });
-
         return pendingParts;
     }
 
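In CQL terms, the trackRun fix above replaces a single filtered read — whose multi-value `IN` restriction on `status` (a non-key column) is not supported by all Cassandra/DSE versions — with one equality-bound read per status, executed four times by `getPendingPartitions()` and concatenated. A rough sketch (the unqualified `cdm_run_details` name here stands in for the keyspace-qualified `cdmKsTabDetails` used in the code):

```sql
-- Before: one query; rejected by some Cassandra/DSE versions
SELECT token_min, token_max FROM cdm_run_details
WHERE table_name = ? AND run_id = ?
  AND status IN ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING;

-- After: the same bound statement run once per status value
SELECT token_min, token_max FROM cdm_run_details
WHERE table_name = ? AND run_id = ? AND status = ? ALLOW FILTERING;
```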
diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
index dcbeca8f..c212cafd 100644
--- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
+++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
@@ -29,7 +29,11 @@
 import com.datastax.cdm.feature.WritetimeTTL;
 import com.datastax.cdm.properties.IPropertyHelper;
 import com.datastax.cdm.properties.KnownProperties;
-import com.datastax.oss.driver.api.core.cql.*;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.Statement;
 import com.datastax.oss.driver.api.core.type.DataType;
 
 public abstract class TargetUpsertStatement extends BaseCdmStatement {
diff --git a/src/main/java/com/datastax/cdm/data/CqlConversion.java b/src/main/java/com/datastax/cdm/data/CqlConversion.java
index 46e4c125..98ea9eb1 100644
--- a/src/main/java/com/datastax/cdm/data/CqlConversion.java
+++ b/src/main/java/com/datastax/cdm/data/CqlConversion.java
@@ -16,7 +16,11 @@
 package com.datastax.cdm.data;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -25,7 +29,8 @@
 import com.datastax.cdm.schema.CqlTable;
 import com.datastax.oss.driver.api.core.ProtocolVersion;
 import com.datastax.oss.driver.api.core.data.UdtValue;
-import com.datastax.oss.driver.api.core.type.*;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.UserDefinedType;
 import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
 import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
 
diff --git a/src/main/java/com/datastax/cdm/data/CqlData.java b/src/main/java/com/datastax/cdm/data/CqlData.java
index ee651561..4cf4b60a 100644
--- a/src/main/java/com/datastax/cdm/data/CqlData.java
+++ b/src/main/java/com/datastax/cdm/data/CqlData.java
@@ -15,11 +15,22 @@
  */
 package com.datastax.cdm.data;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import com.datastax.dse.driver.api.core.type.DseDataTypes;
 import com.datastax.oss.driver.api.core.data.UdtValue;
-import com.datastax.oss.driver.api.core.type.*;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.ListType;
+import com.datastax.oss.driver.api.core.type.MapType;
+import com.datastax.oss.driver.api.core.type.SetType;
+import com.datastax.oss.driver.api.core.type.TupleType;
+import com.datastax.oss.driver.api.core.type.UserDefinedType;
+import com.datastax.oss.driver.api.core.type.VectorType;
 
 public class CqlData {
     public enum Type {
diff --git a/src/main/java/com/datastax/cdm/data/DataUtility.java b/src/main/java/com/datastax/cdm/data/DataUtility.java
index 2df21b7a..0e5708b2 100644
--- a/src/main/java/com/datastax/cdm/data/DataUtility.java
+++ b/src/main/java/com/datastax/cdm/data/DataUtility.java
@@ -19,7 +19,12 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
diff --git a/src/main/java/com/datastax/cdm/data/EnhancedPK.java b/src/main/java/com/datastax/cdm/data/EnhancedPK.java
index 6cdef29c..f5802382 100644
--- a/src/main/java/com/datastax/cdm/data/EnhancedPK.java
+++ b/src/main/java/com/datastax/cdm/data/EnhancedPK.java
@@ -15,7 +15,11 @@
  */
 package com.datastax.cdm.data;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
diff --git a/src/main/java/com/datastax/cdm/data/PKFactory.java b/src/main/java/com/datastax/cdm/data/PKFactory.java
index cc9fc693..ae931754 100644
--- a/src/main/java/com/datastax/cdm/data/PKFactory.java
+++ b/src/main/java/com/datastax/cdm/data/PKFactory.java
@@ -15,13 +15,20 @@
  */
 package com.datastax.cdm.data;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.cdm.feature.*;
+import com.datastax.cdm.feature.ConstantColumns;
+import com.datastax.cdm.feature.ExplodeMap;
+import com.datastax.cdm.feature.FeatureFactory;
+import com.datastax.cdm.feature.Featureset;
+import com.datastax.cdm.feature.WritetimeTTL;
 import com.datastax.cdm.properties.PropertyHelper;
 import com.datastax.cdm.schema.CqlTable;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
diff --git a/src/main/java/com/datastax/cdm/feature/ExplodeMap.java b/src/main/java/com/datastax/cdm/feature/ExplodeMap.java
index 6886c479..fed25817 100644
--- a/src/main/java/com/datastax/cdm/feature/ExplodeMap.java
+++ b/src/main/java/com/datastax/cdm/feature/ExplodeMap.java
@@ -15,7 +15,12 @@
  */
 package com.datastax.cdm.feature;
 
-import java.util.*;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
index 98089f86..c70cbd13 100644
--- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
+++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
@@ -17,7 +17,12 @@
 
 import java.math.BigInteger;
 import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
diff --git a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java
index 9f8f03fc..863ab0ad 100644
--- a/src/main/java/com/datastax/cdm/properties/PropertyHelper.java
+++ b/src/main/java/com/datastax/cdm/properties/PropertyHelper.java
@@ -15,7 +15,10 @@
  */
 package com.datastax.cdm.properties;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java
index d78e8aa3..4186abaa 100644
--- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java
+++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java
@@ -19,17 +19,21 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.Collections;
-import java.util.List;
+import java.time.Duration;
+import java.util.*;
+import java.util.function.Consumer;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 
 import com.datastax.cdm.cql.CommonMocks;
 import com.datastax.cdm.job.RunNotStartedException;
+import com.datastax.cdm.job.SplitPartitions;
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.BoundStatement;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
@@ -47,10 +51,10 @@ public class TargetUpsertRunDetailsStatementTest extends CommonMocks {
     ResultSet rs;
 
     @Mock
-    Row row;
+    Row row1, row2, row3;
 
     @Mock
-    BoundStatement bStatement;
+    BoundStatement boundStatement;
 
     TargetUpsertRunDetailsStatement targetUpsertRunDetailsStatement;
 
@@ -59,15 +63,18 @@ public void setup() {
         // UPDATE is needed by counters, though the class should handle non-counter updates
         commonSetup(false, false, true);
         when(cqlSession.prepare(anyString())).thenReturn(preparedStatement);
-        when(preparedStatement.bind(any())).thenReturn(bStatement);
-        when(cqlSession.execute(bStatement)).thenReturn(rs);
-        when(rs.all()).thenReturn(List.of(row));
+        when(preparedStatement.bind(any())).thenReturn(boundStatement);
+        when(boundStatement.setTimeout(any(Duration.class))).thenReturn(boundStatement);
+        when(boundStatement.setString(anyString(), anyString())).thenReturn(boundStatement);
+        when(boundStatement.setLong(anyString(), any(Long.class))).thenReturn(boundStatement);
+        when(cqlSession.execute(boundStatement)).thenReturn(rs);
     }
 
     @Test
-    public void init() throws RunNotStartedException {
+    public void getPendingPartitions_nothingPending() throws RunNotStartedException {
         targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
         assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(0));
+        assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(1));
     }
 
     @Test
@@ -75,4 +82,26 @@ public void incorrectKsTable() throws RunNotStartedException {
         assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1"));
     }
 
+    @Test
+    public void getPartitionsByStatus() {
+        Iterator<Row> mockIterator = mock(Iterator.class);
+        when(rs.iterator()).thenReturn(mockIterator);
+        when(mockIterator.hasNext()).thenReturn(true, true, true, false);
+        when(row1.getLong("token_min")).thenReturn(101L);
+        when(row1.getLong("token_max")).thenReturn(200L);
+        when(row2.getLong("token_min")).thenReturn(201L);
+        when(row2.getLong("token_max")).thenReturn(300L);
+        when(row3.getLong("token_min")).thenReturn(301L);
+        when(row3.getLong("token_max")).thenReturn(400L);
+        when(mockIterator.next()).thenReturn(row1);
+        when(mockIterator.next()).thenReturn(row2);
+        when(mockIterator.next()).thenReturn(row3);
+
+        targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
+        Collection<SplitPartitions.Partition> parts = targetUpsertRunDetailsStatement.getPartitionsByStatus(123L,
+                "RUNNING");
+
+        // This test is incorrect and needs to be troubleshot & fixed. The actual code works, but the test does not.
+        assertEquals(0, parts.size());
+    }
 }
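A note on the test marked incorrect at the end of the patch above: consecutive `when(mockIterator.next())` stubbings override one another (so `next()` would only ever return `row3`), and because `rs` is a plain mock, the default `Iterable.forEach()` implementation is never executed, so the production lambda never drains the stubbed iterator — which is why the assertion can only pass with 0 partitions. One possible fix, shown as an unverified sketch rather than part of this patch (`doCallRealMethod` is `org.mockito.Mockito.doCallRealMethod`):

```java
// Chain the three rows in one stubbing instead of three overriding ones
when(mockIterator.next()).thenReturn(row1, row2, row3);
// Let the mocked ResultSet run the real default Iterable.forEach(),
// which then walks the stubbed iterator above
doCallRealMethod().when(rs).forEach(any());

targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
Collection<SplitPartitions.Partition> parts =
        targetUpsertRunDetailsStatement.getPartitionsByStatus(123L, "RUNNING");
assertEquals(3, parts.size()); // all three mocked rows should now come back
```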
From 1ed5b0af694e39f83b056dbfeda492a89a62404c Mon Sep 17 00:00:00 2001
From: Pravin Bhat
Date: Mon, 21 Oct 2024 23:08:00 -0400
Subject: [PATCH 2/2] Update README.md

Fix typos

Co-authored-by: Madhavan
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index b0949f2d..b6b971c0 100644
--- a/README.md
+++ b/README.md
@@ -157,7 +157,7 @@ spark-submit --properties-file cdm.properties \
 - Performance bottlenecks are usually the result of
   - Low resource availability on `Origin` OR `Target` cluster
   - Low resource availability on CDM VMs, [see recommendations here](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines)
-  - Bad schema design which could be cause by Out of balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count
+  - Bad schema design which could be caused by an out-of-balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count.
 - Incorrect configuration of the below properties
   - `numParts`: Default is 5K, but the ideal value is usually around table-size/10MB.
   - `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key=partition-key OR where the average row-size is > 20 KB. Similarly, this should be set to a value > 5 if row-size is small (< 1KB) and most partitions have several rows (100+).