diff --git a/v2/spanner-to-sourcedb/pom.xml b/v2/spanner-to-sourcedb/pom.xml
index 6e790693cb..a0ec5dd512 100644
--- a/v2/spanner-to-sourcedb/pom.xml
+++ b/v2/spanner-to-sourcedb/pom.xml
@@ -117,6 +117,12 @@
+
+ org.assertj
+ assertj-core
+ 3.20.2
+ test
+
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java
new file mode 100644
index 0000000000..0c524fac7c
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/DurationCodec.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.codec;
+
+import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.data.CqlDuration;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+import com.datastax.oss.driver.internal.core.type.codec.CqlDurationCodec;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import net.jcip.annotations.ThreadSafe;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+@ThreadSafe
+public class DurationCodec implements TypeCodec {
+
+ private final TypeCodec innerCodec = new CqlDurationCodec();
+
+ public DurationCodec() {}
+
+ @Override
+ public @NotNull GenericType getJavaType() {
+ return GenericType.DURATION;
+ }
+
+ @Override
+ public @NotNull DataType getCqlType() {
+ return DataTypes.DURATION;
+ }
+
+ @Override
+ public @Nullable ByteBuffer encode(
+ @Nullable Duration value, @NotNull ProtocolVersion protocolVersion) {
+ if (value == null) {
+ return null;
+ }
+ CqlDuration cqlDuration =
+ CqlDuration.newInstance(
+ (int) (value.toDays() / 30), (int) (value.toDays() % 30), value.toNanosPart());
+ return innerCodec.encode(cqlDuration, protocolVersion);
+ }
+
+ @Override
+ public @Nullable Duration decode(
+ @Nullable ByteBuffer bytes, @NotNull ProtocolVersion protocolVersion) {
+ if (bytes == null || bytes.remaining() == 0) {
+ return null;
+ }
+ CqlDuration cqlDuration = innerCodec.decode(bytes, protocolVersion);
+ if (cqlDuration == null) {
+ return null;
+ }
+ return Duration.ofDays((long) cqlDuration.getMonths() * 30 + cqlDuration.getDays())
+ .plusNanos(cqlDuration.getNanoseconds());
+ }
+
+ @Override
+ public @Nullable Duration parse(@Nullable String value) {
+ if (value == null || value.equalsIgnoreCase("NULL")) {
+ return null;
+ }
+ CqlDuration cqlDuration = innerCodec.parse(value);
+ if (cqlDuration == null) {
+ return null;
+ }
+ return Duration.ofDays((long) cqlDuration.getMonths() * 30 + cqlDuration.getDays())
+ .plusNanos(cqlDuration.getNanoseconds());
+ }
+
+ @Override
+ public @NotNull String format(@Nullable Duration value) {
+ if (value == null) {
+ return "NULL";
+ }
+ CqlDuration cqlDuration =
+ CqlDuration.newInstance(
+ (int) (value.toDays() / 30), (int) (value.toDays() % 30), value.toNanosPart());
+ return innerCodec.format(cqlDuration);
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java
new file mode 100644
index 0000000000..dd48f9a117
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/codec/package-info.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.codec;
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java
index d039c9d9a9..8b78715d5a 100644
--- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelper.java
@@ -18,9 +18,11 @@
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
+import com.google.cloud.teleport.v2.templates.codec.DurationCodec;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
import java.util.List;
@@ -82,6 +84,9 @@ public synchronized void init(ConnectionHelperRequest connectionHelperRequest) {
CassandraShard cassandraShard = (CassandraShard) shard;
try {
CqlSession session = createCqlSession(cassandraShard);
+ MutableCodecRegistry registry =
+ (MutableCodecRegistry) session.getContext().getCodecRegistry();
+ registry.register(new DurationCodec());
String connectionKey = generateConnectionKey(cassandraShard);
connectionPoolMap.put(connectionKey, session);
LOG.info("Connection initialized for key: {}", connectionKey);
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java
index f6bd755aa5..e722f356f4 100644
--- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java
@@ -22,6 +22,7 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
@@ -33,21 +34,31 @@
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
@@ -76,6 +87,7 @@ public class SpannerToCassandraSourceDbIT extends SpannerToSourceDbITBase {
private static final String USER_TABLE_2 = "Users2";
private static final String ALL_DATA_TYPES_TABLE = "AllDatatypeColumns";
private static final String ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE = "AllDatatypeTransformation";
+ private static final String BOUNDARY_CONVERSION_TABLE = "BoundaryConversionTestTable";
private static final HashSet testInstances = new HashSet<>();
private static PipelineLauncher.LaunchInfo jobInfo;
public static SpannerResourceManager spannerResourceManager;
@@ -248,6 +260,29 @@ public void spannerToCassandraSourceDataTypeStringConversionTest()
assertStringToActualRowsInCassandraDB();
}
+ /**
+ * Validates Boundary and Map Data Type Conversions from Spanner to Cassandra.
+ *
+ *
This test ensures that boundary values for various data types and their equivalent map data
+ * types are correctly converted and transferred from Spanner to Cassandra. It verifies that the
+ * string-based representations used in Spanner are accurately translated into their appropriate
+ * data types in Cassandra, maintaining data integrity and precision.
+ *
+ *
The test involves inserting maximum and boundary data values into Spanner, then reading and
+ * asserting the values from Cassandra to ensure consistent data conversion and integrity.
+ *
+ * @throws InterruptedException if the thread is interrupted during execution.
+ * @throws IOException if an I/O error occurs during test execution.
+ * @throws MultipleFailureException if multiple assertions fail during validation.
+ */
+ @Test
+ public void validateBoundaryAndMapDataConversionsBetweenSpannerAndCassandra()
+ throws InterruptedException, IOException, MultipleFailureException {
+ assertThatPipeline(jobInfo).isRunning();
+ insertBoundaryValuesIntoSpanner();
+ assertCassandraBoundaryData();
+ }
+
/**
* Retrieves the total row count of a specified table in Cassandra.
*
@@ -1162,4 +1197,340 @@ private void assertStringToActualRowsInCassandraDB() throws MultipleFailureExcep
assertThat(row.getBigInteger("varint_column"))
.isEqualTo(java.math.BigInteger.valueOf(123456789L)));
}
+
+ private void insertBoundaryValuesIntoSpanner() {
+ Mutation mutation =
+ Mutation.newInsertOrUpdateBuilder(BOUNDARY_CONVERSION_TABLE)
+ .set("varchar_column")
+ .to("SampleVarchar")
+ .set("tinyint_column")
+ .to(Byte.MAX_VALUE)
+ .set("smallint_column")
+ .to(Short.MAX_VALUE)
+ .set("int_column")
+ .to(Integer.MAX_VALUE)
+ .set("bigint_column")
+ .to(Long.MAX_VALUE)
+ .set("float_column")
+ .to(Float.POSITIVE_INFINITY)
+ .set("double_column")
+ .to(Double.POSITIVE_INFINITY)
+ .set("decimal_column")
+ .to(new BigDecimal("99999999999999999999999999999.999999999").toPlainString())
+ .set("bool_column")
+ .to(Boolean.TRUE)
+ .set("ascii_column")
+ .to("ASCII_TEXT")
+ .set("text_column")
+ .to("Text data")
+ .set("bytes_column")
+ .to("R29vZ2xl")
+ .set("date_column")
+ .to(Date.parseDate("9999-12-31"))
+ .set("time_column")
+ .to("23:59:59.999999")
+ .set("timestamp_column")
+ .to(String.valueOf(Timestamp.parseTimestamp("9999-12-31T23:59:59.999999Z")))
+ .set("duration_column")
+ .to("P4DT1H")
+ .set("uuid_column")
+ .to("123e4567-e89b-12d3-a456-426614174000")
+ .set("timeuuid_column")
+ .to("123e4567-e89b-12d3-a456-426614174000")
+ .set("inet_column")
+ .to("192.168.0.1")
+ .set("map_bool_column")
+ .to(Value.json("{\"true\": false}"))
+ .set("map_float_column")
+ .to(
+ Value.json(
+ "{\"3.4028235E38\": 1.4E-45, \"Infinity\": \"Infinity\", \"NaN\": \"NaN\"}"))
+ .set("map_double_column")
+ .to(Value.json("{\"2.718281828459045\": \"2.718281828459045\"}"))
+ .set("map_tinyint_column")
+ .to(Value.json("{\"127\": \"-128\"}"))
+ .set("map_smallint_column")
+ .to(Value.json("{\"32767\": \"-32768\"}"))
+ .set("map_int_column")
+ .to(Value.json("{\"2147483647\": \"-2147483648\"}"))
+ .set("map_bigint_column")
+ .to(Value.json("{\"9223372036854775807\": 9007199254740993}"))
+ .set("map_varint_column")
+ .to(Value.json("{\"100000000000000000000\": \"-100000000000000000000\"}"))
+ .set("map_decimal_column")
+ .to(Value.json("{\"12345.6789\": \"98765.4321\"}"))
+ .set("map_ascii_column")
+ .to(Value.json("{\"example1\": \"string1\", \"example2\": \"string2\"}"))
+ .set("map_varchar_column")
+ .to(Value.json("{\"key1\": \"value1\", \"key2\": \"value2\"}"))
+ .set("map_blob_column")
+ .to(Value.json("{\"R29vZ2xl\": \"Q29tcGFueQ==\"}"))
+ .set("map_date_column")
+ .to(Value.json("{\"2025-01-27\": \"1995-01-29\"}"))
+ .set("map_time_column")
+ .to(Value.json("{\"12:30:00\": \"02:45:00\"}"))
+ .set("map_timestamp_column")
+ .to(Value.json("{\"2025-01-01T00:00:00Z\": \"9999-12-31T23:59:59.999999Z\"}"))
+ .set("map_duration_column")
+ .to(Value.json("{\"P4DT1H\": \"P4DT1H\"}"))
+ .set("map_uuid_column")
+ .to(
+ Value.json(
+ "{\"123e4567-e89b-12d3-a456-426614174000\": \"321e4567-e89b-12d3-a456-426614174000\"}"))
+ .set("map_timeuuid_column")
+ .to(
+ Value.json(
+ "{\"321e4567-e89b-12d3-a456-426614174000\": \"123e4567-e89b-12d3-a456-426614174000\"}"))
+ .set("map_inet_column")
+ .to(
+ Value.json(
+ "{\"48.49.50.51\": \"::1\",\"3031:3233:3435:3637:3839:4041:4243:4445\": \"::ffff:192.0.2.128\" }"))
+ .build();
+
+ spannerResourceManager.write(mutation);
+ }
+
+ private void assertCassandraBoundaryData() throws InterruptedException, MultipleFailureException {
+ PipelineOperator.Result result =
+ pipelineOperator()
+ .waitForCondition(
+ createConfig(jobInfo, Duration.ofMinutes(10)),
+ () -> getRowCount(BOUNDARY_CONVERSION_TABLE) == 1);
+ assertThatResult(result).meetsConditions();
+
+ Iterable rows;
+ try {
+ rows = cassandraResourceManager.readTable(BOUNDARY_CONVERSION_TABLE);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to read from Cassandra table: " + BOUNDARY_CONVERSION_TABLE, e);
+ }
+ // Convert hexadecimal string to a byte array
+ String hexString = "476f6f676c65"; // "Google" without "0x" prefix
+ byte[] expectedBytes = new byte[hexString.length() / 2];
+
+ for (int i = 0; i < expectedBytes.length; i++) {
+ int index = i * 2;
+ int j = Integer.parseInt(hexString.substring(index, index + 2), 16);
+ expectedBytes[i] = (byte) j;
+ }
+
+ assertThat(rows).hasSize(1);
+ Row row = rows.iterator().next();
+ assertAll(
+ // Basic Data Types
+ () -> assertThat(row.getString("varchar_column")).isEqualTo("SampleVarchar"),
+ () -> assertThat(row.getByte("tinyint_column")).isEqualTo(Byte.MAX_VALUE),
+ () -> assertThat(row.getShort("smallint_column")).isEqualTo(Short.MAX_VALUE),
+ () -> assertThat(row.getInt("int_column")).isEqualTo(Integer.MAX_VALUE),
+ () -> assertThat(row.getLong("bigint_column")).isEqualTo(Long.MAX_VALUE),
+ () -> assertThat(row.getFloat("float_column")).isEqualTo(Float.POSITIVE_INFINITY),
+ () -> assertThat(row.getDouble("double_column")).isEqualTo(Double.POSITIVE_INFINITY),
+ () ->
+ assertThat(row.getBigDecimal("decimal_column"))
+ .isEqualTo(new BigDecimal("99999999999999999999999999999.999999999")),
+ () -> assertThat(row.getBoolean("bool_column")).isTrue(),
+ () -> assertThat(row.getString("ascii_column")).isEqualTo("ASCII_TEXT"),
+ () -> assertThat(row.getString("text_column")).isEqualTo("Text data"),
+ () -> assertThat(row.getCqlDuration("duration_column").toString()).isEqualTo("4d"),
+ () ->
+ assertThat(row.getBytesUnsafe("bytes_column"))
+ .isEqualTo(ByteBuffer.wrap(expectedBytes)),
+ () -> assertThat(row.getLocalDate("date_column")).isEqualTo(LocalDate.parse("9999-12-31")),
+ () ->
+ assertThat(row.getLocalTime("time_column"))
+ .isEqualTo(java.time.LocalTime.parse("23:59:59.999999")),
+ () ->
+ assertThat(row.getInstant("timestamp_column"))
+ .isEqualTo(java.time.Instant.parse("9999-12-31T23:59:59.999Z")),
+ // Maps
+ () ->
+ assertThat(row.getMap("map_bool_column", Boolean.class, Boolean.class))
+ .isEqualTo(Map.of(true, false)),
+ () -> {
+ Map expected =
+ Map.of(
+ 3.4028235E38f,
+ 1.4E-45f,
+ Float.POSITIVE_INFINITY,
+ Float.POSITIVE_INFINITY,
+ Float.NaN,
+ Float.NaN);
+
+ Map actual = row.getMap("map_float_column", Float.class, Float.class);
+
+ // Check if all expected keys exist in the actual map, and assert their values
+ expected.forEach(
+ (key, expectedValue) -> {
+ Assertions.assertThat(actual.containsKey(key))
+ .withFailMessage("Actual map is missing key: %s", key)
+ .isTrue();
+
+ Float actualValue = actual.get(key);
+
+ if (Float.isNaN(expectedValue)) {
+ // Handle NaN separately because NaN is not equal to itself
+ Assertions.assertThat(Float.isNaN(actualValue))
+ .withFailMessage("Value for key %s should be NaN", key)
+ .isTrue();
+ } else if (Float.isInfinite(expectedValue)) {
+ // Handle Infinity separately
+ Assertions.assertThat(actualValue)
+ .withFailMessage("Value for key %s should be Infinity", key)
+ .isEqualTo(Float.POSITIVE_INFINITY);
+ } else {
+ // Regular comparison
+ Assertions.assertThat(actualValue)
+ .withFailMessage("Value for key %s is incorrect", key)
+ .isEqualTo(expectedValue);
+ }
+ });
+
+ // Check if the actual map does not have extra keys that are not expected
+ Set unexpectedKeys =
+ actual.keySet().stream()
+ .filter(key -> !expected.containsKey(key))
+ .collect(Collectors.toSet());
+
+ Assertions.assertThat(unexpectedKeys)
+ .withFailMessage("Actual map has unexpected keys: %s", unexpectedKeys)
+ .isEmpty();
+ },
+ () ->
+ assertThat(row.getMap("map_double_column", Double.class, Double.class))
+ .isEqualTo(Map.of(2.718281828459045, 2.718281828459045)),
+ () ->
+ assertThat(row.getMap("map_tinyint_column", Byte.class, Byte.class))
+ .isEqualTo(Map.of((byte) 127, (byte) -128)),
+ () ->
+ assertThat(row.getMap("map_smallint_column", Short.class, Short.class))
+ .isEqualTo(Map.of((short) 32767, (short) -32768)),
+ () ->
+ assertThat(row.getMap("map_int_column", Integer.class, Integer.class))
+ .isEqualTo(Map.of(2147483647, -2147483648)),
+ () ->
+ assertThat(row.getMap("map_bigint_column", Long.class, Long.class))
+ .isEqualTo(Map.of(9223372036854775807L, 9007199254740993L)),
+ () ->
+ assertThat(row.getMap("map_varint_column", BigInteger.class, BigInteger.class))
+ .isEqualTo(
+ Map.of(
+ new BigInteger("100000000000000000000"),
+ new BigInteger("-100000000000000000000"))),
+ () ->
+ assertThat(row.getMap("map_decimal_column", BigDecimal.class, BigDecimal.class))
+ .isEqualTo(Map.of(new BigDecimal("12345.6789"), new BigDecimal("98765.4321"))),
+ () ->
+ assertThat(row.getMap("map_ascii_column", String.class, String.class))
+ .isEqualTo(Map.of("example1", "string1", "example2", "string2")),
+ () ->
+ assertThat(row.getMap("map_varchar_column", String.class, String.class))
+ .isEqualTo(Map.of("key1", "value1", "key2", "value2")),
+ () -> {
+ // Decode base64 to raw byte arrays
+ byte[] keyBytes = Base64.getDecoder().decode("R29vZ2xl");
+ byte[] valueBytes = Base64.getDecoder().decode("Q29tcGFueQ==");
+
+ // Create expected map
+ Map expected = new HashMap<>();
+ expected.put(ByteBuffer.wrap(keyBytes), ByteBuffer.wrap(valueBytes));
+
+ // Fetch actual map from Cassandra
+ Map actual =
+ row.getMap("map_blob_column", ByteBuffer.class, ByteBuffer.class);
+
+ // Iterate and assert equality based on byte content instead of ByteBuffer object
+ // references
+ Assertions.assertThat(actual)
+ .allSatisfy(
+ (key, value) -> {
+ ByteBuffer expectedKey =
+ expected.keySet().stream()
+ .filter(k -> compareByteBuffers(k, key))
+ .findAny()
+ .orElse(null);
+ Assertions.assertThat(expectedKey)
+ .withFailMessage("Unexpected key: %s", keyToString(key))
+ .isNotNull();
+
+ ByteBuffer expectedValue = expected.get(expectedKey);
+ Assertions.assertThat(expectedValue)
+ .withFailMessage("Unexpected value for key %s", keyToString(key))
+ .satisfies(v -> compareByteBuffers(v, value));
+ });
+ },
+ () ->
+ assertThat(row.getMap("map_date_column", LocalDate.class, LocalDate.class))
+ .isEqualTo(Map.of(LocalDate.parse("2025-01-27"), LocalDate.parse("1995-01-29"))),
+ () ->
+ assertThat(row.getMap("map_time_column", LocalTime.class, LocalTime.class))
+ .isEqualTo(Map.of(LocalTime.parse("12:30:00"), LocalTime.parse("02:45:00"))),
+ () ->
+ assertThat(row.getMap("map_timestamp_column", Instant.class, Instant.class))
+ .isEqualTo(
+ Map.of(
+ java.time.Instant.parse("2025-01-01T00:00:00Z"),
+ java.time.Instant.parse("9999-12-31T23:59:59.999Z"))),
+ () ->
+ assertThat(row.getMap("map_duration_column", String.class, CqlDuration.class))
+ .isEqualTo(Map.of("P4DT1H", CqlDuration.from("4d"))),
+ () ->
+ assertThat(row.getMap("map_uuid_column", UUID.class, UUID.class))
+ .isEqualTo(
+ Map.of(
+ UUID.fromString("123e4567-e89b-12d3-a456-426614174000"),
+ UUID.fromString("321e4567-e89b-12d3-a456-426614174000"))),
+ () ->
+ assertThat(row.getMap("map_timeuuid_column", UUID.class, UUID.class))
+ .isEqualTo(
+ Map.of(
+ UUID.fromString("321e4567-e89b-12d3-a456-426614174000"),
+ UUID.fromString("123e4567-e89b-12d3-a456-426614174000"))),
+ () -> {
+ try {
+ Map expected =
+ Map.of(
+ InetAddress.getByName("48.49.50.51"), InetAddress.getByName("::1"),
+ InetAddress.getByName("3031:3233:3435:3637:3839:4041:4243:4445"),
+ InetAddress.getByName("::ffff:192.0.2.128"));
+
+ Map actual =
+ row.getMap("map_inet_column", InetAddress.class, InetAddress.class);
+
+ Assertions.assertThat(actual)
+ .as(
+ "Checking the mapping of IP addresses between Cassandra and the expected output")
+ .isEqualTo(expected);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to convert String to InetAddress, possibly due to an invalid IP format.",
+ e);
+ }
+ });
+ }
+
+ // Helper function to compare two ByteBuffers byte-by-byte
+ private boolean compareByteBuffers(ByteBuffer buffer1, ByteBuffer buffer2) {
+ if (buffer1.remaining() != buffer2.remaining()) {
+ return false;
+ }
+
+ for (int i = 0; i < buffer1.remaining(); i++) {
+ if (buffer1.get(i) != buffer2.get(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Utility for debugging, converting ByteBuffer to readable string
+ private String keyToString(ByteBuffer buffer) {
+ int oldPosition = buffer.position();
+ StringBuilder hex = new StringBuilder();
+ while (buffer.hasRemaining()) {
+ hex.append(String.format("%02x", buffer.get()));
+ }
+ buffer.position(oldPosition); // reset to original position
+ return hex.toString();
+ }
}
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/codec/DurationCodecTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/codec/DurationCodecTest.java
new file mode 100644
index 0000000000..b0ba1c88fc
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/codec/DurationCodecTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (C) 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.datastax.oss.driver.api.core.ProtocolVersion;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.core.type.reflect.GenericType;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DurationCodecTest {
+
+ private DurationCodec durationCodec;
+
+ @Before
+ public void setUp() {
+ durationCodec = new DurationCodec();
+ }
+
+ @Test
+ public void testGetJavaType() {
+ assertEquals(GenericType.DURATION, durationCodec.getJavaType());
+ }
+
+ @Test
+ public void testGetCqlType() {
+ assertEquals(DataTypes.DURATION, durationCodec.getCqlType());
+ }
+
+ @Test
+ public void testEncode() {
+ Duration duration = Duration.ofDays(60).plusNanos(123456789);
+ ByteBuffer encoded = durationCodec.encode(duration, ProtocolVersion.DEFAULT);
+
+ assertNotNull(encoded);
+ assertTrue(encoded.remaining() > 0);
+ }
+
+ @Test
+ public void testEncodeNull() {
+ assertNull(durationCodec.encode(null, ProtocolVersion.DEFAULT));
+ }
+
+ @Test
+ public void testDecode() {
+ Duration originalDuration = Duration.ofDays(60).plusNanos(123456789);
+ ByteBuffer encoded = durationCodec.encode(originalDuration, ProtocolVersion.DEFAULT);
+ Duration decoded = durationCodec.decode(encoded, ProtocolVersion.DEFAULT);
+
+ assertNotNull(decoded);
+ assertEquals(originalDuration, decoded);
+ }
+
+ @Test
+ public void testDecodeNull() {
+ assertNull(durationCodec.decode(null, ProtocolVersion.DEFAULT));
+ assertNull(durationCodec.decode(ByteBuffer.allocate(0), ProtocolVersion.DEFAULT));
+ }
+
+ @Test
+ public void testParse() {
+ String durationString = "2mo15d";
+ Duration parsed = durationCodec.parse(durationString);
+
+ assertNotNull(parsed);
+ assertEquals(Duration.ofDays(75), parsed);
+ }
+
+ @Test
+ public void testParseNull() {
+ assertNull(durationCodec.parse(null));
+ assertNull(durationCodec.parse("NULL"));
+ }
+
+ @Test
+ public void testFormat() {
+ Duration duration = Duration.ofDays(60).plusNanos(123456789);
+ String formatted = durationCodec.format(duration);
+
+ assertNotNull(formatted);
+ assertFalse("NULL".equals(formatted));
+ }
+
+ @Test
+ public void testFormatNull() {
+ assertEquals("NULL", durationCodec.format(null));
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java
index 56b9f1949e..3bdcaa7f26 100644
--- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/connection/CassandraConnectionHelperTest.java
@@ -19,6 +19,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -26,6 +28,9 @@
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
+import com.datastax.oss.driver.api.core.context.DriverContext;
+import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
+import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
@@ -77,7 +82,11 @@ public void testInit_ShouldInitializeConnectionPool() {
mockedCqlSession.when(CqlSession::builder).thenReturn(cqlSessionBuilder);
when(cqlSessionBuilder.withConfigLoader(driverConfigLoader)).thenReturn(cqlSessionBuilder);
when(cqlSessionBuilder.build()).thenReturn(cqlSession);
-
+ MutableCodecRegistry mockRegistry = mock(MutableCodecRegistry.class);
+ doNothing().when(mockRegistry).register((TypeCodec>) any());
+ DriverContext mockContext = mock(DriverContext.class);
+ when(mockContext.getCodecRegistry()).thenReturn(mockRegistry);
+ when(cqlSession.getContext()).thenReturn(mockContext);
ConnectionHelperRequest request = mock(ConnectionHelperRequest.class);
when(request.getShards()).thenReturn(Collections.singletonList(cassandraShard));
when(request.getMaxConnections()).thenReturn(10);
diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql
index 49830099eb..d78399ee0b 100644
--- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql
+++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql
@@ -95,4 +95,45 @@ CREATE TABLE AllDatatypeColumns (
frozen_list_of_sets_column list>>,
varint_column varint,
inet_column INET
+);
+
+CREATE TABLE BoundaryConversionTestTable (
+ varchar_column text PRIMARY KEY,
+ tinyint_column tinyint,
+ smallint_column smallint,
+ int_column int,
+ bigint_column bigint,
+ float_column float,
+ double_column double,
+ decimal_column decimal,
+ bool_column boolean,
+ ascii_column ascii,
+ text_column text,
+ bytes_column blob,
+ date_column date,
+ time_column time,
+ timestamp_column timestamp,
+ duration_column duration,
+ uuid_column uuid,
+ timeuuid_column timeuuid,
+ inet_column inet,
+ map_bool_column map,
+ map_float_column map,
+ map_double_column map,
+ map_tinyint_column map,
+ map_smallint_column map,
+ map_int_column map,
+ map_bigint_column map,
+ map_varint_column map,
+ map_decimal_column map,
+ map_ascii_column map,
+ map_varchar_column map,
+ map_blob_column map,
+ map_date_column map,
+ map_time_column map