Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1280:[C++] Implement block-based buffer(Part I) #1271

Merged
merged 12 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions c++/src/BlockBuffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BlockBuffer.hh"

#include <algorithm>

namespace orc {

BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize)
: memoryPool(pool),
currentSize(0),
currentCapacity(0),
blockSize(_blockSize) {
if (blockSize == 0) {
throw std::logic_error("Block size cannot be zero");
}
reserve(blockSize);
}

BlockBuffer::~BlockBuffer() {
for (size_t i = 0; i < blocks.size(); ++i) {
memoryPool.free(blocks[i]);
}
blocks.clear();
currentSize = currentCapacity = 0;
}

BlockBuffer::Block BlockBuffer::getBlock(uint64_t blockIndex) const {
if (blockIndex >= getBlockNumber()) {
throw std::out_of_range("Block index out of range");
}
return Block(blocks[blockIndex],
std::min(currentSize - blockIndex * blockSize, blockSize));
}

BlockBuffer::Block BlockBuffer::getNextBlock() {
if (currentSize < currentCapacity) {
Block emptyBlock(
blocks[currentSize / blockSize] + currentSize % blockSize,
blockSize - currentSize % blockSize);
currentSize = (currentSize / blockSize + 1) * blockSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we provide a function to update currentSize to reflect the actual size written? Maybe something setSize() or backup(). This looks weird when the returned block is written partially meaning that currentSize is larger than used

To provide better usability, we may provide a function like void append(const char data, size_t size)* to append data to the buffer and manage the blocks internally. This can be a separate patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user only uses part of the block, the size of BlockBuffer can currently be set via the resize() function. BlockBuffer class is temporarily used to replace DataBuffer in BufferedOutputStream, and the existing functions conform to BufferedOutputStream's usage behavior.

The append function can be added additionally later if there are usage scenarios.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the flush() function of class BufferedOutputStream needs to access all allocated memory blocks, it needs class BlockBuffer to provide an interface in order for BufferedOutputStream to access all allocated memory blocks.

return emptyBlock;
} else {
resize(currentSize + blockSize);
return Block(blocks.back(), blockSize);
}
}

void BlockBuffer::resize(uint64_t size) {
reserve(size);
if (currentCapacity >= size) {
currentSize = size;
} else {
throw std::logic_error("Block buffer resize error");
}
}

void BlockBuffer::reserve(uint64_t newCapacity) {
while (currentCapacity < newCapacity) {
char* newBlockPtr = memoryPool.malloc(blockSize);
if (newBlockPtr != nullptr) {
blocks.push_back(newBlockPtr);
currentCapacity += blockSize;
} else {
break;
}
}
}
} // namespace orc
116 changes: 116 additions & 0 deletions c++/src/BlockBuffer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ORC_BLOCK_BUFFER_HH
#define ORC_BLOCK_BUFFER_HH

#include "orc/MemoryPool.hh"

#include <vector>

namespace orc {

/**
* BlockBuffer implements a memory allocation policy based on
* equal-length blocks. BlockBuffer will reserve multiple blocks
* for allocation.
*/
class BlockBuffer {
private:
MemoryPool& memoryPool;
// current buffer size
uint64_t currentSize;
// maximal capacity (actual allocated memory)
uint64_t currentCapacity;
// unit for buffer expansion
const uint64_t blockSize;
// pointers to the start of each block
std::vector<char*> blocks;

// non-copy-constructible
BlockBuffer(BlockBuffer& buffer) = delete;
BlockBuffer& operator=(BlockBuffer& buffer) = delete;
BlockBuffer(BlockBuffer&& buffer) = delete;
BlockBuffer& operator=(BlockBuffer&& buffer) = delete;

public:
BlockBuffer(MemoryPool& pool, uint64_t blockSize);

~BlockBuffer();

/**
* Block points to a section of memory allocated by BlockBuffer,
* containing the corresponding physical memory address and available size.
*/
struct Block {
// the start of block
char* data;
// number of bytes available at data
uint64_t size;

Block() : data(nullptr), size(0) {}
Block(char* _data, uint64_t _size) : data(_data), size(_size) {}
Block(const Block& block) = default;
~Block() = default;
};

/**
* Get the allocated block object.
* The last allocated block size may be less than blockSize,
* and the rest of the blocks are all of size blockSize.
* @param blockIndex the index of blocks
* @return the allocated block object
*/
Block getBlock(uint64_t blockIndex) const;
wgtmac marked this conversation as resolved.
Show resolved Hide resolved

/**
* Get a empty block or allocate a new block to write.
* If the last allocated block size is less than blockSize,
* the size of empty block is equal to blockSize minus the size of
* the last allocated block size. Otherwise, the size of
* the empty block is equal to blockSize.
* @return a empty block object
*/
Block getNextBlock();

/**
* Get the number of blocks that are fully or partially occupied
*/
uint64_t getBlockNumber() const {
return (currentSize + blockSize - 1) / blockSize;
}

uint64_t size() const {
return currentSize;
}

uint64_t capacity() const {
return currentCapacity;
}

void resize(uint64_t size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support shrinkToFit parameter?

Copy link
Contributor Author

@coderex2522 coderex2522 Oct 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class DataBuffer only recycles memory during destructor, so class BlockBuffer does not support shrink function, which can be further optimized later.

/**
* Requests the BlockBuffer to contain at least newCapacity bytes.
* Reallocation happens if there is need of more space.
* @param newCapacity new capacity of BlockBuffer
*/
void reserve(uint64_t newCapacity);
};
} // namespace orc

#endif
1 change: 1 addition & 0 deletions c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ set(SOURCE_FILES
sargs/TruthValue.cc
wrap/orc-proto-wrapper.cc
Adaptor.cc
BlockBuffer.cc
BloomFilter.cc
ByteRLE.cc
ColumnPrinter.cc
Expand Down
1 change: 1 addition & 0 deletions c++/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_executable (orc-test
MemoryInputStream.cc
MemoryOutputStream.cc
TestAttributes.cc
TestBlockBuffer.cc
TestBufferedOutputStream.cc
TestBloomFilter.cc
TestByteRle.cc
Expand Down
81 changes: 81 additions & 0 deletions c++/test/TestBlockBuffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BlockBuffer.hh"
#include "orc/OrcFile.hh"
#include "wrap/gtest-wrapper.h"

namespace orc {

TEST(TestBlockBuffer, size_and_capacity) {
MemoryPool* pool = getDefaultPool();
BlockBuffer buffer(*pool, 1024);

// block buffer will preallocate one block during initialization
EXPECT_EQ(buffer.getBlockNumber(), 0);
EXPECT_EQ(buffer.size(), 0);
EXPECT_EQ(buffer.capacity(), 1024);

buffer.reserve(128 * 1024);
EXPECT_EQ(buffer.getBlockNumber(), 0);
EXPECT_EQ(buffer.size(), 0);
EXPECT_EQ(buffer.capacity(), 128 * 1024);

// new size < old capacity
buffer.resize(64 * 1024);
EXPECT_EQ(buffer.getBlockNumber(), 64);
EXPECT_EQ(buffer.size(), 64 * 1024);
EXPECT_EQ(buffer.capacity(), 128 * 1024);

// new size > old capacity
buffer.resize(256 * 1024);
EXPECT_EQ(buffer.getBlockNumber(), 256);
EXPECT_EQ(buffer.size(), 256 * 1024);
EXPECT_EQ(buffer.capacity(), 256 * 1024);
}

TEST(TestBlockBuffer, get_block) {
MemoryPool* pool = getDefaultPool();
BlockBuffer buffer(*pool, 1024);

EXPECT_EQ(buffer.getBlockNumber(), 0);
for (uint64_t i = 0; i < 10; ++i) {
BlockBuffer::Block block = buffer.getNextBlock();
EXPECT_EQ(buffer.getBlockNumber(), i + 1);
for (uint64_t j = 0; j < block.size; ++j) {
if (i % 2 == 0) {
block.data[j] = static_cast<char>('A' + (i + j) % 26);
} else {
block.data[j] = static_cast<char>('a' + (i + j) % 26);
}
}
}

// verify the block data
for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) {
BlockBuffer::Block block = buffer.getBlock(i);
for (uint64_t j = 0; j < block.size; ++j) {
if (i % 2 == 0) {
EXPECT_EQ(block.data[j], 'A' + (i + j) % 26);
} else {
EXPECT_EQ(block.data[j], 'a' + (i + j) % 26);
}
}
}
}
}