Skip to content

Commit

Permalink
[Fix](ShortCircuit) consider delete sign flag when hits row (apache#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon authored Sep 9, 2024
1 parent 653e315 commit 2023eab
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 10 deletions.
37 changes: 28 additions & 9 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
#include "runtime/runtime_state.h"
#include "util/key_util.h"
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
#include "util/thrift_util.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/serde/data_type_serde.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
Expand All @@ -51,7 +53,8 @@ namespace doris {
Reusable::~Reusable() {}
constexpr static int s_preallocted_blocks_num = 32;
Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
const TQueryOptions& query_options, size_t block_size) {
const TQueryOptions& query_options, const TabletSchema& schema,
size_t block_size) {
SCOPED_MEM_COUNT_BY_HOOK(&_mem_size);
_runtime_state = RuntimeState::create_unique();
_runtime_state->set_query_options(query_options);
Expand All @@ -77,6 +80,8 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vector<TExp
_col_uid_to_idx[slot->col_unique_id()] = i;
_col_default_values[i] = slot->col_default_value();
}
// get the delete sign idx in block
_delete_sign_idx = _col_uid_to_idx[schema.columns()[schema.delete_sign_idx()]->unique_id()];
return Status::OK();
}

Expand Down Expand Up @@ -181,6 +186,12 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
auto cache_handle = LookupConnectionCache::instance()->get(uuid);
_binary_row_format = request->is_binary_row();
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id());
if (_tablet == nullptr) {
LOG(WARNING) << "failed to do tablet_fetch_data. tablet [" << request->tablet_id()
<< "] is not exist";
return Status::NotFound(fmt::format("tablet {} not exist", request->tablet_id()));
}
if (cache_handle != nullptr) {
_reusable = cache_handle;
_profile_metrics.hit_lookup_cache = true;
Expand Down Expand Up @@ -208,19 +219,14 @@ Status PointQueryExecutor::init(const PTabletKeyLookupRequest* request,
if (uuid != 0) {
// could be reused by requests after, pre allocte more blocks
RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
*_tablet->tablet_schema(),
s_preallocted_blocks_num));
LookupConnectionCache::instance()->add(uuid, reusable_ptr);
} else {
RETURN_IF_ERROR(
reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options, 1));
RETURN_IF_ERROR(reusable_ptr->init(t_desc_tbl, t_output_exprs.exprs, t_query_options,
*_tablet->tablet_schema(), 1));
}
}
_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request->tablet_id());
if (_tablet == nullptr) {
LOG(WARNING) << "failed to do tablet_fetch_data. tablet [" << request->tablet_id()
<< "] is not exist";
return Status::NotFound(fmt::format("tablet {} not exist", request->tablet_id()));
}
RETURN_IF_ERROR(_init_keys(request));
_result_block = _reusable->get_block();
CHECK(_result_block != nullptr);
Expand Down Expand Up @@ -354,6 +360,19 @@ Status PointQueryExecutor::_lookup_row_data() {
_reusable->get_col_uid_to_idx(), *_result_block,
_reusable->get_col_default_values());
}
// filter rows by delete sign
if (_result_block->rows() > 0 && _reusable->delete_sign_idx() != -1) {
vectorized::ColumnPtr delete_filter_columns =
_result_block->get_columns()[_reusable->delete_sign_idx()];
const auto& filter =
assert_cast<const vectorized::ColumnInt8*>(delete_filter_columns.get())->get_data();
size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
if (count == filter.size()) {
_result_block->clear();
} else if (count > 0) {
return Status::NotSupported("Not implemented since only single row at present");
}
}
return Status::OK();
}

Expand Down
8 changes: 7 additions & 1 deletion be/src/service/point_query_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class Reusable {
}

Status init(const TDescriptorTable& t_desc_tbl, const std::vector<TExpr>& output_exprs,
const TQueryOptions& query_options, size_t block_size = 1);
const TQueryOptions& query_options, const TabletSchema& schema,
size_t block_size = 1);

std::unique_ptr<vectorized::Block> get_block();

Expand All @@ -95,6 +96,9 @@ class Reusable {

RuntimeState* runtime_state() { return _runtime_state.get(); }

// delete sign idx in block
int32_t delete_sign_idx() const { return _delete_sign_idx; }

private:
// caching TupleDescriptor, output_expr, etc...
std::unique_ptr<RuntimeState> _runtime_state;
Expand All @@ -108,6 +112,8 @@ class Reusable {
std::unordered_map<uint32_t, uint32_t> _col_uid_to_idx;
std::vector<std::string> _col_default_values;
int64_t _mem_size = 0;
// delete sign idx in block
int32_t _delete_sign_idx = -1;
};

// RowCache is a LRU cache for row store
Expand Down
8 changes: 8 additions & 0 deletions regression-test/data/point_query_p0/test_point_query.out
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,11 @@
-- !sql --
0 1111111

-- !sql --
10 20 aabc value

-- !sql --

-- !sql --
-10 20 aabc update val

27 changes: 27 additions & 0 deletions regression-test/suites/point_query_p0/test_point_query.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -303,4 +303,31 @@ suite("test_point_query", "nonConcurrent") {
sql """set global enable_nereids_planner=true"""
sql "set global enable_fallback_to_original_planner = true"
}

// test partial update/delete
sql "DROP TABLE IF EXISTS table_3821461"
sql """
CREATE TABLE `table_3821461` (
`col1` smallint NOT NULL,
`col2` int NOT NULL,
`loc3` char(10) NOT NULL,
`value` char(10) NOT NULL,
INDEX col3 (`loc3`) USING INVERTED,
INDEX col2 (`col2`) USING INVERTED )
ENGINE=OLAP UNIQUE KEY(`col1`, `col2`, `loc3`)
DISTRIBUTED BY HASH(`col1`, `col2`, `loc3`) BUCKETS 1
PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "bloom_filter_columns" = "col1", "store_row_column" = "true" );
"""
sql "insert into table_3821461 values (-10, 20, 'aabc', 'value')"
sql "insert into table_3821461 values (10, 20, 'aabc', 'value');"
sql "insert into table_3821461 values (20, 30, 'aabc', 'value');"
explain {
sql("select * from table_3821461 where col1 = -10 and col2 = 20 and loc3 = 'aabc'")
contains "SHORT-CIRCUIT"
}
qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';"
sql "delete from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';"
qt_sql "select * from table_3821461 where col1 = 10 and col2 = 20 and loc3 = 'aabc';"
sql "update table_3821461 set value = 'update value' where col1 = -10 or col1 = 20;"
qt_sql """select * from table_3821461 where col1 = -10 and col2 = 20 and loc3 = 'aabc'"""
}

0 comments on commit 2023eab

Please sign in to comment.