[SC-65398] Fix a bug that startingVersion/startingTimestamp doesn't work with rate limit correctly

## What changes were proposed in this pull request?

When a query is using the rate limit options such as `maxFilesPerTrigger` with startingVersion/startingTimestamp, `isStartingVersion` may be set to `true` incorrectly.

This PR fixes the bug and also adds a test for this.

Fixes #568
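The gist of the fix: the offset's `isStartingVersion` flag was derived only from `v == version`, ignoring whether the query actually started from a snapshot. A minimal sketch of the before/after logic (simplified names, not the actual Delta source):

```scala
// Simplified stand-in for DeltaSourceOffset; names here are assumptions for
// illustration, not Delta's real API.
case class Offset(version: Long, index: Long, isStartingVersion: Boolean)

// Buggy: any offset at the current version is flagged as a snapshot read,
// even when the query started from startingVersion/startingTimestamp and
// should be reading json commit files instead.
def buggyOffset(v: Long, i: Long, version: Long): Offset =
  Offset(v, i, isStartingVersion = v == version)

// Fixed: the offset is snapshot-based only when the source itself started
// from a snapshot (the local `isStartingVersion`) AND we are still at that
// starting version.
def fixedOffset(v: Long, i: Long, version: Long, isStartingVersion: Boolean): Offset =
  Offset(v, i, isStartingVersion = isStartingVersion && v == version)
```

With rate limiting, the source can emit several offsets within the starting version, which is where the buggy flag surfaced.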

## How was this patch tested?

The newly added test.

Author: Shixiong Zhu <[email protected]>

#17099 is resolved by zsxwing/SC-65398.

GitOrigin-RevId: 962754812143197562c0f748b4d7b028e3982c3c
zsxwing authored and pranavanand committed Jan 20, 2021
1 parent c75a55f commit 5172443
Showing 2 changed files with 43 additions and 1 deletion.
```diff
@@ -202,7 +202,13 @@ case class DeltaSource(
       // isStartingVersion must be false here as we have bumped the version
       Some(DeltaSourceOffset(tableId, v + 1, -1, isStartingVersion = false))
     } else {
-      Some(DeltaSourceOffset(tableId, v, i, isStartingVersion = v == version))
+      // - When the local val `isStartingVersion` is `false`, it means this query is using
+      //   `startingVersion/startingTimestamp`. In this case, we should use an offset that's
+      //   based on json files (the offset's `isStartingVersion` should be `false`).
+      // - If `isStartingVersion` is `true` (in other words, it's not using
+      //   `startingVersion/startingTimestamp`), we should use `v == version` to determine
+      //   whether we should use an offset that's based on snapshots or json files.
+      Some(DeltaSourceOffset(tableId, v, i, isStartingVersion = isStartingVersion && v == version))
     }
   }
```

36 changes: 36 additions & 0 deletions src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
```diff
@@ -1428,6 +1428,42 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest {
     }
   }

+  test("startingVersion should work with rate time") {
+    withTempDir { dir =>
+      withTempView("startingVersionWithRateLimit") {
+        val path = dir.getAbsolutePath
+        // Create version 0 and version 1 and each version has two files
+        spark.range(0, 5).repartition(2).write.mode("append").format("delta").save(path)
+        spark.range(5, 10).repartition(2).write.mode("append").format("delta").save(path)
+
+        val q = spark.readStream
+          .format("delta")
+          .option("startingVersion", 1)
+          .option("maxFilesPerTrigger", 1)
+          .load(path)
+          .writeStream
+          .format("memory")
+          .queryName("startingVersionWithRateLimit")
+          .start()
+        try {
+          q.processAllAvailable()
+          checkAnswer(sql("select * from startingVersionWithRateLimit"), (5 until 10).map(Row(_)))
+          val id = DeltaLog.forTable(spark, path).snapshot.metadata.id
+          val endOffsets = q.recentProgress
+            .map(_.sources(0).endOffset)
+            .map(offsetJson => DeltaSourceOffset(id, SerializedOffset(offsetJson)))
+          assert(endOffsets.toList ==
+            DeltaSourceOffset(DeltaSourceOffset.VERSION, id, 1, 0, isStartingVersion = false)
+              // When we reach the end of version 1, we will jump to version 2 with index -1
+              :: DeltaSourceOffset(DeltaSourceOffset.VERSION, id, 2, -1, isStartingVersion = false)
+              :: Nil)
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
   testQuietly("SC-46515: deltaSourceIgnoreChangesError contains removeFile, version") {
     withTempDirs { (inputDir, outputDir, checkpointDir) =>
       Seq(1, 2, 3).toDF("x").write.format("delta").save(inputDir.toString)
```
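The offsets asserted in the new test follow a simple walk over `(version, index)` pairs: a rate-limited source consumes files one at a time within a version, and once a version is exhausted it jumps to the next version at index `-1`. A toy model of that advancement (assumed, simplified names, not Delta's code):

```scala
// Simplified stand-in for a Delta source offset; not the real API.
case class Off(version: Long, index: Long, isStartingVersion: Boolean)

// Advance past the file at `cur.index` in a version containing `filesInVersion`
// files: stay within the version while files remain, otherwise jump to the
// next version, positioned before its first file (index -1).
def advance(cur: Off, filesInVersion: Long): Off =
  if (cur.index + 1 < filesInVersion)
    Off(cur.version, cur.index + 1, cur.isStartingVersion)
  else
    Off(cur.version + 1, -1L, isStartingVersion = false)
```

For example, with two files in version 1, advancing from `(1, 1)` produces `(2, -1)` with `isStartingVersion = false`, matching the jump the test's comment describes.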