From 830813cb9e7bfe3b6e6db299431215ca0c577736 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Thu, 22 Aug 2024 19:59:25 +0800 Subject: [PATCH] use enum and rename unique key update mode --- be/src/exec/tablet_info.cpp | 45 ++++++++++++++----- be/src/exec/tablet_info.h | 14 +++--- be/src/http/action/stream_load.cpp | 17 ++++--- be/src/olap/partial_update_info.cpp | 28 +++++------- be/src/olap/partial_update_info.h | 16 ++++--- .../doris/datasource/FileGroupInfo.java | 18 ++++---- .../doris/datasource/LoadScanProvider.java | 2 +- .../main/java/org/apache/doris/load/Load.java | 16 +++---- .../apache/doris/load/loadv2/LoadTask.java | 6 --- .../apache/doris/planner/OlapTableSink.java | 27 ++++++----- .../doris/planner/StreamLoadPlanner.java | 25 +++++------ .../org/apache/doris/task/LoadTaskInfo.java | 5 ++- .../org/apache/doris/task/StreamLoadTask.java | 14 +++--- gensrc/proto/descriptors.proto | 4 +- gensrc/proto/olap_file.proto | 10 ++--- gensrc/thrift/Descriptors.thrift | 4 +- gensrc/thrift/Types.thrift | 4 +- 17 files changed, 137 insertions(+), 118 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 9ad92c521e4b06..f7879d2738053d 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -117,10 +118,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { _db_id = pschema.db_id(); _table_id = pschema.table_id(); _version = pschema.version(); - _is_fixed_partial_update = pschema.partial_update(); - _is_flexible_partial_update = pschema.is_flexible_partial_update(); + _unique_key_update_mode = pschema.unique_key_update_mode(); _is_strict_mode = pschema.is_strict_mode(); - if (_is_fixed_partial_update) { + if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { _auto_increment_column = pschema.auto_increment_column(); if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) { return Status::InternalError( @@ -153,7 +153,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) { index->index_id = p_index.id(); index->schema_hash = p_index.schema_hash(); for (const auto& pcolumn_desc : p_index.columns_desc()) { - if (!_is_fixed_partial_update || + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS || _partial_update_input_columns.contains(pcolumn_desc.name())) { auto it = slots_map.find(std::make_pair( to_lower(pcolumn_desc.name()), @@ -187,13 +187,37 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { _db_id = tschema.db_id; _table_id = tschema.table_id; _version = tschema.version; - _is_fixed_partial_update = tschema.is_partial_update; - _is_flexible_partial_update = - tschema.__isset.is_flexible_partial_update && tschema.is_flexible_partial_update; + if (tschema.__isset.unique_key_update_mode) { + switch (tschema.unique_key_update_mode) { + case doris::TUniqueKeyUpdateMode::UPSERT: { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT; + break; + } + case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS: { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + break; + } + case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS: { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; + break; + } + default: { + return Status::InternalError( + "Unknown unique_key_update_mode: {}, should be one of " + "UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS", + tschema.unique_key_update_mode); + } + } + } else { + // for backward compatibility + if (tschema.__isset.is_partial_update && tschema.is_partial_update) { + _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + } + } if (tschema.__isset.is_strict_mode) { _is_strict_mode = tschema.is_strict_mode; } - if (_is_fixed_partial_update) { + if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { _auto_increment_column = tschema.auto_increment_column; if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) { return Status::InternalError( @@ -221,7 +245,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) { index->index_id = t_index.id; index->schema_hash = t_index.schema_hash; for (const auto& tcolumn_desc : t_index.columns_desc) { - if (!_is_fixed_partial_update || + if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS || _partial_update_input_columns.contains(tcolumn_desc.column_name)) { auto it = slots_map.find( std::make_pair(to_lower(tcolumn_desc.column_name), @@ -270,8 +294,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const { pschema->set_db_id(_db_id); pschema->set_table_id(_table_id); pschema->set_version(_version); - pschema->set_partial_update(_is_fixed_partial_update); - pschema->set_is_flexible_partial_update(_is_flexible_partial_update); + pschema->set_unique_key_update_mode(_unique_key_update_mode); pschema->set_is_strict_mode(_is_strict_mode); pschema->set_auto_increment_column(_auto_increment_column); pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id); diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 00fffb587ac7c9..649f06d3273808 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -89,10 +90,14 @@ class OlapTableSchemaParam { } bool is_partial_update() const { - return _is_fixed_partial_update || _is_flexible_partial_update; + return _unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT; + } + bool is_fixed_partial_update() const { + return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; + } + bool is_flexible_partial_update() const { + return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; } - bool is_fixed_partial_update() const { return _is_fixed_partial_update; } - bool is_flexible_partial_update() const { return _is_flexible_partial_update; } std::set partial_update_input_columns() const { return _partial_update_input_columns; @@ -115,9 +120,8 @@ class OlapTableSchemaParam { mutable POlapTableSchemaParam* _proto_schema = nullptr; std::vector _indexes; mutable ObjectPool _obj_pool; - bool _is_fixed_partial_update = false; + UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT}; std::set _partial_update_input_columns; - bool _is_flexible_partial_update; bool _is_strict_mode = false; std::string _auto_increment_column; int32_t _auto_increment_column_unique_id; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 2b27d963462756..858e08680d6386 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -621,16 +621,17 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_enable_profile(false); } } + StringCaseMap unique_key_update_mode_map = { {"UPSERT", TUniqueKeyUpdateMode::UPSERT}, - {"FIXED_PARTIAL_UPDATE", TUniqueKeyUpdateMode::FIXED_PARTIAL_UPDATE}, - {"FLEXIBLE_PARTIAL_UPDATE", TUniqueKeyUpdateMode::FLEXIBLE_PARTIAL_UPDATE}}; + {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS}, + {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}}; if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) { std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE); auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str); if (iter != unique_key_update_mode_map.end()) { TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second; - if (unique_key_update_mode == TUniqueKeyUpdateMode::FLEXIBLE_PARTIAL_UPDATE) { + if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) { // check constraints when flexible partial update is enabled if (ctx->format != TFileFormatType::FORMAT_JSON) { return Status::InvalidArgument( @@ -650,6 +651,11 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return Status::InvalidArgument( "Don't support flexible partial update when jsonpaths is specified"); } + if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { + return Status::InvalidArgument( + "Don't support flexible partial update when hidden_columns is " + "specified"); + } } request.__set_unique_key_update_mode(unique_key_update_mode); } else { @@ -663,11 +669,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, !http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { // only consider `partial_columns` parameter when `unique_key_update_mode` is not set if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) { - request.__set_partial_update(true); - } else { - request.__set_partial_update(false); + request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS); } } + if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); request.__set_memtable_on_sink_node(value); diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index d8e42010b0f55a..95b8db27341667 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -31,20 +31,12 @@ namespace doris { -void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_partial_update, - bool flexible_partial_update, +void PartialUpdateInfo::init(const TabletSchema& tablet_schema, + UniqueKeyUpdateModePB unique_key_update_mode, const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version) { - DCHECK(!(fixed_partial_update && flexible_partial_update)) - << "fixed_partial_update and flexible_partial_update can not be set simutanously!"; - if (fixed_partial_update) { - partial_update_mode = PartialUpdateModePB::FIXED; - } else if (flexible_partial_update) { - partial_update_mode = PartialUpdateModePB::FLEXIBLE; - } else { - partial_update_mode = PartialUpdateModePB::NONE; - } + partial_update_mode = unique_key_update_mode; partial_update_input_columns = partial_update_cols; max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; @@ -53,7 +45,7 @@ void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool fixed_parti update_cids.clear(); for (auto i = 0; i < tablet_schema.num_columns(); ++i) { - if (fixed_partial_update) { + if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { auto tablet_column = tablet_schema.column(i); if (!partial_update_input_columns.contains(tablet_column.name())) { missing_cids.emplace_back(i); @@ -111,9 +103,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { if (!partial_update_info_pb->has_partial_update_mode()) { // for backward compatibility if (partial_update_info_pb->is_partial_update()) { - partial_update_mode = PartialUpdateModePB::FIXED; + partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; } else { - partial_update_mode = PartialUpdateModePB::NONE; + partial_update_mode = UniqueKeyUpdateModePB::UPSERT; } } else { partial_update_mode = partial_update_info_pb->partial_update_mode(); @@ -151,13 +143,13 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { std::string PartialUpdateInfo::summary() const { std::string mode; switch (partial_update_mode) { - case PartialUpdateModePB::NONE: - mode = "none"; + case UniqueKeyUpdateModePB::UPSERT: + mode = "upsert"; break; - case PartialUpdateModePB::FIXED: + case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS: mode = "fixed partial update"; break; - case PartialUpdateModePB::FLEXIBLE: + case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS: mode = "flexible partial update"; break; } diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 0c96e0539cb65f..25da07ba76a6d3 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -16,6 +16,8 @@ // under the License. #pragma once +#include + #include #include #include @@ -38,27 +40,27 @@ struct RowsetId; class BitmapValue; struct PartialUpdateInfo { - void init(const TabletSchema& tablet_schema, bool fixed_partial_update, - bool flexible_partial_update, const std::set& partial_update_cols, - bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, + void init(const TabletSchema& tablet_schema, UniqueKeyUpdateModePB unique_key_update_mode, + const std::set& partial_update_cols, bool is_strict_mode, + int64_t timestamp_ms, const std::string& timezone, const std::string& auto_increment_column, int64_t cur_max_version = -1); void to_pb(PartialUpdateInfoPB* partial_update_info) const; void from_pb(PartialUpdateInfoPB* partial_update_info); std::string summary() const; - bool is_partial_update() const { return partial_update_mode != PartialUpdateModePB::NONE; } + bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; } bool is_fixed_partial_update() const { - return partial_update_mode == PartialUpdateModePB::FIXED; + return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; } bool is_flexible_partial_update() const { - return partial_update_mode == PartialUpdateModePB::FLEXIBLE; + return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; } private: void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema); public: - PartialUpdateModePB partial_update_mode; + UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java index 6e36420dc22ec0..c42e642c22723f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java @@ -43,6 +43,7 @@ import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -87,8 +88,7 @@ public enum JobType { // used for stream load, FILE_LOCAL or FILE_STREAM private TFileType fileType; private List hiddenColumns = null; - private boolean isFixedPartialUpdate = false; - private boolean isFlexiblePartialUpdate = false; + private TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; // for broker load public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc, @@ -110,8 +110,7 @@ public FileGroupInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc b // for stream load public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc, BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode, - TFileType fileType, List hiddenColumns, boolean isFixedPartialUpdate, - boolean isFlexiblePartialUpdate) { + TFileType fileType, List hiddenColumns, TUniqueKeyUpdateMode uniquekeyUpdateMode) { this.jobType = JobType.STREAM_LOAD; this.loadId = loadId; this.txnId = txnId; @@ -124,8 +123,7 @@ public FileGroupInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc this.strictMode = strictMode; this.fileType = fileType; this.hiddenColumns = hiddenColumns; - this.isFixedPartialUpdate = isFixedPartialUpdate; - this.isFlexiblePartialUpdate = isFlexiblePartialUpdate; + this.uniquekeyUpdateMode = uniquekeyUpdateMode; } public Table getTargetTable() { @@ -166,12 +164,16 @@ public List getHiddenColumns() { return hiddenColumns; } + public TUniqueKeyUpdateMode getUniqueKeyUpdateMode() { + return uniquekeyUpdateMode; + } + public boolean isFixedPartialUpdate() { - return isFixedPartialUpdate; + return uniquekeyUpdateMode == TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS; } public boolean isFlexiblePartialUpdate() { - return isFlexiblePartialUpdate; + return uniquekeyUpdateMode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS; } public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java index 8a2d9114d5d9c1..cf69b24a7376fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java @@ -221,7 +221,7 @@ private void initColumns(FileLoadScanNode.ParamCreateContext context, Analyzer a Load.initColumns(fileGroupInfo.getTargetTable(), columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, context.srcTupleDescriptor, context.srcSlotDescByName, srcSlotIds, formatType(context.fileGroup.getFileFormat()), fileGroupInfo.getHiddenColumns(), - fileGroupInfo.isFixedPartialUpdate(), fileGroupInfo.isFlexiblePartialUpdate()); + fileGroupInfo.getUniqueKeyUpdateMode()); int columnCountFromPath = 0; if (context.fileGroup.getColumnNamesFromPath() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 7eab7f5d704787..edb23b047376e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -73,6 +73,7 @@ import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -260,7 +261,7 @@ public static List getSchemaChangeShadowColumnDesc(Table tbl, public static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction) throws UserException { initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false, - false, false); + TUniqueKeyUpdateMode.UPSERT); } /* @@ -270,12 +271,12 @@ public static void initColumns(Table tbl, List columnExprs, public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, - List srcSlotIds, TFileFormatType formatType, List hiddenColumns, boolean isPartialUpdate, - boolean isFlexiblePartialUpdate) + List srcSlotIds, TFileFormatType formatType, List hiddenColumns, + TUniqueKeyUpdateMode uniquekeyUpdateMode) throws UserException { rewriteColumns(columnDescs); initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName, - srcSlotIds, formatType, hiddenColumns, true, isPartialUpdate, isFlexiblePartialUpdate); + srcSlotIds, formatType, hiddenColumns, true, uniquekeyUpdateMode); } /* @@ -290,8 +291,7 @@ private static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, List srcSlotIds, TFileFormatType formatType, List hiddenColumns, - boolean needInitSlotAndAnalyzeExprs, boolean isPartialUpdate, - boolean isFlexiblePartialUpdate) throws UserException { + boolean needInitSlotAndAnalyzeExprs, TUniqueKeyUpdateMode uniquekeyUpdateMode) throws UserException { // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. // skip the mapping columns not exist in schema @@ -336,7 +336,7 @@ private static void initColumns(Table tbl, List columnExprs, } copiedColumnExprs.add(columnDesc); } - if (hasSkipBitmapColumn && isFlexiblePartialUpdate) { + if (hasSkipBitmapColumn && uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) { Preconditions.checkArgument(!specifyFileFieldNames); Preconditions.checkArgument(hiddenColumns == null); if (LOG.isDebugEnabled()) { @@ -460,7 +460,7 @@ private static void initColumns(Table tbl, List columnExprs, if (formatType == TFileFormatType.FORMAT_ARROW) { slotDesc.setColumn(new Column(realColName, colToType.get(realColName))); } else { - if (isFlexiblePartialUpdate && hasSkipBitmapColumn) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && hasSkipBitmapColumn) { // we store the unique ids of missing columns in skip bitmap column in flexible partial update int colUniqueId = tblColumn.getUniqueId(); if (realColName.equals(Column.SKIP_BITMAP_COL)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 35c84cef11331e..06991d5fb52fbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -61,12 +61,6 @@ public int getValue() { } } - public enum UniquekeyUpdateMode { - UPSERT, - FIXED_PARTIAL_UPDATE, - FLEXIBLE_PARTIAL_UPDATE - } - private static final Logger LOG = LogManager.getLogger(LoadTask.class); public static final Comparator COMPARATOR = Comparator.comparing(LoadTask::getPriorityValue) .thenComparingLong(LoadTask::getSignature); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index b1268b48a8d6b7..69903fc2d7f1f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -76,6 +76,7 @@ import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TTabletLocation; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; @@ -106,8 +107,7 @@ public class OlapTableSink extends DataSink { // specified partition ids. private List partitionIds; // partial update input columns - private boolean isFixedPartialUpdate = false; - private boolean isFlexiblePartialUpdate = false; + private TUniqueKeyUpdateMode uniquekeyUpdateMode = TUniqueKeyUpdateMode.UPSERT; private HashSet partialUpdateInputColumns; // set after init called @@ -179,13 +179,11 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou } } - public void setPartialUpdateInputColumns(boolean isFixedPartialUpdate, HashSet columns) { - this.isFixedPartialUpdate = isFixedPartialUpdate; - this.partialUpdateInputColumns = columns; - } - - public void setFlexiblePartialUpdate(boolean isFlexiblePartialUpdate) { - this.isFlexiblePartialUpdate = isFlexiblePartialUpdate; + public void setPartialUpdateInfo(TUniqueKeyUpdateMode uniquekeyUpdateMode, HashSet columns) { + this.uniquekeyUpdateMode = uniquekeyUpdateMode; + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { + this.partialUpdateInputColumns = columns; + } } public void updateLoadId(TUniqueId newLoadId) { @@ -247,10 +245,10 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { } strBuilder.append(prefix + " TUPLE ID: " + tupleDescriptor.getId() + "\n"); strBuilder.append(prefix + " " + DataPartition.RANDOM.getExplainString(explainLevel)); - boolean isPartialUpdate = isFixedPartialUpdate || isFlexiblePartialUpdate; + boolean isPartialUpdate = uniquekeyUpdateMode != TUniqueKeyUpdateMode.UPSERT; strBuilder.append(prefix + " IS_PARTIAL_UPDATE: " + isPartialUpdate); if (isPartialUpdate) { - if (isFixedPartialUpdate) { + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: FIXED_PARTIAL_UPDATE"); } else { strBuilder.append(prefix + " PARTIAL_UPDATE_MODE: FLEXIBLE_PARTIAL_UPDATE"); @@ -329,9 +327,10 @@ public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer a indexSchema.setIndexesDesc(indexDesc); schemaParam.addToIndexes(indexSchema); } - schemaParam.setIsPartialUpdate(isFixedPartialUpdate); - schemaParam.setIsFlexiblePartialUpdate(isFlexiblePartialUpdate); - if (isFixedPartialUpdate) { + // for backward compatibility + schemaParam.setIsPartialUpdate(uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS); + schemaParam.setUniqueKeyUpdateMode(uniquekeyUpdateMode); + if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { for (String s : partialUpdateInputColumns) { schemaParam.addToPartialUpdateInputColumns(s); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 4f3b4497aeea35..028641bef623eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -62,6 +62,7 @@ import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -143,25 +144,21 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); boolean negative = taskInfo.getNegative(); // get partial update related info - boolean isFixedPartialUpdate = taskInfo.isFixedPartialUpdate(); - boolean isFlexiblePartialUpdate = taskInfo.isFlexiblePartialUpdate(); - if ((isFixedPartialUpdate || isFlexiblePartialUpdate) && !destTable.getEnableUniqueKeyMergeOnWrite()) { + TUniqueKeyUpdateMode uniqueKeyUpdateMode = taskInfo.getUniqueKeyUpdateMode(); + if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT && !destTable.getEnableUniqueKeyMergeOnWrite()) { throw new UserException("Only unique key merge on write support partial update"); } - if (isFixedPartialUpdate && isFlexiblePartialUpdate) { - throw new AnalysisException("isFixedPartialUpdate and isFlexiblePartialUpdate" - + "can't be set to true bothly."); - } - if (isFlexiblePartialUpdate && !destTable.hasSkipBitmapColumn()) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && !destTable.hasSkipBitmapColumn()) { throw new UserException("Flexible partial update can only support table with skip bitmap hidden column." + "But table " + destTable.getName() + "doesn't have it"); } - if (isFlexiblePartialUpdate && !destTable.getEnableLightSchemaChange()) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS + && !destTable.getEnableLightSchemaChange()) { throw new UserException("Flexible partial update can only support table with light_schema_change enabled." + "But table " + destTable.getName() + "'s property light_schema_change is false"); } HashSet partialUpdateInputColumns = new HashSet<>(); - if (isFixedPartialUpdate) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) { for (Column col : destTable.getFullSchema()) { boolean existInExpr = false; if (col.hasOnUpdateDefaultValue()) { @@ -200,7 +197,8 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { - if (isFixedPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) { + if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS + && !partialUpdateInputColumns.contains(col.getName())) { continue; } SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc); @@ -265,7 +263,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde // The load id will pass to csv reader to find the stream load context from new load stream manager fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), - isFixedPartialUpdate, isFlexiblePartialUpdate); + uniqueKeyUpdateMode); scanNode = fileScanNode; scanNode.init(analyzer); @@ -297,8 +295,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout; olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), txnTimeout); - olapTableSink.setPartialUpdateInputColumns(isFixedPartialUpdate, partialUpdateInputColumns); - olapTableSink.setFlexiblePartialUpdate(isFlexiblePartialUpdate); + olapTableSink.setPartialUpdateInfo(uniqueKeyUpdateMode, partialUpdateInputColumns); olapTableSink.complete(analyzer); // for stream load, we only need one fragment, ScanNode -> DataSink. diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index 878ce423097cdb..0f9e65b614c8d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -25,6 +25,7 @@ import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; @@ -110,8 +111,8 @@ default long getFileSize() { boolean isFixedPartialUpdate(); - default LoadTask.UniquekeyUpdateMode getUniquekeyUpdateMode() { - return LoadTask.UniquekeyUpdateMode.UPSERT; + default TUniquekeyUpdateMode getUniquekeyUpdateMode() { + return UniquekeyUpdateMode.UPSERT; } default boolean isFlexiblePartialUpdate() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 5652e97ddc866e..99ee5ddef65325 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -35,6 +35,7 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUniqueKeyUpdateMode; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -83,7 +84,7 @@ public class StreamLoadTask implements LoadTaskInfo { private String headerType = ""; private List hiddenColumns; private boolean trimDoubleQuotes = false; - private LoadTask.UniquekeyUpdateMode uniquekeyUpdateMode = LoadTask.UniquekeyUpdateMode.UPSERT; + private UniquekeyUpdateMode uniquekeyUpdateMode = UniquekeyUpdateMode.UPSERT; private int skipLines = 0; private boolean enableProfile = false; @@ -298,17 +299,17 @@ public boolean getEnableProfile() { @Override public boolean isFixedPartialUpdate() { - return uniquekeyUpdateMode == LoadTask.UniquekeyUpdateMode.FIXED_PARTIAL_UPDATE; + return uniquekeyUpdateMode == UniquekeyUpdateMode.UPDATE_FIXED_COLUMNS; } @Override - public LoadTask.UniquekeyUpdateMode getUniquekeyUpdateMode() { + public TUniquekeyUpdateMode getUniquekeyUpdateMode() { return uniquekeyUpdateMode; } @Override public boolean isFlexiblePartialUpdate() { - return uniquekeyUpdateMode == LoadTask.UniquekeyUpdateMode.FLEXIBLE_PARTIAL_UPDATE; + return uniquekeyUpdateMode == UniquekeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS; } @Override @@ -463,15 +464,14 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws } if (request.isSetUniqueKeyUpdateMode()) { try { - uniquekeyUpdateMode = LoadTask.UniquekeyUpdateMode - .valueOf(request.getUniqueKeyUpdateMode().toString()); + uniquekeyUpdateMode = UniquekeyUpdateMode.valueOf(request.getUniqueKeyUpdateMode().toString()); } catch (IllegalArgumentException e) { throw new UserException("unknown unique_key_update_mode: " + request.getUniqueKeyUpdateMode().toString()); } } if (!request.isSetUniqueKeyUpdateMode() && request.isSetPartialUpdate()) { - uniquekeyUpdateMode = LoadTask.UniquekeyUpdateMode.FIXED_PARTIAL_UPDATE; + uniquekeyUpdateMode = UniquekeyUpdateMode.UPDATE_FIXED_COLUMNS; } if (request.isSetMemtableOnSinkNode()) { this.memtableOnSinkNode = request.isMemtableOnSinkNode(); diff --git a/gensrc/proto/descriptors.proto b/gensrc/proto/descriptors.proto index 6f258bb61bf95d..6f25a49af8fe1d 100644 --- a/gensrc/proto/descriptors.proto +++ b/gensrc/proto/descriptors.proto @@ -66,13 +66,13 @@ message POlapTableSchemaParam { repeated PSlotDescriptor slot_descs = 4; required PTupleDescriptor tuple_desc = 5; repeated POlapTableIndexSchema indexes = 6; - optional bool partial_update = 7 [default = false]; // flag for fixed partial update + optional bool partial_update = 7 [default = false]; // deprecated, use unique_key_update_mode repeated string partial_update_input_columns = 8; optional bool is_strict_mode = 9 [default = false]; optional string auto_increment_column = 10; optional int64 timestamp_ms = 11 [default = 0]; optional string timezone = 12; optional int32 auto_increment_column_unique_id = 13 [default = -1]; - optional bool is_flexible_partial_update = 14 [default = false]; + optional UniqueKeyUpdateModePB unique_key_update_mode = 14 [default = UPSERT]; }; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 80f9208bfdf5ea..4b832b5e3216fb 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -359,10 +359,10 @@ enum SortType { ZORDER = 1; } -enum PartialUpdateModePB { - NONE = 0; - FIXED = 1; - FLEXIBLE = 2; +enum UniqueKeyUpdateModePB { + UPSERT = 0; + UPDATE_FIXED_COLUMNS = 1; + UPDATE_FLEXIBLE_COLUMNS = 2; } // ATTN: When adding or deleting fields, please update `message TabletSchemaCloudPB` @@ -628,5 +628,5 @@ message PartialUpdateInfoPB { optional bool is_schema_contains_auto_inc_column = 10 [default = false]; repeated string default_values = 11; optional int64 max_version_in_flush_phase = 12 [default = -1]; - optional PartialUpdateModePB partial_update_mode = 13 [default = NONE]; + optional UniqueKeyUpdateModePB partial_update_mode = 13 [default = UPSERT]; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 925c358ceb4383..ec4f81b29ef71b 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -247,13 +247,13 @@ struct TOlapTableSchemaParam { 5: required TTupleDescriptor tuple_desc 6: required list indexes 7: optional bool is_dynamic_schema // deprecated - 8: optional bool is_partial_update // flag for fixed partial update + 8: optional bool is_partial_update // deprecated, use unique_key_update_mode 9: optional list partial_update_input_columns 10: optional bool is_strict_mode = false 11: optional string auto_increment_column 12: optional i32 auto_increment_column_unique_id = -1 13: optional Types.TInvertedIndexFileStorageFormat inverted_index_file_storage_format = Types.TInvertedIndexFileStorageFormat.V1 - 14: optional bool is_flexible_partial_update = false + 14: optional Types.TUniqueKeyUpdateMode unique_key_update_mode = Types.TUniqueKeyUpdateMode.UPSERT } struct TTabletLocation { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index bc7801d7bebff8..00311dc14ef7a3 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -719,8 +719,8 @@ enum TMergeType { enum TUniqueKeyUpdateMode { UPSERT, - FIXED_PARTIAL_UPDATE, - FLEXIBLE_PARTIAL_UPDATE + UPDATE_FIXED_COLUMNS, + UPDATE_FLEXIBLE_COLUMNS } enum TSortType {