Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy Distinct operation #1558

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 100 additions & 13 deletions src/engine/Distinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

#include "./Distinct.h"

#include <sstream>

#include "engine/CallFixedSize.h"
#include "engine/Engine.h"
#include "engine/QueryExecutionTree.h"

using std::endl;
Expand All @@ -19,7 +16,7 @@ size_t Distinct::getResultWidth() const { return subtree_->getResultWidth(); }
// _____________________________________________________________________________
Distinct::Distinct(QueryExecutionContext* qec,
std::shared_ptr<QueryExecutionTree> subtree,
const vector<ColumnIndex>& keepIndices)
const std::vector<ColumnIndex>& keepIndices)
: Operation(qec), subtree_(std::move(subtree)), _keepIndices(keepIndices) {}

// _____________________________________________________________________________
Expand All @@ -36,17 +33,107 @@ VariableToColumnMap Distinct::computeVariableToColumnMap() const {
return subtree_->getVariableColumns();
}

template <size_t WIDTH>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
template <size_t WIDTH>
// ____________________________________________________________________
template <size_t WIDTH>

cppcoro::generator<IdTable> Distinct::lazyDistinct(
cppcoro::generator<IdTable> originalGenerator,
RobinTF marked this conversation as resolved.
Show resolved Hide resolved
std::vector<ColumnIndex> keepIndices,
std::optional<IdTable> aggregateTable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason to not use the bool yieldOnce pattern from other operations?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly for simplicity reasons. This function is currently static, just like the regular distinct function, and if a bool were passed instead you'd have to pass the width and the allocator to construct the IdTable within the generator. But conceptually it does the same thing.

std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow =
std::nullopt;
for (IdTable& idTable : originalGenerator) {
IdTable result =
distinct<WIDTH>(std::move(idTable), keepIndices, previousRow);
if (!result.empty()) {
previousRow.emplace(result.asStaticView<WIDTH>().back());
if (aggregateTable.has_value()) {
aggregateTable.value().insertAtEnd(result);
} else {
co_yield result;
}
}
}
if (aggregateTable.has_value()) {
co_yield aggregateTable.value();
}
}

// _____________________________________________________________________________
ProtoResult Distinct::computeResult([[maybe_unused]] bool requestLaziness) {
IdTable idTable{getExecutionContext()->getAllocator()};
ProtoResult Distinct::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Getting sub-result for distinct result computation..." << endl;
std::shared_ptr<const Result> subRes = subtree_->getResult();
std::shared_ptr<const Result> subRes = subtree_->getResult(true);

LOG(DEBUG) << "Distinct result computation..." << endl;
idTable.setNumColumns(subRes->idTable().numColumns());
size_t width = subRes->idTable().numColumns();
CALL_FIXED_SIZE(width, &Engine::distinct, subRes->idTable(), _keepIndices,
&idTable);
LOG(DEBUG) << "Distinct result computation done." << endl;
return {std::move(idTable), resultSortedOn(), subRes->getSharedLocalVocab()};
size_t width = subtree_->getResultWidth();
if (subRes->isFullyMaterialized()) {
IdTable idTable =
CALL_FIXED_SIZE(width, &Distinct::distinct, subRes->idTable().clone(),
_keepIndices, std::nullopt);
LOG(DEBUG) << "Distinct result computation done." << endl;
return {std::move(idTable), resultSortedOn(),
subRes->getSharedLocalVocab()};
}

auto generator = CALL_FIXED_SIZE(
width, &Distinct::lazyDistinct, std::move(subRes->idTables()),
_keepIndices,
requestLaziness ? std::nullopt
: std::optional{IdTable{width, allocator()}});
if (!requestLaziness) {
IdTable result = cppcoro::getSingleElement(std::move(generator));
return {std::move(result), resultSortedOn(), subRes->getSharedLocalVocab()};
}
return {std::move(generator), resultSortedOn(),
subRes->getSharedLocalVocab()};
}

// _____________________________________________________________________________
template <size_t WIDTH>
IdTable Distinct::distinct(
IdTable dynInput, const std::vector<ColumnIndex>& keepIndices,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow) {
AD_CONTRACT_CHECK(keepIndices.size() <= dynInput.numColumns());
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
IdTableStatic<WIDTH> result = std::move(dynInput).toStatic<WIDTH>();

auto matchesRow = [&keepIndices](const auto& a, const auto& b) {
for (ColumnIndex i : keepIndices) {
if (a[i] != b[i]) {
return false;
}
}
return true;
};

// Variant of `std::ranges::unique` that additionally skips the leading rows
// that match `previousRow`, i.e. rows already covered by the previous table.
auto first = std::ranges::find_if(result, [&matchesRow,
&previousRow](const auto& row) {
return !previousRow.has_value() || !matchesRow(row, previousRow.value());
});
auto last = result.end();

auto dest = result.begin();
if (first == dest) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see it, there are two optimizations and one open question:

  1. Do this columnwise (see the Engine.cpp for a variation of this algorithm that only returns the count of unique elements)

  2. Use std::ranges::unique (with your matches row you can build something)

  3. What about cancellation? (see 1.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In total I think there might be a version with less code possible here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. This would work, but it would require O(numRows()) bytes of additional memory (the current algorithm is in-place, no additional memory required, we're just freeing memory that has been allocated by the previous operation).
  2. After implementing this I'm fairly confident std::ranges::unique (as in this exact function) doesn't help here whatsoever, I really tried using it, but it just doesn't work for this use-case (with previousRow considered), but of course there might be other algorithms that might help making this shorter.
  3. Yeah, cancellation would be nice, but that should be added once we settled on a general approach. For the non-lazy approach admittedly it would be better to not clone the whole thing at all and just copy the entries that are actually required, but this would require even more code.

// Optimization to avoid redundant move operations.
first = std::ranges::adjacent_find(first, last, matchesRow);
dest = first;
if (first != last) {
++first;
}
} else if (first != last) {
*dest = std::move(*first);
}

if (first != last) {
while (++first != last) {
if (!matchesRow(*dest, *first)) {
*++dest = std::move(*first);
}
}
++dest;
}
result.erase(dest, last);

LOG(DEBUG) << "Distinct done.\n";
return std::move(result).toDynamic();
}
31 changes: 22 additions & 9 deletions src/engine/Distinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,26 @@
// Author: Björn Buchhold ([email protected])
#pragma once

#include <utility>
#include <vector>

#include "engine/Operation.h"
#include "engine/QueryExecutionTree.h"
#include "parser/ParsedQuery.h"

using std::vector;

class Distinct : public Operation {
private:
std::shared_ptr<QueryExecutionTree> subtree_;
vector<ColumnIndex> _keepIndices;
std::vector<ColumnIndex> _keepIndices;

public:
Distinct(QueryExecutionContext* qec,
std::shared_ptr<QueryExecutionTree> subtree,
const vector<ColumnIndex>& keepIndices);
const std::vector<ColumnIndex>& keepIndices);

[[nodiscard]] size_t getResultWidth() const override;

[[nodiscard]] string getDescriptor() const override;

[[nodiscard]] vector<ColumnIndex> resultSortedOn() const override {
[[nodiscard]] std::vector<ColumnIndex> resultSortedOn() const override {
return subtree_->resultSortedOn();
}

Expand All @@ -46,15 +42,32 @@ class Distinct : public Operation {

bool knownEmptyResult() override { return subtree_->knownEmptyResult(); }

vector<QueryExecutionTree*> getChildren() override {
std::vector<QueryExecutionTree*> getChildren() override {
return {subtree_.get()};
}

protected:
[[nodiscard]] string getCacheKeyImpl() const override;

private:
ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override;
ProtoResult computeResult(bool requestLaziness) override;

VariableToColumnMap computeVariableToColumnMap() const override;

template <size_t WIDTH>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

short docstring please.

static cppcoro::generator<IdTable> lazyDistinct(
cppcoro::generator<IdTable> originalGenerator,
std::vector<ColumnIndex> keepIndices,
std::optional<IdTable> aggregateTable);

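// Removes all duplicates from `dynInput` with regard to the columns in
// `keepIndices`. The input needs to be sorted on the keep indices, otherwise
// the result of this function is undefined. If `previousRow` has a value,
// leading rows of the input that match it on the keep indices are removed as
// well.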
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also document the previousRow argument.

template <size_t WIDTH>
static IdTable distinct(
IdTable dynInput, const std::vector<ColumnIndex>& keepIndices,
std::optional<typename IdTableStatic<WIDTH>::row_type> previousRow);

FRIEND_TEST(Distinct, distinct);
FRIEND_TEST(Distinct, distinctWithEmptyInput);
};
31 changes: 0 additions & 31 deletions src/engine/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,37 +140,6 @@ class Engine {

static void sort(IdTable& idTable, const std::vector<ColumnIndex>& sortCols);

/**
* @brief Removes all duplicates from input with regards to the columns
* in keepIndices. The input needs to be sorted on the keep indices,
* otherwise the result of this function is undefined.
**/
template <size_t WIDTH>
static void distinct(const IdTable& dynInput,
const std::vector<ColumnIndex>& keepIndices,
IdTable* dynResult) {
LOG(DEBUG) << "Distinct on " << dynInput.size() << " elements.\n";
const IdTableView<WIDTH> input = dynInput.asStaticView<WIDTH>();
IdTableStatic<WIDTH> result = std::move(*dynResult).toStatic<WIDTH>();
result = input.clone();
if (!input.empty()) {
AD_CONTRACT_CHECK(keepIndices.size() <= input.numColumns());

auto last = std::unique(result.begin(), result.end(),
[&keepIndices](const auto& a, const auto& b) {
for (ColumnIndex i : keepIndices) {
if (a[i] != b[i]) {
return false;
}
}
return true;
});
result.erase(last, result.end());
}
*dynResult = std::move(result).toDynamic();
LOG(DEBUG) << "Distinct done.\n";
}

// Return the number of distinct rows in the `input`. The input must have all
// duplicates adjacent to each other (e.g. by being sorted), otherwise the
// behavior is undefined. `checkCancellation()` is invoked regularly and can
Expand Down
12 changes: 4 additions & 8 deletions src/engine/idTable/IdTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,11 @@ class IdTable {
// `std::vector` and other containers.
// TODO<C++23, joka921> Remove the duplicates via explicit object parameters
// ("deducing this").
row_reference_restricted front() { return (*this)[0]; }
row_reference_restricted front() requires(!isView) { return (*this)[0]; }
const_row_reference_restricted front() const { return (*this)[0]; }
row_reference_restricted back() { return (*this)[numRows() - 1]; }
row_reference_restricted back() requires(!isView) {
return (*this)[numRows() - 1];
}
const_row_reference_restricted back() const { return (*this)[numRows() - 1]; }

// Resize the `IdTable` to exactly `numRows`. If `numRows < size()`, then the
Expand Down Expand Up @@ -637,12 +639,6 @@ class IdTable {
// that `begin() <= beginIt <= endIt < end`, else the behavior is undefined.
// The order of the elements before and after the erased regions remains the
// same. This behavior is similar to `std::vector::erase`.
//
// TODO<joka921> It is currently used by the implementation of DISTINCT, which
// first copies the sorted input completely, and then calls `std::unique`,
// followed by `erase` at the end. `DISTINCT` should be implemented via an
// out-of-place algorithm that only writes the distinct elements. Then the
// following two functions can be deleted.
void erase(const iterator& beginIt, const iterator& endIt) requires(!isView) {
AD_EXPENSIVE_CHECK(begin() <= beginIt && beginIt <= endIt &&
endIt <= end());
Expand Down
27 changes: 0 additions & 27 deletions test/EngineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
#include "engine/ValuesForTesting.h"
#include "engine/idTable/IdTable.h"
#include "util/AllocatorTestHelpers.h"
#include "util/Forward.h"
#include "util/GTestHelpers.h"
#include "util/IdTableHelpers.h"
#include "util/IdTestHelpers.h"
#include "util/IndexTestHelpers.h"
#include "util/Random.h"

using ad_utility::testing::makeAllocator;
using namespace ad_utility::testing;
Expand All @@ -30,31 +28,6 @@ constexpr auto U = Id::makeUndefined();
using JoinColumns = std::vector<std::array<ColumnIndex, 2>>;
} // namespace

// `Engine::distinct` must keep exactly the first row of every run of adjacent
// rows that agree on the keep columns.
TEST(EngineTest, distinctTest) {
  // Only columns 1 and 2 are compared when looking for duplicates.
  std::vector<ColumnIndex> distinctColumns{{1, 2}};

  // Projected onto columns 1 and 2, the rows read
  // (1,3), (1,3), (2,3), (6,5), (6,5), so duplicates are adjacent.
  IdTable sortedInput = makeIdTableFromVector(
      {{1, 1, 3, 7}, {6, 1, 3, 6}, {2, 2, 3, 5}, {3, 6, 5, 4}, {1, 6, 5, 1}});

  // The first row of each of the three groups is expected to survive.
  IdTable expectedResult =
      makeIdTableFromVector({{1, 1, 3, 7}, {2, 2, 3, 5}, {3, 6, 5, 4}});

  IdTable result{4, makeAllocator()};
  CALL_FIXED_SIZE(4, Engine::distinct, sortedInput, distinctColumns, &result);

  ASSERT_EQ(expectedResult, result);
}

// An empty input must yield an empty result, even if the output table passed
// to `Engine::distinct` already contains rows.
TEST(EngineTest, distinctWithEmptyInput) {
  // The output table deliberately starts out non-empty, so the test can only
  // pass if its contents are overwritten by the (empty) input.
  IdTable result = makeIdTableFromVector({{3}});
  IdTable input{1, makeAllocator()};

  CALL_FIXED_SIZE(1, Engine::distinct, input, std::vector<ColumnIndex>{},
                  &result);

  ASSERT_EQ(input, result);
}

void testOptionalJoin(const IdTable& inputA, const IdTable& inputB,
JoinColumns jcls, const IdTable& expectedResult) {
{
Expand Down
Loading
Loading