[opt](hive) support orc generated from hive 1.x for all file scan node (
morningman authored and HappenLee committed Jan 12, 2024
1 parent cc0e365 commit 6a524ae
Showing 4 changed files with 59 additions and 34 deletions.
24 changes: 16 additions & 8 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -146,7 +146,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz),
- _is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
_is_dict_cols_converted(false) {
@@ -165,7 +164,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r
_scan_params(params),
_scan_range(range),
_ctz(ctz),
- _is_hive(params.__isset.slot_name_to_schema_pos),
_file_system(nullptr),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
@@ -307,11 +305,15 @@ Status OrcReader::_init_read_columns() {
auto& root_type = _reader->getType();
std::vector<std::string> orc_cols;
std::vector<std::string> orc_cols_lower_case;
- _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map);
+ bool is_hive1_orc = false;
+ _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc);

+ // In old versions, slot_name_to_schema_pos may not be set in _scan_params
+ // TODO, should be removed in 2.2 or later
+ _is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos;
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
- if (_is_hive) {
+ if (_is_hive1_orc) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
int pos = iter->second;
@@ -346,7 +348,7 @@ Status OrcReader::_init_read_columns() {
_read_cols_lower_case.emplace_back(col_name);
// For hive engine, store the orc column name to schema column name map.
// This is for Hive 1.x orc file with internal column name _col0, _col1...
- if (_is_hive) {
+ if (_is_hive1_orc) {
_removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name;
}
_col_name_to_file_col_name[col_name] = read_col;
@@ -357,20 +359,26 @@

void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
std::vector<std::string>& orc_cols_lower_case,
- std::unordered_map<std::string, const orc::Type*>& type_map) {
+ std::unordered_map<std::string, const orc::Type*>& type_map,
+ bool* is_hive1_orc) {
+ bool hive1_orc = true;
for (int i = 0; i < type.getSubtypeCount(); ++i) {
orc_cols.emplace_back(type.getFieldName(i));
auto filed_name_lower_case = _get_field_name_lower_case(&type, i);
+ if (hive1_orc) {
+     hive1_orc = _is_hive1_col_name(filed_name_lower_case);
+ }
auto filed_name_lower_case_copy = filed_name_lower_case;
orc_cols_lower_case.emplace_back(std::move(filed_name_lower_case));
type_map.emplace(std::move(filed_name_lower_case_copy), type.getSubtype(i));
if (_is_acid) {
const orc::Type* sub_type = type.getSubtype(i);
if (sub_type->getKind() == orc::TypeKind::STRUCT) {
- _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map);
+ _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map, is_hive1_orc);
}
}
}
+ *is_hive1_orc = hive1_orc;
}

bool OrcReader::_check_acid_schema(const orc::Type& type) {
Expand Down Expand Up @@ -845,7 +853,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
std::string name;
// For hive engine, translate the column name in orc file to schema column name.
// This is for Hive 1.x which use internal column name _col0, _col1...
- if (_is_hive) {
+ if (_is_hive1_orc) {
name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)];
} else {
name = _get_field_name_lower_case(&type, i);
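
Note on the BE-side change above: a file is treated as a Hive 1.x ORC file only when every top-level field name matches the `_col<N>` pattern and the FE actually populated `slot_name_to_schema_pos` (older FEs do not, hence the TODO). Below is a minimal standalone C++ sketch of that decision; it is illustrative only, and `is_hive1_col_name`, `fe_sent_schema_pos_map`, and the sample column lists are hypothetical stand-ins, not Doris code.

```cpp
// Hypothetical sketch of how the patch derives _is_hive1_orc. The real code
// walks orc::Type field names; plain strings stand in for them here.
#include <cctype>
#include <iostream>
#include <string>
#include <vector>

// Mirrors OrcReader::_is_hive1_col_name: true only for "_col<digits>".
static bool is_hive1_col_name(const std::string& name) {
    if (name.size() <= 4 || name.substr(0, 4) != "_col") {
        return false;
    }
    for (size_t i = 4; i < name.size(); ++i) {
        if (!std::isdigit(static_cast<unsigned char>(name[i]))) {
            return false;
        }
    }
    return true;
}

int main() {
    // File written by Hive 1.x: every column uses the internal _colN name.
    std::vector<std::string> hive1_cols = {"_col0", "_col1", "_col2"};
    // File written by a newer engine: real column names survive.
    std::vector<std::string> modern_cols = {"user_id", "event_time"};

    // Only new FEs set slot_name_to_schema_pos; an old FE leaves it unset,
    // so the BE must not trust the name heuristic alone.
    bool fe_sent_schema_pos_map = true;

    for (const auto* cols : {&hive1_cols, &modern_cols}) {
        bool all_hive1 = true;
        for (const auto& c : *cols) {
            all_hive1 = all_hive1 && is_hive1_col_name(c);
        }
        bool is_hive1_orc = all_hive1 && fe_sent_schema_pos_map;
        std::cout << (*cols)[0] << "... -> is_hive1_orc="
                  << std::boolalpha << is_hive1_orc << "\n";
    }
    return 0;
}
```
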
24 changes: 21 additions & 3 deletions be/src/vec/exec/format/orc/vorc_reader.h
@@ -247,7 +248,8 @@ class OrcReader : public GenericReader {
Status _init_read_columns();
void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
std::vector<std::string>& orc_cols_lower_case,
- std::unordered_map<std::string, const orc::Type*>& type_map);
+ std::unordered_map<std::string, const orc::Type*>& type_map,
+ bool* is_hive1_orc);
static bool _check_acid_schema(const orc::Type& type);
static const orc::Type& _remove_acid(const orc::Type& type);
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -483,6 +484,22 @@ class OrcReader : public GenericReader {
int64_t get_remaining_rows() { return _remaining_rows; }
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }

+ // check if the given name is like _col0, _col1, ...
+ static bool inline _is_hive1_col_name(const std::string& name) {
+     if (name.size() <= 4) {
+         return false;
+     }
+     if (name.substr(0, 4) != "_col") {
+         return false;
+     }
+     for (size_t i = 4; i < name.size(); ++i) {
+         if (!isdigit(name[i])) {
+             return false;
+         }
+     }
+     return true;
+ }

private:
// This is only for count(*) short circuit read.
// save the total number of rows in range
@@ -509,8 +526,9 @@
// This is used for Hive 1.x which use internal column name in Orc file.
// _col0, _col1...
std::unordered_map<std::string, std::string> _removed_acid_file_col_name_to_schema_col;
- // Flag for hive engine. True if the external table engine is Hive.
- bool _is_hive = false;
+ // Flag for hive engine. True if the external table engine is Hive 1.x with orc col names
+ // like _col0, _col1, ...
+ bool _is_hive1_orc = false;
std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
std::unordered_map<std::string, const orc::Type*> _type_map;
std::vector<const orc::Type*> _col_orc_type;
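
With `_is_hive1_orc` set, `_init_read_columns` (in the .cpp hunk above) resolves a schema column name to a physical file column through the FE-provided position map instead of by name. A small self-contained sketch of that lookup follows, assuming hypothetical schema columns `user_id`, `event_time`, and `payload`; it is not the Doris implementation.

```cpp
// Hypothetical sketch of the lookup performed when _is_hive1_orc is true:
// a schema column name maps to its schema position, and that position
// indexes the file's internal _colN columns.
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // Physical column names inside a Hive 1.x ORC file.
    std::vector<std::string> orc_cols = {"_col0", "_col1", "_col2"};
    // Map sent by the FE: schema column name -> position in the table schema.
    std::unordered_map<std::string, int> slot_name_to_schema_pos = {
            {"user_id", 0}, {"event_time", 1}, {"payload", 2}};

    // The query asks for "event_time"; the file only knows it as "_col1".
    std::string col_name = "event_time";
    auto iter = slot_name_to_schema_pos.find(col_name);
    if (iter != slot_name_to_schema_pos.end() &&
        iter->second < static_cast<int>(orc_cols.size())) {
        std::cout << col_name << " reads file column "
                  << orc_cols[iter->second] << "\n"; // prints "_col1"
    } else {
        std::cout << col_name << " missing from file, fill with NULL\n";
    }
    return 0;
}
```
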
22 changes: 22 additions & 0 deletions
@@ -275,6 +275,9 @@ public void createScanRangeLocations() throws UserException {
return;
}
TFileFormatType fileFormatType = getFileFormatType();
+ if (fileFormatType == TFileFormatType.FORMAT_ORC) {
+     genSlotToSchemaIdMapForOrc();
+ }
params.setFormatType(fileFormatType);
boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
@@ -463,6 +466,25 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
return rangeDesc;
}

+ // To support Hive 1.x orc internal column names like (_col0, _col1, _col2...),
+ // we need to save the mapping from slot name to schema position.
+ protected void genSlotToSchemaIdMapForOrc() {
+     Preconditions.checkNotNull(params);
+     List<Column> baseSchema = desc.getTable().getBaseSchema();
+     Map<String, Integer> columnNameToPosition = Maps.newHashMap();
+     for (SlotDescriptor slot : desc.getSlots()) {
+         int idx = 0;
+         for (Column col : baseSchema) {
+             if (col.getName().equals(slot.getColumn().getName())) {
+                 columnNameToPosition.put(col.getName(), idx);
+                 break;
+             }
+             idx += 1;
+         }
+     }
+     params.setSlotNameToSchemaPos(columnNameToPosition);
+ }

protected abstract TFileType getLocationType() throws UserException;

protected abstract TFileType getLocationType(String location) throws UserException;
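
The `genSlotToSchemaIdMapForOrc` method above is Java FE code; for illustration, here is the same name-to-position construction sketched in standalone C++, with plain strings standing in for Doris's `SlotDescriptor` and `Column` types and invented sample names. It is a sketch of the idea, not the shipped implementation.

```cpp
// Hypothetical sketch: for each requested slot, find the position of the
// matching column in the table's base schema, as the Java loop above does.
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // Base schema of the table, in declaration order.
    std::vector<std::string> base_schema = {"user_id", "event_time", "payload"};
    // Slots actually referenced by the query.
    std::vector<std::string> slots = {"payload", "user_id"};

    std::unordered_map<std::string, int> slot_name_to_schema_pos;
    for (const auto& slot : slots) {
        for (size_t idx = 0; idx < base_schema.size(); ++idx) {
            if (base_schema[idx] == slot) {
                slot_name_to_schema_pos.emplace(slot, static_cast<int>(idx));
                break; // first match wins, as in the Java loop
            }
        }
    }
    for (const auto& [name, pos] : slot_name_to_schema_pos) {
        // e.g. payload -> 2, user_id -> 0 (iteration order unspecified)
        std::cout << name << " -> " << pos << "\n";
    }
    return 0;
}
```
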
23 changes: 0 additions & 23 deletions
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;

import org.apache.doris.analysis.FunctionCallExpr;
- import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
@@ -39,7 +38,6 @@
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveTransaction;
- import org.apache.doris.datasource.hive.HiveVersionUtil;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -55,7 +53,6 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -117,9 +114,6 @@ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
@Override
protected void doInitialize() throws UserException {
super.doInitialize();
- if (HiveVersionUtil.isHive1(hmsTable.getHiveVersion())) {
-     genSlotToSchemaIdMap();
- }

if (hmsTable.isHiveTransactionalTable()) {
this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
@@ -396,23 +390,6 @@ protected TFileAttributes getFileAttributes() throws UserException {
return fileAttributes;
}

- // To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...)
- private void genSlotToSchemaIdMap() {
-     List<Column> baseSchema = desc.getTable().getBaseSchema();
-     Map<String, Integer> columnNameToPosition = Maps.newHashMap();
-     for (SlotDescriptor slot : desc.getSlots()) {
-         int idx = 0;
-         for (Column col : baseSchema) {
-             if (col.getName().equals(slot.getColumn().getName())) {
-                 columnNameToPosition.put(col.getName(), idx);
-                 break;
-             }
-             idx += 1;
-         }
-     }
-     params.setSlotNameToSchemaPos(columnNameToPosition);
- }

@Override
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
