diff --git a/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index 7edaaf0491f..ab2b1f89720 100644 --- a/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -202,7 +202,13 @@ case class DeltaSource( // isStartingVersion must be false here as we have bumped the version Some(DeltaSourceOffset(tableId, v + 1, -1, isStartingVersion = false)) } else { - Some(DeltaSourceOffset(tableId, v, i, isStartingVersion = v == version)) + // - When the local val `isStartingVersion` is `false`, it means this query is using + // `startingVersion/startingTimestamp`. In this case, we should use an offset that's based on + // json files (the offset's `isStartingVersion` should be `false`). + // - If `isStartingVersion` is true (in other words, it's not using + // `startingVersion/startingTimestamp`), we should use `v == version` to determine whether we + // should use an offset that's based on snapshots or json files. + Some(DeltaSourceOffset(tableId, v, i, isStartingVersion = isStartingVersion && v == version)) } } diff --git a/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala b/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala index 479103c2a3c..02402db16fb 100644 --- a/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala +++ b/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala @@ -1428,6 +1428,42 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest { } } + test("startingVersion should work with rate time") { + withTempDir { dir => + withTempView("startingVersionWithRateLimit") { + val path = dir.getAbsolutePath + // Create version 0 and version 1 and each version has two files + spark.range(0, 5).repartition(2).write.mode("append").format("delta").save(path) + spark.range(5, 10).repartition(2).write.mode("append").format("delta").save(path) + + val q = spark.readStream + .format("delta") + .option("startingVersion", 1) + .option("maxFilesPerTrigger", 1) + .load(path) + .writeStream + .format("memory") + .queryName("startingVersionWithRateLimit") + .start() + try { + q.processAllAvailable() + checkAnswer(sql("select * from startingVersionWithRateLimit"), (5 until 10).map(Row(_))) + val id = DeltaLog.forTable(spark, path).snapshot.metadata.id + val endOffsets = q.recentProgress + .map(_.sources(0).endOffset) + .map(offsetJson => DeltaSourceOffset(id, SerializedOffset(offsetJson))) + assert(endOffsets.toList == + DeltaSourceOffset(DeltaSourceOffset.VERSION, id, 1, 0, isStartingVersion = false) + // When we reach the end of version 1, we will jump to version 2 with index -1 + :: DeltaSourceOffset(DeltaSourceOffset.VERSION, id, 2, -1, isStartingVersion = false) + :: Nil) + } finally { + q.stop() + } + } + } + } + testQuietly("SC-46515: deltaSourceIgnoreChangesError contains removeFile, version") { withTempDirs { (inputDir, outputDir, checkpointDir) => Seq(1, 2, 3).toDF("x").write.format("delta").save(inputDir.toString)