Skip to content

Commit

Permalink
[fix](ES Catalog)Support parse single value for array column (apache#…
Browse files Browse the repository at this point in the history
…40614)

Follow up apache#39104, when the field has one value and we map it as array
type in Doris, we parse the single value to a single element array to
make them queryable.

close apache#40406
  • Loading branch information
qidaye committed Sep 11, 2024
1 parent b0ab02b commit 09b7005
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 165 deletions.
273 changes: 154 additions & 119 deletions be/src/exec/es/es_scroll_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,159 @@ Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
return parse_and_insert_data(col);
}

template <typename T>
Status handle_value(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
T& val) {
RETURN_IF_ERROR(get_int_value<T>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}

template <>
Status handle_value<float>(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
float& val) {
RETURN_IF_ERROR(get_float_value<float>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}

template <>
Status handle_value<double>(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, double& val) {
RETURN_IF_ERROR(get_float_value<double>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}

template <>
Status handle_value<std::string>(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, std::string& val) {
RETURN_ERROR_IF_COL_IS_ARRAY(col, sub_type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
val = col.GetString();
}
return Status::OK();
}

template <>
Status handle_value<bool>(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
bool& val) {
if (col.IsBool()) {
val = col.GetBool();
return Status::OK();
}

if (col.IsNumber()) {
val = col.GetInt();
return Status::OK();
}

bool is_nested_str = false;
if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
val = col[0].GetBool();
return Status::OK();
} else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}

const rapidjson::Value& str_col = is_nested_str ? col[0] : col;
const std::string& str_val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
val = StringParser::string_to_bool(str_val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, sub_type);
return Status::OK();
}

template <typename T>
Status process_single_column(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array) {
T val;
RETURN_IF_ERROR(handle_value<T>(col, sub_type, pure_doc_value, val));
array.push_back(val);
return Status::OK();
}

template <typename T>
Status process_column_array(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array) {
for (const auto& sub_col : col.GetArray()) {
RETURN_IF_ERROR(process_single_column<T>(sub_col, sub_type, pure_doc_value, array));
}
return Status::OK();
}

template <typename T>
Status process_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array) {
if (!col.IsArray()) {
return process_single_column<T>(col, sub_type, pure_doc_value, array);
} else {
return process_column_array<T>(col, sub_type, pure_doc_value, array);
}
}

template <typename DateType, typename RT>
Status process_date_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array, const cctz::time_zone& time_zone) {
if (!col.IsArray()) {
RT data;
RETURN_IF_ERROR(
(get_date_int<DateType, RT>(col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(data);
} else {
for (const auto& sub_col : col.GetArray()) {
RT data;
RETURN_IF_ERROR((get_date_int<DateType, RT>(sub_col, sub_type, pure_doc_value, &data,
time_zone)));
array.push_back(data);
}
}
return Status::OK();
}

Status ScrollParser::parse_column(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array,
const cctz::time_zone& time_zone) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING:
return process_column<std::string>(col, sub_type, pure_doc_value, array);
case TYPE_TINYINT:
return process_column<int8_t>(col, sub_type, pure_doc_value, array);
case TYPE_SMALLINT:
return process_column<int16_t>(col, sub_type, pure_doc_value, array);
case TYPE_INT:
return process_column<int32>(col, sub_type, pure_doc_value, array);
case TYPE_BIGINT:
return process_column<int64_t>(col, sub_type, pure_doc_value, array);
case TYPE_LARGEINT:
return process_column<__int128>(col, sub_type, pure_doc_value, array);
case TYPE_FLOAT:
return process_column<float>(col, sub_type, pure_doc_value, array);
case TYPE_DOUBLE:
return process_column<double>(col, sub_type, pure_doc_value, array);
case TYPE_BOOLEAN:
return process_column<bool>(col, sub_type, pure_doc_value, array);
// date/datetime v2 is the default type for catalog table,
// see https://github.com/apache/doris/pull/16304
// No need to support date and datetime types.
case TYPE_DATEV2: {
return process_date_column<DateV2Value<DateV2ValueType>, uint32_t>(
col, sub_type, pure_doc_value, array, time_zone);
}
case TYPE_DATETIMEV2: {
return process_date_column<DateV2Value<DateTimeV2ValueType>, uint64_t>(
col, sub_type, pure_doc_value, array, time_zone);
}
default:
LOG(ERROR) << "Do not support Array type: " << sub_type;
return Status::InternalError("Unsupported type");
}
}

ScrollParser::ScrollParser(bool doc_value_mode) : _size(0), _line_index(0) {}

ScrollParser::~ScrollParser() = default;
Expand Down Expand Up @@ -687,125 +840,7 @@ Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
case TYPE_ARRAY: {
vectorized::Array array;
const auto& sub_type = tuple_desc->slots()[i]->type().children[0].type;
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, false);
for (const auto& sub_col : col.GetArray()) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
std::string val;
RETURN_ERROR_IF_COL_IS_ARRAY(sub_col, sub_type, true);
if (!sub_col.IsString()) {
val = json_value_to_string(sub_col);
} else {
val = sub_col.GetString();
}
array.push_back(val);
break;
}
case TYPE_TINYINT: {
int8_t val;
RETURN_IF_ERROR(get_int_value<int8_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_SMALLINT: {
int16_t val;
RETURN_IF_ERROR(
get_int_value<int16_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_INT: {
int32 val;
RETURN_IF_ERROR(get_int_value<int32>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_BIGINT: {
int64_t val;
RETURN_IF_ERROR(
get_int_value<int64_t>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_LARGEINT: {
__int128 val;
RETURN_IF_ERROR(
get_int_value<__int128>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_FLOAT: {
float val {};
RETURN_IF_ERROR(
get_float_value<float>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_DOUBLE: {
double val {};
RETURN_IF_ERROR(
get_float_value<double>(sub_col, sub_type, &val, pure_doc_value));
array.push_back(val);
break;
}
case TYPE_BOOLEAN: {
if (sub_col.IsBool()) {
array.push_back(sub_col.GetBool());
break;
}

if (sub_col.IsNumber()) {
array.push_back(sub_col.GetInt());
break;
}

bool is_nested_str = false;
if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsBool()) {
array.push_back(sub_col[0].GetBool());
break;
} else if (pure_doc_value && sub_col.IsArray() && !sub_col.Empty() &&
sub_col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && sub_col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}

const rapidjson::Value& str_col = is_nested_str ? sub_col[0] : sub_col;

const std::string& val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
bool b = StringParser::string_to_bool(val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, type);
array.push_back(b);
break;
}
// date/datetime v2 is the default type for catalog table,
// see https://github.com/apache/doris/pull/16304
// No need to support date and datetime types.
case TYPE_DATEV2: {
uint32_t data;
RETURN_IF_ERROR((get_date_int<DateV2Value<DateV2ValueType>, uint32_t>(
sub_col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(data);
break;
}
case TYPE_DATETIMEV2: {
uint64_t data;
RETURN_IF_ERROR((get_date_int<DateV2Value<DateTimeV2ValueType>, uint64_t>(
sub_col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(data);
break;
}
default: {
LOG(ERROR) << "Do not support Array type: " << sub_type;
break;
}
}
}
RETURN_IF_ERROR(parse_column(col, sub_type, pure_doc_value, array, time_zone));
col_ptr->insert(array);
break;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/es/es_scroll_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ScrollParser {
int get_size() const;

private:
Status parse_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array, const cctz::time_zone& time_zone);
std::string _scroll_id;
int _size;
rapidjson::SizeType _line_index;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{"name": "Andy", "sports": "soccer"}
{"name": "Betty", "sports": "pingpong ball"}
{"name": "Cindy", "sports": "武术"}
{"name": "David", "sports": ["volleyball"]}
{"name": "Emily", "sports": ["baseball", "golf", "hockey"]}
{"name": "Frank", "sports": ["rugby", "cricket", "boxing"]}
{"name": "Grace", "sports": ["table tennis", "badminton", "athletics"]}
{"name": "Henry", "sports": ["archery", "fencing", "weightlifting"]}
{"name": "Ivy", "sports": ["judo", "karate", "taekwondo"]}
{"name": "Jack", "sports": ["wrestling", "gymnastics", "surfing"]}
{"name": "Andy", "sports": "soccer", "scores": 100}
{"name": "Betty", "sports": "pingpong ball", "scores": 90}
{"name": "Cindy", "sports": "武术", "scores": 89}
{"name": "David", "sports": ["volleyball"], "scores": [77]}
{"name": "Emily", "sports": ["baseball", "golf", "hockey"], "scores": [56, 78, 99]}
{"name": "Frank", "sports": ["rugby", "cricket", "boxing"], "scores": [45, 67, 88]}
{"name": "Grace", "sports": ["table tennis", "badminton", "athletics"], "scores": [34, 56, 78]}
{"name": "Henry", "sports": ["archery", "fencing", "weightlifting"], "scores": [23, 45, 67]}
{"name": "Ivy", "sports": ["judo", "karate", "taekwondo"], "scores": [12, 34, 56]}
{"name": "Jack", "sports": ["wrestling", "gymnastics", "surfing"], "scores": [1, 23, 45]}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"_meta": {
"doris":{
"array_fields":[
"sports"
"sports",
"scores"
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"doc": {
"properties": {
"name": { "type": "keyword" },
"sports": { "type": "keyword", "doc_values": false}
"sports": { "type": "keyword", "doc_values": false},
"scores": { "type": "integer", "doc_values": false}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"mappings": {
"properties": {
"name": { "type": "keyword" },
"sports": { "type": "keyword", "doc_values": false}
"sports": { "type": "keyword", "doc_values": false},
"scores": { "type": "integer", "doc_values": false}
}
}
}
Loading

0 comments on commit 09b7005

Please sign in to comment.