HIVE-28658 Add Iceberg REST Catalog client support #5628

Draft · wants to merge 1 commit into base: master
15 changes: 15 additions & 0 deletions iceberg/iceberg-catalog/pom.xml
@@ -108,5 +108,20 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-h2</artifactId>
<version>5.3</version>
</dependency>
</dependencies>
</project>
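
The three Apache HttpComponents 5.x artifacts added above back the HTTP transport used by Iceberg's REST catalog client. As a hedged illustration only (not code from this PR), the sketch below shows how a REST catalog could be initialized once these dependencies are on the classpath; the endpoint URL, warehouse location, and namespace name are assumptions.

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalog;

public class RestCatalogClientSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical REST catalog endpoint and warehouse location.
    Map<String, String> props = ImmutableMap.of(
        CatalogProperties.URI, "http://localhost:8181",
        CatalogProperties.WAREHOUSE_LOCATION, "s3://example-bucket/warehouse");

    // RESTCatalog issues its HTTP calls through Iceberg's HTTP client, which is
    // what pulls in the httpclient5/httpcore5 artifacts declared in this pom.xml.
    try (RESTCatalog catalog = new RESTCatalog()) {
      catalog.initialize("rest", props);
      for (TableIdentifier table : catalog.listTables(Namespace.of("default"))) {
        System.out.println(table);
      }
    }
  }
}

In Hive itself the REST catalog would presumably be selected through the usual iceberg.catalog.<name>-style catalog properties rather than instantiated directly as above; that wiring is outside this excerpt.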

Large diffs are not rendered by default.

@@ -20,10 +20,7 @@
package org.apache.iceberg.hive;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -33,15 +30,9 @@
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.iceberg.BaseMetastoreOperations;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -54,9 +45,6 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.BiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -75,8 +63,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
private static final String HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES =
"iceberg.hive.metadata-refresh-max-retries";
private static final int HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT = 2;

private static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
private final IcebergTableConverter icebergTableConverter;
public static final BiMap<String, String> ICEBERG_TO_HMS_TRANSLATION =
ImmutableBiMap.of(
// gc.enabled in Iceberg and external.table.purge in Hive are meant to do the same things
// but with different names
@@ -127,6 +115,7 @@ protected HiveTableOperations(
HIVE_ICEBERG_METADATA_REFRESH_MAX_RETRIES_DEFAULT);
this.maxHiveTablePropertySize =
conf.getLong(HIVE_TABLE_PROPERTY_MAX_SIZE, HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT);
this.icebergTableConverter = new IcebergTableConverter(maxHiveTablePropertySize);
}

@Override
@@ -220,13 +209,11 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
.collect(Collectors.toSet());
}

Map<String, String> summary =
Optional.ofNullable(metadata.currentSnapshot())
.map(Snapshot::summary)
.orElseGet(ImmutableMap::of);
setHmsTableParameters(
newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary);

icebergTableConverter.setHmsTableParameters(newMetadataLocation, tbl,
metadata, removedProps, hiveEngineEnabled);
if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
tbl.getParameters().put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
}
if (!keepHiveStats) {
StatsSetupConst.setBasicStatsState(tbl.getParameters(), StatsSetupConst.FALSE);
StatsSetupConst.clearColumnStatsState(tbl.getParameters());
@@ -304,117 +291,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {

LOG.info("Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
}

private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
Set<String> obsoleteProps, boolean hiveEngineEnabled,
Map<String, String> summary) {
Map<String, String> parameters = Optional.ofNullable(tbl.getParameters())
.orElseGet(Maps::newHashMap);

// push all Iceberg table properties into HMS
metadata.properties().entrySet().stream()
.filter(entry -> !entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
.forEach(
entry -> {
String key = entry.getKey();
// translate key names between Iceberg and HMS where needed
String hmsKey = ICEBERG_TO_HMS_TRANSLATION.getOrDefault(key, key);
parameters.put(hmsKey, entry.getValue());
});
if (metadata.uuid() != null) {
parameters.put(TableProperties.UUID, metadata.uuid());
}

// remove any props from HMS that are no longer present in Iceberg table props
obsoleteProps.forEach(parameters::remove);

parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);

if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
}

setStorageHandler(parameters, hiveEngineEnabled);

// Set the basic statistics
if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) {
parameters.put(StatsSetupConst.NUM_FILES, summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP));
}
if (summary.get(SnapshotSummary.TOTAL_RECORDS_PROP) != null) {
parameters.put(StatsSetupConst.ROW_COUNT, summary.get(SnapshotSummary.TOTAL_RECORDS_PROP));
}
if (summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP) != null) {
parameters.put(StatsSetupConst.TOTAL_SIZE, summary.get(SnapshotSummary.TOTAL_FILE_SIZE_PROP));
}

setSnapshotStats(metadata, parameters);
setSchema(metadata.schema(), parameters);
setPartitionSpec(metadata, parameters);
setSortOrder(metadata, parameters);

tbl.setParameters(parameters);
}

private static void setStorageHandler(Map<String, String> parameters, boolean hiveEngineEnabled) {
// If needed set the 'storage_handler' property to enable query from Hive
if (hiveEngineEnabled) {
parameters.put(hive_metastoreConstants.META_TABLE_STORAGE, HiveOperationsBase.HIVE_ICEBERG_STORAGE_HANDLER);
} else {
parameters.remove(hive_metastoreConstants.META_TABLE_STORAGE);
}
}

@VisibleForTesting
void setSnapshotStats(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.CURRENT_SNAPSHOT_ID);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_TIMESTAMP);
parameters.remove(TableProperties.CURRENT_SNAPSHOT_SUMMARY);

Snapshot currentSnapshot = metadata.currentSnapshot();
if (exposeInHmsProperties() && currentSnapshot != null) {
parameters.put(
TableProperties.CURRENT_SNAPSHOT_ID, String.valueOf(currentSnapshot.snapshotId()));
parameters.put(
TableProperties.CURRENT_SNAPSHOT_TIMESTAMP, String.valueOf(currentSnapshot.timestampMillis()));
setSnapshotSummary(parameters, currentSnapshot);
}

parameters.put(TableProperties.SNAPSHOT_COUNT, String.valueOf(metadata.snapshots().size()));
}

@VisibleForTesting
void setSnapshotSummary(Map<String, String> parameters, Snapshot currentSnapshot) {
try {
String summary = JsonUtil.mapper().writeValueAsString(currentSnapshot.summary());
if (summary.length() <= maxHiveTablePropertySize) {
parameters.put(TableProperties.CURRENT_SNAPSHOT_SUMMARY, summary);
} else {
LOG.warn("Not exposing the current snapshot({}) summary in HMS since it exceeds {} characters",
currentSnapshot.snapshotId(), maxHiveTablePropertySize);
}
} catch (JsonProcessingException e) {
LOG.warn("Failed to convert current snapshot({}) summary to a json string",
currentSnapshot.snapshotId(), e);
}
}

@VisibleForTesting
void setPartitionSpec(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_PARTITION_SPEC);
if (exposeInHmsProperties() && metadata.spec() != null && metadata.spec().isPartitioned()) {
String spec = PartitionSpecParser.toJson(metadata.spec());
setField(parameters, TableProperties.DEFAULT_PARTITION_SPEC, spec);
}
}

@VisibleForTesting
void setSortOrder(TableMetadata metadata, Map<String, String> parameters) {
parameters.remove(TableProperties.DEFAULT_SORT_ORDER);
if (exposeInHmsProperties() && metadata.sortOrder() != null && metadata.sortOrder().isSorted()) {
String sortOrder = SortOrderParser.toJson(metadata.sortOrder());
setField(parameters, TableProperties.DEFAULT_SORT_ORDER, sortOrder);
}
IcebergTableConverter getIcebergTableConverter() {
return icebergTableConverter;
}

@Override
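
The helper methods deleted above (setHmsTableParameters, setStorageHandler, setSnapshotStats, setSnapshotSummary, setPartitionSpec, setSortOrder) now live behind the IcebergTableConverter created in the constructor. The new class itself is not shown in this excerpt; the following is only a reconstruction sketched from the removed code and the new call site, so names, visibility, and the exact split of responsibilities may differ from the actual PR.

package org.apache.iceberg.hive;

import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
 * Reconstruction sketch: owns the Iceberg-to-HMS parameter translation that used to be
 * implemented by HiveTableOperations#setHmsTableParameters and its helpers.
 */
class IcebergTableConverter {
  private final long maxHiveTablePropertySize;

  IcebergTableConverter(long maxHiveTablePropertySize) {
    this.maxHiveTablePropertySize = maxHiveTablePropertySize;
  }

  void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
      Set<String> obsoleteProps, boolean hiveEngineEnabled) {
    Map<String, String> parameters = Optional.ofNullable(tbl.getParameters())
        .orElseGet(Maps::newHashMap);

    // Push all Iceberg table properties into HMS, translating key names where needed.
    metadata.properties().entrySet().stream()
        .filter(entry -> !entry.getKey().equalsIgnoreCase(HiveCatalog.HMS_TABLE_OWNER))
        .forEach(entry -> parameters.put(
            HiveTableOperations.ICEBERG_TO_HMS_TRANSLATION.getOrDefault(entry.getKey(), entry.getKey()),
            entry.getValue()));
    if (metadata.uuid() != null) {
      parameters.put(TableProperties.UUID, metadata.uuid());
    }

    // Remove any props from HMS that are no longer present in the Iceberg table props.
    obsoleteProps.forEach(parameters::remove);

    parameters.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
    parameters.put(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, newMetadataLocation);

    // The storage_handler property (driven by hiveEngineEnabled), the basic file/row/size
    // statistics, and the snapshot stats, schema, partition spec and sort order (capped by
    // maxHiveTablePropertySize) would be populated here as well, mirroring the removed helpers.

    tbl.setParameters(parameters);
  }
}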