From 16a8d6299fc1d547c5cd904070b21ab5206afb8a Mon Sep 17 00:00:00 2001 From: Tim Blechmann Date: Wed, 22 Nov 2023 14:16:26 +0800 Subject: [PATCH] prototype spsc_value --- CMakeLists.txt | 1 + doc/lockfree.qbk | 33 +- include/boost/lockfree/detail/parameter.hpp | 21 +- include/boost/lockfree/lockfree_forward.hpp | 4 + include/boost/lockfree/policies.hpp | 27 +- include/boost/lockfree/spsc_value.hpp | 337 ++++++++++++++++++++ test/CMakeLists.txt | 2 + test/spsc_value_stress_test.cpp | 90 ++++++ test/spsc_value_test.cpp | 119 +++++++ 9 files changed, 592 insertions(+), 42 deletions(-) create mode 100644 include/boost/lockfree/spsc_value.hpp create mode 100644 test/spsc_value_stress_test.cpp create mode 100644 test/spsc_value_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4037869..f158ae4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(Boost::lockfree ALIAS boost_lockfree) if (CMAKE_VERSION VERSION_GREATER_EQUAL 3.23) set(Headers include/boost/lockfree/spsc_queue.hpp + include/boost/lockfree/spsc_value.hpp include/boost/lockfree/policies.hpp include/boost/lockfree/queue.hpp include/boost/lockfree/lockfree_forward.hpp diff --git a/doc/lockfree.qbk b/doc/lockfree.qbk index 6bc5f94..52e5722 100644 --- a/doc/lockfree.qbk +++ b/doc/lockfree.qbk @@ -115,7 +115,7 @@ lock-freedom: [h2 Data Structures] -_lockfree_ implements three lock-free data structures: +_lockfree_ implements four lock-free data structures: [variablelist [[[classref boost::lockfree::queue]] @@ -129,6 +129,10 @@ _lockfree_ implements three lock-free data structures: [[[classref boost::lockfree::spsc_queue]] [a wait-free single-producer/single-consumer queue (commonly known as ringbuffer)] ] + + [[[classref boost::lockfree::spsc_value]] + [a wait-free single-producer/single-consumer value (commonly known as triple buffer)] + ] ] [h3 Data Structure Configuration] @@ -150,7 +154,11 @@ The data structures can be configured with [@boost:/libs/parameter/doc/html/inde ] [[[classref boost::lockfree::allocator]] - [Defines the allocator. _lockfree_ supports stateful allocator and is compatible with [@boost:/libs/interprocess/index.html Boost.Interprocess] allocators.] + [Defines the allocator.] + ] + + [[[classref boost::lockfree::allow_multiple_reads]] + [Configures the [classref boost::lockfree::spsc_value] to allow the content to be read multiple times.] ] ] @@ -161,7 +169,7 @@ The data structures can be configured with [@boost:/libs/parameter/doc/html/inde [h2 Queue] -The [classref boost::lockfree::queue boost::lockfree::queue] class implements a multi-writer/multi-reader queue. The +The [classref boost::lockfree::queue] class implements a multi-writer/multi-reader queue. The following example shows how integer values are produced and consumed by 4 threads each: [import ../examples/queue.cpp] @@ -177,7 +185,7 @@ consumed 40000000 objects. [h2 Stack] -The [classref boost::lockfree::stack boost::lockfree::stack] class implements a multi-writer/multi-reader stack. The +The [classref boost::lockfree::stack] class implements a multi-writer/multi-reader stack. The following example shows how integer values are produced and consumed by 4 threads each: [import ../examples/stack.cpp] @@ -212,16 +220,6 @@ consumed 10000000 objects. [section Rationale] -[section Data Structures] - -The implementations are implementations of well-known data structures. The queue is based on -[@http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.37.3574 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Michael Scott and Maged Michael], -the stack is based on [@http://books.google.com/books?id=YQg3HAAACAAJ Systems programming: coping with parallelism by R. K. Treiber] -and the spsc_queue is considered as 'folklore' and is implemented in several open-source projects including the linux kernel. All -data structures are discussed in detail in [@http://books.google.com/books?id=pFSwuqtJgxYC "The Art of Multiprocessor Programming" by Herlihy & Shavit]. - -[endsect] - [section Memory Management] The lock-free [classref boost::lockfree::queue] and [classref boost::lockfree::stack] classes are node-based data structures, @@ -274,13 +272,6 @@ _lockfree_ requires a c++14 compliant compiler [endsect] -[section Future Developments] - -* More data structures (set, hash table, dequeue) -* Backoff schemes (exponential backoff or elimination) - -[endsect] - [section References] # [@http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.37.3574 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Michael Scott and Maged Michael], diff --git a/include/boost/lockfree/detail/parameter.hpp b/include/boost/lockfree/detail/parameter.hpp index c88d1fd..1617b55 100644 --- a/include/boost/lockfree/detail/parameter.hpp +++ b/include/boost/lockfree/detail/parameter.hpp @@ -24,6 +24,15 @@ namespace boost { namespace lockfree { namespace detail { template < typename bound_args, typename tag_type, typename default_ > using extract_arg_or_default_t = typename parameter::binding< bound_args, tag_type, default_ >::type; + +template < typename BoundArgs, typename TypeTag, typename IntegralType, IntegralType default_ = IntegralType {} > +struct extract_integral_arg_or_default_t +{ + static constexpr IntegralType value + = extract_arg_or_default_t< BoundArgs, TypeTag, std::integral_constant< IntegralType, default_ > >::value; +}; + + struct no_such_parameter_t {}; @@ -65,20 +74,16 @@ using extract_allocator_t = typename extract_allocator< bound_args, T >::type; //---------------------------------------------------------------------------------------------------------------------- template < typename bound_args, bool default_ = false > -struct extract_fixed_sized -{ - using capacity_t - = extract_arg_or_default_t< bound_args, tag::fixed_sized, std::integral_constant< bool, default_ > >; +using extract_fixed_sized = extract_integral_arg_or_default_t< bound_args, tag::fixed_sized, bool, default_ >; - static constexpr bool value = capacity_t::value; -}; +//---------------------------------------------------------------------------------------------------------------------- template < typename bound_args, bool default_ = false > -using extract_fixed_sized_t = typename extract_fixed_sized< bound_args, default_ >::type; +using extract_allow_multiple_reads + = extract_integral_arg_or_default_t< bound_args, tag::allow_multiple_reads, bool, default_ >; //---------------------------------------------------------------------------------------------------------------------- - }}} // namespace boost::lockfree::detail #endif /* BOOST_LOCKFREE_DETAIL_PARAMETER_HPP */ diff --git a/include/boost/lockfree/lockfree_forward.hpp b/include/boost/lockfree/lockfree_forward.hpp index 30385d8..3b9c717 100644 --- a/include/boost/lockfree/lockfree_forward.hpp +++ b/include/boost/lockfree/lockfree_forward.hpp @@ -50,6 +50,10 @@ template < typename T, typename... Options > requires( std::is_default_constructible_v< T >, std::is_move_assignable_v< T > || std::is_copy_assignable_v< T > ) # endif class spsc_queue; + +template < typename T, typename... Options > +struct spsc_value; + }} // namespace boost::lockfree #endif // BOOST_DOXYGEN_INVOKED diff --git a/include/boost/lockfree/policies.hpp b/include/boost/lockfree/policies.hpp index 8bea5d0..334e5a0 100644 --- a/include/boost/lockfree/policies.hpp +++ b/include/boost/lockfree/policies.hpp @@ -20,33 +20,26 @@ namespace tag { struct allocator; struct fixed_sized; struct capacity; +struct allow_multiple_reads; } // namespace tag -/** Configures a data structure as \b fixed-sized. - * - * The internal nodes are stored inside an array and they are addressed by array indexing. This limits the possible - * size of the queue to the number of elements that can be addressed by the index type (usually 2**16-2), but on - * platforms that lack double-width compare-and-exchange instructions, this is the best way to achieve lock-freedom. - * This implies that a data structure is bounded. - * */ template < bool IsFixedSized > struct fixed_sized : boost::parameter::template_keyword< tag::fixed_sized, std::integral_constant< bool, IsFixedSized > > {}; -/** Sets the \b capacity of a data structure at compile-time. - * - * This implies that a data structure is bounded and fixed-sized. - * */ template < size_t Size > struct capacity : boost::parameter::template_keyword< tag::capacity, std::integral_constant< size_t, Size > > {}; -/** Defines the \b allocator type of a data structure. - * */ template < class Alloc > struct allocator : boost::parameter::template_keyword< tag::allocator, Alloc > {}; +template < bool AllowMultipleReads > +struct allow_multiple_reads : + boost::parameter::template_keyword< tag::allow_multiple_reads, std::integral_constant< bool, AllowMultipleReads > > +{}; + #else /** Configures a data structure as \b fixed-sized. @@ -71,6 +64,14 @@ struct capacity; template < class Alloc > struct allocator; +/** Configures the spsc_value to consume the value multiple times + * + * Caveats: + * * one cannot move the value out + * */ +template < bool AllowMultipleReads > +struct allow_multiple_reads; + #endif }} // namespace boost::lockfree diff --git a/include/boost/lockfree/spsc_value.hpp b/include/boost/lockfree/spsc_value.hpp new file mode 100644 index 0000000..4630204 --- /dev/null +++ b/include/boost/lockfree/spsc_value.hpp @@ -0,0 +1,337 @@ +// lock-free single-producer/single-consumer value +// implemented via a triple buffer +// +// Copyright (C) 2023 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED +#define BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED + +#include + +#ifdef BOOST_HAS_PRAGMA_ONCE +#pragma once +#endif + +#include +#include +#include +#include + +#include +#include + +#include +#include + +#ifndef BOOST_DOXYGEN_INVOKED + +# ifdef BOOST_NO_CXX17_IF_CONSTEXPR +# define ifconstexpr +# else +# define ifconstexpr constexpr +# endif + +#endif + +namespace boost { +namespace lockfree { + +/** The spcs_value provides a single-writer/single-reader value, implemented by a triple buffer + * + * \b Policies: + * - \ref boost::lockfree::allow_multiple_reads, defaults to + * \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads" \n + * If multiple reads are allowed, a value written to the spsc_value can be read multiple times, but not moved out + * of the instance. If multiple reads are not allowed, the class works as single-element queue that overwrites on + * write + * + * */ +template < typename T, typename... Options > +struct spsc_value +{ +#ifndef BOOST_DOXYGEN_INVOKED +private: + using spsc_value_signature = parameter::parameters< boost::parameter::optional< tag::allow_multiple_reads > >; + using bound_args = typename spsc_value_signature::bind< Options... >::type; + + static const bool allow_multiple_reads = detail::extract_allow_multiple_reads< bound_args >::value; + +public: +#endif + + /** Construct a \ref boost::lockfree::spsc_value "spsc_value" + * + * If configured with \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads" it + * is initialized to a default-constructed value + * + * */ + explicit spsc_value() + { + if ifconstexpr ( allow_multiple_reads ) { + // populate initial reader + m_write_index = tagged_index { + 1, + }; + m_available_index.store( + tagged_index { + 0, + true, + }, + detail::memory_order_relaxed ); + m_buffer[ 0 ].value = {}; + } + } + + /** Construct a \ref boost::lockfree::spsc_value "spsc_value", initialized to a value + * */ + explicit spsc_value( T value ) : + m_write_index { + 1, + }, + m_available_index { + tagged_index { + 0, + true, + }, + } + { + m_buffer[ 0 ].value = std::move( value ); + } + + /** Writes `value` to the \ref boost::lockfree::spsc_value "spsc_value" + * + * \pre only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value" + * \post object will be written to the \ref boost::lockfree::spsc_value "spsc_value" + * + * \note Thread-safe and wait-free + * */ + void write( T&& value ) + { + m_buffer[ m_write_index.index() ].value = std::forward< T >( value ); + swap_write_buffer(); + } + + /// \copydoc boost::lockfree::spsc_value::write(T&& value) + void write( const T& value ) + { + m_buffer[ m_write_index.index() ].value = value; + swap_write_buffer(); + } + + /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value" + * + * \pre only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value" + * \post if read operation is successful, object will be copied to `ret`. + * \returns `true`, if the read operation is successful, false if the \ref boost::lockfree::spsc_value "spsc_value" is + * configured with \ref boost::lockfree::allow_multiple_reads + * "boost::lockfree::allow_multiple_reads" and no value is available for reading + * + * \note Thread-safe and wait-free + * */ + bool read( T& ret ) + { +#ifndef BOOST_NO_CXX17_IF_CONSTEXPR + bool read_index_updated = swap_read_buffer(); + + if constexpr ( allow_multiple_reads ) { + ret = m_buffer[ m_read_index.index() ].value; + } else { + if ( !read_index_updated ) + return false; + ret = std::move( m_buffer[ m_read_index.index() ].value ); + } + + return true; +#else + return read_helper( ret, std::integral_constant< bool, allow_multiple_reads > {} ); +#endif + } + +#if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED ) + /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value", returning an optional + * + * \pre only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value" + * \returns `std::optional` with value if successful, `std::nullopt` if spsc_value is configured with \ref + * boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads" and no value is + * available for reading + * + * \note Thread-safe and wait-free + * */ + std::optional< T > read( uses_optional_t ) + { + T to_dequeue; + if ( read( to_dequeue ) ) + return to_dequeue; + else + return std::nullopt; + } +#endif + + /** consumes value via a functor + * + * reads element from the spsc_value and applies the functor on this object + * + * \returns `true`, if element was consumed + * + * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking + * */ + + template < typename Functor > + bool consume( Functor&& f ) + { +#ifndef BOOST_NO_CXX17_IF_CONSTEXPR + bool read_index_updated = swap_read_buffer(); + + if constexpr ( allow_multiple_reads ) { + f( m_buffer[ m_read_index.index() ].value ); + } else { + if ( !read_index_updated ) + return false; + f( std::move( m_buffer[ m_read_index.index() ].value ) ); + } + + return true; +#else + return consume_helper( f, std::integral_constant< bool, allow_multiple_reads > {} ); +#endif + } + +private: +#ifndef BOOST_DOXYGEN_INVOKED + using allow_multiple_reads_true = std::true_type; + using allow_multiple_reads_false = std::false_type; + +# ifdef BOOST_NO_CXX17_IF_CONSTEXPR + template < typename Functor > + bool consume_helper( Functor&& f, allow_multiple_reads_true = {} ) + { + swap_read_buffer(); + f( m_buffer[ m_read_index.index() ].value ); + return true; + } + + template < typename Functor > + bool consume_helper( Functor&& f, allow_multiple_reads_false = {} ) + { + bool read_index_updated = swap_read_buffer(); + if ( !read_index_updated ) + return false; + f( std::move( m_buffer[ m_read_index.index() ].value ) ); + return true; + } + + template < typename TT > + bool read_helper( TT& ret, allow_multiple_reads_true = {} ) + { + swap_read_buffer(); + ret = m_buffer[ m_read_index.index() ].value; + return true; + } + + template < typename TT > + bool read_helper( TT& ret, allow_multiple_reads_false = {} ) + { + bool read_index_updated = swap_read_buffer(); + if ( !read_index_updated ) + return false; + ret = std::move( m_buffer[ m_read_index.index() ].value ); + return true; + } +# endif + + void swap_write_buffer() + { + tagged_index old_avail_index = m_available_index.exchange( + tagged_index { + m_write_index.index(), + true, + }, + std::memory_order_release ); + m_write_index.set_tag_and_index( old_avail_index.index(), false ); + } + + bool swap_read_buffer() + { + constexpr bool use_compare_exchange = false; // exchange is most likely faster + + if ifconstexpr ( use_compare_exchange ) { + tagged_index new_avail_index = m_read_index; + + tagged_index current_avail_index_with_tag = tagged_index { + m_available_index.load( std::memory_order_acquire ).index(), + true, + }; + + if ( m_available_index.compare_exchange_strong( current_avail_index_with_tag, + new_avail_index, + std::memory_order_acquire ) ) { + m_read_index = tagged_index( current_avail_index_with_tag.index(), false ); + return true; + } else + return false; + } else { + tagged_index new_avail_index = m_read_index; + + tagged_index current_avail_index = m_available_index.load( std::memory_order_acquire ); + if ( !current_avail_index.is_consumable() ) + return false; + + current_avail_index = m_available_index.exchange( new_avail_index, std::memory_order_acquire ); + m_read_index = tagged_index { + current_avail_index.index(), + false, + }; + return true; + } + } + + struct tagged_index + { + tagged_index( uint8_t index, bool tag = false ) + { + set_tag_and_index( index, tag ); + } + + uint8_t index() const + { + return byte & 0x07; + } + + bool is_consumable() const + { + return byte & 0x08; + } + + void set_tag_and_index( uint8_t index, bool tag ) + { + byte = index | ( tag ? 0x08 : 0x00 ); + } + + uint8_t byte; + }; + + static constexpr size_t cacheline_bytes = BOOST_LOCKFREE_CACHELINE_BYTES; + + struct alignas( cacheline_bytes ) cache_aligned_value + { + T value; + }; + + std::array< cache_aligned_value, 3 > m_buffer; + + alignas( cacheline_bytes ) tagged_index m_write_index { 0 }; + alignas( cacheline_bytes ) detail::atomic< tagged_index > m_available_index { 1 }; + alignas( cacheline_bytes ) tagged_index m_read_index { 2 }; +#endif +}; + +} // namespace boost::lockfree +} // namespace boost + +#undef ifconstexpr + +#endif /* BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED */ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index e0add4c..0c92ffc 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -25,6 +25,8 @@ set(Tests stack_test stack_unbounded_stress_test tagged_ptr_test + spsc_value_test + spsc_value_stress_test ) foreach(Test ${Tests}) diff --git a/test/spsc_value_stress_test.cpp b/test/spsc_value_stress_test.cpp new file mode 100644 index 0000000..a3b5f3a --- /dev/null +++ b/test/spsc_value_stress_test.cpp @@ -0,0 +1,90 @@ +// Copyright (C) 2011-2013 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +#include + +#include +#include + +// #define BOOST_LOCKFREE_STRESS_TEST + +#ifndef BOOST_LOCKFREE_STRESS_TEST +static const uint64_t nodes_per_thread = 100000; +#else +static const uint64_t nodes_per_thread = 100000000; +#endif + + +BOOST_AUTO_TEST_CASE( spsc_value_stress_test ) +{ + boost::lockfree::spsc_value< uint64_t > v; + + std::atomic< bool > done; + + std::thread producer( [ & ] { + for ( uint64_t i = 0; i != nodes_per_thread; ++i ) + v.write( i ); + done = true; + } ); + + boost::optional< uint64_t > consumed; + while ( !done.load( std::memory_order_relaxed ) ) { + uint64_t out; + bool read_success = v.read( out ); + + if ( !read_success ) { + std::this_thread::yield(); + continue; + } + + if ( consumed ) + BOOST_TEST_REQUIRE( out > *consumed ); + consumed = out; + } + + producer.join(); +} + +BOOST_AUTO_TEST_CASE( spsc_value_stress_test_allow_multiple_reads ) +{ + boost::lockfree::spsc_value< uint64_t, boost::lockfree::allow_multiple_reads< true > > v; + + std::atomic< bool > done; + + std::thread producer( [ & ] { + for ( uint64_t i = 0; i != nodes_per_thread; ++i ) { + std::this_thread::yield(); + v.write( i ); + } + done = true; + } ); + + boost::optional< uint64_t > consumed; + while ( !done.load( std::memory_order_relaxed ) ) { + uint64_t out {}; + bool read_success = v.read( out ); + + if ( !read_success ) { + std::this_thread::yield(); + continue; + } + + if ( consumed ) + BOOST_TEST_REQUIRE( out >= *consumed ); + consumed = out; + } + + producer.join(); +} diff --git a/test/spsc_value_test.cpp b/test/spsc_value_test.cpp new file mode 100644 index 0000000..1cdca9f --- /dev/null +++ b/test/spsc_value_test.cpp @@ -0,0 +1,119 @@ +// Copyright (C) 2011-2013 Tim Blechmann +// +// Distributed under the Boost Software License, Version 1.0. (See +// accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#include + +#define BOOST_TEST_MAIN +#ifdef BOOST_LOCKFREE_INCLUDE_TESTS +# include +#else +# include +#endif + +BOOST_AUTO_TEST_CASE( spsc_value_test ) +{ + boost::lockfree::spsc_value< uint64_t > v; + + auto validate_value = [ & ]( uint64_t expected ) { + uint64_t out {}; + BOOST_TEST_REQUIRE( v.read( out ) == true ); + BOOST_TEST_REQUIRE( out == expected ); + }; + + auto validate_no_pending_update = [ & ] { + uint64_t out {}; + BOOST_TEST_REQUIRE( v.read( out ) == false ); + }; + + validate_no_pending_update(); + + v.write( 1 ); + validate_value( 1 ); + + v.write( 2 ); + validate_value( 2 ); + v.write( 3 ); + validate_value( 3 ); + v.write( 4 ); + validate_value( 4 ); + validate_no_pending_update(); + validate_no_pending_update(); + validate_no_pending_update(); + + v.write( 5 ); + v.write( 6 ); + v.write( 7 ); + validate_value( 7 ); +} + +BOOST_AUTO_TEST_CASE( spsc_value_test_allow_duplicate_reads ) +{ + boost::lockfree::spsc_value< uint64_t, boost::lockfree::allow_multiple_reads< true > > v { 0xff }; + + auto validate_value = [ & ]( uint64_t expected ) { + uint64_t out {}; + BOOST_TEST_REQUIRE( v.read( out ) == true ); + BOOST_TEST_REQUIRE( out == expected ); + }; + + validate_value( 0xff ); + + v.write( 1 ); + validate_value( 1 ); + + v.write( 2 ); + validate_value( 2 ); + v.write( 3 ); + validate_value( 3 ); + v.write( 4 ); + validate_value( 4 ); + validate_value( 4 ); + validate_value( 4 ); + validate_value( 4 ); + + v.write( 5 ); + v.write( 6 ); + v.write( 7 ); + validate_value( 7 ); +} + + +BOOST_AUTO_TEST_CASE( spsc_value_test_move_only_type ) +{ + auto make_t = [ & ]( uint64_t val ) { + return std::make_unique< uint64_t >( val ); + }; + + boost::lockfree::spsc_value< std::unique_ptr< uint64_t > > v; + + auto validate_value = [ & ]( uint64_t expected ) { + std::unique_ptr< uint64_t > out; + BOOST_TEST_REQUIRE( v.read( out ) == true ); + BOOST_TEST_REQUIRE( *out == expected ); + }; + + auto t1 = make_t( 1 ); + auto t2 = make_t( 2 ); + auto t3 = make_t( 3 ); + + v.write( std::move( t1 ) ); + validate_value( 1 ); + + v.write( std::move( t2 ) ); + BOOST_TEST_REQUIRE( v.consume( []( const std::unique_ptr< uint64_t >& out ) { + BOOST_TEST_REQUIRE( *out == uint64_t( 2 ) ); + } ) ); + + v.write( std::move( t3 ) ); + BOOST_TEST_REQUIRE( v.consume( []( std::unique_ptr< uint64_t > out ) { + BOOST_TEST_REQUIRE( *out == uint64_t( 3 ) ); + } ) ); + + BOOST_TEST_REQUIRE( v.consume( []( std::unique_ptr< uint64_t > out ) {} ) == false ); + + std::unique_ptr< uint64_t > out; + BOOST_TEST_REQUIRE( v.read( out ) == false ); +}