Skip to content

Commit

Permalink
Merge branch 'master' into environment_for_using_prefiltering
Browse files Browse the repository at this point in the history
  • Loading branch information
realHannes authored Oct 22, 2024
2 parents 74756fe + f856919 commit 1278d39
Show file tree
Hide file tree
Showing 42 changed files with 940 additions and 632 deletions.
56 changes: 26 additions & 30 deletions src/engine/Bind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,25 @@ ProtoResult Bind::computeResult(bool requestLaziness) {

auto applyBind = [this, subRes](IdTable idTable, LocalVocab* localVocab) {
return computeExpressionBind(localVocab, std::move(idTable),
subRes->localVocab(),
_bind._expression.getPimpl());
};

if (subRes->isFullyMaterialized()) {
if (requestLaziness && subRes->idTable().size() > CHUNK_SIZE) {
auto localVocab =
std::make_shared<LocalVocab>(subRes->getCopyOfLocalVocab());
auto generator = [](std::shared_ptr<LocalVocab> vocab, auto applyBind,
std::shared_ptr<const Result> result)
-> cppcoro::generator<IdTable> {
size_t size = result->idTable().size();
for (size_t offset = 0; offset < size; offset += CHUNK_SIZE) {
co_yield applyBind(
cloneSubView(result->idTable(),
{offset, std::min(size, offset + CHUNK_SIZE)}),
vocab.get());
}
}(localVocab, std::move(applyBind), std::move(subRes));
return {std::move(generator), resultSortedOn(), std::move(localVocab)};
return {
[](auto applyBind,
std::shared_ptr<const Result> result) -> Result::Generator {
size_t size = result->idTable().size();
for (size_t offset = 0; offset < size; offset += CHUNK_SIZE) {
LocalVocab outVocab = result->getCopyOfLocalVocab();
IdTable idTable = applyBind(
cloneSubView(result->idTable(),
{offset, std::min(size, offset + CHUNK_SIZE)}),
&outVocab);
co_yield {std::move(idTable), std::move(outVocab)};
}
}(std::move(applyBind), std::move(subRes)),
resultSortedOn()};
}
// Make a deep copy of the local vocab from `subRes` and then add to it (in
// case BIND adds a new word or words).
Expand All @@ -132,28 +131,25 @@ ProtoResult Bind::computeResult(bool requestLaziness) {
LOG(DEBUG) << "BIND result computation done." << std::endl;
return {std::move(result), resultSortedOn(), std::move(localVocab)};
}
auto localVocab = std::make_shared<LocalVocab>();
auto generator =
[](std::shared_ptr<LocalVocab> vocab, auto applyBind,
std::shared_ptr<const Result> result) -> cppcoro::generator<IdTable> {
for (IdTable& idTable : result->idTables()) {
co_yield applyBind(std::move(idTable), vocab.get());
[](auto applyBind,
std::shared_ptr<const Result> result) -> Result::Generator {
for (auto& [idTable, localVocab] : result->idTables()) {
IdTable resultTable = applyBind(std::move(idTable), &localVocab);
co_yield {std::move(resultTable), std::move(localVocab)};
}
std::array<const LocalVocab*, 2> vocabs{vocab.get(), &result->localVocab()};
*vocab = LocalVocab::merge(std::span{vocabs});
}(localVocab, std::move(applyBind), std::move(subRes));
return {std::move(generator), resultSortedOn(), std::move(localVocab)};
}(std::move(applyBind), std::move(subRes));
return {std::move(generator), resultSortedOn()};
}

// _____________________________________________________________________________
IdTable Bind::computeExpressionBind(
LocalVocab* outputLocalVocab, IdTable idTable,
const LocalVocab& inputLocalVocab,
LocalVocab* localVocab, IdTable idTable,
const sparqlExpression::SparqlExpression* expression) const {
sparqlExpression::EvaluationContext evaluationContext(
*getExecutionContext(), _subtree->getVariableColumns(), idTable,
getExecutionContext()->getAllocator(), inputLocalVocab,
cancellationHandle_, deadline_);
getExecutionContext()->getAllocator(), *localVocab, cancellationHandle_,
deadline_);

sparqlExpression::ExpressionResult expressionResult =
expression->evaluate(&evaluationContext);
Expand Down Expand Up @@ -188,7 +184,7 @@ IdTable Bind::computeExpressionBind(
if (it != resultGenerator.end()) {
Id constantId =
sparqlExpression::detail::constantExpressionResultToId(
std::move(*it), *outputLocalVocab);
std::move(*it), *localVocab);
checkCancellation();
ad_utility::chunkedFill(outputColumn, constantId, CHUNK_SIZE,
[this]() { checkCancellation(); });
Expand All @@ -199,7 +195,7 @@ IdTable Bind::computeExpressionBind(
for (auto& resultValue : resultGenerator) {
outputColumn[i] =
sparqlExpression::detail::constantExpressionResultToId(
std::move(resultValue), *outputLocalVocab);
std::move(resultValue), *localVocab);
i++;
checkCancellation();
}
Expand Down
3 changes: 1 addition & 2 deletions src/engine/Bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class Bind : public Operation {

// Implementation for the binding of arbitrary expressions.
IdTable computeExpressionBind(
LocalVocab* outputLocalVocab, IdTable idTable,
const LocalVocab& inputLocalVocab,
LocalVocab* localVocab, IdTable idTable,
const sparqlExpression::SparqlExpression* expression) const;

[[nodiscard]] VariableToColumnMap computeVariableToColumnMap() const override;
Expand Down
69 changes: 38 additions & 31 deletions src/engine/ExportQueryExecutionTrees.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
#include "util/http/MediaTypes.h"

// __________________________________________________________________________
cppcoro::generator<const IdTable&> ExportQueryExecutionTrees::getIdTables(
const Result& result) {
cppcoro::generator<ExportQueryExecutionTrees::TableConstRefWithVocab>
ExportQueryExecutionTrees::getIdTables(const Result& result) {
if (result.isFullyMaterialized()) {
co_yield result.idTable();
TableConstRefWithVocab pair{result.idTable(), result.localVocab()};
co_yield pair;
} else {
for (const IdTable& idTable : result.idTables()) {
co_yield idTable;
for (const Result::IdTableVocabPair& pair : result.idTables()) {
TableConstRefWithVocab tableWithVocab{pair.idTable_, pair.localVocab_};
co_yield tableWithVocab;
}
}
}
Expand All @@ -33,11 +35,14 @@ ExportQueryExecutionTrees::getRowIndices(LimitOffsetClause limitOffset,
if (limitOffset._limit.value_or(1) == 0) {
co_return;
}
for (const IdTable& idTable : getIdTables(result)) {
uint64_t currentOffset = limitOffset.actualOffset(idTable.numRows());
uint64_t upperBound = limitOffset.upperBound(idTable.numRows());
for (TableConstRefWithVocab& tableWithVocab : getIdTables(result)) {
uint64_t currentOffset =
limitOffset.actualOffset(tableWithVocab.idTable_.numRows());
uint64_t upperBound =
limitOffset.upperBound(tableWithVocab.idTable_.numRows());
if (currentOffset != upperBound) {
co_yield {idTable, std::views::iota(currentOffset, upperBound)};
co_yield {std::move(tableWithVocab),
std::views::iota(currentOffset, upperBound)};
}
limitOffset._offset -= currentOffset;
if (limitOffset._limit.has_value()) {
Expand All @@ -57,9 +62,10 @@ ExportQueryExecutionTrees::constructQueryResultToTriples(
const ad_utility::sparql_types::Triples& constructTriples,
LimitOffsetClause limitAndOffset, std::shared_ptr<const Result> result,
CancellationHandle cancellationHandle) {
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
auto& idTable = pair.idTable_;
for (uint64_t i : range) {
ConstructQueryExportContext context{i, idTable, result->localVocab(),
ConstructQueryExportContext context{i, idTable, pair.localVocab_,
qet.getVariableColumns(),
qet.getQec()->getIndex()};
using enum PositionInTriple;
Expand Down Expand Up @@ -172,10 +178,10 @@ ExportQueryExecutionTrees::idTableToQLeverJSONBindings(
std::shared_ptr<const Result> result,
CancellationHandle cancellationHandle) {
AD_CORRECTNESS_CHECK(result != nullptr);
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t rowIndex : range) {
co_yield idTableToQLeverJSONRow(qet, columns, result->localVocab(),
rowIndex, idTable)
co_yield idTableToQLeverJSONRow(qet, columns, pair.localVocab_, rowIndex,
pair.idTable_)
.dump();
cancellationHandle->throwIfCancelled();
}
Expand Down Expand Up @@ -405,14 +411,14 @@ ExportQueryExecutionTrees::selectQueryResultToStream(

// special case : binary export of IdTable
if constexpr (format == MediaType::octetStream) {
for (const auto& [idTable, range] :
getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
for (const auto& columnIndex : selectedColumnIndices) {
if (columnIndex.has_value()) {
co_yield std::string_view{reinterpret_cast<const char*>(&idTable(
i, columnIndex.value().columnIndex_)),
sizeof(Id)};
co_yield std::string_view{
reinterpret_cast<const char*>(
&pair.idTable_(i, columnIndex.value().columnIndex_)),
sizeof(Id)};
}
}
cancellationHandle->throwIfCancelled();
Expand All @@ -436,15 +442,15 @@ ExportQueryExecutionTrees::selectQueryResultToStream(
constexpr auto& escapeFunction = format == MediaType::tsv
? RdfEscaping::escapeForTsv
: RdfEscaping::escapeForCsv;
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
for (size_t j = 0; j < selectedColumnIndices.size(); ++j) {
if (selectedColumnIndices[j].has_value()) {
const auto& val = selectedColumnIndices[j].value();
Id id = idTable(i, val.columnIndex_);
Id id = pair.idTable_(i, val.columnIndex_);
auto optionalStringAndType =
idToStringAndType<format == MediaType::csv>(
qet.getQec()->getIndex(), id, result->localVocab(),
qet.getQec()->getIndex(), id, pair.localVocab_,
escapeFunction);
if (optionalStringAndType.has_value()) [[likely]] {
co_yield optionalStringAndType.value().first;
Expand Down Expand Up @@ -561,15 +567,15 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees::
auto selectedColumnIndices =
qet.selectedVariablesToColumnIndices(selectClause, false);
// TODO<joka921> we could prefilter for the nonexisting variables.
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
co_yield "\n <result>";
for (size_t j = 0; j < selectedColumnIndices.size(); ++j) {
if (selectedColumnIndices[j].has_value()) {
const auto& val = selectedColumnIndices[j].value();
Id id = idTable(i, val.columnIndex_);
Id id = pair.idTable_(i, val.columnIndex_);
co_yield idToXMLBinding(val.variable_, id, qet.getQec()->getIndex(),
result->localVocab());
pair.localVocab_);
}
}
co_yield "\n </result>";
Expand Down Expand Up @@ -611,12 +617,13 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees::
co_return;
}

auto getBinding = [&](const IdTable& idTable, const uint64_t& i) {
auto getBinding = [&](const IdTable& idTable, const uint64_t& i,
const LocalVocab& localVocab) {
nlohmann::ordered_json binding = {};
for (const auto& column : columns) {
auto optionalStringAndType = idToStringAndType(
qet.getQec()->getIndex(), idTable(i, column->columnIndex_),
result->localVocab());
auto optionalStringAndType =
idToStringAndType(qet.getQec()->getIndex(),
idTable(i, column->columnIndex_), localVocab);
if (optionalStringAndType.has_value()) [[likely]] {
const auto& [stringValue, xsdType] = optionalStringAndType.value();
binding[column->variable_] =
Expand All @@ -627,12 +634,12 @@ ad_utility::streams::stream_generator ExportQueryExecutionTrees::
};

bool isFirstRow = true;
for (const auto& [idTable, range] : getRowIndices(limitAndOffset, *result)) {
for (const auto& [pair, range] : getRowIndices(limitAndOffset, *result)) {
for (uint64_t i : range) {
if (!isFirstRow) [[likely]] {
co_yield ",";
}
co_yield getBinding(idTable, i);
co_yield getBinding(pair.idTable_, i, pair.localVocab_);
cancellationHandle->throwIfCancelled();
isFirstRow = false;
}
Expand Down
12 changes: 10 additions & 2 deletions src/engine/ExportQueryExecutionTrees.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,23 @@ class ExportQueryExecutionTrees {
const parsedQuery::SelectClause& selectClause,
LimitOffsetClause limitAndOffset, CancellationHandle cancellationHandle);

// Public for testing.
public:
// Non-owning bundle of const references to an `IdTable` and a `LocalVocab`.
// Because both members are references, the referenced objects must outlive
// every instance of this struct.
struct TableConstRefWithVocab {
// The table that is referred to.
const IdTable& idTable_;
// The local vocab that is passed along together with `idTable_`.
const LocalVocab& localVocab_;
};
// Helper type that bundles an `IdTable` (together with its `LocalVocab`) and a
// view of the row indices that are to be read from that `IdTable`.
struct TableWithRange {
const IdTable& idTable_;
TableConstRefWithVocab tableWithVocab_;
std::ranges::iota_view<uint64_t, uint64_t> view_;
};

private:
// Yield all `IdTable`s provided by the given `result`, each one paired with
// the `LocalVocab` that belongs to it.
static cppcoro::generator<const IdTable&> getIdTables(const Result& result);
static cppcoro::generator<ExportQueryExecutionTrees::TableConstRefWithVocab>
getIdTables(const Result& result);

// Return a range that contains the indices of the rows that have to be
// exported from the `idTable` given the `LimitOffsetClause`. It takes into
Expand Down
44 changes: 26 additions & 18 deletions src/engine/Filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,47 +48,55 @@ ProtoResult Filter::computeResult(bool requestLaziness) {
LOG(DEBUG) << "Filter result computation..." << endl;
checkCancellation();

auto localVocab = subRes->getSharedLocalVocab();
if (subRes->isFullyMaterialized()) {
IdTable result = filterIdTable(subRes, subRes->idTable());
IdTable result = filterIdTable(subRes->sortedBy(), subRes->idTable(),
subRes->localVocab());
LOG(DEBUG) << "Filter result computation done." << endl;

return {std::move(result), resultSortedOn(), std::move(localVocab)};
return {std::move(result), resultSortedOn(), subRes->getSharedLocalVocab()};
}

if (requestLaziness) {
return {[](auto subRes, auto* self) -> cppcoro::generator<IdTable> {
for (IdTable& idTable : subRes->idTables()) {
IdTable result = self->filterIdTable(subRes, idTable);
co_yield result;
return {[](auto subRes, auto* self) -> Result::Generator {
for (auto& [idTable, localVocab] : subRes->idTables()) {
IdTable result = self->filterIdTable(subRes->sortedBy(),
idTable, localVocab);
co_yield {std::move(result), std::move(localVocab)};
}
}(std::move(subRes), this),
resultSortedOn(), std::move(localVocab)};
resultSortedOn()};
}

// If we receive a generator of IdTables, we need to materialize it into a
// single IdTable.
size_t width = getSubtree().get()->getResultWidth();
IdTable result{width, getExecutionContext()->getAllocator()};
ad_utility::callFixedSize(width, [this, &subRes, &result]<int WIDTH>() {
for (IdTable& idTable : subRes->idTables()) {
computeFilterImpl<WIDTH>(result, idTable, subRes->localVocab(),
subRes->sortedBy());
}
});
std::vector<LocalVocab> localVocabs;
ad_utility::callFixedSize(
width, [this, &subRes, &result, &localVocabs]<int WIDTH>() {
for (Result::IdTableVocabPair& pair : subRes->idTables()) {
computeFilterImpl<WIDTH>(result, pair.idTable_, pair.localVocab_,
subRes->sortedBy());
localVocabs.emplace_back(std::move(pair.localVocab_));
}
});

LocalVocab resultLocalVocab{};
resultLocalVocab.mergeWith(localVocabs);

LOG(DEBUG) << "Filter result computation done." << endl;

return {std::move(result), resultSortedOn(), std::move(localVocab)};
return {std::move(result), resultSortedOn(), std::move(resultLocalVocab)};
}

// _____________________________________________________________________________
IdTable Filter::filterIdTable(const std::shared_ptr<const Result>& subRes,
const IdTable& idTable) const {
IdTable Filter::filterIdTable(std::vector<ColumnIndex> sortedBy,
const IdTable& idTable,
const LocalVocab& localVocab) const {
size_t width = idTable.numColumns();
IdTable result{width, getExecutionContext()->getAllocator()};
CALL_FIXED_SIZE(width, &Filter::computeFilterImpl, this, result, idTable,
subRes->localVocab(), subRes->sortedBy());
localVocab, std::move(sortedBy));
return result;
}

Expand Down
5 changes: 3 additions & 2 deletions src/engine/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Filter : public Operation {
std::vector<ColumnIndex> sortedBy) const;

// Run `computeFilterImpl` on the provided `IdTable` and return the result as a
// new `IdTable`.
IdTable filterIdTable(const std::shared_ptr<const Result>& subRes,
const IdTable& idTable) const;
IdTable filterIdTable(std::vector<ColumnIndex> sortedBy,
const IdTable& idTable,
const LocalVocab& localVocab) const;
};
Loading

0 comments on commit 1278d39

Please sign in to comment.