From 6688d7feafe807dad656ebca1e2fd6e0df264bd9 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 21 Dec 2023 16:39:09 +0800 Subject: [PATCH 1/3] [opt] support orc generated from hive 1.x for all file scan node --- .../planner/external/FileQueryScanNode.java | 22 ++++++++++++++++++ .../doris/planner/external/HiveScanNode.java | 23 ------------------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 9bae707c2f3ed8..489bf6ac22c0a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -273,6 +273,9 @@ public void createScanRangeLocations() throws UserException { return; } TFileFormatType fileFormatType = getFileFormatType(); + if (fileFormatType == TFileFormatType.FORMAT_ORC) { + genSlotToSchemaIdMapForOrc(); + } params.setFormatType(fileFormatType); boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON; boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL; @@ -455,6 +458,25 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List col return rangeDesc; } + // To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...) + // We need to save mapping from slot name to schema position + protected void genSlotToSchemaIdMapForOrc() { + Preconditions.checkNotNull(params); + List baseSchema = desc.getTable().getBaseSchema(); + Map columnNameToPosition = Maps.newHashMap(); + for (SlotDescriptor slot : desc.getSlots()) { + int idx = 0; + for (Column col : baseSchema) { + if (col.getName().equals(slot.getColumn().getName())) { + columnNameToPosition.put(col.getName(), idx); + break; + } + idx += 1; + } + } + params.setSlotNameToSchemaPos(columnNameToPosition); + } + protected abstract TFileType getLocationType() throws UserException; protected abstract TFileType getLocationType(String location) throws UserException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java index f26db2b9fb70fc..b540cd67c5654e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java @@ -18,7 +18,6 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.FunctionCallExpr; -import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; @@ -39,7 +38,6 @@ import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue; import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.HiveTransaction; -import org.apache.doris.datasource.hive.HiveVersionUtil; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; @@ -55,7 +53,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import lombok.Setter; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -117,9 +114,6 @@ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, @Override protected void doInitialize() throws UserException { super.doInitialize(); - if (HiveVersionUtil.isHive1(hmsTable.getHiveVersion())) { - genSlotToSchemaIdMap(); - } if (hmsTable.isHiveTransactionalTable()) { this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()), @@ -396,23 +390,6 @@ protected TFileAttributes getFileAttributes() throws UserException { return fileAttributes; } - // To Support Hive 1.x orc internal column name like (_col0, _col1, _col2...) - private void genSlotToSchemaIdMap() { - List baseSchema = desc.getTable().getBaseSchema(); - Map columnNameToPosition = Maps.newHashMap(); - for (SlotDescriptor slot : desc.getSlots()) { - int idx = 0; - for (Column col : baseSchema) { - if (col.getName().equals(slot.getColumn().getName())) { - columnNameToPosition.put(col.getName(), idx); - break; - } - idx += 1; - } - } - params.setSlotNameToSchemaPos(columnNameToPosition); - } - @Override public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { From c7019d1674e884937e5d03a35ab5a15de8cea5ce Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 1 Jan 2024 21:32:38 +0800 Subject: [PATCH 2/3] 1 --- be/src/vec/exec/format/orc/vorc_reader.cpp | 24 ++++++++++++++-------- be/src/vec/exec/format/orc/vorc_reader.h | 21 ++++++++++++++++--- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 943b9d1e4d6066..ce234134c7b8a1 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -146,7 +146,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state, _range_start_offset(range.start_offset), _range_size(range.size), _ctz(ctz), - _is_hive(params.__isset.slot_name_to_schema_pos), _io_ctx(io_ctx), _enable_lazy_mat(enable_lazy_mat), _is_dict_cols_converted(false) { @@ -165,7 +164,6 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& r _scan_params(params), _scan_range(range), _ctz(ctz), - _is_hive(params.__isset.slot_name_to_schema_pos), _file_system(nullptr), _io_ctx(io_ctx), _enable_lazy_mat(enable_lazy_mat), @@ -307,11 +305,15 @@ Status OrcReader::_init_read_columns() { auto& root_type = _reader->getType(); std::vector orc_cols; std::vector orc_cols_lower_case; - _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map); + bool is_hive1_orc = false; + _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc); + // In old version slot_name_to_schema_pos may not be set in _scan_params + // TODO, should be removed in 2.2 or later + _is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos; for (size_t i = 0; i < _column_names->size(); ++i) { auto& col_name = (*_column_names)[i]; - if (_is_hive) { + if (_is_hive1_orc) { auto iter = _scan_params.slot_name_to_schema_pos.find(col_name); if (iter != _scan_params.slot_name_to_schema_pos.end()) { int pos = iter->second; @@ -346,7 +348,7 @@ Status OrcReader::_init_read_columns() { _read_cols_lower_case.emplace_back(col_name); // For hive engine, store the orc column name to schema column name map. // This is for Hive 1.x orc file with internal column name _col0, _col1... - if (_is_hive) { + if (_is_hive1_orc) { _removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name; } _col_name_to_file_col_name[col_name] = read_col; @@ -357,20 +359,26 @@ Status OrcReader::_init_read_columns() { void OrcReader::_init_orc_cols(const orc::Type& type, std::vector& orc_cols, std::vector& orc_cols_lower_case, - std::unordered_map& type_map) { + std::unordered_map& type_map, + bool* is_hive1_orc) { + bool hive1_orc = false; for (int i = 0; i < type.getSubtypeCount(); ++i) { orc_cols.emplace_back(type.getFieldName(i)); auto filed_name_lower_case = _get_field_name_lower_case(&type, i); + if (!hive1_orc) { + hive1_orc = _is_hive1_col_name(filed_name_lower_case); + } auto filed_name_lower_case_copy = filed_name_lower_case; orc_cols_lower_case.emplace_back(std::move(filed_name_lower_case)); type_map.emplace(std::move(filed_name_lower_case_copy), type.getSubtype(i)); if (_is_acid) { const orc::Type* sub_type = type.getSubtype(i); if (sub_type->getKind() == orc::TypeKind::STRUCT) { - _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map); + _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map, is_hive1_orc); } } } + *is_hive1_orc = hive1_orc; } bool OrcReader::_check_acid_schema(const orc::Type& type) { @@ -845,7 +853,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) { std::string name; // For hive engine, translate the column name in orc file to schema column name. // This is for Hive 1.x which use internal column name _col0, _col1... - if (_is_hive) { + if (_is_hive1_orc) { name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)]; } else { name = _get_field_name_lower_case(&type, i); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 366231deaee50b..e2a49210d0050b 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -247,7 +247,8 @@ class OrcReader : public GenericReader { Status _init_read_columns(); void _init_orc_cols(const orc::Type& type, std::vector& orc_cols, std::vector& orc_cols_lower_case, - std::unordered_map& type_map); + std::unordered_map& type_map, + bool* is_hive1_orc); static bool _check_acid_schema(const orc::Type& type); static const orc::Type& _remove_acid(const orc::Type& type); TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type); @@ -483,6 +484,19 @@ class OrcReader : public GenericReader { int64_t get_remaining_rows() { return _remaining_rows; } void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } + // check if the given name is like _col0, _col1, ... + bool inline _is_hive1_col_name(const std::string& name) { + if (name.substr(0, 4) != "_col") { + return false; + } + for (size_t i = 4; i < name.size(); ++i) { + if (!isdigit(name[i])) { + return false; + } + } + return true; + } + private: // This is only for count(*) short circuit read. // save the total number of rows in range @@ -509,8 +523,9 @@ class OrcReader : public GenericReader { // This is used for Hive 1.x which use internal column name in Orc file. // _col0, _col1... std::unordered_map _removed_acid_file_col_name_to_schema_col; - // Flag for hive engine. True if the external table engine is Hive. - bool _is_hive = false; + // Flag for hive engine. True if the external table engine is Hive1.x with orc col name + // as _col1, col2, ... + bool _is_hive1_orc = false; std::unordered_map _col_name_to_file_col_name; std::unordered_map _type_map; std::vector _col_orc_type; From 50463bf5fe9e18828a52514ebcd7a8f0519aadcd Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 1 Jan 2024 22:25:57 +0800 Subject: [PATCH 3/3] 2 --- be/src/vec/exec/format/orc/vorc_reader.cpp | 4 ++-- be/src/vec/exec/format/orc/vorc_reader.h | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index ce234134c7b8a1..17a3425f4afba7 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -361,11 +361,11 @@ void OrcReader::_init_orc_cols(const orc::Type& type, std::vector& std::vector& orc_cols_lower_case, std::unordered_map& type_map, bool* is_hive1_orc) { - bool hive1_orc = false; + bool hive1_orc = true; for (int i = 0; i < type.getSubtypeCount(); ++i) { orc_cols.emplace_back(type.getFieldName(i)); auto filed_name_lower_case = _get_field_name_lower_case(&type, i); - if (!hive1_orc) { + if (hive1_orc) { hive1_orc = _is_hive1_col_name(filed_name_lower_case); } auto filed_name_lower_case_copy = filed_name_lower_case; diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index e2a49210d0050b..b8bc05387b9029 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -485,7 +485,10 @@ class OrcReader : public GenericReader { void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } // check if the given name is like _col0, _col1, ... - bool inline _is_hive1_col_name(const std::string& name) { + static bool inline _is_hive1_col_name(const std::string& name) { + if (name.size() <= 4) { + return false; + } if (name.substr(0, 4) != "_col") { return false; }