Add support for Uncompress(source, sink). Various changes to allow
Uncompress(source, sink) to get the same performance as the different
variants of Uncompress to Cord/DataBuffer/String/FlatBuffer.

Changes to efficiently support Uncompress(source, sink)
--------

a) For strings, we add GetAppendBuffer support to StringByteSink so that we
   can write into it without copying.
b) For flat array buffers, we call GetAppendBuffer and check whether it can
   give us a full-sized buffer.

With the above changes, performance with ByteSource/ByteSink comes very
close to directly using flat arrays and strings.

We add various benchmark cases to demonstrate that.
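
As an illustration (a sketch, not part of this commit), the new entry point
can be driven with the flat-array Source/Sink types from snappy-sinksource.h;
the helper name below is ours:

  #include "snappy.h"
  #include "snappy-sinksource.h"

  // Sketch: decompress via the generic Source/Sink path. "output" must
  // already be sized, e.g. via snappy::GetUncompressedLength().
  bool UncompressToFlatArray(const char* compressed, size_t compressed_length,
                             char* output) {
    snappy::ByteArraySource source(compressed, compressed_length);
    snappy::UncheckedByteArraySink sink(output);
    return snappy::Uncompress(&source, &sink);
  }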

Orthogonal change
------------------

Add support for TryFastAppend() for SnappyScatteredWriter.

Benchmark results are below:

CPU: Intel Core2 dL1:32KB dL2:4096KB
Benchmark              Time(ns)    CPU(ns) Iterations
-----------------------------------------------------
BM_UFlat/0               109065     108996       6410 896.0MB/s  html
BM_UFlat/1              1012175    1012343        691 661.4MB/s  urls
BM_UFlat/2                26775      26771      26149 4.4GB/s  jpg
BM_UFlat/3                48947      48940      14363 1.8GB/s  pdf
BM_UFlat/4               441029     440835       1589 886.1MB/s  html4
BM_UFlat/5                39861      39880      17823 588.3MB/s  cp
BM_UFlat/6                18315      18300      38126 581.1MB/s  c
BM_UFlat/7                 5254       5254     100000 675.4MB/s  lsp
BM_UFlat/8              1568060    1567376        447 626.6MB/s  xls
BM_UFlat/9               337512     337734       2073 429.5MB/s  txt1
BM_UFlat/10              287269     287054       2434 415.9MB/s  txt2
BM_UFlat/11              890098     890219        787 457.2MB/s  txt3
BM_UFlat/12             1186593    1186863        590 387.2MB/s  txt4
BM_UFlat/13              573927     573318       1000 853.7MB/s  bin
BM_UFlat/14               64250      64294      10000 567.2MB/s  sum
BM_UFlat/15                7301       7300      96153 552.2MB/s  man
BM_UFlat/16              109617     109636       6375 1031.5MB/s  pb
BM_UFlat/17              364438     364497       1921 482.3MB/s  gaviota
BM_UFlatSink/0           108518     108465       6450 900.4MB/s  html
BM_UFlatSink/1           991952     991997        705 675.0MB/s  urls
BM_UFlatSink/2            26815      26798      26065 4.4GB/s  jpg
BM_UFlatSink/3            49127      49122      14255 1.8GB/s  pdf
BM_UFlatSink/4           436674     436731       1604 894.4MB/s  html4
BM_UFlatSink/5            39738      39733      17345 590.5MB/s  cp
BM_UFlatSink/6            18413      18416      37962 577.4MB/s  c
BM_UFlatSink/7             5677       5676     100000 625.2MB/s  lsp
BM_UFlatSink/8          1552175    1551026        451 633.2MB/s  xls
BM_UFlatSink/9           338526     338489       2065 428.5MB/s  txt1
BM_UFlatSink/10          289387     289307       2420 412.6MB/s  txt2
BM_UFlatSink/11          893803     893706        783 455.4MB/s  txt3
BM_UFlatSink/12         1195919    1195459        586 384.4MB/s  txt4
BM_UFlatSink/13          559637     559779       1000 874.3MB/s  bin
BM_UFlatSink/14           65073      65094      10000 560.2MB/s  sum
BM_UFlatSink/15            7618       7614      92823 529.5MB/s  man
BM_UFlatSink/16          110085     110121       6352 1027.0MB/s  pb
BM_UFlatSink/17          369196     368915       1896 476.5MB/s  gaviota
BM_UValidate/0            46954      46957      14899 2.0GB/s  html
BM_UValidate/1           500621     500868       1000 1.3GB/s  urls
BM_UValidate/2              283        283    2481447 417.2GB/s  jpg
BM_UValidate/3            16230      16228      43137 5.4GB/s  pdf
BM_UValidate/4           189129     189193       3701 2.0GB/s  html4

A=uday
R=sanjay
sesse committed Jul 6, 2015
1 parent b2ad960 commit b2312c4
Showing 5 changed files with 420 additions and 1 deletion.
33 changes: 33 additions & 0 deletions snappy-sinksource.cc
@@ -40,6 +40,21 @@ char* Sink::GetAppendBuffer(size_t length, char* scratch) {
  return scratch;
}

char* Sink::GetAppendBufferVariable(
    size_t min_size, size_t desired_size_hint, char* scratch,
    size_t scratch_size, size_t* allocated_size) {
  *allocated_size = scratch_size;
  return scratch;
}

void Sink::AppendAndTakeOwnership(
    char* bytes, size_t n,
    void (*deleter)(void*, const char*, size_t),
    void *deleter_arg) {
  Append(bytes, n);
  (*deleter)(deleter_arg, bytes, n);
}

ByteArraySource::~ByteArraySource() { }

size_t ByteArraySource::Available() const { return left_; }
@@ -68,4 +83,22 @@ char* UncheckedByteArraySink::GetAppendBuffer(size_t len, char* scratch) {
  return dest_;
}

void UncheckedByteArraySink::AppendAndTakeOwnership(
    char* data, size_t n,
    void (*deleter)(void*, const char*, size_t),
    void *deleter_arg) {
  if (data != dest_) {
    memcpy(dest_, data, n);
    (*deleter)(deleter_arg, data, n);
  }
  dest_ += n;
}

char* UncheckedByteArraySink::GetAppendBufferVariable(
    size_t min_size, size_t desired_size_hint, char* scratch,
    size_t scratch_size, size_t* allocated_size) {
  *allocated_size = desired_size_hint;
  return dest_;
}

} // namespace snappy
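
As a usage note (a sketch, not part of this commit): a producer that hands a
heap-allocated block to any Sink via AppendAndTakeOwnership() must provide a
matching deleter and must not touch the block afterwards, since the sink may
either keep the bytes or copy-and-free them immediately. The helper names
below are ours:

  #include "snappy-sinksource.h"

  static void DeleteCharArray(void* /*arg*/, const char* bytes, size_t /*n*/) {
    delete[] bytes;
  }

  // Hypothetical helper: hand a freshly filled heap block to a sink.
  void HandOffBlock(snappy::Sink* sink, char* block, size_t n) {
    sink->AppendAndTakeOwnership(block, n, &DeleteCharArray, NULL);
    // "block" may already be freed here; do not use it again.
  }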
47 changes: 47 additions & 0 deletions snappy-sinksource.h
@@ -59,6 +59,47 @@ class Sink {
  // The default implementation always returns the scratch buffer.
  virtual char* GetAppendBuffer(size_t length, char* scratch);

  // For higher performance, Sink implementations can provide custom
  // AppendAndTakeOwnership() and GetAppendBufferVariable() methods.
  // These methods can reduce the number of copies done during
  // compression/decompression.

  // Append "bytes[0,n-1]" to the sink. Takes ownership of "bytes"
  // and calls the deleter function as (*deleter)(deleter_arg, bytes, n)
  // to free the buffer. The deleter function must be non-NULL.
  //
  // The default implementation just calls Append and frees "bytes".
  // Other implementations may avoid a copy while appending the buffer.
  virtual void AppendAndTakeOwnership(
      char* bytes, size_t n, void (*deleter)(void*, const char*, size_t),
      void *deleter_arg);

  // Returns a writable buffer for appending and writes the buffer's
  // capacity to *allocated_size. Guarantees *allocated_size >= min_size.
  // May return a pointer to the caller-owned scratch buffer, which must
  // have scratch_size >= min_size.
  //
  // The returned buffer is only valid until the next operation
  // on this Sink.
  //
  // After writing at most *allocated_size bytes, call Append() with the
  // pointer returned from this function and the number of bytes written.
  // Many Append() implementations will avoid copying bytes if this function
  // returned an internal buffer.
  //
  // If the sink implementation allocates or reallocates an internal buffer,
  // it should use desired_size_hint if appropriate. If a caller cannot
  // provide a reasonable guess at the desired capacity, it should set
  // desired_size_hint = 0.
  //
  // If a non-scratch buffer is returned, the caller may only pass
  // a prefix of it to Append(). That is, it is not correct to pass an
  // interior pointer to Append().
  //
  // The default implementation always returns the scratch buffer.
  virtual char* GetAppendBufferVariable(
      size_t min_size, size_t desired_size_hint, char* scratch,
      size_t scratch_size, size_t* allocated_size);

 private:
  // No copying

@@ -121,6 +162,12 @@ class UncheckedByteArraySink : public Sink {
  virtual ~UncheckedByteArraySink();
  virtual void Append(const char* data, size_t n);
  virtual char* GetAppendBuffer(size_t len, char* scratch);
  virtual char* GetAppendBufferVariable(
      size_t min_size, size_t desired_size_hint, char* scratch,
      size_t scratch_size, size_t* allocated_size);
  virtual void AppendAndTakeOwnership(
      char* bytes, size_t n, void (*deleter)(void*, const char*, size_t),
      void *deleter_arg);

  // Return the current output pointer so that a caller can see how
  // many bytes were produced.
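
To make the contract concrete, here is a sketch (not part of this commit) of
the caller-side protocol for GetAppendBufferVariable(); the function and
variable names are ours:

  #include <cstring>
  #include "snappy-sinksource.h"

  // Write up to n bytes of "payload" into "sink" with at most one copy.
  void WriteSome(snappy::Sink* sink, const char* payload, size_t n) {
    char scratch[64];  // caller-owned fallback; scratch_size >= min_size
    size_t allocated_size = 0;
    char* buf = sink->GetAppendBufferVariable(1, n, scratch, sizeof(scratch),
                                              &allocated_size);
    size_t to_write = (n < allocated_size) ? n : allocated_size;
    memcpy(buf, payload, to_write);
    // Append a prefix of the returned buffer; a sink that handed out an
    // internal buffer can then skip the copy inside Append().
    sink->Append(buf, to_write);
    // A real caller would loop until all n bytes are written.
  }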
249 changes: 248 additions & 1 deletion snappy.cc
@@ -863,6 +863,7 @@ static bool InternalUncompressAllTags(SnappyDecompressor* decompressor,

  // Process the entire input
  decompressor->DecompressAllTags(writer);
  writer->Flush();
  return (decompressor->eof() && writer->CheckLength());
}

@@ -1115,6 +1116,7 @@ class SnappyIOVecWriter {
    return true;
  }

  inline void Flush() {}
};

bool RawUncompressToIOVec(const char* compressed, size_t compressed_length,
@@ -1215,6 +1217,10 @@ class SnappyArrayWriter {
    op_ = op + len;
    return true;
  }
  inline size_t Produced() const {
    return op_ - base_;
  }
  inline void Flush() {}
};

bool RawUncompress(const char* compressed, size_t n, char* uncompressed) {
@@ -1269,6 +1275,7 @@ class SnappyDecompressionValidator {
    produced_ += len;
    return produced_ <= expected_;
  }
  inline void Flush() {}
};

bool IsValidCompressedBuffer(const char* compressed, size_t n) {
@@ -1277,6 +1284,11 @@ bool IsValidCompressedBuffer(const char* compressed, size_t n) {
  return InternalUncompress(&reader, &writer);
}

bool IsValidCompressed(Source* compressed) {
  SnappyDecompressionValidator writer;
  return InternalUncompress(compressed, &writer);
}

void RawCompress(const char* input,
                 size_t input_length,
                 char* compressed,

@@ -1300,6 +1312,241 @@ size_t Compress(const char* input, size_t input_length, string* compressed) {
  return compressed_length;
}

// -----------------------------------------------------------------------
// Sink interface
// -----------------------------------------------------------------------

// A type that decompresses into a Sink. The template parameter
// Allocator must export one method "char* Allocate(int size);", which
// allocates a buffer of "size" and appends that to the destination.
template <typename Allocator>
class SnappyScatteredWriter {
  Allocator allocator_;

  // We need random access into the data generated so far. Therefore
  // we keep track of all of the generated data as an array of blocks.
  // All of the blocks except the last have length kBlockSize.
  vector<char*> blocks_;
  size_t expected_;

  // Total size of all fully generated blocks so far
  size_t full_size_;

  // Pointer into current output block
  char* op_base_;   // Base of output block
  char* op_ptr_;    // Pointer to next unfilled byte in block
  char* op_limit_;  // Pointer just past block

  inline size_t Size() const {
    return full_size_ + (op_ptr_ - op_base_);
  }

  bool SlowAppend(const char* ip, size_t len);
  bool SlowAppendFromSelf(size_t offset, size_t len);

 public:
  inline explicit SnappyScatteredWriter(const Allocator& allocator)
      : allocator_(allocator),
        full_size_(0),
        op_base_(NULL),
        op_ptr_(NULL),
        op_limit_(NULL) {
  }

  inline void SetExpectedLength(size_t len) {
    assert(blocks_.empty());
    expected_ = len;
  }

  inline bool CheckLength() const {
    return Size() == expected_;
  }

  // Return the number of bytes actually uncompressed so far
  inline size_t Produced() const {
    return Size();
  }

  inline bool Append(const char* ip, size_t len) {
    size_t avail = op_limit_ - op_ptr_;
    if (len <= avail) {
      // Fast path
      memcpy(op_ptr_, ip, len);
      op_ptr_ += len;
      return true;
    } else {
      return SlowAppend(ip, len);
    }
  }

  inline bool TryFastAppend(const char* ip, size_t available, size_t length) {
    char* op = op_ptr_;
    const int space_left = op_limit_ - op;
    if (length <= 16 && available >= 16 + kMaximumTagLength &&
        space_left >= 16) {
      // Fast path, used for the majority (about 95%) of invocations.
      UNALIGNED_STORE64(op, UNALIGNED_LOAD64(ip));
      UNALIGNED_STORE64(op + 8, UNALIGNED_LOAD64(ip + 8));
      op_ptr_ = op + length;
      return true;
    } else {
      return false;
    }
  }

  inline bool AppendFromSelf(size_t offset, size_t len) {
    // See SnappyArrayWriter::AppendFromSelf for an explanation of
    // the "offset - 1u" trick.
    if (offset - 1u < op_ptr_ - op_base_) {
      const size_t space_left = op_limit_ - op_ptr_;
      if (space_left >= len + kMaxIncrementCopyOverflow) {
        // Fast path: src and dst in current block.
        IncrementalCopyFastPath(op_ptr_ - offset, op_ptr_, len);
        op_ptr_ += len;
        return true;
      }
    }
    return SlowAppendFromSelf(offset, len);
  }

  // Called at the end of decompression. We ask the allocator to
  // write all blocks to the sink.
  inline void Flush() { allocator_.Flush(Produced()); }
};

template<typename Allocator>
bool SnappyScatteredWriter<Allocator>::SlowAppend(const char* ip, size_t len) {
  size_t avail = op_limit_ - op_ptr_;
  while (len > avail) {
    // Completely fill this block
    memcpy(op_ptr_, ip, avail);
    op_ptr_ += avail;
    assert(op_limit_ - op_ptr_ == 0);
    full_size_ += (op_ptr_ - op_base_);
    len -= avail;
    ip += avail;

    // Bounds check
    if (full_size_ + len > expected_) {
      return false;
    }

    // Make new block
    size_t bsize = min<size_t>(kBlockSize, expected_ - full_size_);
    op_base_ = allocator_.Allocate(bsize);
    op_ptr_ = op_base_;
    op_limit_ = op_base_ + bsize;
    blocks_.push_back(op_base_);
    avail = bsize;
  }

  memcpy(op_ptr_, ip, len);
  op_ptr_ += len;
  return true;
}

template<typename Allocator>
bool SnappyScatteredWriter<Allocator>::SlowAppendFromSelf(size_t offset,
                                                          size_t len) {
  // Overflow check
  // See SnappyArrayWriter::AppendFromSelf for an explanation of
  // the "offset - 1u" trick.
  const size_t cur = Size();
  if (offset - 1u >= cur) return false;
  if (expected_ - cur < len) return false;

  // Currently we shouldn't ever hit this path because Compress() chops the
  // input into blocks and does not create cross-block copies. However, it is
  // nice if we do not rely on that, since we can get better compression if we
  // allow cross-block copies and thus might want to change the compressor in
  // the future.
  size_t src = cur - offset;
  while (len-- > 0) {
    char c = blocks_[src >> kBlockLog][src & (kBlockSize-1)];
    Append(&c, 1);
    src++;
  }
  return true;
}

class SnappySinkAllocator {
 public:
  explicit SnappySinkAllocator(Sink* dest): dest_(dest) {}
  ~SnappySinkAllocator() {}

  char* Allocate(int size) {
    Datablock block(new char[size], size);
    blocks_.push_back(block);
    return block.data;
  }

  // We flush only at the end, because the writer wants
  // random access to the blocks and once we hand the
  // block over to the sink, we can't access it anymore.
  // Also, we don't write more than what has actually been
  // written to the blocks.
  void Flush(size_t size) {
    size_t size_written = 0;
    size_t block_size;
    for (int i = 0; i < blocks_.size(); ++i) {
      block_size = min<size_t>(blocks_[i].size, size - size_written);
      dest_->AppendAndTakeOwnership(blocks_[i].data, block_size,
                                    &SnappySinkAllocator::Deleter, NULL);
      size_written += block_size;
    }
    blocks_.clear();
  }

 private:
  struct Datablock {
    char* data;
    size_t size;
    Datablock(char* p, size_t s) : data(p), size(s) {}
  };

  static void Deleter(void* arg, const char* bytes, size_t size) {
    delete[] bytes;
  }

  Sink* dest_;
  vector<Datablock> blocks_;

  // Note: copying this object is allowed
};

size_t UncompressAsMuchAsPossible(Source* compressed, Sink* uncompressed) {
  SnappySinkAllocator allocator(uncompressed);
  SnappyScatteredWriter<SnappySinkAllocator> writer(allocator);
  InternalUncompress(compressed, &writer);
  return writer.Produced();
}

bool Uncompress(Source* compressed, Sink* uncompressed) {
  // Read the uncompressed length from the front of the compressed input
  SnappyDecompressor decompressor(compressed);
  uint32 uncompressed_len = 0;
  if (!decompressor.ReadUncompressedLength(&uncompressed_len)) {
    return false;
  }

  char c;
  size_t allocated_size;
  char* buf = uncompressed->GetAppendBufferVariable(
      1, uncompressed_len, &c, 1, &allocated_size);

  // If we can get a flat buffer, then use it; otherwise do block-by-block
  // uncompression
  if (allocated_size >= uncompressed_len) {
    SnappyArrayWriter writer(buf);
    bool result = InternalUncompressAllTags(
        &decompressor, &writer, uncompressed_len);
    uncompressed->Append(buf, writer.Produced());
    return result;
  } else {
    SnappySinkAllocator allocator(uncompressed);
    SnappyScatteredWriter<SnappySinkAllocator> writer(allocator);
    return InternalUncompressAllTags(&decompressor, &writer, uncompressed_len);
  }
}

} // end namespace snappy
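
For reference, a sketch (not part of this commit) of a minimal custom Sink:
it overrides only Append(), so Uncompress(Source*, Sink*) above sees
allocated_size == 1 from the default GetAppendBufferVariable() and takes the
SnappyScatteredWriter path. The class and helper names are ours:

  #include <string>
  #include "snappy.h"
  #include "snappy-sinksource.h"

  class StringAppendSink : public snappy::Sink {
   public:
    explicit StringAppendSink(std::string* dest) : dest_(dest) {}
    virtual void Append(const char* data, size_t n) {
      dest_->append(data, n);
    }
   private:
    std::string* dest_;
  };

  bool UncompressToString(const char* compressed, size_t n, std::string* out) {
    snappy::ByteArraySource source(compressed, n);
    StringAppendSink sink(out);
    return snappy::Uncompress(&source, &sink);
  }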