-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLockFreeQueue.h
203 lines (176 loc) · 6.1 KB
/
LockFreeQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
//
// Created by djshaji on 3/5/24.
//
/*
* Copyright 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef AMP_RACK_LOCKFREEQUEUE_H
#define AMP_RACK_LOCKFREEQUEUE_H
#ifndef __linux__
#define gettid getpid
#endif
#include <cstdint>
#include <atomic>
#include <cstdlib>
#include <thread>
#include "logging_macros.h"
#include <unistd.h>
#include "AudioBuffer.h"
#include <jni.h>
//#include "Engine.h"
/**
* A lock-free queue for single consumer, single producer. Not thread-safe when using multiple
* consumers or producers.
*
* Example code:
*
* LockFreeQueue<int, 1024> myQueue;
* int value = 123;
* myQueue.push(value);
* myQueue.pop(value);
*
* @tparam T - The item type
* @tparam CAPACITY - Maximum number of items which can be held in the queue. Must be a power of 2.
* Must be less than the maximum value permissible in INDEX_TYPE
* @tparam INDEX_TYPE - The internal index type, defaults to uint32_t. Changing this will affect
* the maximum capacity. Included for ease of unit testing because testing queue lengths of
* UINT32_MAX can be time consuming and is not always possible.
*/
template <typename T, uint32_t CAPACITY, typename INDEX_TYPE = uint32_t>
class LockFreeQueue {
public:
/**
* Implementation details:
*
* We have 2 counters: readCounter and writeCounter. Each will increment until it reaches
* INDEX_TYPE_MAX, then wrap to zero. Unsigned integer overflow is defined behaviour in C++.
*
* Each time we need to access our data array we call mask() which gives us the index into the
* array. This approach avoids having a "dead item" in the buffer to distinguish between full
* and empty states. It also allows us to have a size() method which is easily calculated.
*
* IMPORTANT: This implementation is only thread-safe with a single reader thread and a single
* writer thread. Have more than one of either will result in Bad Things™.
*/
static constexpr bool isPowerOfTwo(uint32_t n) { return (n & (n - 1)) == 0; }
static_assert(isPowerOfTwo(CAPACITY), "Capacity must be a power of 2");
static_assert(std::is_unsigned<INDEX_TYPE>::value, "Index type must be unsigned");
/**
* Pop a value off the head of the queue
*
* @param val - element will be stored in this variable
* @return true if value was popped successfully, false if the queue is empty
*/
bool pop(T &val) {
if (isEmpty()){
return false;
} else {
val = buffer[mask(readCounter)];
++readCounter;
return true;
}
}
/**
* Add an item to the back of the queue
*
* @param item - The item to add
* @return true if item was added, false if the queue was full
*/
bool push(const T& item) {
if (isFull()){
return false;
} else {
buffer[mask(writeCounter)] = item;
++writeCounter;
return true;
}
}
/**
* Get the item at the front of the queue but do not remove it
*
* @param item - item will be stored in this variable
* @return true if item was stored, false if the queue was empty
*/
bool peek(T &item) const {
if (isEmpty()){
return false;
} else {
item = buffer[mask(readCounter)];
return true;
}
}
/**
* Get the number of items in the queue
*
* @return number of items in the queue
*/
INDEX_TYPE size() const {
/**
* This is worth some explanation:
*
* Whilst writeCounter is greater than readCounter the result of (write - read) will always
* be positive. Simple.
*
* But when writeCounter is equal to INDEX_TYPE_MAX (e.g. UINT32_MAX) the next push will
* wrap it around to zero, the start of the buffer, making writeCounter less than
* readCounter so the result of (write - read) will be negative.
*
* But because we're returning an unsigned type return value will be as follows:
*
* returnValue = INDEX_TYPE_MAX - (write - read)
*
* e.g. if write is 0, read is 150 and the INDEX_TYPE is uint8_t where the max value is
* 255 the return value will be (255 - (0 - 150)) = 105.
*
*/
return writeCounter - readCounter;
};
private:
bool isEmpty() const { return readCounter == writeCounter; }
bool isFull() const { return size() == CAPACITY; }
INDEX_TYPE mask(INDEX_TYPE n) const { return static_cast<INDEX_TYPE>(n & (CAPACITY - 1)); }
T buffer[CAPACITY];
std::atomic<INDEX_TYPE> writeCounter { 0 };
std::atomic<INDEX_TYPE> readCounter { 0 };
};
#define LOCK_FREE_SIZE 128
#define SPARE_BUFFERS 128
class LockFreeQueueManager {
static LockFreeQueue<AudioBuffer *, LOCK_FREE_SIZE> lockFreeQueue ;
static AudioBuffer * pAudioBuffer [SPARE_BUFFERS];
int buffer_size ;
static int buffer_counter ;
static bool ready ;
#define MAX_FUNCTIONS 10
static void (* functions [MAX_FUNCTIONS])(AudioBuffer *) ;
int functions_count ;
static std::thread fileWriteThread ;
bool thread_started = false;
public:
JavaVM * vm = NULL ;
void init (int _buffer_size) ;
void add_function(int (*f)(AudioBuffer *));
void process (float * raw, float * data, int samplesToProcess) ;
void main () ;
void quit () ;
LockFreeQueueManager () {
functions_count = 0 ;
pAudioBuffer [0] = nullptr;
}
void attach();
void detach ();
void pop_function();
};
#endif //AMP_RACK_LOCKFREEQUEUE_H