Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](hive) support orc generated from hive 1.x for all file scan node #28806

Merged
merged 3 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz),
_is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
_is_dict_cols_converted(false) {
Expand All @@ -165,7 +164,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r
_scan_params(params),
_scan_range(range),
_ctz(ctz),
_is_hive(params.__isset.slot_name_to_schema_pos),
_file_system(nullptr),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat),
Expand Down Expand Up @@ -307,11 +305,15 @@ Status OrcReader::_init_read_columns() {
auto& root_type = _reader->getType();
std::vector<std::string> orc_cols;
std::vector<std::string> orc_cols_lower_case;
_init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map);
bool is_hive1_orc = false;
_init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc);

// In old versions slot_name_to_schema_pos may not be set in _scan_params.
// TODO: this fallback should be removed in 2.2 or later.
_is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos;
for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];
if (_is_hive) {
if (_is_hive1_orc) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
int pos = iter->second;
Expand Down Expand Up @@ -346,7 +348,7 @@ Status OrcReader::_init_read_columns() {
_read_cols_lower_case.emplace_back(col_name);
// For hive engine, store the orc column name to schema column name map.
// This is for Hive 1.x orc file with internal column name _col0, _col1...
if (_is_hive) {
if (_is_hive1_orc) {
_removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name;
}
_col_name_to_file_col_name[col_name] = read_col;
Expand All @@ -357,20 +359,26 @@ Status OrcReader::_init_read_columns() {

// Collect the ORC file's physical column names (original and lower-cased) and
// build a lower-cased-name -> orc::Type map for the fields of `type`.
// For ACID tables (_is_acid), nested STRUCT subtypes are flattened recursively.
//
// *is_hive1_orc is set to true when every field at this level matches the
// Hive 1.x internal naming pattern (_col0, _col1, ...), in which case the real
// schema names must later be recovered via slot_name_to_schema_pos.
// NOTE(review): on the recursive ACID path the inner call's *is_hive1_orc is
// overwritten by this level's flag on return — presumably intended, since
// hive1-style names are only expected at a single level; verify upstream.
void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
                               std::vector<std::string>& orc_cols_lower_case,
                               std::unordered_map<std::string, const orc::Type*>& type_map,
                               bool* is_hive1_orc) {
    bool hive1_orc = true;
    for (int i = 0; i < type.getSubtypeCount(); ++i) {
        orc_cols.emplace_back(type.getFieldName(i));
        auto field_name_lower_case = _get_field_name_lower_case(&type, i);
        if (hive1_orc) {
            // Once any field name breaks the _colN pattern, the file is not hive1-style.
            hive1_orc = _is_hive1_col_name(field_name_lower_case);
        }
        auto field_name_lower_case_copy = field_name_lower_case;
        orc_cols_lower_case.emplace_back(std::move(field_name_lower_case));
        type_map.emplace(std::move(field_name_lower_case_copy), type.getSubtype(i));
        if (_is_acid) {
            const orc::Type* sub_type = type.getSubtype(i);
            if (sub_type->getKind() == orc::TypeKind::STRUCT) {
                _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map, is_hive1_orc);
            }
        }
    }
    *is_hive1_orc = hive1_orc;
}

bool OrcReader::_check_acid_schema(const orc::Type& type) {
Expand Down Expand Up @@ -845,7 +853,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) {
std::string name;
// For hive engine, translate the column name in orc file to schema column name.
// This is for Hive 1.x which use internal column name _col0, _col1...
if (_is_hive) {
if (_is_hive1_orc) {
name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)];
} else {
name = _get_field_name_lower_case(&type, i);
Expand Down
24 changes: 21 additions & 3 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ class OrcReader : public GenericReader {
Status _init_read_columns();
void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
std::vector<std::string>& orc_cols_lower_case,
std::unordered_map<std::string, const orc::Type*>& type_map);
std::unordered_map<std::string, const orc::Type*>& type_map,
bool* is_hive1_orc);
static bool _check_acid_schema(const orc::Type& type);
static const orc::Type& _remove_acid(const orc::Type& type);
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
Expand Down Expand Up @@ -483,6 +484,22 @@ class OrcReader : public GenericReader {
int64_t get_remaining_rows() { return _remaining_rows; }
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }

// Check whether `name` is a Hive 1.x internal column name: the literal
// prefix "_col" followed by one or more decimal digits (_col0, _col1, ...).
static inline bool _is_hive1_col_name(const std::string& name) {
    // Must be longer than "_col" itself (at least one digit required).
    if (name.size() <= 4) {
        return false;
    }
    // compare() avoids the temporary string that substr(0, 4) would allocate.
    if (name.compare(0, 4, "_col") != 0) {
        return false;
    }
    for (size_t i = 4; i < name.size(); ++i) {
        // Cast to unsigned char: passing a negative char to isdigit is UB.
        if (!isdigit(static_cast<unsigned char>(name[i]))) {
            return false;
        }
    }
    return true;
}

private:
// This is only for count(*) short circuit read.
// save the total number of rows in range
Expand All @@ -509,8 +526,9 @@ class OrcReader : public GenericReader {
// This is used for Hive 1.x which use internal column name in Orc file.
// _col0, _col1...
std::unordered_map<std::string, std::string> _removed_acid_file_col_name_to_schema_col;
// Flag for hive engine. True if the external table engine is Hive.
bool _is_hive = false;
// Flag for the Hive engine. True if the external table engine is Hive 1.x and the
// ORC file uses internal column names like _col0, _col1, ...
bool _is_hive1_orc = false;
std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
std::unordered_map<std::string, const orc::Type*> _type_map;
std::vector<const orc::Type*> _col_orc_type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ public void createScanRangeLocations() throws UserException {
return;
}
TFileFormatType fileFormatType = getFileFormatType();
if (fileFormatType == TFileFormatType.FORMAT_ORC) {
genSlotToSchemaIdMapForOrc();
}
params.setFormatType(fileFormatType);
boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
Expand Down Expand Up @@ -455,6 +458,25 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
return rangeDesc;
}

// To support Hive 1.x ORC internal column names (_col0, _col1, _col2, ...),
// save the mapping from each slot's column name to its position in the table's
// base schema, so the BE can resolve _colN back to the real schema column.
protected void genSlotToSchemaIdMapForOrc() {
    Preconditions.checkNotNull(params);
    List<Column> baseSchema = desc.getTable().getBaseSchema();
    // Pre-compute name -> position once instead of rescanning the schema for
    // every slot (avoids O(slots * columns) work on wide tables).
    Map<String, Integer> schemaPosition = Maps.newHashMap();
    int idx = 0;
    for (Column col : baseSchema) {
        // putIfAbsent keeps the first occurrence, matching the original
        // first-match-then-break scan if a name is ever duplicated.
        schemaPosition.putIfAbsent(col.getName(), idx);
        idx += 1;
    }
    Map<String, Integer> columnNameToPosition = Maps.newHashMap();
    for (SlotDescriptor slot : desc.getSlots()) {
        Integer pos = schemaPosition.get(slot.getColumn().getName());
        if (pos != null) {
            columnNameToPosition.put(slot.getColumn().getName(), pos);
        }
    }
    params.setSlotNameToSchemaPos(columnNameToPosition);
}

protected abstract TFileType getLocationType() throws UserException;

protected abstract TFileType getLocationType(String location) throws UserException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.planner.external;

import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
Expand All @@ -39,7 +38,6 @@
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.HiveVersionUtil;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
Expand All @@ -55,7 +53,6 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand Down Expand Up @@ -117,9 +114,6 @@ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
@Override
protected void doInitialize() throws UserException {
super.doInitialize();
if (HiveVersionUtil.isHive1(hmsTable.getHiveVersion())) {
genSlotToSchemaIdMap();
}

if (hmsTable.isHiveTransactionalTable()) {
this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
Expand Down Expand Up @@ -396,23 +390,6 @@ protected TFileAttributes getFileAttributes() throws UserException {
return fileAttributes;
}

// To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...)
private void genSlotToSchemaIdMap() {
List<Column> baseSchema = desc.getTable().getBaseSchema();
Map<String, Integer> columnNameToPosition = Maps.newHashMap();
for (SlotDescriptor slot : desc.getSlots()) {
int idx = 0;
for (Column col : baseSchema) {
if (col.getName().equals(slot.getColumn().getName())) {
columnNameToPosition.put(col.getName(), idx);
break;
}
idx += 1;
}
}
params.setSlotNameToSchemaPos(columnNameToPosition);
}

@Override
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {

Expand Down
Loading