From 2aa7973d56992d0118cecc320c0318d4342f7b8c Mon Sep 17 00:00:00 2001
From: Tim Blechmann
Date: Wed, 1 Nov 2023 09:28:23 +0800
Subject: [PATCH] stack/queue/spsc_queue - introduce move semantics

---
 include/boost/lockfree/detail/freelist.hpp | 29 ++++++++--
 include/boost/lockfree/queue.hpp           | 37 ++++++++++--
 include/boost/lockfree/spsc_queue.hpp      | 64 +++++++++++++++------
 include/boost/lockfree/stack.hpp           | 66 +++++++++++++++++-----
 test/queue_test.cpp                        | 21 +++++++
 test/spsc_queue_test.cpp                   | 21 +++++++
 test/stack_test.cpp                        | 21 +++++++
 7 files changed, 220 insertions(+), 39 deletions(-)

diff --git a/include/boost/lockfree/detail/freelist.hpp b/include/boost/lockfree/detail/freelist.hpp
index e063c85..31c8c98 100644
--- a/include/boost/lockfree/detail/freelist.hpp
+++ b/include/boost/lockfree/detail/freelist.hpp
@@ -85,7 +85,7 @@ class freelist_stack : Alloc
     }
 
     template < bool ThreadSafe, bool Bounded, typename ArgumentType >
-    T* construct( ArgumentType const& arg )
+    T* construct( const ArgumentType& arg )
     {
         T* node = allocate< ThreadSafe, Bounded >();
         if ( node )
@@ -93,8 +93,17 @@ class freelist_stack : Alloc
         return node;
     }
 
+    template < bool ThreadSafe, bool Bounded, typename ArgumentType >
+    T* construct( ArgumentType&& arg )
+    {
+        T* node = allocate< ThreadSafe, Bounded >();
+        if ( node )
+            new ( node ) T( std::forward< ArgumentType >( arg ) );
+        return node;
+    }
+
     template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
-    T* construct( ArgumentType1 const& arg1, ArgumentType2 const& arg2 )
+    T* construct( ArgumentType1&& arg1, ArgumentType2&& arg2 )
     {
         T* node = allocate< ThreadSafe, Bounded >();
         if ( node )
@@ -447,7 +456,7 @@ class fixed_size_freelist : NodeStorage
     }
 
     template < bool ThreadSafe, bool Bounded, typename ArgumentType >
-    T* construct( ArgumentType const& arg )
+    T* construct( const ArgumentType& arg )
    {
         index_t node_index = allocate< ThreadSafe >();
         if ( node_index == null_handle() )
@@ -458,8 +467,20 @@ class fixed_size_freelist : NodeStorage
         return node;
     }
 
+    template < bool ThreadSafe, bool Bounded, typename ArgumentType >
+    T* construct( ArgumentType&& arg )
+    {
+        index_t node_index = allocate< ThreadSafe >();
+        if ( node_index == null_handle() )
+            return NULL;
+
+        T* node = NodeStorage::nodes() + node_index;
+        new ( node ) T( std::forward< ArgumentType >( arg ) );
+        return node;
+    }
+
     template < bool ThreadSafe, bool Bounded, typename ArgumentType1, typename ArgumentType2 >
-    T* construct( ArgumentType1 const& arg1, ArgumentType2 const& arg2 )
+    T* construct( const ArgumentType1& arg1, const ArgumentType2& arg2 )
     {
         index_t node_index = allocate< ThreadSafe >();
         if ( node_index == null_handle() )
diff --git a/include/boost/lockfree/queue.hpp b/include/boost/lockfree/queue.hpp
index 5e63ccc..64e3dd2 100644
--- a/include/boost/lockfree/queue.hpp
+++ b/include/boost/lockfree/queue.hpp
@@ -295,11 +295,17 @@ class queue
      * \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will
      * be allocated from the OS. This may not be lock-free.
      * */
-    bool push( T const& t )
+    bool push( const T& t )
     {
         return do_push< false >( t );
     }
 
+    /// \copydoc boost::lockfree::queue::push(const T& t)
+    bool push( T&& t )
+    {
+        return do_push< false >( std::forward< T >( t ) );
+    }
+
     /** Pushes object t to the queue.
      *
      * \post object will be pushed to the queue, if internal node can be allocated
@@ -308,18 +314,36 @@ class queue
      * \note Thread-safe and non-blocking. If internal memory pool is exhausted, operation will fail
      * \throws if memory allocator throws
      * */
-    bool bounded_push( T const& t )
+    bool bounded_push( const T& t )
     {
         return do_push< true >( t );
     }
 
+    /// \copydoc boost::lockfree::queue::bounded_push(const T& t)
+    bool bounded_push( T&& t )
+    {
+        return do_push< true >( std::forward< T >( t ) );
+    }
+
 private:
 #ifndef BOOST_DOXYGEN_INVOKED
+    template < bool Bounded >
+    bool do_push( T&& t )
+    {
+        node* n = pool.template construct< true, Bounded >( std::forward< T >( t ), pool.null_handle() );
+        return do_push_node( n );
+    }
+
     template < bool Bounded >
     bool do_push( T const& t )
     {
-        node* n = pool.template construct< true, Bounded >( t, pool.null_handle() );
+        node* n = pool.template construct< true, Bounded >( t, pool.null_handle() );
+        return do_push_node( n );
+    }
+
+    bool do_push_node( node* n )
+    {
         handle_type node_handle = pool.get_handle( n );
 
         if ( n == NULL )
@@ -347,6 +371,7 @@ class queue
             }
         }
     }
+
 #endif
 
 public:
@@ -358,9 +383,9 @@ class queue
      * \note Not Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node
      * will be allocated from the OS. This may not be lock-free. \throws if memory allocator throws
      * */
-    bool unsynchronized_push( T const& t )
+    bool unsynchronized_push( T&& t )
    {
-        node* n = pool.template construct< false, false >( t, pool.null_handle() );
+        node* n = pool.template construct< false, false >( std::forward< T >( t ), pool.null_handle() );
 
         if ( n == NULL )
             return false;
@@ -509,7 +534,7 @@ class queue
         T element;
         bool success = pop( element );
         if ( success )
-            f( element );
+            f( std::move( element ) );
 
         return success;
     }
diff --git a/include/boost/lockfree/spsc_queue.hpp b/include/boost/lockfree/spsc_queue.hpp
index 3596165..8bea336 100644
--- a/include/boost/lockfree/spsc_queue.hpp
+++ b/include/boost/lockfree/spsc_queue.hpp
@@ -92,7 +92,8 @@ class ringbuffer_base
         return write_available( write_index, read_index, max_size );
     }
 
-    bool push( T const& t, T* buffer, size_t max_size )
+
+    bool push( const T& t, T* buffer, size_t max_size )
     {
         const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
         const size_t next = next_index( write_index, max_size );
@@ -107,6 +108,21 @@ class ringbuffer_base
         return true;
     }
 
+    bool push( T&& t, T* buffer, size_t max_size )
+    {
+        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
+        const size_t next = next_index( write_index, max_size );
+
+        if ( next == read_index_.load( memory_order_acquire ) )
+            return false; /* ringbuffer is full */
+
+        new ( buffer + write_index ) T( std::forward< T >( t ) ); // move-construct
+
+        write_index_.store( next, memory_order_release );
+
+        return true;
+    }
+
     size_t push( const T* input_buffer, size_t input_count, T* internal_buffer, size_t max_size )
     {
         return push( input_buffer, input_buffer + input_count, internal_buffer, max_size ) - input_buffer;
@@ -159,7 +175,7 @@ class ringbuffer_base
             return false;
 
         T& object_to_consume = buffer[ read_index ];
-        functor( object_to_consume );
+        functor( std::move( object_to_consume ) );
        object_to_consume.~T();
 
         size_t next = next_index( read_index, max_size );
@@ -221,12 +237,12 @@ class ringbuffer_base
             const size_t count0 = max_size - read_index;
             const size_t count1 = output_count - count0;
 
-            copy_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
-            copy_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );
+            move_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
+            move_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );
 
             new_read_index -= max_size;
         } else {
-            copy_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
+            move_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
 
             if ( new_read_index == max_size )
                 new_read_index = 0;
@@ -252,12 +268,12 @@ class ringbuffer_base
             const size_t count0 = max_size - read_index;
             const size_t count1 = avail - count0;
 
-            it = copy_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
-            copy_and_delete( internal_buffer, internal_buffer + count1, it );
+            it = move_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
+            move_and_delete( internal_buffer, internal_buffer + count1, it );
 
             new_read_index -= max_size;
         } else {
-            copy_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
+            move_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
 
             if ( new_read_index == max_size )
                 new_read_index = 0;
@@ -322,13 +338,13 @@ class ringbuffer_base
     }
 
     template < class OutputIterator >
-    OutputIterator copy_and_delete( T* first, T* last, OutputIterator out )
+    OutputIterator move_and_delete( T* first, T* last, OutputIterator out )
     {
         if ( std::is_trivially_destructible< T >::value ) {
             return std::copy( first, last, out ); // will use memcpy if possible
         } else {
             for ( ; first != last; ++first, ++out ) {
-                *out = *first;
+                *out = std::move( *first );
                 first->~T();
             }
             return out;
@@ -339,7 +355,7 @@ class ringbuffer_base
     void run_functor_and_delete( T* first, T* last, Functor&& functor )
     {
         for ( ; first != last; ++first ) {
-            functor( *first );
+            functor( std::move( *first ) );
             first->~T();
         }
     }
@@ -379,11 +395,16 @@ class compile_time_sized_ringbuffer : public ringbuffer_base< T >
     }
 
 public:
-    bool push( T const& t )
+    bool push( const T& t )
     {
         return ringbuffer_base< T >::push( t, data(), max_size );
     }
 
+    bool push( T&& t )
+    {
+        return ringbuffer_base< T >::push( std::forward< T >( t ), data(), max_size );
+    }
+
     template < typename Functor >
     bool consume_one( Functor&& f )
     {
@@ -484,11 +505,16 @@ class runtime_sized_ringbuffer : public ringbuffer_base< T >, private Alloc
         allocator_traits::deallocate( allocator, array_, max_elements_ );
     }
 
-    bool push( T const& t )
+    bool push( const T& t )
     {
         return ringbuffer_base< T >::push( t, &*array_, max_elements_ );
     }
 
+    bool push( T&& t )
+    {
+        return ringbuffer_base< T >::push( std::forward< T >( t ), &*array_, max_elements_ );
+    }
+
     template < typename Functor >
     bool consume_one( Functor&& f )
     {
@@ -676,11 +702,17 @@ class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_t
      *
      * \note Thread-safe and wait-free
      * */
-    bool push( T const& t )
+    bool push( const T& t )
     {
         return base_type::push( t );
     }
 
+    /// \copydoc boost::lockfree::spsc_queue::push(const T& t)
+    bool push( T&& t )
+    {
+        return base_type::push( std::forward< T >( t ) );
+    }
+
     /** Pops one object from ringbuffer.
      *
      * \pre only one thread is allowed to pop data from the spsc_queue
@@ -705,8 +737,8 @@ class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_t
     template < typename U, typename Enabler = std::enable_if< std::is_convertible< T, U >::value > >
     bool pop( U& ret )
     {
-        return consume_one( [ & ]( const T& t ) {
-            ret = std::move( t );
+        return consume_one( [ & ]( T&& t ) {
+            ret = std::forward< T >( t );
         } );
     }
diff --git a/include/boost/lockfree/stack.hpp b/include/boost/lockfree/stack.hpp
index 6b1d831..cd622a7 100644
--- a/include/boost/lockfree/stack.hpp
+++ b/include/boost/lockfree/stack.hpp
@@ -65,7 +65,7 @@ class stack
 {
 private:
 #ifndef BOOST_DOXYGEN_INVOKED
-    BOOST_STATIC_ASSERT( std::is_copy_constructible< T >::value );
+    BOOST_STATIC_ASSERT( std::is_copy_constructible< T >::value || std::is_move_constructible< T >::value );
 
     typedef typename detail::stack_signature::bind< Options... >::type bound_args;
 
@@ -77,13 +77,18 @@ class stack
 
     struct node
     {
-        node( T const& val ) :
+        node( const T& val ) :
             v( val )
         {}
 
+        node( T&& val ) :
+            v( std::forward< T >( val ) )
+        {}
+
         typedef typename detail::select_tagged_handle< node, node_based >::handle_type handle_t;
-        handle_t next;
-        const T v;
+
+        handle_t next;
+        T v;
     };
 
     typedef typename detail::extract_allocator< bound_args, node >::type node_allocator;
@@ -304,6 +309,17 @@ class stack
         return std::make_tuple( new_top_node, end_node );
     }
 
+    template < bool Bounded >
+    bool do_push( T&& v )
+    {
+        node* newnode = pool.template construct< true, Bounded >( std::forward< T >( v ) );
+        if ( newnode == 0 )
+            return false;
+
+        link_nodes_atomic( newnode, newnode );
+        return true;
+    }
+
     template < bool Bounded >
     bool do_push( T const& v )
     {
@@ -339,11 +355,17 @@ class stack
      * \note Thread-safe. If internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will
      * be allocated from the OS. This may not be lock-free. \throws if memory allocator throws
      * */
-    bool push( T const& v )
+    bool push( const T& v )
     {
         return do_push< false >( v );
     }
 
+    /// \copydoc boost::lockfree::stack::push(const T& v)
+    bool push( T&& v )
+    {
+        return do_push< false >( std::forward< T >( v ) );
+    }
+
     /** Pushes object t to the stack.
      *
      * \post object will be pushed to the stack, if internal node can be allocated
@@ -351,11 +373,18 @@ class stack
      *
      * \note Thread-safe and non-blocking. If internal memory pool is exhausted, the push operation will fail
      * */
-    bool bounded_push( T const& v )
+    bool bounded_push( const T& v )
     {
         return do_push< true >( v );
     }
 
+    /// \copydoc boost::lockfree::stack::bounded_push(const T& v)
+    bool bounded_push( T&& v )
+    {
+        return do_push< true >( std::forward< T >( v ) );
+    }
+
+
     /** Pushes as many objects from the range [begin, end) as freelist node can be allocated.
      *
      * \return iterator to the first element, which has not been pushed
@@ -428,7 +457,7 @@ class stack
      * will be allocated from the OS. This may not be lock-free.
      * \throws if memory allocator throws
      * */
-    bool unsynchronized_push( T const& v )
+    bool unsynchronized_push( const T& v )
     {
         node* newnode = pool.template construct< false, false >( v );
         if ( newnode == 0 )
@@ -438,6 +467,17 @@ class stack
         return true;
     }
 
+    /// \copydoc boost::lockfree::stack::unsynchronized_push(const T& v)
+    bool unsynchronized_push( T&& v )
+    {
+        node* newnode = pool.template construct< false, false >( std::forward< T >( v ) );
+        if ( newnode == 0 )
+            return false;
+
+        link_nodes_unsafe( newnode, newnode );
+        return true;
+    }
+
     /** Pushes as many objects from the range [begin, end) as freelist node can be allocated.
      *
      * \return iterator to the first element, which has not been pushed
@@ -499,8 +539,8 @@ class stack
     template < typename U, typename Enabler = std::enable_if< std::is_convertible< T, U >::value > >
     bool pop( U& ret )
     {
-        return consume_one( [ & ]( const T& t ) {
-            ret = U( t );
+        return consume_one( [ & ]( T&& arg ) {
+            ret = std::forward< T >( arg );
         } );
     }
@@ -540,7 +580,7 @@ class stack
         tagged_node_handle new_tos( pool.get_handle( new_tos_ptr ), old_tos.get_next_tag() );
 
         tos.store( new_tos, memory_order_relaxed );
-        detail::copy_payload( old_tos_pointer->v, ret );
+        ret = std::move( old_tos_pointer->v );
         pool.template destruct< false >( old_tos );
         return true;
     }
@@ -566,7 +606,7 @@ class stack
             tagged_node_handle new_tos( old_tos_pointer->next, old_tos.get_next_tag() );
 
             if ( tos.compare_exchange_weak( old_tos, new_tos ) ) {
-                f( old_tos_pointer->v );
+                f( std::move( old_tos_pointer->v ) );
                 pool.template destruct< true >( old_tos );
                 return true;
             }
@@ -620,7 +660,7 @@ class stack
 
         for ( ;; ) {
             node* node_pointer = pool.get_pointer( nodes_to_consume );
-            f( node_pointer->v );
+            f( std::move( node_pointer->v ) );
             element_count += 1;
 
             node* next_node = pool.get_pointer( node_pointer->next );
@@ -685,7 +725,7 @@ class stack
 
         for ( ;; ) {
             node* node_pointer = pool.get_pointer( nodes_in_reversed_order );
-            f( node_pointer->v );
+            f( std::move( node_pointer->v ) );
             element_count += 1;
 
             node* next_node = pool.get_pointer( node_pointer->next );
diff --git a/test/queue_test.cpp b/test/queue_test.cpp
index eb603d5..e4cdca0 100644
--- a/test/queue_test.cpp
+++ b/test/queue_test.cpp
@@ -207,3 +207,24 @@ BOOST_AUTO_TEST_CASE( queue_with_allocator )
         };
     }
 }
+
+BOOST_AUTO_TEST_CASE( move_semantics )
+{
+    boost::lockfree::queue< int, boost::lockfree::capacity< 128 > > stk;
+
+    stk.push( 0 );
+    stk.push( 1 );
+
+    auto two = 2;
+    stk.push( std::move( two ) );
+
+    int out;
+    BOOST_TEST_REQUIRE( stk.pop( out ) );
+    BOOST_TEST_REQUIRE( out == 0 );
+
+    stk.consume_one( []( int one ) {
+        BOOST_TEST_REQUIRE( one == 1 );
+    } );
+
+    stk.consume_all( []( int ) {} );
+}
diff --git a/test/spsc_queue_test.cpp b/test/spsc_queue_test.cpp
index 76fa429..610e43c 100644
--- a/test/spsc_queue_test.cpp
+++ b/test/spsc_queue_test.cpp
@@ -359,3 +359,24 @@ BOOST_AUTO_TEST_CASE( spsc_queue_reset_test )
 
     BOOST_TEST_REQUIRE( f.empty() );
 }
+
+BOOST_AUTO_TEST_CASE( move_semantics )
+{
+    boost::lockfree::spsc_queue< std::unique_ptr< int >, boost::lockfree::capacity< 128 > > stk;
+
+    stk.push( std::make_unique< int >( 0 ) );
+    stk.push( std::make_unique< int >( 1 ) );
+
+    auto two = std::make_unique< int >( 2 );
+    stk.push( std::move( two ) );
+
+    std::unique_ptr< int > out;
+    BOOST_TEST_REQUIRE( stk.pop( out ) );
+    BOOST_TEST_REQUIRE( *out == 0 );
+
+    stk.consume_one( []( std::unique_ptr< int > one ) {
+        BOOST_TEST_REQUIRE( *one == 1 );
+    } );
+
+    stk.consume_all( []( std::unique_ptr< int > ) {} );
+}
diff --git a/test/stack_test.cpp b/test/stack_test.cpp
index b8fc4db..bcf58aa 100644
--- a/test/stack_test.cpp
+++ b/test/stack_test.cpp
@@ -272,3 +272,24 @@ BOOST_AUTO_TEST_CASE( stack_with_allocator )
         };
     }
 }
+
+BOOST_AUTO_TEST_CASE( move_semantics )
+{
+    boost::lockfree::stack< std::unique_ptr< int >, boost::lockfree::capacity< 128 > > stk;
+
+    stk.push( std::make_unique< int >( 0 ) );
+    stk.push( std::make_unique< int >( 1 ) );
+
+    auto two = std::make_unique< int >( 2 );
+    stk.push( std::move( two ) );
+
+    std::unique_ptr< int > out;
+    BOOST_TEST_REQUIRE( stk.pop( out ) );
+    BOOST_TEST_REQUIRE( *out == 2 );
+
+    stk.consume_one( []( std::unique_ptr< int > one ) {
+        BOOST_TEST_REQUIRE( *one == 1 );
+    } );
+
+    stk.consume_all( []( std::unique_ptr< int > ) {} );
+}
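
Usage note (not part of the patch): the rvalue overloads added above make all three containers usable with move-only element types. The following is a minimal standalone sketch of the intended call patterns, assuming a Boost build that already contains this patch and C++14 for std::make_unique; the variable names and values are illustrative only.

#include <boost/lockfree/spsc_queue.hpp>

#include <cassert>
#include <memory>
#include <utility>

int main()
{
    // A move-only element type: this would not compile before the patch,
    // because push() took `const T&` and the payload was copy-constructed.
    boost::lockfree::spsc_queue< std::unique_ptr< int >, boost::lockfree::capacity< 8 > > q;

    q.push( std::make_unique< int >( 1 ) ); // rvalue binds to the new push( T&& )

    auto p = std::make_unique< int >( 2 );
    q.push( std::move( p ) ); // an lvalue must be moved explicitly

    // pop( U& ) now moves the element out of the ringbuffer slot.
    std::unique_ptr< int > out;
    bool popped = q.pop( out );
    assert( popped && *out == 1 );

    // consume_one() passes the element to the functor as an rvalue, so a
    // by-value parameter takes ownership of it.
    q.consume_one( []( std::unique_ptr< int > v ) {
        assert( *v == 2 );
    } );

    return 0;
}

The same patterns apply to boost::lockfree::stack and boost::lockfree::queue, as exercised by the move_semantics test cases added above.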