Skip to content

Commit

Permalink
[fix](scan) Avoid memory allocated by buffered_reader from being trac…
Browse files Browse the repository at this point in the history
…ed (#41921) (#44253)

Use OwnedSlice to replace `char*` in BufferedReader

## Proposed changes

pick #41921
  • Loading branch information
mrhhsg authored Nov 20, 2024
1 parent a666b5e commit dc67086
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
19 changes: 11 additions & 8 deletions be/src/io/fs/buffered_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <algorithm>
#include <chrono>
#include <memory>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
Expand All @@ -31,6 +32,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "util/threadpool.h"

namespace doris {
Expand Down Expand Up @@ -270,7 +272,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
}
if (copy_out != nullptr) {
memcpy(copy_out + to_handle - remaining,
_boxes[box_index] + cached_data.box_start_offset[i], box_to_handle);
_boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle);
}
remaining -= box_to_handle;
cached_data.box_start_offset[i] += box_to_handle;
Expand Down Expand Up @@ -307,14 +309,15 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off

Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
size_t* bytes_read, const IOContext* io_ctx) {
if (_read_slice == nullptr) {
_read_slice = new char[READ_SLICE_SIZE];
if (!_read_slice) {
_read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
}

*bytes_read = 0;
{
SCOPED_RAW_TIMER(&_statistics.read_time);
RETURN_IF_ERROR(
_reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
bytes_read, io_ctx));
_statistics.merged_io++;
_statistics.merged_bytes += *bytes_read;
}
Expand All @@ -328,8 +331,8 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz

auto fill_box = [&](int16 fill_box_ref, uint32 box_usage, size_t box_copy_end) {
size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
memcpy(_boxes[fill_box_ref] + box_usage, _read_slice + copy_start - start_offset,
copy_size);
memcpy(_boxes[fill_box_ref].data() + box_usage,
_read_slice->data() + copy_start - start_offset, copy_size);
filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
copy_start += copy_size;
_last_box_ref = fill_box_ref;
Expand Down Expand Up @@ -367,7 +370,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
}
// apply for new box to copy data
while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
_boxes.emplace_back(new char[BOX_SIZE]);
_boxes.emplace_back(BOX_SIZE);
_box_ref.emplace_back(0);
fill_box(_boxes.size() - 1, 0, range_copy_end);
}
Expand Down
11 changes: 3 additions & 8 deletions be/src/io/fs/buffered_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,7 @@ class MergeRangeFileReader : public io::FileReader {
}
}

~MergeRangeFileReader() override {
delete[] _read_slice;
for (char* box : _boxes) {
delete[] box;
}
}
~MergeRangeFileReader() override = default;

Status close() override {
if (!_closed) {
Expand Down Expand Up @@ -246,8 +241,8 @@ class MergeRangeFileReader : public io::FileReader {
bool _closed = false;
size_t _remaining;

char* _read_slice = nullptr;
std::vector<char*> _boxes;
std::unique_ptr<OwnedSlice> _read_slice;
std::vector<OwnedSlice> _boxes;
int16 _last_box_ref = -1;
uint32 _last_box_usage = 0;
std::vector<int16> _box_ref;
Expand Down
6 changes: 6 additions & 0 deletions be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
public:
OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}

OwnedSlice(size_t length)
: _slice(reinterpret_cast<char*>(Allocator::alloc(length)), length),
_capacity(length) {}

OwnedSlice(OwnedSlice&& src) : _slice(src._slice), _capacity(src._capacity) {
src._slice.data = nullptr;
src._slice.size = 0;
Expand All @@ -369,6 +373,8 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
}
}

char* data() const { return _slice.data; }

const Slice& slice() const { return _slice; }

private:
Expand Down

0 comments on commit dc67086

Please sign in to comment.