From d85c3aaea1720424e38de2910b6b2fe6ff05393b Mon Sep 17 00:00:00 2001 From: Alex Seaton Date: Fri, 16 Aug 2024 13:49:27 +0100 Subject: [PATCH] Add sample reformatted files --- cpp/arcticdb/codec/codec.cpp | 1113 ++++++++--------- cpp/arcticdb/codec/codec.hpp | 94 +- .../version_store/test_aggregation.py | 24 +- 3 files changed, 607 insertions(+), 624 deletions(-) diff --git a/cpp/arcticdb/codec/codec.cpp b/cpp/arcticdb/codec/codec.cpp index 801ddbaf71..87b8ddbe95 100644 --- a/cpp/arcticdb/codec/codec.cpp +++ b/cpp/arcticdb/codec/codec.cpp @@ -1,671 +1,658 @@ /* Copyright 2023 Man Group Operations Limited * - * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * Use of this software is governed by the Business Source License 1.1 included in the + * file licenses/BSL.txt. * - * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + * As of the Change Date specified in that file, in accordance with the Business Source + * License, use of this software will be governed by the Apache License, version 2.0. */ #include -#include -#include -#include -#include #include +#include #include #include -#include -#include #include +#include +#include +#include +#include +#include -#include #include +#include #include namespace arcticdb { constexpr TypeDescriptor metadata_type_desc() { - return TypeDescriptor{ - DataType::UINT8, Dimension::Dim1 - }; -} - -SizeResult max_compressed_size_dispatch( - const SegmentInMemory& in_mem_seg, - const arcticdb::proto::encoding::VariantCodec &codec_opts, - EncodingVersion encoding_version) { - if(encoding_version == EncodingVersion::V2) { - return max_compressed_size_v2(in_mem_seg, codec_opts); - } else { - return max_compressed_size_v1(in_mem_seg, codec_opts); - } + return TypeDescriptor{DataType::UINT8, Dimension::Dim1}; } -Segment encode_dispatch( - SegmentInMemory&& in_mem_seg, - const arcticdb::proto::encoding::VariantCodec &codec_opts, - EncodingVersion encoding_version) { - if(encoding_version == EncodingVersion::V2) { - return encode_v2(std::move(in_mem_seg), codec_opts); - } else { - return encode_v1(std::move(in_mem_seg), codec_opts); - } +SizeResult +max_compressed_size_dispatch(const SegmentInMemory& in_mem_seg, + const arcticdb::proto::encoding::VariantCodec& codec_opts, + EncodingVersion encoding_version) { + if (encoding_version == EncodingVersion::V2) { + return max_compressed_size_v2(in_mem_seg, codec_opts); + } else { + return max_compressed_size_v1(in_mem_seg, codec_opts); + } +} + +Segment encode_dispatch(SegmentInMemory&& in_mem_seg, + const arcticdb::proto::encoding::VariantCodec& codec_opts, + EncodingVersion encoding_version) { + if (encoding_version == EncodingVersion::V2) { + return encode_v2(std::move(in_mem_seg), codec_opts); + } else { + return encode_v1(std::move(in_mem_seg), codec_opts); + } } namespace { class MetaBuffer { - public: - MetaBuffer() = default; +public: + MetaBuffer() = default; - shape_t *allocate_shapes(std::size_t bytes) { - util::check_arg(bytes == 8, "expected exactly one shape, actual {}", bytes / sizeof(shape_t)); - return &shape_; - } + shape_t* allocate_shapes(std::size_t bytes) { + util::check_arg(bytes == 8, "expected exactly one shape, actual {}", + bytes / sizeof(shape_t)); + return &shape_; + } - uint8_t *allocate_data(std::size_t bytes) { - buff_.ensure(bytes); - return buff_.data(); - } + uint8_t* allocate_data(std::size_t bytes) { + 
buff_.ensure(bytes); + return buff_.data(); + } - void advance_data(std::size_t) const { - // Not used - } + void advance_data(std::size_t) const { + // Not used + } - void advance_shapes(std::size_t) const { - // Not used - } + void advance_shapes(std::size_t) const { + // Not used + } - void set_allow_sparse(bool) const { - // Not used - } + void set_allow_sparse(bool) const { + // Not used + } - [[nodiscard]] const Buffer& buffer() const { return buff_; } + [[nodiscard]] const Buffer& buffer() const { return buff_; } - Buffer&& detach_buffer() { - return std::move(buff_); - } + Buffer&& detach_buffer() { return std::move(buff_); } - private: - Buffer buff_; - shape_t shape_ = 0; +private: + Buffer buff_; + shape_t shape_ = 0; }; -} - -std::optional decode_metadata( - const SegmentHeader& hdr, - const uint8_t*& data, - const uint8_t* begin ARCTICDB_UNUSED - ) { - if (hdr.has_metadata_field()) { - hdr.metadata_field().validate(); - auto meta_type_desc = metadata_type_desc(); - MetaBuffer meta_buf; - std::optional bv; - ARCTICDB_DEBUG(log::codec(), "Decoding metadata at position {}: {}", data - begin, dump_bytes(data, 10)); - data += decode_ndarray(meta_type_desc, hdr.metadata_field().ndarray(), data, meta_buf, bv, hdr.encoding_version()); - ARCTICDB_TRACE(log::codec(), "Decoded metadata to position {}", data - begin); - google::protobuf::io::ArrayInputStream ais(meta_buf.buffer().data(), - static_cast(meta_buf.buffer().bytes())); - google::protobuf::Any any; - auto success = any.ParseFromZeroCopyStream(&ais); - util::check(success, "Failed to parse metadata field in decode_metadata"); - return std::make_optional(std::move(any)); - } else { - return std::nullopt; - } -} +} // namespace + +std::optional +decode_metadata(const SegmentHeader& hdr, const uint8_t*& data, + const uint8_t* begin ARCTICDB_UNUSED) { + if (hdr.has_metadata_field()) { + hdr.metadata_field().validate(); + auto meta_type_desc = metadata_type_desc(); + MetaBuffer meta_buf; + std::optional bv; + ARCTICDB_DEBUG(log::codec(), "Decoding metadata at position {}: {}", data - begin, + dump_bytes(data, 10)); + data += decode_ndarray(meta_type_desc, hdr.metadata_field().ndarray(), data, + meta_buf, bv, hdr.encoding_version()); + ARCTICDB_TRACE(log::codec(), "Decoded metadata to position {}", data - begin); + google::protobuf::io::ArrayInputStream ais( + meta_buf.buffer().data(), static_cast(meta_buf.buffer().bytes())); + google::protobuf::Any any; + auto success = any.ParseFromZeroCopyStream(&ais); + util::check(success, "Failed to parse metadata field in decode_metadata"); + return std::make_optional(std::move(any)); + } else { + return std::nullopt; + } +} + +void decode_metadata(const SegmentHeader& hdr, const uint8_t*& data, + const uint8_t* begin ARCTICDB_UNUSED, SegmentInMemory& res) { + auto maybe_any = decode_metadata(hdr, data, begin); + if (maybe_any) { + ARCTICDB_DEBUG(log::version(), "Found metadata on segment"); + res.set_metadata(std::move(*maybe_any)); + } else { + ARCTICDB_DEBUG(log::version(), "No metadata on segment"); + } +} + +std::optional +decode_metadata_from_segment(const Segment& segment) { + auto& hdr = segment.header(); + const uint8_t* data = segment.buffer().data(); + + const auto begin = data; + if (const auto has_magic_numbers = + EncodingVersion(hdr.encoding_version()) == EncodingVersion::V2; + has_magic_numbers) + util::check_magic(data); -void decode_metadata( - const SegmentHeader& hdr, - const uint8_t*& data, - const uint8_t* begin ARCTICDB_UNUSED, - SegmentInMemory& res) { - auto maybe_any = 
decode_metadata(hdr, data, begin); - if(maybe_any) { - ARCTICDB_DEBUG(log::version(), "Found metadata on segment"); - res.set_metadata(std::move(*maybe_any)); - } else { - ARCTICDB_DEBUG(log::version(), "No metadata on segment"); - } + return decode_metadata(hdr, data, begin); } -std::optional decode_metadata_from_segment(const Segment &segment) { - auto &hdr = segment.header(); - const uint8_t* data = segment.buffer().data(); +EncodedFieldCollection decode_encoded_fields(const SegmentHeader& hdr, + const uint8_t* data, + const uint8_t* begin ARCTICDB_UNUSED) { + ARCTICDB_TRACE(log::codec(), "Decoding encoded fields"); - const auto begin = data; - if(const auto has_magic_numbers = EncodingVersion(hdr.encoding_version()) == EncodingVersion::V2; has_magic_numbers) - util::check_magic(data); + util::check(hdr.has_column_fields() && hdr.column_fields().has_ndarray(), + "Expected encoded field description to be set in header"); + std::optional bv; + const auto uncompressed_size = encoding_sizes::uncompressed_size(hdr.column_fields()); + constexpr auto type_desc = encoded_fields_type_desc(); + Column encoded_column(type_desc, uncompressed_size, false, false); + decode_ndarray(type_desc, hdr.column_fields().ndarray(), data, encoded_column, bv, + hdr.encoding_version()); - return decode_metadata(hdr, data, begin); + ARCTICDB_TRACE(log::codec(), "Decoded encoded fields at position {}", data - begin); + return {std::move(encoded_column.release_buffer()), + std::move(encoded_column.release_shapes())}; } -EncodedFieldCollection decode_encoded_fields( - const SegmentHeader& hdr, - const uint8_t* data, - const uint8_t* begin ARCTICDB_UNUSED) { - ARCTICDB_TRACE(log::codec(), "Decoding encoded fields"); - - util::check(hdr.has_column_fields() && hdr.column_fields().has_ndarray(), "Expected encoded field description to be set in header"); - std::optional bv; - const auto uncompressed_size = encoding_sizes::uncompressed_size(hdr.column_fields()); - constexpr auto type_desc = encoded_fields_type_desc(); - Column encoded_column(type_desc, uncompressed_size, false, false); - decode_ndarray(type_desc, hdr.column_fields().ndarray(), data, encoded_column, bv, hdr.encoding_version()); - - ARCTICDB_TRACE(log::codec(), "Decoded encoded fields at position {}", data-begin); - return {std::move(encoded_column.release_buffer()), std::move(encoded_column.release_shapes())}; -} - -std::shared_ptr extract_frame_metadata( - SegmentInMemory& res) { - auto output = std::make_shared(); - util::check(res.has_metadata(), "Cannot extract frame metadata as it is null"); - res.metadata()->UnpackTo(output.get()); - return output; -} - -FrameDescriptorImpl read_frame_descriptor( - const uint8_t*& data) { - auto* frame_descriptor = reinterpret_cast(data); - data += sizeof(FrameDescriptorImpl); - return *frame_descriptor; -} - -SegmentDescriptorImpl read_segment_descriptor( - const uint8_t*& data) { - util::check_magic(data); - auto* frame_descriptor = reinterpret_cast(data); - data += sizeof(SegmentDescriptorImpl); - return *frame_descriptor; -} - -std::shared_ptr decode_index_fields( - const SegmentHeader& hdr, - const uint8_t*& data, - const uint8_t* begin ARCTICDB_UNUSED, - const uint8_t* end) { - auto fields = std::make_shared(); - if(hdr.has_index_descriptor_field() && hdr.index_descriptor_field().has_ndarray()) { - ARCTICDB_TRACE(log::codec(), "Decoding index fields"); - util::check(data!=end, "Reached end of input block with index descriptor fields to decode"); - std::optional bv; - - data += 
decode_ndarray(FieldCollection::type(), - hdr.index_descriptor_field().ndarray(), - data, - *fields, - bv, - hdr.encoding_version()); - - ARCTICDB_TRACE(log::codec(), "Decoded index descriptor to position {}", data-begin); - } - fields->regenerate_offsets(); - return fields; +std::shared_ptr +extract_frame_metadata(SegmentInMemory& res) { + auto output = std::make_shared(); + util::check(res.has_metadata(), "Cannot extract frame metadata as it is null"); + res.metadata()->UnpackTo(output.get()); + return output; } -namespace { -inline arcticdb::proto::descriptors::TimeSeriesDescriptor timeseries_descriptor_from_any(const google::protobuf::Any& any) { - arcticdb::proto::descriptors::TimeSeriesDescriptor tsd; - any.UnpackTo(&tsd); - return tsd; -} - -inline arcticdb::proto::descriptors::FrameMetadata frame_metadata_from_any(const google::protobuf::Any& any) { - arcticdb::proto::descriptors::FrameMetadata frame_meta; - any.UnpackTo(&frame_meta); - return frame_meta; -} -} - -std::optional decode_descriptor_fields( - const SegmentHeader& hdr, - const uint8_t*& data, - const uint8_t* begin ARCTICDB_UNUSED, - const uint8_t* end) { - if(hdr.has_descriptor_field()) { - ARCTICDB_TRACE(log::codec(), "Decoding index fields"); - util::check(data!=end, "Reached end of input block with descriptor fields to decode"); - std::optional bv; - FieldCollection fields; - data += decode_field(FieldCollection::type(), - hdr.descriptor_field(), - data, - fields, - bv, - hdr.encoding_version()); - - ARCTICDB_TRACE(log::codec(), "Decoded descriptor fields to position {}", data-begin); - return std::make_optional(std::move(fields)); - } else { - return std::nullopt; - } +FrameDescriptorImpl read_frame_descriptor(const uint8_t*& data) { + auto* frame_descriptor = reinterpret_cast(data); + data += sizeof(FrameDescriptorImpl); + return *frame_descriptor; } -TimeseriesDescriptor unpack_timeseries_descriptor_from_proto( - const google::protobuf::Any& any, const StreamDescriptor& stream_desc, bool is_decoding_incompletes) { +SegmentDescriptorImpl read_segment_descriptor(const uint8_t*& data) { + util::check_magic(data); + auto* frame_descriptor = reinterpret_cast(data); + data += sizeof(SegmentDescriptorImpl); + return *frame_descriptor; +} - auto tsd = timeseries_descriptor_from_any(any); - if (is_decoding_incompletes) { - // Prefer the stream descriptor on the segment header for incompletes. - // See PR #1647. 
- arcticc::pb2::descriptors_pb2::StreamDescriptor desc_proto; - copy_stream_descriptor_to_proto(stream_desc, desc_proto); - tsd.mutable_stream_descriptor()->CopyFrom(desc_proto); - } +std::shared_ptr +decode_index_fields(const SegmentHeader& hdr, const uint8_t*& data, + const uint8_t* begin ARCTICDB_UNUSED, const uint8_t* end) { + auto fields = std::make_shared(); + if (hdr.has_index_descriptor_field() && hdr.index_descriptor_field().has_ndarray()) { + ARCTICDB_TRACE(log::codec(), "Decoding index fields"); + util::check(data != end, + "Reached end of input block with index descriptor fields to decode"); + std::optional bv; - auto frame_meta = std::make_shared(); - exchange_timeseries_proto(tsd, *frame_meta); + data += + decode_ndarray(FieldCollection::type(), hdr.index_descriptor_field().ndarray(), + data, *fields, bv, hdr.encoding_version()); - auto segment_desc = std::make_shared(segment_descriptor_from_proto((tsd.stream_descriptor()))); - auto frame_desc = std::make_shared(frame_descriptor_from_proto(tsd)); - const auto& desc = tsd.stream_descriptor(); - auto old_fields = std::make_shared(fields_from_proto(tsd.stream_descriptor())); - StreamId stream_id = desc.id_case() == desc.kNumId ? StreamId(desc.num_id()) : StreamId(desc.str_id()); - return {frame_desc, segment_desc, frame_meta, old_fields, stream_id}; + ARCTICDB_TRACE(log::codec(), "Decoded index descriptor to position {}", + data - begin); + } + fields->regenerate_offsets(); + return fields; } -std::optional decode_timeseries_descriptor_v1( - const SegmentHeader& hdr, - const uint8_t* data, - const uint8_t* begin, - const StreamDescriptor& descriptor) { - auto maybe_any = decode_metadata(hdr, data, begin); - if(!maybe_any) - return std::nullopt; - - return unpack_timeseries_descriptor_from_proto(*maybe_any, descriptor, false); +namespace { +inline arcticdb::proto::descriptors::TimeSeriesDescriptor +timeseries_descriptor_from_any(const google::protobuf::Any& any) { + arcticdb::proto::descriptors::TimeSeriesDescriptor tsd; + any.UnpackTo(&tsd); + return tsd; +} + +inline arcticdb::proto::descriptors::FrameMetadata +frame_metadata_from_any(const google::protobuf::Any& any) { + arcticdb::proto::descriptors::FrameMetadata frame_meta; + any.UnpackTo(&frame_meta); + return frame_meta; +} +} // namespace + +std::optional +decode_descriptor_fields(const SegmentHeader& hdr, const uint8_t*& data, + const uint8_t* begin ARCTICDB_UNUSED, const uint8_t* end) { + if (hdr.has_descriptor_field()) { + ARCTICDB_TRACE(log::codec(), "Decoding index fields"); + util::check(data != end, + "Reached end of input block with descriptor fields to decode"); + std::optional bv; + FieldCollection fields; + data += decode_field(FieldCollection::type(), hdr.descriptor_field(), data, fields, + bv, hdr.encoding_version()); + + ARCTICDB_TRACE(log::codec(), "Decoded descriptor fields to position {}", + data - begin); + return std::make_optional(std::move(fields)); + } else { + return std::nullopt; + } +} + +TimeseriesDescriptor +unpack_timeseries_descriptor_from_proto(const google::protobuf::Any& any, + const StreamDescriptor& stream_desc, + bool is_decoding_incompletes) { + + auto tsd = timeseries_descriptor_from_any(any); + if (is_decoding_incompletes) { + // Prefer the stream descriptor on the segment header for incompletes. + // See PR #1647. 
+ arcticc::pb2::descriptors_pb2::StreamDescriptor desc_proto; + copy_stream_descriptor_to_proto(stream_desc, desc_proto); + tsd.mutable_stream_descriptor()->CopyFrom(desc_proto); + } + + auto frame_meta = std::make_shared(); + exchange_timeseries_proto(tsd, *frame_meta); + + auto segment_desc = std::make_shared( + segment_descriptor_from_proto((tsd.stream_descriptor()))); + auto frame_desc = + std::make_shared(frame_descriptor_from_proto(tsd)); + const auto& desc = tsd.stream_descriptor(); + auto old_fields = + std::make_shared(fields_from_proto(tsd.stream_descriptor())); + StreamId stream_id = + desc.id_case() == desc.kNumId ? StreamId(desc.num_id()) : StreamId(desc.str_id()); + return {frame_desc, segment_desc, frame_meta, old_fields, stream_id}; +} + +std::optional +decode_timeseries_descriptor_v1(const SegmentHeader& hdr, const uint8_t* data, + const uint8_t* begin, + const StreamDescriptor& descriptor) { + auto maybe_any = decode_metadata(hdr, data, begin); + if (!maybe_any) + return std::nullopt; + + return unpack_timeseries_descriptor_from_proto(*maybe_any, descriptor, false); } void skip_descriptor(const uint8_t*& data, const SegmentHeader& hdr) { - util::check_magic(data); - data += sizeof(SegmentDescriptor); - skip_identifier(data); - util::check_magic(data); - if(hdr.has_descriptor_field() && hdr.descriptor_field().has_ndarray()) - data += encoding_sizes::field_compressed_size(hdr.descriptor_field()); - + util::check_magic(data); + data += sizeof(SegmentDescriptor); + skip_identifier(data); + util::check_magic(data); + if (hdr.has_descriptor_field() && hdr.descriptor_field().has_ndarray()) + data += encoding_sizes::field_compressed_size(hdr.descriptor_field()); } -std::optional decode_timeseries_descriptor_v2( - const SegmentHeader& hdr, - const uint8_t* data, - const uint8_t* begin, - const uint8_t* end) { - util::check_magic(data); +std::optional +decode_timeseries_descriptor_v2(const SegmentHeader& hdr, const uint8_t* data, + const uint8_t* begin, const uint8_t* end) { + util::check_magic(data); - auto maybe_any = decode_metadata(hdr, data, begin); - if(!maybe_any) - return std::nullopt; + auto maybe_any = decode_metadata(hdr, data, begin); + if (!maybe_any) + return std::nullopt; - auto frame_meta = std::make_shared(frame_metadata_from_any(*maybe_any)); + auto frame_meta = std::make_shared( + frame_metadata_from_any(*maybe_any)); - skip_descriptor(data, hdr); + skip_descriptor(data, hdr); - util::check_magic(data); - auto frame_desc = std::make_shared(read_frame_descriptor(data)); - auto segment_desc = std::make_shared(read_segment_descriptor(data)); - auto segment_id = read_identifier(data); - auto index_fields = decode_index_fields(hdr, data, begin, end); - return std::make_optional(frame_desc, segment_desc, frame_meta, std::move(index_fields), segment_id); + util::check_magic(data); + auto frame_desc = std::make_shared(read_frame_descriptor(data)); + auto segment_desc = + std::make_shared(read_segment_descriptor(data)); + auto segment_id = read_identifier(data); + auto index_fields = decode_index_fields(hdr, data, begin, end); + return std::make_optional(frame_desc, segment_desc, frame_meta, + std::move(index_fields), segment_id); } -std::optional decode_timeseries_descriptor( - const SegmentHeader& hdr, - const uint8_t* data, - const uint8_t* begin, - const uint8_t* end, - const StreamDescriptor& descriptor) { - util::check(data != nullptr, "Got null data ptr from segment"); - auto encoding_version = EncodingVersion(hdr.encoding_version()); - if (encoding_version == 
EncodingVersion::V1) - return decode_timeseries_descriptor_v1(hdr, data, begin, descriptor); - else - return decode_timeseries_descriptor_v2(hdr, data, begin, end); +std::optional +decode_timeseries_descriptor(const SegmentHeader& hdr, const uint8_t* data, + const uint8_t* begin, const uint8_t* end, + const StreamDescriptor& descriptor) { + util::check(data != nullptr, "Got null data ptr from segment"); + auto encoding_version = EncodingVersion(hdr.encoding_version()); + if (encoding_version == EncodingVersion::V1) + return decode_timeseries_descriptor_v1(hdr, data, begin, descriptor); + else + return decode_timeseries_descriptor_v2(hdr, data, begin, end); } -std::optional decode_timeseries_descriptor( - Segment& segment) { - const auto &hdr = segment.header(); - const uint8_t* data = segment.buffer().data(); +std::optional decode_timeseries_descriptor(Segment& segment) { + const auto& hdr = segment.header(); + const uint8_t* data = segment.buffer().data(); - util::check(data != nullptr, "Got null data ptr from segment"); - const uint8_t* begin = data; - const uint8_t* end = data + segment.buffer().bytes(); + util::check(data != nullptr, "Got null data ptr from segment"); + const uint8_t* begin = data; + const uint8_t* end = data + segment.buffer().bytes(); - return decode_timeseries_descriptor(hdr, data, begin, end, segment.descriptor()); + return decode_timeseries_descriptor(hdr, data, begin, end, segment.descriptor()); } std::optional decode_timeseries_descriptor_for_incompletes( - const SegmentHeader& hdr, - const StreamDescriptor& desc, - const uint8_t* data, - const uint8_t* begin, - const uint8_t* end) { - util::check(data != nullptr, "Got null data ptr from segment"); - auto encoding_version = EncodingVersion(hdr.encoding_version()); - if (encoding_version == EncodingVersion::V1) { - auto maybe_any = decode_metadata(hdr, data, begin); - if (!maybe_any) - return std::nullopt; - - return unpack_timeseries_descriptor_from_proto(*maybe_any, desc, true); - } else { - return decode_timeseries_descriptor_v2(hdr, data, begin, end); - } + const SegmentHeader& hdr, const StreamDescriptor& desc, const uint8_t* data, + const uint8_t* begin, const uint8_t* end) { + util::check(data != nullptr, "Got null data ptr from segment"); + auto encoding_version = EncodingVersion(hdr.encoding_version()); + if (encoding_version == EncodingVersion::V1) { + auto maybe_any = decode_metadata(hdr, data, begin); + if (!maybe_any) + return std::nullopt; + + return unpack_timeseries_descriptor_from_proto(*maybe_any, desc, true); + } else { + return decode_timeseries_descriptor_v2(hdr, data, begin, end); + } } -std::optional decode_timeseries_descriptor_for_incompletes( - Segment& segment) { - auto &hdr = segment.header(); - const uint8_t* data = segment.buffer().data(); +std::optional +decode_timeseries_descriptor_for_incompletes(Segment& segment) { + auto& hdr = segment.header(); + const uint8_t* data = segment.buffer().data(); - util::check(data != nullptr, "Got null data ptr from segment"); - const uint8_t* begin = data; - const uint8_t* end = data + segment.buffer().bytes(); + util::check(data != nullptr, "Got null data ptr from segment"); + const uint8_t* begin = data; + const uint8_t* end = data + segment.buffer().bytes(); - return decode_timeseries_descriptor_for_incompletes(hdr, segment.descriptor(), data, begin, end); + return decode_timeseries_descriptor_for_incompletes(hdr, segment.descriptor(), data, + begin, end); } -std::pair, StreamDescriptor> decode_metadata_and_descriptor_fields( - Segment& 
segment) { - auto &hdr = segment.header(); - const uint8_t* data = segment.buffer().data(); +std::pair, StreamDescriptor> +decode_metadata_and_descriptor_fields(Segment& segment) { + auto& hdr = segment.header(); + const uint8_t* data = segment.buffer().data(); - util::check(data != nullptr, "Got null data ptr from segment"); - const uint8_t* begin = data; - if(EncodingVersion(hdr.encoding_version()) == EncodingVersion::V2) - util::check_magic(data); + util::check(data != nullptr, "Got null data ptr from segment"); + const uint8_t* begin = data; + if (EncodingVersion(hdr.encoding_version()) == EncodingVersion::V2) + util::check_magic(data); - auto maybe_any = decode_metadata(hdr, data, begin); - if(EncodingVersion(hdr.encoding_version()) == EncodingVersion::V2) - util::check_magic(data); + auto maybe_any = decode_metadata(hdr, data, begin); + if (EncodingVersion(hdr.encoding_version()) == EncodingVersion::V2) + util::check_magic(data); - return std::make_pair(std::move(maybe_any), segment.descriptor()); + return std::make_pair(std::move(maybe_any), segment.descriptor()); } -void decode_string_pool( - const SegmentHeader& hdr, - const uint8_t*& data, - const uint8_t* begin ARCTICDB_UNUSED, - const uint8_t* end, - SegmentInMemory& res) { - if (hdr.has_string_pool_field()) { - ARCTICDB_TRACE(log::codec(), "Decoding string pool"); - util::check(data!=end, "Reached end of input block with string pool fields to decode"); - std::optional bv; - data += decode_ndarray(string_pool_descriptor().type(), - hdr.string_pool_field(), - data, - res.string_pool(), - bv, - hdr.encoding_version()); - - ARCTICDB_TRACE(log::codec(), "Decoded string pool to position {}", data-begin); - } -} +void decode_string_pool(const SegmentHeader& hdr, const uint8_t*& data, + const uint8_t* begin ARCTICDB_UNUSED, const uint8_t* end, + SegmentInMemory& res) { + if (hdr.has_string_pool_field()) { + ARCTICDB_TRACE(log::codec(), "Decoding string pool"); + util::check(data != end, + "Reached end of input block with string pool fields to decode"); + std::optional bv; + data += decode_ndarray(string_pool_descriptor().type(), hdr.string_pool_field(), + data, res.string_pool(), bv, hdr.encoding_version()); -ssize_t calculate_last_row(const Column& col) { - ssize_t last_row{0}; - if (col.opt_sparse_map().has_value()) { - bm::bvector_size_type last_set_bit; - if (col.sparse_map().find_reverse(last_set_bit)) { - last_row = static_cast(last_set_bit); - } - } else { - last_row = static_cast(col.row_count()) - 1; - } - return last_row; + ARCTICDB_TRACE(log::codec(), "Decoded string pool to position {}", data - begin); + } } -void decode_v2(const Segment& segment, - const SegmentHeader& hdr, - SegmentInMemory& res, - const StreamDescriptor& desc) { - ARCTICDB_SAMPLE(DecodeSegment, 0) - if(segment.buffer().data() == nullptr) { - ARCTICDB_DEBUG(log::codec(), "Segment contains no data in decode_v2"); - return; - } - - const auto [begin, end] = get_segment_begin_end(segment, hdr); - auto encoded_fields_ptr = end; - auto data = begin; - util::check_magic(data); - decode_metadata(hdr, data, begin, res); - skip_descriptor(data, hdr); - - util::check_magic(data); - if(hdr.has_index_descriptor_field()) { - auto index_frame_descriptor = std::make_shared(read_frame_descriptor(data)); - auto frame_metadata = extract_frame_metadata(res); - auto index_segment_descriptor = std::make_shared(read_segment_descriptor(data)); - auto index_segment_identifier = read_identifier(data); - auto index_fields = decode_index_fields(hdr, data, begin, end); - 
TimeseriesDescriptor tsd{std::move(index_frame_descriptor), std::move(index_segment_descriptor), std::move(frame_metadata), std::move(index_fields), index_segment_identifier}; - res.set_timeseries_descriptor(tsd); - res.reset_metadata(); +ssize_t calculate_last_row(const Column& col) { + ssize_t last_row{0}; + if (col.opt_sparse_map().has_value()) { + bm::bvector_size_type last_set_bit; + if (col.sparse_map().find_reverse(last_set_bit)) { + last_row = static_cast(last_set_bit); } - - if (data!=end) { - util::check(hdr.has_column_fields(), "Expected column fields in v2 encoding"); - util::check_magic(encoded_fields_ptr); - auto encoded_fields_buffer = decode_encoded_fields(hdr, encoded_fields_ptr, begin); - const auto fields_size = desc.fields().size(); - const auto start_row = res.row_count(); - EncodedFieldCollection encoded_fields(std::move(encoded_fields_buffer)); - - auto encoded_field = encoded_fields.begin(); - res.init_column_map(); - - ssize_t seg_row_count = 0; - for (std::size_t i = 0; i < fields_size; ++i) { + } else { + last_row = static_cast(col.row_count()) - 1; + } + return last_row; +} + +void decode_v2(const Segment& segment, const SegmentHeader& hdr, SegmentInMemory& res, + const StreamDescriptor& desc) { + ARCTICDB_SAMPLE(DecodeSegment, 0) + if (segment.buffer().data() == nullptr) { + ARCTICDB_DEBUG(log::codec(), "Segment contains no data in decode_v2"); + return; + } + + const auto [begin, end] = get_segment_begin_end(segment, hdr); + auto encoded_fields_ptr = end; + auto data = begin; + util::check_magic(data); + decode_metadata(hdr, data, begin, res); + skip_descriptor(data, hdr); + + util::check_magic(data); + if (hdr.has_index_descriptor_field()) { + auto index_frame_descriptor = + std::make_shared(read_frame_descriptor(data)); + auto frame_metadata = extract_frame_metadata(res); + auto index_segment_descriptor = + std::make_shared(read_segment_descriptor(data)); + auto index_segment_identifier = read_identifier(data); + auto index_fields = decode_index_fields(hdr, data, begin, end); + TimeseriesDescriptor tsd{ + std::move(index_frame_descriptor), std::move(index_segment_descriptor), + std::move(frame_metadata), std::move(index_fields), index_segment_identifier}; + res.set_timeseries_descriptor(tsd); + res.reset_metadata(); + } + + if (data != end) { + util::check(hdr.has_column_fields(), "Expected column fields in v2 encoding"); + util::check_magic(encoded_fields_ptr); + auto encoded_fields_buffer = decode_encoded_fields(hdr, encoded_fields_ptr, begin); + const auto fields_size = desc.fields().size(); + const auto start_row = res.row_count(); + EncodedFieldCollection encoded_fields(std::move(encoded_fields_buffer)); + + auto encoded_field = encoded_fields.begin(); + res.init_column_map(); + + ssize_t seg_row_count = 0; + for (std::size_t i = 0; i < fields_size; ++i) { #ifdef DUMP_BYTES - log::version().debug("{}", dump_bytes(begin, (data - begin) + encoding_sizes::field_compressed_size(*encoded_field), 100u)); + log::version().debug( + "{}", dump_bytes(begin, + (data - begin) + + encoding_sizes::field_compressed_size(*encoded_field), + 100u)); #endif - const auto& field_name = desc.fields(i).name(); - util::check(data!=end, "Reached end of input block with {} fields to decode", fields_size-i); - if(auto col_index = res.column_index(field_name)) { - auto& col = res.column(static_cast(*col_index)); - - data += decode_field(res.field(*col_index).type(), *encoded_field, data, col, col.opt_sparse_map(), hdr.encoding_version()); - seg_row_count = std::max(seg_row_count, 
calculate_last_row(col)); - } else { - data += encoding_sizes::field_compressed_size(*encoded_field) + sizeof(ColumnMagic); - } - ++encoded_field; - ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", i, data-begin); - } - - util::check_magic(data); - decode_string_pool(hdr, data, begin, end, res); - - res.set_row_data(static_cast(start_row + seg_row_count)); - res.set_compacted(segment.header().compacted()); - } -} - -void decode_v1(const Segment& segment, - const SegmentHeader& hdr, - SegmentInMemory& res, - const StreamDescriptor& desc, - bool is_decoding_incompletes) { - ARCTICDB_SAMPLE(DecodeSegment, 0) - const uint8_t* data = segment.buffer().data(); - if(data == nullptr) { - ARCTICDB_DEBUG(log::codec(), "Segment contains no data in decode_v1"); - return; + const auto& field_name = desc.fields(i).name(); + util::check(data != end, "Reached end of input block with {} fields to decode", + fields_size - i); + if (auto col_index = res.column_index(field_name)) { + auto& col = res.column(static_cast(*col_index)); + + data += decode_field(res.field(*col_index).type(), *encoded_field, data, col, + col.opt_sparse_map(), hdr.encoding_version()); + seg_row_count = std::max(seg_row_count, calculate_last_row(col)); + } else { + data += + encoding_sizes::field_compressed_size(*encoded_field) + sizeof(ColumnMagic); + } + ++encoded_field; + ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", i, data - begin); } - const uint8_t* begin = data; - const uint8_t* end = begin + segment.buffer().bytes(); - decode_metadata(hdr, data, begin, res); - if(res.has_metadata() && res.metadata()->Is()) { - ARCTICDB_DEBUG(log::version(), "Unpacking timeseries descriptor from metadata"); - auto tsd = unpack_timeseries_descriptor_from_proto(*res.metadata(), desc, is_decoding_incompletes); - res.set_timeseries_descriptor(tsd); - res.reset_metadata(); - } - - if (data != end) { - const auto fields_size = desc.fields().size(); - const auto &column_fields = hdr.body_fields(); - util::check(fields_size == segment.fields_size(), - "Mismatch between descriptor and header field size: {} != {}", - fields_size, - column_fields.size()); - const auto start_row = res.row_count(); - - res.init_column_map(); - - ssize_t seg_row_count = 0; - for (std::size_t i = 0; i < fields_size; ++i) { - const auto &field = column_fields.at(i); - const auto& desc_field = desc.fields(i); - const auto &field_name = desc_field.name(); - util::check(data != end || is_empty_type(desc_field.type().data_type()), "Reached end of input block with {} fields to decode", fields_size - i); - if (auto col_index = res.column_index(field_name)) { - auto &col = res.column(static_cast(*col_index)); - data += decode_field( - res.field(*col_index).type(), - field, - data, - col, - col.opt_sparse_map(), - hdr.encoding_version() - ); - seg_row_count = std::max(seg_row_count, calculate_last_row(col)); - ARCTICDB_DEBUG(log::codec(), "Decoded column {} to position {}", i, data - begin); - } else { - data += encoding_sizes::field_compressed_size(field); - ARCTICDB_DEBUG(log::codec(), "Skipped column {}, at position {}", i, data - begin); - } - } - decode_string_pool(hdr, data, begin, end, res); - res.set_row_data(static_cast(start_row + seg_row_count)); - res.set_compacted(segment.header().compacted()); + util::check_magic(data); + decode_string_pool(hdr, data, begin, end, res); + + res.set_row_data(static_cast(start_row + seg_row_count)); + res.set_compacted(segment.header().compacted()); + } +} + +void decode_v1(const Segment& segment, const 
SegmentHeader& hdr, SegmentInMemory& res, + const StreamDescriptor& desc, bool is_decoding_incompletes) { + ARCTICDB_SAMPLE(DecodeSegment, 0) + const uint8_t* data = segment.buffer().data(); + if (data == nullptr) { + ARCTICDB_DEBUG(log::codec(), "Segment contains no data in decode_v1"); + return; + } + + const uint8_t* begin = data; + const uint8_t* end = begin + segment.buffer().bytes(); + decode_metadata(hdr, data, begin, res); + if (res.has_metadata() && + res.metadata()->Is()) { + ARCTICDB_DEBUG(log::version(), "Unpacking timeseries descriptor from metadata"); + auto tsd = unpack_timeseries_descriptor_from_proto(*res.metadata(), desc, + is_decoding_incompletes); + res.set_timeseries_descriptor(tsd); + res.reset_metadata(); + } + + if (data != end) { + const auto fields_size = desc.fields().size(); + const auto& column_fields = hdr.body_fields(); + util::check(fields_size == segment.fields_size(), + "Mismatch between descriptor and header field size: {} != {}", + fields_size, column_fields.size()); + const auto start_row = res.row_count(); + + res.init_column_map(); + + ssize_t seg_row_count = 0; + for (std::size_t i = 0; i < fields_size; ++i) { + const auto& field = column_fields.at(i); + const auto& desc_field = desc.fields(i); + const auto& field_name = desc_field.name(); + util::check(data != end || is_empty_type(desc_field.type().data_type()), + "Reached end of input block with {} fields to decode", + fields_size - i); + if (auto col_index = res.column_index(field_name)) { + auto& col = res.column(static_cast(*col_index)); + data += decode_field(res.field(*col_index).type(), field, data, col, + col.opt_sparse_map(), hdr.encoding_version()); + seg_row_count = std::max(seg_row_count, calculate_last_row(col)); + ARCTICDB_DEBUG(log::codec(), "Decoded column {} to position {}", i, + data - begin); + } else { + data += encoding_sizes::field_compressed_size(field); + ARCTICDB_DEBUG(log::codec(), "Skipped column {}, at position {}", i, + data - begin); + } } + decode_string_pool(hdr, data, begin, end, res); + res.set_row_data(static_cast(start_row + seg_row_count)); + res.set_compacted(segment.header().compacted()); + } } -void decode_into_memory_segment( - const Segment& segment, - SegmentHeader& hdr, - SegmentInMemory& res, - const StreamDescriptor& desc) -{ - if(EncodingVersion(segment.header().encoding_version()) == EncodingVersion::V2) - decode_v2(segment, hdr, res, desc); - else - decode_v1(segment, hdr, res, desc); +void decode_into_memory_segment(const Segment& segment, SegmentHeader& hdr, + SegmentInMemory& res, const StreamDescriptor& desc) { + if (EncodingVersion(segment.header().encoding_version()) == EncodingVersion::V2) + decode_v2(segment, hdr, res, desc); + else + decode_v1(segment, hdr, res, desc); } SegmentInMemory decode_segment(Segment&& s) { - auto segment = std::move(s); - auto &hdr = segment.header(); - ARCTICDB_TRACE(log::codec(), "Decoding descriptor: {}", segment.descriptor()); - auto descriptor = segment.descriptor(); - descriptor.fields().regenerate_offsets(); - ARCTICDB_TRACE(log::codec(), "Creating segment"); - SegmentInMemory res(std::move(descriptor)); - ARCTICDB_TRACE(log::codec(), "Decoding segment"); - decode_into_memory_segment(segment, hdr, res, res.descriptor()); - ARCTICDB_TRACE(log::codec(), "Returning segment"); - return res; + auto segment = std::move(s); + auto& hdr = segment.header(); + ARCTICDB_TRACE(log::codec(), "Decoding descriptor: {}", segment.descriptor()); + auto descriptor = segment.descriptor(); + 
descriptor.fields().regenerate_offsets(); + ARCTICDB_TRACE(log::codec(), "Creating segment"); + SegmentInMemory res(std::move(descriptor)); + ARCTICDB_TRACE(log::codec(), "Decoding segment"); + decode_into_memory_segment(segment, hdr, res, res.descriptor()); + ARCTICDB_TRACE(log::codec(), "Returning segment"); + return res; } template -void hash_field(const EncodedFieldType &field, HashAccum &accum) { - auto &n = field.ndarray(); - for(auto i = 0; i < n.shapes_size(); ++i) { - auto v = n.shapes(i).hash(); - accum(&v); - } +void hash_field(const EncodedFieldType& field, HashAccum& accum) { + auto& n = field.ndarray(); + for (auto i = 0; i < n.shapes_size(); ++i) { + auto v = n.shapes(i).hash(); + accum(&v); + } - for(auto j = 0; j < n.values_size(); ++j) { - auto v = n.values(j).hash(); - accum(&v); - } + for (auto j = 0; j < n.values_size(); ++j) { + auto v = n.values(j).hash(); + accum(&v); + } } HashedValue get_segment_hash(Segment& seg) { - HashAccum accum; - const auto& fields = seg.fields_ptr(); - if(fields && !fields->empty()) { - hash_buffer(fields->buffer(), accum); + HashAccum accum; + const auto& fields = seg.fields_ptr(); + if (fields && !fields->empty()) { + hash_buffer(fields->buffer(), accum); + } + + const auto& hdr = seg.header(); + if (hdr.encoding_version() == EncodingVersion::V1) { + // The hashes are part of the encoded fields protobuf in the v1 header, which is not + // ideal but needs to be maintained for consistency + const auto& proto = seg.generate_header_proto(); + if (proto.has_metadata_field()) { + hash_field(proto.metadata_field(), accum); + } + for (int i = 0; i < proto.fields_size(); ++i) { + hash_field(proto.fields(i), accum); + } + if (hdr.has_string_pool_field()) { + hash_field(proto.string_pool_field(), accum); + } + } else { + const auto& header_fields = hdr.header_fields(); + for (auto i = 0UL; i < header_fields.size(); ++i) { + hash_field(header_fields.at(i), accum); } - const auto& hdr = seg.header(); - if(hdr.encoding_version() == EncodingVersion::V1) { - // The hashes are part of the encoded fields protobuf in the v1 header, which is not - // ideal but needs to be maintained for consistency - const auto& proto = seg.generate_header_proto(); - if (proto.has_metadata_field()) { - hash_field(proto.metadata_field(), accum); - } - for (int i = 0; i < proto.fields_size(); ++i) { - hash_field(proto.fields(i), accum); - } - if (hdr.has_string_pool_field()) { - hash_field(proto.string_pool_field(), accum); - } - } else { - const auto& header_fields = hdr.header_fields(); - for(auto i = 0UL; i < header_fields.size(); ++i) { - hash_field(header_fields.at(i), accum); - } - - const auto& body_fields = hdr.body_fields(); - for(auto i = 0UL; i < body_fields.size(); ++i) { - hash_field(body_fields.at(i), accum); - } + const auto& body_fields = hdr.body_fields(); + for (auto i = 0UL; i < body_fields.size(); ++i) { + hash_field(body_fields.at(i), accum); } + } - return accum.digest(); + return accum.digest(); } -void add_bitmagic_compressed_size( - const ColumnData& column_data, - size_t& max_compressed_bytes, - size_t& uncompressed_bytes -) { - if (column_data.bit_vector() != nullptr && column_data.bit_vector()->count() > 0) { - bm::serializer::statistics_type stat{}; - column_data.bit_vector()->calc_stat(&stat); - uncompressed_bytes += stat.memory_used; - max_compressed_bytes += stat.max_serialize_mem; - } +void add_bitmagic_compressed_size(const ColumnData& column_data, + size_t& max_compressed_bytes, + size_t& uncompressed_bytes) { + if (column_data.bit_vector() 
!= nullptr && column_data.bit_vector()->count() > 0) { + bm::serializer::statistics_type stat{}; + column_data.bit_vector()->calc_stat(&stat); + uncompressed_bytes += stat.memory_used; + max_compressed_bytes += stat.max_serialize_mem; + } } /// @brief Write the sparse map to the out buffer -/// Bitmagic achieves the theoretical best compression for booleans. Adding additional encoding (lz4, zstd, etc...) -/// will not improve anything and in fact it might worsen the encoding. -[[nodiscard]] static size_t encode_bitmap(const util::BitMagic& sparse_map, Buffer& out, std::ptrdiff_t& pos) { - ARCTICDB_DEBUG(log::version(), "Encoding sparse map of count: {}", sparse_map.count()); - bm::serializer > bvs; - bm::serializer >::buffer sbuf; - bvs.serialize(sparse_map, sbuf); - auto sz = sbuf.size(); - auto total_sz = sz + util::combined_bit_magic_delimiters_size(); - out.assert_size(pos + total_sz); - - uint8_t* target = out.data() + pos; - util::write_magic(target); - std::memcpy(target, sbuf.data(), sz); - target += sz; - util::write_magic(target); - pos = pos + static_cast(total_sz); - return total_sz; -} - -void encode_sparse_map( - ColumnData& column_data, - EncodedFieldImpl& field, - Buffer& out, - std::ptrdiff_t& pos -) { - if (column_data.bit_vector() != nullptr && column_data.bit_vector()->count() > 0) { - util::check(!is_empty_type(column_data.type().data_type()), "Empty typed columns should not have sparse maps"); - ARCTICDB_DEBUG(log::codec(), "Sparse map count = {} pos = {}", column_data.bit_vector()->count(), pos); - const size_t sparse_bm_bytes = encode_bitmap(*column_data.bit_vector(), out, pos); - field.mutable_ndarray()->set_sparse_map_bytes(static_cast(sparse_bm_bytes)); - } +/// Bitmagic achieves the theoretical best compression for booleans. Adding additional +/// encoding (lz4, zstd, etc...) will not improve anything and in fact it might worsen +/// the encoding. +[[nodiscard]] static size_t encode_bitmap(const util::BitMagic& sparse_map, Buffer& out, + std::ptrdiff_t& pos) { + ARCTICDB_DEBUG(log::version(), "Encoding sparse map of count: {}", + sparse_map.count()); + bm::serializer> bvs; + bm::serializer>::buffer sbuf; + bvs.serialize(sparse_map, sbuf); + auto sz = sbuf.size(); + auto total_sz = sz + util::combined_bit_magic_delimiters_size(); + out.assert_size(pos + total_sz); + + uint8_t* target = out.data() + pos; + util::write_magic(target); + std::memcpy(target, sbuf.data(), sz); + target += sz; + util::write_magic(target); + pos = pos + static_cast(total_sz); + return total_sz; +} + +void encode_sparse_map(ColumnData& column_data, EncodedFieldImpl& field, Buffer& out, + std::ptrdiff_t& pos) { + if (column_data.bit_vector() != nullptr && column_data.bit_vector()->count() > 0) { + util::check(!is_empty_type(column_data.type().data_type()), + "Empty typed columns should not have sparse maps"); + ARCTICDB_DEBUG(log::codec(), "Sparse map count = {} pos = {}", + column_data.bit_vector()->count(), pos); + const size_t sparse_bm_bytes = encode_bitmap(*column_data.bit_vector(), out, pos); + field.mutable_ndarray()->set_sparse_map_bytes(static_cast(sparse_bm_bytes)); + } } } // namespace arcticdb diff --git a/cpp/arcticdb/codec/codec.hpp b/cpp/arcticdb/codec/codec.hpp index 446bacaed2..e38215c4fc 100644 --- a/cpp/arcticdb/codec/codec.hpp +++ b/cpp/arcticdb/codec/codec.hpp @@ -1,89 +1,77 @@ /* Copyright 2023 Man Group Operations Limited * - * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. 
+ * Use of this software is governed by the Business Source License 1.1 included in the + * file licenses/BSL.txt. * - * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + * As of the Change Date specified in that file, in accordance with the Business Source + * License, use of this software will be governed by the Apache License, version 2.0. */ #pragma once #include -#include #include +#include #include #include -#include +#include namespace arcticdb { class Segment; class SegmentInMemory; -using ShapesBlockTDT = entity::TypeDescriptorTag, entity::DimensionTag>; +using ShapesBlockTDT = + entity::TypeDescriptorTag, + entity::DimensionTag>; -Segment encode_dispatch( - SegmentInMemory&& in_mem_seg, - const arcticdb::proto::encoding::VariantCodec &codec_opts, - EncodingVersion encoding_version); +Segment encode_dispatch(SegmentInMemory&& in_mem_seg, + const arcticdb::proto::encoding::VariantCodec& codec_opts, + EncodingVersion encoding_version); -Segment encode_v2( - SegmentInMemory&& in_mem_seg, - const arcticdb::proto::encoding::VariantCodec& codec_opts -); +Segment encode_v2(SegmentInMemory&& in_mem_seg, + const arcticdb::proto::encoding::VariantCodec& codec_opts); -Segment encode_v1( - SegmentInMemory&& in_mem_seg, - const arcticdb::proto::encoding::VariantCodec& codec_opts -); +Segment encode_v1(SegmentInMemory&& in_mem_seg, + const arcticdb::proto::encoding::VariantCodec& codec_opts); -void decode_v1(const Segment& segment, - const SegmentHeader& hdr, - SegmentInMemory& res, - const StreamDescriptor& desc, - bool is_decoding_incompletes = false); +void decode_v1(const Segment& segment, const SegmentHeader& hdr, SegmentInMemory& res, + const StreamDescriptor& desc, bool is_decoding_incompletes = false); -void decode_v2(const Segment& segment, - const SegmentHeader& hdr, - SegmentInMemory& res, +void decode_v2(const Segment& segment, const SegmentHeader& hdr, SegmentInMemory& res, const StreamDescriptor& desc); -SizeResult max_compressed_size_dispatch( - const SegmentInMemory& in_mem_seg, - const arcticdb::proto::encoding::VariantCodec &codec_opts, - EncodingVersion encoding_version); +SizeResult +max_compressed_size_dispatch(const SegmentInMemory& in_mem_seg, + const arcticdb::proto::encoding::VariantCodec& codec_opts, + EncodingVersion encoding_version); -EncodedFieldCollection decode_encoded_fields( - const SegmentHeader& hdr, - const uint8_t* data, - const uint8_t* begin ARCTICDB_UNUSED); +EncodedFieldCollection decode_encoded_fields(const SegmentHeader& hdr, + const uint8_t* data, + const uint8_t* begin ARCTICDB_UNUSED); SegmentInMemory decode_segment(Segment&& segment); -void decode_into_memory_segment( - const Segment& segment, - SegmentHeader& hdr, - SegmentInMemory& res, - const entity::StreamDescriptor& desc); +void decode_into_memory_segment(const Segment& segment, SegmentHeader& hdr, + SegmentInMemory& res, + const entity::StreamDescriptor& desc); -template -std::size_t decode_field( - const entity::TypeDescriptor &td, - const EncodedFieldImpl &field, - const uint8_t *input, - DataSink &data_sink, - std::optional& bv, - arcticdb::EncodingVersion encoding_version); +template +std::size_t decode_field(const entity::TypeDescriptor& td, + const EncodedFieldImpl& field, const uint8_t* input, + DataSink& data_sink, std::optional& bv, + arcticdb::EncodingVersion encoding_version); -std::optional decode_metadata_from_segment( - const Segment& segment); +std::optional 
+decode_metadata_from_segment(const Segment& segment); -std::pair, StreamDescriptor> decode_metadata_and_descriptor_fields( - Segment& segment); +std::pair, StreamDescriptor> +decode_metadata_and_descriptor_fields(Segment& segment); -std::optional decode_timeseries_descriptor( - Segment& segment); +std::optional decode_timeseries_descriptor(Segment& segment); -std::optional decode_timeseries_descriptor_for_incompletes(Segment& segment); +std::optional +decode_timeseries_descriptor_for_incompletes(Segment& segment); HashedValue get_segment_hash(Segment& seg); diff --git a/python/tests/unit/arcticdb/version_store/test_aggregation.py b/python/tests/unit/arcticdb/version_store/test_aggregation.py index 06dc8b3809..08cc3d781b 100644 --- a/python/tests/unit/arcticdb/version_store/test_aggregation.py +++ b/python/tests/unit/arcticdb/version_store/test_aggregation.py @@ -5,6 +5,7 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ + import pytest import numpy as np import pandas as pd @@ -348,7 +349,17 @@ def test_hypothesis_last_agg_numeric(lmdb_version_store, df): def test_last_aggregation(local_object_version_store): df = DataFrame( { - "grouping_column": ["group_1", "group_2", "group_4", "group_5", "group_2", "group_1", "group_3", "group_1", "group_5"], + "grouping_column": [ + "group_1", + "group_2", + "group_4", + "group_5", + "group_2", + "group_1", + "group_3", + "group_1", + "group_5", + ], "get_last": [100.0, 2.7, np.nan, np.nan, np.nan, 1.4, 5.8, 3.45, 6.9], }, index=np.arange(9), @@ -361,7 +372,9 @@ def test_last_aggregation(local_object_version_store): res = local_object_version_store.read(symbol, query_builder=q) res.data.sort_index(inplace=True) - df = pd.DataFrame({"get_last": [3.45, 2.7, 5.8, np.nan, 6.9]}, index=["group_1", "group_2", "group_3", "group_4", "group_5"]) + df = pd.DataFrame( + {"get_last": [3.45, 2.7, 5.8, np.nan, 6.9]}, index=["group_1", "group_2", "group_3", "group_4", "group_5"] + ) df.index.rename("grouping_column", inplace=True) assert_frame_equal(res.data, df) @@ -451,12 +464,7 @@ def test_named_agg(lmdb_version_store_tiny_segment): lib = lmdb_version_store_tiny_segment sym = "test_named_agg" gen = np.random.default_rng() - df = DataFrame( - { - "grouping_column": [1, 1, 1, 2, 3, 4], - "agg_column": gen.random(6) - } - ) + df = DataFrame({"grouping_column": [1, 1, 1, 2, 3, 4], "agg_column": gen.random(6)}) lib.write(sym, df) expected = df.groupby("grouping_column").agg( agg_column_sum=pd.NamedAgg("agg_column", "sum"),
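
Note for reviewers: this patch contains only sample formatter output, not the
configuration that produced it. As a rough guide, a .clang-format along the following
lines would approximate the C++ layout shown in the diff (two-space indentation,
left-aligned pointers and references, namespace bodies left unindented and closed with
"// namespace" comments, include blocks re-sorted, wrapping near column 90). Every key
below is an inference from the sample output, not the project's actual configuration,
and should be checked against the real file before adoption:

    # Hypothetical .clang-format reconstructed from the sample output above.
    BasedOnStyle: LLVM            # overall shape matches LLVM defaults
    IndentWidth: 2                # two-space indentation throughout
    ColumnLimit: 90               # assumed; long signatures wrap near this width
    UseTab: Never
    PointerAlignment: Left        # e.g. "const Segment& segment", "uint8_t* data"
    NamespaceIndentation: None    # namespace bodies are not indented
    FixNamespaceComments: true    # "} // namespace arcticdb" closers are added
    SortIncludes: CaseSensitive   # include blocks are reordered in this diff
    AllowShortFunctionsOnASingleLine: Inline  # e.g. detach_buffer() stays on one line

For the Python file, the reflowed list literals and trailing commas in
test_aggregation.py are consistent with black's defaults (line length 88); no
non-default options appear to be needed there.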