
Commit

Spark3.2, Spark3.3: Return empty changed rows when there are no snapshots between start time and end time (apache#8391)

Back-port of apache#8133 to `spark/v3.3` and `spark/v3.2`
manuzhang authored Aug 25, 2023
1 parent 99410a1 commit a760828
Showing 6 changed files with 82 additions and 28 deletions.
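
For context, a minimal sketch of the read path this commit changes. The start-timestamp/end-timestamp read options and the <table>.changes metadata table come from Iceberg's Spark integration; the table name, timestamps, and session setup below are illustrative only, and the time window is assumed to contain no snapshots.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ChangelogTimeRangeExample {

  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder().appName("changelog-time-range-example").getOrCreate();

    // Hypothetical table and epoch-millisecond bounds; assume no snapshots of
    // db.tbl were committed inside this window.
    Dataset<Row> changes =
        spark
            .read()
            .format("iceberg")
            .option("start-timestamp", "1692900000000")
            .option("end-timestamp", "1692903600000")
            .load("db.tbl.changes");

    // With this commit, planning yields no task groups for such a range, so the
    // query returns zero rows instead of failing validation.
    changes.show();

    spark.stop();
  }
}

Previously, a start time past the table's current snapshot was rejected with an IllegalArgumentException, as shown by the assertThrows block removed from testTimeRangeValidation below.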
@@ -136,6 +136,7 @@ public void testQueryWithTimeRange() {
sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

assertEquals(
"Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public void testQueryWithTimeRange() {
row(2, "b", "DELETE", 1, snap3.snapshotId()),
row(-2, "b", "INSERT", 1, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, null));

assertEquals(
"Should have empty changed rows if end time is before the first snapshot",
ImmutableList.of(),
changelogRecords(null, snap1.timestampMillis() - 1));

assertEquals(
"Should have empty changed rows if start time is after the current snapshot",
ImmutableList.of(),
changelogRecords(rightAfterSnap3, null));

assertEquals(
"Should have empty changed rows if end time is before the first snapshot",
ImmutableList.of(),
changelogRecords(null, snap1.timestampMillis() - 1));

assertEquals(
"Should have empty changed rows if there are no snapshots between start time and end time",
ImmutableList.of(),
changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
}

@Test
@@ -185,11 +206,6 @@ public void testTimeRangeValidation() {
"Should fail if start time is after end time",
IllegalArgumentException.class,
() -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));

assertThrows(
"Should fail if start time is after the current snapshot",
IllegalArgumentException.class,
() -> changelogRecords(rightAfterSnap3, null));
}

@Test
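The changelogRecords helper invoked by the assertions above is not part of this diff; the following is only a plausible shape for it, assuming it wraps a DataFrameReader over the table's changes metadata table with optional time bounds.

import java.util.List;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

final class ChangelogTestSupport {

  private ChangelogTestSupport() {}

  // Sketch of the test helper: either bound may be null, in which case the
  // corresponding read option is simply omitted.
  static List<Row> changelogRecords(
      SparkSession spark, String tableName, Long startTimestamp, Long endTimestamp) {
    DataFrameReader reader = spark.read().format("iceberg");

    if (startTimestamp != null) {
      reader = reader.option("start-timestamp", String.valueOf(startTimestamp));
    }

    if (endTimestamp != null) {
      reader = reader.option("end-timestamp", String.valueOf(endTimestamp));
    }

    // Result rows carry the data columns plus changelog metadata columns such as
    // _change_type, _change_ordinal, and _commit_snapshot_id.
    return reader.load(tableName + ".changes").collectAsList();
  }
}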
@@ -66,7 +66,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
IncrementalChangelogScan scan,
SparkReadConf readConf,
Schema expectedSchema,
List<Expression> filters) {
List<Expression> filters,
boolean emptyScan) {

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

@@ -79,6 +80,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
if (emptyScan) {
this.taskGroups = Collections.emptyList();
}
}

@Override
@@ -249,6 +249,7 @@ public Scan build() {
return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
}

@SuppressWarnings("CyclomaticComplexity")
public Scan buildChangelogScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
@@ -281,12 +282,20 @@ public Scan buildChangelogScan() {
SparkReadOptions.END_TIMESTAMP);
}

boolean emptyScan = false;
if (startTimestamp != null) {
startSnapshotId = getStartSnapshotId(startTimestamp);
if (startSnapshotId == null && endTimestamp == null) {
emptyScan = true;
}
}

if (endTimestamp != null) {
endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
if ((startSnapshotId == null && endSnapshotId == null)
|| (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
emptyScan = true;
}
}

Schema expectedSchema = schemaWithMetadataColumns();
@@ -308,18 +317,16 @@ public Scan buildChangelogScan() {

scan = configureSplitPlanning(scan);

return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
return new SparkChangelogScan(
spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
}

private Long getStartSnapshotId(Long startTimestamp) {
Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
Preconditions.checkArgument(
oldestSnapshotAfter != null,
"Cannot find a snapshot older than %s for table %s",
startTimestamp,
table.name());

if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
if (oldestSnapshotAfter == null) {
return null;
} else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
return oldestSnapshotAfter.snapshotId();
} else {
return oldestSnapshotAfter.parentId();
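To summarize the branching added above: getStartSnapshotId now returns null instead of failing when no snapshot exists after the start time, nullableSnapshotIdAsOfTime returns null when the end time predates the first snapshot, and the scan is planned as empty whenever the resolved range contains no snapshots. The condensed helper below is illustrative only and not part of the patch.

// Illustrative condensation of the empty-scan decision in buildChangelogScan().
final class EmptyScanDecision {

  private EmptyScanDecision() {}

  static boolean isEmptyTimeRange(
      Long startTimestamp,
      Long resolvedStartSnapshotId,
      Long endTimestamp,
      Long resolvedEndSnapshotId) {
    // Start bound given, nothing committed after it, and no end bound: the
    // requested range lies entirely beyond the table's history.
    if (startTimestamp != null && resolvedStartSnapshotId == null && endTimestamp == null) {
      return true;
    }

    if (endTimestamp != null) {
      // End bound given: empty when neither bound resolved to a snapshot, or when
      // both bounds resolve to the same snapshot (nothing changed in between).
      return (resolvedStartSnapshotId == null && resolvedEndSnapshotId == null)
          || (resolvedStartSnapshotId != null
              && resolvedStartSnapshotId.equals(resolvedEndSnapshotId));
    }

    return false;
  }
}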
@@ -136,6 +136,7 @@ public void testQueryWithTimeRange() {
sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();
long rightAfterSnap3 = waitUntilAfter(snap3.timestampMillis());

assertEquals(
"Should have expected changed rows only from snapshot 3",
@@ -166,6 +167,26 @@ public void testQueryWithTimeRange() {
row(2, "b", "DELETE", 1, snap3.snapshotId()),
row(-2, "b", "INSERT", 1, snap3.snapshotId())),
changelogRecords(rightAfterSnap1, null));

assertEquals(
"Should have empty changed rows if end time is before the first snapshot",
ImmutableList.of(),
changelogRecords(null, snap1.timestampMillis() - 1));

assertEquals(
"Should have empty changed rows if start time is after the current snapshot",
ImmutableList.of(),
changelogRecords(rightAfterSnap3, null));

assertEquals(
"Should have empty changed rows if end time is before the first snapshot",
ImmutableList.of(),
changelogRecords(null, snap1.timestampMillis() - 1));

assertEquals(
"Should have empty changed rows if there are no snapshots between start time and end time",
ImmutableList.of(),
changelogRecords(rightAfterSnap2, snap3.timestampMillis() - 1));
}

@Test
@@ -185,11 +206,6 @@ public void testTimeRangeValidation() {
"Should fail if start time is after end time",
IllegalArgumentException.class,
() -> changelogRecords(snap3.timestampMillis(), snap2.timestampMillis()));

assertThrows(
"Should fail if start time is after the current snapshot",
IllegalArgumentException.class,
() -> changelogRecords(rightAfterSnap3, null));
}

@Test
@@ -69,7 +69,8 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
IncrementalChangelogScan scan,
SparkReadConf readConf,
Schema expectedSchema,
List<Expression> filters) {
List<Expression> filters,
boolean emptyScan) {

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

@@ -82,6 +83,9 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
this.startSnapshotId = readConf.startSnapshotId();
this.endSnapshotId = readConf.endSnapshotId();
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
if (emptyScan) {
this.taskGroups = Collections.emptyList();
}
}

@Override
@@ -480,6 +480,7 @@ private Scan buildIncrementalAppendScan(long startSnapshotId, Long endSnapshotId
return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
}

@SuppressWarnings("CyclomaticComplexity")
public Scan buildChangelogScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null
@@ -517,12 +518,20 @@ public Scan buildChangelogScan() {
SparkReadOptions.END_TIMESTAMP);
}

boolean emptyScan = false;
if (startTimestamp != null) {
startSnapshotId = getStartSnapshotId(startTimestamp);
if (startSnapshotId == null && endTimestamp == null) {
emptyScan = true;
}
}

if (endTimestamp != null) {
endSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, endTimestamp);
endSnapshotId = SnapshotUtil.nullableSnapshotIdAsOfTime(table, endTimestamp);
if ((startSnapshotId == null && endSnapshotId == null)
|| (startSnapshotId != null && startSnapshotId.equals(endSnapshotId))) {
emptyScan = true;
}
}

Schema expectedSchema = schemaWithMetadataColumns();
@@ -544,18 +553,16 @@ public Scan buildChangelogScan() {

scan = configureSplitPlanning(scan);

return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
return new SparkChangelogScan(
spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
}

private Long getStartSnapshotId(Long startTimestamp) {
Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
Preconditions.checkArgument(
oldestSnapshotAfter != null,
"Cannot find a snapshot older than %s for table %s",
startTimestamp,
table.name());

if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
if (oldestSnapshotAfter == null) {
return null;
} else if (oldestSnapshotAfter.timestampMillis() == startTimestamp) {
return oldestSnapshotAfter.snapshotId();
} else {
return oldestSnapshotAfter.parentId();