From 6269b292e1cc2604156a4e73783fcd8afb7cbc55 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Fri, 22 Nov 2024 19:03:48 -0500 Subject: [PATCH 01/10] Add deserializer implementation --- CMakeLists.txt | 2 + clp_ffi_py/ir/__init__.py | 1 + clp_ffi_py/ir/native.pyi | 9 + src/clp_ffi_py/PyObjectCast.hpp | 2 + src/clp_ffi_py/ir/native/PyDeserializer.cpp | 281 ++++++++++++++++++ src/clp_ffi_py/ir/native/PyDeserializer.hpp | 255 ++++++++++++++++ .../ir/native/PyKeyValuePairLogEvent.cpp | 15 + .../ir/native/PyKeyValuePairLogEvent.hpp | 10 + src/clp_ffi_py/modules/ir_native.cpp | 6 + tests/test_ir/test_serder.py | 175 +++++++++++ 10 files changed, 756 insertions(+) create mode 100644 src/clp_ffi_py/ir/native/PyDeserializer.cpp create mode 100644 src/clp_ffi_py/ir/native/PyDeserializer.hpp create mode 100644 tests/test_ir/test_serder.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 48c6ebd3..426b3e65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -99,6 +99,8 @@ set(CLP_FFI_PY_LIB_IR_SOURCES ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/LogEvent.hpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/Metadata.cpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/Metadata.hpp + ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyDeserializer.cpp + ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyDeserializer.hpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyDeserializerBuffer.cpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyDeserializerBuffer.hpp ${CLP_FFI_PY_LIB_SRC_DIR}/ir/native/PyFourByteDeserializer.cpp diff --git a/clp_ffi_py/ir/__init__.py b/clp_ffi_py/ir/__init__.py index b89ffda4..567574d9 100644 --- a/clp_ffi_py/ir/__init__.py +++ b/clp_ffi_py/ir/__init__.py @@ -8,6 +8,7 @@ __all__: List[str] = [ "Decoder", # native_deprecated "DecoderBuffer", # native_deprecated + "Deserializer", # Native "DeserializerBuffer", # native "FourByteDeserializer", # native "FourByteEncoder", # native_deprecated diff --git a/clp_ffi_py/ir/native.pyi b/clp_ffi_py/ir/native.pyi index 7ba19f14..d2e6471b 100644 --- a/clp_ffi_py/ir/native.pyi +++ b/clp_ffi_py/ir/native.pyi @@ -103,4 +103,13 @@ class Serializer: def flush(self) -> None: ... def close(self) -> None: ... +class Deserializer: + def __init__( + self, + input_stream: IO[bytes], + buffer_capacity: int = 65536, + allow_incomplete_stream: bool = False, + ): ... + def deserialize_log_event(self) -> Optional[KeyValuePairLogEvent]: ... + class IncompleteStreamError(Exception): ... diff --git a/src/clp_ffi_py/PyObjectCast.hpp b/src/clp_ffi_py/PyObjectCast.hpp index 196c47f8..a7338fea 100644 --- a/src/clp_ffi_py/PyObjectCast.hpp +++ b/src/clp_ffi_py/PyObjectCast.hpp @@ -112,6 +112,7 @@ auto py_reinterpret_cast(Src* src) noexcept -> Dst* { } namespace ir::native { +class PyDeserializer; class PyDeserializerBuffer; class PyFourByteDeserializer; class PyKeyValuePairLogEvent; @@ -121,6 +122,7 @@ class PyQuery; class PySerializer; } // namespace ir::native +CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDeserializer); CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyDeserializerBuffer); CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyFourByteDeserializer); CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PyKeyValuePairLogEvent); diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp new file mode 100644 index 00000000..facf0d9c --- /dev/null +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -0,0 +1,281 @@ +#include // Must always be included before any other header files + +#include "PyDeserializer.hpp" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace clp_ffi_py::ir::native { +using clp::ffi::ir_stream::IRErrorCode; +using clp::ffi::ir_stream::IrUnitType; + +namespace { +/** + * Callback of `PyDeserializer`'s `__init__` method: + */ +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyDoc_STRVAR( + cPyDeserializerDoc, + "Deserializer for deserializing CLP key-value pair IR streams.\n" + "This class deserializes log events from a CLP key-value pair IR stream.\n\n" + "__init__(self, input_stream, buffer_capacity=65536, allow_incomplete_stream=False)\n\n" + "Initializes a :class:`Deserializer` instance with the given inputs. Note that each" + " object should only be initialized once. Double initialization will result in a memory" + " leak.\n\n" + ":param input_stream: Input stream that contains serialized CLP IR.\n" + ":type input_stream: IO[bytes]\n" + ":param buffer_capacity: The capacity used to initialize the underlying read buffer.\n" + ":type buffer_capacity: int\n" + ":param allow_incomplete_stream: If set to `True`, an incomplete CLP IR stream is not" + " treated as an error.\n" + ":type allow_incomplete_stream: bool\n" +); +CLP_FFI_PY_METHOD auto +PyDeserializer_init(PyDeserializer* self, PyObject* args, PyObject* keywords) -> int; + +/** + * Callback of `PyDeserializer`'s `deserialize_log_event`. + */ +PyDoc_STRVAR( + cPyDeserializerDeserializeLogEventDoc, + "deserialize_log_event(self)\n" + "--\n\n" + "Deserializes the IR stream until the next log event has been deserialized.\n\n" + ":return:\n" + " - The next deserialized log event from the IR stream.\n" + " - None if there are no more log events in the stream.\n" + ":rtype: :class:`KeyValuePairLogEvent` | None\n" + ":raises: Appropriate exceptions with detailed information on any encountered failure.\n" +); +CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self) -> PyObject*; + +/** + * Callback of `PyDeserializer`'s deallocator. + */ +CLP_FFI_PY_METHOD auto PyDeserializer_dealloc(PyDeserializer* self) -> void; + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays) +PyMethodDef PyDeserializer_method_table[]{ + {"deserialize_log_event", + py_c_function_cast(PyDeserializer_deserialize_log_event), + METH_NOARGS, + static_cast(cPyDeserializerDeserializeLogEventDoc)}, + + {nullptr} +}; + +// NOLINTBEGIN(cppcoreguidelines-avoid-c-arrays, cppcoreguidelines-pro-type-*-cast) +PyType_Slot PyDeserializer_slots[]{ + {Py_tp_alloc, reinterpret_cast(PyType_GenericAlloc)}, + {Py_tp_dealloc, reinterpret_cast(PyDeserializer_dealloc)}, + {Py_tp_new, reinterpret_cast(PyType_GenericNew)}, + {Py_tp_init, reinterpret_cast(PyDeserializer_init)}, + {Py_tp_methods, static_cast(PyDeserializer_method_table)}, + {Py_tp_doc, const_cast(static_cast(cPyDeserializerDoc))}, + {0, nullptr} +}; +// NOLINTEND(cppcoreguidelines-avoid-c-arrays, cppcoreguidelines-pro-type-*-cast) + +/** + * `PyDeserializer`'s Python type specifications. + */ +PyType_Spec PyDeserializer_type_spec{ + "clp_ffi_py.ir.native.Deserializer", + sizeof(PyDeserializer), + 0, + Py_TPFLAGS_DEFAULT, + static_cast(PyDeserializer_slots) +}; + +CLP_FFI_PY_METHOD auto +PyDeserializer_init(PyDeserializer* self, PyObject* args, PyObject* keywords) -> int { + static char keyword_input_stream[]{"input_stream"}; + static char keyword_buffer_capacity[]{"buffer_capacity"}; + static char keyword_allow_incomplete_stream[]{"allow_incomplete_stream"}; + static char* keyword_table[]{ + static_cast(keyword_input_stream), + static_cast(keyword_buffer_capacity), + static_cast(keyword_allow_incomplete_stream), + nullptr + }; + + // If the argument parsing fails, `self` will be deallocated. We must reset all pointers to + // nullptr in advance, otherwise the deallocator might trigger segmentation fault. + self->default_init(); + + PyObject* input_stream{}; + Py_ssize_t buffer_capacity{PyDeserializer::cDefaultBufferCapacity}; + int allow_incomplete_stream{0}; + if (false + == static_cast(PyArg_ParseTupleAndKeywords( + args, + keywords, + "O|Lp", + static_cast(keyword_table), + &input_stream, + &buffer_capacity, + &allow_incomplete_stream + ))) + { + return -1; + } + + if (false + == self->init(input_stream, buffer_capacity, static_cast(allow_incomplete_stream))) + { + return -1; + } + + return 0; +} + +CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self) -> PyObject* { + return self->deserialize_log_event(); +} + +CLP_FFI_PY_METHOD auto PyDeserializer_dealloc(PyDeserializer* self) -> void { + self->clean(); + Py_TYPE(self)->tp_free(py_reinterpret_cast(self)); +} +} // namespace + +auto PyDeserializer::init( + PyObject* input_stream, + Py_ssize_t buffer_capacity, + bool allow_incomplete_stream +) -> bool { + m_allow_incomplete_stream = allow_incomplete_stream; + m_deserializer_buffer_reader = DeserializerBufferReader::create(input_stream, buffer_capacity); + if (nullptr == m_deserializer_buffer_reader) { + return false; + } + + try { + PyDeserializer::IrUnitHandler::LogEventHandle log_event_handle + = [this](clp::ffi::KeyValuePairLogEvent&& kv_log_event) -> IRErrorCode { + return this->handle_log_event(std::move(kv_log_event)); + }; + + PyDeserializer::IrUnitHandler::UtcOffsetChangeHandle trivial_utc_offset_handle + = []([[maybe_unused]] clp::UtcOffset, [[maybe_unused]] clp::UtcOffset + ) -> IRErrorCode { return IRErrorCode::IRErrorCode_Success; }; + + PyDeserializer::IrUnitHandler::SchemaTreeNodeInsertionHandle + trivial_schema_tree_node_insertion_handle + = []([[maybe_unused]] clp::ffi::SchemaTree::NodeLocator) -> IRErrorCode { + return IRErrorCode::IRErrorCode_Success; + }; + + PyDeserializer::IrUnitHandler::EndOfStreamHandle end_of_stream_handle + = [this]() -> IRErrorCode { return this->handle_end_of_stream(); }; + + auto deserializer_result{Deserializer::create( + *m_deserializer_buffer_reader, + {std::move(log_event_handle), + std::move(trivial_utc_offset_handle), + std::move(trivial_schema_tree_node_insertion_handle), + std::move(end_of_stream_handle)} + )}; + if (deserializer_result.has_error()) { + PyErr_Format( + PyExc_RuntimeError, + cDeserializerCreateErrorFormatStr.data(), + deserializer_result.error().message().c_str() + ); + return false; + } + m_deserializer = new clp::ffi::ir_stream::Deserializer{ + std::move(deserializer_result.value()) + }; + } catch (clp::TraceableException& exception) { + handle_traceable_exception(exception); + return false; + } + + return true; +} + +auto PyDeserializer::deserialize_log_event() -> PyObject* { + try { + while (false == is_stream_complete()) { + auto const ir_unit_type_result{ + m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) + }; + if (ir_unit_type_result.has_error()) { + if (false == handle_incomplete_ir_error(ir_unit_type_result.error())) { + return nullptr; + } + break; + } + auto const ir_unit_type{ir_unit_type_result.value()}; + if (IrUnitType::LogEvent == ir_unit_type && has_unreleased_deserialized_log_event()) { + return py_reinterpret_cast( + PyKeyValuePairLogEvent::create(release_deserialized_log_event()) + ); + } + } + } catch (clp::TraceableException& exception) { + handle_traceable_exception(exception); + return nullptr; + } + + Py_RETURN_NONE; +} + +auto PyDeserializer::module_level_init(PyObject* py_module) -> bool { + static_assert(std::is_trivially_destructible()); + auto* type{py_reinterpret_cast(PyType_FromSpec(&PyDeserializer_type_spec))}; + m_py_type.reset(type); + if (nullptr == type) { + return false; + } + return add_python_type(get_py_type(), "Deserializer", py_module); +} + +auto PyDeserializer::handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event) -> IRErrorCode { + if (has_unreleased_deserialized_log_event()) { + release_deserialized_log_event(); + } + m_deserialized_log_event = new clp::ffi::KeyValuePairLogEvent{std::move(log_event)}; + return IRErrorCode::IRErrorCode_Success; +} + +auto PyDeserializer::handle_incomplete_ir_error(std::error_code err) -> bool { + if (std::errc::result_out_of_range == err || std::errc::no_message_available == err) { + if (m_allow_incomplete_stream) { + handle_end_of_stream(); + return true; + } + PyErr_SetString( + PyDeserializerBuffer::get_py_incomplete_stream_error(), + cDeserializerIncompleteIRError + ); + return false; + } + PyErr_Format( + PyExc_RuntimeError, + cDeserializerDeserializeNextIrUnitErrorFormatStr.data(), + err.message().c_str() + ); + return false; +} +} // namespace clp_ffi_py::ir::native diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.hpp b/src/clp_ffi_py/ir/native/PyDeserializer.hpp new file mode 100644 index 00000000..04961590 --- /dev/null +++ b/src/clp_ffi_py/ir/native/PyDeserializer.hpp @@ -0,0 +1,255 @@ +#ifndef CLP_FFI_PY_IR_NATIVE_PYDESERIALIZER_HPP +#define CLP_FFI_PY_IR_NATIVE_PYDESERIALIZER_HPP + +#include // Must always be included before any other header files + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace clp_ffi_py::ir::native { +/** + * A PyObject structure for deserializing CLP key-value pair IR stream. The underlying deserializer + * is pointed by `m_deserializer`, which reads the IR stream from a Python `IO[byte]` object via + * `DeserializerBufferReader`. + */ +class PyDeserializer { +public: + /** + * The default buffer capacity for the underlying deserializer buffer reader. Any change to the + * value should also be applied to `__init__`'s doc string and Python stub file. + */ + static constexpr Py_ssize_t cDefaultBufferCapacity{65'536}; + + // Delete default constructor to disable direct instantiation. + PyDeserializer() = delete; + + // Delete copy/move constructors and assignments + PyDeserializer(PyDeserializer const&) = delete; + PyDeserializer(PyDeserializer&&) = delete; + auto operator=(PyDeserializer const&) -> PyDeserializer& = delete; + auto operator=(PyDeserializer&&) -> PyDeserializer& = delete; + + // Destructor + ~PyDeserializer() = default; + + // Methods + /** + * Since the memory allocation of `PyDeserializer` is handled by CPython's allocator, cpp + * constructors will not be explicitly called. This function serves as the default constructor + * to initialize the underlying deserializer and deserializer buffer reader. Other data members + * are assumed to be zero-initialized by `default-init` method. It has to be manually called + * whenever creating a new `PyDeserializer` object through CPython APIs. + * @param input_stream The input IR stream. Must be a Python `IO[byte]` object. + * @param buffer_capacity The buffer capacity used to initialize the underlying + * `PyDeserializerBufferReader`. + * @param allow_incomplete_stream Whether to treat an incomplete CLP IR stream as an error. When + * set to `true`, an incomplete stream is interpreted as the end of the stream without raising + * an exception. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto + init(PyObject* input_stream, Py_ssize_t buffer_capacity, bool allow_incomplete_stream) -> bool; + + /** + * Zero-initializes all the data members in `PyDeserializer`. Should be called once the + * object is allocated. + */ + auto default_init() -> void { + m_end_of_stream_reached = false; + m_allow_incomplete_stream = false; + m_deserializer_buffer_reader = nullptr; + m_deserializer = nullptr; + m_deserialized_log_event = nullptr; + } + + /** + * Releases the memory allocated for the underlying data fields. + */ + auto clean() -> void { + delete m_deserializer; + delete m_deserializer_buffer_reader; + if (has_unreleased_deserialized_log_event()) { + release_deserialized_log_event(); + } + } + + /** + * Deserializes to the next key value pair log event from the IR stream. + * @return A new reference to a `KeyValuePairLogEvent` object representing the deserialized log + * event on success. + * @return A new reference to `Py_None` when the end of IR stream is reached. + * @return nullptr on failure with the relevant Python exception and error set. + */ + [[nodiscard]] auto deserialize_log_event() -> PyObject*; + + /** + * Gets the `PyTypeObject` that represents `PyDeserializer`'s Python type. This type is + * dynamically created and initialized during the execution of + * `PyDeserializer::module_level_init`. + * @return Python type object associated with `PyDeserializer`. + */ + [[nodiscard]] static auto get_py_type() -> PyTypeObject* { return m_py_type.get(); } + + /** + * Creates and initializes `PyDeserializer` as a Python type, and then incorporates this + * type as a Python object into the py_module module. + * @param py_module This is the Python module where the initialized `PyDeserializer` will be + * incorporated. + * @return true on success. + * @return false on failure with the relevant Python exception and error set. + */ + [[nodiscard]] static auto module_level_init(PyObject* py_module) -> bool; + +private: + /** + * Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` for deserializing + * key-value pair log events using user-defined handlers. + */ + class IrUnitHandler { + public: + // Types + using LogEventHandle + = std::function; + using UtcOffsetChangeHandle + = std::function; + using SchemaTreeNodeInsertionHandle + = std::function; + using EndOfStreamHandle = std::function; + + // Constructor + IrUnitHandler( + LogEventHandle log_event_handle, + UtcOffsetChangeHandle utc_offset_handle, + SchemaTreeNodeInsertionHandle schema_tree_insertion_handle, + EndOfStreamHandle end_of_stream_handle + ) + : m_log_event_handle{std::move(log_event_handle)}, + m_utc_offset_change_handle{std::move(utc_offset_handle)}, + m_schema_tree_node_insertion_handle{std::move(schema_tree_insertion_handle)}, + m_end_of_stream_handle{std::move(end_of_stream_handle)} {} + + // Delete copy constructor and assignment + IrUnitHandler(IrUnitHandler const&) = delete; + auto operator=(IrUnitHandler const&) -> IrUnitHandler& = delete; + + // Default move constructor and assignment + IrUnitHandler(IrUnitHandler&&) = default; + auto operator=(IrUnitHandler&&) -> IrUnitHandler& = default; + + // Destructor + ~IrUnitHandler() = default; + + // Implements `clp::ffi::ir_stream::IrUnitHandlerInterface` interface + [[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event + ) -> clp::ffi::ir_stream::IRErrorCode { + return m_log_event_handle(std::move(log_event)); + } + + [[nodiscard]] auto handle_utc_offset_change( + clp::UtcOffset utc_offset_old, + clp::UtcOffset utc_offset_new + ) -> clp::ffi::ir_stream::IRErrorCode { + return m_utc_offset_change_handle(utc_offset_old, utc_offset_new); + } + + [[nodiscard]] auto handle_schema_tree_node_insertion( + clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> clp::ffi::ir_stream::IRErrorCode { + return m_schema_tree_node_insertion_handle(schema_tree_node_locator); + } + + [[nodiscard]] auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { + return m_end_of_stream_handle(); + } + + private: + // Variables + LogEventHandle m_log_event_handle; + UtcOffsetChangeHandle m_utc_offset_change_handle; + SchemaTreeNodeInsertionHandle m_schema_tree_node_insertion_handle; + EndOfStreamHandle m_end_of_stream_handle; + }; + + using Deserializer = clp::ffi::ir_stream::Deserializer; + + // Methods + /** + * Implements `IrUnitHandler::EndOfStreamHandle`. + * This handle function sets the underlying `m_end_of_stream_reached` to true. + * @return IRErrorCode::IRErrorCode_Success on success. + */ + [[maybe_unused]] auto handle_end_of_stream() -> clp::ffi::ir_stream::IRErrorCode { + m_end_of_stream_reached = true; + return clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; + } + + /** + * Implements `IrUnitHandler::LogEventHandle`. + * This handle function sets the underlying `m_deserialized_log_event` with the given input. + * @param kv_log_event + * @return IRErrorCode::IRErrorCode_Success on success. + * + */ + [[nodiscard]] auto handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event + ) -> clp::ffi::ir_stream::IRErrorCode; + + /** + * @return Whether `m_deserialized_log_event` has been set. + */ + [[nodiscard]] auto has_unreleased_deserialized_log_event() const -> bool { + return nullptr != m_deserialized_log_event; + } + + /** + * Releases the ownership of the underlying deserialized log event as a rvalue reference. + * NOTE: this method doesn't check whether the ownership is empty (nullptr). The caller must + * ensure the ownership is legal. + * @return The released ownership of the deserialized log event. + */ + [[maybe_unused]] auto release_deserialized_log_event() -> clp::ffi::KeyValuePairLogEvent { + auto released{std::move(*m_deserialized_log_event)}; + delete m_deserialized_log_event; + m_deserialized_log_event = nullptr; + return released; + } + + /** + * Handles the incomplete IR error returned from `Deserializer::deserialize_next_ir_unit`. + * @param err + * @return true if the error is resolved on success. + * @return false with the relevant Python exception and error set on failures: + * - The error doesn't indicate an incomplete IR stream. + * - Incomplete IR stream is not allowed. + */ + [[nodiscard]] auto handle_incomplete_ir_error(std::error_code err) -> bool; + + [[nodiscard]] auto is_stream_complete() const -> bool { return m_end_of_stream_reached; } + + // Variables + PyObject_HEAD; + bool m_end_of_stream_reached; + bool m_allow_incomplete_stream; + // NOLINTBEGIN(cppcoreguidelines-owning-memory) + gsl::owner m_deserializer_buffer_reader; + gsl::owner m_deserializer; + gsl::owner m_deserialized_log_event; + // NOLINTEND(cppcoreguidelines-owning-memory) + + static inline PyObjectStaticPtr m_py_type{nullptr}; +}; +} // namespace clp_ffi_py::ir::native + +#endif // CLP_FFI_PY_IR_NATIVE_PYDESERIALIZER_HPP diff --git a/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.cpp b/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.cpp index 437429bb..e4dbd257 100644 --- a/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.cpp +++ b/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.cpp @@ -325,6 +325,21 @@ auto PyKeyValuePairLogEvent::init(clp::ffi::KeyValuePairLogEvent kv_pair_log_eve return true; } +auto PyKeyValuePairLogEvent::create(clp::ffi::KeyValuePairLogEvent kv_log_event +) -> PyKeyValuePairLogEvent* { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-cstyle-cast) + PyKeyValuePairLogEvent* self{PyObject_New(PyKeyValuePairLogEvent, get_py_type())}; + if (nullptr == self) { + PyErr_SetString(PyExc_MemoryError, clp_ffi_py::cOutofMemoryError); + return nullptr; + } + self->default_init(); + if (false == self->init(std::move(kv_log_event))) { + return nullptr; + } + return self; +} + auto PyKeyValuePairLogEvent::get_py_type() -> PyTypeObject* { return m_py_type.get(); } diff --git a/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp b/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp index 1003059c..37005b60 100644 --- a/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp +++ b/src/clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp @@ -57,6 +57,16 @@ class PyKeyValuePairLogEvent { return static_cast(m_kv_pair_log_event); } + /** + * CPython-level factory function. + * @param kv_log_event + * @return a new reference of a `PyKeyValuePairLogEvent` object that is initialized with the + * given kv log event. + * @return nullptr on failure with the relevant Python exception and error set. + */ + [[nodiscard]] static auto create(clp::ffi::KeyValuePairLogEvent kv_log_event + ) -> PyKeyValuePairLogEvent*; + /** * Gets the `PyTypeObject` that represents `PyKeyValuePair`'s Python type. This type is * dynamically created and initialized during the execution of `module_level_init`. diff --git a/src/clp_ffi_py/modules/ir_native.cpp b/src/clp_ffi_py/modules/ir_native.cpp index c2b802b4..11be2ab8 100644 --- a/src/clp_ffi_py/modules/ir_native.cpp +++ b/src/clp_ffi_py/modules/ir_native.cpp @@ -1,5 +1,6 @@ #include // Must always be included before any other header files +#include #include #include #include @@ -81,5 +82,10 @@ PyMODINIT_FUNC PyInit_native() { return nullptr; } + if (false == clp_ffi_py::ir::native::PyDeserializer::module_level_init(new_module)) { + Py_DECREF(new_module); + return nullptr; + } + return new_module; } diff --git a/tests/test_ir/test_serder.py b/tests/test_ir/test_serder.py new file mode 100644 index 00000000..0162991c --- /dev/null +++ b/tests/test_ir/test_serder.py @@ -0,0 +1,175 @@ +from pathlib import Path +from typing import Any, Dict, IO, List, Optional + +from smart_open import open # type: ignore +from test_ir.test_utils import JsonLinesFileReader, TestCLPBase + +from clp_ffi_py.ir import Deserializer, IncompleteStreamError, KeyValuePairLogEvent, Serializer +from clp_ffi_py.utils import serialize_dict_to_msgpack + +LOG_DIR: Path = Path("unittest-logs") + + +class TestCaseSerDerBase(TestCLPBase): + """ + Class for testing serialization and deserialization of CLP key-value pair IR stream. + """ + + test_data_dir: Path = Path(__file__).resolve().parent / "test_data/jsonl" + + enable_compression: bool + generate_incomplete_ir: bool + + ir_stream_path_prefix: str + ir_stream_path_postfix: str + + # override + @classmethod + def setUpClass(cls) -> None: + if not LOG_DIR.exists(): + LOG_DIR.mkdir(parents=True, exist_ok=True) + assert LOG_DIR.is_dir() + + # override + def setUp(self) -> None: + self.ir_stream_path_prefix: str = f"{self.id()}" + file_extension_name: str = "clp.zst" if self.enable_compression else "clp" + self.ir_stream_path_postfix: str = ( + f"incomplete.{file_extension_name}" + if self.generate_incomplete_ir + else file_extension_name + ) + + def _serialize(self, inputs: List[Dict[Any, Any]], ir_stream_path: Path) -> None: + """ + Serializes the given JSON Lines file into CLP key-value pair IR stream. + + :param inputs: A list of dictionary objects to serialize. + :param ir_stream_path: Path to the output file that the serializer writes to. + """ + file_stream: IO[bytes] = open(ir_stream_path, "wb") + serializer: Serializer = Serializer(file_stream) + for input_dict in inputs: + serializer.serialize_log_event_from_msgpack_map(serialize_dict_to_msgpack(input_dict)) + serializer.flush() + if self.generate_incomplete_ir: + file_stream.close() + return + serializer.close() + + def _deserialize( + self, + ir_stream_path: Path, + buffer_capacity: int, + allow_incomplete_ir_stream: bool, + expected_outputs: List[Dict[Any, Any]], + ) -> None: + """ + Deserializes the input CLP key-value pair IR stream and compare the deserialized log events + with the given expected outputs. + + :param ir_stream_path: Path to the input file that the deserializers reads from. + :param buffer_capacity: Buffer capacity used to create the deserializer. + :param allow_incomplete_ir_stream: Whether to allow incomplete IR streams. + :param expected_outputs: A list of dictionary objects as the expected outputs. + """ + input_stream: IO[bytes] = open(ir_stream_path, "rb") + deserializer: Deserializer = Deserializer( + input_stream, + allow_incomplete_stream=allow_incomplete_ir_stream, + buffer_capacity=buffer_capacity, + ) + for expected in expected_outputs: + actual: Optional[KeyValuePairLogEvent] = deserializer.deserialize_log_event() + self.assertNotEqual(actual, None) + assert actual is not None # To silent mypy + self.assertEqual(actual.to_dict(), expected) + + if not self.generate_incomplete_ir or allow_incomplete_ir_stream: + self.assertEqual(None, deserializer.deserialize_log_event()) + return + + with self.assertRaises(IncompleteStreamError): + deserializer.deserialize_log_event() + + def _get_ir_stream_path( + self, + jsonl_path: Path, + ) -> Path: + ir_stream_path: Path = LOG_DIR / Path( + f"{self.ir_stream_path_prefix}.{str(jsonl_path.stem)}.{self.ir_stream_path_postfix}" + ) + if ir_stream_path.exists(): + ir_stream_path.unlink() + return ir_stream_path + + def test_serder(self) -> None: + """ + Tests serializing and deserializing CLP key-value pair IR streams. + """ + jsonl_file_paths: List[Path] = [] + + for file_path in TestCaseSerDerBase.test_data_dir.rglob("*"): + if not file_path.is_file(): + continue + jsonl_file_paths.append(file_path) + self.assertNotEqual(len(jsonl_file_paths), 0, "No test files") + + for jsonl_file_path in jsonl_file_paths: + expected_outputs: List[Dict[Any, Any]] = [] + for json_obj in JsonLinesFileReader(jsonl_file_path).read_lines(): + expected_outputs.append(json_obj) + ir_stream_path: Path = self._get_ir_stream_path(jsonl_file_path) + self._serialize(expected_outputs, ir_stream_path) + for buffer_capacity in [1, 16, 256, 4096, 65536]: + self._deserialize(ir_stream_path, buffer_capacity, False, expected_outputs) + if self.generate_incomplete_ir: + self._deserialize(ir_stream_path, 65536, True, expected_outputs) + + +class TestCaseSerDerRaw(TestCaseSerDerBase): + """ + Tests serialization and deserialization against raw IR stream. + """ + + # override + def setUp(self) -> None: + self.enable_compression = False + self.generate_incomplete_ir = False + super().setUp() + + +class TestCaseSerDerIncompleteRaw(TestCaseSerDerBase): + """ + Tests serialization and deserialization against raw incomplete IR stream. + """ + + # override + def setUp(self) -> None: + self.enable_compression = False + self.generate_incomplete_ir = True + super().setUp() + + +class TestCaseSerDerZstd(TestCaseSerDerBase): + """ + Tests serialization and deserialization against zstd-compressed IR stream. + """ + + # override + def setUp(self) -> None: + self.enable_compression = True + self.generate_incomplete_ir = False + super().setUp() + + +class TestCaseSerDerIncompleteZstd(TestCaseSerDerBase): + """ + Tests serialization and deserialization against zstd-compressed incomplete IR stream. + """ + + # override + def setUp(self) -> None: + self.enable_compression = True + self.generate_incomplete_ir = True + super().setUp() From 81625b94a093081878ef2b5e91e807ab028d81a8 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Sat, 23 Nov 2024 01:29:26 -0500 Subject: [PATCH 02/10] Apply latest coding standard for PyDeserializer --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 2 +- src/clp_ffi_py/ir/native/PyDeserializer.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index facf0d9c..81442c63 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -129,7 +129,7 @@ PyDeserializer_init(PyDeserializer* self, PyObject* args, PyObject* keywords) -> == static_cast(PyArg_ParseTupleAndKeywords( args, keywords, - "O|Lp", + "O|np", static_cast(keyword_table), &input_stream, &buffer_capacity, diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.hpp b/src/clp_ffi_py/ir/native/PyDeserializer.hpp index 04961590..fdfd77ae 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.hpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.hpp @@ -34,7 +34,7 @@ class PyDeserializer { // Delete default constructor to disable direct instantiation. PyDeserializer() = delete; - // Delete copy/move constructors and assignments + // Delete copy & move constructors and assignment operators PyDeserializer(PyDeserializer const&) = delete; PyDeserializer(PyDeserializer&&) = delete; auto operator=(PyDeserializer const&) -> PyDeserializer& = delete; From e44f85d34591f65f96254a4d0bfed24d5be1d7f7 Mon Sep 17 00:00:00 2001 From: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> Date: Tue, 26 Nov 2024 17:50:56 -0500 Subject: [PATCH 03/10] Apply suggestions from code review Co-authored-by: haiqi96 <14502009+haiqi96@users.noreply.github.com> --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 6 +++--- src/clp_ffi_py/ir/native/PyDeserializer.hpp | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 81442c63..2a0c6ccf 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -36,14 +36,14 @@ namespace { PyDoc_STRVAR( cPyDeserializerDoc, "Deserializer for deserializing CLP key-value pair IR streams.\n" - "This class deserializes log events from a CLP key-value pair IR stream.\n\n" + "This class deserializes a CLP key-value pair IR stream into log events.\n\n" "__init__(self, input_stream, buffer_capacity=65536, allow_incomplete_stream=False)\n\n" "Initializes a :class:`Deserializer` instance with the given inputs. Note that each" " object should only be initialized once. Double initialization will result in a memory" " leak.\n\n" ":param input_stream: Input stream that contains serialized CLP IR.\n" ":type input_stream: IO[bytes]\n" - ":param buffer_capacity: The capacity used to initialize the underlying read buffer.\n" + ":param buffer_capacity: The capacity of the underlying read buffer.\n" ":type buffer_capacity: int\n" ":param allow_incomplete_stream: If set to `True`, an incomplete CLP IR stream is not" " treated as an error.\n" @@ -59,7 +59,7 @@ PyDoc_STRVAR( cPyDeserializerDeserializeLogEventDoc, "deserialize_log_event(self)\n" "--\n\n" - "Deserializes the IR stream until the next log event has been deserialized.\n\n" + "Deserializes the next log event from the IR stream.\n\n" ":return:\n" " - The next deserialized log event from the IR stream.\n" " - None if there are no more log events in the stream.\n" diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.hpp b/src/clp_ffi_py/ir/native/PyDeserializer.hpp index fdfd77ae..ef3f74c3 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.hpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.hpp @@ -86,7 +86,7 @@ class PyDeserializer { } /** - * Deserializes to the next key value pair log event from the IR stream. + * Deserializes the next key value pair log event from the IR stream. * @return A new reference to a `KeyValuePairLogEvent` object representing the deserialized log * event on success. * @return A new reference to `Py_None` when the end of IR stream is reached. From 0ad047734c9aa4d93065437a7df0bd618372d491 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Tue, 26 Nov 2024 18:27:15 -0500 Subject: [PATCH 04/10] Fix typo --- clp_ffi_py/ir/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clp_ffi_py/ir/__init__.py b/clp_ffi_py/ir/__init__.py index 567574d9..f0c57377 100644 --- a/clp_ffi_py/ir/__init__.py +++ b/clp_ffi_py/ir/__init__.py @@ -8,7 +8,7 @@ __all__: List[str] = [ "Decoder", # native_deprecated "DecoderBuffer", # native_deprecated - "Deserializer", # Native + "Deserializer", # native "DeserializerBuffer", # native "FourByteDeserializer", # native "FourByteEncoder", # native_deprecated From e4a8e3f447b551e1ee18e7bf1c0fd18a5c76779e Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Tue, 26 Nov 2024 18:28:15 -0500 Subject: [PATCH 05/10] Fix comment --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 2a0c6ccf..2765ea69 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -41,7 +41,7 @@ PyDoc_STRVAR( "Initializes a :class:`Deserializer` instance with the given inputs. Note that each" " object should only be initialized once. Double initialization will result in a memory" " leak.\n\n" - ":param input_stream: Input stream that contains serialized CLP IR.\n" + ":param input_stream: Serialized CLP IR stream.\n" ":type input_stream: IO[bytes]\n" ":param buffer_capacity: The capacity of the underlying read buffer.\n" ":type buffer_capacity: int\n" From 1789dfb9c448e5fe571a4c5806a27d111012e220 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Tue, 26 Nov 2024 18:29:22 -0500 Subject: [PATCH 06/10] Rename is_stream_completed --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 2 +- src/clp_ffi_py/ir/native/PyDeserializer.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 2765ea69..1d68ad99 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -216,7 +216,7 @@ auto PyDeserializer::init( auto PyDeserializer::deserialize_log_event() -> PyObject* { try { - while (false == is_stream_complete()) { + while (false == is_stream_completed()) { auto const ir_unit_type_result{ m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) }; diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.hpp b/src/clp_ffi_py/ir/native/PyDeserializer.hpp index ef3f74c3..267c4530 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.hpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.hpp @@ -236,7 +236,7 @@ class PyDeserializer { */ [[nodiscard]] auto handle_incomplete_ir_error(std::error_code err) -> bool; - [[nodiscard]] auto is_stream_complete() const -> bool { return m_end_of_stream_reached; } + [[nodiscard]] auto is_stream_completed() const -> bool { return m_end_of_stream_reached; } // Variables PyObject_HEAD; From be90f7c6c7efc8066adedc9a2d1e69adc6421db5 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Tue, 26 Nov 2024 18:48:46 -0500 Subject: [PATCH 07/10] Fix release/unreleased log event --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 23 ++++++++++++--------- src/clp_ffi_py/ir/native/PyDeserializer.hpp | 14 +++++++------ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 1d68ad99..1b663ce8 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include #include @@ -16,8 +15,6 @@ #include #include -#include -#include #include #include #include @@ -217,17 +214,17 @@ auto PyDeserializer::init( auto PyDeserializer::deserialize_log_event() -> PyObject* { try { while (false == is_stream_completed()) { - auto const ir_unit_type_result{ - m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) - }; - if (ir_unit_type_result.has_error()) { + if (auto const ir_unit_type_result{ + m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) + }; + ir_unit_type_result.has_error()) + { if (false == handle_incomplete_ir_error(ir_unit_type_result.error())) { return nullptr; } break; } - auto const ir_unit_type{ir_unit_type_result.value()}; - if (IrUnitType::LogEvent == ir_unit_type && has_unreleased_deserialized_log_event()) { + if (has_unreleased_deserialized_log_event()) { return py_reinterpret_cast( PyKeyValuePairLogEvent::create(release_deserialized_log_event()) ); @@ -253,7 +250,13 @@ auto PyDeserializer::module_level_init(PyObject* py_module) -> bool { auto PyDeserializer::handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event) -> IRErrorCode { if (has_unreleased_deserialized_log_event()) { - release_deserialized_log_event(); + // This situation may occur if the deserializer methods return an error after the last + // successful call to `handle_log_event`. If the user resolves the error and invokes the + // deserializer methods again, the underlying deserialized log event from the previous + // failed calls remains unreleased. + // To prevent a memory leak, we must free the associated memory by clearing the last + // deserialized log event. + clear_deserialized_log_event(); } m_deserialized_log_event = new clp::ffi::KeyValuePairLogEvent{std::move(log_event)}; return IRErrorCode::IRErrorCode_Success; diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.hpp b/src/clp_ffi_py/ir/native/PyDeserializer.hpp index 267c4530..9a071479 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.hpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.hpp @@ -80,9 +80,7 @@ class PyDeserializer { auto clean() -> void { delete m_deserializer; delete m_deserializer_buffer_reader; - if (has_unreleased_deserialized_log_event()) { - release_deserialized_log_event(); - } + clear_deserialized_log_event(); } /** @@ -219,10 +217,9 @@ class PyDeserializer { * ensure the ownership is legal. * @return The released ownership of the deserialized log event. */ - [[maybe_unused]] auto release_deserialized_log_event() -> clp::ffi::KeyValuePairLogEvent { + [[nodiscard]] auto release_deserialized_log_event() -> clp::ffi::KeyValuePairLogEvent { auto released{std::move(*m_deserialized_log_event)}; - delete m_deserialized_log_event; - m_deserialized_log_event = nullptr; + clear_deserialized_log_event(); return released; } @@ -238,6 +235,11 @@ class PyDeserializer { [[nodiscard]] auto is_stream_completed() const -> bool { return m_end_of_stream_reached; } + auto clear_deserialized_log_event() -> void { + delete m_deserialized_log_event; + m_deserialized_log_event = nullptr; + } + // Variables PyObject_HEAD; bool m_end_of_stream_reached; From 65742fa711b9de9a95ad8427f772671d903909f0 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Tue, 26 Nov 2024 18:57:06 -0500 Subject: [PATCH 08/10] Clarify the returned ir unit type logic --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 1b663ce8..549e91dd 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -214,21 +214,23 @@ auto PyDeserializer::init( auto PyDeserializer::deserialize_log_event() -> PyObject* { try { while (false == is_stream_completed()) { - if (auto const ir_unit_type_result{ - m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) - }; - ir_unit_type_result.has_error()) - { + auto const ir_unit_type_result{ + m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) + }; + if (ir_unit_type_result.has_error()) { if (false == handle_incomplete_ir_error(ir_unit_type_result.error())) { return nullptr; } break; } - if (has_unreleased_deserialized_log_event()) { - return py_reinterpret_cast( - PyKeyValuePairLogEvent::create(release_deserialized_log_event()) - ); + if (IrUnitType::LogEvent != ir_unit_type_result.value() + || false == has_unreleased_deserialized_log_event()) + { + continue; } + return py_reinterpret_cast( + PyKeyValuePairLogEvent::create(release_deserialized_log_event()) + ); } } catch (clp::TraceableException& exception) { handle_traceable_exception(exception); From 5a884c7592548ab828265a51bd10ca312957d6db Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Tue, 26 Nov 2024 21:06:50 -0500 Subject: [PATCH 09/10] Add comments for clarification --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 549e91dd..0feb9220 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -223,11 +223,21 @@ auto PyDeserializer::deserialize_log_event() -> PyObject* { } break; } - if (IrUnitType::LogEvent != ir_unit_type_result.value() - || false == has_unreleased_deserialized_log_event()) - { + if (IrUnitType::LogEvent != ir_unit_type_result.value()) { continue; } + if (false == has_unreleased_deserialized_log_event()) { + // TODO: after native query is implemented, this branch could indicate the + // deserialized log event IR unit doesn't match the query and thus not set as a + // buffered deserialization result. But before that, this check is added to ensure + // we check the ownership is valid before calling `release_deserialized_log_event`. + PyErr_SetString( + PyExc_RuntimeError, + "Deserializer failed to set the underlying deserialized log event properly" + " after successfully deserializing a log event IR unit." + ); + return nullptr; + } return py_reinterpret_cast( PyKeyValuePairLogEvent::create(release_deserialized_log_event()) ); From d3bfa5175a9a62763764b0e264fa6e149752f794 Mon Sep 17 00:00:00 2001 From: LinZhihao-723 Date: Wed, 27 Nov 2024 16:22:15 -0500 Subject: [PATCH 10/10] Fix naming --- src/clp_ffi_py/ir/native/PyDeserializer.cpp | 33 +++++++++++---------- src/clp_ffi_py/ir/native/PyDeserializer.hpp | 11 +++---- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.cpp b/src/clp_ffi_py/ir/native/PyDeserializer.cpp index 0feb9220..b00166a2 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.cpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.cpp @@ -218,7 +218,16 @@ auto PyDeserializer::deserialize_log_event() -> PyObject* { m_deserializer->deserialize_next_ir_unit(*m_deserializer_buffer_reader) }; if (ir_unit_type_result.has_error()) { - if (false == handle_incomplete_ir_error(ir_unit_type_result.error())) { + auto const err{ir_unit_type_result.error()}; + if (std::errc::result_out_of_range != err) { + PyErr_Format( + PyExc_RuntimeError, + cDeserializerDeserializeNextIrUnitErrorFormatStr.data(), + err.message().c_str() + ); + return nullptr; + } + if (false == handle_incomplete_stream_error()) { return nullptr; } break; @@ -274,22 +283,14 @@ auto PyDeserializer::handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event return IRErrorCode::IRErrorCode_Success; } -auto PyDeserializer::handle_incomplete_ir_error(std::error_code err) -> bool { - if (std::errc::result_out_of_range == err || std::errc::no_message_available == err) { - if (m_allow_incomplete_stream) { - handle_end_of_stream(); - return true; - } - PyErr_SetString( - PyDeserializerBuffer::get_py_incomplete_stream_error(), - cDeserializerIncompleteIRError - ); - return false; +auto PyDeserializer::handle_incomplete_stream_error() -> bool { + if (m_allow_incomplete_stream) { + handle_end_of_stream(); + return true; } - PyErr_Format( - PyExc_RuntimeError, - cDeserializerDeserializeNextIrUnitErrorFormatStr.data(), - err.message().c_str() + PyErr_SetString( + PyDeserializerBuffer::get_py_incomplete_stream_error(), + cDeserializerIncompleteIRError ); return false; } diff --git a/src/clp_ffi_py/ir/native/PyDeserializer.hpp b/src/clp_ffi_py/ir/native/PyDeserializer.hpp index 9a071479..72fb438c 100644 --- a/src/clp_ffi_py/ir/native/PyDeserializer.hpp +++ b/src/clp_ffi_py/ir/native/PyDeserializer.hpp @@ -224,14 +224,11 @@ class PyDeserializer { } /** - * Handles the incomplete IR error returned from `Deserializer::deserialize_next_ir_unit`. - * @param err - * @return true if the error is resolved on success. - * @return false with the relevant Python exception and error set on failures: - * - The error doesn't indicate an incomplete IR stream. - * - Incomplete IR stream is not allowed. + * Handles the incomplete stream error returned from `Deserializer::deserialize_next_ir_unit`. + * @return true if incomplete stream is allowed. + * @return false with the relevant Python exception and error set otherwise. */ - [[nodiscard]] auto handle_incomplete_ir_error(std::error_code err) -> bool; + [[nodiscard]] auto handle_incomplete_stream_error() -> bool; [[nodiscard]] auto is_stream_completed() const -> bool { return m_end_of_stream_reached; }