-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.h
137 lines (111 loc) · 3.36 KB
/
queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>
/**
 * @brief Bounded FIFO queue with a blocking pop() and a non-blocking push().
 * @details All state (buffer and closed flag) is guarded by a single mutex.
 *          Consumers blocked in pop() are woken either by a successful push()
 *          or by close(). Elements are moved in and out of the buffer, so
 *          push()/pop() also work with move-only element types; get() returns
 *          a copy and therefore needs a copyable Data.
 * @tparam Data Element type stored in the queue.
 */
template<typename Data>
class Queue {
public:
    /**
     * @brief Construct a queue that holds at most @p size elements.
     * @param size Maximum number of elements stored at any time.
     * @exception std::length_error if @p size exceeds what the underlying
     *            std::vector can reserve.
     */
    Queue(std::size_t size) noexcept(false) : m_maxCap{size} {
        m_buffer.reserve(size);
    }

    /**
     * @brief Blocking pop: waits until an element is available or the queue
     *        is closed.
     * @return The oldest element if the queue is open, otherwise an empty
     *         optional (also when elements remain in a closed queue).
     * @details Thread safe. The return value must be checked.
     */
    [[nodiscard]] std::optional<Data> pop() {
        std::unique_lock lock{m_mutex};
        // The closed flag is part of the predicate so that close() can
        // release a consumer that is already waiting on an empty queue.
        cvBlockPop.wait(lock, [this] { return disableQueue || !m_buffer.empty(); });
        if (disableQueue)
            return std::nullopt;
        std::optional<Data> oldest(std::move(m_buffer.front()));
        remove();
        return oldest;
    }

    /**
     * @brief Non-blocking push.
     * @param data Element to append to the queue.
     * @return True if the element was stored; false if the queue is closed
     *         or already holds its maximum number of elements.
     * @details Thread safe. The return value must be checked.
     */
    [[nodiscard]] bool push(Data data) {
        {
            std::lock_guard lock{m_mutex};
            if (disableQueue || m_buffer.size() >= m_maxCap)
                return false;
            add(std::move(data));
        }
        // Exactly one element was added, so one waiting consumer is enough;
        // notifying after unlocking avoids waking it into a held mutex.
        cvBlockPop.notify_one();
        return true;
    }

    /**
     * @brief Disables push() and pop(); get() keeps working.
     * @details Thread safe. Every consumer currently blocked in pop() is
     *          woken and returns an empty optional.
     */
    void close() noexcept {
        {
            // The flag must change under the mutex: otherwise a consumer
            // could test the predicate, miss the update, and then sleep
            // through the notification below.
            std::lock_guard lock{m_mutex};
            disableQueue = true;
        }
        cvBlockPop.notify_all();
    }

    /**
     * @brief Returns a copy of the first element satisfying @p predicate.
     *        May be called after the queue is closed.
     * @param predicate Callable evaluated on elements from oldest to newest.
     * @return Copy of the first matching element, otherwise an empty optional.
     * @details Thread safe. The element is not removed from the queue.
     *          The return value must be checked.
     */
    [[nodiscard]] std::optional<Data> get(std::function<bool(Data &)> predicate) {
        std::unique_lock lock{m_mutex};
        // std::ref keeps std::find_if from copying the std::function object.
        const auto match = std::find_if(m_buffer.begin(), m_buffer.end(), std::ref(predicate));
        if (match == m_buffer.end())
            return std::nullopt;
        return *match;
    }

private:
    /**
     * @details Appends data to the underlying buffer.
     *          Caller must hold m_mutex.
     */
    void add(Data&& data) {
        m_buffer.push_back(std::move(data));
    }

    /**
     * @details Removes the first element from the underlying buffer.
     *          Caller must hold m_mutex. Erasing at the front of a vector
     *          shifts the remaining elements.
     */
    void remove() {
        m_buffer.erase(m_buffer.begin());
    }

    /**
     * @details Underlying buffer storing the queue elements, oldest first.
     */
    std::vector<Data> m_buffer;
    /**
     * @details Mutex guarding m_buffer and disableQueue.
     */
    std::mutex m_mutex;
    /**
     * @details Used to block pop() until data arrives or the queue is closed.
     */
    std::condition_variable cvBlockPop;
    /**
     * @details Maximum number of elements the queue may hold.
     */
    const std::size_t m_maxCap;
    /**
     * @details When true, push() and pop() are disabled.
     *          get() remains available after closing.
     *          Read and written only while holding m_mutex.
     */
    bool disableQueue{false};
};