Commit

[AMORO-1744] Rename classes with the arctic identifier in the mixed-format-hive module (#2835)

* Rename classes with the arctic identifier in the mixed-format-hive module

* Change the mixed-hive catalog implementation class name
zhoujinsong authored May 15, 2024
1 parent b5420c2 commit eef7e5e
Showing 42 changed files with 261 additions and 256 deletions.
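For code that consumes these classes, the rename is mechanical: imports and casts move from ArcticHiveCatalog to MixedHiveCatalog while the CatalogLoader call itself is unchanged. A minimal sketch of an affected call site, assuming a CatalogMeta and TableMetaStore are already in scope as in the hunks below:

    // Sketch of a caller after the rename; catalogMeta and tableMetaStore
    // are assumed to be in scope (as in the dashboard and test hunks below).
    MixedHiveCatalog catalog =
        (MixedHiveCatalog)
            CatalogLoader.createCatalog(
                catalogMeta.getCatalogName(),
                catalogMeta.getCatalogType(),
                catalogMeta.getCatalogProperties(),
                tableMetaStore);
    // Same call as before the commit; only the target type changed from
    // ArcticHiveCatalog to MixedHiveCatalog.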
@@ -28,7 +28,7 @@
 import org.apache.amoro.api.config.Configurations;
 import org.apache.amoro.hive.CachedHiveClientPool;
 import org.apache.amoro.hive.HMSClientPool;
-import org.apache.amoro.hive.catalog.ArcticHiveCatalog;
+import org.apache.amoro.hive.catalog.MixedHiveCatalog;
 import org.apache.amoro.hive.utils.HiveTableUtil;
 import org.apache.amoro.hive.utils.UpgradeHiveTableUtil;
 import org.apache.amoro.mixed.CatalogLoader;
@@ -211,8 +211,8 @@ public void upgradeHiveTable(Context ctx) {
     Map<String, String> catalogProperties = new HashMap<>(originCatalogProperties);
     catalogProperties.put(CatalogMetaProperties.TABLE_FORMATS, TableFormat.MIXED_HIVE.name());

-    ArcticHiveCatalog arcticHiveCatalog =
-        (ArcticHiveCatalog)
+    MixedHiveCatalog mixedHiveCatalog =
+        (MixedHiveCatalog)
             CatalogLoader.createCatalog(
                 catalog, catalogMeta.getCatalogType(), catalogProperties, tableMetaStore);

@@ -222,7 +222,7 @@ public void upgradeHiveTable(Context ctx) {
     upgradeRunningInfo.put(tableIdentifier, new UpgradeRunningInfo());
     try {
       UpgradeHiveTableUtil.upgradeHiveTable(
-          arcticHiveCatalog,
+          mixedHiveCatalog,
           TableIdentifier.of(catalog, db, table),
           upgradeHiveMeta.getPkList().stream()
              .map(UpgradeHiveMeta.PrimaryKeyField::getFieldName)
@@ -71,6 +71,6 @@ protected void execute(TableRuntime tableRuntime) {
   }

   public static void syncIcebergToHive(MixedTable mixedTable) {
-    HiveMetaSynchronizer.syncArcticDataToHive((SupportHive) mixedTable);
+    HiveMetaSynchronizer.syncMixedTableDataToHive((SupportHive) mixedTable);
   }
 }
@@ -28,7 +28,7 @@
 import org.apache.amoro.catalog.CatalogTestHelper;
 import org.apache.amoro.catalog.MixedTables;
 import org.apache.amoro.hive.TestHMS;
-import org.apache.amoro.hive.catalog.ArcticHiveCatalog;
+import org.apache.amoro.hive.catalog.MixedHiveCatalog;
 import org.apache.amoro.mixed.CatalogLoader;
 import org.apache.amoro.mixed.MixedFormatCatalog;
 import org.apache.amoro.properties.CatalogMetaProperties;
@@ -195,8 +195,8 @@ private void createMixedHiveTable() {
     // only create mixed hive table here !
     catalogMeta.putToCatalogProperties(
         CatalogMetaProperties.TABLE_FORMATS, TableFormat.MIXED_HIVE.name());
-    ArcticHiveCatalog catalog =
-        (ArcticHiveCatalog)
+    MixedHiveCatalog catalog =
+        (MixedHiveCatalog)
             CatalogLoader.createCatalog(
                 catalogMeta.getCatalogName(),
                 catalogMeta.getCatalogType(),
@@ -45,7 +45,7 @@
 public class CatalogLoader {

   public static final String INTERNAL_CATALOG_IMPL = InternalMixedIcebergCatalog.class.getName();
-  public static final String HIVE_CATALOG_IMPL = "org.apache.amoro.hive.catalog.ArcticHiveCatalog";
+  public static final String HIVE_CATALOG_IMPL = "org.apache.amoro.hive.catalog.MixedHiveCatalog";
   public static final String MIXED_ICEBERG_CATALOG_IMP = BasicMixedIcebergCatalog.class.getName();

   /**
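Since HIVE_CATALOG_IMPL is a string literal rather than a MixedHiveCatalog.class.getName() reference, the compiler cannot catch a stale value; a missed rename here would only fail when the catalog is loaded. The constant is presumably resolved reflectively, along these lines (a hypothetical sketch, not the actual CatalogLoader internals):

    // Hypothetical: resolve the Hive catalog implementation from its
    // fully qualified class-name string.
    static MixedFormatCatalog loadHiveCatalogImpl() throws Exception {
      Class<?> impl = Class.forName(CatalogLoader.HIVE_CATALOG_IMPL);
      // A stale constant still naming ArcticHiveCatalog would surface
      // here as a ClassNotFoundException.
      return (MixedFormatCatalog) impl.getDeclaredConstructor().newInstance();
    }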
@@ -35,21 +35,22 @@
  * Extended implementation of {@link ClientPoolImpl} with {@link TableMetaStore} to support
  * authenticated hive cluster.
  */
-public class ArcticHiveClientPool extends ClientPoolImpl<HMSClient, TException> {
+public class AuthenticatedHiveClientPool extends ClientPoolImpl<HMSClient, TException> {
   private final TableMetaStore metaStore;

   private final HiveConf hiveConf;
-  private static final Logger LOG = LoggerFactory.getLogger(ArcticHiveClientPool.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AuthenticatedHiveClientPool.class);

   private static final DynConstructors.Ctor<HiveMetaStoreClient> CLIENT_CTOR =
       DynConstructors.builder()
           .impl(HiveMetaStoreClient.class, HiveConf.class)
           .impl(HiveMetaStoreClient.class, Configuration.class)
           .build();

-  public ArcticHiveClientPool(TableMetaStore tableMetaStore, int poolSize) {
+  public AuthenticatedHiveClientPool(TableMetaStore tableMetaStore, int poolSize) {
     super(poolSize, TTransportException.class, true);
-    this.hiveConf = new HiveConf(tableMetaStore.getConfiguration(), ArcticHiveClientPool.class);
+    this.hiveConf =
+        new HiveConf(tableMetaStore.getConfiguration(), AuthenticatedHiveClientPool.class);
     this.hiveConf.addResource(tableMetaStore.getConfiguration());
     this.hiveConf.addResource(tableMetaStore.getHiveSiteLocation().orElse(null));
     this.metaStore = tableMetaStore;
@@ -36,10 +36,10 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;

-/** Cache {@link ArcticHiveClientPool} with {@link TableMetaStore} key. */
+/** Cache {@link AuthenticatedHiveClientPool} with {@link TableMetaStore} key. */
 public class CachedHiveClientPool implements HMSClientPool, Serializable {

-  private static Cache<TableMetaStore, ArcticHiveClientPool> clientPoolCache;
+  private static Cache<TableMetaStore, AuthenticatedHiveClientPool> clientPoolCache;

   private final TableMetaStore tableMetaStore;
   private final int clientPoolSize;
@@ -60,17 +60,17 @@ public CachedHiveClientPool(TableMetaStore tableMetaStore, Map<String, String> p
     init();
   }

-  private ArcticHiveClientPool clientPool() {
+  private AuthenticatedHiveClientPool clientPool() {
     return clientPoolCache.get(
-        tableMetaStore, k -> new ArcticHiveClientPool(tableMetaStore, clientPoolSize));
+        tableMetaStore, k -> new AuthenticatedHiveClientPool(tableMetaStore, clientPoolSize));
   }

   private synchronized void init() {
     if (clientPoolCache == null) {
       clientPoolCache =
           Caffeine.newBuilder()
               .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
-              .removalListener((key, value, cause) -> ((ArcticHiveClientPool) value).close())
+              .removalListener((key, value, cause) -> ((AuthenticatedHiveClientPool) value).close())
               .build();
     }
   }
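The cache above is static, so every CachedHiveClientPool built over the same TableMetaStore shares one authenticated pool, and the removal listener closes a pool once it has sat idle past the eviction interval. A self-contained sketch of the same Caffeine pattern, with a hypothetical Pool type standing in for AuthenticatedHiveClientPool:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.concurrent.TimeUnit;

    class PoolCacheSketch {
      // Hypothetical closeable resource standing in for AuthenticatedHiveClientPool.
      static class Pool implements AutoCloseable {
        @Override
        public void close() {}
      }

      // One pool per key; a pool idle for five minutes is closed and dropped.
      static final Cache<String, Pool> CACHE =
          Caffeine.newBuilder()
              .expireAfterAccess(5, TimeUnit.MINUTES)
              .<String, Pool>removalListener((key, value, cause) -> value.close())
              .build();

      static Pool poolFor(String metaStoreKey) {
        return CACHE.get(metaStoreKey, k -> new Pool());
      }
    }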
@@ -87,9 +87,9 @@
 import java.util.stream.Collectors;

 /** Implementation of {@link MixedFormatCatalog} to support Hive table as base store. */
-public class ArcticHiveCatalog implements MixedFormatCatalog {
+public class MixedHiveCatalog implements MixedFormatCatalog {

-  private static final Logger LOG = LoggerFactory.getLogger(ArcticHiveCatalog.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MixedHiveCatalog.class);

   protected AmsClient client;
   private CachedHiveClientPool hiveClientPool;
@@ -171,7 +171,7 @@ public static void putNotNullProperties(
   }

   /** HMS is case-insensitive for table name and database */
-  protected TableMeta getArcticTableMeta(TableIdentifier identifier) {
+  protected TableMeta getMixedTableMeta(TableIdentifier identifier) {
     org.apache.hadoop.hive.metastore.api.Table hiveTable = null;
     identifier = identifier.toLowCaseIdentifier();
     try {
@@ -185,24 +185,24 @@ protected TableMeta getArcticTableMeta(TableIdentifier identifier) {

     Map<String, String> hiveParameters = hiveTable.getParameters();

-    String arcticRootLocation = hiveParameters.get(MIXED_TABLE_ROOT_LOCATION);
-    if (arcticRootLocation == null) {
+    String mixedTableRootLocation = hiveParameters.get(MIXED_TABLE_ROOT_LOCATION);
+    if (mixedTableRootLocation == null) {
       // if hive location ends with /hive, then it's a mixed-hive table. we need to remove /hive to
       // get root location.
       // if hive location doesn't end with /hive, then it's a pure-hive table. we can use the
       // location as root location.
       String hiveRootLocation = hiveTable.getSd().getLocation();
       if (hiveRootLocation.endsWith("/hive")) {
-        arcticRootLocation = hiveRootLocation.substring(0, hiveRootLocation.length() - 5);
+        mixedTableRootLocation = hiveRootLocation.substring(0, hiveRootLocation.length() - 5);
       } else {
-        arcticRootLocation = hiveRootLocation;
+        mixedTableRootLocation = hiveRootLocation;
       }
     }

     // full path of base, change and root location
-    String baseLocation = arcticRootLocation + "/base";
-    String changeLocation = arcticRootLocation + "/change";
-    // load base table for get arctic table properties
+    String baseLocation = mixedTableRootLocation + "/base";
+    String changeLocation = mixedTableRootLocation + "/change";
+    // load base table for get table properties
     Table baseIcebergTable = getTables().loadHadoopTableByLocation(baseLocation);
     if (baseIcebergTable == null) {
       throw new NoSuchTableException("load table failed %s, base table not found.", identifier);
@@ -213,7 +213,7 @@ protected TableMeta getArcticTableMeta(TableIdentifier identifier) {
     tableMeta.setTableIdentifier(identifier.buildTableIdentifier());

     Map<String, String> locations = new HashMap<>();
-    putNotNullProperties(locations, MetaTableProperties.LOCATION_KEY_TABLE, arcticRootLocation);
+    putNotNullProperties(locations, MetaTableProperties.LOCATION_KEY_TABLE, mixedTableRootLocation);
     putNotNullProperties(locations, MetaTableProperties.LOCATION_KEY_CHANGE, changeLocation);
     putNotNullProperties(locations, MetaTableProperties.LOCATION_KEY_BASE, baseLocation);
     // set table location
@@ -267,7 +267,7 @@ public TableBuilder newTableBuilder(TableIdentifier identifier, Schema schema) {

   @Override
   public void renameTable(TableIdentifier from, String newTableName) {
-    throw new UnsupportedOperationException("unsupported rename arctic table for now.");
+    throw new UnsupportedOperationException("unsupported rename mixed-hive table for now.");
   }

   public HMSClientPool getHMSClient() {
@@ -279,11 +279,11 @@ public HMSClientPool getHMSClient() {
    *
    * <ul>
    *   <li>1、call getTableObjectsByName to get all Table objects of database
-   *   <li>2、filter hive tables whose properties don't have arctic table flag
+   *   <li>2、filter hive tables whose properties don't have mixed-hive table flag
    * </ul>
    *
    * we don't do cache here because we create/drop table through engine (like spark) connector, they
-   * have another ArcticHiveCatalog instance。 we can't find a easy way to update cache.
+   * have another MixedHiveCatalog instance。 we can't find a easy way to update cache.
    *
    * @param database
    * @return
@@ -299,7 +299,7 @@ public List<TableIdentifier> listTables(String database) {
       List<org.apache.hadoop.hive.metastore.api.Table> hiveTables =
           client.getTableObjectsByName(database, tableNames);
       LOG.info("call getTableObjectsByName cost {} ms", System.currentTimeMillis() - start);
-      // filter hive tables whose properties don't have arctic table flag
+      // filter hive tables whose properties don't have mixed-hive table flag
       if (hiveTables != null && !hiveTables.isEmpty()) {
         List<TableIdentifier> loadResult =
             hiveTables.stream()
@@ -344,7 +344,7 @@ private void validate(TableIdentifier identifier) {
   @Override
   public MixedTable loadTable(TableIdentifier identifier) {
     validate(identifier);
-    TableMeta meta = getArcticTableMeta(identifier);
+    TableMeta meta = getMixedTableMeta(identifier);
     if (meta.getLocations() == null) {
       throw new IllegalStateException("load table failed, lack locations info");
     }
@@ -356,7 +356,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
     validate(identifier);
     TableMeta meta;
     try {
-      meta = getArcticTableMeta(identifier);
+      meta = getMixedTableMeta(identifier);
     } catch (NoSuchTableException e) {
       return false;
     }
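The root-location rule in getMixedTableMeta is easiest to see with concrete values; a sketch with hypothetical paths, not from the commit:

    // A mixed-hive table keeps its Hive data under <root>/hive, so the
    // suffix is stripped to recover the root location:
    String hiveLocation = "/warehouse/db/tbl/hive"; // hypothetical path
    String root =
        hiveLocation.endsWith("/hive")
            ? hiveLocation.substring(0, hiveLocation.length() - 5) // "/warehouse/db/tbl"
            : hiveLocation; // pure-hive table: the location already is the root
    String baseLocation = root + "/base";     // "/warehouse/db/tbl/base"
    String changeLocation = root + "/change"; // "/warehouse/db/tbl/change"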
@@ -442,7 +442,7 @@ public void dropTableByMeta(TableMeta tableMeta, boolean purge) {
       return;
     }
     // Drop hive table operation will only delete hive table metadata
-    // Delete data files operation will use BasicArcticCatalog
+    // Delete data files operation will use MixedHiveCatalog
     if (purge) {
       try {
         hiveClientPool.run(
@@ -458,7 +458,8 @@ public void dropTableByMeta(TableMeta tableMeta, boolean purge) {
         throw new RuntimeException("Failed to drop table:" + tableMeta.getTableIdentifier(), e);
       }
     } else {
-      // If purge is not true, we will not drop the hive table and need to remove the arctic table
+      // If purge is not true, we will not drop the hive table and need to remove the mixed-hive
+      // table
       // flag
       try {
         hiveClientPool.run(
@@ -34,7 +34,7 @@
 import java.util.Set;

 /**
- * Abstract implementation of ArcticDeleteFilter to adapt hive when open equality delete files.
+ * Abstract implementation of MixedDeleteFilter to adapt hive when open equality delete files.
  *
  * @param <T> to indicate the record data type.
  */
@@ -144,7 +144,7 @@ public void commit() {
     commitTimestamp = (int) (System.currentTimeMillis() / 1000);
     applyDeleteExpr();
     if (syncDataToHive) {
-      HiveMetaSynchronizer.syncArcticDataToHive(table);
+      HiveMetaSynchronizer.syncMixedTableDataToHive(table);
     }
     List<DataFile> committedDataFiles =
         HiveCommitUtil.commitConsistentWriteFiles(this.addFiles, table.io(), table.spec());
@@ -65,7 +65,7 @@ public MixFormatOptimizingDataReader(

   @Override
   public CloseableIterable<Record> readData() {
-    AdaptHiveGenericKeyedDataReader reader = arcticDataReader(table.schema());
+    AdaptHiveGenericKeyedDataReader reader = mixedTableDataReader(table.schema());

     // Change returned value by readData from Iterator to Iterable in future
     CloseableIterator<Record> closeableIterator =
@@ -80,15 +80,15 @@ public CloseableIterable<Record> readDeletedData() {
             MetadataColumns.FILE_PATH,
             MetadataColumns.ROW_POSITION,
             org.apache.amoro.table.MetadataColumns.TREE_NODE_FIELD);
-    AdaptHiveGenericKeyedDataReader reader = arcticDataReader(schema);
+    AdaptHiveGenericKeyedDataReader reader = mixedTableDataReader(schema);
     return wrapIterator2Iterable(
         reader.readDeletedData(nodeFileScanTask(input.rePosDeletedDataFilesForMixed())));
   }

   @Override
   public void close() {}

-  private AdaptHiveGenericKeyedDataReader arcticDataReader(Schema requiredSchema) {
+  private AdaptHiveGenericKeyedDataReader mixedTableDataReader(Schema requiredSchema) {

     PrimaryKeySpec primaryKeySpec = PrimaryKeySpec.noPrimaryKey();
     if (table.isKeyedTable()) {
@@ -55,11 +55,11 @@ public KeyedHiveTable(
       ChangeTable changeTable) {
     super(tableLocation, primaryKeySpec, baseTable, changeTable);
     this.hiveClient = hiveClient;
-    if (enableSyncHiveSchemaToArctic()) {
-      syncHiveSchemaToArctic();
+    if (enableSyncHiveSchemaToMixedTable()) {
+      syncHiveSchemaToMixedTable();
     }
-    if (enableSyncHiveDataToArctic()) {
-      syncHiveDataToArctic(false);
+    if (enableSyncHiveDataToMixedTable()) {
+      syncHiveDataToMixedTable(false);
     }
   }

@@ -76,11 +76,11 @@ public TableFormat format() {
   @Override
   public void refresh() {
     super.refresh();
-    if (enableSyncHiveSchemaToArctic()) {
-      syncHiveSchemaToArctic();
+    if (enableSyncHiveSchemaToMixedTable()) {
+      syncHiveSchemaToMixedTable();
     }
-    if (enableSyncHiveDataToArctic()) {
-      syncHiveDataToArctic(false);
+    if (enableSyncHiveDataToMixedTable()) {
+      syncHiveDataToMixedTable(false);
     }
   }

@@ -90,29 +90,29 @@ public String hiveLocation() {
   }

   @Override
-  public boolean enableSyncHiveSchemaToArctic() {
+  public boolean enableSyncHiveSchemaToMixedTable() {
     return PropertyUtil.propertyAsBoolean(
         properties(),
         HiveTableProperties.AUTO_SYNC_HIVE_SCHEMA_CHANGE,
         HiveTableProperties.AUTO_SYNC_HIVE_SCHEMA_CHANGE_DEFAULT);
   }

   @Override
-  public void syncHiveSchemaToArctic() {
-    HiveMetaSynchronizer.syncHiveSchemaToArctic(this, hiveClient);
+  public void syncHiveSchemaToMixedTable() {
+    HiveMetaSynchronizer.syncHiveSchemaToMixedTable(this, hiveClient);
   }

   @Override
-  public boolean enableSyncHiveDataToArctic() {
+  public boolean enableSyncHiveDataToMixedTable() {
     return PropertyUtil.propertyAsBoolean(
         properties(),
         HiveTableProperties.AUTO_SYNC_HIVE_DATA_WRITE,
         HiveTableProperties.AUTO_SYNC_HIVE_DATA_WRITE_DEFAULT);
   }

   @Override
-  public void syncHiveDataToArctic(boolean force) {
-    HiveMetaSynchronizer.syncHiveDataToArctic(this, hiveClient, force);
+  public void syncHiveDataToMixedTable(boolean force) {
+    HiveMetaSynchronizer.syncHiveDataToMixedTable(this, hiveClient, force);
   }

   @Override
@@ -151,15 +151,15 @@ public static class HiveBaseInternalTable extends UnkeyedHiveTable implements Ba
     public HiveBaseInternalTable(
         TableIdentifier tableIdentifier,
         Table icebergTable,
-        AuthenticatedHadoopFileIO arcticFileIO,
+        AuthenticatedHadoopFileIO fileIO,
         String tableLocation,
         HMSClientPool hiveClient,
         Map<String, String> catalogProperties,
         boolean syncHiveChange) {
       super(
           tableIdentifier,
           icebergTable,
-          arcticFileIO,
+          fileIO,
           tableLocation,
           hiveClient,
           catalogProperties,
(Diff truncated: the remaining changed files of the 42 are not shown here.)
