Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stack/spsc_queue - introduce move semantics #90

Merged
merged 1 commit
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions include/boost/lockfree/detail/freelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,25 @@ class freelist_stack : Alloc
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType const& arg )
T* construct( const ArgumentType& arg )

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you going over to the dark side? :)

{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
new ( node ) T( arg );
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType&& arg )
{
    // Take a node from the freelist, then forward-construct T in place
    // (moves when the caller passed an rvalue).
    T* new_node = allocate< ThreadSafe, Bounded >();
    if ( new_node != nullptr )
        new ( new_node ) T( std::forward< ArgumentType >( arg ) );
    return new_node; // null when the pool could not supply a node
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
T* construct( ArgumentType1 const& arg1, ArgumentType2 const& arg2 )
T* construct( ArgumentType1&& arg1, ArgumentType2&& arg2 )
{
T* node = allocate< ThreadSafe, Bounded >();
if ( node )
Expand Down Expand Up @@ -447,7 +456,7 @@ class fixed_size_freelist : NodeStorage
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType const& arg )
T* construct( const ArgumentType& arg )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
Expand All @@ -458,8 +467,20 @@ class fixed_size_freelist : NodeStorage
return node;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType >
T* construct( ArgumentType&& arg )
{
    // Reserve a slot first; null_handle() signals an exhausted pool.
    const index_t slot = allocate< ThreadSafe >();
    if ( slot == null_handle() )
        return NULL;

    // Forward-construct (moving for rvalues) into the pre-allocated storage.
    T* storage = NodeStorage::nodes() + slot;
    new ( storage ) T( std::forward< ArgumentType >( arg ) );
    return storage;
}

template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
T* construct( ArgumentType1 const& arg1, ArgumentType2 const& arg2 )
T* construct( const ArgumentType1& arg1, const ArgumentType2& arg2 )
{
index_t node_index = allocate< ThreadSafe >();
if ( node_index == null_handle() )
Expand Down
37 changes: 31 additions & 6 deletions include/boost/lockfree/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,17 @@ class queue
* \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will
* be allocated from the OS. This may not be lock-free.
* */
bool push( T const& t )
bool push( const T& t )
{
return do_push< false >( t );
}

/// \copydoc boost::lockfree::queue::push(const T & t)
bool push( T&& t )
{
    // T is the class template parameter (not deduced), so forwarding is
    // equivalent to moving: hand the rvalue straight to do_push.
    return do_push< false >( std::move( t ) );
}

/** Pushes object t to the queue.
*
* \post object will be pushed to the queue, if internal node can be allocated
Expand All @@ -308,18 +314,36 @@ class queue
* \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
* \throws if memory allocator throws
* */
bool bounded_push( T const& t )
bool bounded_push( const T& t )
{
return do_push< true >( t );
}

/// \copydoc boost::lockfree::queue::bounded_push(const T & t)
bool bounded_push( T&& t )
{
    // Same as push(T&&), but restricted to the pre-allocated node pool.
    return do_push< true >( std::move( t ) );
}


private:
#ifndef BOOST_DOXYGEN_INVOKED
template < bool Bounded >
bool do_push( T&& t )
{
    // Move-construct the node (with a null next-handle) and hand it to
    // do_push_node(), which also handles a NULL node from an exhausted pool.
    return do_push_node( pool.template construct< true, Bounded >( std::move( t ), pool.null_handle() ) );
}

template < bool Bounded >
bool do_push( T const& t )
{
node* n = pool.template construct< true, Bounded >( t, pool.null_handle() );
node* n = pool.template construct< true, Bounded >( t, pool.null_handle() );
return do_push_node( n );
}

bool do_push_node( node* n )
{
handle_type node_handle = pool.get_handle( n );

if ( n == NULL )
Expand Down Expand Up @@ -347,6 +371,7 @@ class queue
}
}
}

#endif

public:
Expand All @@ -358,9 +383,9 @@ class queue
* \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node
* will be allocated from the OS. This may not be lock-free. \throws if memory allocator throws
* */
bool unsynchronized_push( T const& t )
bool unsynchronized_push( T&& t )
{
node* n = pool.template construct< false, false >( t, pool.null_handle() );
node* n = pool.template construct< false, false >( std::forward< T >( t ), pool.null_handle() );

if ( n == NULL )
return false;
Expand Down Expand Up @@ -509,7 +534,7 @@ class queue
T element;
bool success = pop( element );
if ( success )
f( element );
f( std::move( element ) );

return success;
}
Expand Down
64 changes: 48 additions & 16 deletions include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class ringbuffer_base
return write_available( write_index, read_index, max_size );
}

bool push( T const& t, T* buffer, size_t max_size )

bool push( const T& t, T* buffer, size_t max_size )
{
const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
const size_t next = next_index( write_index, max_size );
Expand All @@ -107,6 +108,21 @@ class ringbuffer_base
return true;
}

/// Single-producer move push of one element into `buffer` (capacity
/// `max_size`); returns false when the ringbuffer is full.
/// NOTE(review): assumes exactly one producer thread, per the comments on
/// write_index_ — confirm against the spsc contract.
bool push( T&& t, T* buffer, size_t max_size )
{
    const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
    const size_t next = next_index( write_index, max_size );

    // acquire pairs with the consumer's release store on read_index_
    if ( next == read_index_.load( memory_order_acquire ) )
        return false; /* ringbuffer is full */

    new ( buffer + write_index ) T( std::forward< T >( t ) ); // move-construct

    // release publishes the newly constructed element to the consumer
    write_index_.store( next, memory_order_release );

    return true;
}

size_t push( const T* input_buffer, size_t input_count, T* internal_buffer, size_t max_size )
{
return push( input_buffer, input_buffer + input_count, internal_buffer, max_size ) - input_buffer;
Expand Down Expand Up @@ -159,7 +175,7 @@ class ringbuffer_base
return false;

T& object_to_consume = buffer[ read_index ];
functor( object_to_consume );
functor( std::move( object_to_consume ) );
object_to_consume.~T();

size_t next = next_index( read_index, max_size );
Expand Down Expand Up @@ -221,12 +237,12 @@ class ringbuffer_base
const size_t count0 = max_size - read_index;
const size_t count1 = output_count - count0;

copy_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
copy_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );
move_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
move_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );

new_read_index -= max_size;
} else {
copy_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
move_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
if ( new_read_index == max_size )
new_read_index = 0;
}
Expand All @@ -252,12 +268,12 @@ class ringbuffer_base
const size_t count0 = max_size - read_index;
const size_t count1 = avail - count0;

it = copy_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
copy_and_delete( internal_buffer, internal_buffer + count1, it );
it = move_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
move_and_delete( internal_buffer, internal_buffer + count1, it );

new_read_index -= max_size;
} else {
copy_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
move_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
if ( new_read_index == max_size )
new_read_index = 0;
}
Expand Down Expand Up @@ -322,13 +338,13 @@ class ringbuffer_base
}

template < class OutputIterator >
OutputIterator copy_and_delete( T* first, T* last, OutputIterator out )
OutputIterator move_and_delete( T* first, T* last, OutputIterator out )
{
if ( std::is_trivially_destructible< T >::value ) {
return std::copy( first, last, out ); // will use memcpy if possible
} else {
for ( ; first != last; ++first, ++out ) {
*out = *first;
*out = std::move( *first );
first->~T();
}
return out;
Expand All @@ -339,7 +355,7 @@ class ringbuffer_base
void run_functor_and_delete( T* first, T* last, Functor&& functor )
{
for ( ; first != last; ++first ) {
functor( *first );
functor( std::move( *first ) );
first->~T();
}
}
Expand Down Expand Up @@ -379,11 +395,16 @@ class compile_time_sized_ringbuffer : public ringbuffer_base< T >
}

public:
bool push( T const& t )
bool push( const T& t )
{
return ringbuffer_base< T >::push( t, data(), max_size );
}

/// Moves one element into the compile-time-sized buffer; false when full.
bool push( T&& t )
{
    // forwarding a non-deduced T is just a move
    return ringbuffer_base< T >::push( std::move( t ), data(), max_size );
}

template < typename Functor >
bool consume_one( Functor&& f )
{
Expand Down Expand Up @@ -484,11 +505,16 @@ class runtime_sized_ringbuffer : public ringbuffer_base< T >, private Alloc
allocator_traits::deallocate( allocator, array_, max_elements_ );
}

bool push( T const& t )
bool push( const T& t )
{
return ringbuffer_base< T >::push( t, &*array_, max_elements_ );
}

/// Moves one element into the runtime-sized buffer; false when full.
bool push( T&& t )
{
    // forwarding a non-deduced T is just a move
    return ringbuffer_base< T >::push( std::move( t ), &*array_, max_elements_ );
}

template < typename Functor >
bool consume_one( Functor&& f )
{
Expand Down Expand Up @@ -676,11 +702,17 @@ class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_t
*
* \note Thread-safe and wait-free
* */
bool push( T const& t )
bool push( const T& t )
{
return base_type::push( t );
}

/// \copydoc boost::lockfree::spsc_queue::push(const T& t)
bool push( T&& t )
{
    // rvalue overload: delegate to the ringbuffer base, moving the value
    return base_type::push( std::move( t ) );
}

/** Pops one object from ringbuffer.
*
* \pre only one thread is allowed to pop data from the spsc_queue
Expand All @@ -705,8 +737,8 @@ class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_t
template < typename U, typename Enabler = std::enable_if< std::is_convertible< T, U >::value > >
bool pop( U& ret )
{
return consume_one( [ & ]( const T& t ) {
ret = std::move( t );
return consume_one( [ & ]( T&& t ) {
ret = std::forward< T >( t );
} );
}

Expand Down
Loading