Skip to content

Commit

Permalink
Merge pull request #10 from shendanfengg/fix-drop-table
Browse files Browse the repository at this point in the history
Fix drop table
  • Loading branch information
shendanfengg authored Nov 3, 2023
2 parents 0472fe2 + 29284a8 commit 5f9abbd
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ public class AmsClientPools {

private static final int CLIENT_POOL_MIN = 0;
private static final int CLIENT_POOL_MAX = 5;
private static final int CLIENT_POOL_TIMEOUT = 5000;

private static final LoadingCache<String, ThriftClientPool<ArcticTableMetastore.Client>> CLIENT_POOLS
= Caffeine.newBuilder()
.build(AmsClientPools::buildClientPool);
Expand All @@ -34,7 +32,6 @@ private static ThriftClientPool<ArcticTableMetastore.Client> buildClientPool(Str
poolConfig.setFailover(true);
poolConfig.setMinIdle(CLIENT_POOL_MIN);
poolConfig.setMaxIdle(CLIENT_POOL_MAX);
poolConfig.setTimeout(CLIENT_POOL_TIMEOUT);
return new ThriftClientPool<>(
url,
s -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,8 @@ public static void getTableList(Context ctx) {
}
for (TableIdentifier tableIdentifier : tableIdentifiers) {
TableMeta tableMeta = new TableMeta(tableIdentifier.getTableName(), TableMeta.TableType.ARCTIC.toString());
if (tempTables.contains(tableMeta)) {
tables.add(tableMeta);
tempTables.remove(tableMeta);
}
tables.add(tableMeta);
tempTables.remove(tableMeta);
}
tables.addAll(tempTables);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public Object upgradeHiveTable(ArcticHiveCatalog arcticHiveCatalog, TableIdentif
UpgradeHiveTableUtil.upgradeHiveTable(arcticHiveCatalog, tableIdentifier,
pkList, upgradeHiveMeta.getProperties());
runningInfoCache.get(tableIdentifier).setStatus(UpgradeStatus.SUCCESS.toString());
Thread.sleep(10 * 6 * 1000);
runningInfoCache.remove(tableIdentifier);
} catch (Throwable t) {
LOG.error("Failed to upgrade hive table to arctic ", t);
runningInfoCache.get(tableIdentifier).setErrorMessage(AmsUtils.getStackTrace(t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,20 @@ public void dropDatabase(String databaseName) {

@Override
protected void doDropTable(TableMeta meta, boolean purge) {
// drop hive table operation will only delete hive table metadata
// delete data files operation will use BasicArcticCatalog
try {
hiveClientPool.run(client -> {
client.dropTable(meta.getTableIdentifier().getDatabase(),
meta.getTableIdentifier().getTableName(),
false /* deleteData */,
false /* ignoreUnknownTab */);
return null;
});
} catch (TException | InterruptedException e) {
throw new RuntimeException("Failed to drop table:" + meta.getTableIdentifier(), e);
if (HiveTableUtil.checkExist(hiveClientPool, TableIdentifier.of(meta.getTableIdentifier()))) {
// drop hive table operation will only delete hive table metadata
// delete data files operation will use BasicArcticCatalog
try {
hiveClientPool.run(client -> {
client.dropTable(meta.getTableIdentifier().getDatabase(),
meta.getTableIdentifier().getTableName(),
false /* deleteData */,
false /* ignoreUnknownTab */);
return null;
});
} catch (TException | InterruptedException e) {
throw new RuntimeException("Failed to drop table:" + meta.getTableIdentifier(), e);
}
}
super.doDropTable(meta, purge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,14 @@ public class HiveMetaSynchronizer {
* @param hiveClient hive client
*/
public static void syncHiveSchemaToArctic(ArcticTable table, HMSClientPool hiveClient) {
Table hiveTable;
try {
hiveTable = hiveClient.run(client -> client.getTable(table.id().getDatabase(), table.id().getTableName()));
} catch (Throwable t) {
LOG.error("Table {} not found in hive try to skip sync hive schema", table.id());
return;
}
try {
Table hiveTable = hiveClient.run(client -> client.getTable(table.id().getDatabase(), table.id().getTableName()));
Schema hiveSchema = HiveSchemaUtil.convertHiveSchemaToIcebergSchema(hiveTable, table.isKeyedTable() ?
table.asKeyedTable().primaryKeySpec().fieldNames() : new ArrayList<>());
UpdateSchema updateSchema = table.updateSchema();
Expand All @@ -91,8 +97,9 @@ public static void syncHiveSchemaToArctic(ArcticTable table, HMSClientPool hiveC
if (update) {
updateSchema.commit();
}
} catch (TException | InterruptedException e) {
throw new RuntimeException("Failed to get hive table:" + table.id(), e);
} catch (Throwable t) {
LOG.error("Fail to sync hive schema to arctic table {}", table.id(), t);
throw t;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static StorageDescriptor storageDescriptor(
* @param tableIdentifier A table identifier
* @return If table is existed in hive
*/
public boolean checkExist(HMSClientPool hiveClient, TableIdentifier tableIdentifier) {
public static boolean checkExist(HMSClientPool hiveClient, TableIdentifier tableIdentifier) {
String database = tableIdentifier.getDatabase();
String name = tableIdentifier.getTableName();
try {
Expand Down

0 comments on commit 5f9abbd

Please sign in to comment.