Skip to content

Commit

Permalink
Add replacement for queue.Queue
Browse files Browse the repository at this point in the history
Summary: Now we have a scalable queue - let's make a drop in replacement so migrating to the new system is as easy as possible.

Reviewed By: andrewkho

Differential Revision: D65867909

fbshipit-source-id: d6ec7c2826ad9a387296aa519223f81a7ff275bf
  • Loading branch information
SonicField authored and facebook-github-bot committed Nov 15, 2024
1 parent 6e43a4d commit d51fa26
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 47 deletions.
115 changes: 102 additions & 13 deletions concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

# pyre-strict

import os
import threading
import time
from collections.abc import Iterator
from queue import Empty
from queue import Empty, Full

try:
from queue import ShutDown # type: ignore
Expand Down Expand Up @@ -121,6 +122,10 @@ def iterator_local(self, max_key: int, clear: bool = True) -> Iterator[Any]: #
class ConcurrentQueue:
"""
A thread-safe queue implementation that allows concurrent access and modification.
Note:
ConcurrentQueue deliberately does not follow the same API as queue.Queue. To get a replacement
for queue.Queue use StdConcurrentQueue.
"""

_SHUTDOWN = 1
Expand Down Expand Up @@ -204,12 +209,16 @@ def pop(self, timeout: float | None = None) -> Any: # type: ignore
"""
Removes and returns an element from the front of the queue.
Args:
timeout (float | None, optional): The maximum time to wait for an element to become available. Defaults to None.
timeout (float | None, optional): The maximum time to wait for an element to become available.
Defaults to None.
Returns:
Any: The removed element.
Raises:
Empty: If the queue is empty and the timeout expires.
ShutDown: If the queue is shutting down - i.e. shutdown() has been called.
Note:
Timeout can be 0 but this is not recommended; if you want non-blocking behaviour use StdConcurrentQueue.
"""
next_key = self._outkey.incr()
_flags = LocalWrapper(self._flags)
Expand Down Expand Up @@ -269,7 +278,7 @@ def pop(self, timeout: float | None = None) -> Any: # type: ignore
raise RuntimeError("Queue failed")
if timeout is None:
_cond.wait()
elif not _cond.wait(timeout):
elif timeout == 0.0 or not _cond.wait(timeout):
timed_out = True
break
if timed_out:
Expand Down Expand Up @@ -370,14 +379,94 @@ def pop_local(self, timeout: float | None = None) -> LocalWrapper:
"""
return LocalWrapper(self.pop(timeout))

def get(self, timeout: float | None = None) -> Any: # type: ignore
"""
An aliase for pop. See the docs for pop().
"""
return self.pop(timeout)

def put(self, value: Any) -> None: # type: ignore
"""
An alias for push(value=Any).
"""
self.push(value)
class StdConcurrentQueue(ConcurrentQueue):
    """
    A class which is a drop in replacement for queue.Queue and behaves as a lock free ConcurrentQueue but supports
    the features of queue.Queue which ConcurrentQueue does not. These extra features may add some overhead to
    operation and so this Queue is only preferred when an exact replacement for queue.Queue is required.
    Also note that there might be subtle differences in the way sequencing behaves in a multi-threaded environment
    compared to queue.Queue simply because this is a (mainly) lock free algorithm.
    """

    def __init__(self, maxsize: int = 0) -> None:
        """
        Initialize the queue.

        Args:
            maxsize (int, optional): Upper bound on the queue size; values <= 0
                mean "unbounded" (full() then always returns False). Defaults to 0.
        """
        # Scale the underlying lock-free queue to half the CPU count (at least 1).
        # os.cpu_count() can return None, in which case we fall back to the
        # base class's default scaling.
        osc = os.cpu_count()
        if osc:
            super().__init__(scaling=max(1, osc // 2), lock_free=True)
        else:
            super().__init__(lock_free=True)

        # Normalized maxsize; negative values are clamped to 0 (unbounded).
        self._maxsize: int = max(maxsize, 0)
        # Count of items put but not yet marked done via task_done(); used by join().
        self._active_tasks: AtomicInt64 = AtomicInt64(0)

    def qsize(self) -> int:
        """
        Return the queue size (queue.Queue-compatible name for size()).
        NOTE(review): in a concurrent setting this is only a snapshot and may
        be stale by the time the caller uses it.
        """
        return self.size()

    def get(self, block: bool = True, timeout: float | None = None) -> Any:  # type: ignore
        """
        Remove and return an element from the front of the queue.

        Args:
            block (bool, optional): If True (and timeout != 0), wait for an
                element. If False, only succeed if an element appears available.
            timeout (float | None, optional): Maximum wait when blocking; a
                timeout of 0.0 is treated as non-blocking. Ignored when
                block is False.
        Returns:
            Any: The removed element.
        Raises:
            Empty: If no element is available (non-blocking) or the timeout expires.
            ShutDown: Presumably propagated from pop() if the queue is shut
                down — see ConcurrentQueue.pop(); TODO confirm.
        """
        if block and timeout != 0.0:
            return self.pop(timeout=timeout)
        else:
            # Use this to attempt to avoid excessive placeholder creation.
            if self.size() > 0:
                return self.pop(timeout=0.0)
            else:
                raise Empty

    def full(self) -> bool:
        """
        Return True if maxsize is set and the queue currently holds at least
        that many elements; always False for an unbounded queue (maxsize == 0).
        """
        _maxsize = self._maxsize
        return bool(_maxsize and self.size() >= _maxsize)

    def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:  # type: ignore
        """
        Append an element to the back of the queue.

        Args:
            item (Any): The element to add.
            block (bool, optional): If True, spin-wait (with backoff) while the
                queue is full. If False, raise Full immediately when full.
            timeout (float | None, optional): Maximum time to wait for space
                when blocking. Defaults to None (wait forever).
        Raises:
            Full: If the queue is full and block is False, or the timeout expires.
            ShutDown: If the queue is shut down while waiting.
        Note:
            There is no lock held between the full() check and push(), so a
            burst of concurrent putters can overfill slightly past maxsize.
        """

        if block and self._maxsize and self.full():
            _flags = LocalWrapper(self._flags)
            _shutdown = self._SHUTDOWN
            _sleep = LocalWrapper(time.sleep)
            _now = LocalWrapper(time.monotonic)
            start = _now()
            if timeout is not None:
                end_time = start + timeout
            else:
                end_time = None
            # For the first 50ms we yield the CPU (sleep(0)); after that we
            # back off to 50ms sleeps to avoid burning CPU on a long wait.
            pause_time = start + 0.05
            while self.full():
                it_time = _now()
                if _flags & _shutdown:
                    raise ShutDown
                if end_time is not None and it_time > end_time:
                    raise Full
                if it_time < pause_time:
                    _sleep(0)
                else:
                    _sleep(0.05)
        else:
            # Non-blocking path: fail fast rather than wait for space.
            if self.full():
                raise Full

        self.push(item)
        # The push succeeded so we can do this here.
        self._active_tasks.incr()

    def put_nowait(self, item: Any) -> None:  # type: ignore
        """Equivalent to put(item, block=False); raise Full if no space is available."""
        return self.put(item, block=False)

    def get_nowait(self) -> Any:  # type: ignore
        """Equivalent to get(block=False); raise Empty if no element is available."""
        return self.get(block=False)

    def task_done(self) -> None:
        """
        Indicate that a previously gotten task is complete (queue.Queue API).
        NOTE(review): unlike queue.Queue, this does not raise ValueError when
        called more times than there were items put — confirm this is intended.
        """
        self._active_tasks.decr()

    def join(self) -> None:
        """
        Block until all items put on the queue have been marked done via
        task_done(), or the queue enters immediate shutdown.
        Spin-waits: yields the CPU for the first 50ms, then backs off to
        50ms sleeps.
        """
        _sleep = LocalWrapper(time.sleep)
        _now = LocalWrapper(time.monotonic)
        _flags = LocalWrapper(self._flags)
        # _SHUT_NOW presumably marks immediate shutdown — defined on the base
        # class, not visible here; TODO confirm.
        _shut_now = self._SHUT_NOW
        # Assumes LocalWrapper proxies truthiness to the wrapped AtomicInt64,
        # so the loop exits when the counter reaches zero — TODO confirm.
        _active_tasks = LocalWrapper(self._active_tasks)
        start = _now()
        pause_time = start + 0.05
        while _active_tasks and not (_flags & _shut_now):
            if _now() < pause_time:
                _sleep(0)
            else:
                _sleep(0.05)
19 changes: 16 additions & 3 deletions concurrent_queue_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@
import queue

from ft_utils.benchmark_utils import BenchmarkProvider, execute_benchmarks
from ft_utils.concurrent import ConcurrentQueue
from ft_utils.concurrent import ConcurrentQueue, StdConcurrentQueue
from ft_utils.local import LocalWrapper

ConcurrentQueue.put = ConcurrentQueue.push # type: ignore
ConcurrentQueue.get = ConcurrentQueue.pop # type: ignore


class ConcurretQueueBenchmarkProvider(BenchmarkProvider):
    def __init__(self, operations: int) -> None:
        """
        Store the benchmark op count and declare one slot per queue flavor.

        Args:
            operations (int): Number of put/get operations each benchmark performs.
        """
        self._operations = operations
        self._queue: ConcurrentQueue | None = None
        self._queue_lf: ConcurrentQueue | None = None
        # NOTE(review): _queue_std is annotated twice below; the first
        # declaration is immediately shadowed — confirm which type is intended.
        self._queue_std: queue.Queue | None = None  # type: ignore
        self._queue_queue: queue.Queue | None = None  # type: ignore
        self._queue_std: StdConcurrentQueue | None = None  # type: ignore

    def set_up(self) -> None:
        """Instantiate every queue flavor before the benchmarks run."""
        self._queue = ConcurrentQueue(os.cpu_count())
        self._queue_lf = ConcurrentQueue(os.cpu_count(), lock_free=True)
        # NOTE(review): _queue_std is assigned twice; the queue.Queue()
        # instance is immediately replaced by StdConcurrentQueue() — confirm
        # the first assignment is dead code.
        self._queue_std = queue.Queue()
        self._queue_queue = queue.Queue()
        self._queue_std = StdConcurrentQueue()

def benchmark_locked(self) -> None:
lw = LocalWrapper(self._queue)
Expand All @@ -34,6 +39,10 @@ def benchmark_std(self) -> None:
lw = LocalWrapper(self._queue_std)
self._bm(lw)

def benchmark_queue(self) -> None:
lw = LocalWrapper(self._queue_queue)
self._bm(lw)

def _bm(self, lw) -> None: # type: ignore
for n in range(self._operations):
lw.put(n)
Expand All @@ -51,6 +60,10 @@ def benchmark_std_batch(self) -> None:
lw = LocalWrapper(self._queue_std)
self._bmb(lw)

def benchmark_queue_batch(self) -> None:
lw = LocalWrapper(self._queue_queue)
self._bmb(lw)

def _bmb(self, lw) -> None: # type: ignore
for n in range(self._operations // 100):
for _ in range(100):
Expand Down
13 changes: 11 additions & 2 deletions docs/concurrent_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,7 @@ A concurrent queue that allows multiple threads to push and pop values.

* `__init__(scaling=None, lock_free=False)`: Initializes a new ConcurrentQueue with the specified scaling factor. If `lock_free` is True, the queue will use a lock-free implementation, which can improve performance in certain scenarios.
* `push(value)`: Pushes a value onto the queue. This method is thread-safe and can be called from multiple threads.
* `put(value)`: An alias for `push(value)`.
* `pop(timeout=None)`: Pops a value from the queue. The method will block until a value is available. If `timeout` is specified, the method will raise an Empty exception if no value is available within the specified time.
* `get(timeout=None)`: An alias for `pop(timeout)`.
* `pop_local(timeout=None)`: Returns a LocalWrapper object containing the popped value. The behavior is otherwise identical to `pop(timeout)`.
* `shutdown(immediate=False)`: Initiates shutdown of the queue. If `immediate` is True, the queue will shut down immediately, otherwise it will wait for any pending operations to complete.
* `size()`: Returns the number of elements currently in the queue.
Expand Down Expand Up @@ -275,3 +273,14 @@ queue.shutdown()
# Raises ShutDown
queue.pop()
```

## StdConcurrentQueue

This follows the same API as [queue.Queue](https://docs.python.org/3/library/queue.html#queue.Queue). For simple applications StdConcurrentQueue will work as a drop-in replacement for queue.Queue. However, there are subtle differences:

* StdConcurrentQueue will use a very small amount of CPU time even when not processing elements.
* This implementation has weaker FIFO guarantees than queue.Queue, which might cause subtle issues in some applications.
* StdConcurrentQueue will release memory in a different pattern than queue.Queue.
* The maxsize is not as strictly guaranteed. If maxsize is set and a large number of threads attempt to fill the queue beyond maxsize then a small overfill might occur due to the lack of a lock to prevent this race condition.

Therefore, in complex applications it may be a better approach to mindfully replace highly contended queue.Queue instances with StdConcurrentQueue. In this case it is also better to use the simpler ConcurrentQueue where possible.
Loading

0 comments on commit d51fa26

Please sign in to comment.