diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index d8b0352a4..c7352d218 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -573,6 +573,9 @@ set(SOURCE_FILES_unitTest src/clp/streaming_archive/reader/Segment.hpp src/clp/streaming_archive/reader/SegmentManager.cpp src/clp/streaming_archive/reader/SegmentManager.hpp + src/clp/streaming_archive/single_file_archive/Defs.hpp + src/clp/streaming_archive/single_file_archive/writer.cpp + src/clp/streaming_archive/single_file_archive/writer.hpp src/clp/streaming_archive/writer/Archive.cpp src/clp/streaming_archive/writer/Archive.hpp src/clp/streaming_archive/writer/File.cpp diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index eff32ce46..c241e3bed 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -108,6 +108,9 @@ set( ../streaming_archive/reader/Segment.hpp ../streaming_archive/reader/SegmentManager.cpp ../streaming_archive/reader/SegmentManager.hpp + ../streaming_archive/single_file_archive/Defs.hpp + ../streaming_archive/single_file_archive/writer.cpp + ../streaming_archive/single_file_archive/writer.hpp ../streaming_archive/writer/Archive.cpp ../streaming_archive/writer/Archive.hpp ../streaming_archive/writer/File.cpp diff --git a/components/core/src/clp/clp/CommandLineArguments.cpp b/components/core/src/clp/clp/CommandLineArguments.cpp index cb44d96d8..679734b11 100644 --- a/components/core/src/clp/clp/CommandLineArguments.cpp +++ b/components/core/src/clp/clp/CommandLineArguments.cpp @@ -373,6 +373,10 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) { ->default_value(m_schema_file_path), "Path to a schema file. If not specified, heuristics are used to determine " "dictionary variables. See README-Schema.md for details." + )( + "single-file-archive", + po::bool_switch(&m_single_file_archive), + "Output archive as a single-file" ); po::options_description all_compression_options; diff --git a/components/core/src/clp/clp/CommandLineArguments.hpp b/components/core/src/clp/clp/CommandLineArguments.hpp index 6e14a4b3b..307427210 100644 --- a/components/core/src/clp/clp/CommandLineArguments.hpp +++ b/components/core/src/clp/clp/CommandLineArguments.hpp @@ -23,6 +23,7 @@ class CommandLineArguments : public CommandLineArgumentsBase { explicit CommandLineArguments(std::string const& program_name) : CommandLineArgumentsBase(program_name), m_show_progress(false), + m_single_file_archive(false), m_sort_input_files(true), m_print_archive_stats_progress(false), m_target_segment_uncompressed_size(1L * 1024 * 1024 * 1024), @@ -45,6 +46,8 @@ class CommandLineArguments : public CommandLineArgumentsBase { bool show_progress() const { return m_show_progress; } + [[nodiscard]] auto single_file_archive() const -> bool { return m_single_file_archive; } + bool sort_input_files() const { return m_sort_input_files; } bool print_archive_stats_progress() const { return m_print_archive_stats_progress; } @@ -92,6 +95,7 @@ class CommandLineArguments : public CommandLineArgumentsBase { std::string m_output_dir; std::string m_schema_file_path; bool m_show_progress; + bool m_single_file_archive; bool m_print_archive_stats_progress; size_t m_target_encoded_file_size; size_t m_target_segment_uncompressed_size; diff --git a/components/core/src/clp/clp/compression.cpp b/components/core/src/clp/clp/compression.cpp index a0d5bf276..32c7d25ae 100644 --- a/components/core/src/clp/clp/compression.cpp +++ b/components/core/src/clp/clp/compression.cpp @@ -107,6 +107,7 @@ bool compress( archive_user_config.global_metadata_db = global_metadata_db.get(); archive_user_config.print_archive_stats_progress = command_line_args.print_archive_stats_progress(); + archive_user_config.use_single_file_archive = command_line_args.single_file_archive(); // Open Archive streaming_archive::writer::Archive archive_writer; diff --git a/components/core/src/clp/streaming_archive/single_file_archive/Defs.hpp b/components/core/src/clp/streaming_archive/single_file_archive/Defs.hpp new file mode 100644 index 000000000..6866bf676 --- /dev/null +++ b/components/core/src/clp/streaming_archive/single_file_archive/Defs.hpp @@ -0,0 +1,63 @@ +#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP +#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP + +#include +#include + +#include + +#include "../ArchiveMetadata.hpp" +#include "../Constants.hpp" + +namespace clp::streaming_archive::single_file_archive { +using single_file_archive_format_version_t = uint32_t; + +// Single file archive version +constexpr uint8_t cVersionMajor{0}; +constexpr uint8_t cVersionMinor{2}; +constexpr uint16_t cVersionPatch{0}; +constexpr single_file_archive_format_version_t cVersion{ + cVersionMajor << 24 | cVersionMinor << 16 | cVersionPatch +}; + +static constexpr size_t cNumMagicNumberChars{4}; +static constexpr std::array + cUnstructuredSfaMagicNumber{'Y', 'C', 'L', 'P'}; +static constexpr std::string_view cUnstructuredSfaExtension{".clp"}; +static constexpr std::string_view cFileInfoSentinelName{""}; + +static constexpr size_t cNumStaticFiles{6}; +constexpr std::array cStaticArchiveFileNames{ + cMetadataFileName, + cMetadataDBFileName, + cLogTypeDictFilename, + cLogTypeSegmentIndexFilename, + cVarDictFilename, + cVarSegmentIndexFilename +}; + +static constexpr size_t cNumUnused{6}; + +struct __attribute__((packed)) SingleFileArchiveHeader { + std::array magic; + single_file_archive_format_version_t version; + uint64_t metadata_size; + std::array unused; +}; + +struct FileInfo { + std::string name; + uint64_t offset; + + // Variables are renamed when serialized to match single-file archive specification + MSGPACK_DEFINE_MAP(MSGPACK_NVP("n", name), MSGPACK_NVP("o", offset)); +}; + +struct SingleFileArchiveMetadata { + std::vector archive_files; + uint64_t num_segments; + MSGPACK_DEFINE_MAP(archive_files, num_segments); +}; +} // namespace clp::streaming_archive::single_file_archive + +#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP diff --git a/components/core/src/clp/streaming_archive/single_file_archive/writer.cpp b/components/core/src/clp/streaming_archive/single_file_archive/writer.cpp new file mode 100644 index 000000000..b37b978f7 --- /dev/null +++ b/components/core/src/clp/streaming_archive/single_file_archive/writer.cpp @@ -0,0 +1,221 @@ +#include "writer.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "../../Defs.h" +#include "../../ErrorCode.hpp" +#include "../../FileReader.hpp" +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../ArchiveMetadata.hpp" +#include "../Constants.hpp" +#include "Defs.hpp" + +namespace clp::streaming_archive::single_file_archive { + +namespace { +constexpr size_t cReadBlockSize{4096}; + +/** + * Generates metadata for the file section of a single-file archive. The metadata consists + * of a list of file names and their corresponding starting offsets. + * + * @param multi_file_archive_path + * @param num_segments + * @return Vector containing a `FileInfo` struct for every file in the multi-file archive. + * @throws `std::filesystem::filesystem_error` if `stat` on an archive file fails. + */ +[[nodiscard]] auto +get_archive_file_infos(std::filesystem::path const& multi_file_archive_path, size_t num_segments) + -> std::vector; + +/** + * Combines file section metadata and the number of segments into single-file archive metadata, + * then serializes the metadata into the MessagePack format. + * + * @param multi_file_archive_path + * @param num_segments + * @return Packed metadata. + */ +[[nodiscard]] auto pack_single_file_archive_metadata( + std::filesystem::path const& multi_file_archive_path, + size_t num_segments +) -> std::stringstream; + +/** + * Writes single-file archive header. + * + * @param single_file_archive_writer + * @param packed_metadata_size + */ +auto write_archive_header(FileWriter& single_file_archive_writer, size_t packed_metadata_size) + -> void; + +/** + * Writes packed single-file archive metadata. + * + * @param single_file_archive_writer + * @param packed_metadata Packed metadata. + */ +auto write_packed_archive_metadata( + FileWriter& single_file_archive_writer, + std::stringstream const& packed_metadata +) -> void; + +/** + * Reads the content of a file and writes it to the given file writer. + * @param file_path + * @param single_file_archive_writer + * @throws OperationFailed if reading the file fails. + */ +auto +write_archive_file(std::filesystem::path const& file_path, FileWriter& single_file_archive_writer) + -> void; + +/** + * Iterates over files in the multi-file archive and copies their contents to the single-file + * archive. + * + * @param single_file_archive_writer + * @param multi_file_archive_path + * @param num_segments + */ +auto write_archive_files( + FileWriter& single_file_archive_writer, + std::filesystem::path const& multi_file_archive_path, + size_t num_segments +) -> void; + +auto +get_archive_file_infos(std::filesystem::path const& multi_file_archive_path, size_t num_segments) + -> std::vector { + std::vector files; + + uint64_t offset{}; + for (auto const& static_archive_file_name : cStaticArchiveFileNames) { + files.emplace_back(FileInfo{std::string(static_archive_file_name), offset}); + offset += std::filesystem::file_size(multi_file_archive_path / static_archive_file_name); + } + + std::filesystem::path segment_dir_path = multi_file_archive_path / cSegmentsDirname; + for (size_t i = 0; i < num_segments; ++i) { + auto const segment_id = std::to_string(i); + files.emplace_back(FileInfo{segment_id, offset}); + offset += std::filesystem::file_size(segment_dir_path / segment_id); + } + + // Add sentinel indicating total size of all files + files.emplace_back(FileInfo{std::string(cFileInfoSentinelName), offset}); + + return files; +} + +auto pack_single_file_archive_metadata( + std::filesystem::path const& multi_file_archive_path, + size_t num_segments +) -> std::stringstream { + SingleFileArchiveMetadata single_file_archive{ + .archive_files = get_archive_file_infos(multi_file_archive_path, num_segments), + .num_segments = num_segments, + }; + + std::stringstream buf; + msgpack::pack(buf, single_file_archive); + + return buf; +} + +auto write_archive_header(FileWriter& single_file_archive_writer, size_t packed_metadata_size) + -> void { + SingleFileArchiveHeader header{ + .magic = cUnstructuredSfaMagicNumber, + .version = cVersion, + .metadata_size = packed_metadata_size, + .unused{} + }; + single_file_archive_writer.write(reinterpret_cast(&header), sizeof(header)); +} + +auto write_packed_archive_metadata( + FileWriter& single_file_archive_writer, + std::stringstream const& packed_metadata +) -> void { + single_file_archive_writer.write(packed_metadata.str().data(), packed_metadata.str().size()); +} + +auto +write_archive_file(std::filesystem::path const& file_path, FileWriter& single_file_archive_writer) + -> void { + FileReader reader{file_path.string()}; + std::array read_buffer{}; + while (true) { + size_t num_bytes_read{}; + auto const error_code = reader.try_read(read_buffer.data(), cReadBlockSize, num_bytes_read); + if (ErrorCode_EndOfFile == error_code) { + break; + } + if (ErrorCode_Success != error_code) { + throw OperationFailed(error_code, __FILENAME__, __LINE__); + } + single_file_archive_writer.write(read_buffer.data(), num_bytes_read); + } +} + +auto write_archive_files( + FileWriter& single_file_archive_writer, + std::filesystem::path const& multi_file_archive_path, + size_t num_segments +) -> void { + for (auto const& static_archive_file_name : cStaticArchiveFileNames) { + auto const static_archive_file_path{multi_file_archive_path / static_archive_file_name}; + write_archive_file(static_archive_file_path, single_file_archive_writer); + } + + auto const segment_dir_path{multi_file_archive_path / cSegmentsDirname}; + for (size_t i = 0; i < num_segments; ++i) { + auto const segment_path{segment_dir_path / std::to_string(i)}; + write_archive_file(segment_path, single_file_archive_writer); + } +} +} // namespace + +auto +write_single_file_archive(std::filesystem::path const& multi_file_archive_path, size_t num_segments) + -> void { + FileWriter single_file_archive_writer; + std::filesystem::path single_file_archive_path{ + multi_file_archive_path.string() + + std::string(single_file_archive::cUnstructuredSfaExtension) + }; + if (std::filesystem::exists(single_file_archive_path)) { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + + single_file_archive_writer.open( + single_file_archive_path.string(), + FileWriter::OpenMode::CREATE_FOR_WRITING + ); + + auto const packed_metadata + = pack_single_file_archive_metadata(multi_file_archive_path, num_segments); + write_archive_header(single_file_archive_writer, packed_metadata.str().size()); + write_packed_archive_metadata(single_file_archive_writer, packed_metadata); + write_archive_files(single_file_archive_writer, multi_file_archive_path, num_segments); + + single_file_archive_writer.close(); + + try { + std::filesystem::remove_all(multi_file_archive_path); + } catch (std::filesystem::filesystem_error& e) { + SPDLOG_WARN("Failed to delete multi-file archive: {}", e.what()); + } +} +} // namespace clp::streaming_archive::single_file_archive diff --git a/components/core/src/clp/streaming_archive/single_file_archive/writer.hpp b/components/core/src/clp/streaming_archive/single_file_archive/writer.hpp new file mode 100644 index 000000000..85f234ca4 --- /dev/null +++ b/components/core/src/clp/streaming_archive/single_file_archive/writer.hpp @@ -0,0 +1,48 @@ +#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_WRITER_HPP +#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_WRITER_HPP + +#include +#include + +#include + +#include "../../Defs.h" +#include "../../ErrorCode.hpp" +#include "../../TraceableException.hpp" +#include "../ArchiveMetadata.hpp" + +namespace clp::streaming_archive::single_file_archive { + +class OperationFailed : public TraceableException { +public: + // Constructors + OperationFailed( + ErrorCode error_code, + char const* const filename, + int line_number, + std::string message = "streaming_archive::single_file_archive operation failed" + ) + : TraceableException{error_code, filename, line_number}, + m_message{std::move(message)} {} + + // Methods + [[nodiscard]] auto what() const noexcept -> char const* override { return m_message.c_str(); } + +private: + std::string m_message; +}; + +/** + * Writes a single-file archive then deletes the multi-file archive. + * + * @param multi_file_archive_path + * @param num_segments + * @throws OperationFailed if single-file archive path already exists. + */ +auto +write_single_file_archive(std::filesystem::path const& multi_file_archive_path, size_t num_segments) + -> void; + +} // namespace clp::streaming_archive::single_file_archive + +#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_WRITER_HPP diff --git a/components/core/src/clp/streaming_archive/writer/Archive.cpp b/components/core/src/clp/streaming_archive/writer/Archive.cpp index a88296425..34b8f9b48 100644 --- a/components/core/src/clp/streaming_archive/writer/Archive.cpp +++ b/components/core/src/clp/streaming_archive/writer/Archive.cpp @@ -19,6 +19,7 @@ #include "../../spdlog_with_specializations.hpp" #include "../../Utils.hpp" #include "../Constants.hpp" +#include "../single_file_archive/writer.hpp" #include "utils.hpp" using clp::ir::eight_byte_encoded_variable_t; @@ -56,6 +57,7 @@ void Archive::open(UserConfig const& user_config) { m_creator_id_as_string = boost::uuids::to_string(m_creator_id); m_creation_num = user_config.creation_num; m_print_archive_stats_progress = user_config.print_archive_stats_progress; + m_use_single_file_archive = user_config.use_single_file_archive; std::error_code std_error_code; @@ -242,6 +244,13 @@ void Archive::close() { m_metadata_db.close(); + if (m_use_single_file_archive) { + single_file_archive::write_single_file_archive( + m_path, + static_cast(m_next_segment_id) + ); + } + m_creator_id_as_string.clear(); m_id_as_string.clear(); m_path.clear(); diff --git a/components/core/src/clp/streaming_archive/writer/Archive.hpp b/components/core/src/clp/streaming_archive/writer/Archive.hpp index cd5c5d99f..082a6bf12 100644 --- a/components/core/src/clp/streaming_archive/writer/Archive.hpp +++ b/components/core/src/clp/streaming_archive/writer/Archive.hpp @@ -48,6 +48,7 @@ class Archive { std::string output_dir; GlobalMetadataDB* global_metadata_db; bool print_archive_stats_progress; + bool use_single_file_archive; }; class OperationFailed : public TraceableException { @@ -193,6 +194,10 @@ class Archive { return m_logtype_dict.get_data_size() + m_var_dict.get_data_size(); } + [[nodiscard]] auto get_use_single_file_archive() const -> bool { + return m_use_single_file_archive; + } + private: // Types /** @@ -341,6 +346,7 @@ class Archive { GlobalMetadataDB* m_global_metadata_db; bool m_print_archive_stats_progress; + bool m_use_single_file_archive{false}; }; } // namespace clp::streaming_archive::writer