Skip to content

Commit

Permalink
[Chore](function) change hash set usage of all functions (#43289) (#4…
Browse files Browse the repository at this point in the history
…6974)

Remove all ck hash_set (HashSetWithStackMemory) usage and replace it with phmap::flat_hash_set.
Some test results have changed because the order in which data is stored
within the hash table has changed.
  • Loading branch information
BiteTheDDDDt authored Jan 14, 2025
1 parent 0075a83 commit 6d3f3b2
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 79 deletions.
38 changes: 21 additions & 17 deletions be/src/vec/aggregate_functions/aggregate_function_collect.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "vec/columns/column_string.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/pod_array_fwd.h"
#include "vec/common/string_buffer.hpp"
#include "vec/common/string_ref.h"
Expand All @@ -62,7 +61,7 @@ struct AggregateFunctionCollectSetData {
using ColVecType = ColumnVectorOrDecimal<ElementType>;
using ElementNativeType = typename NativeType<T>::Type;
using SelfType = AggregateFunctionCollectSetData;
using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
using Set = phmap::flat_hash_set<ElementNativeType>;
Set data_set;
Int64 max_size = -1;

Expand All @@ -83,28 +82,37 @@ struct AggregateFunctionCollectSetData {
if (size() >= max_size) {
return;
}
data_set.insert(rhs_elem.get_value());
data_set.insert(rhs_elem);
}
} else {
data_set.merge(rhs.data_set);
data_set.merge(Set(rhs.data_set));
}
}

void write(BufferWritable& buf) const {
data_set.write(buf);
write_var_uint(data_set.size(), buf);
for (const auto& value : data_set) {
write_binary(value, buf);
}
write_var_int(max_size, buf);
}

void read(BufferReadable& buf) {
data_set.read(buf);
size_t new_size = 0;
read_var_uint(new_size, buf);
ElementNativeType x;
for (size_t i = 0; i < new_size; ++i) {
read_binary(x, buf);
data_set.insert(x);
}
read_var_int(max_size, buf);
}

void insert_result_into(IColumn& to) const {
auto& vec = assert_cast<ColVecType&>(to).get_data();
vec.reserve(size());
for (const auto& item : data_set) {
vec.push_back(item.key);
vec.push_back(item);
}
}

Expand All @@ -116,23 +124,19 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
using ElementType = StringRef;
using ColVecType = ColumnString;
using SelfType = AggregateFunctionCollectSetData<ElementType, HasLimit>;
using Set = HashSetWithStackMemory<ElementType, DefaultHash<ElementType>, 4>;
using Set = phmap::flat_hash_set<ElementType>;
Set data_set;
Int64 max_size = -1;

size_t size() const { return data_set.size(); }

void add(const IColumn& column, size_t row_num, Arena* arena) {
Set::LookupResult it;
bool inserted;
auto key = column.get_data_at(row_num);
key.data = arena->insert(key.data, key.size);
data_set.emplace(key, it, inserted);
data_set.insert(key);
}

void merge(const SelfType& rhs, Arena* arena) {
bool inserted;
Set::LookupResult it;
if (max_size == -1) {
max_size = rhs.max_size;
}
Expand All @@ -145,16 +149,16 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
}
}
assert(arena != nullptr);
StringRef key = rhs_elem.get_value();
StringRef key = rhs_elem;
key.data = arena->insert(key.data, key.size);
data_set.emplace(key, it, inserted);
data_set.insert(key);
}
}

void write(BufferWritable& buf) const {
write_var_uint(size(), buf);
for (const auto& elem : data_set) {
write_string_binary(elem.get_value(), buf);
write_string_binary(elem, buf);
}
write_var_int(max_size, buf);
}
Expand All @@ -174,7 +178,7 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> {
auto& vec = assert_cast<ColVecType&>(to);
vec.reserve(size());
for (const auto& item : data_set) {
vec.insert_data(item.key.data, item.key.size);
vec.insert_data(item.data, item.size);
}
}

Expand Down
44 changes: 23 additions & 21 deletions be/src/vec/aggregate_functions/aggregate_function_distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
Expand All @@ -59,8 +58,8 @@ namespace doris::vectorized {
template <typename T, bool stable>
struct AggregateFunctionDistinctSingleNumericData {
/// When creating, the hash table must be small.
using Container = std::conditional_t<stable, phmap::flat_hash_map<T, uint32_t>,
HashSetWithStackMemory<T, DefaultHash<T>, 4>>;
using Container =
std::conditional_t<stable, phmap::flat_hash_map<T, uint32_t>, phmap::flat_hash_set<T>>;
using Self = AggregateFunctionDistinctSingleNumericData<T, stable>;
Container data;

Expand All @@ -78,21 +77,30 @@ struct AggregateFunctionDistinctSingleNumericData {
void merge(const Self& rhs, Arena*) {
DCHECK(!stable);
if constexpr (!stable) {
data.merge(rhs.data);
data.merge(Container(rhs.data));
}
}

void serialize(BufferWritable& buf) const {
DCHECK(!stable);
if constexpr (!stable) {
data.write(buf);
write_var_uint(data.size(), buf);
for (const auto& value : data) {
write_binary(value, buf);
}
}
}

void deserialize(BufferReadable& buf, Arena*) {
DCHECK(!stable);
if constexpr (!stable) {
data.read(buf);
size_t new_size = 0;
read_var_uint(new_size, buf);
T x;
for (size_t i = 0; i < new_size; ++i) {
read_binary(x, buf);
data.insert(x);
}
}
}

Expand All @@ -108,7 +116,7 @@ struct AggregateFunctionDistinctSingleNumericData {
}
} else {
for (const auto& elem : data) {
argument_columns[0]->insert(elem.get_value());
argument_columns[0]->insert(elem);
}
}

Expand All @@ -120,19 +128,17 @@ template <bool stable>
struct AggregateFunctionDistinctGenericData {
/// When creating, the hash table must be small.
using Container = std::conditional_t<stable, phmap::flat_hash_map<StringRef, uint32_t>,
HashSetWithStackMemory<StringRef, StringRefHash, 4>>;
phmap::flat_hash_set<StringRef, StringRefHash>>;
using Self = AggregateFunctionDistinctGenericData;
Container data;

void merge(const Self& rhs, Arena* arena) {
DCHECK(!stable);
if constexpr (!stable) {
typename Container::LookupResult it;
bool inserted;
for (const auto& elem : rhs.data) {
StringRef key = elem.get_value();
StringRef key = elem;
key.data = arena->insert(key.data, key.size);
data.emplace(key, it, inserted);
data.emplace(key);
}
}
}
Expand All @@ -142,7 +148,7 @@ struct AggregateFunctionDistinctGenericData {
if constexpr (!stable) {
write_var_uint(data.size(), buf);
for (const auto& elem : data) {
write_string_binary(elem.get_value(), buf);
write_string_binary(elem, buf);
}
}
}
Expand Down Expand Up @@ -174,9 +180,7 @@ struct AggregateFunctionDistinctSingleGenericData
if constexpr (stable) {
data.emplace(key, data.size());
} else {
typename Base::Container::LookupResult it;
bool inserted;
data.emplace(key, it, inserted);
data.insert(key);
}
}

Expand All @@ -193,7 +197,7 @@ struct AggregateFunctionDistinctSingleGenericData
}
} else {
for (const auto& elem : data) {
argument_columns[0]->insert_data(elem.get_value().data, elem.get_value().size);
argument_columns[0]->insert_data(elem.data, elem.size);
}
}

Expand All @@ -218,9 +222,7 @@ struct AggregateFunctionDistinctMultipleGenericData
if constexpr (stable) {
data.emplace(key, data.size());
} else {
typename Base::Container::LookupResult it;
bool inserted;
data.emplace(key, it, inserted);
data.emplace(key);
}
}

Expand All @@ -243,7 +245,7 @@ struct AggregateFunctionDistinctMultipleGenericData
}
} else {
for (const auto& elem : data) {
const char* begin = elem.get_value().data;
const char* begin = elem.data;
for (auto& column : argument_columns) {
begin = column->deserialize_and_insert_from_arena(begin);
}
Expand Down
10 changes: 4 additions & 6 deletions be/src/vec/functions/array/function_array_distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/pod_array_fwd.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -146,8 +145,7 @@ class FunctionArrayDistinct : public IFunction {
auto& dest_data_concrete = reinterpret_cast<ColumnType&>(dest_column);
PaddedPODArray<NestType>& dest_datas = dest_data_concrete.get_data();

using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>,
INITIAL_SIZE_DEGREE>;
using Set = phmap::flat_hash_set<ElementNativeType, DefaultHash<ElementNativeType>>;
Set set;

size_t prev_src_offset = 0;
Expand All @@ -171,7 +169,7 @@ class FunctionArrayDistinct : public IFunction {
continue;
}

if (!set.find(src_datas[j])) {
if (!set.contains(src_datas[j])) {
set.insert(src_datas[j]);
dest_datas.push_back(src_datas[j]);
if (dest_null_map) {
Expand Down Expand Up @@ -201,7 +199,7 @@ class FunctionArrayDistinct : public IFunction {
ColumnString::Offsets& column_string_offsets = dest_column_string.get_offsets();
column_string_chars.reserve(src_column.size());

using Set = HashSetWithStackMemory<StringRef, DefaultHash<StringRef>, INITIAL_SIZE_DEGREE>;
using Set = phmap::flat_hash_set<StringRef, DefaultHash<StringRef>>;
Set set;

size_t prev_src_offset = 0;
Expand All @@ -225,7 +223,7 @@ class FunctionArrayDistinct : public IFunction {
}

StringRef src_str_ref = src_data_concrete->get_data_at(j);
if (!set.find(src_str_ref)) {
if (!set.contains(src_str_ref)) {
set.insert(src_str_ref);
// copy the src data to column_string_chars
const size_t old_size = column_string_chars.size();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/functions/array/function_array_except.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ struct ExceptAction {
template <bool is_left>
bool apply(Set& set, Set& result_set, const Element& elem) {
if constexpr (is_left) {
if (!set.find(elem)) {
if (!set.contains(elem)) {
set.insert(elem);
return true;
}
} else {
if (!set.find(elem)) {
if (!set.contains(elem)) {
set.insert(elem);
}
}
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/functions/array/function_array_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "vec/columns/column_array.h"
#include "vec/columns/column_string.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/data_types/data_type_array.h"
#include "vec/functions/array/function_array_utils.h"
#include "vec/functions/function_helpers.h"
Expand Down
5 changes: 2 additions & 3 deletions be/src/vec/functions/array/function_array_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "vec/columns/column_array.h"
#include "vec/columns/column_string.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/data_types/data_type_array.h"
#include "vec/functions/array/function_array_utils.h"
#include "vec/functions/function_helpers.h"
Expand All @@ -48,7 +47,7 @@ template <SetOperation operation, typename ColumnType>
struct OpenSetImpl {
using Element = typename ColumnType::value_type;
using ElementNativeType = typename NativeType<Element>::Type;
using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
using Set = phmap::flat_hash_set<ElementNativeType>;
using Action = typename ActionImpl<Set, Element, operation>::Action;
Action action;
Set set;
Expand Down Expand Up @@ -85,7 +84,7 @@ struct OpenSetImpl {

template <SetOperation operation>
struct OpenSetImpl<operation, ColumnString> {
using Set = HashSetWithStackMemory<StringRef, DefaultHash<StringRef>, 4>;
using Set = phmap::flat_hash_set<StringRef>;
using Action = typename ActionImpl<Set, StringRef, operation>::Action;
Action action;
Set set;
Expand Down
9 changes: 4 additions & 5 deletions be/src/vec/functions/array/function_arrays_overlap.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
Expand Down Expand Up @@ -62,7 +61,7 @@ using ColumnString = ColumnStr<UInt32>;
template <typename T>
struct OverlapSetImpl {
using ElementNativeType = typename NativeType<typename T::value_type>::Type;
using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
using Set = phmap::flat_hash_set<ElementNativeType, DefaultHash<ElementNativeType>>;
Set set;
void insert_array(const IColumn* column, size_t start, size_t size) {
const auto& vec = assert_cast<const T&>(*column).get_data();
Expand All @@ -73,7 +72,7 @@ struct OverlapSetImpl {
bool find_any(const IColumn* column, size_t start, size_t size) {
const auto& vec = assert_cast<const T&>(*column).get_data();
for (size_t i = start; i < start + size; ++i) {
if (set.find(vec[i])) {
if (set.contains(vec[i])) {
return true;
}
}
Expand All @@ -83,7 +82,7 @@ struct OverlapSetImpl {

template <>
struct OverlapSetImpl<ColumnString> {
using Set = HashSetWithStackMemory<StringRef, DefaultHash<StringRef>, 4>;
using Set = phmap::flat_hash_set<StringRef, DefaultHash<StringRef>>;
Set set;
void insert_array(const IColumn* column, size_t start, size_t size) {
for (size_t i = start; i < start + size; ++i) {
Expand All @@ -92,7 +91,7 @@ struct OverlapSetImpl<ColumnString> {
}
bool find_any(const IColumn* column, size_t start, size_t size) {
for (size_t i = start; i < start + size; ++i) {
if (set.find(column->get_data_at(i))) {
if (set.contains(column->get_data_at(i))) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
a ["aa", "a"]

-- !select_default --
a ["aaa", "aa", "a"]
a ["a", "aa", "aaa"]
b ["b"]

-- !select_default --
a ["aaa", "aa", "a"]
a ["a", "aa", "aaa"]
b ["b"]

Loading

0 comments on commit 6d3f3b2

Please sign in to comment.