diff --git a/components/core/.clang-format b/components/core/.clang-format index ff65adbae..4d0d3a87c 100644 --- a/components/core/.clang-format +++ b/components/core/.clang-format @@ -4,7 +4,7 @@ IncludeCategories: # NOTE: A header is grouped by first matching regex # Library headers. Update when adding new libraries. # NOTE: clang-format retains leading white-space on a line in violation of the YAML spec. - - Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|mongocxx\ + - Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|lzma|mongocxx\ |msgpack|mysql|openssl|outcome|regex_utils|simdjson|spdlog|sqlite3|string_utils|yaml-cpp|zstd)" Priority: 3 # C system headers diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index ce74f04cc..0995a0afb 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -11,8 +11,11 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # Set general compressor set(GENERAL_COMPRESSOR "zstd" CACHE STRING "The general-purpose compressor used as the 2nd-stage compressor") -set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd) -if ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough") +set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS lzma passthrough zstd) +if ("${GENERAL_COMPRESSOR}" STREQUAL "lzma") + add_definitions(-DUSE_LZMA_COMPRESSION=1) + message(STATUS "Using Lempel–Ziv–Markov chain Algorithm compression") +elseif ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough") add_definitions(-DUSE_PASSTHROUGH_COMPRESSION=1) message(STATUS "Using passthrough compression") elseif ("${GENERAL_COMPRESSOR}" STREQUAL "zstd") @@ -224,6 +227,21 @@ else() message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd") endif() +# Find and setup LZMA Library +# TODO: Add a script in ./cmake/Modules to properly import LZMA in find_package()'s module mode +if(CLP_USE_STATIC_LIBS) + set(LIBLZMA_USE_STATIC_LIBS ON) +endif() 
+find_package(LibLZMA REQUIRED) +if(LIBLZMA_FOUND) + message(STATUS "Found Lzma ${LIBLZMA_VERSION_STRING}") + message(STATUS "Lzma library location: ${LIBLZMA_LIBRARIES}") + message(STATUS "Lzma Include Dir: ${LIBLZMA_INCLUDE_DIRS}") +else() + message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for Lzma") +endif() +include_directories(${LIBLZMA_INCLUDE_DIRS}) + # sqlite dependencies set(sqlite_DYNAMIC_LIBS "dl;m;pthread") include(cmake/Modules/FindLibraryDependencies.cmake) @@ -516,6 +534,9 @@ set(SOURCE_FILES_unitTest src/clp/streaming_compression/Compressor.hpp src/clp/streaming_compression/Constants.hpp src/clp/streaming_compression/Decompressor.hpp + src/clp/streaming_compression/lzma/Compressor.cpp + src/clp/streaming_compression/lzma/Compressor.hpp + src/clp/streaming_compression/lzma/Constants.hpp src/clp/streaming_compression/passthrough/Compressor.cpp src/clp/streaming_compression/passthrough/Compressor.hpp src/clp/streaming_compression/passthrough/Decompressor.cpp @@ -608,6 +629,7 @@ target_link_libraries(unitTest clp::regex_utils clp::string_utils yaml-cpp::yaml-cpp + ${LIBLZMA_LIBRARIES} ZStd::ZStd ) target_compile_features(unitTest diff --git a/components/core/src/clp/streaming_compression/Constants.hpp b/components/core/src/clp/streaming_compression/Constants.hpp index 4649c2e98..080f3a20b 100644 --- a/components/core/src/clp/streaming_compression/Constants.hpp +++ b/components/core/src/clp/streaming_compression/Constants.hpp @@ -7,6 +7,7 @@ namespace clp::streaming_compression { enum class CompressorType : uint8_t { ZSTD = 0x10, + LZMA = 0x20, Passthrough = 0xFF, }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/lzma/Compressor.cpp b/components/core/src/clp/streaming_compression/lzma/Compressor.cpp new file mode 100644 index 000000000..34c1a0e2b --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Compressor.cpp @@ -0,0 +1,203 @@ +#include "Compressor.hpp" + 
+#include <array> +#include <cstddef> +#include <cstdint> +#include <cstring> + +#include <lzma.h> +#include <spdlog/spdlog.h> + +#include "../../ErrorCode.hpp" +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../../type_utils.hpp" + +namespace clp::streaming_compression::lzma { +auto Compressor::open(FileWriter& file_writer) -> void { + if (nullptr != m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + + m_lzma_stream.detach_input(); + if (false + == m_lzma_stream.attach_output( + m_compressed_stream_block_buffer.data(), + m_compressed_stream_block_buffer.size() + )) + { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + m_compressed_stream_file_writer = &file_writer; + m_uncompressed_stream_pos = 0; +} + +auto Compressor::close() -> void { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + if (m_lzma_stream.avail_in() > 0) { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + + flush_lzma(LZMA_FINISH); + m_lzma_stream.end_and_detach_output(); + m_compressed_stream_file_writer = nullptr; +} + +auto Compressor::write(char const* data, size_t data_length) -> void { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + if (false + == m_lzma_stream + .attach_input(clp::size_checked_pointer_cast<uint8_t const>(data), data_length)) + { + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + encode_lzma(); + m_lzma_stream.detach_input(); + m_uncompressed_stream_pos += data_length; +} + +auto Compressor::flush() -> void { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + flush_lzma(LZMA_SYNC_FLUSH); +} + +auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode { + if (nullptr == 
m_compressed_stream_file_writer) { + return ErrorCode_NotInit; + } + pos = m_uncompressed_stream_pos; + return ErrorCode_Success; +} + +auto Compressor::encode_lzma() -> void { + while (m_lzma_stream.avail_in() > 0) { + if (0 == m_lzma_stream.avail_out()) { + flush_stream_output_block_buffer(); + } + auto const rc = m_lzma_stream.lzma_code(LZMA_RUN); + switch (rc) { + case LZMA_OK: + break; + case LZMA_BUF_ERROR: + SPDLOG_ERROR("LZMA compressor input stream is corrupt. No encoding " + "progress can be made."); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + default: + SPDLOG_ERROR( + "lzma_code() returned an unexpected value - {}.", + static_cast<int>(rc) + ); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + } +} + +auto Compressor::flush_lzma(lzma_action flush_action) -> void { + if (false == LzmaStream::is_flush_action(flush_action)) { + SPDLOG_ERROR( + "lzma_code() supplied with invalid flush action - {}.", + static_cast<int>(flush_action) + ); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + + bool flushed{false}; + while (false == flushed) { + if (0 == m_lzma_stream.avail_out()) { + flush_stream_output_block_buffer(); + } + auto const rc = m_lzma_stream.lzma_code(flush_action); + switch (rc) { + case LZMA_OK: + break; + case LZMA_STREAM_END: + // NOTE: flush may not have completed if a multithreaded encoder is using action + // LZMA_FULL_BARRIER. For now, we skip this check. + flushed = true; + break; + case LZMA_BUF_ERROR: + // NOTE: this can happen if we are using LZMA_FULL_FLUSH or LZMA_FULL_BARRIER. These + // two actions keep encoding input data alongside flushing buffered encoded data. + SPDLOG_ERROR("LZMA compressor input stream is corrupt. 
No encoding " + "progress can be made."); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + default: + SPDLOG_ERROR( + "lzma_code() returned an unexpected value - {}.", + static_cast<int>(rc) + ); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + } + flush_stream_output_block_buffer(); +} + +auto Compressor::flush_stream_output_block_buffer() -> void { + if (cCompressedStreamBlockBufferSize == m_lzma_stream.avail_out()) { + return; + } + m_compressed_stream_file_writer->write( + clp::size_checked_pointer_cast<char>(m_compressed_stream_block_buffer.data()), + cCompressedStreamBlockBufferSize - m_lzma_stream.avail_out() + ); + if (false + == m_lzma_stream.attach_output( + m_compressed_stream_block_buffer.data(), + m_compressed_stream_block_buffer.size() + )) + { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } +} + +Compressor::LzmaStream::LzmaStream(int compression_level, size_t dict_size, lzma_check check) { + lzma_options_lzma options; + if (0 != lzma_lzma_preset(&options, compression_level)) { + SPDLOG_ERROR("Failed to initialize LZMA options' compression level."); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + options.dict_size = dict_size; + std::array<lzma_filter, 2> filters{{ + {.id = LZMA_FILTER_LZMA2, .options = &options}, + {.id = LZMA_VLI_UNKNOWN, .options = nullptr}, + }}; + + auto const rc = lzma_stream_encoder(&m_stream, filters.data(), check); + if (LZMA_OK == rc) { + return; + } + + char const* msg{nullptr}; + switch (rc) { + case LZMA_MEM_ERROR: + msg = "Memory allocation failed"; + break; + + case LZMA_OPTIONS_ERROR: + msg = "Specified preset is not supported"; + break; + + case LZMA_UNSUPPORTED_CHECK: + msg = "Specified integrity check is not supported"; + break; + + case LZMA_PROG_ERROR: + msg = "Input arguments are not sane"; + break; + + default: + msg = "Unknown error"; + break; + } + + SPDLOG_ERROR("Error initializing the encoder: {} (error code 
{})", msg, static_cast<int>(rc)); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); +} +} // namespace clp::streaming_compression::lzma diff --git a/components/core/src/clp/streaming_compression/lzma/Compressor.hpp b/components/core/src/clp/streaming_compression/lzma/Compressor.hpp new file mode 100644 index 000000000..de665eaf6 --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Compressor.hpp @@ -0,0 +1,242 @@ +#ifndef CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP +#define CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP + +#include <cstddef> +#include <cstdint> + +#include <lzma.h> + +#include "../../Array.hpp" +#include "../../ErrorCode.hpp" +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../Compressor.hpp" +#include "Constants.hpp" + +namespace clp::streaming_compression::lzma { +/** + * Implements an LZMA compressor that compresses byte input data to a file. + */ +class Compressor : public ::clp::streaming_compression::Compressor { +public: + // Types + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + [[nodiscard]] auto what() const noexcept -> char const* override { + return "streaming_compression::lzma::Compressor operation failed"; + } + }; + + // Constructors + Compressor() : Compressor{cDefaultCompressionLevel, cDefaultDictionarySize, LZMA_CHECK_CRC64} {} + + Compressor(int compression_level, size_t dict_size, lzma_check check) + : m_lzma_stream{compression_level, dict_size, check} {} + + // Destructor + ~Compressor() override = default; + + // Delete copy constructor and assignment operator + Compressor(Compressor const&) = delete; + auto operator=(Compressor const&) -> Compressor& = delete; + + // Default move constructor and assignment operator + Compressor(Compressor&&) noexcept = default; 
+ auto operator=(Compressor&&) noexcept -> Compressor& = default; + + // Methods implementing the WriterInterface + /** + * Writes the given data to the compressor + * @param data + * @param data_length + */ + auto write(char const* data, size_t data_length) -> void override; + + /** + * Writes any internally buffered data to file and ends the current frame + * + * Forces all the encoded data buffered by LZMA to be available at output + */ + auto flush() -> void override; + + /** + * Tries to get the current position of the write head + * @param pos Position of the write head + * @return ErrorCode_NotInit if the compressor is not open + * @return ErrorCode_Success on success + */ + auto try_get_pos(size_t& pos) const -> ErrorCode override; + + // Methods implementing the Compressor interface + /** + * Closes the compressor, finishing the LZMA stream and releasing the encoder's resources + */ + auto close() -> void override; + + /** + * Open the compression stream for encoding to the file_writer. + * + * @param file_writer + */ + auto open(FileWriter& file_writer) -> void override; + +private: + /** + * Wrapper class around lzma_stream providing easier usage. + */ + class LzmaStream { + public: + /** + * Initializes an LZMA compression encoder and its streams. + * + * @param compression_level Compression preset level in the range [0-9] where the higher + * numbers use increasingly more memory for greater compression ratios. + * @param dict_size Max amount of recently processed uncompressed bytes to keep in the + * memory. + * @param check Type of check to verify the integrity of the uncompressed data. + * LZMA_CHECK_CRC64 is the default in the xz command line tool. If the .xz file needs to be + * decompressed with XZ-Embedded, use LZMA_CHECK_CRC32 instead. + * + * @throw `OperationFailed` `ErrorCode_BadParam` if the LZMA options are invalid or the + * encoder fails to initialize. 
+ */ + LzmaStream(int compression_level, size_t dict_size, lzma_check check); + + // Destructor: releases the encoder's internal state. `lzma_end` is a no-op on a stream + // that was already ended or moved from, so this is safe after `end_and_detach_output`. + ~LzmaStream() { lzma_end(&m_stream); } + + // Delete copy constructor and assignment operator + LzmaStream(LzmaStream const&) = delete; + auto operator=(LzmaStream const&) -> LzmaStream& = delete; + + // Move constructor and assignment operator: transfer ownership of the encoder state and + // reset the source so its destructor doesn't release state it no longer owns. + LzmaStream(LzmaStream&& rhs) noexcept : m_stream(rhs.m_stream) { + rhs.m_stream = LZMA_STREAM_INIT; + } + + auto operator=(LzmaStream&& rhs) noexcept -> LzmaStream& { + if (this != &rhs) { + lzma_end(&m_stream); + m_stream = rhs.m_stream; + rhs.m_stream = LZMA_STREAM_INIT; + } + return *this; + } + + /** + * Attaches a pre-allocated block buffer to the encoder's input stream. + * + * @return false if the data buffer is null. + * @return true on success. + */ + [[nodiscard]] auto attach_input(uint8_t const* data_ptr, size_t data_length) -> bool { + if (nullptr == data_ptr) { + return false; + } + m_stream.next_in = data_ptr; + m_stream.avail_in = data_length; + return true; + } + + /** + * Attaches a pre-allocated block buffer to the encoder's output stream. + * + * @return false if the data buffer is null or empty. + * @return true on success. + */ + [[nodiscard]] auto attach_output(uint8_t* data_ptr, size_t data_length) -> bool { + if (nullptr == data_ptr || 0 == data_length) { + return false; + } + m_stream.next_out = data_ptr; + m_stream.avail_out = data_length; + return true; + } + + [[nodiscard]] auto avail_in() const -> size_t { return m_stream.avail_in; } + + [[nodiscard]] auto avail_out() const -> size_t { return m_stream.avail_out; } + + /** + * Unset the internal fields of the encoder's input stream. 
+ */ + auto detach_input() -> void { + m_stream.next_in = nullptr; + m_stream.avail_in = 0; + } + + /** + * End the LZMA stream and unset the internal fields of the encoder's output stream. 
+ */ + auto end_and_detach_output() -> void { + lzma_end(&m_stream); + m_stream.next_out = nullptr; + m_stream.avail_out = 0; + } + + [[nodiscard]] static auto is_flush_action(lzma_action action) -> bool { + return LZMA_SYNC_FLUSH == action || LZMA_FULL_FLUSH == action + || LZMA_FULL_BARRIER == action || LZMA_FINISH == action; + } + + [[nodiscard]] auto lzma_code(lzma_action action) -> lzma_ret { + return ::lzma_code(&m_stream, action); + } + + private: + lzma_stream m_stream = LZMA_STREAM_INIT; + }; + + static constexpr size_t cCompressedStreamBlockBufferSize{4096}; // 4KiB + + /** + * Invokes lzma_code() repeatedly with LZMA_RUN until the input is exhausted + * + * At the end of the workflow, the last bytes of encoded data may still be buffered in the LZMA + * stream and thus not immediately available at the output block buffer. + * + * Assumes input stream and output block buffer are both in valid states. + * @throw `OperationFailed` if LZMA returns an unexpected error value + */ + auto encode_lzma() -> void; + + /** + * Invokes lzma_code() repeatedly with the given flushing action until all encoded data is made + * available at the output block buffer + * + * Once flushing starts, the workflow action needs to stay the same until flushing is signaled + * complete by LZMA (aka LZMA_STREAM_END is reached). + * See also: https://github.com/tukaani-project/xz/blob/master/src/liblzma/api/lzma/base.h#L274 + * + * Assumes input stream and output block buffer are both in valid states. + * @param flush_action + * @throw `OperationFailed` if the provided action is not an LZMA flush + * action, or if LZMA returns an unexpected error value + */ + auto flush_lzma(lzma_action flush_action) -> void; + + /** + * Flushes the current compressed data in the output block buffer to the output file handler. + * + * Also resets the output block buffer to receive new data. 
+ */ + auto flush_stream_output_block_buffer() -> void; + + // Variables + FileWriter* m_compressed_stream_file_writer{nullptr}; + + // Compressed stream variables + Array<uint8_t> m_compressed_stream_block_buffer{cCompressedStreamBlockBufferSize}; + LzmaStream m_lzma_stream; + size_t m_uncompressed_stream_pos{0}; +}; +} // namespace clp::streaming_compression::lzma + +#endif // CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP diff --git a/components/core/src/clp/streaming_compression/lzma/Constants.hpp b/components/core/src/clp/streaming_compression/lzma/Constants.hpp new file mode 100644 index 000000000..4e261187a --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Constants.hpp @@ -0,0 +1,15 @@ +#ifndef STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP +#define STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP + +#include <cstdint> + +#include <lzma.h> + +namespace clp::streaming_compression::lzma { +constexpr int cDefaultCompressionLevel{3}; +constexpr int cMinCompressionLevel{0}; +constexpr int cMaxCompressionLevel{9}; +constexpr uint32_t cDefaultDictionarySize{LZMA_DICT_SIZE_DEFAULT}; +} // namespace clp::streaming_compression::lzma + +#endif // STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 0fbae9e3a..9f0df9306 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -4,6 +4,8 @@ #include <memory> #include <numeric> #include <string> +#include <string_view> +#include <utility> #include <boost/filesystem/operations.hpp> #include <Catch2/single_include/catch2/catch.hpp> @@ -15,6 +17,7 @@ #include "../src/clp/ReadOnlyMemoryMappedFile.hpp" #include "../src/clp/streaming_compression/Compressor.hpp" #include "../src/clp/streaming_compression/Decompressor.hpp" +#include "../src/clp/streaming_compression/lzma/Compressor.hpp" #include 
"../src/clp/streaming_compression/passthrough/Compressor.hpp" #include "../src/clp/streaming_compression/passthrough/Decompressor.hpp" #include "../src/clp/streaming_compression/zstd/Compressor.hpp" @@ -25,56 +28,48 @@ using clp::ErrorCode_Success; using clp::FileWriter; using clp::streaming_compression::Compressor; using clp::streaming_compression::Decompressor; +using std::string; +using std::string_view; -TEST_CASE("StreamingCompression", "[StreamingCompression]") { - // Initialize constants - constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB - constexpr auto cCompressionChunkSizes = std::to_array<size_t>( - {cBufferSize / 100, - cBufferSize / 50, - cBufferSize / 25, - cBufferSize / 10, - cBufferSize / 5, - cBufferSize / 2, - cBufferSize} - ); - constexpr size_t cAlphabetLength{26}; - std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; - - // Initialize compression devices - std::unique_ptr<Compressor> compressor; - std::unique_ptr<Decompressor> decompressor; - - SECTION("ZStd single phase compression") { - compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>(); - decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>(); - } - - SECTION("Passthrough compression") { - compressor = std::make_unique<clp::streaming_compression::passthrough::Compressor>(); - decompressor = std::make_unique<clp::streaming_compression::passthrough::Decompressor>(); - } +namespace { +constexpr string_view cCompressedFilePath{"test_streaming_compressed_file.bin"}; +constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB +constexpr auto cCompressionChunkSizes = std::to_array<size_t>( + {0, + cBufferSize / 100, + cBufferSize / 50, + cBufferSize / 25, + cBufferSize / 10, + cBufferSize / 5, + cBufferSize / 2, + cBufferSize} +); - // Initialize buffers - Array<char> uncompressed_buffer{cBufferSize}; - for (size_t i{0}; i < cBufferSize; ++i) { - uncompressed_buffer.at(i) = static_cast<char>(('a' + (i % 
cAlphabetLength))); - } +auto compress(std::unique_ptr<Compressor> compressor, char const* src) -> void; - Array<char> decompressed_buffer{cBufferSize}; +auto decompress_and_compare( + std::unique_ptr<Decompressor> decompressor, + Array<char> const& uncompressed_buffer, + Array<char>& decompressed_buffer +) -> void; - // Compress +auto compress(std::unique_ptr<Compressor> compressor, char const* src) -> void { FileWriter file_writer; - file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); + file_writer.open(string(cCompressedFilePath), FileWriter::OpenMode::CREATE_FOR_WRITING); compressor->open(file_writer); for (auto const chunk_size : cCompressionChunkSizes) { - compressor->write(uncompressed_buffer.data(), chunk_size); + compressor->write(src, chunk_size); } compressor->close(); file_writer.close(); +} - // Decompress and compare - clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{compressed_file_path}; +auto decompress_and_compare( + std::unique_ptr<Decompressor> decompressor, + Array<char> const& uncompressed_buffer, + Array<char>& decompressed_buffer +) -> void { + clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{string(cCompressedFilePath)}; auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; decompressor->open(compressed_file_view.data(), compressed_file_view.size()); @@ -98,7 +93,6 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { num_uncompressed_bytes += chunk_size; } - // Sanity check REQUIRE( (std::accumulate( cCompressionChunkSizes.cbegin(), @@ -107,7 +101,39 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { ) == num_uncompressed_bytes) ); +} +} // namespace + +TEST_CASE("StreamingCompression", "[StreamingCompression]") { + constexpr size_t cAlphabetLength{26}; + + std::unique_ptr<Compressor> compressor; + std::unique_ptr<Decompressor> decompressor; + + Array<char> decompressed_buffer{cBufferSize}; + Array<char> 
uncompressed_buffer{cBufferSize}; + for (size_t i{0}; i < cBufferSize; ++i) { + uncompressed_buffer.at(i) = static_cast<char>(('a' + (i % cAlphabetLength))); + } + + SECTION("ZStd single phase compression") { + compressor = std::make_unique<clp::streaming_compression::zstd::Compressor>(); + compress(std::move(compressor), uncompressed_buffer.data()); + decompressor = std::make_unique<clp::streaming_compression::zstd::Decompressor>(); + decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer); + } + + SECTION("Passthrough compression") { + compressor = std::make_unique<clp::streaming_compression::passthrough::Compressor>(); + compress(std::move(compressor), uncompressed_buffer.data()); + decompressor = std::make_unique<clp::streaming_compression::passthrough::Decompressor>(); + decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer); + } + + SECTION("LZMA compression") { + compressor = std::make_unique<clp::streaming_compression::lzma::Compressor>(); + compress(std::move(compressor), uncompressed_buffer.data()); + } - // Cleanup - boost::filesystem::remove(compressed_file_path); + boost::filesystem::remove(string(cCompressedFilePath)); } diff --git a/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh b/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh index 66ea4ac4f..c51a521c1 100755 --- a/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh +++ b/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh @@ -18,4 +18,5 @@ dnf install -y \ libzstd-devel \ make \ mariadb-connector-c-devel \ - openssl-devel + openssl-devel \ + xz-devel diff --git a/components/core/tools/scripts/lib_install/liblzma.sh b/components/core/tools/scripts/lib_install/liblzma.sh new file mode 100755 index 000000000..a73ff79b9 --- /dev/null +++ 
b/components/core/tools/scripts/lib_install/liblzma.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# Exit on any error +set -e + +# Error on undefined variable +set -u + +# Dependencies: +# - curl +# - make +# - gcc +# NOTE: Dependencies should be installed outside the script to allow the script to be largely distro-agnostic + +for cmd in curl make gcc; do + if ! $cmd --version >/dev/null 2>&1; then + echo "Error: Required dependency '$cmd' not found" + exit 1 + fi +done + +cUsage="Usage: ${BASH_SOURCE[0]} <version>[ <.deb output directory>]" +if [ "$#" -lt 1 ] ; then + echo $cUsage + exit +fi +version=$1 + +package_name=liblzma +temp_dir=/tmp/${package_name}-installation +deb_output_dir=${temp_dir} +if [[ "$#" -gt 1 ]] ; then + deb_output_dir="$(readlink -f "$2")" + if [ ! -d ${deb_output_dir} ] ; then + echo "${deb_output_dir} does not exist or is not a directory" + exit + fi +fi + +# Note: we won't check if the package already exists + +# Get number of cpu cores +num_cpus=$(grep -c ^processor /proc/cpuinfo) + +# Download +mkdir -p $temp_dir +cd $temp_dir +extracted_dir=${temp_dir}/xz-${version} +if [ ! -e ${extracted_dir} ] ; then + tar_filename=xz-${version}.tar.gz + if [ ! 
-e ${tar_filename} ] ; then + curl -fsSL https://github.com/tukaani-project/xz/releases/download/v${version}/${tar_filename} -o ${tar_filename} + fi + tar -xf ${tar_filename} +fi + +# Build +cd ${extracted_dir} +mkdir build +cd build +cmake -DCMAKE_POSITION_INDEPENDENT_CODE=TRUE ../ +make -j${num_cpus} +make install liblzma + +# Clean up +rm -rf $temp_dir diff --git a/components/core/tools/scripts/lib_install/macos/install-all.sh b/components/core/tools/scripts/lib_install/macos/install-all.sh index 97e41903d..cb24dd054 100755 --- a/components/core/tools/scripts/lib_install/macos/install-all.sh +++ b/components/core/tools/scripts/lib_install/macos/install-all.sh @@ -21,6 +21,7 @@ brew install \ mongo-cxx-driver \ msgpack-cxx \ spdlog \ + xz \ zstd # Install pkg-config if it isn't already installed diff --git a/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh b/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh index 035c5f4da..839f6d3c3 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh @@ -14,6 +14,7 @@ lib_install_scripts_dir=$script_dir/.. 
"$lib_install_scripts_dir"/fmtlib.sh 8.0.1 "$lib_install_scripts_dir"/libarchive.sh 3.5.1 +"$lib_install_scripts_dir"/liblzma.sh 5.4.6 "$lib_install_scripts_dir"/lz4.sh 1.8.2 "$lib_install_scripts_dir"/mongocxx.sh 3.10.2 "$lib_install_scripts_dir"/msgpack.sh 7.0.0 diff --git a/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh b/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh index 8997ffe01..3ea3b3ed5 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh @@ -20,6 +20,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \ jq \ libcurl4 \ libcurl4-openssl-dev \ + liblzma-dev \ libmariadb-dev \ libssl-dev \ make \ diff --git a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh index 035c5f4da..839f6d3c3 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh @@ -14,6 +14,7 @@ lib_install_scripts_dir=$script_dir/.. 
"$lib_install_scripts_dir"/fmtlib.sh 8.0.1 "$lib_install_scripts_dir"/libarchive.sh 3.5.1 +"$lib_install_scripts_dir"/liblzma.sh 5.4.6 "$lib_install_scripts_dir"/lz4.sh 1.8.2 "$lib_install_scripts_dir"/mongocxx.sh 3.10.2 "$lib_install_scripts_dir"/msgpack.sh 7.0.0 diff --git a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh index 4a71a122c..ea055ffdf 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh @@ -17,6 +17,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \ jq \ libcurl4 \ libcurl4-openssl-dev \ + liblzma-dev \ libmariadb-dev \ libssl-dev \ openjdk-11-jdk \