Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support moveable objects and allow emplacing #31

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
149 changes: 139 additions & 10 deletions include/boost/lockfree/spsc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@

#include <algorithm>
#include <memory>
#include <utility>

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/utility.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY
#include <boost/config.hpp> // for BOOST_LIKELY, BOOST_NO_CXX11_RVALUE_REFERENCES, and BOOST_NO_CXX11_VARIADIC_TEMPLATES

#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/type_traits/is_convertible.hpp>
#include <boost/type_traits/is_copy_constructible.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
Expand Down Expand Up @@ -112,6 +114,43 @@ class ringbuffer_base

return true;
}
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES

bool push(T&& t, T * buffer, size_t max_size)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make this work on C++98 too with Boost.Move support by using BOOST_RV_REF(T) instead of T&& combined with boost::move instead of std::move.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't have a strong opinion on this: c++98 compilers are dying out and not so much new code is written for them. we just need to make sure, not to break any existing code by introducing unguarded c++1X constructs

{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);

if (next == read_index_.load(memory_order_acquire))
return false; /* ringbuffer is full */

new (buffer + write_index) T(std::move_if_noexcept(t)); // move-construct

write_index_.store(next, memory_order_release);

return true;
}

#ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(T * buffer, size_t max_size, Args&&... args )
{
const size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
const size_t next = next_index(write_index, max_size);

if (next == read_index_.load(memory_order_acquire))
return false; /* ringbuffer is full */

new (buffer + write_index) T{std::forward<Args>(args)...}; // emplace

write_index_.store(next, memory_order_release);

return true;
}
#endif

#endif

size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
{
Expand Down Expand Up @@ -279,12 +318,12 @@ class ringbuffer_base
const size_t count0 = max_size - read_index;
const size_t count1 = output_count - count0;

copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
move_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
move_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);

new_read_index -= max_size;
} else {
copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
move_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
if (new_read_index == max_size)
new_read_index = 0;
}
Expand All @@ -310,12 +349,12 @@ class ringbuffer_base
const size_t count0 = max_size - read_index;
const size_t count1 = avail - count0;

it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
copy_and_delete(internal_buffer, internal_buffer + count1, it);
it = move_and_delete(internal_buffer + read_index, internal_buffer + max_size, it);
move_and_delete(internal_buffer, internal_buffer + count1, it);

new_read_index -= max_size;
} else {
copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
move_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
if (new_read_index == max_size)
new_read_index = 0;
}
Expand Down Expand Up @@ -382,8 +421,30 @@ class ringbuffer_base
return write_index == read_index;
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
template< class OutputIterator >
typename boost::enable_if< typename boost::is_constructible<T, T>::type, OutputIterator>::type
move_and_delete( T * first, T * last, OutputIterator out )
{
if (boost::has_trivial_destructor<T>::value) {
return std::move(first, last, out); // will use memcpy if possible

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of duplicating this function, and using SFINAE to select the desired one, I would just have one implementation. This is safe because while, during overload resolution, the compiler will prefer move assignment, it will still fall back to copy assignment if no move assignment operator is available.

Additionally I'd use boost::move from <boost/move/algorithm.hpp> and boost::move from <boost/move/utility_core.hpp>. That will work without having to use the preprocessor (it's valid regardless of whether BOOST_NO_CXX11_RVALUE_REFERENCES is defined) and additionally supports types that implement move emulation too.

} else {
for (; first != last; ++first, ++out) {
*out = std::move(*first);
first->~T();
}
return out;
}
}
#endif

template< class OutputIterator >
OutputIterator copy_and_delete( T * first, T * last, OutputIterator out )
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
typename boost::disable_if< typename boost::is_constructible<T, T>::type, OutputIterator>::type
#else
OutputIterator
#endif
move_and_delete( T * first, T * last, OutputIterator out )
{
if (boost::has_trivial_destructor<T>::value) {
return std::copy(first, last, out); // will use memcpy if possible
Expand Down Expand Up @@ -450,6 +511,23 @@ class compile_time_sized_ringbuffer:
return ringbuffer_base<T>::push(t, data(), max_size);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
bool push(T&& t)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: BOOST_RV_REF (also on C++98).

This does not apply to the variadic templates below though.

{
return ringbuffer_base<T>::push(std::move(t), data(), max_size);
}

#ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(Args&&... args)
{
return ringbuffer_base<T>::emplace(data(), max_size, std::forward<Args>(args)...);
}
#endif

#endif

template <typename Functor>
bool consume_one(Functor & f)
{
Expand Down Expand Up @@ -562,6 +640,23 @@ class runtime_sized_ringbuffer:
{
return ringbuffer_base<T>::push(t, &*array_, max_elements_);
}
#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES

bool push(T&& t)
{
return ringbuffer_base<T>::push(std::move(t), &*array_, max_elements_);
}

#ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(Args&&... args)
{
return ringbuffer_base<T>::emplace(&*array_, max_elements_, std::forward<Args>(args)...);
}
#endif

#endif

template <typename Functor>
bool consume_one(Functor & f)
Expand Down Expand Up @@ -761,6 +856,41 @@ class spsc_queue:
return base_type::push(t);
}

#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES

/** Pushes object t to the ringbuffer via move construction/
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe comment about the API which is not available if BOOST_NO_CXX11_RVALUE_REFERENCES is defined

* */

bool push(T&& t)
{
return base_type::push(std::move(t));
}

#ifndef BOOST_NO_CXX11_VARIADIC_TEMPLATES
/** Emplaces an instance of T to the ringbuffer via direct initialization using the given constructor arguments
*
* \pre only one thread is allowed to push data to the spsc_queue
* \post object will be pushed to the spsc_queue, unless it is full.
* \return true, if the push operation is successful.
*
* \note Thread-safe and wait-free
* */
template<typename... Args>
typename boost::enable_if< typename boost::is_constructible<T, Args...>::type, bool>::type
emplace(Args&&... args)
{
return base_type::emplace(std::forward<Args>(args)...);
}
#endif

#endif

/** Pops one object from ringbuffer.
*
* \pre only one thread is allowed to pop data to the spsc_queue

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just dreaming here, but:

It may be worth considering converting this argument-less pop() from returning bool to boost::optional<T>, which is convertible to bool. Because the current version, just returning bool is kind of worthless for everything except "some event has happened", similar to boost::optional<void> (if we had regular void). That would allow user code like this:

while (auto item = queue.pop())
{
  // process item
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true. one could only introduce a new API queue.pop_optional(). i'd merge a PR if you implement it :)

Expand Down Expand Up @@ -964,8 +1094,7 @@ class spsc_queue:
if ( !boost::has_trivial_destructor<T>::value ) {
// make sure to call all destructors!

T dummy_element;
while (pop(dummy_element))
while (pop())
{}
} else {
base_type::write_index_.store(0, memory_order_relaxed);
Expand Down
159 changes: 159 additions & 0 deletions test/spsc_queue_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <iostream>
#include <memory>
#include <utility>

#include "test_helpers.hpp"
#include "test_common.hpp"
Expand Down Expand Up @@ -405,3 +406,161 @@ BOOST_AUTO_TEST_CASE( spsc_queue_reset_test )

BOOST_REQUIRE(f.empty());
}

Copy link

@tnovotny tnovotny Jul 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't there also be a test for a move_only_type like unique_ptr?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

struct immovable_type
{
int value;

immovable_type() : value(0) {}
immovable_type(int x) : value(x) {}
immovable_type(const immovable_type& other) : value(other.value) {}

immovable_type& operator=(const immovable_type& other)
{
this->value = other.value;
return *this;
}

#ifndef BOOST_NO_CXX11_DELETED_FUNCTIONS
#ifndef BOOST_NO_CX11_RVALUE_REFERENCES
immovable_type(immovable_type&&) = delete;
immovable_type& operator=(immovable_type&&) = delete;
#endif
#endif
};

struct test_immovable_object
{
test_immovable_object(int i):
i(i)
{}

void operator()(immovable_type arg) const
{
BOOST_REQUIRE_EQUAL(arg.value, i);
}

int i;
};

BOOST_AUTO_TEST_CASE( spsc_queue_immovable_object_test )
{
spsc_queue<immovable_type, capacity<64> > f;

f.emplace( 9001 );
f.emplace( 9002 );
f.push( immovable_type(9003) );

immovable_type x(9004);
immovable_type push_array[2];
push_array[0] = x;
push_array[1] = x;
f.push(push_array, 2);

immovable_type pop_array[2];
f.pop(pop_array, 2);
BOOST_CHECK_EQUAL( pop_array[0].value, 9001 );
BOOST_CHECK_EQUAL( pop_array[1].value, 9002 );

f.consume_one( test_immovable_object(9003) );
f.consume_all( test_immovable_object(9004) );

f.emplace( 42 );
f.emplace( 42 );
f.reset();
}

#ifndef BOOST_NO_CX11_RVALUE_REFERENCES
struct refcount_handle
{
int* p_refcount;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the hungarian notation here. Consider renaming the variable.


refcount_handle() : p_refcount( nullptr ) {}
refcount_handle(int* x) : p_refcount(x)
{
if (p_refcount != nullptr)
{
*p_refcount += 1;
}
}
~refcount_handle()
{
if (p_refcount != nullptr)
{
*p_refcount -= 1;
}
}

refcount_handle(const refcount_handle& other) = delete;
refcount_handle& operator=(const refcount_handle&) = delete;

refcount_handle& operator=(refcount_handle&& other)
{
if (this->p_refcount != nullptr)
{
*p_refcount -= 1;
}

this->p_refcount = other.p_refcount;
other.p_refcount = nullptr;
return *this;
}
refcount_handle(refcount_handle&& other)
: p_refcount( other.p_refcount )
{
other.p_refcount = nullptr;
}
};

struct test_refcount_handle
{
test_refcount_handle(int* p_refcount):
p_refcount(p_refcount)
{}

void operator()(refcount_handle& arg) const
{
BOOST_REQUIRE_EQUAL(arg.p_refcount, p_refcount);
}

int* p_refcount;
};

BOOST_AUTO_TEST_CASE( spsc_queue_refcount_test )
{
spsc_queue<refcount_handle, capacity<64> > f;

int refcount = 0;

f.emplace( &refcount );
f.emplace( &refcount );
f.emplace( &refcount );
f.emplace( &refcount );
BOOST_CHECK_EQUAL( refcount, 4 );

f.push( std::move( refcount_handle(&refcount) ) );

BOOST_CHECK_EQUAL( refcount, 5 );

{
refcount_handle pop_array[2];
f.pop(pop_array, 2);
BOOST_CHECK_EQUAL( pop_array[0].p_refcount, &refcount );
BOOST_CHECK_EQUAL( pop_array[1].p_refcount, &refcount );

BOOST_CHECK_EQUAL( refcount, 5 );
}
BOOST_CHECK_EQUAL( refcount, 3 );

f.consume_one( test_refcount_handle(&refcount) );
BOOST_CHECK_EQUAL( refcount, 2 );
f.consume_all( test_refcount_handle(&refcount) );
BOOST_CHECK_EQUAL( refcount, 0 );

f.emplace( &refcount );
f.emplace( &refcount );
BOOST_CHECK_EQUAL( refcount, 2 );
f.reset();
BOOST_CHECK_EQUAL( refcount, 0 );
}
#endif