From 81a93505e06e777f3063eac5181337c8e74cb57e Mon Sep 17 00:00:00 2001 From: RobinTF <83676088+RobinTF@users.noreply.github.com> Date: Tue, 15 Oct 2024 18:14:06 +0200 Subject: [PATCH 1/3] Implement lazy `BIND` (#1543) Allow the `BIND` operation to handle its input lazily. NOTE: Currently there is only a single local vocab for all the results of the `BIND`, so even when a `BIND` that creates strings is handled lazily, we still need the RAM for the complete local vocab. This will be handled in a follow-up PR. --- src/engine/Bind.cpp | 147 +++++++++++++++++++---------------- src/engine/Bind.h | 32 ++++---- src/engine/idTable/IdTable.h | 6 ++ test/IdTableTest.cpp | 17 ++++ test/engine/BindTest.cpp | 133 +++++++++++++++++++++++++++++++ test/engine/CMakeLists.txt | 1 + 6 files changed, 252 insertions(+), 84 deletions(-) create mode 100644 test/engine/BindTest.cpp diff --git a/src/engine/Bind.cpp b/src/engine/Bind.cpp index 83aef56e2e..dde7e019d9 100644 --- a/src/engine/Bind.cpp +++ b/src/engine/Bind.cpp @@ -81,64 +81,84 @@ std::vector Bind::getChildren() { } // _____________________________________________________________________________ -ProtoResult Bind::computeResult([[maybe_unused]] bool requestLaziness) { - using std::endl; - LOG(DEBUG) << "Get input to BIND operation..." << endl; - std::shared_ptr subRes = _subtree->getResult(); - LOG(DEBUG) << "Got input to Bind operation." << endl; - IdTable idTable{getExecutionContext()->getAllocator()}; - - idTable.setNumColumns(getResultWidth()); - - // Make a deep copy of the local vocab from `subRes` and then add to it (in - // case BIND adds a new word or words). - // - // TODO: In most BIND operations, nothing is added to the local vocabulary, so - // it would be more efficient to first share the pointer here (like with - // `shareLocalVocabFrom`) and only copy it when a new word is about to be - // added. Same for GROUP BY. - auto localVocab = subRes->getCopyOfLocalVocab(); - - size_t inwidth = subRes->idTable().numColumns(); - size_t outwidth = getResultWidth(); - - CALL_FIXED_SIZE((std::array{inwidth, outwidth}), &Bind::computeExpressionBind, - this, &idTable, &localVocab, *subRes, - _bind._expression.getPimpl()); - - LOG(DEBUG) << "BIND result computation done." << endl; - return {std::move(idTable), resultSortedOn(), std::move(localVocab)}; +IdTable Bind::cloneSubView(const IdTable& idTable, + const std::pair& subrange) { + IdTable result(idTable.numColumns(), idTable.getAllocator()); + result.resize(subrange.second - subrange.first); + std::ranges::copy(idTable.begin() + subrange.first, + idTable.begin() + subrange.second, result.begin()); + return result; } // _____________________________________________________________________________ -template -void Bind::computeExpressionBind( - IdTable* outputIdTable, LocalVocab* outputLocalVocab, - const Result& inputResultTable, - sparqlExpression::SparqlExpression* expression) const { +ProtoResult Bind::computeResult(bool requestLaziness) { + LOG(DEBUG) << "Get input to BIND operation..." << std::endl; + std::shared_ptr subRes = _subtree->getResult(requestLaziness); + LOG(DEBUG) << "Got input to Bind operation." << std::endl; + + auto applyBind = [this, subRes](IdTable idTable, LocalVocab* localVocab) { + return computeExpressionBind(localVocab, std::move(idTable), + subRes->localVocab(), + _bind._expression.getPimpl()); + }; + + if (subRes->isFullyMaterialized()) { + if (requestLaziness && subRes->idTable().size() > CHUNK_SIZE) { + auto localVocab = + std::make_shared(subRes->getCopyOfLocalVocab()); + auto generator = [](std::shared_ptr vocab, auto applyBind, + std::shared_ptr result) + -> cppcoro::generator { + size_t size = result->idTable().size(); + for (size_t offset = 0; offset < size; offset += CHUNK_SIZE) { + co_yield applyBind( + cloneSubView(result->idTable(), + {offset, std::min(size, offset + CHUNK_SIZE)}), + vocab.get()); + } + }(localVocab, std::move(applyBind), std::move(subRes)); + return {std::move(generator), resultSortedOn(), std::move(localVocab)}; + } + // Make a deep copy of the local vocab from `subRes` and then add to it (in + // case BIND adds a new word or words). + // + // Make a copy of the local vocab from`subRes`and then add to it (in case + // BIND adds new words). Note: The copy of the local vocab is shallow + // via`shared_ptr`s, so the following is also efficient if the BIND adds no + // new words. + LocalVocab localVocab = subRes->getCopyOfLocalVocab(); + IdTable result = applyBind(subRes->idTable().clone(), &localVocab); + LOG(DEBUG) << "BIND result computation done." << std::endl; + return {std::move(result), resultSortedOn(), std::move(localVocab)}; + } + auto localVocab = std::make_shared(); + auto generator = + [](std::shared_ptr vocab, auto applyBind, + std::shared_ptr result) -> cppcoro::generator { + for (IdTable& idTable : result->idTables()) { + co_yield applyBind(std::move(idTable), vocab.get()); + } + std::array vocabs{vocab.get(), &result->localVocab()}; + *vocab = LocalVocab::merge(std::span{vocabs}); + }(localVocab, std::move(applyBind), std::move(subRes)); + return {std::move(generator), resultSortedOn(), std::move(localVocab)}; +} + +// _____________________________________________________________________________ +IdTable Bind::computeExpressionBind( + LocalVocab* outputLocalVocab, IdTable idTable, + const LocalVocab& inputLocalVocab, + const sparqlExpression::SparqlExpression* expression) const { sparqlExpression::EvaluationContext evaluationContext( - *getExecutionContext(), _subtree->getVariableColumns(), - inputResultTable.idTable(), getExecutionContext()->getAllocator(), - inputResultTable.localVocab(), cancellationHandle_, deadline_); + *getExecutionContext(), _subtree->getVariableColumns(), idTable, + getExecutionContext()->getAllocator(), inputLocalVocab, + cancellationHandle_, deadline_); sparqlExpression::ExpressionResult expressionResult = expression->evaluate(&evaluationContext); - const auto input = inputResultTable.idTable().asStaticView(); - auto output = std::move(*outputIdTable).toStatic(); - - // first initialize the first columns (they remain identical) - const auto inSize = input.size(); - output.reserve(inSize); - const auto inCols = input.numColumns(); - // copy the input to the first numColumns; - for (size_t i = 0; i < inSize; ++i) { - output.emplace_back(); - for (size_t j = 0; j < inCols; ++j) { - output(i, j) = input(i, j); - } - checkCancellation(); - } + idTable.addEmptyColumn(); + auto outputColumn = idTable.getColumn(idTable.numColumns() - 1); auto visitor = [&]( T&& singleResult) mutable { @@ -146,22 +166,19 @@ void Bind::computeExpressionBind( constexpr static bool isStrongId = std::is_same_v; if constexpr (isVariable) { - auto column = + auto columnIndex = getInternallyVisibleVariableColumns().at(singleResult).columnIndex_; - for (size_t i = 0; i < inSize; ++i) { - output(i, inCols) = output(i, column); - checkCancellation(); - } + auto inputColumn = idTable.getColumn(columnIndex); + AD_CORRECTNESS_CHECK(inputColumn.size() == outputColumn.size()); + std::ranges::copy(inputColumn, outputColumn.begin()); } else if constexpr (isStrongId) { - for (size_t i = 0; i < inSize; ++i) { - output(i, inCols) = singleResult; - checkCancellation(); - } + std::ranges::fill(outputColumn, singleResult); } else { constexpr bool isConstant = sparqlExpression::isConstantResult; auto resultGenerator = sparqlExpression::detail::makeGenerator( - std::forward(singleResult), inSize, &evaluationContext); + std::forward(singleResult), outputColumn.size(), + &evaluationContext); if constexpr (isConstant) { auto it = resultGenerator.begin(); @@ -169,16 +186,14 @@ void Bind::computeExpressionBind( Id constantId = sparqlExpression::detail::constantExpressionResultToId( std::move(*it), *outputLocalVocab); - for (size_t i = 0; i < inSize; ++i) { - output(i, inCols) = constantId; - checkCancellation(); - } + checkCancellation(); + std::ranges::fill(outputColumn, constantId); } } else { size_t i = 0; // We deliberately move the values from the generator. for (auto& resultValue : resultGenerator) { - output(i, inCols) = + outputColumn[i] = sparqlExpression::detail::constantExpressionResultToId( std::move(resultValue), *outputLocalVocab); i++; @@ -190,5 +205,5 @@ void Bind::computeExpressionBind( std::visit(visitor, std::move(expressionResult)); - *outputIdTable = std::move(output).toDynamic(); + return idTable; } diff --git a/src/engine/Bind.h b/src/engine/Bind.h index bb1996a967..eeaafaf3ed 100644 --- a/src/engine/Bind.h +++ b/src/engine/Bind.h @@ -1,9 +1,8 @@ -// -// Created by johannes on 19.04.20. -// +// Copyright 2020, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Johannes Kalmbach -#ifndef QLEVER_BIND_H -#define QLEVER_BIND_H +#pragma once #include "engine/Operation.h" #include "engine/sparqlExpressions/SparqlExpressionPimpl.h" @@ -12,6 +11,8 @@ /// BIND operation, currently only supports a very limited subset of expressions class Bind : public Operation { public: + static constexpr size_t CHUNK_SIZE = 10'000; + Bind(QueryExecutionContext* qec, std::shared_ptr subtree, parsedQuery::Bind b) : Operation(qec), _subtree(std::move(subtree)), _bind(std::move(b)) {} @@ -37,25 +38,20 @@ class Bind : public Operation { float getMultiplicity(size_t col) override; bool knownEmptyResult() override; - // Returns the variable to which the expression will be bound - [[nodiscard]] const string& targetVariable() const { - return _bind._target.name(); - } - protected: [[nodiscard]] vector resultSortedOn() const override; private: - ProtoResult computeResult([[maybe_unused]] bool requestLaziness) override; + ProtoResult computeResult(bool requestLaziness) override; + + static IdTable cloneSubView(const IdTable& idTable, + const std::pair& subrange); // Implementation for the binding of arbitrary expressions. - template - void computeExpressionBind( - IdTable* outputIdTable, LocalVocab* outputLocalVocab, - const Result& inputResultTable, - sparqlExpression::SparqlExpression* expression) const; + IdTable computeExpressionBind( + LocalVocab* outputLocalVocab, IdTable idTable, + const LocalVocab& inputLocalVocab, + const sparqlExpression::SparqlExpression* expression) const; [[nodiscard]] VariableToColumnMap computeVariableToColumnMap() const override; }; - -#endif // QLEVER_BIND_H diff --git a/src/engine/idTable/IdTable.h b/src/engine/idTable/IdTable.h index 1f1f502503..29a0257cac 100644 --- a/src/engine/idTable/IdTable.h +++ b/src/engine/idTable/IdTable.h @@ -278,6 +278,12 @@ class IdTable { data().resize(numColumns, ColumnStorage{allocator_}); } + // Add a new empty column to the table. + void addEmptyColumn() requires columnsAreAllocatable && isDynamic { + data().emplace_back(size(), allocator_); + ++numColumns_; + } + // The number of rows in the table. We deliberately have an explicitly named // function `numRows` as well as a generic `size` function because the latter // can be used to write generic code, for example when using STL algorithms on diff --git a/test/IdTableTest.cpp b/test/IdTableTest.cpp index 46fb9a0f6c..34b1ad3072 100644 --- a/test/IdTableTest.cpp +++ b/test/IdTableTest.cpp @@ -1119,6 +1119,23 @@ TEST(IdTable, constructorsAreSfinaeFriendly) { static_assert(std::is_constructible_v); } +// _____________________________________________________________________________ +TEST(IdTable, addEmptyColumn) { + using ::testing::ElementsAre; + using ::testing::Eq; + IdTable table{1, ad_utility::makeUnlimitedAllocator()}; + table.push_back({V(1)}); + table.push_back({V(2)}); + + table.addEmptyColumn(); + + EXPECT_EQ(table.numColumns(), 2); + EXPECT_THAT(table.getColumn(0), ElementsAre(V(1), V(2))); + // The new column is uninitialized, so we can't make any more specific + // assertions about its content here. + EXPECT_EQ(table.getColumn(1).size(), 2); +} + // Check that we can completely instantiate `IdTable`s with a different value // type and a different underlying storage. diff --git a/test/engine/BindTest.cpp b/test/engine/BindTest.cpp new file mode 100644 index 0000000000..d0f4309f56 --- /dev/null +++ b/test/engine/BindTest.cpp @@ -0,0 +1,133 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Robin Textor-Falconi + +#include + +#include "../util/IdTableHelpers.h" +#include "../util/IndexTestHelpers.h" +#include "./ValuesForTesting.h" +#include "engine/Bind.h" +#include "engine/sparqlExpressions/LiteralExpression.h" + +using namespace sparqlExpression; +using Vars = std::vector>; + +namespace { +Bind makeBindForIdTable(QueryExecutionContext* qec, IdTable idTable) { + auto valuesTree = ad_utility::makeExecutionTree( + qec, std::move(idTable), Vars{Variable{"?a"}}); + return { + qec, + std::move(valuesTree), + {SparqlExpressionPimpl{ + std::make_unique(Variable{"?a"}), "?a as ?b"}, + Variable{"?b"}}}; +} + +void expectBindYieldsIdTable( + QueryExecutionContext* qec, Bind& bind, const IdTable& expected, + ad_utility::source_location loc = ad_utility::source_location::current()) { + auto trace = generateLocationTrace(loc); + + { + qec->getQueryTreeCache().clearAll(); + auto result = bind.getResult(false, ComputationMode::FULLY_MATERIALIZED); + ASSERT_TRUE(result->isFullyMaterialized()); + EXPECT_EQ(result->idTable(), expected); + } + + { + qec->getQueryTreeCache().clearAll(); + auto result = bind.getResult(false, ComputationMode::LAZY_IF_SUPPORTED); + ASSERT_FALSE(result->isFullyMaterialized()); + auto& idTables = result->idTables(); + auto iterator = idTables.begin(); + ASSERT_NE(iterator, idTables.end()); + EXPECT_EQ(*iterator, expected); + EXPECT_EQ(++iterator, idTables.end()); + } +} +} // namespace + +// _____________________________________________________________________________ +TEST(Bind, computeResult) { + auto* qec = ad_utility::testing::getQec(); + Bind bind = + makeBindForIdTable(qec, makeIdTableFromVector({{1}, {2}, {3}, {4}})); + + expectBindYieldsIdTable( + qec, bind, makeIdTableFromVector({{1, 1}, {2, 2}, {3, 3}, {4, 4}})); +} + +// _____________________________________________________________________________ +TEST(Bind, computeResultWithTableWithoutRows) { + auto* qec = ad_utility::testing::getQec(); + Bind bind = makeBindForIdTable( + qec, IdTable{1, ad_utility::makeUnlimitedAllocator()}); + + expectBindYieldsIdTable(qec, bind, + IdTable{2, ad_utility::makeUnlimitedAllocator()}); +} + +// _____________________________________________________________________________ +TEST(Bind, computeResultWithTableWithoutColumns) { + auto val = Id::makeFromInt(42); + auto* qec = ad_utility::testing::getQec(); + auto valuesTree = ad_utility::makeExecutionTree( + qec, makeIdTableFromVector({{}, {}}), Vars{}); + Bind bind{ + qec, + std::move(valuesTree), + {SparqlExpressionPimpl{std::make_unique(val), "42 as ?b"}, + Variable{"?b"}}}; + + expectBindYieldsIdTable(qec, bind, makeIdTableFromVector({{val}, {val}})); +} + +// _____________________________________________________________________________ +TEST( + Bind, + computeResultProducesLazyResultWhenFullyMaterializedSubResultIsTooLargeAndRequested) { + auto val = Id::makeFromInt(42); + IdTable::row_type row{1}; + row[0] = val; + auto* qec = ad_utility::testing::getQec(); + IdTable table{1, ad_utility::makeUnlimitedAllocator()}; + table.resize(Bind::CHUNK_SIZE + 1); + std::ranges::fill(table, row); + auto valuesTree = ad_utility::makeExecutionTree( + qec, table.clone(), Vars{Variable{"?a"}}, false, + std::vector{}, LocalVocab{}, std::nullopt, true); + Bind bind{ + qec, + std::move(valuesTree), + {SparqlExpressionPimpl{std::make_unique(val), "42 as ?b"}, + Variable{"?b"}}}; + + table.addEmptyColumn(); + row = IdTable::row_type{2}; + row[0] = val; + row[1] = val; + std::ranges::fill(table, row); + { + qec->getQueryTreeCache().clearAll(); + auto result = bind.getResult(false, ComputationMode::FULLY_MATERIALIZED); + ASSERT_TRUE(result->isFullyMaterialized()); + EXPECT_EQ(result->idTable(), table); + } + + { + table.resize(Bind::CHUNK_SIZE); + qec->getQueryTreeCache().clearAll(); + auto result = bind.getResult(false, ComputationMode::LAZY_IF_SUPPORTED); + ASSERT_FALSE(result->isFullyMaterialized()); + auto& idTables = result->idTables(); + auto iterator = idTables.begin(); + ASSERT_NE(iterator, idTables.end()); + EXPECT_EQ(*iterator, table); + ASSERT_NE(++iterator, idTables.end()); + EXPECT_EQ(*iterator, makeIdTableFromVector({{val, val}})); + EXPECT_EQ(++iterator, idTables.end()); + } +} diff --git a/test/engine/CMakeLists.txt b/test/engine/CMakeLists.txt index 1c875c49cd..3a7e7d3cf7 100644 --- a/test/engine/CMakeLists.txt +++ b/test/engine/CMakeLists.txt @@ -8,3 +8,4 @@ addLinkAndDiscoverTest(DistinctTest engine) addLinkAndDiscoverTest(GroupByHashMapOptimizationTest) addLinkAndDiscoverTest(LazyGroupByTest engine) addLinkAndDiscoverTest(CountConnectedSubgraphsTest) +addLinkAndDiscoverTest(BindTest engine) From a9a9ae46e72a7e9d2a2a09d691a10e64d62b3952 Mon Sep 17 00:00:00 2001 From: Johannes Kalmbach Date: Wed, 16 Oct 2024 15:20:30 +0200 Subject: [PATCH 2/3] Add the workflow files for the `sparql-conformance` webapp (#1559) With this commit, the first demo of a webapp that checks QLever's conformance with the official W3C SPARQL test suite will be activated for each pull request and commit to master. --- .../workflows/sparql-conformance-uploader.yml | 65 ++++++++++++++ .github/workflows/sparql-conformance.yml | 86 +++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 .github/workflows/sparql-conformance-uploader.yml create mode 100644 .github/workflows/sparql-conformance.yml diff --git a/.github/workflows/sparql-conformance-uploader.yml b/.github/workflows/sparql-conformance-uploader.yml new file mode 100644 index 0000000000..a2c2128c37 --- /dev/null +++ b/.github/workflows/sparql-conformance-uploader.yml @@ -0,0 +1,65 @@ +name: Upload conformance tests result + +on: + workflow_run: + workflows: [sparql-conformance] + types: + - completed + +jobs: + upload: + env: + SERVER_URL: https://qlever.cs.uni-freiburg.de/sparql-conformance-uploader + API_KEY: ${{ secrets.SPARQL_CONFORMANCE_TOKEN }} + runs-on: ubuntu-latest + if: github.event.workflow_run.conclusion == 'success' + steps: + - name: 'Download artifact' + uses: actions/github-script@v6 + with: + script: | + var artifacts = await github.rest.actions.listWorkflowRunArtifacts({ + owner: context.repo.owner, + repo: context.repo.repo, + run_id: ${{github.event.workflow_run.id }}, + }); + var matchArtifact = artifacts.data.artifacts.filter((artifact) => { + return artifact.name == "conformance-report" + })[0]; + var download = await github.rest.actions.downloadArtifact({ + owner: context.repo.owner, + repo: context.repo.repo, + artifact_id: matchArtifact.id, + archive_format: 'zip', + }); + var fs = require('fs'); + fs.writeFileSync('${{github.workspace}}/conformance-report.zip', Buffer.from(download.data)); + - run: unzip conformance-report.zip + # Read the metadata into environment variables. + - name: "Read github event" + run: echo "github_event=`cat event`" >> $GITHUB_ENV + - name: "Read PR number" + run: echo "pr_number=`cat pr`" >> $GITHUB_ENV + - name: "Read Github Ref" + run: echo "original_github_ref=`cat github_ref`" >> $GITHUB_ENV; + - name: "Read Github SHA" + run: echo "commit_sha=`cat sha`" >> $GITHUB_ENV; + - name: "Read Github Repository" + run: echo "original_github_repository=`cat github_repository`" >> $GITHUB_ENV; + - name: "Submit data to server" + run: | + response=$(curl -s -o temp_response.txt -w "%{http_code}" \ + -H "x-api-key: $API_KEY" \ + -H "event: ${{ env.github_event }}" \ + -H "sha: ${{ env.commit_sha }}" \ + -H "pr-number: ${{ env.pr_number }}" \ + -F "file=@${{env.commit_sha}}.json.bz2" \ + $SERVER_URL/upload) + + echo "Server response:" + cat temp_response.txt + echo "HTTP Status: $response" + if [ "$response" -gt 200 ]; then + echo "Server did not respond with status 200. Failing the workflow." + exit 1 + fi \ No newline at end of file diff --git a/.github/workflows/sparql-conformance.yml b/.github/workflows/sparql-conformance.yml new file mode 100644 index 0000000000..38152b5c79 --- /dev/null +++ b/.github/workflows/sparql-conformance.yml @@ -0,0 +1,86 @@ +name: sparql-conformance + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + merge_group: + +jobs: + build: + env: + compiler: clang + compiler-version: 16 + build-type: Release + cmake-flags: "-DCMAKE_C_COMPILER=clang-16 -DCMAKE_CXX_COMPILER=clang++-16" + runs-on: ubuntu-22.04 + steps: + - uses: actions/checkout@v3 + with: + submodules: "recursive" + path: qlever-code + - name: Checkout sparql-test-suite-files + uses: actions/checkout@v3 + with: + repository: "w3c/rdf-tests" + path: sparql-test-suite + - name: Checkout qlever-test-suite + uses: actions/checkout@v3 + with: + repository: "ad-freiburg/sparql-conformance" + path: qlever-test-suite + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.10" + - name: Install python dependencies + run: | + python -m pip install --upgrade pip + pip install requests + pip install rdflib + - name: Install dependencies + uses: ./qlever-code/.github/workflows/install-dependencies-ubuntu + - name: Install compiler + uses: ./qlever-code/.github/workflows/install-compiler-ubuntu + with: + compiler: "clang" + compiler-version: "16" + - name: Create build directory + run: mkdir ${{github.workspace}}/qlever-code/build + - name: Configure CMake + run: cmake -S ${{github.workspace}}/qlever-code/ -B ${{github.workspace}}/qlever-code/build ${{env.cmake-flags}} -DCMAKE_BUILD_TYPE=${{env.build-type}} -DLOGLEVEL=INFO -DUSE_PARALLEL=false + - name: Build IndexBuilderMain + run: cmake --build ${{github.workspace}}/qlever-code/build --target IndexBuilderMain --config ${{env.build-type}} -- -j $(nproc) + - name: Build ServerMain + run: cmake --build ${{github.workspace}}/qlever-code/build --target ServerMain --config ${{env.build-type}} -- -j $(nproc) + - name: Execute test suite + run: | + cd qlever-test-suite + python testsuite.py config http://0.0.0.0 7001 ${{github.workspace}}/sparql-test-suite/sparql/sparql11/ ${{github.workspace}}/qlever-code/build/ localhost sparql sparql + python testsuite.py extract + python testsuite.py ${{ github.sha }} + cd .. + - name: Save workflow information + # Note: If you change any of the filenames here, you also have to change them in `upload-conformance.yml` + run : | + mkdir -p conformance-report + echo ${{ github.event_name }} > ./conformance-report/event + echo ${{ github.event.number }} > ./conformance-report/pr + echo ${{ github.repository }} > ./conformance-report/github_repository + echo ${GITHUB_REF} > ./conformance-report/github_ref + - name: Save SHA and conformance report if it is a master commit. + if: github.event_name == 'push' + run : | + echo ${{github.sha}} > ./conformance-report/sha + mv ${{ github.workspace}}/qlever-test-suite/results/${{ github.sha }}.json.bz2 conformance-report/${{ github.sha }}.json.bz2 + - name: Save SHA and conformance report if it is a PR. + if: github.event_name == 'pull_request' + run : | + echo ${{github.event.pull_request.head.sha}} > ./conformance-report/sha + mv ${{ github.workspace}}/qlever-test-suite/results/${{ github.sha }}.json.bz2 conformance-report/${{ github.event.pull_request.head.sha }}.json.bz2 + - name: Upload coverage artifact + uses: actions/upload-artifact@v3 + with: + name: conformance-report + path: conformance-report/ \ No newline at end of file From 4acbca3e3c9e024bbda82ad701b395441f7003e2 Mon Sep 17 00:00:00 2001 From: Johannes Kalmbach Date: Thu, 17 Oct 2024 01:13:31 +0200 Subject: [PATCH 3/3] Multi-file import (#1537) So far, QLever read its input from a single file or from standard input. This made it hard to associate graph information per file. It also caused problems when parallel parsing was activated and a Turtle file did not have all its prefix declarations at the beginning. With this change, QLever can read its input from multiple input streams (files or pipes), and the streams are parsed concurrently. It can be specified separately for each stream which default graph to use for that stream and whether to use the parallel parser or not. Specifying a value for `"parallel-parsing"` in the `.settings.json` file is now deprecated. There will a corresponding change in https://github.com/ad-freiburg/qlever-control next that enables the convenient control of this new functionality from a `Qleverfile`. --- src/index/Index.cpp | 10 +- src/index/Index.h | 8 +- src/index/IndexBuilderMain.cpp | 193 +++++++++++++++++++++-------- src/index/IndexImpl.cpp | 55 +++++--- src/index/IndexImpl.h | 19 +-- src/index/InputFileSpecification.h | 32 +++++ src/parser/RdfParser.cpp | 114 ++++++++++++++++- src/parser/RdfParser.h | 104 ++++++++++++++-- test/GroupByTest.cpp | 3 +- test/IndexTest.cpp | 41 ++++++ test/RdfParserTest.cpp | 81 +++++++++++- test/util/IndexTestHelpers.cpp | 4 +- 12 files changed, 554 insertions(+), 110 deletions(-) create mode 100644 src/index/InputFileSpecification.h diff --git a/src/index/Index.cpp b/src/index/Index.cpp index 38cbfe52ed..0bb7d770fa 100644 --- a/src/index/Index.cpp +++ b/src/index/Index.cpp @@ -19,11 +19,6 @@ Index::Index(Index&&) noexcept = default; // https://stackoverflow.com/questions/13414652/forward-declaration-with-unique-ptr Index::~Index() = default; -// ____________________________________________________________________________ -void Index::createFromFile(const std::string& filename, Filetype type) { - pimpl_->createFromFile(filename, type); -} - // ____________________________________________________________________________ void Index::createFromOnDiskIndex(const std::string& onDiskBase) { pimpl_->createFromOnDiskIndex(onDiskBase); @@ -283,3 +278,8 @@ size_t Index::getResultSizeOfScan(const ScanSpecification& scanSpecification, const Permutation::Enum& permutation) const { return pimpl_->getResultSizeOfScan(scanSpecification, permutation); } + +// ____________________________________________________________________________ +void Index::createFromFiles(const std::vector& files) { + return pimpl_->createFromFiles(files); +} diff --git a/src/index/Index.h b/src/index/Index.h index e0c9f26e81..991d6573dc 100644 --- a/src/index/Index.h +++ b/src/index/Index.h @@ -11,6 +11,7 @@ #include "global/Id.h" #include "index/CompressedString.h" +#include "index/InputFileSpecification.h" #include "index/Permutation.h" #include "index/StringSortComparator.h" #include "index/Vocabulary.h" @@ -63,6 +64,9 @@ class Index { vector scores_; }; + using Filetype = qlever::Filetype; + using InputFileSpecification = qlever::InputFileSpecification; + /// Forbid copy and assignment. Index& operator=(const Index&) = delete; Index(const Index&) = delete; @@ -79,9 +83,7 @@ class Index { // Create an index from a file. Will write vocabulary and on-disk index data. // NOTE: The index can not directly be used after this call, but has to be // setup by `createFromOnDiskIndex` after this call. - enum class Filetype { Turtle, NQuad }; - void createFromFile(const std::string& filename, - Filetype filetype = Filetype::Turtle); + void createFromFiles(const std::vector& files); // Create an index object from an on-disk index that has previously been // constructed using the `createFromFile` method which is typically called via diff --git a/src/index/IndexBuilderMain.cpp b/src/index/IndexBuilderMain.cpp index ef861854c6..2e4173d305 100644 --- a/src/index/IndexBuilderMain.cpp +++ b/src/index/IndexBuilderMain.cpp @@ -49,6 +49,94 @@ void writeStxxlConfigFile(const string& location, const string& tail) { << STXXL_DISK_SIZE_INDEX_BUILDER << ",syscall\n"; } +// Check that `values` has exactly one or `numFiles` many entries. If +// `allowEmpty` is true, then an empty vector will also be accepted. If this +// condition is violated, throw an exception. This is used to validate the +// parameters for file types and default graphs. +static void checkNumParameterValues(const auto& values, size_t numFiles, + bool allowEmpty, + std::string_view parameterName) { + if (allowEmpty && values.empty()) { + return; + } + if (values.size() == 1 || values.size() == numFiles) { + return; + } + auto error = absl::StrCat( + "The parameter \"", parameterName, + "\" must be specified either exactly once (in which case it is " + "used for all input files) or exactly as many times as there are " + "input files, in which case each input file has its own value."); + if (allowEmpty) { + absl::StrAppend(&error, + " The parameter can also be omitted entirely, in which " + " case a default value is used for all input files."); + } + throw std::runtime_error{error}; +} + +// Convert the `filetype` string, which must be "ttl", "nt", or "nq" to the +// corresponding `qlever::Filetype` value. If no filetyp is given, try to deduce +// the type from the filename. +qlever::Filetype getFiletype(std::optional filetype, + std::string_view filename) { + auto impl = [](std::string_view s) -> std::optional { + if (s == "ttl" || s == "nt") { + return qlever::Filetype::Turtle; + } else if (s == "nq") { + return qlever::Filetype::NQuad; + } else { + return std::nullopt; + } + }; + if (filetype.has_value()) { + auto result = impl(filetype.value()); + if (result.has_value()) { + return result.value(); + } else { + throw std::runtime_error{ + absl::StrCat("The value of --file-format or -F must be one of " + "`ttl`, `nt`, or `nq`, but is `", + filetype.value(), "`")}; + } + } + + auto posOfDot = filename.rfind('.'); + auto throwNotDeducable = [&filename]() { + throw std::runtime_error{absl::StrCat( + "Could not deduce the file format from the filename \"", filename, + "\". Either use files with names that end on `.ttl`, `.nt`, or `.nq`, " + "or explicitly set the format of the file via --file-format or -F")}; + }; + if (posOfDot == std::string::npos) { + throwNotDeducable(); + } + auto deducedType = impl(filename.substr(posOfDot + 1)); + if (deducedType.has_value()) { + return deducedType.value(); + } else { + throwNotDeducable(); + } + // The following line is necessary because Clang and GCC currently can't + // deduce that the above `else` case always throws and there is currently no + // way to mark the `throwNotDeducable` lambda as `[[noreturn]]`. + AD_FAIL(); +} + +// Get the parameter value at the given index. If the vector is empty, return +// the given `defaultValue`. If the vector has exactly one element, return that +// element, no matter what the index is. +template +T getParameterValue(size_t idx, const auto& values, const T& defaultValue) { + if (values.empty()) { + return defaultValue; + } + if (values.size() == 1) { + return values.at(0); + } + return values.at(idx); +} + // Main function. int main(int argc, char** argv) { // Copy the git hash and datetime of compilation (which require relinking) @@ -67,8 +155,10 @@ int main(int argc, char** argv) { string textIndexName; string kbIndexName; string settingsFile; - string filetype; - string inputFile; + std::vector filetype; + std::vector inputFile; + std::vector defaultGraphs; + std::vector parseParallel; bool noPatterns = false; bool onlyAddTextIndex = false; bool keepTemporaryFiles = false; @@ -92,8 +182,19 @@ int main(int argc, char** argv) { "will read from stdin."); add("file-format,F", po::value(&filetype), "The format of the input file with the knowledge graph data. Must be one " - "of [nt|ttl|nq]. If not set, QLever will try to deduce it from the " - "filename suffix."); + "of [nt|ttl|nq]. Can be specified once (then all files use that format), " + "or once per file, or not at all (in that case, the format is deduced " + "from the filename suffix if possible)."); + add("default-graph,g", po::value(&defaultGraphs), + "The graph IRI without angle brackets. Write `-` for the default graph. " + "Can be omitted (then all files use the default graph), specified once " + "(then all files use that graph), or once per file."); + add("parse-parallel,p", po::value(&parseParallel), + "Enable or disable the parallel parser for all files (if specified once) " + "or once per input file. Parallel parsing works for all input files " + "using the N-Triples or N-Quads format, as well as for well-behaved " + "Turtle files, where all the prefix declarations come in one block at " + "the beginning and there are no multiline literals"); add("kg-index-name,K", po::value(&kbIndexName), "The name of the knowledge graph index (default: basename of " "`kg-input-file`)."); @@ -157,8 +258,8 @@ int main(int argc, char** argv) { // If no index name was specified, take the part of the input file name after // the last slash. - if (kbIndexName.empty() && !inputFile.empty()) { - kbIndexName = ad_utility::getLastPartOfString(inputFile, '/'); + if (kbIndexName.empty()) { + kbIndexName = "no index name specified"; } LOG(INFO) << EMPH_ON << "QLever IndexBuilder, compiled on " @@ -181,59 +282,43 @@ int main(int argc, char** argv) { index.setKeepTempFiles(keepTemporaryFiles); index.setSettingsFile(settingsFile); index.loadAllPermutations() = !onlyPsoAndPos; - // NOTE: If `onlyAddTextIndex` is true, we do not want to construct an - // index, but we assume that it already exists. In particular, we then need - // the vocabulary from the KB index for building the text index. - if (!onlyAddTextIndex) { - if (inputFile.empty() || inputFile == "-") { - inputFile = "/dev/stdin"; - } - if (!filetype.empty()) { - LOG(INFO) << "You specified the input format: " - << ad_utility::getUppercase(filetype) << std::endl; - } else { - bool filetypeDeduced = false; - if (inputFile.ends_with(".nt")) { - filetype = "nt"; - filetypeDeduced = true; - } else if (inputFile.ends_with(".ttl")) { - filetype = "ttl"; - filetypeDeduced = true; - } else if (inputFile.ends_with(".nq")) { - filetype = "nq"; - filetypeDeduced = true; - } else { - LOG(INFO) << "Unknown or missing extension of input file, assuming: " - "TTL" - << std::endl; + // Convert the parameters for the filenames, file types, and default graphs + // into a `vector`. + auto getFileSpecifications = [&]() { + checkNumParameterValues(filetype, inputFile.size(), true, + "--file-format, -F"); + checkNumParameterValues(defaultGraphs, inputFile.size(), true, + "--default-graph, -g"); + checkNumParameterValues(parseParallel, parseParallel.size(), true, + "--parse-parallel, p"); + + std::vector fileSpecs; + for (size_t i = 0; i < inputFile.size(); ++i) { + auto type = getParameterValue>( + i, filetype, std::nullopt); + + auto defaultGraph = getParameterValue>( + i, defaultGraphs, std::nullopt); + if (defaultGraph == "-") { + defaultGraph = std::nullopt; } - if (filetypeDeduced) { - LOG(INFO) << "Format of input file deduced from extension: " - << ad_utility::getUppercase(filetype) << std::endl; + + bool parseInParallel = getParameterValue(i, parseParallel, false); + auto& filename = inputFile.at(i); + if (filename == "-") { + filename = "/dev/stdin"; } - LOG(INFO) << "If this is not correct, start again using the option " - "--file-format (-F)" - << std::endl; + fileSpecs.emplace_back(filename, getFiletype(type, filename), + std::move(defaultGraph), parseInParallel); } + return fileSpecs; + }; - if (filetype == "ttl") { - LOG(DEBUG) << "Parsing uncompressed TTL from: " << inputFile - << std::endl; - index.createFromFile(inputFile, Index::Filetype::Turtle); - } else if (filetype == "nt") { - LOG(DEBUG) << "Parsing uncompressed N-Triples from: " << inputFile - << " (using the Turtle parser)" << std::endl; - index.createFromFile(inputFile, Index::Filetype::Turtle); - } else if (filetype == "nq") { - LOG(DEBUG) << "Parsing uncompressed N-Quads from: " << inputFile - << std::endl; - index.createFromFile(inputFile, Index::Filetype::NQuad); - } else { - LOG(ERROR) << "File format must be one of: nt ttl nq" << std::endl; - std::cerr << boostOptions << std::endl; - exit(1); - } + if (!onlyAddTextIndex) { + auto fileSpecifications = getFileSpecifications(); + AD_CONTRACT_CHECK(!fileSpecifications.empty()); + index.createFromFiles(fileSpecifications); } if (!wordsfile.empty() || addWordsFromLiterals) { diff --git a/src/index/IndexImpl.cpp b/src/index/IndexImpl.cpp index 35e6f773c3..c7eb2e8a4d 100644 --- a/src/index/IndexImpl.cpp +++ b/src/index/IndexImpl.cpp @@ -68,28 +68,18 @@ IndexBuilderDataAsFirstPermutationSorter IndexImpl::createIdTriplesAndVocab( // _____________________________________________________________________________ std::unique_ptr IndexImpl::makeRdfParser( - const std::string& filename, Index::Filetype type) const { + const std::vector& files) const { auto makeRdfParserImpl = - [&filename]() - -> std::unique_ptr { + [&files]() -> std::unique_ptr { using TokenizerT = std::conditional_t; - using InnerParser = - std::conditional_t, - NQuadParser>; - using Parser = - std::conditional_t, - RdfStreamParser>; - return std::make_unique(filename); + return std::make_unique>(files); }; // `callFixedSize` litfts runtime integers to compile time integers. We use it // here to create the correct combinations of template arguments. - return ad_utility::callFixedSize( - std::array{useParallelParser_ ? 1 : 0, - type == Index::Filetype::Turtle ? 1 : 0, - onlyAsciiTurtlePrefixes_ ? 1 : 0}, - makeRdfParserImpl); + return ad_utility::callFixedSize(std::array{onlyAsciiTurtlePrefixes_ ? 1 : 0}, + makeRdfParserImpl); } // Several helper functions for joining the OSP permutation with the patterns. @@ -297,18 +287,45 @@ std::pair IndexImpl::createInternalPSOandPOS( } // _____________________________________________________________________________ -void IndexImpl::createFromFile(const string& filename, Index::Filetype type) { +void IndexImpl::updateInputFileSpecificationsAndLog( + std::vector& spec, + bool parallelParsingSpecifiedViaJson) { + if (spec.size() == 1) { + LOG(INFO) << "Processing triples from " << spec.at(0).filename_ << " ..." + << std::endl; + } else { + LOG(INFO) << "Processing triples from " << spec.size() + << " input streams ..." << std::endl; + } + if (parallelParsingSpecifiedViaJson) { + if (spec.size() == 1) { + LOG(WARN) << "Parallel parsing set to `true` in the `.settings.json` " + "file; this is deprecated, please use the command-line " + " option --parse-parallel or -p instead" + << std::endl; + spec.at(0).parseInParallel_ = true; + } else { + throw std::runtime_error{ + "For more than one input file, the parallel parsing must not be " + "specified via the `.settings.json` file, but has to be specified " + " via the command-line option --parse-parallel or -p"}; + } + } +} + +// _____________________________________________________________________________ +void IndexImpl::createFromFiles( + std::vector files) { if (!loadAllPermutations_ && usePatterns_) { throw std::runtime_error{ "The patterns can only be built when all 6 permutations are created"}; } - LOG(INFO) << "Processing input triples from " << filename << " ..." - << std::endl; readIndexBuilderSettingsFromFile(); + updateInputFileSpecificationsAndLog(files, useParallelParser_); IndexBuilderDataAsFirstPermutationSorter indexBuilderData = - createIdTriplesAndVocab(makeRdfParser(filename, type)); + createIdTriplesAndVocab(makeRdfParser(files)); // Write the configuration already at this point, so we have it available in // case any of the permutations fail. diff --git a/src/index/IndexImpl.h b/src/index/IndexImpl.h index de108ab660..fb5da51e1b 100644 --- a/src/index/IndexImpl.h +++ b/src/index/IndexImpl.h @@ -119,7 +119,7 @@ class IndexImpl { string onDiskBase_; string settingsFileName_; bool onlyAsciiTurtlePrefixes_ = false; - bool useParallelParser_ = true; + bool useParallelParser_ = false; TurtleParserIntegerOverflowBehavior turtleParserIntegerOverflowBehavior_ = TurtleParserIntegerOverflowBehavior::Error; bool turtleParserSkipIllegalLiterals_ = false; @@ -228,12 +228,11 @@ class IndexImpl { Permutation& getPermutation(Permutation::Enum p); const Permutation& getPermutation(Permutation::Enum p) const; - // Creates an index from a file. Parameter Parser must be able to split the - // file's format into triples. - // Will write vocabulary and on-disk index data. + // Creates an index from a given set of input files. Will write vocabulary and + // on-disk index data. // !! The index can not directly be used after this call, but has to be setup // by createFromOnDiskIndex after this call. - void createFromFile(const string& filename, Index::Filetype type); + void createFromFiles(std::vector files); // Creates an index object from an on disk index that has previously been // constructed. Read necessary meta data into memory and opens file handles. @@ -474,8 +473,8 @@ class IndexImpl { // configured to either parse in parallel or not, and to either use the // CTRE-based relaxed parser or not, depending on the settings of the // corresponding member variables. - std::unique_ptr makeRdfParser(const std::string& filename, - Index::Filetype type) const; + std::unique_ptr makeRdfParser( + const std::vector& files) const; FirstPermutationSorterAndInternalTriplesAsPso convertPartialToGlobalIds( TripleVec& data, const vector& actualLinesPerPartial, @@ -762,4 +761,10 @@ class IndexImpl { // them. void addInternalStatisticsToConfiguration(size_t numTriplesInternal, size_t numPredicatesInternal); + + // Update `InputFileSpecification` based on `parallelParsingSpecifiedViaJson` + // and write a summary to the log. + static void updateInputFileSpecificationsAndLog( + std::vector& spec, + bool parallelParsingSpecifiedViaJson); }; diff --git a/src/index/InputFileSpecification.h b/src/index/InputFileSpecification.h new file mode 100644 index 0000000000..9b5afde367 --- /dev/null +++ b/src/index/InputFileSpecification.h @@ -0,0 +1,32 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Johannes Kalmbach(joka921) +// +#pragma once + +#include +#include +namespace qlever { + +// An enum to distinguish between `Turtle` and `NQuad` files. +enum class Filetype { Turtle, NQuad }; + +// Specify a single input file or stream for the index builder. +struct InputFileSpecification { + std::string filename_; + Filetype filetype_; + // All triples that don't have a dedicated graph (either because the input + // format is N-Triples or Turtle, or because the corresponding line in the + // N-Quads format has no explicit graph) will be stored in this graph. The + // graph has to be specified without angle brackets. If set to `nullopt`, the + // global default graph will be used. + std::optional defaultGraph_; + + // If set to `true`, then the parallel RDF parser will be used for this file. + // This always works for N-Triples and N-Quads files, and for well-behaved + // Turtle files with all prefixes at the beginning and no multiline literals. + bool parseInParallel_ = false; + + bool operator==(const InputFileSpecification&) const = default; +}; +} // namespace qlever diff --git a/src/parser/RdfParser.cpp b/src/parser/RdfParser.cpp index 2da16fe402..9cba9f1455 100644 --- a/src/parser/RdfParser.cpp +++ b/src/parser/RdfParser.cpp @@ -1,7 +1,6 @@ // Copyright 2018, University of Freiburg, // Chair of Algorithms and Data Structures. // Author: Johannes Kalmbach(joka921) -// #include "parser/RdfParser.h" @@ -11,6 +10,7 @@ #include #include +#include "engine/CallFixedSize.h" #include "global/Constants.h" #include "parser/GeoPoint.h" #include "parser/NormalizedString.h" @@ -830,8 +830,9 @@ void RdfStreamParser::initialize(const string& filename) { } } +// _____________________________________________________________________________ template -bool RdfStreamParser::getLine(TurtleTriple* triple) { +bool RdfStreamParser::getLineImpl(TurtleTriple* triple) { if (triples_.empty()) { // if parsing the line fails because our buffer ends before the end of // the next statement we need to be able to recover @@ -944,7 +945,7 @@ template void RdfParallelParser::parseBatch(size_t parsePosition, auto batch) { try { - RdfStringParser parser; + RdfStringParser parser{defaultGraphIri_}; parser.prefixMap_ = this->prefixMap_; parser.setPositionOffset(parsePosition); parser.setInputStream(std::move(batch)); @@ -1037,7 +1038,7 @@ void RdfParallelParser::initialize(const string& filename) { // _______________________________________________________________________ template -bool RdfParallelParser::getLine(TurtleTriple* triple) { +bool RdfParallelParser::getLineImpl(TurtleTriple* triple) { // If the current batch is out of triples_ get the next batch of triples. // We need a while loop instead of a simple if in case there is a batch that // contains no triples. (Theoretically this might happen, and it is safer this @@ -1087,6 +1088,109 @@ RdfParallelParser::~RdfParallelParser() { "During the destruction of a RdfParallelParser"); } +// Create a parser for a single file of an `InputFileSpecification`. The type +// of the parser depends on the filetype (Turtle or N-Quads) and on whether the +// file is to be parsed in parallel. +template +static std::unique_ptr makeSingleRdfParser( + const Index::InputFileSpecification& file) { + auto graph = [file]() -> TripleComponent { + if (file.defaultGraph_.has_value()) { + return TripleComponent::Iri::fromIrirefWithoutBrackets( + file.defaultGraph_.value()); + } else { + return qlever::specialIds().at(DEFAULT_GRAPH_IRI); + } + }; + auto makeRdfParserImpl = [&filename = file.filename_, + &graph]() + -> std::unique_ptr { + using InnerParser = + std::conditional_t, + NQuadParser>; + using Parser = + std::conditional_t, + RdfStreamParser>; + return std::make_unique(filename, graph()); + }; + + // The call to `callFixedSize` lifts runtime integers to compile time + // integers. We use it here to create the correct combination of template + // arguments. + return ad_utility::callFixedSize( + std::array{file.parseInParallel_ ? 1 : 0, + file.filetype_ == Index::Filetype::Turtle ? 1 : 0}, + makeRdfParserImpl); +} + +// ______________________________________________________________ +template +RdfMultifileParser::RdfMultifileParser( + const std::vector& files) { + using namespace qlever; + // This lambda parses a single file and pushes the results and all occurring + // exceptions to the `finishedBatchQueue_`. + auto parseFile = [this](const InputFileSpecification& file) { + try { + auto parser = makeSingleRdfParser(file); + while (auto batch = parser->getBatch()) { + bool active = finishedBatchQueue_.push(std::move(batch.value())); + if (!active) { + // The queue was finished prematurely, stop this thread. This is + // important to avoid deadlocks. + return; + } + } + } catch (...) { + finishedBatchQueue_.pushException(std::current_exception()); + return; + } + if (numActiveParsers_.fetch_sub(1) == 1) { + // We are the last parser, we have to notify the downstream code that the + // input has been parsed completely. + finishedBatchQueue_.finish(); + } + }; + + // Feed all the input files to the `parsingQueue_`. + auto makeParsers = [files, this, parseFile]() { + for (const auto& file : files) { + numActiveParsers_++; + bool active = parsingQueue_.push(std::bind_front(parseFile, file)); + if (!active) { + // The queue was finished prematurely, stop this thread. This is + // important to avoid deadlocks. + return; + } + } + parsingQueue_.finish(); + }; + feederThread_ = ad_utility::JThread{makeParsers}; +} + +// _____________________________________________________________________________ +template +RdfMultifileParser::~RdfMultifileParser() { + ad_utility::ignoreExceptionIfThrows( + [this] { + parsingQueue_.finish(); + finishedBatchQueue_.finish(); + }, + "During the destruction of an RdfMultifileParser"); +} + +//______________________________________________________________________________ +template +bool RdfMultifileParser::getLineImpl(TurtleTriple*) { + AD_FAIL(); +} + +// _____________________________________________________________________________ +template +std::optional> RdfMultifileParser::getBatch() { + return finishedBatchQueue_.pop(); +} + // Explicit instantiations template class TurtleParser; template class TurtleParser; @@ -1098,3 +1202,5 @@ template class RdfStreamParser>; template class RdfStreamParser>; template class RdfParallelParser>; template class RdfParallelParser>; +template class RdfMultifileParser; +template class RdfMultifileParser; diff --git a/src/parser/RdfParser.h b/src/parser/RdfParser.h index 7e350299b5..ca55c61993 100644 --- a/src/parser/RdfParser.h +++ b/src/parser/RdfParser.h @@ -1,6 +1,6 @@ -// Copyright 2018, University of Freiburg, +// Copyright 2018 - 2024, University of Freiburg, // Chair of Algorithms and Data Structures. -// Author: Johannes Kalmbach(joka921) +// Author: Johannes Kalmbach #pragma once @@ -17,6 +17,7 @@ #include "global/Constants.h" #include "global/SpecialIds.h" #include "index/ConstantsIndexBuilding.h" +#include "index/InputFileSpecification.h" #include "parser/ParallelBuffer.h" #include "parser/Tokenizer.h" #include "parser/TokenizerCtre.h" @@ -59,7 +60,7 @@ class RdfParserBase { public: virtual ~RdfParserBase() = default; // Wrapper to getLine that is expected by the rest of QLever - bool getLine(TurtleTriple& triple) { return getLine(&triple); } + bool getLine(TurtleTriple& triple) { return getLineImpl(&triple); } virtual TurtleParserIntegerOverflowBehavior& integerOverflowBehavior() final { return integerOverflowBehavior_; @@ -82,7 +83,7 @@ class RdfParserBase { // Writes the triple to the argument (format subject, object predicate) // returns true iff a triple can be successfully written, else the triple // value is invalid and the parser is at the end of the input. - virtual bool getLine(TurtleTriple* triple) = 0; + virtual bool getLineImpl(TurtleTriple* triple) = 0; // Get the offset (relative to the beginning of the file) of the first byte // that has not yet been dealt with by the parser. @@ -195,6 +196,7 @@ class TurtleParser : public RdfParserBase { std::string activePrefix_; TripleComponent activeSubject_; TripleComponent::Iri activePredicate_; + TripleComponent defaultGraphIri_ = qlever::specialIds().at(DEFAULT_GRAPH_IRI); size_t numBlankNodes_ = 0; bool currentTripleIgnoredBecauseOfInvalidLiteral_ = false; @@ -206,6 +208,8 @@ class TurtleParser : public RdfParserBase { public: TurtleParser() = default; + explicit TurtleParser(TripleComponent defaultGraphIri) + : defaultGraphIri_{std::move(defaultGraphIri)} {} TurtleParser(TurtleParser&& rhs) noexcept = default; TurtleParser& operator=(TurtleParser&& rhs) noexcept = default; @@ -347,7 +351,8 @@ class TurtleParser : public RdfParserBase { // ______________________________________________________________________________________ void emitTriple() { if (!currentTripleIgnoredBecauseOfInvalidLiteral_) { - triples_.emplace_back(activeSubject_, activePredicate_, lastParseResult_); + triples_.emplace_back(activeSubject_, activePredicate_, lastParseResult_, + defaultGraphIri_); } currentTripleIgnoredBecauseOfInvalidLiteral_ = false; } @@ -406,12 +411,16 @@ class TurtleParser : public RdfParserBase { template class NQuadParser : public TurtleParser { - static inline const TripleComponent defaultGraphId_ = - qlever::specialIds().at(DEFAULT_GRAPH_IRI); + TripleComponent defaultGraphId_ = qlever::specialIds().at(DEFAULT_GRAPH_IRI); TripleComponent activeObject_; TripleComponent activeGraphLabel_; using Base = TurtleParser; + public: + NQuadParser() = default; + explicit NQuadParser(TripleComponent defaultGraphId) + : defaultGraphId_{std::move(defaultGraphId)} {} + protected: bool statement() override; @@ -432,7 +441,10 @@ class RdfStringParser : public Parser { public: using Parser::getLine; using Parser::prefixMap_; - bool getLine(TurtleTriple* triple) override { + RdfStringParser() = default; + explicit RdfStringParser(TripleComponent defaultGraph) + : Parser{std::move(defaultGraph)} {} + bool getLineImpl(TurtleTriple* triple) override { (void)triple; throw std::runtime_error( "RdfStringParser doesn't support calls to getLine. Only use " @@ -554,13 +566,16 @@ class RdfStreamParser : public Parser { public: // Default construction needed for tests RdfStreamParser() = default; - explicit RdfStreamParser(const string& filename) { - LOG(DEBUG) << "Initialize turtle parsing from uncompressed file or stream " + explicit RdfStreamParser(const string& filename, + TripleComponent defaultGraphIri = + qlever::specialIds().at(DEFAULT_GRAPH_IRI)) + : Parser{std::move(defaultGraphIri)} { + LOG(DEBUG) << "Initialize RDF parsing from uncompressed file or stream " << filename << std::endl; initialize(filename); } - bool getLine(TurtleTriple* triple) override; + bool getLineImpl(TurtleTriple* triple) override; void initialize(const string& filename); @@ -625,10 +640,17 @@ class RdfParallelParser : public Parser { initialize(filename); } + // Construct a parser from a file and a given default graph iri. + RdfParallelParser(const string& filename, + const TripleComponent& defaultGraphIri) + : Parser{defaultGraphIri}, defaultGraphIri_{defaultGraphIri} { + initialize(filename); + } + // inherit the wrapper overload using Parser::getLine; - bool getLine(TurtleTriple* triple) override; + bool getLineImpl(TurtleTriple* triple) override; std::optional> getBatch() override; @@ -683,5 +705,61 @@ class RdfParallelParser : public Parser { std::atomic batchIdx_ = 0; std::atomic numBatchesTotal_ = 0; - std::chrono::milliseconds sleepTimeForTesting_; + TripleComponent defaultGraphIri_ = qlever::specialIds().at(DEFAULT_GRAPH_IRI); + + std::chrono::milliseconds sleepTimeForTesting_{0}; +}; + +// This class is an RDF parser that parses multiple files in parallel. Each +// file is specified by an `InputFileSpecification`. +template +class RdfMultifileParser : public RdfParserBase { + public: + // Default construction needed for tests + RdfMultifileParser() = default; + + // Construct the parser from a vector of file specifications and eagerly start + // parsing them on background threads. + explicit RdfMultifileParser( + const std::vector& files); + + // This function is needed for the interface, but always throws an exception. + // `getBatch` (below) has to be used instead. + bool getLineImpl(TurtleTriple* triple) override; + + // Retrieve the next batch of triples, or `nullopt` if there are no more + // batches. There is no guarantee about the order in which batches from + // different input files are returned, but each batch belongs to a distinct + // input file. + std::optional> getBatch() override; + + size_t getParsePosition() const override { + // TODO: This function is used for better error messages, but we currently + // have no good way to implement it for this parser. Further analyze this. + return 0; + } + + // The destructor has to clean up all the parallel structures that might be + // still running in the background, especially when it is called before the + // parsing has finished (e.g. in case of an exception in the code that uses + // the parser). + ~RdfMultifileParser() override; + + private: + // A thread that feeds the file specifications to the actual parser threads. + ad_utility::JThread feederThread_; + // The buffer for the finished batches. + ad_utility::data_structures::ThreadSafeQueue> + finishedBatchQueue_{10}; + // This queue manages its own worker threads. Each task consists of a single + // file that is to be parsed. The parsed results are then pushed to the + // `finishedBatchQueue_` above. Note: It is important, that the + // `parsingQueue_` is declared *after* the `finishedBatchQueue_`, s.t. when + // destroying the parser, the threads from the `parsingQueue_` are all joined + // before the `finishedBatchQueue_` (which they are using!) is destroyed. + ad_utility::TaskQueue parsingQueue_{10, NUM_PARALLEL_PARSER_THREADS}; + + // The number of parsers that have started, but not yet finished. This is + // needed to detect the complete parsing. + std::atomic numActiveParsers_ = 0; }; diff --git a/test/GroupByTest.cpp b/test/GroupByTest.cpp index 0fc1b0dcd6..bbfc0ae6e7 100644 --- a/test/GroupByTest.cpp +++ b/test/GroupByTest.cpp @@ -71,7 +71,8 @@ class GroupByTest : public ::testing::Test { _index.setKbName("group_by_test"); _index.setTextName("group_by_test"); _index.setOnDiskBase("group_ty_test"); - _index.createFromFile("group_by_test.nt"); + _index.createFromFiles( + {{"group_by_test.nt", qlever::Filetype::Turtle, std::nullopt}}); _index.addTextFromContextFile("group_by_test.words", false); _index.buildDocsDB("group_by_test.documents"); diff --git a/test/IndexTest.cpp b/test/IndexTest.cpp index 0d45c4aecb..6ed660aae9 100644 --- a/test/IndexTest.cpp +++ b/test/IndexTest.cpp @@ -486,3 +486,44 @@ TEST(IndexTest, trivialGettersAndSetters) { EXPECT_EQ(index.memoryLimitIndexBuilding(), 7_kB); EXPECT_EQ(std::as_const(index).memoryLimitIndexBuilding(), 7_kB); } + +TEST(IndexTest, loggingAndSettingOfParallelParsing) { + using enum qlever::Filetype; + std::vector files{ + {"singleFile.ttl", Turtle, std::nullopt, false}}; + testing::internal::CaptureStdout(); + using namespace ::testing; + { + IndexImpl::updateInputFileSpecificationsAndLog(files, false); + EXPECT_THAT( + testing::internal::GetCapturedStdout(), + AllOf(HasSubstr("from singleFile.ttl"), Not(HasSubstr("parallel")))); + EXPECT_FALSE(files.at(0).parseInParallel_); + } + + { + testing::internal::CaptureStdout(); + IndexImpl::updateInputFileSpecificationsAndLog(files, true); + EXPECT_THAT(testing::internal::GetCapturedStdout(), + AllOf(HasSubstr("from singleFile.ttl"), HasSubstr("deprecated"), + HasSubstr("--parse-parallel"))); + EXPECT_TRUE(files.at(0).parseInParallel_); + } + + { + files.emplace_back("secondFile.ttl", Turtle, std::nullopt, false); + auto filesCopy = files; + testing::internal::CaptureStdout(); + IndexImpl::updateInputFileSpecificationsAndLog(files, false); + EXPECT_THAT(testing::internal::GetCapturedStdout(), + AllOf(HasSubstr("from 2 input streams"), + Not(HasSubstr("is deprecated")))); + EXPECT_EQ(files, filesCopy); + } + + { + AD_EXPECT_THROW_WITH_MESSAGE( + IndexImpl::updateInputFileSpecificationsAndLog(files, true), + HasSubstr("but has to be specified")); + } +} diff --git a/test/RdfParserTest.cpp b/test/RdfParserTest.cpp index 9be3623f84..f917745d3c 100644 --- a/test/RdfParserTest.cpp +++ b/test/RdfParserTest.cpp @@ -768,7 +768,13 @@ TEST(RdfParserTest, iriref) { template std::vector parseFromFile(const std::string& filename, bool useBatchInterface) { - Parser parserChild{filename}; + auto parserChild = [&]() { + if constexpr (ad_utility::isInstantiation) { + return Parser{{{filename, qlever::Filetype::Turtle, std::nullopt}}}; + } else { + return Parser{filename}; + } + }(); RdfParserBase& parser = parserChild; std::vector result; @@ -800,6 +806,12 @@ auto forAllParallelParsers(const auto& function, const auto&... args) { function.template operator()>>( false, args...); } +auto forAllMultifileParsers(const auto& function, const auto&... args) { + function.template operator()>(true, args...); + function.template operator()>(true, + args...); +} + auto forAllParsers(const auto& function, const auto&... args) { function.template operator()>>( true, args...); @@ -810,6 +822,7 @@ auto forAllParsers(const auto& function, const auto&... args) { function.template operator()>>( false, args...); forAllParallelParsers(function, args...); + forAllMultifileParsers(function, args...); } TEST(RdfParserTest, TurtleStreamAndParallelParser) { @@ -833,6 +846,7 @@ TEST(RdfParserTest, TurtleStreamAndParallelParser) { }; forAllParsers(testWithParser); + forAllMultifileParsers(testWithParser); ad_utility::deleteFile(filename); } @@ -854,6 +868,7 @@ TEST(RdfParserTest, emptyInput) { forAllParsers(testWithParser, ""); std::string onlyPrefixes = "PREFIX bim: "; forAllParsers(testWithParser, onlyPrefixes); + forAllMultifileParsers(testWithParser); } // ________________________________________________________________________ @@ -967,7 +982,13 @@ TEST(RdfParserTest, stopParsingOnOutsideFailure) { } ad_utility::Timer t{ad_utility::Timer::Stopped}; { - [[maybe_unused]] Parser parserChild{filename, 10ms}; + [[maybe_unused]] Parser parserChild = [&]() { + if constexpr (ad_utility::isInstantiation) { + return Parser{{{filename, qlever::Filetype::Turtle, std::nullopt}}}; + } else { + return Parser{filename, 10ms}; + } + }(); t.cont(); } EXPECT_LE(t.msecs(), 20ms); @@ -983,6 +1004,7 @@ TEST(RdfParserTest, stopParsingOnOutsideFailure) { }(); FILE_BUFFER_SIZE = 40; forAllParallelParsers(testWithParser, input); + forAllMultifileParsers(testWithParser, input); } // _____________________________________________________________________________ @@ -1022,14 +1044,67 @@ TEST(RdfParserTest, nQuadParser) { runTestsForParser(NQuadCtreParser{}); } +// _____________________________________________________________________________ TEST(RdfParserTest, noGetlineInStringParser) { auto runTestsForParser = [](auto parser) { parser.setInputStream("

."); TurtleTriple t; - EXPECT_ANY_THROW(parser.getLine(&t)); + EXPECT_ANY_THROW(parser.getLine(t)); }; runTestsForParser(NQuadRe2Parser{}); runTestsForParser(NQuadCtreParser{}); runTestsForParser(Re2Parser{}); runTestsForParser(CtreParser{}); } + +// _____________________________________________________________________________ +TEST(RdfParserTest, noGetlineInMultifileParsers) { + auto runTestsForParser = + []([[maybe_unused]] bool interface) { + Parser parser{}; + TurtleTriple t; + // Also test the dummy parse position member. + EXPECT_EQ(parser.getParsePosition(), 0u); + EXPECT_ANY_THROW(parser.getLine(t)); + }; + forAllMultifileParsers(runTestsForParser); +} + +// _____________________________________________________________________________ +TEST(RdfParserTest, multifileParser) { + auto impl = [](bool useParallelParser) { + std::vector expected; + std::string ttl = " . ."; + expected.push_back(TurtleTriple{iri(""), iri(""), iri(""), + iri("")}); + expected.push_back(TurtleTriple{iri(""), iri(""), iri(""), + iri("")}); + std::string nq = " . ."; + expected.push_back( + TurtleTriple{iri(""), iri(""), iri(""), iri("")}); + expected.push_back(TurtleTriple{iri(""), iri(""), iri(""), + iri("")}); + std::string file1 = "multifileParserTest1.ttl"; + std::string file2 = "multifileParserTest2.nq"; + { + auto f = ad_utility::makeOfstream(file1); + f << ttl; + } + { + auto f = ad_utility::makeOfstream(file2); + f << nq; + } + std::vector specs; + specs.emplace_back(file1, qlever::Filetype::Turtle, "defaultGraphTTL", + useParallelParser); + specs.emplace_back(file2, qlever::Filetype::NQuad, "defaultGraphNQ", + useParallelParser); + Parser p{specs}; + std::vector result; + while (auto batch = p.getBatch()) { + std::ranges::copy(batch.value(), std::back_inserter(result)); + } + EXPECT_THAT(result, ::testing::UnorderedElementsAreArray(expected)); + }; + forAllMultifileParsers(impl); +} diff --git a/test/util/IndexTestHelpers.cpp b/test/util/IndexTestHelpers.cpp index 91c2b2c615..d63e8fde18 100644 --- a/test/util/IndexTestHelpers.cpp +++ b/test/util/IndexTestHelpers.cpp @@ -173,7 +173,9 @@ Index makeTestIndex(const std::string& indexBasename, index.usePatterns() = usePatterns; index.setSettingsFile(inputFilename + ".settings.json"); index.loadAllPermutations() = loadAllPermutations; - index.createFromFile(inputFilename); + qlever::InputFileSpecification spec{inputFilename, qlever::Filetype::Turtle, + std::nullopt}; + index.createFromFiles({spec}); if (createTextIndex) { index.addTextFromContextFile("", true); }