zstd.decompress not threadsafe for shared objects #4206

Closed
mlbileschi opened this issue Nov 26, 2024 · 4 comments

Comments

@mlbileschi

Describe the bug
Hi, when a single zstandard.ZstdCompressor() object is shared across the threads of a concurrent.futures.ThreadPoolExecutor, I see nondeterministic behavior when using that object to compress. The same happens with a shared zstandard.ZstdDecompressor().

My expectation was that calls to compressor.compress(bytes) share no state, so I was surprised that running them from multiple threads produced nondeterministic results.

To Reproduce

!pip install zstandard # this fetched zstandard-0.23.0
import zstandard
import concurrent.futures
import time

n = 10000
compressor = zstandard.ZstdCompressor()
decompressor = zstandard.ZstdDecompressor()

def compress(b):
  return compressor.compress(b)
def decompress(b):
  return decompressor.decompress(b)

compressed_values = []

# Compress in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
  futures = [executor.submit(compress, str(i).encode()) for i in range(n)]

  for future in concurrent.futures.as_completed(futures):
    compressed_values.append(future.result())

# Decompress in parallel
decompressed_values = []
with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
  futures = [executor.submit(decompress, o) for o in compressed_values]

  for future in concurrent.futures.as_completed(futures):
    decompressed_values.append(future.result())
decompressed_values

Expected behavior
I expected every call to succeed and every value to round-trip unchanged, whether or not the compressor and decompressor objects are shared across threads.

Screenshots and charts
I see four different behaviors: a "happy path" and three errors. Sometimes the code outputs the decompressed values, and sometimes it raises one of the errors below.
The first error:

---------------------------------------------------------------------------
ZstdError                                 Traceback (most recent call last)
<ipython-input-28-4a3cb429fab4> in <cell line: 17>()
     19 
     20   for future in concurrent.futures.as_completed(futures):
---> 21     compressed_values.append(future.result())
     22 
     23 decompressed_values = []

3 frames
<ipython-input-28-4a3cb429fab4> in compress(b)
      9 
     10 def compress(b):
---> 11   return compressor.compress(b)
     12 def decompress(b):
     13   return decompressor.decompress(b)

ZstdError: cannot compress: Operation not authorized at current processing stage

Another error:

---------------------------------------------------------------------------
ZstdError                                 Traceback (most recent call last)
<ipython-input-30-4a3cb429fab4> in <cell line: 24>()
     28   # Get results as they become available
     29   for future in concurrent.futures.as_completed(futures):
---> 30     decompressed_values.append(future.result())
     31 decompressed_values

3 frames
<ipython-input-30-4a3cb429fab4> in decompress(b)
     11   return compressor.compress(b)
     12 def decompress(b):
---> 13   return decompressor.decompress(b)
     14 
     15 compressed_values = []

ZstdError: decompression error: Unknown frame descriptor

The final error I'm seeing:

---------------------------------------------------------------------------
ZstdError                                 Traceback (most recent call last)
<ipython-input-35-4a3cb429fab4> in <cell line: 24>()
     28   # Get results as they become available
     29   for future in concurrent.futures.as_completed(futures):
---> 30     decompressed_values.append(future.result())
     31 decompressed_values

3 frames
<ipython-input-35-4a3cb429fab4> in decompress(b)
     11   return compressor.compress(b)
     12 def decompress(b):
---> 13   return decompressor.decompress(b)
     14 
     15 compressed_values = []

ZstdError: decompression error: Data corruption detected
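The tracebacks above each come from a separate run, because `future.result()` re-raises the first exception it meets and the loop stops there. A minimal sketch for tallying every outcome in a single run follows. `tally_outcomes` is a hypothetical helper, not part of zstandard, and it relies only on the standard `concurrent.futures` API.

```python
import collections
import concurrent.futures


def tally_outcomes(fn, inputs, max_workers=64):
    """Run fn over inputs in a thread pool and count successes and error messages.

    Hypothetical helper for diagnosing the report above. Returns (results, counts),
    where results holds the successful return values in completion order.
    """
    results = []
    counts = collections.Counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fn, item) for item in inputs]
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()  # None when the call succeeded
            if error is None:
                results.append(future.result())
                counts["ok"] += 1
            else:
                # Group failures by exception type and message instead of raising.
                counts[f"{type(error).__name__}: {error}"] += 1
    return results, counts
```

Passing the shared `compress` function from the reproduction, for example `tally_outcomes(compress, [str(i).encode() for i in range(n)])`, should then report how often each error message occurs rather than stopping at the first one.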

Desktop (please complete the following information):
zstandard 0.23.0, installed with `!pip install zstandard` (this fetched zstandard-0.23.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata)
on a free Colab kernel at colab.research.google.com

Additional context
This may be a user error, but I found it surprising nonetheless.
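One possible workaround, sketched under the assumption that the errors come from overlapping calls on the same object: give each worker thread its own compressor and decompressor through `threading.local`. `PerThread`, `roundtrip_all`, and `zstd_demo` are hypothetical names introduced for illustration. Only `ZstdCompressor().compress` and `ZstdDecompressor().decompress` come from the report.

```python
import concurrent.futures
import threading


class PerThread:
    """Lazily builds one object per thread from `factory` (hypothetical helper)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def get(self):
        obj = getattr(self._local, "obj", None)
        if obj is None:
            # First use on this thread: create and cache a private instance.
            obj = self._local.obj = self._factory()
        return obj


def roundtrip_all(payloads, make_compressor, make_decompressor, max_workers=32):
    """Compress then decompress each payload without sharing a codec across threads."""
    compressors = PerThread(make_compressor)
    decompressors = PerThread(make_decompressor)

    def roundtrip(data):
        frame = compressors.get().compress(data)
        return decompressors.get().decompress(frame)

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns results in input order, unlike as_completed.
        return list(executor.map(roundtrip, payloads))


def zstd_demo(n=10000):
    """Round-trip the payloads from the report; needs the third-party zstandard package."""
    import zstandard  # imported lazily so the helpers above stay stdlib-only

    payloads = [str(i).encode() for i in range(n)]
    restored = roundtrip_all(payloads, zstandard.ZstdCompressor, zstandard.ZstdDecompressor)
    return restored == payloads
```

If the assumption holds, `zstd_demo()` should complete without a `ZstdError`. A single shared object guarded by a `threading.Lock` would also prevent overlapping calls, at the cost of serializing all compression work.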

@Cyan4973
Contributor

This repository and its issue board focus on the underlying C reference implementation of libzstd.

The question you've raised appears to be specific to the zstandard Python binding, which is not provided by this repository.

To ensure a more accurate response, we recommend directing your question to the relevant repository that maintains the zstandard Python binding.

@mlbileschi
Author

ack, thanks. is the following the appropriate/canonical zstd python repo?

https://github.com/indygreg/python-zstandard

@Cyan4973
Contributor

Cyan4973 commented Nov 27, 2024

There are several Python bindings, so I can't be completely sure which one you use.
The one you link to is pretty good, and if you recognise the API you use in this repository, it's likely the correct one.

@mlbileschi
Author

thanks!
submitted as indygreg/python-zstandard#244
