-
Notifications
You must be signed in to change notification settings - Fork 484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ORC-1280:[C++] Implement block-based buffer(Part I) #1271
Changes from 7 commits
59081e7
cbec2d2
cecb4b2
3ac5489
5e8c768
9733151
0aad043
937a6d1
a9789eb
3020b0f
8e4f148
6ee8ac2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "BlockBuffer.hh" | ||
|
||
#include <algorithm> | ||
|
||
namespace orc { | ||
|
||
BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize) | ||
: memoryPool(pool), | ||
currentSize(0), | ||
currentCapacity(0), | ||
blockSize(_blockSize) { | ||
if (blockSize == 0) { | ||
throw std::logic_error("Block size cannot be zero"); | ||
} | ||
reserve(blockSize); | ||
} | ||
|
||
BlockBuffer::~BlockBuffer() { | ||
for (size_t i = 0; i < blocks.size(); ++i) { | ||
memoryPool.free(blocks[i]); | ||
} | ||
blocks.clear(); | ||
currentSize = currentCapacity = 0; | ||
} | ||
|
||
uint64_t BlockBuffer::getBlockNumber() const { | ||
return (currentSize + blockSize - 1) / blockSize; | ||
} | ||
|
||
Block BlockBuffer::getBlock(uint64_t blockIndex) { | ||
if (blockIndex >= getBlockNumber()) { | ||
throw std::out_of_range("Block index out of range"); | ||
} | ||
return Block(blocks[blockIndex], | ||
std::min(currentSize - blockIndex * blockSize, blockSize)); | ||
} | ||
|
||
Block BlockBuffer::getEmptyBlock() { | ||
if (currentSize < currentCapacity) { | ||
Block emptyBlock(blocks[currentSize / blockSize] + currentSize % blockSize, | ||
blockSize - currentSize % blockSize); | ||
currentSize = (currentSize / blockSize + 1) * blockSize; | ||
return emptyBlock; | ||
} else { | ||
resize(currentSize + blockSize); | ||
return Block(blocks.back(), blockSize); | ||
} | ||
} | ||
|
||
void BlockBuffer::reserve(uint64_t capacity) { | ||
while (currentCapacity < capacity) { | ||
char* newBlockPtr = memoryPool.malloc(blockSize); | ||
if (newBlockPtr != nullptr) { | ||
blocks.push_back(newBlockPtr); | ||
currentCapacity += blockSize; | ||
} else { | ||
break; | ||
} | ||
} | ||
} | ||
|
||
void BlockBuffer::resize(uint64_t size) { | ||
reserve(size); | ||
if (currentCapacity >= size) { | ||
currentSize = size; | ||
} else { | ||
throw std::logic_error("Block buffer resize error"); | ||
} | ||
} | ||
} // namespace orc |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#ifndef ORC_MEMORYPOOL_IMPL_HH | ||
#define ORC_MEMORYPOOL_IMPL_HH | ||
|
||
#include "orc/MemoryPool.hh" | ||
|
||
#include <vector> | ||
|
||
namespace orc { | ||
|
||
struct Block { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it better to make it nested class of BlockBuffer? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As it stands struct Block only serves class BlockBuffer, so it makes reasonable to treat it as a nested class of class BlockBuffer. |
||
char* data; | ||
uint64_t size; | ||
|
||
Block() : data(nullptr), size(0) {} | ||
Block(char* _data, uint64_t _size) : data(_data), size(_size) {} | ||
Block(const Block& block) = default; | ||
~Block() {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ~Block() = default; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
}; | ||
|
||
class BlockBuffer { | ||
private: | ||
MemoryPool& memoryPool; | ||
// current buffer size | ||
uint64_t currentSize; | ||
// maximal capacity (actual allocated memory) | ||
uint64_t currentCapacity; | ||
// unit for buffer expansion | ||
const uint64_t blockSize; | ||
// pointers to the start of each block | ||
std::vector<char*> blocks; | ||
|
||
// non-copy-constructible | ||
BlockBuffer(BlockBuffer& buffer) = delete; | ||
BlockBuffer& operator=(BlockBuffer& buffer) = delete; | ||
BlockBuffer(BlockBuffer&& buffer) = delete; | ||
BlockBuffer& operator=(BlockBuffer&& buffer) = delete; | ||
|
||
public: | ||
BlockBuffer(MemoryPool& pool, uint64_t blockSize); | ||
|
||
~BlockBuffer(); | ||
|
||
/** | ||
* Get the Block object | ||
*/ | ||
Block getBlock(uint64_t blockIndex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need some detail explanation with regard to the mechanism and behavior of the BlockBuffer. Especially concepts like block, block index and block number. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some description for block, blockbuffer, block index and block number. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we provide mutable and immutable overloads? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the getBlock function does not modify class member variables, the const attribute is added to the function. |
||
|
||
/** | ||
* Get a empty block or allocate a new block if the buffer is exhausted | ||
*/ | ||
Block getEmptyBlock(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename to getNextBlock or getBlockToWrite? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Rename to getNextBlock. |
||
|
||
/** | ||
* Get the Block number | ||
*/ | ||
uint64_t getBlockNumber() const; | ||
|
||
uint64_t size() const { | ||
return currentSize; | ||
} | ||
|
||
uint64_t capacity() const { | ||
return currentCapacity; | ||
} | ||
|
||
void resize(uint64_t size); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we support shrinkToFit parameter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Class DataBuffer only recycles memory during destructor, so class BlockBuffer does not support shrink function, which can be further optimized later. |
||
void reserve(uint64_t capacity); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Insert a new line before reserve function. Better to explain what will happen in the reserve function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some comments to explain the reserve function. |
||
}; | ||
} // namespace | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. // namespace orc There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done. |
||
|
||
#endif |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
#include "BlockBuffer.hh" | ||
#include "orc/OrcFile.hh" | ||
#include "wrap/gtest-wrapper.h" | ||
|
||
namespace orc { | ||
|
||
TEST(TestBlockBuffer, size_and_capacity) { | ||
MemoryPool* pool = getDefaultPool(); | ||
BlockBuffer buffer(*pool, 1024); | ||
|
||
// block buffer will preallocate one block during initialization | ||
EXPECT_EQ(buffer.getBlockNumber(), 0); | ||
EXPECT_EQ(buffer.size(), 0); | ||
EXPECT_EQ(buffer.capacity(), 1024); | ||
|
||
buffer.reserve(128 * 1024); | ||
EXPECT_EQ(buffer.getBlockNumber(), 0); | ||
EXPECT_EQ(buffer.size(), 0); | ||
EXPECT_EQ(buffer.capacity(), 128 * 1024); | ||
|
||
// new size < old capacity | ||
buffer.resize(64 * 1024); | ||
EXPECT_EQ(buffer.getBlockNumber(), 64); | ||
EXPECT_EQ(buffer.size(), 64 * 1024); | ||
EXPECT_EQ(buffer.capacity(), 128 * 1024); | ||
|
||
// new size > old capacity | ||
buffer.resize(256 * 1024); | ||
EXPECT_EQ(buffer.getBlockNumber(), 256); | ||
EXPECT_EQ(buffer.size(), 256 * 1024); | ||
EXPECT_EQ(buffer.capacity(), 256 * 1024); | ||
} | ||
|
||
TEST(TestBlockBuffer, get_block) { | ||
MemoryPool* pool = getDefaultPool(); | ||
BlockBuffer buffer(*pool, 1024); | ||
|
||
EXPECT_EQ(buffer.getBlockNumber(), 0); | ||
for (uint64_t i = 0; i < 10; ++i) { | ||
Block block = buffer.getEmptyBlock(); | ||
EXPECT_EQ(buffer.getBlockNumber(), i + 1); | ||
for (uint64_t j = 0; j < block.size; ++j) { | ||
if (i % 2 == 0) { | ||
block.data[j] = static_cast<char>('A' + (i + j) % 26); | ||
} else { | ||
block.data[j] = static_cast<char>('a' + (i + j) % 26); | ||
} | ||
} | ||
} | ||
|
||
// verify the block data | ||
for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) { | ||
Block block = buffer.getBlock(i); | ||
for (uint64_t j = 0; j < block.size; ++j) { | ||
if (i % 2 == 0) { | ||
EXPECT_EQ(block.data[j], 'A' + (i + j) % 26); | ||
} else { | ||
EXPECT_EQ(block.data[j], 'a' + (i + j) % 26); | ||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we provide a function to update currentSize to reflect the actual size written? Maybe something setSize() or backup(). This looks weird when the returned block is written partially meaning that currentSize is larger than used
To provide better usability, we may provide a function like void append(const char data, size_t size)* to append data to the buffer and manage the blocks internally. This can be a separate patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If user only uses part of the block, the size of BlockBuffer can currently be set via the resize() function. BlockBuffer class is temporarily used to replace DataBuffer in BufferedOutputStream, and the existing functions conform to BufferedOutputStream's usage behavior.
The append function can be added additionally later if there are usage scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the flush() function of class BufferedOutputStream needs to access all allocated memory blocks, it needs class BlockBuffer to provide an interface in order for BufferedOutputStream to access all allocated memory blocks.