Smart optimize #5621

Draft · wants to merge 2 commits into base: master
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -493,6 +493,7 @@ public enum ErrorMsg {
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
COMPACTION_THREAD_INITIALIZATION(10442, "Compaction thread failed during initialization", false),
ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10443, "Filter expression can contain only partition columns."),
UNSUPPORTED_COMPACTION_REQUEST_WITH_FILE_SIZE_THRESHOLD(10444, "File size threshold is supported only with major and minor compaction for Iceberg tables"),

//========================== 20000 range starts here ========================//

@@ -50,6 +50,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
@@ -506,7 +507,13 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
.map(x -> x.getJobConf().get(IcebergCompactionService.PARTITION_PATH))
.orElse(null);

commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath);
long fileSizeThreshold = jobContexts.stream()
.findAny()
.map(x -> x.getJobConf().get(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD))
.map(Long::parseLong)
.orElse(-1L);

commitCompaction(table, snapshotId, startTime, filesForCommit, partitionPath, fileSizeThreshold);
} else {
commitOverwrite(table, branchName, snapshotId, startTime, filesForCommit);
}
@@ -598,9 +605,10 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
* @param partitionPath The path of the compacted partition
*/
private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
String partitionPath) {
List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath);
List<DeleteFile> existingDeleteFiles = IcebergCompactionUtil.getDeleteFiles(table, partitionPath);
String partitionPath, long fileSizeThreshold) {
List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath, fileSizeThreshold);
List<DeleteFile> existingDeleteFiles = fileSizeThreshold == -1 ?
IcebergCompactionUtil.getDeleteFiles(table, partitionPath) : Collections.emptyList();

RewriteFiles rewriteFiles = table.newRewrite();
existingDataFiles.forEach(rewriteFiles::deleteFile);
@@ -18,6 +18,7 @@

package org.apache.iceberg.mr.hive.compaction;

import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
@@ -40,8 +41,8 @@ public IcebergCompactionService() {

public Boolean compact(Table table, CompactionInfo ci) throws Exception {

if (!ci.isMajorCompaction()) {
ci.errorMessage = "Presently Iceberg tables support only Major compaction";
if (!ci.isMajorCompaction() && !ci.isMinorCompaction() && !ci.isSmartOptimize()) {
ci.errorMessage = String.format("Iceberg tables do not support %s compaction type", ci.type.name());
LOG.error(ci.errorMessage + " Compaction info: {}", ci);
try {
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
@@ -53,13 +54,18 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
CompactorUtil.checkInterrupt(CLASS_NAME);

org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, table);
if (!IcebergCompactionEvaluator.isEligibleForCompaction(icebergTable, ci.partName, ci.type, conf)) {
IcebergCompactionEvaluator compactionEvaluator = new IcebergCompactionEvaluator(icebergTable, ci, conf);
if (!compactionEvaluator.isEligibleForCompaction()) {
LOG.info("Table={}{} doesn't meet requirements for compaction", table.getTableName(),
ci.partName == null ? "" : ", partition=" + ci.partName);
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
return false;
}

if (ci.type == CompactionType.SMART) {
ci.type = compactionEvaluator.determineCompactionType();
}

if (ci.runAs == null) {
ci.runAs = TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf);
}
@@ -19,6 +19,10 @@
package org.apache.iceberg.mr.hive.compaction;

import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -29,6 +33,7 @@
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class IcebergCompactionUtil {
@@ -55,6 +60,12 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
}

public static boolean shouldIncludeForCompaction(Table table, String partitionPath, ContentFile<?> file,
long fileSizeThreshold) {
return shouldIncludeForCompaction(table, partitionPath, file) &&
(fileSizeThreshold == -1 || file.fileSizeInBytes() < fileSizeThreshold);
}

/**
* Returns table's list of data files as following:
* 1. If the table is unpartitioned, returns all data files.
@@ -63,13 +74,13 @@ public static boolean shouldIncludeForCompaction(Table table, String partitionPa
* @param table the iceberg table
* @param partitionPath partition path
*/
public static List<DataFile> getDataFiles(Table table, String partitionPath) {
public static List<DataFile> getDataFiles(Table table, String partitionPath, long fileSizeThreshold) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return shouldIncludeForCompaction(table, partitionPath, file);
return shouldIncludeForCompaction(table, partitionPath, file, fileSizeThreshold);
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
}
@@ -94,4 +105,48 @@ public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath)
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
}

/**
* Returns the file size threshold as follows:
* In case of minor compaction:
* 1. When COMPACTION_FILE_SIZE_THRESHOLD is defined, returns it.
* 2. Otherwise, calculates the file size threshold as:
* HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE (from HiveConf) * TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT
* This makes the compaction evaluator consider data files smaller than the threshold as undersized segment files
* eligible for compaction (major compaction in Amoro terms, which corresponds to minor compaction in Hive).
* In case of major compaction returns -1.
* @param ci the compaction info
* @param conf Hive configuration
*/
public static long getFileSizeThreshold(CompactionInfo ci, HiveConf conf) {
switch (ci.type) {
case MINOR:
return Optional.ofNullable(ci.getProperty(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD))
.map(HiveConf::toSizeBytes).orElse((long) (HiveConf.toSizeBytes(HiveConf.getVar(conf,
HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE)) *
TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT));
case MAJOR:
return -1;
default:
throw new RuntimeException(String.format("Unsupported compaction type %s", ci.type));
}
}

/**
* Returns the target file size as follows:
* 1. When COMPACTION_FILE_SIZE_THRESHOLD is defined, calculates the target size as:
* COMPACTION_FILE_SIZE_THRESHOLD / TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT
* This makes the compaction evaluator consider data files smaller than COMPACTION_FILE_SIZE_THRESHOLD as
* fragments eligible for major compaction (in the Amoro sense, which corresponds to minor compaction in Hive).
* 2. Otherwise, returns the default HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE from HiveConf.
* @param ci the compaction info
* @param conf Hive configuration
*/
public static long getTargetFileSize(CompactionInfo ci, HiveConf conf) {
return Optional.ofNullable(ci.getProperty(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD))
.map(HiveConf::toSizeBytes).map(x -> (long) (x / TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT))
.orElse(HiveConf.toSizeBytes(HiveConf.getVar(conf,
HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE)));
}
}
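For a concrete sense of the two helpers above, the following standalone sketch mirrors their arithmetic with assumed values; the 128 MB target size and the 0.75 min-target-size ratio are illustrative stand-ins, not the actual values read from HiveConf or Amoro's TableProperties.

// Illustrative only: mirrors the getFileSizeThreshold/getTargetFileSize arithmetic with assumed constants.
public class CompactionSizeMathSketch {
  // Stand-in for HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE (assumed 128 MB).
  static final long ASSUMED_TARGET_FILE_SIZE = 128L * 1024 * 1024;
  // Stand-in for TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT (assumed 0.75).
  static final double ASSUMED_MIN_TARGET_SIZE_RATIO = 0.75;

  public static void main(String[] args) {
    // Minor compaction without an explicit threshold: threshold = targetSize * ratio.
    long derivedThreshold = (long) (ASSUMED_TARGET_FILE_SIZE * ASSUMED_MIN_TARGET_SIZE_RATIO);
    System.out.println("derived file size threshold = " + derivedThreshold + " bytes"); // 100663296 (96 MB)

    // Explicit threshold supplied on the request: target size = threshold / ratio.
    long explicitThreshold = 64L * 1024 * 1024;
    long derivedTargetSize = (long) (explicitThreshold / ASSUMED_MIN_TARGET_SIZE_RATIO);
    System.out.println("derived target file size = " + derivedTargetSize + " bytes"); // 89478485 (~85 MB)
  }
}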
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.DriverUtils;
Expand All @@ -43,9 +44,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergMajorQueryCompactor extends QueryCompactor {
public class IcebergQueryCompactor extends QueryCompactor {

private static final Logger LOG = LoggerFactory.getLogger(IcebergMajorQueryCompactor.class.getName());
private static final Logger LOG = LoggerFactory.getLogger(IcebergQueryCompactor.class.getName());

@Override
public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException {
@@ -62,20 +63,32 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
String compactionQuery;
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
String fileSizeCond = null;

if (ci.type == CompactionType.MINOR) {
long fileSizeInBytesThreshold = IcebergCompactionUtil.getFileSizeThreshold(ci, conf);
fileSizeCond = String.format("%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
VirtualColumn.FILE_PATH.getName(), compactTableName, fileSizeInBytesThreshold);
conf.setLong(CompactorContext.COMPACTION_FILE_SIZE_THRESHOLD, fileSizeInBytesThreshold);
// An IOW query containing a join with the Iceberg .files metadata table fails with an exception because the Iceberg
// AVRO format doesn't support vectorization, hence vectorization is disabled in this case.
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}

if (partSpec == null) {
if (!icebergTable.spec().isPartitioned()) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s", compactTableName, orderBy);
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
fileSizeCond == null ? "" : "where " + fileSizeCond, orderBy);
} else if (icebergTable.specs().size() > 1) {
// Compacting partitions of old partition specs on a partitioned table with partition evolution
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
// A single filter on a virtual column causes errors during compilation,
// added another filter on file_path as a workaround.
compactionQuery = String.format("insert overwrite table %1$s select * from %1$s " +
"where %2$s != %3$d and %4$s is not null %5$s",
"where %2$s != %3$d and %4$s is not null %5$s %6$s",
compactTableName, VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
VirtualColumn.FILE_PATH.getName(), orderBy);
VirtualColumn.FILE_PATH.getName(), fileSizeCond == null ? "" : "and " + fileSizeCond, orderBy);
} else {
// Partitioned table without partition evolution with partition spec as null in the compaction request - this
// code branch is not supposed to be reachable
@@ -90,8 +103,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);

compactionQuery = String.format("insert overwrite table %1$s select * from %1$s where %2$s=%3$d " +
"and %4$s is not null %5$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
VirtualColumn.FILE_PATH.getName(), orderBy);
"and %4$s is not null %5$s %6$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
VirtualColumn.FILE_PATH.getName(), fileSizeCond == null ? "" : "and " + fileSizeCond, orderBy);
}

SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
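To illustrate the query shape the MINOR branch above produces, here is a hedged sketch for an assumed unpartitioned table; the table name db.tbl, the 96 MB threshold, the empty ORDER BY clause, and the literal "FILE__PATH" (standing in for VirtualColumn.FILE_PATH.getName()) are assumptions for illustration only.

// Sketch only: reproduces the unpartitioned MINOR branch with assumed inputs.
String compactTableName = "db.tbl";                  // assumed table name
long fileSizeInBytesThreshold = 96L * 1024 * 1024;   // assumed threshold (96 MB)
String filePathCol = "FILE__PATH";                   // assumed value of VirtualColumn.FILE_PATH.getName()
String fileSizeCond = String.format(
    "%1$s in (select file_path from %2$s.files where file_size_in_bytes < %3$d)",
    filePathCol, compactTableName, fileSizeInBytesThreshold);
String compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s",
    compactTableName, "where " + fileSizeCond, /* orderBy */ "");
// compactionQuery is roughly:
//   insert overwrite table db.tbl select * from db.tbl
//   where FILE__PATH in (select file_path from db.tbl.files where file_size_in_bytes < 100663296)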
@@ -23,6 +23,7 @@
import java.util.Collections;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
@@ -45,52 +46,70 @@ public class IcebergCompactionEvaluator {

private static final long LAST_OPTIMIZE_TIME = 0;
private static final int TRIGGER_INTERVAL = 0;

private IcebergCompactionEvaluator() {

private CommonPartitionEvaluator partitionEvaluator;
private final Table table;
private final CompactionInfo ci;
private final HiveConf conf;

public IcebergCompactionEvaluator(Table table, CompactionInfo ci, HiveConf conf) {
this.table = table;
this.ci = ci;
this.conf = conf;

if (table.currentSnapshot() != null) {
partitionEvaluator = createCommonPartitionEvaluator();
} else {
LOG.info("Table {}{} doesn't require compaction because it is empty", table,
ci.partName == null ? "" : " partition " + ci.partName);
}
}

private static final Logger LOG = LoggerFactory.getLogger(IcebergCompactionEvaluator.class);

public static boolean isEligibleForCompaction(Table icebergTable, String partitionPath,
CompactionType compactionType, HiveConf conf) {

if (icebergTable.currentSnapshot() == null) {
LOG.info("Table {}{} doesn't require compaction because it is empty", icebergTable,
partitionPath == null ? "" : " partition " + partitionPath);
return false;
}

CommonPartitionEvaluator partitionEvaluator = createCommonPartitionEvaluator(icebergTable, partitionPath, conf);
public boolean isEligibleForCompaction() {

if (partitionEvaluator == null) {
if (table.currentSnapshot() == null || partitionEvaluator == null) {
return false;
}

switch (compactionType) {
switch (ci.type) {
case MINOR:
return partitionEvaluator.isMinorNecessary();
return partitionEvaluator.isMinorNecessary() || partitionEvaluator.isMajorNecessary();
case MAJOR:
return partitionEvaluator.isFullNecessary() || partitionEvaluator.isMajorNecessary();
return partitionEvaluator.isFullNecessary();
case SMART:
return partitionEvaluator.isMinorNecessary() || partitionEvaluator.isMajorNecessary() ||
partitionEvaluator.isFullNecessary();
default:
return false;
}
}

private static TableRuntime createTableRuntime(Table icebergTable, HiveConf conf) {
long targetFileSizeBytes = HiveConf.getSizeVar(conf,
HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE);
public CompactionType determineCompactionType() {
if (ci.type == CompactionType.SMART) {
if (partitionEvaluator.isFullNecessary()) {
return CompactionType.MAJOR;
} else if (partitionEvaluator.isMinorNecessary() || partitionEvaluator.isMajorNecessary()) {
return CompactionType.MINOR;
} else {
return null;
}
} else {
return ci.type;
}
}

private TableRuntime createTableRuntime() {
OptimizingConfig optimizingConfig = OptimizingConfig.parse(Collections.emptyMap());
optimizingConfig.setTargetSize(targetFileSizeBytes);
optimizingConfig.setTargetSize(IcebergCompactionUtil.getTargetFileSize(ci, conf));
optimizingConfig.setFullTriggerInterval(TRIGGER_INTERVAL);
optimizingConfig.setMinorLeastInterval(TRIGGER_INTERVAL);

TableConfiguration tableConfig = new TableConfiguration();
tableConfig.setOptimizingConfig(optimizingConfig);

TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta();
tableRuntimeMeta.setTableName(icebergTable.name());
tableRuntimeMeta.setTableName(table.name());
tableRuntimeMeta.setFormat(TableFormat.ICEBERG);
tableRuntimeMeta.setLastFullOptimizingTime(LAST_OPTIMIZE_TIME);
tableRuntimeMeta.setLastMinorOptimizingTime(LAST_OPTIMIZE_TIME);
@@ -99,9 +118,9 @@ private static TableRuntime createTableRuntime(Table icebergTable, HiveConf conf
return new HiveTableRuntime(tableRuntimeMeta);
}

private static CommonPartitionEvaluator createCommonPartitionEvaluator(Table table, String partitionPath,
HiveConf conf) {
TableRuntime tableRuntime = createTableRuntime(table, conf);
private CommonPartitionEvaluator createCommonPartitionEvaluator() {
TableRuntime tableRuntime = createTableRuntime();
long fileSizeThreshold = IcebergCompactionUtil.getFileSizeThreshold(ci, conf);

TableFileScanHelper tableFileScanHelper = new IcebergTableFileScanHelper(table,
table.currentSnapshot().snapshotId());
@@ -110,7 +129,7 @@ private static CommonPartitionEvaluator createCommonPartitionEvaluator(Table tab
tableFileScanHelper.scan()) {
for (TableFileScanHelper.FileScanResult fileScanResult : results) {
DataFile file = fileScanResult.file();
if (IcebergCompactionUtil.shouldIncludeForCompaction(table, partitionPath, file)) {
if (IcebergCompactionUtil.shouldIncludeForCompaction(table, ci.partName, file, fileSizeThreshold)) {
PartitionSpec partitionSpec = table.specs().get(file.specId());
Pair<Integer, StructLike> partition = Pair.of(partitionSpec.specId(), fileScanResult.file().partition());

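Finally, a small hedged sketch of how a SMART request resolves to a concrete compaction type, pieced together from isEligibleForCompaction() and determineCompactionType() above; the Amoro-to-Hive mapping in the comments is an editorial reading of those methods, and the helper below is hypothetical, not part of the patch (it assumes CompactionType is imported as in the surrounding class).

// Sketch only: the three boolean signals come from Amoro's CommonPartitionEvaluator.
static CompactionType resolveSmart(boolean fullNecessary, boolean majorNecessary, boolean minorNecessary) {
  if (fullNecessary) {
    return CompactionType.MAJOR;    // Amoro full optimizing  -> Hive MAJOR
  }
  if (minorNecessary || majorNecessary) {
    return CompactionType.MINOR;    // Amoro minor/major optimizing -> Hive MINOR
  }
  return null;                      // nothing to compact; the request would have been refused earlier
}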