From cd56b8b2b9fc1935ba1b5ca9c99b46b5a38eb34d Mon Sep 17 00:00:00 2001
From: Taher Lakdawala <78196491+taherkl@users.noreply.github.com>
Date: Wed, 5 Mar 2025 22:34:38 +0530
Subject: [PATCH 1/4] IT tests for Boundary values and extended map datatypes
---
v2/spanner-to-sourcedb/pom.xml | 6 +
.../v2/templates/codec/DurationCodec.java | 97 +++++
.../v2/templates/codec/package-info.java | 16 +
.../connection/CassandraConnectionHelper.java | 5 +
.../SpannerToCassandraSourceDbIT.java | 371 ++++++++++++++++++
.../CassandraConnectionHelperTest.java | 11 +-
.../cassandra-schema.sql | 41 ++
.../spanner-schema.sql | 41 ++
8 files changed, 587 insertions(+), 1 deletion(-)
create mode 100644 v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java
create mode 100644 v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java
diff --git a/v2/spanner-to-sourcedb/pom.xml b/v2/spanner-to-sourcedb/pom.xml
index 6e790693cb..aafa91ff6f 100644
--- a/v2/spanner-to-sourcedb/pom.xml
+++ b/v2/spanner-to-sourcedb/pom.xml
@@ -117,6 +117,12 @@
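+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <version>3.20.2</version>
+      <scope>test</scope>
+    </dependency>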
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java
new file mode 100644
index 0000000000..0c524fac7c
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.codec;
+
+import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.data.CqlDuration;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+import com.datastax.oss.driver.internal.core.type.codec.CqlDurationCodec;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import net.jcip.annotations.ThreadSafe;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Codec mapping CQL {@code duration} to {@link Duration}. A month is treated as 30 days, so
+ * calendar-based CQL values do not round-trip exactly. Holds no mutable state of its own.
+ */
+@ThreadSafe
+public class DurationCodec implements TypeCodec<Duration> {
+
+  private static final long DAYS_PER_MONTH = 30L;
+  private final TypeCodec<CqlDuration> innerCodec = new CqlDurationCodec();
+
+  @Override
+  public @NotNull GenericType<Duration> getJavaType() {
+    return GenericType.DURATION;
+  }
+
+  @Override
+  public @NotNull DataType getCqlType() {
+    return DataTypes.DURATION;
+  }
+
+  @Override
+  public @Nullable ByteBuffer encode(
+      @Nullable Duration value, @NotNull ProtocolVersion protocolVersion) {
+    return value == null ? null : innerCodec.encode(toCqlDuration(value), protocolVersion);
+  }
+
+  @Override
+  public @Nullable Duration decode(
+      @Nullable ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
+    if (bytes == null || bytes.remaining() == 0) {
+      return null;
+    }
+    return toJavaDuration(innerCodec.decode(bytes, protocolVersion));
+  }
+
+  @Override
+  public @Nullable Duration parse(@Nullable String value) {
+    if (value == null || value.equalsIgnoreCase("NULL")) {
+      return null;
+    }
+    return toJavaDuration(innerCodec.parse(value));
+  }
+
+  @Override
+  public @NotNull String format(@Nullable Duration value) {
+    return value == null ? "NULL" : innerCodec.format(toCqlDuration(value));
+  }
+
+  /** Splits {@code value} into (months, days, nanoseconds) using 30-day months. */
+  private static CqlDuration toCqlDuration(Duration value) {
+    long totalDays = value.toDays();
+    // toNanosPart() is only the sub-second fraction; use the whole sub-day remainder so hours,
+    // minutes and seconds are kept. The remainder is under 24h, so toNanos() cannot overflow.
+    long nanos = value.minusDays(totalDays).toNanos();
+    // toIntExact fails loudly instead of silently wrapping when months exceed the int range.
+    int months = Math.toIntExact(totalDays / DAYS_PER_MONTH);
+    return CqlDuration.newInstance(months, (int) (totalDays % DAYS_PER_MONTH), nanos);
+  }
+
+  /** Converts back using 30-day months; returns {@code null} for a {@code null} input. */
+  private static Duration toJavaDuration(CqlDuration cql) {
+    return cql == null
+        ? null
+        : Duration.ofDays(cql.getMonths() * DAYS_PER_MONTH + cql.getDays())
+            .plusNanos(cql.getNanoseconds());
+  }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java
new file mode 100644
index 0000000000..dd48f9a117
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.codec;
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java
index d039c9d9a9..8b78715d5a 100644
--- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java
@@ -18,9 +18,11 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
+import com.google.cloud.teleport.v2.templates.codec.DurationCodec;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
import java.util.List;
@@ -82,6 +84,9 @@ public synchronized void init(ConnectionHelperRequest connectionHelperRequest) {
CassandraShard cassandraShard = (CassandraShard) shard;
try {
CqlSession session = createCqlSession(cassandraShard);
+ MutableCodecRegistry registry =
+ (MutableCodecRegistry) session.getContext().getCodecRegistry();
+ registry.register(new DurationCodec());
String connectionKey = generateConnectionKey(cassandraShard);
connectionPoolMap.put(connectionKey, session);
LOG.info("Connection initialized for key: {}", connectionKey);
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java
index f6bd755aa5..e722f356f4 100644
--- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java
@@ -22,6 +22,7 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
@@ -33,21 +34,31 @@
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -76,6 +87,7 @@ public class SpannerToCassandraSourceDbIT extends SpannerToSourceDbITBase {
private static final String USER_TABLE_2 = "Users2";
private static final String ALL_DATA_TYPES_TABLE = "AllDatatypeColumns";
private static final String ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE = "AllDatatypeTransformation";
+ private static final String BOUNDARY_CONVERSION_TABLE = "BoundaryConversionTestTable";
private static final HashSet testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo;
public static SpannerResourceManager spannerResourceManager;
@@ -248,6 +260,29 @@ public void spannerToCassandraSourceDataTypeStringConversionTest()
assertStringToActualRowsInCassandraDB();
}
+ /**
+ * Validates Boundary and Map Data Type Conversions from Spanner to Cassandra.
+ *
+ * <p>This test ensures that boundary values for various data types and their equivalent map data
+ * types are correctly converted and transferred from Spanner to Cassandra. It verifies that the
+ * string-based representations used in Spanner are accurately translated into their appropriate
+ * data types in Cassandra, maintaining data integrity and precision.
+ *
+ *