Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](partial update) Support flexible partial update in stream load with json files #39756

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
// transfer 0 (PREPARED -> COMMITTED): finish writing a rowset and the rowset's meta will not be changed
// transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finish writing a rowset, but may append new segments later and the rowset's meta may be changed
// transfer 2 (BEGIN_PARTIAL_UPDATE -> VISIBLE): finish adding new segments and the rowset's meta will not be changed, the rowset is visible to users
if (_context.partial_update_info && _context.partial_update_info->is_partial_update) {
if (_context.partial_update_info && _context.partial_update_info->is_partial_update()) {
_rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE);
} else {
_rowset_meta->set_rowset_state(COMMITTED);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
rowset_writer->num_rows() > 0) {
const auto& rowset_meta = rowset->rowset_meta();
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_enable_variant_flatten_nested(in.variant_enable_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in) {
Expand Down Expand Up @@ -313,6 +314,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_enable_variant_flatten_nested(in.variant_enable_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

TabletSchemaPB cloud_tablet_schema_to_doris(const TabletSchemaCloudPB& in) {
Expand Down Expand Up @@ -353,6 +355,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_variant_enable_flatten_nested(in.enable_variant_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in) {
Expand Down Expand Up @@ -381,6 +384,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
out->set_inverted_index_storage_format(in.inverted_index_storage_format());
out->set_variant_enable_flatten_nested(in.enable_variant_flatten_nested());
out->set_skip_bitmap_col_idx(in.skip_bitmap_col_idx());
}

TabletMetaCloudPB doris_tablet_meta_to_cloud(const TabletMetaPB& in) {
Expand Down
68 changes: 61 additions & 7 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gen_cpp/Partitions_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <algorithm>
Expand Down Expand Up @@ -117,9 +118,21 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_db_id = pschema.db_id();
_table_id = pschema.table_id();
_version = pschema.version();
_is_partial_update = pschema.partial_update();
if (pschema.has_unique_key_update_mode()) {
_unique_key_update_mode = pschema.unique_key_update_mode();
if (pschema.has_sequence_map_col_unique_id()) {
_sequence_map_col_uid = pschema.sequence_map_col_unique_id();
}
} else {
// for backward compatibility
if (pschema.has_partial_update() && pschema.partial_update()) {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
} else {
_unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
}
}
_is_strict_mode = pschema.is_strict_mode();
if (_is_partial_update) {
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = pschema.auto_increment_column();
if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) {
return Status::InternalError(
Expand Down Expand Up @@ -155,7 +168,7 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
index->index_id = p_index.id();
index->schema_hash = p_index.schema_hash();
for (const auto& pcolumn_desc : p_index.columns_desc()) {
if (!_is_partial_update ||
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(pcolumn_desc.name())) {
auto it = slots_map.find(std::make_pair(
to_lower(pcolumn_desc.name()),
Expand Down Expand Up @@ -185,15 +198,51 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
return Status::OK();
}

// Resolves `_unique_key_update_mode` from the thrift schema parameter.
//
// If `unique_key_update_mode` is set on `tschema`, it is mapped onto the matching
// UniqueKeyUpdateModePB value (an unrecognized value yields an InternalError) and
// `_sequence_map_col_uid` is taken from `sequence_map_col_unique_id` when that field is set.
// Otherwise the older `is_partial_update` flag selects between UPDATE_FIXED_COLUMNS and UPSERT.
Status OlapTableSchemaParam::init_unique_key_update_mode(const TOlapTableSchemaParam& tschema) {
    if (!tschema.__isset.unique_key_update_mode) {
        // for backward compatibility: only the boolean partial-update flag is available
        const bool legacy_partial_update =
                tschema.__isset.is_partial_update && tschema.is_partial_update;
        _unique_key_update_mode = legacy_partial_update
                                          ? UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS
                                          : UniqueKeyUpdateModePB::UPSERT;
        return Status::OK();
    }

    switch (tschema.unique_key_update_mode) {
    case doris::TUniqueKeyUpdateMode::UPSERT:
        _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
        break;
    case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS:
        _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
        break;
    case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS:
        _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
        break;
    default:
        return Status::InternalError(
                "Unknown unique_key_update_mode: {}, should be one of "
                "UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS",
                tschema.unique_key_update_mode);
    }

    // The sequence map column uid is only read when the mode field itself was provided.
    if (tschema.__isset.sequence_map_col_unique_id) {
        _sequence_map_col_uid = tschema.sequence_map_col_unique_id;
    }
    return Status::OK();
}

Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
_db_id = tschema.db_id;
_table_id = tschema.table_id;
_version = tschema.version;
_is_partial_update = tschema.is_partial_update;
RETURN_IF_ERROR(init_unique_key_update_mode(tschema));
if (tschema.__isset.is_strict_mode) {
_is_strict_mode = tschema.is_strict_mode;
}
if (_is_partial_update) {
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
_auto_increment_column = tschema.auto_increment_column;
if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) {
return Status::InternalError(
Expand Down Expand Up @@ -221,7 +270,7 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (const auto& tcolumn_desc : t_index.columns_desc) {
if (!_is_partial_update ||
if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
_partial_update_input_columns.contains(tcolumn_desc.column_name)) {
auto it = slots_map.find(
std::make_pair(to_lower(tcolumn_desc.column_name),
Expand Down Expand Up @@ -270,13 +319,18 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_db_id(_db_id);
pschema->set_table_id(_table_id);
pschema->set_version(_version);
pschema->set_partial_update(_is_partial_update);
pschema->set_unique_key_update_mode(_unique_key_update_mode);
if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
// for backward compatibility
pschema->set_partial_update(true);
}
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_auto_increment_column(_auto_increment_column);
pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
pschema->set_sequence_map_col_unique_id(_sequence_map_col_uid);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
Expand Down
20 changes: 18 additions & 2 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <butil/fast_rand.h>
bobhan1 marked this conversation as resolved.
Show resolved Hide resolved
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/descriptors.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <cstdint>
#include <functional>
Expand Down Expand Up @@ -88,7 +89,18 @@ class OlapTableSchemaParam {
return _proto_schema;
}

bool is_partial_update() const { return _is_partial_update; }
// Returns the update mode stored in `_unique_key_update_mode`.
UniqueKeyUpdateModePB unique_key_update_mode() const {
    return _unique_key_update_mode;
}

// True for every update mode other than UPSERT.
bool is_partial_update() const {
    const bool plain_upsert = (_unique_key_update_mode == UniqueKeyUpdateModePB::UPSERT);
    return !plain_upsert;
}
bool is_fixed_partial_update() const {
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
}
bool is_flexible_partial_update() const {
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
}

std::set<std::string> partial_update_input_columns() const {
return _partial_update_input_columns;
}
Expand All @@ -101,8 +113,11 @@ class OlapTableSchemaParam {
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
int32_t sequence_map_col_uid() const { return _sequence_map_col_uid; }
std::string debug_string() const;

Status init_unique_key_update_mode(const TOlapTableSchemaParam& tschema);

private:
int64_t _db_id;
int64_t _table_id;
Expand All @@ -112,14 +127,15 @@ class OlapTableSchemaParam {
mutable POlapTableSchemaParam* _proto_schema = nullptr;
std::vector<OlapTableIndexSchema*> _indexes;
mutable ObjectPool _obj_pool;
bool _is_partial_update = false;
UniqueKeyUpdateModePB _unique_key_update_mode {UniqueKeyUpdateModePB::UPSERT};
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
std::string _auto_increment_column;
int32_t _auto_increment_column_unique_id;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
int32_t _sequence_map_col_uid {-1};
};

using OlapTableIndexTablets = TOlapTableIndexTablets;
Expand Down
68 changes: 65 additions & 3 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,13 +636,75 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
request.__set_enable_profile(false);
}
}
if (!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {

// Handle the optional `unique_key_update_mode` header.
//
// The header value must be one of the keys of `unique_key_update_mode_map`; any other value is
// rejected with InvalidArgument. When UPDATE_FLEXIBLE_COLUMNS is requested, the load must use the
// JSON format and must not combine it with fuzzy_parse=true, columns, jsonpaths, hidden_columns,
// function_column.sequence_col, merge_type or where. On success the mode is stored on `request`.
if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
    static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
            {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
            {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
            {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
    std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
    auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
    if (iter == unique_key_update_mode_map.end()) {
        // Report the parameter under the name of the header the client actually sets
        // (`unique_key_update_mode`), so the message points at the right knob.
        return Status::InvalidArgument(
                "Invalid unique_key_update_mode {}, must be one of 'UPSERT', "
                "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
                unique_key_update_mode_str);
    }

    TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
    if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
        // check constraints when flexible partial update is enabled
        if (ctx->format != TFileFormatType::FORMAT_JSON) {
            return Status::InvalidArgument(
                    "flexible partial update only support json format as input file "
                    "currently");
        }
        if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
            iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'fuzzy_parse' is enabled");
        }
        if (!http_req->header(HTTP_COLUMNS).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'columns' is specified");
        }
        if (!http_req->header(HTTP_JSONPATHS).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'jsonpaths' is specified");
        }
        if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when 'hidden_columns' is "
                    "specified");
        }
        if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when "
                    "'function_column.sequence_col' is specified");
        }
        if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when "
                    "'merge_type' is specified");
        }
        if (!http_req->header(HTTP_WHERE).empty()) {
            return Status::InvalidArgument(
                    "Don't support flexible partial update when "
                    "'where' is specified");
        }
    }
    request.__set_unique_key_update_mode(unique_key_update_mode);
}
if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
!http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
// only consider `partial_columns` parameter when `unique_key_update_mode` is not set
if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
// for backward compatibility
request.__set_partial_update(true);
} else {
request.__set_partial_update(false);
}
}

if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
request.__set_memtable_on_sink_node(value);
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ static const std::string HTTP_SKIP_LINES = "skip_lines";
static const std::string HTTP_COMMENT = "comment";
static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
static const std::string HTTP_PARTIAL_COLUMNS = "partial_columns";
static const std::string HTTP_UNIQUE_KEY_UPDATE_MODE = "unique_key_update_mode";
static const std::string HTTP_SQL = "sql";
static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
static const std::string HTTP_TXN_ID_KEY = "txn_id";
Expand Down
Loading
Loading