From a7608281c032d88fdacb91245311a874f22fa78d Mon Sep 17 00:00:00 2001
From: Manu Zhang
Date: Fri, 25 Aug 2023 22:47:07 +0800
Subject: [PATCH] Spark3.2, Spark3.3: Return empty changed rows when there are
 no snapshots between start time and end time (#8391)

Back-port of https://github.com/apache/iceberg/pull/8133 to `spark/v3.3` and
`spark/v3.2`. A usage sketch illustrating the behavior change follows the
diff.
---
 .../spark/extensions/TestChangelogTable.java  | 26 +++++++++++++++----
 .../spark/source/SparkChangelogScan.java      |  6 ++++-
 .../spark/source/SparkScanBuilder.java        | 23 ++++++++++------
 .../spark/extensions/TestChangelogTable.java  | 26 +++++++++++++++----
 .../spark/source/SparkChangelogScan.java      |  6 ++++-
 .../spark/source/SparkScanBuilder.java        | 23 ++++++++++------
 6 files changed, 82 insertions(+), 28 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11b7..cc81b4b3d323 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public void testQueryWithTimeRange() {
     sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
     table.refresh();
     Snapshot snap3 = table.currentSnapshot();
+    long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
 
     assertEquals(
         "Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public void testQueryWithTimeRange() {
             row(2, "b", "DELETE", 1, snap3.snapshotId()),
             row(-2, "b", "INSERT", 1, snap3.snapshotId())),
         changelogRecords(rightAfterSnap1, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if start time is after the current snapshot",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap3, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if there are no snapshots between start time and end time",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
   }
 
   @Test
@@ -185,11 +206,6 @@ public void testTimeRangeValidation() {
         "Should fail if start time is after end time",
         IllegalArgumentException.class,
         () -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));
-
-    assertThrows(
-        "Should fail if start time is after the current snapshot",
-        IllegalArgumentException.class,
-        () -> changelogRecords(rightAfterSnap3, null));
   }
 
   @Test
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 3f7927c6d6c2..e68bc8aee28c 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -66,7 +66,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
       IncrementalChangelogScan scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      boolean emptyScan) {
 
     SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
 
@@ -79,6 +80,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+    if (emptyScan) {
+      this.taskGroups = Collections.emptyList();
+    }
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index c1144d944a66..7f628483b212 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -249,6 +249,7 @@ public Scan build() {
     return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  @SuppressWarnings("CyclomaticComplexity")
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
         readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
@@ -281,12 +282,20 @@ public Scan buildChangelogScan() {
           SparkReadOptions.END_TIMESTAMP);
     }
 
+    boolean emptyScan = false;
     if (startTimestamp != null) {
       startSnapshotId = getStartSnapshotId(startTimestamp);
+      if (startSnapshotId == null && endTimestamp == null) {
+        emptyScan = true;
+      }
     }
 
     if (endTimestamp != null) {
-      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+      endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
+      if ((startSnapshotId == null && endSnapshotId == null)
+          || (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
+        emptyScan = true;
+      }
     }
 
     Schema expectedSchema = schemaWithMetadataColumns();
@@ -308,18 +317,16 @@ public Scan buildChangelogScan() {
 
     scan = configureSplitPlanning(scan);
 
-    return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+    return new SparkChangelogScan(
+        spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
   }
 
   private Long getStartSnapshotId(Long startTimestamp) {
     Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
-    Preconditions.checkArgument(
-        oldestSnapshotAfter != null,
-        "Cannot find a snapshot older than %s for table %s",
-        startTimestamp,
-        table.name());
-
-    if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+    if (oldestSnapshotAfter == null) {
+      return null;
+    } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
       return oldestSnapshotAfter.snapshotId();
     } else {
       return oldestSnapshotAfter.parentId();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
index 603775eb11b7..cc81b4b3d323 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java
@@ -136,6 +136,7 @@ public void testQueryWithTimeRange() {
     sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
     table.refresh();
     Snapshot snap3 = table.currentSnapshot();
+    long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());
 
     assertEquals(
         "Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public void testQueryWithTimeRange() {
             row(2, "b", "DELETE", 1, snap3.snapshotId()),
             row(-2, "b", "INSERT", 1, snap3.snapshotId())),
         changelogRecords(rightAfterSnap1, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if start time is after the current snapshot",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap3, null));
+
+    assertEquals(
+        "Should have empty changed rows if end time is before the first snapshot",
+        ImmutableList.of(),
+        changelogRecords(null, snap1.timestampMillis() - 1));
+
+    assertEquals(
+        "Should have empty changed rows if there are no snapshots between start time and end time",
+        ImmutableList.of(),
+        changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
   }
 
   @Test
@@ -185,11 +206,6 @@ public void testTimeRangeValidation() {
         "Should fail if start time is after end time",
         IllegalArgumentException.class,
         () -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));
-
-    assertThrows(
-        "Should fail if start time is after the current snapshot",
-        IllegalArgumentException.class,
-        () -> changelogRecords(rightAfterSnap3, null));
   }
 
   @Test
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 54fdd186d473..0ce8d5c29e33 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -69,7 +69,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
       IncrementalChangelogScan scan,
       SparkReadConf readConf,
       Schema expectedSchema,
-      List<Expression> filters) {
+      List<Expression> filters,
+      boolean emptyScan) {
 
     SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
 
@@ -82,6 +83,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
     this.startSnapshotId = readConf.startSnapshotId();
     this.endSnapshotId = readConf.endSnapshotId();
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+    if (emptyScan) {
+      this.taskGroups = Collections.emptyList();
+    }
   }
 
   @Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 2653b9eab1f5..c24bcaad58b1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -480,6 +480,7 @@ private Scan buildIncrementalAppendScan(long startSnapshotId, Long endSnapshotId
     return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  @SuppressWarnings("CyclomaticComplexity")
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
         readConf.snapshotId() == null
@@ -517,12 +518,20 @@ public Scan buildChangelogScan() {
           SparkReadOptions.END_TIMESTAMP);
     }
 
+    boolean emptyScan = false;
     if (startTimestamp != null) {
       startSnapshotId = getStartSnapshotId(startTimestamp);
+      if (startSnapshotId == null && endTimestamp == null) {
+        emptyScan = true;
+      }
     }
 
     if (endTimestamp != null) {
-      endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
+      endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
+      if ((startSnapshotId == null && endSnapshotId == null)
+          || (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
+        emptyScan = true;
+      }
     }
 
     Schema expectedSchema = schemaWithMetadataColumns();
@@ -544,18 +553,16 @@ public Scan buildChangelogScan() {
 
     scan = configureSplitPlanning(scan);
 
-    return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+    return new SparkChangelogScan(
+        spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
   }
 
   private Long getStartSnapshotId(Long startTimestamp) {
     Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
-    Preconditions.checkArgument(
-        oldestSnapshotAfter != null,
-        "Cannot find a snapshot older than %s for table %s",
-        startTimestamp,
-        table.name());
-
-    if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
+    if (oldestSnapshotAfter == null) {
+      return null;
+    } else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
      return oldestSnapshotAfter.snapshotId();
     } else {
       return oldestSnapshotAfter.parentId();
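
Usage sketch (not part of the patch): with this change, a changelog read over
a time range that contains no snapshots plans an empty scan and returns no
rows, where `getStartSnapshotId` previously failed with an
IllegalArgumentException ("Cannot find a snapshot older than ..."). A minimal
sketch follows; the table name and timestamp values are hypothetical, while
the read options are the same `SparkReadOptions` constants exercised by
`changelogRecords` in the tests above.

```java
// Minimal sketch, assuming an existing Iceberg table "db.tbl" whose current
// snapshot is older than startTs, so that [startTs, endTs] holds no snapshots.
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ChangelogTimeRangeExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    long startTs = 1692970027000L; // hypothetical epoch millis
    long endTs = 1692970028000L; // hypothetical epoch millis

    Dataset<Row> changes =
        spark
            .read()
            .option(SparkReadOptions.START_TIMESTAMP, String.valueOf(startTs))
            .option(SparkReadOptions.END_TIMESTAMP, String.valueOf(endTs))
            .table("db.tbl.changes"); // the changelog metadata table

    // With this patch, SparkChangelogScan plans no task groups for such a
    // range, so this prints an empty result instead of throwing during scan
    // planning.
    changes.show();
  }
}
```

This is the same pattern the new assertions in `testQueryWithTimeRange`
verify through the `changelogRecords` helper.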