Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy Union operation #1557

Merged
merged 10 commits into from
Oct 18, 2024
138 changes: 101 additions & 37 deletions src/engine/Union.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,24 @@ size_t Union::getCostEstimate() {
getSizeEstimateBeforeLimit();
}

ProtoResult Union::computeResult([[maybe_unused]] bool requestLaziness) {
ProtoResult Union::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Union result computation..." << std::endl;
std::shared_ptr<const Result> subRes1 = _subtrees[0]->getResult();
std::shared_ptr<const Result> subRes2 = _subtrees[1]->getResult();
LOG(DEBUG) << "Union subresult computation done." << std::endl;
std::shared_ptr<const Result> subRes1 =
_subtrees[0]->getResult(requestLaziness);
std::shared_ptr<const Result> subRes2 =
_subtrees[1]->getResult(requestLaziness);

IdTable idTable{getExecutionContext()->getAllocator()};
if (requestLaziness) {
auto localVocab = std::make_shared<LocalVocab>();
auto generator =
computeResultLazily(std::move(subRes1), std::move(subRes2), localVocab);
return {std::move(generator), resultSortedOn(), std::move(localVocab)};
}

idTable.setNumColumns(getResultWidth());
Union::computeUnion(&idTable, subRes1->idTable(), subRes2->idTable(),
_columnOrigins);
LOG(DEBUG) << "Union subresult computation done." << std::endl;

IdTable idTable =
computeUnion(subRes1->idTable(), subRes2->idTable(), _columnOrigins);

LOG(DEBUG) << "Union result computation done" << std::endl;
// If only one of the two operands has a non-empty local vocabulary, share
Expand All @@ -177,43 +184,40 @@ ProtoResult Union::computeResult([[maybe_unused]] bool requestLaziness) {
Result::getMergedLocalVocab(*subRes1, *subRes2)};
}

void Union::computeUnion(
IdTable* resPtr, const IdTable& left, const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins) {
IdTable& res = *resPtr;
res.resize(left.size() + right.size());

static constexpr size_t chunkSize = 1'000'000;
// _____________________________________________________________________________
void Union::copyChunked(auto beg, auto end, auto target) const {
size_t total = end - beg;
for (size_t i = 0; i < total; i += chunkSize) {
checkCancellation();
size_t actualEnd = std::min(i + chunkSize, total);
std::copy(beg + i, beg + actualEnd, target + i);
}
}
joka921 marked this conversation as resolved.
Show resolved Hide resolved

// A drop-in replacement for `std::copy` that performs the copying in chunks
// of `chunkSize` and checks the timeout after each chunk.
auto copyChunked = [this](auto beg, auto end, auto target) {
size_t total = end - beg;
for (size_t i = 0; i < total; i += chunkSize) {
checkCancellation();
size_t actualEnd = std::min(i + chunkSize, total);
std::copy(beg + i, beg + actualEnd, target + i);
}
};
// _____________________________________________________________________________
void Union::fillChunked(auto beg, auto end, const auto& value) const {
size_t total = end - beg;
for (size_t i = 0; i < total; i += chunkSize) {
checkCancellation();
size_t actualEnd = std::min(i + chunkSize, total);
std::fill(beg + i, beg + actualEnd, value);
}
};

// A similar timeout-checking replacement for `std::fill`.
auto fillChunked = [this](auto beg, auto end, const auto& value) {
size_t total = end - beg;
for (size_t i = 0; i < total; i += chunkSize) {
checkCancellation();
size_t actualEnd = std::min(i + chunkSize, total);
std::fill(beg + i, beg + actualEnd, value);
}
};
// _____________________________________________________________________________
IdTable Union::computeUnion(
const IdTable& left, const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins) const {
IdTable res{getResultWidth(), getExecutionContext()->getAllocator()};
res.resize(left.size() + right.size());

// Write the column with the `inputColumnIndex` from the `inputTable` into the
// `targetColumn`. Always copy the complete input column and start at position
// `offset` in the target column. If the `inputColumnIndex` is `NO_COLUMN`,
// then the corresponding range in the `targetColumn` will be filled with
// UNDEF.
auto writeColumn = [&copyChunked, &fillChunked](
const auto& inputTable, auto& targetColumn,
size_t inputColumnIndex, size_t offset) {
auto writeColumn = [this](const auto& inputTable, auto& targetColumn,
size_t inputColumnIndex, size_t offset) {
if (inputColumnIndex != NO_COLUMN) {
decltype(auto) input = inputTable.getColumn(inputColumnIndex);
copyChunked(input.begin(), input.end(), targetColumn.begin() + offset);
Expand All @@ -233,4 +237,64 @@ void Union::computeUnion(
writeColumn(left, targetColumn, leftCol, 0u);
writeColumn(right, targetColumn, rightCol, left.size());
}
return res;
}

// _____________________________________________________________________________
template <bool left>
std::vector<ColumnIndex> Union::computePermutation() const {
constexpr size_t treeIndex = left ? 0 : 1;
ColumnIndex startOfUndefColumns = _subtrees[treeIndex]->getResultWidth();
std::vector<ColumnIndex> permutation{};
permutation.reserve(_columnOrigins.size());
for (const auto& columnOrigin : _columnOrigins) {
ColumnIndex originIndex = columnOrigin[treeIndex];
if (originIndex == NO_COLUMN) {
originIndex = startOfUndefColumns;
startOfUndefColumns++;
}
permutation.push_back(originIndex);
}
return permutation;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In retrospective I'm not quite sure if there's an actual performance benefit to compute the permutation once and reuse it for all id tables or if it would be possible to just inline it into the algorithm that does the actual swapping, saving the memory allocation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be better to just store pairs of indices of swap operations?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is perfectly fine as is, the assumption always is, that columns and blocks are sufficiently large, s.t. you can do something mildly expensive for each of them.

}

// _____________________________________________________________________________
IdTable Union::transformToCorrectColumnFormat(
IdTable idTable, const std::vector<ColumnIndex>& permutation) const {
while (idTable.numColumns() < getResultWidth()) {
idTable.addEmptyColumn();
auto column = idTable.getColumn(idTable.numColumns() - 1);
fillChunked(column.begin(), column.end(), Id::makeUndefined());
}

idTable.setColumnSubset(permutation);
return idTable;
}

// _____________________________________________________________________________
cppcoro::generator<IdTable> Union::computeResultLazily(
std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2,
std::shared_ptr<LocalVocab> localVocab) const {
std::vector<ColumnIndex> permutation = computePermutation<true>();
if (result1->isFullyMaterialized()) {
co_yield transformToCorrectColumnFormat(result1->idTable().clone(),
permutation);
} else {
for (IdTable& idTable : result1->idTables()) {
co_yield transformToCorrectColumnFormat(std::move(idTable), permutation);
}
}
permutation = computePermutation<false>();
if (result2->isFullyMaterialized()) {
co_yield transformToCorrectColumnFormat(result2->idTable().clone(),
permutation);
} else {
for (IdTable& idTable : result2->idTables()) {
co_yield transformToCorrectColumnFormat(std::move(idTable), permutation);
}
}
std::array<const LocalVocab*, 2> vocabs{&result1->localVocab(),
&result2->localVocab()};
*localVocab = LocalVocab::merge(vocabs);
}
37 changes: 32 additions & 5 deletions src/engine/Union.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,45 @@ class Union : public Operation {

const static size_t NO_COLUMN;

static constexpr size_t chunkSize = 1'000'000;

// The method is declared here to make it unit testable
void computeUnion(IdTable* inputTable, const IdTable& left,
const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins);
IdTable computeUnion(
const IdTable& left, const IdTable& right,
const std::vector<std::array<size_t, 2>>& columnOrigins) const;

vector<QueryExecutionTree*> getChildren() override {
return {_subtrees[0].get(), _subtrees[1].get()};
}

private:
virtual ProtoResult computeResult(
[[maybe_unused]] bool requestLaziness) override;
// A drop-in replacement for `std::copy` that performs the copying in chunks
// of `chunkSize` and checks the timeout after each chunk.
void copyChunked(auto beg, auto end, auto target) const;

// A similar timeout-checking replacement for `std::fill`.
void fillChunked(auto beg, auto end, const auto& value) const;

ProtoResult computeResult(bool requestLaziness) override;

VariableToColumnMap computeVariableToColumnMap() const override;

// Compute the permutation of the `IdTable` being yielded for the left or
// right child depending on `left`. This permutation can then be used to swap
// the columns without any copy operations.
template <bool left>
std::vector<ColumnIndex> computePermutation() const;

// Take the given `IdTable`, add any missing columns to it (filled with
// undefined values) and permutate the columns to match the end result.
IdTable transformToCorrectColumnFormat(
IdTable idTable, const std::vector<ColumnIndex>& permutation) const;

// Create a generator that yields the `IdTable` for the left or right child
// one after another and apply a potential differing permutation to it. Write
// the merged LocalVocab to the given `LocalVocab` object at the end.
cppcoro::generator<IdTable> computeResultLazily(
std::shared_ptr<const Result> result1,
std::shared_ptr<const Result> result2,
std::shared_ptr<LocalVocab> localVocab) const;
};
99 changes: 95 additions & 4 deletions test/UnionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ using Vars = std::vector<std::optional<Variable>>;
} // namespace

// A simple test for computing a union.
TEST(UnionTest, computeUnion) {
TEST(Union, computeUnion) {
auto* qec = ad_utility::testing::getQec();
IdTable left = makeIdTableFromVector({{V(1)}, {V(2)}, {V(3)}});
auto leftT = ad_utility::makeExecutionTree<ValuesForTesting>(
Expand All @@ -30,7 +30,7 @@ TEST(UnionTest, computeUnion) {
auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, right.clone(), Vars{Variable{"?u"}, Variable{"?x"}});

Union u{ad_utility::testing::getQec(), leftT, rightT};
Union u{qec, leftT, rightT};
auto resultTable = u.computeResultOnlyForTesting();
const auto& result = resultTable.idTable();

Expand All @@ -42,7 +42,7 @@ TEST(UnionTest, computeUnion) {

// A test with large inputs to test the chunked writing that is caused by the
// timeout checks.
TEST(UnionTest, computeUnionLarge) {
TEST(Union, computeUnionLarge) {
auto* qec = ad_utility::testing::getQec();
VectorTable leftInput, rightInput, expected;
size_t numInputsL = 1'500'000u;
Expand All @@ -65,9 +65,100 @@ TEST(UnionTest, computeUnionLarge) {
auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, makeIdTableFromVector(rightInput), Vars{Variable{"?u"}});

Union u{ad_utility::testing::getQec(), leftT, rightT};
Union u{qec, leftT, rightT};
auto resultTable = u.computeResultOnlyForTesting();
const auto& result = resultTable.idTable();

ASSERT_EQ(result, makeIdTableFromVector(expected));
}

// _____________________________________________________________________________
TEST(Union, computeUnionLazy) {
auto runTest = [](bool nonLazyChildren,
ad_utility::source_location loc =
ad_utility::source_location::current()) {
auto l = generateLocationTrace(loc);
auto* qec = ad_utility::testing::getQec();
qec->getQueryTreeCache().clearAll();
IdTable left = makeIdTableFromVector({{V(1)}, {V(2)}, {V(3)}});
auto leftT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, std::move(left), Vars{Variable{"?x"}}, false,
std::vector<ColumnIndex>{}, LocalVocab{}, std::nullopt,
nonLazyChildren);

IdTable right = makeIdTableFromVector({{V(4), V(5)}, {V(6), V(7)}});
auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, std::move(right), Vars{Variable{"?u"}, Variable{"?x"}}, false,
std::vector<ColumnIndex>{}, LocalVocab{}, std::nullopt,
nonLazyChildren);

Union u{qec, std::move(leftT), std::move(rightT)};
auto resultTable = u.computeResultOnlyForTesting(true);
ASSERT_FALSE(resultTable.isFullyMaterialized());
auto& result = resultTable.idTables();

auto U = Id::makeUndefined();
auto expected1 = makeIdTableFromVector({{V(1), U}, {V(2), U}, {V(3), U}});
auto expected2 = makeIdTableFromVector({{V(5), V(4)}, {V(7), V(6)}});

auto iterator = result.begin();
ASSERT_NE(iterator, result.end());
ASSERT_EQ(*iterator, expected1);

++iterator;
ASSERT_NE(iterator, result.end());
ASSERT_EQ(*iterator, expected2);

ASSERT_EQ(++iterator, result.end());
};

runTest(false);
runTest(true);
}

// _____________________________________________________________________________
TEST(Union, ensurePermutationIsAppliedCorrectly) {
using Var = Variable;
auto* qec = ad_utility::testing::getQec();
auto leftT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, makeIdTableFromVector({{1, 2, 3, 4, 5}}),
Vars{Var{"?a"}, Var{"?b"}, Var{"?c"}, Var{"?d"}, Var{"?e"}});

auto rightT = ad_utility::makeExecutionTree<ValuesForTesting>(
qec, makeIdTableFromVector({{6, 7, 8}}),
Vars{Var{"?b"}, Var{"?a"}, Var{"?e"}});

Union u{qec, std::move(leftT), std::move(rightT)};

{
qec->getQueryTreeCache().clearAll();
auto resultTable = u.computeResultOnlyForTesting(true);
ASSERT_FALSE(resultTable.isFullyMaterialized());
auto& result = resultTable.idTables();

auto U = Id::makeUndefined();
auto expected1 = makeIdTableFromVector({{1, 2, 3, 4, 5}});
auto expected2 = makeIdTableFromVector({{V(7), V(6), U, U, V(8)}});

auto iterator = result.begin();
ASSERT_NE(iterator, result.end());
ASSERT_EQ(*iterator, expected1);

++iterator;
ASSERT_NE(iterator, result.end());
ASSERT_EQ(*iterator, expected2);

ASSERT_EQ(++iterator, result.end());
}

{
qec->getQueryTreeCache().clearAll();
auto resultTable = u.computeResultOnlyForTesting();
ASSERT_TRUE(resultTable.isFullyMaterialized());

auto U = Id::makeUndefined();
auto expected =
makeIdTableFromVector({{1, 2, 3, 4, 5}, {V(7), V(6), U, U, V(8)}});
EXPECT_EQ(resultTable.idTable(), expected);
}
}
Loading