diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java
index 5b2e9739d7f..30029031d47 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java
@@ -258,7 +258,10 @@ private LogReplay getEmptyLogReplay(
       @Override
       protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
-          Engine engine, Optional<SnapshotHint> snapshotHint, long snapshotVersion) {
+          Engine engine,
+          LogSegment logSegment,
+          Optional<SnapshotHint> snapshotHint,
+          long snapshotVersion) {
         return new Tuple2<>(protocol, metadata);
       }
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
new file mode 100644
index 00000000000..f0876281ebd
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.replay;
+
+import static java.util.Objects.requireNonNull;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.internal.actions.Metadata;
+import io.delta.kernel.internal.actions.Protocol;
+import io.delta.kernel.types.StructType;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CRCInfo {
+  private static final Logger logger = LoggerFactory.getLogger(CRCInfo.class);
+
+  public static Optional<CRCInfo> fromColumnarBatch(
+      long version, ColumnarBatch batch, int rowId, String crcFilePath) {
+    Protocol protocol = Protocol.fromColumnVector(batch.getColumnVector(PROTOCOL_ORDINAL), rowId);
+    Metadata metadata = Metadata.fromColumnVector(batch.getColumnVector(METADATA_ORDINAL), rowId);
+    // protocol and metadata are nullable per fromColumnVector's implementation.
+    if (protocol == null || metadata == null) {
+      logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
+      return Optional.empty();
+    }
+    return Optional.of(new CRCInfo(version, metadata, protocol));
+  }
+
+  // We can add additional fields later.
+  public static final StructType FULL_SCHEMA =
+      new StructType().add("protocol", Protocol.FULL_SCHEMA).add("metadata", Metadata.FULL_SCHEMA);
+
+  private static final int PROTOCOL_ORDINAL = 0;
+  private static final int METADATA_ORDINAL = 1;
+
+  private final long version;
+  private final Metadata metadata;
+  private final Protocol protocol;
+
+  protected CRCInfo(long version, Metadata metadata, Protocol protocol) {
+    this.version = version;
+    this.metadata = requireNonNull(metadata);
+    this.protocol = requireNonNull(protocol);
+  }
+
+  /** The version of the Delta table that this CRCInfo represents. */
+  public long getVersion() {
+    return version;
+  }
+
+  /** The {@link Metadata} stored in this CRCInfo. */
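CRCInfo only carries the (protocol, metadata) pair parsed out of a .crc file. As a minimal sketch of the consuming side — the getters and the SnapshotHint constructor are the ones LogReplay uses further down; the caching comment is illustrative, not part of this PR:

    // Hypothetical caller: turn a parsed CRCInfo into a snapshot hint for later reads.
    Optional<CRCInfo> crcInfoOpt = ...; // e.g. from ChecksumReader.getCRCInfo(...), below
    crcInfoOpt.ifPresent(
        crcInfo -> {
          SnapshotHint hint =
              new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata());
          // cache `hint` so the next log replay can start from this version
        });
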
+  public Metadata getMetadata() {
+    return metadata;
+  }
+
+  /** The {@link Protocol} stored in this CRCInfo. */
+  public Protocol getProtocol() {
+    return protocol;
+  }
+}
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
new file mode 100644
index 00000000000..b606ad761b3
--- /dev/null
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.internal.replay;
+
+import static io.delta.kernel.internal.util.FileNames.*;
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static java.lang.Math.min;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.util.FileNames;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+import java.io.IOException;
+import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Utility method to load protocol and metadata from the Delta log checksum files. */
+public class ChecksumReader {
+  private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class);
+
+  /**
+   * Loads the CRCInfo from the checksum file at the given version. If the checksum file is not
+   * found at the given version, it tries to find the latest checksum file that was created at or
+   * after the lower bound version.
+   *
+   * @param engine the engine to use for reading the checksum file
+   * @param logPath the path to the Delta log
+   * @param targetedVersion the target version to read the checksum file from
+   * @param lowerBound the inclusive lower bound version to search for the checksum file
+   * @return Optional {@link CRCInfo} containing the protocol and metadata, and the version of the
+   *     checksum file. Returns an empty Optional if no checksum file is found.
+   */
+  public static Optional<CRCInfo> getCRCInfo(
+      Engine engine, Path logPath, long targetedVersion, long lowerBound) {
+    // The lower bound should never exceed the targeted version.
+    lowerBound = min(lowerBound, targetedVersion);
+    logger.info("Loading CRC file for version {} with lower bound {}", targetedVersion, lowerBound);
+    // First try to load the CRC at the given version. If it is not found or fails to read, then
+    // find the latest CRC file that was created at or after the lower bound version.
+    Path crcFilePath = checksumFile(logPath, targetedVersion);
+    Optional<CRCInfo> crcInfoOpt = readChecksumFile(engine, crcFilePath);
+    if (crcInfoOpt.isPresent()
+        ||
+        // we don't expect any more checksum files as it is the first version
+        targetedVersion == 0
+        || targetedVersion == lowerBound) {
+      return crcInfoOpt;
+    }
+    logger.info(
+        "CRC file for version {} not found, listing CRC files from version {}",
+        targetedVersion,
+        lowerBound);
+
+    Path lowerBoundFilePath = checksumFile(logPath, lowerBound);
+    try (CloseableIterator<FileStatus> crcFiles =
+        engine.getFileSystemClient().listFrom(lowerBoundFilePath.toString())) {
+      List<FileStatus> crcFilesList =
+          InternalUtils.toList(
+              crcFiles
+                  .filter(file -> isChecksumFile(file.getPath()))
+                  .takeWhile(file -> checksumVersion(new Path(file.getPath())) <= targetedVersion));
+
+      // Pick the last file, which is the latest version that has a CRC file.
+      if (crcFilesList.isEmpty()) {
+        logger.warn("No checksum files found in the range {} to {}", lowerBound, targetedVersion);
+        return Optional.empty();
+      }
+
+      FileStatus latestCRCFile = crcFilesList.get(crcFilesList.size() - 1);
+      return readChecksumFile(engine, new Path(latestCRCFile.getPath()));
+    } catch (IOException e) {
+      logger.warn("Failed to list checksum files from {}", lowerBoundFilePath, e);
+      return Optional.empty();
+    }
+  }
+
+  private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath) {
+    try (CloseableIterator<ColumnarBatch> iter =
+        engine
+            .getJsonHandler()
+            .readJsonFiles(
+                singletonCloseableIterator(FileStatus.of(filePath.toString())),
+                CRCInfo.FULL_SCHEMA,
+                Optional.empty())) {
+      // We do this instead of iterating through the rows or using `getSingularRow` so we
+      // can use the existing fromColumnVector methods in Protocol, Metadata, Format, etc.
+      if (!iter.hasNext()) {
+        logger.warn("Checksum file is empty: {}", filePath);
+        return Optional.empty();
+      }
+
+      ColumnarBatch batch = iter.next();
+      if (batch.getSize() != 1) {
+        String msg = "Expected exactly one row in the checksum file {}, found {} rows";
+        logger.warn(msg, filePath, batch.getSize());
+        return Optional.empty();
+      }
+
+      long crcVersion = FileNames.checksumVersion(filePath);
+
+      return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, filePath.toString());
+    } catch (Exception e) {
+      // This can happen when the version does not have a checksum file.
+      logger.warn("Failed to read checksum file {}", filePath, e);
+      return Optional.empty();
+    }
+  }
+}
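A minimal sketch of how a caller drives this fallback, mirroring the LogReplay call site below (the concrete version numbers are invented for illustration):

    // Ask for the CRC at version 12, accepting any CRC no older than version 5.
    Optional<CRCInfo> crcInfoOpt =
        ChecksumReader.getCRCInfo(engine, logPath, 12 /* targetedVersion */, 5 /* lowerBound */);
    // Present => a CRC at some version in [5, 12] was readable;
    // empty   => the caller falls back to replaying commit/checkpoint files.

Note the two-step strategy: one direct read of 12.crc first (cheap, no listing), and only on a miss a single listFrom starting at the lower bound.
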
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
index 59154c0c8fb..523ae3ae116 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
@@ -17,6 +17,9 @@ package io.delta.kernel.internal.replay;
 
 import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static java.util.Arrays.asList;
+import static java.util.Collections.max;
 
 import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.data.ColumnarBatch;
@@ -38,9 +41,7 @@
 import io.delta.kernel.utils.CloseableIterator;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * Replays a history of actions, resolving them to produce the current state of the table. The
@@ -135,7 +136,7 @@ public LogReplay(
     this.logSegment = logSegment;
     this.protocolAndMetadata =
         snapshotMetrics.loadInitialDeltaActionsTimer.time(
-            () -> loadTableProtocolAndMetadata(engine, snapshotHint, snapshotVersion));
+            () -> loadTableProtocolAndMetadata(engine, logSegment, snapshotHint, snapshotVersion));
     // Lazy loading of domain metadata only when needed
     this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine));
   }
@@ -201,13 +202,47 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
    * use the P and/or M from the hint.
    */
   protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
-      Engine engine, Optional<SnapshotHint> snapshotHint, long snapshotVersion) {
+      Engine engine,
+      LogSegment logSegment,
+      Optional<SnapshotHint> snapshotHint,
+      long snapshotVersion) {
 
-    // Exit early if the hint already has the info we need
+    // Exit early if the hint already has the info we need.
     if (snapshotHint.isPresent() && snapshotHint.get().getVersion() == snapshotVersion) {
       return new Tuple2<>(snapshotHint.get().getProtocol(), snapshotHint.get().getMetadata());
     }
 
+    // The snapshot hint is not usable for determining the lower bound in this case.
+    if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
+      snapshotHint = Optional.empty();
+    }
+
+    long crcSearchLowerBound =
+        max(
+            asList(
+                // Prefer reading the hint over the CRC, so start listing from the hint's
+                // version + 1.
+                snapshotHint.map(SnapshotHint::getVersion).orElse(0L) + 1,
+                logSegment.checkpointVersionOpt.orElse(0L),
+                // Only look for the CRC within the last 100 versions.
+                snapshotVersion - 100,
+                0L));
+    Optional<CRCInfo> crcInfoOpt =
+        ChecksumReader.getCRCInfo(engine, logSegment.logPath, snapshotVersion, crcSearchLowerBound);
+    if (crcInfoOpt.isPresent()) {
+      CRCInfo crcInfo = crcInfoOpt.get();
+      if (crcInfo.getVersion() == snapshotVersion) {
+        // The CRC is at the desired snapshot version. Load protocol and metadata from the CRC.
+        return new Tuple2<>(crcInfo.getProtocol(), crcInfo.getMetadata());
+      }
+      checkArgument(
+          crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
+      // We found a CRCInfo of a version (a) older than the one we are looking for
+      // (snapshotVersion) but (b) newer than the current hint. Use this CRCInfo to create a
+      // new hint.
+      snapshotHint =
+          Optional.of(
+              new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata()));
+    }
+
     Protocol protocol = null;
     Metadata metadata = null;
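To make the lower-bound arithmetic above concrete, a worked example with invented numbers (snapshot at version 250, hint at 240, checkpoint at 200), using the same statically imported max/asList:

    // max(hint + 1, checkpoint version, snapshot - 100, 0)
    long lowerBound = max(asList(240L + 1, 200L, 250L - 100, 0L)); // = 241
    // Only a CRC in [241, 250] can beat the existing hint; anything older is
    // ignored, and the 100-version window caps the worst-case listing.
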
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
index f0077f840c1..27e7bbb3533 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
@@ -52,6 +52,9 @@ private FileNames() {}
   private static final Pattern CLASSIC_CHECKPOINT_FILE_PATTERN =
       Pattern.compile("\\d+\\.checkpoint\\.parquet");
 
+  /** Example: 00000000000000000001.crc */
+  private static final Pattern CHECK_SUM_FILE_PATTERN = Pattern.compile("(\\d+)\\.crc");
+
   /**
    * Examples:
    *
@@ -84,8 +87,8 @@ public static long getFileVersion(Path path) {
       return checkpointVersion(path);
     } else if (isCommitFile(path.getName())) {
       return deltaVersion(path);
-      // } else if (isChecksumFile(path)) {
-      //   checksumVersion(path);
+    } else if (isChecksumFile(path.getName())) {
+      return checksumVersion(path);
     } else {
       throw new IllegalArgumentException(
           String.format("Unexpected file type found in transaction log: %s", path));
@@ -128,6 +131,15 @@ public static String sidecarFile(Path path, String sidecar) {
     return String.format("%s/%s/%s", path.toString(), SIDECAR_DIRECTORY, sidecar);
   }
 
+  /** Returns the path to the checksum file for the given version. */
+  public static Path checksumFile(Path path, long version) {
+    return new Path(path, String.format("%020d.crc", version));
+  }
+
+  public static long checksumVersion(Path path) {
+    return Long.parseLong(path.getName().split("\\.")[0]);
+  }
+
   /**
    * Returns the prefix of all delta log files for the given version.
    *
@@ -206,4 +218,8 @@ public static boolean isCommitFile(String path) {
     return DELTA_FILE_PATTERN.matcher(fileName).matches()
         || UUID_DELTA_FILE_REGEX.matcher(fileName).matches();
   }
+
+  public static boolean isChecksumFile(String checksumFilePath) {
+    return CHECK_SUM_FILE_PATTERN.matcher(new Path(checksumFilePath).getName()).matches();
+  }
 }
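A quick round-trip through the new helpers (the log path is made up for illustration):

    Path logPath = new Path("/tmp/myTable/_delta_log");
    Path crcFile = FileNames.checksumFile(logPath, 1); // .../00000000000000000001.crc
    FileNames.isChecksumFile(crcFile.toString());      // true
    FileNames.checksumVersion(crcFile);                // 1
    FileNames.getFileVersion(crcFile);                 // 1 (previously threw for .crc paths)

Worth noting: getFileVersion now recognizes .crc files, which is what lets the test engine at the bottom of this patch attribute each checksum read to a version.
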
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/InternalUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/InternalUtils.java
index 12e7a6513e1..cbc04003dd3 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/InternalUtils.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/InternalUtils.java
@@ -30,9 +30,7 @@
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class InternalUtils {
@@ -169,4 +167,10 @@ public static Path relativizePath(Path child, URI root) {
   public static Set<String> toLowerCaseSet(Collection<String> set) {
     return set.stream().map(String::toLowerCase).collect(Collectors.toSet());
   }
+
+  public static <T> List<T> toList(CloseableIterator<T> iterator) {
+    List<T> result = new ArrayList<>();
+    iterator.forEachRemaining(result::add);
+    return result;
+  }
 }
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java
index dbae0033625..30d3b9040e3 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java
@@ -129,6 +129,49 @@ public void close() throws IOException {
     };
   }
 
+  default CloseableIterator<T> takeWhile(Function<T, Boolean> predicate) {
+    CloseableIterator<T> delegate = this;
+    return new CloseableIterator<T>() {
+      T next;
+      boolean hasLoadedNext;
+      boolean end;
+
+      @Override
+      public boolean hasNext() {
+        if (end) {
+          return false;
+        }
+        if (hasLoadedNext) {
+          return true;
+        }
+        if (delegate.hasNext()) {
+          T potentialNext = delegate.next();
+          if (predicate.apply(potentialNext)) {
+            next = potentialNext;
+            hasLoadedNext = true;
+            return true;
+          }
+        }
+        end = true;
+        return false;
+      }
+
+      @Override
+      public T next() {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        hasLoadedNext = false;
+        return next;
+      }
+
+      @Override
+      public void close() throws IOException {
+        delegate.close();
+      }
+    };
+  }
+
   /**
    * Combine the current iterator with another iterator. The resulting iterator will return all
    * elements from the current iterator followed by all elements from the other iterator.
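Unlike filter, takeWhile stops pulling from the underlying iterator at the first non-matching element, which is what lets ChecksumReader stop listing _delta_log before scanning the whole directory. A sketch of the combined pattern (the allFiles iterator and the target version 12 are assumptions for illustration):

    // Keep only checksum files, and stop at the first file past the target version.
    List<FileStatus> crcFiles =
        InternalUtils.toList(
            allFiles
                .filter(f -> FileNames.isChecksumFile(f.getPath()))
                .takeWhile(f -> FileNames.checksumVersion(new Path(f.getPath())) <= 12L));

This relies on listFrom returning files in lexicographic (and hence version) order; takeWhile would not be safe on an unordered listing.
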
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java
index 4b659126230..5b40eccb61d 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/utils/FileStatus.java
@@ -75,6 +75,16 @@ public static FileStatus of(String path, long size, long modificationTime) {
     return new FileStatus(path, size, modificationTime);
   }
 
+  /**
+   * Create a {@link FileStatus} with the given path, with size and modification time set to 0.
+   *
+   * @param path Fully qualified file path.
+   * @return {@link FileStatus} object
+   */
+  public static FileStatus of(String path) {
+    return new FileStatus(path, 0 /* size */, 0 /* modTime */);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/utils/CloseableIteratorSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/utils/CloseableIteratorSuite.scala
new file mode 100644
index 00000000000..d1d6d34e4f1
--- /dev/null
+++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/utils/CloseableIteratorSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.utils
+
+import org.scalatest.funsuite.AnyFunSuite
+import org.scalatest.matchers.must.Matchers.contain
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import java.util
+
+class CloseableIteratorSuite extends AnyFunSuite {
+
+  test("takeWhile with all elements matching the filter") {
+    val res = new util.ArrayList[Int]()
+    new IntArrayIterator(List(1, 2, 3, 2, 2))
+      .takeWhile((input: Int) => input > 0)
+      .forEachRemaining((input: Int) => res.add(input))
+    res should contain theSameElementsAs List(1, 2, 3, 2, 2)
+  }
+
+  test("takeWhile with no elements matching the filter") {
+    val res = new util.ArrayList[Int]()
+    new IntArrayIterator(List(1, 2, 3, 2, 2))
+      .takeWhile((input: Int) => input < 0)
+      .forEachRemaining((input: Int) => res.add(input))
+    assert(res.isEmpty)
+  }
+
+  test("takeWhile with some elements matching the filter") {
+    val res = new util.ArrayList[Int]()
+    new IntArrayIterator(List(1, 2, 3, 2, 2))
+      .takeWhile((input: Int) => input < 3)
+      .forEachRemaining((input: Int) => res.add(input))
+    res should contain theSameElementsAs List(1, 2)
+  }
+}
+
+class IntArrayIterator(val intArray: List[Int]) extends CloseableIterator[Int] {
+  val data: List[Int] = intArray
+  var curIdx = 0
+
+  override def hasNext: Boolean = data.size > curIdx
+
+  override def next(): Int = {
+    val res = data(curIdx)
+    curIdx = curIdx + 1
+    res
+  }
+
+  override def close(): Unit = {}
+}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
index cd43dc11838..e2036fdedc1 100644
--- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
@@ -16,11 +16,11 @@
 package io.delta.kernel.defaults
 
-import java.io.File
 import io.delta.kernel.Table
-import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient}
 import io.delta.kernel.data.ColumnarBatch
 import io.delta.kernel.defaults.engine.{DefaultEngine, DefaultJsonHandler, DefaultParquetHandler}
+import io.delta.kernel.defaults.utils.TestUtils
+import io.delta.kernel.engine.{Engine, ExpressionHandler, FileSystemClient}
 import io.delta.kernel.expressions.Predicate
 import io.delta.kernel.internal.checkpoints.Checkpointer
 import io.delta.kernel.internal.fs.Path
@@ -29,18 +29,18 @@
 import io.delta.kernel.internal.util.Utils.toCloseableIterator
 import io.delta.kernel.types.StructType
 import io.delta.kernel.utils.{CloseableIterator, FileStatus}
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.{DeltaConfig, DeltaLog}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.functions.{col, log}
+import org.scalatest.funsuite.AnyFunSuite
 
+import java.io.File
 import java.nio.file.Files
 import java.util
 import java.util.Optional
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.SparkSession
 
 /**
  * Suite to test the engine metrics while replaying logs for getting the table protocol and
@@ -50,15 +50,21 @@ import scala.collection.mutable.ArrayBuffer
  * The goal is to test the behavior of calls to `readJsonFiles` and `readParquetFiles` that
  * Kernel makes. These calls determine the performance.
  */
-class LogReplayEngineMetricsSuite extends QueryTest
-  with SharedSparkSession
-  with DeltaSQLCommandTest {
+class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils {
+
+  // Disable writing checksum files for this test suite.
+  // This suite checks which files are read when loading the P&M; with the CRC optimization,
+  // CRC files, when available, are the only files read, so we disable writing them in order
+  // to test P&M loading when CRC files are not available.
+  // Tests for tables with available CRC files are included using resource test tables (and
+  // thus are unaffected by changing our confs for writes).
+  spark.conf.set(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key, false)
 
   /////////////////////////
   // Test Helper Methods //
   /////////////////////////
 
-  private def withTempDirAndEngine(f: (File, MetricsEngine) => Unit): Unit = {
+  private def withTempDirAndMetricsEngine(f: (String, MetricsEngine) => Unit): Unit = {
     val engine = new MetricsEngine(new Configuration() {
       {
         // Set the batch sizes to small so that we get to test the multiple batch scenarios.
@@ -66,23 +72,25 @@ class LogReplayEngineMetricsSuite extends QueryTest
         set("delta.kernel.default.json.reader.batch-size", "2");
       }
     })
-    withTempDir { dir => f(dir, engine) }
+    withTempDir { dir => f(dir.getAbsolutePath, engine) }
   }
 
   private def loadPandMCheckMetrics(
+      snapshotFetchCall: => StructType,
       engine: MetricsEngine,
-      table: Table,
       expJsonVersionsRead: Seq[Long],
       expParquetVersionsRead: Seq[Long],
-      expParquetReadSetSizes: Seq[Long] = Nil): Unit = {
+      expParquetReadSetSizes: Seq[Long] = null,
+      expChecksumReadSet: Seq[Long] = null): Unit = {
     engine.resetMetrics()
-    table.getLatestSnapshot(engine).getSchema(engine)
+    snapshotFetchCall
 
     assertMetrics(
       engine,
       expJsonVersionsRead,
       expParquetVersionsRead,
-      expParquetReadSetSizes)
+      expParquetReadSetSizes,
+      expChecksumReadSet = expChecksumReadSet)
   }
 
   private def loadScanFilesCheckMetrics(
@@ -110,8 +118,9 @@
       engine: MetricsEngine,
       expJsonVersionsRead: Seq[Long],
       expParquetVersionsRead: Seq[Long],
-      expParquetReadSetSizes: Seq[Long],
-      expLastCheckpointReadCalls: Option[Int] = None): Unit = {
+      expParquetReadSetSizes: Seq[Long] = null,
+      expLastCheckpointReadCalls: Option[Int] = None,
+      expChecksumReadSet: Seq[Long] = null): Unit = {
     val actualJsonVersionsRead = engine.getJsonHandler.getVersionsRead
     val actualParquetVersionsRead = engine.getParquetHandler.getVersionsRead
 
@@ -124,7 +133,7 @@
       s"versions $expParquetVersionsRead but read $actualParquetVersionsRead"
     )
 
-    if (expParquetReadSetSizes.nonEmpty) {
+    if (expParquetReadSetSizes != null) {
       val actualParquetReadSetSizes = engine.getParquetHandler.checkpointReadRequestSizes
       assert(
         actualParquetReadSetSizes === expParquetReadSetSizes, s"Expected parquet read set sizes " +
@@ -137,6 +146,14 @@
       assert(actualCalls === expCalls,
         s"Expected to read last checkpoint metadata $expCalls times but read $actualCalls times")
     }
+
+    if (expChecksumReadSet != null) {
+      val actualChecksumReadSet = engine.getJsonHandler.checksumsRead
+      assert(
+        actualChecksumReadSet === expChecksumReadSet, s"Expected checksum read set " +
+          s"$expChecksumReadSet but read $actualChecksumReadSet"
+      )
+    }
   }
 
   private def appendCommit(path: String): Unit =
@@ -153,31 +170,35 @@
   ///////////
 
   test("no hint, no checkpoint, reads all files") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
       for (_ <- 0 to 9) { appendCommit(path) }
 
-      val table = Table.forPath(tc, path)
-      loadPandMCheckMetrics(tc, table, 9L to 0L by -1L, Nil)
+      val table = Table.forPath(engine, path)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = 9L to 0L by -1L,
+        expParquetVersionsRead = Nil
+      )
     }
   }
 
   test("no hint, existing checkpoint, reads all files up to that checkpoint") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
      for (_ <- 0 to 14) { appendCommit(path) }
 
-      val table = Table.forPath(tc, path)
-      loadPandMCheckMetrics(tc, table, 14L to 11L by -1L, Seq(10), Seq(1))
+      val table = Table.forPath(engine, path)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = 14L to 11L by -1L,
+        expParquetVersionsRead = Seq(10),
+        expParquetReadSetSizes = Seq(1))
     }
   }
 
  test("no hint, existing checkpoint, newer P & M update, reads up to P & M commit") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
       for (_ <- 0 to 12) { appendCommit(path) }
 
       // v13 changes the protocol (which also updates the metadata)
@@ -191,69 +212,82 @@
       for (_ <- 14 to 16) { appendCommit(path) }
 
-      val table = Table.forPath(tc, path)
-      loadPandMCheckMetrics(tc, table, 16L to 13L by -1L, Nil)
+      val table = Table.forPath(engine, path)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = 16L to 13L by -1L,
+        expParquetVersionsRead = Nil)
     }
   }
 
   test("hint with no new commits, should read no files") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
       for (_ <- 0 to 14) { appendCommit(path) }
 
-      val table = Table.forPath(tc, path)
+      val table = Table.forPath(engine, path)
 
-      table.getLatestSnapshot(tc).getSchema(tc)
+      table.getLatestSnapshot(engine).getSchema(engine)
 
       // A hint is now saved at v14
-
-      loadPandMCheckMetrics(tc, table, Nil, Nil)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Nil,
+        expParquetVersionsRead = Nil)
     }
   }
 
   test("hint with no P or M updates") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
       for (_ <- 0 to 14) { appendCommit(path) }
 
-      val table = Table.forPath(tc, path)
+      val table = Table.forPath(engine, path)
 
-      table.getLatestSnapshot(tc).getSchema(tc)
+      table.getLatestSnapshot(engine).getSchema(engine)
 
      // A hint is now saved at v14
 
       // Case: only one version change
       appendCommit(path) // v15
-      loadPandMCheckMetrics(tc, table, Seq(15), Nil)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Seq(15),
+        expParquetVersionsRead = Nil)
 
       // A hint is now saved at v15
 
       // Case: several version changes
       for (_ <- 16 to 19) { appendCommit(path) }
-      loadPandMCheckMetrics(tc, table, 19L to 16L by -1L, Nil)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = 19L to 16L by -1L,
+        expParquetVersionsRead = Nil)
 
       // A hint is now saved at v19
 
      // Case: [delta-io/delta#2262] [Fix me!] Read the entire checkpoint at v20, even if v20.json
      // and v19 hint are available
       appendCommit(path) // v20
-      loadPandMCheckMetrics(tc, table, Nil, Seq(20))
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Nil,
+        expParquetVersionsRead = Seq(20))
     }
   }
 
   test("hint with a P or M update") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
       for (_ <- 0 to 3) { appendCommit(path) }
 
-      val table = Table.forPath(tc, path)
+      val table = Table.forPath(engine, path)
 
-      table.getLatestSnapshot(tc).getSchema(tc)
+      table.getLatestSnapshot(engine).getSchema(engine)
 
       // A hint is now saved at v3
 
@@ -266,8 +300,11 @@
         .mode("append")
         .save(path)
 
-      loadPandMCheckMetrics(tc, table, Seq(4), Nil)
-
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Seq(4),
+        expParquetVersionsRead = Nil)
       // a hint is now saved at v4
 
       // v5 changes the protocol (which also updates the metadata)
@@ -279,20 +316,22 @@
         |)
         |""".stripMargin)
 
-      loadPandMCheckMetrics(tc, table, Seq(5), Nil)
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Seq(5),
+        expParquetVersionsRead = Nil)
     }
   }
 
   test("read a table with multi-part checkpoint") {
-    withTempDirAndEngine { (dir, tc) =>
-      val path = dir.getAbsolutePath
-
+    withTempDirAndMetricsEngine { (path, engine) =>
       for (_ <- 0 to 14) { appendCommit(path) }
 
       // there should be one checkpoint file at version 10
       loadScanFilesCheckMetrics(
-        tc,
-        Table.forPath(tc, path),
+        engine,
+        Table.forPath(engine, path),
         expJsonVersionsRead = 14L to 11L by -1L,
         expParquetVersionsRead = Seq(10),
         // we read the checkpoint twice: once for the P&M and once for the scan files
@@ -302,25 +341,23 @@
       checkpoint(path, actionsPerFile = 2)
 
       // Reset metrics.
-      tc.resetMetrics()
+      engine.resetMetrics()
 
      // expect the Parquet read set to contain requests of size 8
       loadScanFilesCheckMetrics(
-        tc,
-        Table.forPath(tc, path),
+        engine,
+        Table.forPath(engine, path),
         expJsonVersionsRead = Nil,
         expParquetVersionsRead = Seq(14),
         // we read the checkpoint twice: once for the P&M and once for the scan files
-        expParquetReadSetSizes = Seq(15, 15))
+        expParquetReadSetSizes = Seq(8, 8))
     }
   }
 
   Seq(true, false).foreach { deleteLastCheckpointMetadataFile =>
     test("ensure `_last_checkpoint` is tried to read only once when " +
       s"""${if (deleteLastCheckpointMetadataFile) "not exists" else "valid file exists"}""") {
-      withTempDirAndEngine { (dir, tc) =>
-        val path = dir.getAbsolutePath
-
+      withTempDirAndMetricsEngine { (path, tc) =>
         for (_ <- 0 to 14) { appendCommit(path) }
 
         if (deleteLastCheckpointMetadataFile) {
@@ -342,6 +379,316 @@
         }
       }
     }
   }
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+  // Tests for loading P & M through checksum files                                               //
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  Seq(-1L, 3L, 4L).foreach { version => // -1 means latest version
+    test(s"checksum found at the read version: ${if (version == -1) "latest" else version}") {
+      withTempDirAndMetricsEngine { (path, engine) =>
+        // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+        buildTableWithCrc(path)
+        val table = Table.forPath(engine, path)
+
+        loadPandMCheckMetrics(
+          version match {
+            case -1 => table.getLatestSnapshot(engine).getSchema(engine)
+            case ver => table.getSnapshotAsOfVersion(engine, ver).getSchema(engine)
+          },
+          engine,
+          // shouldn't need to read commit or checkpoint files as P&M are found through checksum
+          expJsonVersionsRead = Nil,
+          expParquetVersionsRead = Nil,
+          expParquetReadSetSizes = Nil,
+          expChecksumReadSet = Seq(if (version == -1) 11 else version))
+      }
+    }
+  }
+
+  test("checksum not found at the read version, but found at a previous version") {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+      Seq(10L, 11L, 5L, 6L).foreach { version =>
+        assert(
+          Files.deleteIfExists(
+            new File(
+              FileNames.checksumFile(new Path(f"$path/_delta_log"), version).toString
+            ).toPath
+          )
+        )
+      }
+
+      loadPandMCheckMetrics(
+        Table.forPath(engine, path)
+          .getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        // 11.crc is deleted, so the attempted read of it fails (still counted below) and
+        // loading falls back to 10.checkpoint combined with 11.json
+        expJsonVersionsRead = Seq(11),
+        expParquetVersionsRead = Seq(10),
+        expParquetReadSetSizes = Seq(1),
+        expChecksumReadSet = Seq(11)
+      )
+
+      loadPandMCheckMetrics(
+        Table
+          .forPath(engine, path)
+          .getSnapshotAsOfVersion(engine, 6 /* versionId */)
+          .getSchema(engine),
+        engine,
+        // We find the checksum from the crc at version 4, but still read commit files 5 and 6
+        // to find the P&M, which could have been updated in versions 5 and 6.
+        expJsonVersionsRead = Seq(6, 5),
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        // First we attempt to read the checksum for version 6, then we do a listing of the
+        // last 100 crc files and read the latest one, which is version 4 (version 5 is deleted)
+        expChecksumReadSet = Seq(6, 4))
+
+      // now try to load version 3 and it should get P&M from checksum files only
+      loadPandMCheckMetrics(
+        Table.forPath(engine, path)
+          .getSnapshotAsOfVersion(engine, 3 /* versionId */).getSchema(engine),
+        engine,
+        // We find the checksum from the crc at version 3, so we shouldn't read anything else
+        expJsonVersionsRead = Nil,
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        expChecksumReadSet = Seq(3))
+    }
+  }
+
+  test("checksum not found at the read version, but uses snapshot hint lower bound") {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+      (3 to 6).foreach { version =>
+        assert(
+          Files.deleteIfExists(
+            new File(FileNames.checksumFile(
+              new Path(f"$path/_delta_log"), version).toString).toPath
+          )
+        )
+      }
+
+      val table = Table.forPath(engine, path)
+
+      loadPandMCheckMetrics(
+        table.getSnapshotAsOfVersion(engine, 4 /* versionId */).getSchema(engine),
+        engine,
+        // There is no checksum file for version 4; the latest one is at version 2.
+        // We need to read commit files 3 and 4 to get the P&M, in addition to the P&M from
+        // the checksum file at version 2.
+        expJsonVersionsRead = Seq(4, 3),
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        // First we attempt to read the checksum for version 4, which doesn't exist, then we
+        // do a listing of the last 100 crc files and read the latest one, which is version 2
+        // (versions 3-6 are deleted)
+        expChecksumReadSet = Seq(4, 2))
+      // reading version 4 sets the snapshot P&M hint to version 4
+
+      // now try to load version 6 and we expect no checksums are read
+      loadPandMCheckMetrics(
+        table.getSnapshotAsOfVersion(engine, 6 /* versionId */).getSchema(engine),
+        engine,
+        // We have a snapshot P&M hint at version 4, and no checksum files after version 2
+        expJsonVersionsRead = Seq(6, 5),
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        // First we attempt to read the checksum at version 6; the subsequent listing of the
+        // last 100 crc files is bounded by the snapshot hint at version 4, so we never try
+        // to read checksums at or below version 4
+        expChecksumReadSet = Seq(6)
+      )
+    }
+  }
+
+  test("snapshot hint found for read version and crc found at read version => use hint") {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+
+      val table = Table.forPath(engine, path)
+      table.getLatestSnapshot(engine)
+
+      loadPandMCheckMetrics(
+        table.getLatestSnapshot(engine).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Nil,
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        expChecksumReadSet = Nil
+      )
+    }
+  }
+
+  test(
+    "checksum not found at read version and checkpoint exists at read version => use checkpoint"
+  ) {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+      val checkpointVersion = 10
+      val logPath = f"$path/_delta_log"
+      assert(
+        Files.exists(
+          new File(
+            FileNames
+              .checkpointFileSingular(new Path(logPath), checkpointVersion)
+              .toString
+          ).toPath
+        )
+      )
+      assert(
+        Files.deleteIfExists(
+          new File(FileNames.checksumFile(new Path(logPath), checkpointVersion).toString).toPath
+        )
+      )
+
+      val table = Table.forPath(engine, path)
+
+      loadPandMCheckMetrics(
+        table.getSnapshotAsOfVersion(engine, 10).getSchema(engine),
+        engine,
+        // 10.crc is missing, 10.checkpoint.parquet exists.
+        // The attempt to read 10.crc fails and reading 10.checkpoint.parquet succeeds.
+        expJsonVersionsRead = Nil,
+        expParquetVersionsRead = Seq(10),
+        expParquetReadSetSizes = Seq(1),
+        expChecksumReadSet = Seq(10)
+      )
+    }
+  }
+
+  test(
+    "checksum missing at read version and the previous version, " +
+      "checkpoint exists at the previous version => use checkpoint"
+  ) {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+      val checkpointVersion = 10
+      val logPath = f"$path/_delta_log"
+      assert(
+        Files
+          .exists(
+            new File(
+              FileNames.checkpointFileSingular(new Path(logPath), checkpointVersion).toString
+            ).toPath
+          )
+      )
+      assert(
+        Files.deleteIfExists(
+          new File(FileNames.checksumFile(new Path(logPath), checkpointVersion).toString).toPath
+        )
+      )
+      assert(
+        Files.deleteIfExists(
+          new File(
+            FileNames.checksumFile(new Path(logPath), checkpointVersion + 1).toString
+          ).toPath
+        )
+      )
+
+      val table = Table.forPath(engine, path)
+
+      // 11.crc and 10.crc are missing, 10.checkpoint.parquet exists.
+      // The attempt to read 11.crc fails; reading 10.checkpoint.parquet and 11.json succeeds.
+      loadPandMCheckMetrics(
+        table.getSnapshotAsOfVersion(engine, 11).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Seq(11),
+        expParquetVersionsRead = Seq(10),
+        expParquetReadSetSizes = Seq(1),
+        expChecksumReadSet = Seq(11)
+      )
+    }
+  }
+
+  test(
+    "checksum missing at read version, " +
+      "both checksum and checkpoint exist at the previous version => use checksum"
+  ) {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+      val checkpointVersion = 10
+      val logPath = new Path(s"$path/_delta_log")
+      assert(
+        Files.exists(
+          new File(
+            FileNames
+              .checkpointFileSingular(logPath, checkpointVersion)
+              .toString
+          ).toPath
+        )
+      )
+      assert(
+        Files.deleteIfExists(
+          new File(
+            FileNames.checksumFile(logPath, checkpointVersion + 1).toString
+          ).toPath
+        )
+      )
+
+      val table = Table.forPath(engine, path)
+
+      // 11.crc is missing; 10.crc and 10.checkpoint.parquet exist.
+      // The attempt to read 11.crc fails; falling back to 10.crc plus 11.json succeeds.
+      loadPandMCheckMetrics(
+        table.getSnapshotAsOfVersion(engine, 11).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Seq(11),
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        expChecksumReadSet = Seq(11, 10)
+      )
+    }
+  }
+
+  test("crc found at read version and checkpoint at read version => use checksum") {
+    withTempDirAndMetricsEngine { (path, engine) =>
+      // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+      buildTableWithCrc(path)
+      val checkpointVersion = 10
+      val logPath = new Path(s"$path/_delta_log")
+      assert(
+        Files.exists(
+          new File(
+            FileNames
+              .checkpointFileSingular(logPath, checkpointVersion)
+              .toString
+          ).toPath
+        )
+      )
+      val table = Table.forPath(engine, path)
+
+      loadPandMCheckMetrics(
+        table.getSnapshotAsOfVersion(engine, checkpointVersion).getSchema(engine),
+        engine,
+        expJsonVersionsRead = Nil,
+        expParquetVersionsRead = Nil,
+        expParquetReadSetSizes = Nil,
+        expChecksumReadSet = Seq(10)
+      )
+    }
+  }
+
+  // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
+  def buildTableWithCrc(path: String): Unit = {
+    withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true") {
+      spark.sql(
+        s"CREATE TABLE delta.`$path` USING DELTA AS " +
+          s"SELECT 0L as id"
+      )
+      for (_ <- 0 to 10) { appendCommit(path) }
+    }
+  }
 }
 
 ////////////////////
@@ -376,6 +723,8 @@ trait FileReadMetrics { self: Object =>
   // number of times read is requested on `_last_checkpoint`
   private var lastCheckpointMetadataReadCalls = 0
 
+  val checksumsRead = new ArrayBuffer[Long]() // versions of checksum files read
+
   private val versionsRead = ArrayBuffer[Long]()
 
   // Number of checkpoint files requested read in each readParquetFiles call
@@ -392,6 +741,8 @@
       }
     } else if (Checkpointer.LAST_CHECKPOINT_FILE_NAME.equals(path.getName)) {
       lastCheckpointMetadataReadCalls += 1
+    } else if (FileNames.isChecksumFile(path.getName)) {
+      checksumsRead += FileNames.getFileVersion(path)
     }
   }
 
@@ -403,6 +754,7 @@
     lastCheckpointMetadataReadCalls = 0
     versionsRead.clear()
     checkpointReadRequestSizes.clear()
+    checksumsRead.clear()
   }
 
   def collectReadFiles(fileIter: CloseableIterator[FileStatus]): CloseableIterator[FileStatus] = {