Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Parsing and Writing Tests for V3 Metadata #11947

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ public static TableMetadata replacePaths(
// TODO: update statistic file paths
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
metadata.rowLinageEnabled(),
metadata.nextRowId(),
metadata.changes());
}

Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ public String toString() {
private volatile Map<Long, Snapshot> snapshotsById;
private volatile Map<String, SnapshotRef> refs;
private volatile boolean snapshotsLoaded;
private final boolean rowLineageEnabled;
private final long nextRowId;

@SuppressWarnings("checkstyle:CyclomaticComplexity")
TableMetadata(
Expand All @@ -288,6 +290,8 @@ public String toString() {
Map<String, SnapshotRef> refs,
List<StatisticsFile> statisticsFiles,
List<PartitionStatisticsFile> partitionStatisticsFiles,
boolean rowLineageEnabled,
long nextRowId,
List<MetadataUpdate> changes) {
Preconditions.checkArgument(
specs != null && !specs.isEmpty(), "Partition specs cannot be null or empty");
Expand All @@ -307,6 +311,13 @@ public String toString() {
Preconditions.checkArgument(
metadataFileLocation == null || changes.isEmpty(),
"Cannot create TableMetadata with a metadata location and changes");
Preconditions.checkArgument(
!rowLineageEnabled || formatVersion == 3,
"Row lineage is only supported in v3 (current version v%s)",
formatVersion);
Preconditions.checkArgument(
!rowLineageEnabled || nextRowId >= 0,
"Next row id is required when row lineage is enabled");

this.metadataFileLocation = metadataFileLocation;
this.formatVersion = formatVersion;
Expand Down Expand Up @@ -340,6 +351,8 @@ public String toString() {
this.refs = validateRefs(currentSnapshotId, refs, snapshotsById);
this.statisticsFiles = ImmutableList.copyOf(statisticsFiles);
this.partitionStatisticsFiles = ImmutableList.copyOf(partitionStatisticsFiles);
this.rowLineageEnabled = rowLineageEnabled;
this.nextRowId = nextRowId;

HistoryEntry last = null;
for (HistoryEntry logEntry : snapshotLog) {
Expand Down Expand Up @@ -555,6 +568,14 @@ public List<MetadataLogEntry> previousFiles() {
return previousFiles;
}

/**
 * Returns whether row lineage is enabled in this table metadata.
 *
 * <p>The constructor only accepts {@code true} here when the format version is 3.
 *
 * @return {@code true} if row lineage is enabled, {@code false} otherwise
 */
public boolean rowLineageEnabled() {
  return rowLineageEnabled;
}

/**
 * Misspelled alias of {@link #rowLineageEnabled()}, retained so that existing callers keep
 * compiling. Prefer {@link #rowLineageEnabled()} in new code.
 *
 * @return {@code true} if row lineage is enabled, {@code false} otherwise
 */
public boolean rowLinageEnabled() {
  return rowLineageEnabled();
}

/**
 * Returns the next row id recorded in this table metadata.
 *
 * <p>The constructor requires this value to be non-negative whenever row lineage is enabled.
 *
 * @return the stored next row id
 */
public long nextRowId() {
  return this.nextRowId;
}

/**
 * Returns the metadata updates carried by this table metadata instance.
 *
 * @return the list of {@link MetadataUpdate} changes held by this instance
 */
public List<MetadataUpdate> changes() {
  return this.changes;
}
Expand Down Expand Up @@ -903,6 +924,8 @@ public static class Builder {
private final Map<Long, List<StatisticsFile>> statisticsFiles;
private final Map<Long, List<PartitionStatisticsFile>> partitionStatisticsFiles;
private boolean suppressHistoricalSnapshots = false;
private boolean rowLinageEnabled;
private long nextRowId;

// change tracking
private final List<MetadataUpdate> changes;
Expand Down Expand Up @@ -949,6 +972,8 @@ private Builder(int formatVersion) {
this.schemasById = Maps.newHashMap();
this.specsById = Maps.newHashMap();
this.sortOrdersById = Maps.newHashMap();
this.rowLinageEnabled = false;
this.nextRowId = -1L;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically, next-row-id can start from any integer, including negative ones. I am thinking that maybe the implementation could make it start from 0, which would simplify the implementation and make it more intuitive.

}

private Builder(TableMetadata base) {
Expand All @@ -971,6 +996,8 @@ private Builder(TableMetadata base) {
this.snapshots = Lists.newArrayList(base.snapshots());
this.changes = Lists.newArrayList(base.changes);
this.startingChangeCount = changes.size();
this.rowLinageEnabled = base.rowLineageEnabled;
this.nextRowId = base.nextRowId;

this.snapshotLog = Lists.newArrayList(base.snapshotLog);
this.previousFileLocation = base.metadataFileLocation;
Expand Down Expand Up @@ -1535,6 +1562,8 @@ public TableMetadata build() {
partitionStatisticsFiles.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList()),
rowLinageEnabled,
nextRowId,
discardChanges ? ImmutableList.of() : ImmutableList.copyOf(changes));
}

Expand Down
25 changes: 25 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadataParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ private TableMetadataParser() {}
static final String METADATA_LOG = "metadata-log";
static final String STATISTICS = "statistics";
static final String PARTITION_STATISTICS = "partition-statistics";
static final String ROW_LINEAGE = "row-lineage";
static final String NEXT_ROW_ID = "next-row-id";

public static void overwrite(TableMetadata metadata, OutputFile outputFile) {
internalWrite(metadata, outputFile, true);
Expand Down Expand Up @@ -240,6 +242,13 @@ public static void toJson(TableMetadata metadata, JsonGenerator generator) throw
}
generator.writeEndArray();

if (metadata.formatVersion() == 3) {
generator.writeBooleanField(ROW_LINEAGE, metadata.rowLinageEnabled());
if (metadata.nextRowId() > -1L) {
flyrain marked this conversation as resolved.
Show resolved Hide resolved
generator.writeNumberField(NEXT_ROW_ID, metadata.nextRowId());
}
}

generator.writeArrayFieldStart(SNAPSHOT_LOG);
for (HistoryEntry logEntry : metadata.snapshotLog()) {
generator.writeStartObject();
Expand Down Expand Up @@ -352,6 +361,7 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
ImmutableList.Builder<Schema> builder = ImmutableList.builder();
for (JsonNode schemaNode : schemaArray) {
Schema current = SchemaParser.fromJson(schemaNode);
Schema.checkCompatibility(current, formatVersion);
if (current.schemaId() == currentSchemaId) {
schema = current;
}
Expand All @@ -372,6 +382,7 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
formatVersion == 1, "%s must exist in format v%s", SCHEMAS, formatVersion);

schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, node));
Schema.checkCompatibility(schema, formatVersion);
currentSchemaId = schema.schemaId();
schemas = ImmutableList.of(schema);
}
Expand Down Expand Up @@ -521,6 +532,18 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
}
}

boolean rowLineageEnabled = false;
if (formatVersion == 3 && node.hasNonNull(ROW_LINEAGE)) {
rowLineageEnabled = JsonUtil.getBool(ROW_LINEAGE, node);
}
Long nextRowId = JsonUtil.getLongOrNull(NEXT_ROW_ID, node);
Preconditions.checkArgument(
!rowLineageEnabled || nextRowId != null,
"Next row must be set when row lineage is enabled");
if (nextRowId == null) {
nextRowId = -1L;
}

return new TableMetadata(
metadataLocation,
formatVersion,
Expand All @@ -545,6 +568,8 @@ public static TableMetadata fromJson(String metadataLocation, JsonNode node) {
refs,
statisticsFiles,
partitionStatisticsFiles,
rowLineageEnabled,
nextRowId,
ImmutableList.of() /* no changes from the file */);
}

Expand Down
Loading