Skip to content

Commit

Permalink
[fix] Avoid segfault when using ParallelGzipReader in an externally…
Browse files Browse the repository at this point in the history
… started thread
  • Loading branch information
mxmlnkn committed Jan 6, 2024
1 parent 5d70bd9 commit 3912b28
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 29 deletions.
18 changes: 18 additions & 0 deletions src/core/FasterVector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ class RpmallocInit
inline static const RpmallocInit rpmallocInit{};


/**
 * RAII helper tying rpmalloc's per-thread bookkeeping to a thread's lifetime:
 * construction registers the current thread with rpmalloc, destruction tears it
 * down again. Intended to be instantiated once per thread via the thread_local
 * object below so that threads not started by this library (externally started
 * std::threads, Python threads, ...) are still initialized before they allocate.
 */
class RpmallocThreadInit
{
public:
    RpmallocThreadInit()
    {
        // Must run on each thread before that thread uses rpmalloc.
        rpmalloc_thread_initialize();
    }

    ~RpmallocThreadInit()
    {
        // Runs at thread exit for the thread_local instance below.
        rpmalloc_thread_finalize( /* release caches */ true );
    }
};


/* One instance per thread so every thread touching this header gets initialized.
 * NOTE(review): 'static' at namespace scope in a header gives this internal
 * linkage, i.e. one instance per translation unit per thread, so the destructor
 * (and thus rpmalloc_thread_finalize) may run multiple times per thread —
 * confirm rpmalloc tolerates repeated finalization, or consider C++17
 * 'inline thread_local' for a single per-thread instance.
 * NOTE(review): construction of a namespace-scope thread_local that is never
 * odr-used may be deferred or omitted by the implementation — confirm it is
 * actually referenced on the threads that need initialization. */
static const thread_local RpmallocThreadInit rpmallocThreadInit{};


template<typename ElementType>
class RpmallocAllocator
{
Expand Down
28 changes: 0 additions & 28 deletions src/core/JoiningThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,6 @@
#include <thread>
#include <utility>

#ifdef WITH_RPMALLOC
#include <rpmalloc.h>
#endif


#ifdef WITH_RPMALLOC
class RpmallocThreadInit
{
public:
RpmallocThreadInit()
{
rpmalloc_thread_initialize();
}

~RpmallocThreadInit()
{
rpmalloc_thread_finalize( /* release caches */ true );
}
};
#endif


/**
* Similar to the planned C++20 std::jthread, this class joins in the destructor.
Expand All @@ -35,14 +14,7 @@ class JoiningThread
template<class Function, class... Args>
explicit
JoiningThread( Function&& function, Args&&... args ) :
#ifdef WITH_RPMALLOC
m_thread( [=] () {
static const thread_local RpmallocThreadInit rpmallocThreadInit{};
function( std::forward<Args>( args )... );
} )
#else
m_thread( std::forward<Function>( function ), std::forward<Args>( args )... )
#endif
{}

JoiningThread( JoiningThread&& ) = default;
Expand Down
34 changes: 34 additions & 0 deletions src/tests/rapidgzip/testParallelGzipReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,39 @@ testChecksummedMultiStreamDecompression( const std::filesystem::path& encoded,
}


/**
 * Regression test for a segfault when a ParallelGzipReader is created on one
 * thread but read from another, externally started thread (the reading thread
 * had not run rpmalloc's per-thread initialization).
 *
 * The reader is deliberately constructed here on the calling thread and then
 * moved into a new std::thread, which performs all reads — do not reorder.
 *
 * NOTE(review): the encodedFilePath parameter is currently unused; the test
 * generates its own compressed data below. Either use it or drop it from the
 * caller.
 */
void
testMultiThreadedUsage( const std::string& encodedFilePath )
{
    static constexpr size_t DATA_SIZE = 64_Mi;

    /* As there are 4 symbols, 2 bits per symbol should suffice and as the data is random, almost no backreferences
     * should be viable. This leads to a compression ratio of ~4, which is large enough for splitting and benign
     * enough to have multiple chunks with fairly little uncompressed data. */
    const auto compressedRandomDNA = compressWithZlib( createRandomData( DATA_SIZE, DNA_SYMBOLS ),
                                                       CompressionStrategy::HUFFMAN_ONLY );

    /* Created on the main thread; parallelization of 6 spawns internal worker threads. */
    auto reader = std::make_unique<rapidgzip::ParallelGzipReader<rapidgzip::ChunkData, /* ENABLE_STATISTICS */ true> >(
        std::make_unique<BufferViewFileReader>( compressedRandomDNA ),
        /* parallelization */ 6 );
    reader->setCRC32Enabled( true );

    std::vector<char> result;
    /* Ownership of the reader moves into the lambda so all read() calls happen
     * on the externally started thread — the condition that used to segfault. */
    std::thread thread( [&result, gzipReader = std::move( reader )] () {
        std::vector<char> buffer( 1024ULL );
        while ( true ) {
            const auto nBytesRead = gzipReader->read( buffer.data(), buffer.size() );
            if ( nBytesRead == 0 ) {
                break;  // EOF
            }
            result.insert( result.end(), buffer.begin(), buffer.begin() + nBytesRead );
        }
    } );

    /* result is captured by reference but only accessed after the join. */
    thread.join();
    REQUIRE( !result.empty() );
}


int
main( int argc,
char** argv )
Expand All @@ -895,6 +928,7 @@ main( int argc,
findParentFolderContaining( binaryFolder, "src/tests/data/base64-256KiB.bgz" )
) / "src" / "tests" / "data";

testMultiThreadedUsage( rootFolder / "1B.gz" );
testCRC32AndCleanUnmarkedData();
testPrefetchingAfterSplit();
testCachedChunkReuseAfterSplit();
Expand Down
17 changes: 16 additions & 1 deletion src/tests/testPythonWrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import subprocess
import sys
import tempfile
import threading
import time
import zlib

Expand Down Expand Up @@ -362,7 +363,7 @@ def testTriggerDeadlock(filePath):
def testDeadlock(encoder):
print("Create test file...")
# We need at least something larger than the chunk size.
rawFile, compressedFile = createRandomCompressedFile(100 * 1024 * 1024, 6, 'pygzip')
rawFile, compressedFile = createRandomCompressedFile(100 * 1024 * 1024, 6, encoder)

task = multiprocessing.Process(target=testTriggerDeadlock, args=(compressedFile.name,))
task.start()
Expand All @@ -381,6 +382,19 @@ def testFileTypeAPI(format):
assert decompressedGzipFile.file_type() == format


def readAndPrintFirstBytes(file):
    """Read the first 8 bytes from the given file-like object and print them."""
    firstBytes = file.read(8)
    print(firstBytes)


def testRpmallocThreadSafety(encoder):
    """Regression test: reading from a rapidgzip reader inside an externally
    started thread must not crash (rpmalloc needs per-thread initialization).

    encoder: compressor name forwarded to createRandomCompressedFile
             (e.g. 'pygzip').
    """
    rawFile, compressedFile = createRandomCompressedFile(1024 * 1024, 6, encoder)
    with rapidgzip.open(compressedFile.name) as gzipReader:
        # The read happens on a thread not started by rapidgzip itself.
        thread = threading.Thread(target=readAndPrintFirstBytes, args=[gzipReader])
        thread.start()
        thread.join()
    # Fix: 'filename' was undefined here and raised NameError; the path of the
    # temporary file is compressedFile.name (same usage as in testDeadlock).
    # NOTE(review): if createRandomCompressedFile returns self-deleting
    # temporaries, this unlink may be redundant — confirm against its definition.
    os.unlink(compressedFile.name)


if __name__ == '__main__':
print("indexed_bzip2 version:", indexed_bzip2.__version__)
print("rapidgzip version:", rapidgzip.__version__)
Expand All @@ -391,6 +405,7 @@ def testFileTypeAPI(format):
testFileTypeAPI('deflate')

testDeadlock('pygzip')
testRpmallocThreadSafety('pygzip')

def test(openIndexedFileFromName, closeUnderlyingFile=None):
testPythonInterface(
Expand Down

0 comments on commit 3912b28

Please sign in to comment.