-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] Load the protocol and metadata from the CRC files when available #4077
base: master
Are you sure you want to change the base?
Conversation
… in DeltaLog Initial read current version CRC w test to verify wip end-2-end working + tests Co-authored-by: Allison Portis <[email protected]> Co-authored-by: Venki Korukanti <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Left some comments.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/VersionStats.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
Outdated
Show resolved
Hide resolved
ChecksumReader.getVersionStats( | ||
engine, logSegment.logPath, snapshotVersion, crcSearchLowerBound); | ||
if (versionStatsOpt.isPresent()) { | ||
// We found the protocol and metadata for the version we are looking for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a check that versionStateOpt.get.version is > (or >=, whatever is corect) than the snapshot hint?
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
...el/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
Outdated
Show resolved
Hide resolved
...el/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java
Outdated
Show resolved
Hide resolved
try (CloseableIterator<FileStatus> crcFiles = | ||
engine.getFileSystemClient().listFrom(lowerBoundFilePath.toString())) { | ||
List<FileStatus> crcFilesList = new ArrayList<>(); | ||
crcFiles | ||
.filter(file -> isChecksumFile(new Path(file.getPath()))) | ||
.forEachRemaining(crcFilesList::add); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scottsand-db I wonder if this can be refactored to use any of the log listing methods in your PR?
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableReadsSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CRCInfo.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
def buildTableWithCrc(path: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are the assumptions of what delta talbe state this method produces? 0.json to 11.json, 0.crc to 11.crc, 10.checkpoint.parquet?
Can we (a) state these assumptions, (b) assert that they are true, and (c) inline in the test succinctly what his has yieled e.g. buildTableWithCrc(path) // 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
This will make understanding the tests much easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0.json to 11.json, 0.crc to 11.crc, 10.checkpoint.parquet
Yes correct.
Added comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please verbatim include this comment in the buildTableWithCrc(path)
usages above
buildTableWithCrc(path) // 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
withTempDirAndMetricsEngine { (path, engine) => | ||
buildTableWithCrc(path) | ||
(3 to 6).foreach { version => | ||
val crcFile = new File(path, f"_delta_log/$version%020d.crc") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when creating crc file names, can we please use FileNames. checksumFile
} | ||
} | ||
|
||
test( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test("checksum not found at read version and checkpoint exists at read version => use checkpoint") {
...el/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! A few more minor changes to make the code a bit clearer
snapshotHint = Optional.empty(); | ||
} | ||
|
||
// Finds the exclusive lower bound for CRC search. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inclusive
lower bound
checkArgument(targetedVersion == 0 || targetedVersion >= lowerBound); | ||
logger.info("Loading CRC file for version {} with lower bound {}", targetedVersion, lowerBound); | ||
// First try to load the CRC at given version. If not found or failed to read then try to | ||
// find the latest CRC file that is created after the lower bound version or within the last 100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that is created at or after the lower bound version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if no lower bound is provided.
--> lowerbound is provided. it is not optional. we can remove this comment.
public static Optional<CRCInfo> getCRCInfo( | ||
Engine engine, Path logPath, long targetedVersion, long lowerBound) { | ||
// If targetedVersion is 0, lower bound will be ignored as snapshot 0 is the first snapshot. | ||
checkArgument(targetedVersion == 0 || targetedVersion >= lowerBound); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because of this or
we have to consider cases in the code below where lowerBound is greater than targedVersion. IMO this is confusing and adds extra congnitive load to understand the code.
why not just eliminate this case? checkArgument(targetVersion >= lowerBound)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an edge case caused by the caller's call which makes lowerBound at least 1
Math.max(
checkpointVersionFromLogSegment,
snapshotVersion - 100,
snapshotHint.orElse(0) + 1,
0
)
targetVersion=0 needs to be handle separately.
why not just eliminate this case? checkArgument(targetVersion >= lowerBound)
Done
I moved the check tocheckArgument(targetVersion >= lowerBound)
to line 64 before lower bound is actually used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an edge case caused by the caller's call which makes lowerBound at least 1
Another option would be to, in getCrcInfo, just set lowerBound to min(lowerBound, targetVersion), right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getCrcInfo, just set lowerBound to min(lowerBound, targetVersion), right?
let me do that
return crcInfoOpt; | ||
} | ||
logger.info( | ||
"CRC file for version {} not found, attempt to loading version up to {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"CRC file for version {} not found, listing CRC files from version {}"
|
||
long crcVersion = FileNames.checksumVersion(filePath); | ||
|
||
return fromColumnarBatch(engine, crcVersion, batch, 0 /* rowId */, filePath.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CRCInfo.fromColumnarBatch
.
fromColumnarBatch
is used in lots of places in the code --> lets include the classname so it's clear to the reader which class and method you are calling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
public static <T> List<T> toFilteredList( | ||
CloseableIterator<T> iterator, Function<T, Boolean> filter) { | ||
List<T> result = new ArrayList<>(); | ||
iterator.filter(filter).forEachRemaining(result::add); | ||
return result; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry nit, I think it's easier if this is just toList
and we can do the iterator filtering in the outer call. This way we can re-use this even when we don't want to filter
i.e. toList(iterator.filter(filter))
Which Delta project/connector is this regarding?
Description
CP crc loading code to master branch from "kernel-20250115-crc-optimization"
PR created using
git cherry-pick 7d32f66d9efd1dca41ed2a45cf259525cbdd8952
How was this patch tested?
Does this PR introduce any user-facing changes?
No