Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent_bounding_queue capacity on copy, move and swap operations #1609

Merged
merged 6 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, A

// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
Expand Down Expand Up @@ -317,7 +316,6 @@ namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
Expand Down Expand Up @@ -376,12 +374,14 @@ class concurrent_bounded_queue {
concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
concurrent_bounded_queue(a)
{
my_capacity = src.my_capacity;
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}

concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
{
my_capacity = src.my_capacity;
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}

Expand Down Expand Up @@ -420,6 +420,7 @@ class concurrent_bounded_queue {
if (my_queue_representation != other.my_queue_representation) {
clear();
my_allocator = other.my_allocator;
my_capacity = other.my_capacity;
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
return *this;
Expand All @@ -435,6 +436,7 @@ class concurrent_bounded_queue {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
my_capacity = other.my_capacity;
}
}
return *this;
Expand Down Expand Up @@ -547,8 +549,10 @@ class concurrent_bounded_queue {

private:
void internal_swap( concurrent_bounded_queue& src ) {
std::swap(my_queue_representation, src.my_queue_representation);
std::swap(my_monitors, src.my_monitors);
using std::swap;
swap(my_queue_representation, src.my_queue_representation);
swap(my_capacity, src.my_capacity);
swap(my_monitors, src.my_monitors);
}

static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
Expand Down
44 changes: 43 additions & 1 deletion test/tbb/test_concurrent_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -282,3 +282,45 @@ TEST_CASE("Test clear and dtor with TrackableItem") {
test_tracking_dtors_on_clear<oneapi::tbb::concurrent_queue<TrackableItem>>();
test_tracking_dtors_on_clear<oneapi::tbb::concurrent_bounded_queue<TrackableItem>>();
}

//! \brief \ref regression
TEST_CASE("test capacity on modifying operations") {
// Test that concurrent_bounded_queue capacity is preserved on copying, moving and swapping
using queue_type = oneapi::tbb::concurrent_bounded_queue<int>;
using capacity_type = typename queue_type::size_type;

queue_type q;
capacity_type desired_capacity = 64;

q.set_capacity(desired_capacity);
REQUIRE_MESSAGE(q.capacity() == desired_capacity, "Capacity is not set correctly");

queue_type q_copy(q);
REQUIRE_MESSAGE(q_copy.capacity() == desired_capacity, "Capacity is not preserved on copying");

queue_type q_move(std::move(q));
REQUIRE_MESSAGE(q_move.capacity() == desired_capacity, "Capacity is not preserved on moving");

queue_type different_capacity_q1;
different_capacity_q1.set_capacity(desired_capacity * 2);

different_capacity_q1 = q_move;
REQUIRE_MESSAGE(different_capacity_q1.capacity() == desired_capacity,
"Capacity is not preserved on copy assignment");

queue_type different_capacity_q2;
different_capacity_q2.set_capacity(desired_capacity * 2);

different_capacity_q2 = std::move(q_move);
REQUIRE_MESSAGE(different_capacity_q2.capacity() == desired_capacity,
"Capacity is not preserved on move assignment");

queue_type different_capacity_q3;
different_capacity_q3.set_capacity(desired_capacity * 2);

different_capacity_q3.swap(different_capacity_q2);
REQUIRE_MESSAGE(different_capacity_q3.capacity() == desired_capacity,
"Capacity is not preserved on swap");
REQUIRE_MESSAGE(different_capacity_q2.capacity() == desired_capacity * 2,
"Capacity is not preserved on swap");
}
Loading