diff --git a/src/index/Index.cpp b/src/index/Index.cpp index 38cbfe52ed..0bb7d770fa 100644 --- a/src/index/Index.cpp +++ b/src/index/Index.cpp @@ -19,11 +19,6 @@ Index::Index(Index&&) noexcept = default; // https://stackoverflow.com/questions/13414652/forward-declaration-with-unique-ptr Index::~Index() = default; -// ____________________________________________________________________________ -void Index::createFromFile(const std::string& filename, Filetype type) { - pimpl_->createFromFile(filename, type); -} - // ____________________________________________________________________________ void Index::createFromOnDiskIndex(const std::string& onDiskBase) { pimpl_->createFromOnDiskIndex(onDiskBase); @@ -283,3 +278,8 @@ size_t Index::getResultSizeOfScan(const ScanSpecification& scanSpecification, const Permutation::Enum& permutation) const { return pimpl_->getResultSizeOfScan(scanSpecification, permutation); } + +// ____________________________________________________________________________ +void Index::createFromFiles(const std::vector& files) { + return pimpl_->createFromFiles(files); +} diff --git a/src/index/Index.h b/src/index/Index.h index e0c9f26e81..991d6573dc 100644 --- a/src/index/Index.h +++ b/src/index/Index.h @@ -11,6 +11,7 @@ #include "global/Id.h" #include "index/CompressedString.h" +#include "index/InputFileSpecification.h" #include "index/Permutation.h" #include "index/StringSortComparator.h" #include "index/Vocabulary.h" @@ -63,6 +64,9 @@ class Index { vector scores_; }; + using Filetype = qlever::Filetype; + using InputFileSpecification = qlever::InputFileSpecification; + /// Forbid copy and assignment. Index& operator=(const Index&) = delete; Index(const Index&) = delete; @@ -79,9 +83,7 @@ class Index { // Create an index from a file. Will write vocabulary and on-disk index data. // NOTE: The index can not directly be used after this call, but has to be // setup by `createFromOnDiskIndex` after this call. 
- enum class Filetype { Turtle, NQuad }; - void createFromFile(const std::string& filename, - Filetype filetype = Filetype::Turtle); + void createFromFiles(const std::vector& files); // Create an index object from an on-disk index that has previously been // constructed using the `createFromFile` method which is typically called via diff --git a/src/index/IndexBuilderMain.cpp b/src/index/IndexBuilderMain.cpp index ef861854c6..2e4173d305 100644 --- a/src/index/IndexBuilderMain.cpp +++ b/src/index/IndexBuilderMain.cpp @@ -49,6 +49,94 @@ void writeStxxlConfigFile(const string& location, const string& tail) { << STXXL_DISK_SIZE_INDEX_BUILDER << ",syscall\n"; } +// Check that `values` has exactly one or `numFiles` many entries. If +// `allowEmpty` is true, then an empty vector will also be accepted. If this +// condition is violated, throw an exception. This is used to validate the +// parameters for file types and default graphs. +static void checkNumParameterValues(const auto& values, size_t numFiles, + bool allowEmpty, + std::string_view parameterName) { + if (allowEmpty && values.empty()) { + return; + } + if (values.size() == 1 || values.size() == numFiles) { + return; + } + auto error = absl::StrCat( + "The parameter \"", parameterName, + "\" must be specified either exactly once (in which case it is " + "used for all input files) or exactly as many times as there are " + "input files, in which case each input file has its own value."); + if (allowEmpty) { + absl::StrAppend(&error, + " The parameter can also be omitted entirely, in which " + "case a default value is used for all input files."); + } + throw std::runtime_error{error}; +} + +// Convert the `filetype` string, which must be "ttl", "nt", or "nq" to the +// corresponding `qlever::Filetype` value. If no filetype is given, try to deduce +// the type from the filename. 
+qlever::Filetype getFiletype(std::optional filetype, + std::string_view filename) { + auto impl = [](std::string_view s) -> std::optional { + if (s == "ttl" || s == "nt") { + return qlever::Filetype::Turtle; + } else if (s == "nq") { + return qlever::Filetype::NQuad; + } else { + return std::nullopt; + } + }; + if (filetype.has_value()) { + auto result = impl(filetype.value()); + if (result.has_value()) { + return result.value(); + } else { + throw std::runtime_error{ + absl::StrCat("The value of --file-format or -F must be one of " + "`ttl`, `nt`, or `nq`, but is `", + filetype.value(), "`")}; + } + } + + auto posOfDot = filename.rfind('.'); + auto throwNotDeducable = [&filename]() { + throw std::runtime_error{absl::StrCat( + "Could not deduce the file format from the filename \"", filename, + "\". Either use files with names that end on `.ttl`, `.nt`, or `.nq`, " + "or explicitly set the format of the file via --file-format or -F")}; + }; + if (posOfDot == std::string::npos) { + throwNotDeducable(); + } + auto deducedType = impl(filename.substr(posOfDot + 1)); + if (deducedType.has_value()) { + return deducedType.value(); + } else { + throwNotDeducable(); + } + // The following line is necessary because Clang and GCC currently can't + // deduce that the above `else` case always throws and there is currently no + // way to mark the `throwNotDeducable` lambda as `[[noreturn]]`. + AD_FAIL(); +} + +// Get the parameter value at the given index. If the vector is empty, return +// the given `defaultValue`. If the vector has exactly one element, return that +// element, no matter what the index is. +template +T getParameterValue(size_t idx, const auto& values, const T& defaultValue) { + if (values.empty()) { + return defaultValue; + } + if (values.size() == 1) { + return values.at(0); + } + return values.at(idx); +} + // Main function. 
int main(int argc, char** argv) { // Copy the git hash and datetime of compilation (which require relinking) @@ -67,8 +155,10 @@ int main(int argc, char** argv) { string textIndexName; string kbIndexName; string settingsFile; - string filetype; - string inputFile; + std::vector filetype; + std::vector inputFile; + std::vector defaultGraphs; + std::vector parseParallel; bool noPatterns = false; bool onlyAddTextIndex = false; bool keepTemporaryFiles = false; @@ -92,8 +182,19 @@ int main(int argc, char** argv) { "will read from stdin."); add("file-format,F", po::value(&filetype), "The format of the input file with the knowledge graph data. Must be one " - "of [nt|ttl|nq]. If not set, QLever will try to deduce it from the " - "filename suffix."); + "of [nt|ttl|nq]. Can be specified once (then all files use that format), " + "or once per file, or not at all (in that case, the format is deduced " + "from the filename suffix if possible)."); + add("default-graph,g", po::value(&defaultGraphs), + "The graph IRI without angle brackets. Write `-` for the default graph. " + "Can be omitted (then all files use the default graph), specified once " + "(then all files use that graph), or once per file."); + add("parse-parallel,p", po::value(&parseParallel), + "Enable or disable the parallel parser for all files (if specified once) " + "or once per input file. Parallel parsing works for all input files " + "using the N-Triples or N-Quads format, as well as for well-behaved " + "Turtle files, where all the prefix declarations come in one block at " + "the beginning and there are no multiline literals"); add("kg-index-name,K", po::value(&kbIndexName), "The name of the knowledge graph index (default: basename of " "`kg-input-file`)."); @@ -157,8 +258,8 @@ int main(int argc, char** argv) { // If no index name was specified, take the part of the input file name after // the last slash. 
- if (kbIndexName.empty() && !inputFile.empty()) { - kbIndexName = ad_utility::getLastPartOfString(inputFile, '/'); + if (kbIndexName.empty()) { + kbIndexName = "no index name specified"; } LOG(INFO) << EMPH_ON << "QLever IndexBuilder, compiled on " @@ -181,59 +282,43 @@ int main(int argc, char** argv) { index.setKeepTempFiles(keepTemporaryFiles); index.setSettingsFile(settingsFile); index.loadAllPermutations() = !onlyPsoAndPos; - // NOTE: If `onlyAddTextIndex` is true, we do not want to construct an - // index, but we assume that it already exists. In particular, we then need - // the vocabulary from the KB index for building the text index. - if (!onlyAddTextIndex) { - if (inputFile.empty() || inputFile == "-") { - inputFile = "/dev/stdin"; - } - if (!filetype.empty()) { - LOG(INFO) << "You specified the input format: " - << ad_utility::getUppercase(filetype) << std::endl; - } else { - bool filetypeDeduced = false; - if (inputFile.ends_with(".nt")) { - filetype = "nt"; - filetypeDeduced = true; - } else if (inputFile.ends_with(".ttl")) { - filetype = "ttl"; - filetypeDeduced = true; - } else if (inputFile.ends_with(".nq")) { - filetype = "nq"; - filetypeDeduced = true; - } else { - LOG(INFO) << "Unknown or missing extension of input file, assuming: " - "TTL" - << std::endl; + // Convert the parameters for the filenames, file types, and default graphs + // into a `vector`. 
+ auto getFileSpecifications = [&]() { + checkNumParameterValues(filetype, inputFile.size(), true, + "--file-format, -F"); + checkNumParameterValues(defaultGraphs, inputFile.size(), true, + "--default-graph, -g"); + checkNumParameterValues(parseParallel, inputFile.size(), true, + "--parse-parallel, -p"); + + std::vector fileSpecs; + for (size_t i = 0; i < inputFile.size(); ++i) { + auto type = getParameterValue>( + i, filetype, std::nullopt); + + auto defaultGraph = getParameterValue>( + i, defaultGraphs, std::nullopt); + if (defaultGraph == "-") { + defaultGraph = std::nullopt; } - if (filetypeDeduced) { - LOG(INFO) << "Format of input file deduced from extension: " - << ad_utility::getUppercase(filetype) << std::endl; + + bool parseInParallel = getParameterValue(i, parseParallel, false); + auto& filename = inputFile.at(i); + if (filename == "-") { + filename = "/dev/stdin"; } - LOG(INFO) << "If this is not correct, start again using the option " - "--file-format (-F)" - << std::endl; + fileSpecs.emplace_back(filename, getFiletype(type, filename), + std::move(defaultGraph), parseInParallel); } + return fileSpecs; + }; - if (filetype == "ttl") { - LOG(DEBUG) << "Parsing uncompressed TTL from: " << inputFile - << std::endl; - index.createFromFile(inputFile, Index::Filetype::Turtle); - } else if (filetype == "nt") { - LOG(DEBUG) << "Parsing uncompressed N-Triples from: " << inputFile - << " (using the Turtle parser)" << std::endl; - index.createFromFile(inputFile, Index::Filetype::Turtle); - } else if (filetype == "nq") { - LOG(DEBUG) << "Parsing uncompressed N-Quads from: " << inputFile - << std::endl; - index.createFromFile(inputFile, Index::Filetype::NQuad); - } else { - LOG(ERROR) << "File format must be one of: nt ttl nq" << std::endl; - std::cerr << boostOptions << std::endl; - exit(1); - } + if (!onlyAddTextIndex) { + auto fileSpecifications = getFileSpecifications(); + AD_CONTRACT_CHECK(!fileSpecifications.empty()); + 
index.createFromFiles(fileSpecifications); } if (!wordsfile.empty() || addWordsFromLiterals) { diff --git a/src/index/IndexImpl.cpp b/src/index/IndexImpl.cpp index 35e6f773c3..c7eb2e8a4d 100644 --- a/src/index/IndexImpl.cpp +++ b/src/index/IndexImpl.cpp @@ -68,28 +68,18 @@ IndexBuilderDataAsFirstPermutationSorter IndexImpl::createIdTriplesAndVocab( // _____________________________________________________________________________ std::unique_ptr IndexImpl::makeRdfParser( - const std::string& filename, Index::Filetype type) const { + const std::vector& files) const { auto makeRdfParserImpl = - [&filename]() - -> std::unique_ptr { + [&files]() -> std::unique_ptr { using TokenizerT = std::conditional_t; - using InnerParser = - std::conditional_t, - NQuadParser>; - using Parser = - std::conditional_t, - RdfStreamParser>; - return std::make_unique(filename); + return std::make_unique>(files); }; // `callFixedSize` litfts runtime integers to compile time integers. We use it // here to create the correct combinations of template arguments. - return ad_utility::callFixedSize( - std::array{useParallelParser_ ? 1 : 0, - type == Index::Filetype::Turtle ? 1 : 0, - onlyAsciiTurtlePrefixes_ ? 1 : 0}, - makeRdfParserImpl); + return ad_utility::callFixedSize(std::array{onlyAsciiTurtlePrefixes_ ? 1 : 0}, + makeRdfParserImpl); } // Several helper functions for joining the OSP permutation with the patterns. @@ -297,18 +287,45 @@ std::pair IndexImpl::createInternalPSOandPOS( } // _____________________________________________________________________________ -void IndexImpl::createFromFile(const string& filename, Index::Filetype type) { +void IndexImpl::updateInputFileSpecificationsAndLog( + std::vector& spec, + bool parallelParsingSpecifiedViaJson) { + if (spec.size() == 1) { + LOG(INFO) << "Processing triples from " << spec.at(0).filename_ << " ..." + << std::endl; + } else { + LOG(INFO) << "Processing triples from " << spec.size() + << " input streams ..." 
<< std::endl; + } + if (parallelParsingSpecifiedViaJson) { + if (spec.size() == 1) { + LOG(WARN) << "Parallel parsing set to `true` in the `.settings.json` " + "file; this is deprecated, please use the command-line " + "option --parse-parallel or -p instead" + << std::endl; + spec.at(0).parseInParallel_ = true; + } else { + throw std::runtime_error{ + "For more than one input file, the parallel parsing must not be " + "specified via the `.settings.json` file, but has to be specified " + "via the command-line option --parse-parallel or -p"}; + } + } +} + +// _____________________________________________________________________________ +void IndexImpl::createFromFiles( + std::vector files) { if (!loadAllPermutations_ && usePatterns_) { throw std::runtime_error{ "The patterns can only be built when all 6 permutations are created"}; } - LOG(INFO) << "Processing input triples from " << filename << " ..." - << std::endl; readIndexBuilderSettingsFromFile(); + updateInputFileSpecificationsAndLog(files, useParallelParser_); IndexBuilderDataAsFirstPermutationSorter indexBuilderData = - createIdTriplesAndVocab(makeRdfParser(filename, type)); + createIdTriplesAndVocab(makeRdfParser(files)); // Write the configuration already at this point, so we have it available in // case any of the permutations fail. 
diff --git a/src/index/IndexImpl.h b/src/index/IndexImpl.h index de108ab660..fb5da51e1b 100644 --- a/src/index/IndexImpl.h +++ b/src/index/IndexImpl.h @@ -119,7 +119,7 @@ class IndexImpl { string onDiskBase_; string settingsFileName_; bool onlyAsciiTurtlePrefixes_ = false; - bool useParallelParser_ = true; + bool useParallelParser_ = false; TurtleParserIntegerOverflowBehavior turtleParserIntegerOverflowBehavior_ = TurtleParserIntegerOverflowBehavior::Error; bool turtleParserSkipIllegalLiterals_ = false; @@ -228,12 +228,11 @@ class IndexImpl { Permutation& getPermutation(Permutation::Enum p); const Permutation& getPermutation(Permutation::Enum p) const; - // Creates an index from a file. Parameter Parser must be able to split the - // file's format into triples. - // Will write vocabulary and on-disk index data. + // Creates an index from a given set of input files. Will write vocabulary and + // on-disk index data. // !! The index can not directly be used after this call, but has to be setup // by createFromOnDiskIndex after this call. - void createFromFile(const string& filename, Index::Filetype type); + void createFromFiles(std::vector files); // Creates an index object from an on disk index that has previously been // constructed. Read necessary meta data into memory and opens file handles. @@ -474,8 +473,8 @@ class IndexImpl { // configured to either parse in parallel or not, and to either use the // CTRE-based relaxed parser or not, depending on the settings of the // corresponding member variables. - std::unique_ptr makeRdfParser(const std::string& filename, - Index::Filetype type) const; + std::unique_ptr makeRdfParser( + const std::vector& files) const; FirstPermutationSorterAndInternalTriplesAsPso convertPartialToGlobalIds( TripleVec& data, const vector& actualLinesPerPartial, @@ -762,4 +761,10 @@ class IndexImpl { // them. 
void addInternalStatisticsToConfiguration(size_t numTriplesInternal, size_t numPredicatesInternal); + + // Update `InputFileSpecification` based on `parallelParsingSpecifiedViaJson` + // and write a summary to the log. + static void updateInputFileSpecificationsAndLog( + std::vector& spec, + bool parallelParsingSpecifiedViaJson); }; diff --git a/src/index/InputFileSpecification.h b/src/index/InputFileSpecification.h new file mode 100644 index 0000000000..9b5afde367 --- /dev/null +++ b/src/index/InputFileSpecification.h @@ -0,0 +1,32 @@ +// Copyright 2024, University of Freiburg, +// Chair of Algorithms and Data Structures. +// Author: Johannes Kalmbach(joka921) +// +#pragma once + +#include +#include +namespace qlever { + +// An enum to distinguish between `Turtle` and `NQuad` files. +enum class Filetype { Turtle, NQuad }; + +// Specify a single input file or stream for the index builder. +struct InputFileSpecification { + std::string filename_; + Filetype filetype_; + // All triples that don't have a dedicated graph (either because the input + // format is N-Triples or Turtle, or because the corresponding line in the + // N-Quads format has no explicit graph) will be stored in this graph. The + // graph has to be specified without angle brackets. If set to `nullopt`, the + // global default graph will be used. + std::optional defaultGraph_; + + // If set to `true`, then the parallel RDF parser will be used for this file. + // This always works for N-Triples and N-Quads files, and for well-behaved + // Turtle files with all prefixes at the beginning and no multiline literals. + bool parseInParallel_ = false; + + bool operator==(const InputFileSpecification&) const = default; +}; +} // namespace qlever diff --git a/src/parser/RdfParser.cpp b/src/parser/RdfParser.cpp index 2da16fe402..9cba9f1455 100644 --- a/src/parser/RdfParser.cpp +++ b/src/parser/RdfParser.cpp @@ -1,7 +1,6 @@ // Copyright 2018, University of Freiburg, // Chair of Algorithms and Data Structures. 
// Author: Johannes Kalmbach(joka921) -// #include "parser/RdfParser.h" @@ -11,6 +10,7 @@ #include #include +#include "engine/CallFixedSize.h" #include "global/Constants.h" #include "parser/GeoPoint.h" #include "parser/NormalizedString.h" @@ -830,8 +830,9 @@ void RdfStreamParser::initialize(const string& filename) { } } +// _____________________________________________________________________________ template -bool RdfStreamParser::getLine(TurtleTriple* triple) { +bool RdfStreamParser::getLineImpl(TurtleTriple* triple) { if (triples_.empty()) { // if parsing the line fails because our buffer ends before the end of // the next statement we need to be able to recover @@ -944,7 +945,7 @@ template void RdfParallelParser::parseBatch(size_t parsePosition, auto batch) { try { - RdfStringParser parser; + RdfStringParser parser{defaultGraphIri_}; parser.prefixMap_ = this->prefixMap_; parser.setPositionOffset(parsePosition); parser.setInputStream(std::move(batch)); @@ -1037,7 +1038,7 @@ void RdfParallelParser::initialize(const string& filename) { // _______________________________________________________________________ template -bool RdfParallelParser::getLine(TurtleTriple* triple) { +bool RdfParallelParser::getLineImpl(TurtleTriple* triple) { // If the current batch is out of triples_ get the next batch of triples. // We need a while loop instead of a simple if in case there is a batch that // contains no triples. (Theoretically this might happen, and it is safer this @@ -1087,6 +1088,109 @@ RdfParallelParser::~RdfParallelParser() { "During the destruction of a RdfParallelParser"); } +// Create a parser for a single file of an `InputFileSpecification`. The type +// of the parser depends on the filetype (Turtle or N-Quads) and on whether the +// file is to be parsed in parallel. 
+template +static std::unique_ptr makeSingleRdfParser( + const Index::InputFileSpecification& file) { + auto graph = [file]() -> TripleComponent { + if (file.defaultGraph_.has_value()) { + return TripleComponent::Iri::fromIrirefWithoutBrackets( + file.defaultGraph_.value()); + } else { + return qlever::specialIds().at(DEFAULT_GRAPH_IRI); + } + }; + auto makeRdfParserImpl = [&filename = file.filename_, + &graph]() + -> std::unique_ptr { + using InnerParser = + std::conditional_t, + NQuadParser>; + using Parser = + std::conditional_t, + RdfStreamParser>; + return std::make_unique(filename, graph()); + }; + + // The call to `callFixedSize` lifts runtime integers to compile time + // integers. We use it here to create the correct combination of template + // arguments. + return ad_utility::callFixedSize( + std::array{file.parseInParallel_ ? 1 : 0, + file.filetype_ == Index::Filetype::Turtle ? 1 : 0}, + makeRdfParserImpl); +} + +// ______________________________________________________________ +template +RdfMultifileParser::RdfMultifileParser( + const std::vector& files) { + using namespace qlever; + // This lambda parses a single file and pushes the results and all occurring + // exceptions to the `finishedBatchQueue_`. + auto parseFile = [this](const InputFileSpecification& file) { + try { + auto parser = makeSingleRdfParser(file); + while (auto batch = parser->getBatch()) { + bool active = finishedBatchQueue_.push(std::move(batch.value())); + if (!active) { + // The queue was finished prematurely, stop this thread. This is + // important to avoid deadlocks. + return; + } + } + } catch (...) { + finishedBatchQueue_.pushException(std::current_exception()); + return; + } + if (numActiveParsers_.fetch_sub(1) == 1) { + // We are the last parser, we have to notify the downstream code that the + // input has been parsed completely. + finishedBatchQueue_.finish(); + } + }; + + // Feed all the input files to the `parsingQueue_`. 
+ auto makeParsers = [files, this, parseFile]() { + for (const auto& file : files) { + bool active = parsingQueue_.push(std::bind_front(parseFile, file)); + if (!active) { + // The queue was finished prematurely, stop this thread. This is + // important to avoid deadlocks. + return; + } + } + parsingQueue_.finish(); + }; + numActiveParsers_ = files.size(); // Set before any worker can finish. + feederThread_ = ad_utility::JThread{makeParsers}; +} + +// _____________________________________________________________________________ +template +RdfMultifileParser::~RdfMultifileParser() { + ad_utility::ignoreExceptionIfThrows( + [this] { + parsingQueue_.finish(); + finishedBatchQueue_.finish(); + }, + "During the destruction of an RdfMultifileParser"); +} + +//______________________________________________________________________________ +template +bool RdfMultifileParser::getLineImpl(TurtleTriple*) { + AD_FAIL(); +} + +// _____________________________________________________________________________ +template +std::optional> RdfMultifileParser::getBatch() { + return finishedBatchQueue_.pop(); +} + // Explicit instantiations template class TurtleParser; template class TurtleParser; @@ -1098,3 +1202,5 @@ template class RdfStreamParser>; template class RdfStreamParser>; template class RdfParallelParser>; template class RdfParallelParser>; +template class RdfMultifileParser; +template class RdfMultifileParser; diff --git a/src/parser/RdfParser.h b/src/parser/RdfParser.h index 7e350299b5..ca55c61993 100644 --- a/src/parser/RdfParser.h +++ b/src/parser/RdfParser.h @@ -1,6 +1,6 @@ -// Copyright 2018, University of Freiburg, +// Copyright 2018 - 2024, University of Freiburg, // Chair of Algorithms and Data Structures. 
-// Author: Johannes Kalmbach(joka921) +// Author: Johannes Kalmbach #pragma once @@ -17,6 +17,7 @@ #include "global/Constants.h" #include "global/SpecialIds.h" #include "index/ConstantsIndexBuilding.h" +#include "index/InputFileSpecification.h" #include "parser/ParallelBuffer.h" #include "parser/Tokenizer.h" #include "parser/TokenizerCtre.h" @@ -59,7 +60,7 @@ class RdfParserBase { public: virtual ~RdfParserBase() = default; // Wrapper to getLine that is expected by the rest of QLever - bool getLine(TurtleTriple& triple) { return getLine(&triple); } + bool getLine(TurtleTriple& triple) { return getLineImpl(&triple); } virtual TurtleParserIntegerOverflowBehavior& integerOverflowBehavior() final { return integerOverflowBehavior_; @@ -82,7 +83,7 @@ class RdfParserBase { // Writes the triple to the argument (format subject, object predicate) // returns true iff a triple can be successfully written, else the triple // value is invalid and the parser is at the end of the input. - virtual bool getLine(TurtleTriple* triple) = 0; + virtual bool getLineImpl(TurtleTriple* triple) = 0; // Get the offset (relative to the beginning of the file) of the first byte // that has not yet been dealt with by the parser. 
@@ -195,6 +196,7 @@ class TurtleParser : public RdfParserBase { std::string activePrefix_; TripleComponent activeSubject_; TripleComponent::Iri activePredicate_; + TripleComponent defaultGraphIri_ = qlever::specialIds().at(DEFAULT_GRAPH_IRI); size_t numBlankNodes_ = 0; bool currentTripleIgnoredBecauseOfInvalidLiteral_ = false; @@ -206,6 +208,8 @@ class TurtleParser : public RdfParserBase { public: TurtleParser() = default; + explicit TurtleParser(TripleComponent defaultGraphIri) + : defaultGraphIri_{std::move(defaultGraphIri)} {} TurtleParser(TurtleParser&& rhs) noexcept = default; TurtleParser& operator=(TurtleParser&& rhs) noexcept = default; @@ -347,7 +351,8 @@ class TurtleParser : public RdfParserBase { // ______________________________________________________________________________________ void emitTriple() { if (!currentTripleIgnoredBecauseOfInvalidLiteral_) { - triples_.emplace_back(activeSubject_, activePredicate_, lastParseResult_); + triples_.emplace_back(activeSubject_, activePredicate_, lastParseResult_, + defaultGraphIri_); } currentTripleIgnoredBecauseOfInvalidLiteral_ = false; } @@ -406,12 +411,16 @@ class TurtleParser : public RdfParserBase { template class NQuadParser : public TurtleParser { - static inline const TripleComponent defaultGraphId_ = - qlever::specialIds().at(DEFAULT_GRAPH_IRI); + TripleComponent defaultGraphId_ = qlever::specialIds().at(DEFAULT_GRAPH_IRI); TripleComponent activeObject_; TripleComponent activeGraphLabel_; using Base = TurtleParser; + public: + NQuadParser() = default; + explicit NQuadParser(TripleComponent defaultGraphId) + : defaultGraphId_{std::move(defaultGraphId)} {} + protected: bool statement() override; @@ -432,7 +441,10 @@ class RdfStringParser : public Parser { public: using Parser::getLine; using Parser::prefixMap_; - bool getLine(TurtleTriple* triple) override { + RdfStringParser() = default; + explicit RdfStringParser(TripleComponent defaultGraph) + : Parser{std::move(defaultGraph)} {} + bool 
getLineImpl(TurtleTriple* triple) override { (void)triple; throw std::runtime_error( "RdfStringParser doesn't support calls to getLine. Only use " @@ -554,13 +566,16 @@ class RdfStreamParser : public Parser { public: // Default construction needed for tests RdfStreamParser() = default; - explicit RdfStreamParser(const string& filename) { - LOG(DEBUG) << "Initialize turtle parsing from uncompressed file or stream " + explicit RdfStreamParser(const string& filename, + TripleComponent defaultGraphIri = + qlever::specialIds().at(DEFAULT_GRAPH_IRI)) + : Parser{std::move(defaultGraphIri)} { + LOG(DEBUG) << "Initialize RDF parsing from uncompressed file or stream " << filename << std::endl; initialize(filename); } - bool getLine(TurtleTriple* triple) override; + bool getLineImpl(TurtleTriple* triple) override; void initialize(const string& filename); @@ -625,10 +640,17 @@ class RdfParallelParser : public Parser { initialize(filename); } + // Construct a parser from a file and a given default graph iri. + RdfParallelParser(const string& filename, + const TripleComponent& defaultGraphIri) + : Parser{defaultGraphIri}, defaultGraphIri_{defaultGraphIri} { + initialize(filename); + } + // inherit the wrapper overload using Parser::getLine; - bool getLine(TurtleTriple* triple) override; + bool getLineImpl(TurtleTriple* triple) override; std::optional> getBatch() override; @@ -683,5 +705,61 @@ class RdfParallelParser : public Parser { std::atomic batchIdx_ = 0; std::atomic numBatchesTotal_ = 0; - std::chrono::milliseconds sleepTimeForTesting_; + TripleComponent defaultGraphIri_ = qlever::specialIds().at(DEFAULT_GRAPH_IRI); + + std::chrono::milliseconds sleepTimeForTesting_{0}; +}; + +// This class is an RDF parser that parses multiple files in parallel. Each +// file is specified by an `InputFileSpecification`. 
+template +class RdfMultifileParser : public RdfParserBase { + public: + // Default construction is needed for the tests. + RdfMultifileParser() = default; + + // Construct the parser from a vector of file specifications and eagerly start + // parsing them on background threads. + explicit RdfMultifileParser( + const std::vector& files); + + // This function is needed for the interface, but always throws an exception. + // `getBatch` (below) has to be used instead. + bool getLineImpl(TurtleTriple* triple) override; + + // Retrieve the next batch of triples, or `nullopt` if there are no more + // batches. There is no guarantee about the order in which batches from + // different input files are returned, but each batch belongs to a distinct + // input file. + std::optional> getBatch() override; + + size_t getParsePosition() const override { + // TODO: This function is used for better error messages, but we currently + // have no good way to implement it for this parser. Further analyze this. + // Until then, the dummy position 0 is always reported. + return 0; + } + + // The destructor has to clean up all the parallel structures that might be + // still running in the background, especially when it is called before the + // parsing has finished (e.g. in case of an exception in the code that uses + // the parser). + ~RdfMultifileParser() override; + + private: + // A thread that feeds the file specifications to the actual parser threads. + // NOTE(review): Being the first member, it is destroyed last, i.e. after + // both queues below. Presumably the destructor stops the feeder before the + // queues are destroyed; confirm this in the implementation. + ad_utility::JThread feederThread_; + // The buffer for the finished batches. + ad_utility::data_structures::ThreadSafeQueue> + finishedBatchQueue_{10}; + // This queue manages its own worker threads. Each task consists of a single + // file that is to be parsed. The parsed results are then pushed to the + // `finishedBatchQueue_` above. Note: It is important that the + // `parsingQueue_` is declared *after* the `finishedBatchQueue_`, s.t. when + // destroying the parser, the threads from the `parsingQueue_` are all joined + // before the `finishedBatchQueue_` (which they are using!) 
is destroyed. + ad_utility::TaskQueue parsingQueue_{10, NUM_PARALLEL_PARSER_THREADS}; + + // The number of parsers that have started, but not yet finished. This is + // needed to detect that the parsing is complete. + std::atomic numActiveParsers_ = 0; }; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 02ce2123ce..6db9886307 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -126,7 +126,7 @@ addLinkAndDiscoverTest(JoinTest engine) addLinkAndDiscoverTest(TextLimitOperationTest engine) -addLinkAndDiscoverTest(QueryPlannerTest engine) +addLinkAndDiscoverTestSerial(QueryPlannerTest engine) addLinkAndDiscoverTest(HashMapTest) diff --git a/test/GroupByTest.cpp b/test/GroupByTest.cpp index 0fc1b0dcd6..bbfc0ae6e7 100644 --- a/test/GroupByTest.cpp +++ b/test/GroupByTest.cpp @@ -71,7 +71,8 @@ class GroupByTest : public ::testing::Test { _index.setKbName("group_by_test"); _index.setTextName("group_by_test"); _index.setOnDiskBase("group_ty_test"); - _index.createFromFile("group_by_test.nt"); + _index.createFromFiles( + {{"group_by_test.nt", qlever::Filetype::Turtle, std::nullopt}}); _index.addTextFromContextFile("group_by_test.words", false); _index.buildDocsDB("group_by_test.documents"); diff --git a/test/IndexTest.cpp b/test/IndexTest.cpp index 0d45c4aecb..6ed660aae9 100644 --- a/test/IndexTest.cpp +++ b/test/IndexTest.cpp @@ -486,3 +486,44 @@ TEST(IndexTest, trivialGettersAndSetters) { EXPECT_EQ(index.memoryLimitIndexBuilding(), 7_kB); EXPECT_EQ(std::as_const(index).memoryLimitIndexBuilding(), 7_kB); } + +TEST(IndexTest, loggingAndSettingOfParallelParsing) { + using enum qlever::Filetype; + std::vector files{ + {"singleFile.ttl", Turtle, std::nullopt, false}}; + testing::internal::CaptureStdout(); + using namespace ::testing; + { + IndexImpl::updateInputFileSpecificationsAndLog(files, false); + EXPECT_THAT( + testing::internal::GetCapturedStdout(), + AllOf(HasSubstr("from singleFile.ttl"), Not(HasSubstr("parallel")))); + 
EXPECT_FALSE(files.at(0).parseInParallel_); + } + + { + testing::internal::CaptureStdout(); + IndexImpl::updateInputFileSpecificationsAndLog(files, true); + EXPECT_THAT(testing::internal::GetCapturedStdout(), + AllOf(HasSubstr("from singleFile.ttl"), HasSubstr("deprecated"), + HasSubstr("--parse-parallel"))); + EXPECT_TRUE(files.at(0).parseInParallel_); + } + + { + files.emplace_back("secondFile.ttl", Turtle, std::nullopt, false); + auto filesCopy = files; + testing::internal::CaptureStdout(); + IndexImpl::updateInputFileSpecificationsAndLog(files, false); + EXPECT_THAT(testing::internal::GetCapturedStdout(), + AllOf(HasSubstr("from 2 input streams"), + Not(HasSubstr("is deprecated")))); + EXPECT_EQ(files, filesCopy); + } + + { + AD_EXPECT_THROW_WITH_MESSAGE( + IndexImpl::updateInputFileSpecificationsAndLog(files, true), + HasSubstr("but has to be specified")); + } +} diff --git a/test/RdfParserTest.cpp b/test/RdfParserTest.cpp index 9be3623f84..f917745d3c 100644 --- a/test/RdfParserTest.cpp +++ b/test/RdfParserTest.cpp @@ -768,7 +768,13 @@ TEST(RdfParserTest, iriref) { template std::vector parseFromFile(const std::string& filename, bool useBatchInterface) { - Parser parserChild{filename}; + auto parserChild = [&]() { + if constexpr (ad_utility::isInstantiation) { + return Parser{{{filename, qlever::Filetype::Turtle, std::nullopt}}}; + } else { + return Parser{filename}; + } + }(); RdfParserBase& parser = parserChild; std::vector result; @@ -800,6 +806,12 @@ auto forAllParallelParsers(const auto& function, const auto&... args) { function.template operator()>>( false, args...); } +auto forAllMultifileParsers(const auto& function, const auto&... args) { + function.template operator()>(true, args...); + function.template operator()>(true, + args...); +} + auto forAllParsers(const auto& function, const auto&... args) { function.template operator()>>( true, args...); @@ -810,6 +822,7 @@ auto forAllParsers(const auto& function, const auto&... 
args) { function.template operator()>>( false, args...); forAllParallelParsers(function, args...); + forAllMultifileParsers(function, args...); } TEST(RdfParserTest, TurtleStreamAndParallelParser) { @@ -833,6 +846,7 @@ TEST(RdfParserTest, TurtleStreamAndParallelParser) { }; forAllParsers(testWithParser); + forAllMultifileParsers(testWithParser); ad_utility::deleteFile(filename); } @@ -854,6 +868,7 @@ TEST(RdfParserTest, emptyInput) { forAllParsers(testWithParser, ""); std::string onlyPrefixes = "PREFIX bim: "; forAllParsers(testWithParser, onlyPrefixes); + forAllMultifileParsers(testWithParser); } // ________________________________________________________________________ @@ -967,7 +982,13 @@ TEST(RdfParserTest, stopParsingOnOutsideFailure) { } ad_utility::Timer t{ad_utility::Timer::Stopped}; { - [[maybe_unused]] Parser parserChild{filename, 10ms}; + [[maybe_unused]] Parser parserChild = [&]() { + if constexpr (ad_utility::isInstantiation) { + return Parser{{{filename, qlever::Filetype::Turtle, std::nullopt}}}; + } else { + return Parser{filename, 10ms}; + } + }(); t.cont(); } EXPECT_LE(t.msecs(), 20ms); @@ -983,6 +1004,7 @@ TEST(RdfParserTest, stopParsingOnOutsideFailure) { }(); FILE_BUFFER_SIZE = 40; forAllParallelParsers(testWithParser, input); + forAllMultifileParsers(testWithParser, input); } // _____________________________________________________________________________ @@ -1022,14 +1044,67 @@ TEST(RdfParserTest, nQuadParser) { runTestsForParser(NQuadCtreParser{}); } +// _____________________________________________________________________________ TEST(RdfParserTest, noGetlineInStringParser) { auto runTestsForParser = [](auto parser) { parser.setInputStream("

."); TurtleTriple t; - EXPECT_ANY_THROW(parser.getLine(&t)); + EXPECT_ANY_THROW(parser.getLine(t)); }; runTestsForParser(NQuadRe2Parser{}); runTestsForParser(NQuadCtreParser{}); runTestsForParser(Re2Parser{}); runTestsForParser(CtreParser{}); } + +// _____________________________________________________________________________ +TEST(RdfParserTest, noGetlineInMultifileParsers) { + auto runTestsForParser = + []([[maybe_unused]] bool interface) { + Parser parser{}; + TurtleTriple t; + // Also test the dummy parse position member. + EXPECT_EQ(parser.getParsePosition(), 0u); + EXPECT_ANY_THROW(parser.getLine(t)); + }; + forAllMultifileParsers(runTestsForParser); +} + +// _____________________________________________________________________________ +TEST(RdfParserTest, multifileParser) { + auto impl = [](bool useParallelParser) { + std::vector expected; + std::string ttl = " . ."; + expected.push_back(TurtleTriple{iri(""), iri(""), iri(""), + iri("")}); + expected.push_back(TurtleTriple{iri(""), iri(""), iri(""), + iri("")}); + std::string nq = " . 
."; + expected.push_back( + TurtleTriple{iri(""), iri(""), iri(""), iri("")}); + expected.push_back(TurtleTriple{iri(""), iri(""), iri(""), + iri("")}); + std::string file1 = "multifileParserTest1.ttl"; + std::string file2 = "multifileParserTest2.nq"; + { + auto f = ad_utility::makeOfstream(file1); + f << ttl; + } + { + auto f = ad_utility::makeOfstream(file2); + f << nq; + } + std::vector specs; + specs.emplace_back(file1, qlever::Filetype::Turtle, "defaultGraphTTL", + useParallelParser); + specs.emplace_back(file2, qlever::Filetype::NQuad, "defaultGraphNQ", + useParallelParser); + Parser p{specs}; + std::vector result; + while (auto batch = p.getBatch()) { + std::ranges::copy(batch.value(), std::back_inserter(result)); + } + EXPECT_THAT(result, ::testing::UnorderedElementsAreArray(expected)); + }; + forAllMultifileParsers(impl); +} diff --git a/test/util/IndexTestHelpers.cpp b/test/util/IndexTestHelpers.cpp index 91c2b2c615..d63e8fde18 100644 --- a/test/util/IndexTestHelpers.cpp +++ b/test/util/IndexTestHelpers.cpp @@ -173,7 +173,9 @@ Index makeTestIndex(const std::string& indexBasename, index.usePatterns() = usePatterns; index.setSettingsFile(inputFilename + ".settings.json"); index.loadAllPermutations() = loadAllPermutations; - index.createFromFile(inputFilename); + qlever::InputFileSpecification spec{inputFilename, qlever::Filetype::Turtle, + std::nullopt}; + index.createFromFiles({spec}); if (createTextIndex) { index.addTextFromContextFile("", true); }