Skip to content

Commit

Permalink
lock free atomic fifo implemented successfully™
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaji Khan committed Mar 6, 2024
1 parent 27c64f9 commit 1197105
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
63 changes: 39 additions & 24 deletions app/src/main/cpp/FileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <chrono>
#include "FileWriter.h"

std::thread FileWriter::fileWriteThread ;
int FileWriter::buffer_write_index = 0 ;
LockFreeQueue<buffer_t*, 1024> FileWriter::lockFreeQueue;
void * FileWriter::mp3_buffer = NULL;
Expand All @@ -19,15 +20,15 @@ int FileWriter::unreported_overruns = 0 ;
int FileWriter::total_overruns = 0;
SNDFILE * FileWriter::soundfile = NULL;
buffer_t *FileWriter::current_buffer;
buffer_t *FileWriter::bg_buffer;
buffer_t *FileWriter::bg_buffer = nullptr;
int FileWriter::num_channels = 1 ;
int FileWriter::block_size = 384 ;
float FileWriter::min_buffer_time = 0.01f,
FileWriter::max_buffer_time = 40.0f ;
int FileWriter::jack_samplerate ;
int FileWriter::buffer_size_in_bytes;
bool FileWriter::ready = false ;
vringbuffer_t * FileWriter::vringbuffer = NULL;
//vringbuffer_t * FileWriter::vringbuffer = NULL;
FileType FileWriter::fileType = MP3 ;
OpusEncoder *FileWriter::encoder ;
opus_int16 FileWriter::opusIn[960 * 2];
Expand Down Expand Up @@ -85,8 +86,8 @@ int FileWriter::seconds_to_buffers(float seconds){
}

FileWriter::~FileWriter () {
free (empty_buffer);
vringbuffer_delete(vringbuffer);
// free (empty_buffer);
// vringbuffer_delete(vringbuffer);
}
FileWriter::FileWriter () {
IN
Expand Down Expand Up @@ -248,12 +249,12 @@ int FileWriter::disk_write(float *data,size_t frames) {
// LOGD("[ringbuffer id] %d", getpid ());

// IN
if (frames == 0 || disk_writes > processed) {
return 0;
}
// if (frames == 0 || disk_writes > processed) {
// return 0;
// }

// LOGD("disk write [%d] %d frames", disk_writes, frames);
disk_writes ++ ;
// disk_writes ++ ;
if (fileType == MP3) {
int write = lame_encode_buffer_ieee_float(lame, data, NULL, frames, (unsigned char *) mp3_buffer, (block_size * 1.25) + 7200);
if (write < 0) {
Expand Down Expand Up @@ -316,18 +317,29 @@ void FileWriter::startRecording () {
IN
openFile();
ready = true ;
fileWriteThread = std::thread (&FileWriter::writeLoop, this);
OUT
}

void FileWriter::writeLoop () {
buffer_t *buffer ;
while (ready) {
lockFreeQueue.pop(buffer) ;
disk_write(buffer->data, buffer->pos);
}

}

void FileWriter::stopRecording () {
IN
// because we use a large buffer, there can be samples in the buffer which have not been written yet
// if (useStaticBuffer)
// vringbuffer_return_writing(vringbuffer,buffers);

// vringbuffer_stop_callbacks(vringbuffer);
closeFile();
ready = false ;
fileWriteThread.join();
closeFile();
LOGD("recording stopped: %d buffer underruns", total_overruns);
bg_buffer->pos = 0 ;
OUT
Expand All @@ -342,15 +354,17 @@ void FileWriter::setBufferSize (int bufferSize) {
LOGD("setting buffer size: %d from block size: %d", buffer_size_in_bytes, block_size);
disk_writes = 0 ;
processed = 0 ;
if (vringbuffer != NULL) {
if (bg_buffer != nullptr) {
///| @attention: this *must* be freed
free (bg_buffer->data);
free (bg_buffer) ;
free (empty_buffer);
vringbuffer_delete(vringbuffer);
vringbuffer = NULL ;
}

bg_buffer = static_cast<buffer_t *>(calloc(1, sizeof(buffer_t)));
// HERE LOGD("using buffer size %d", bufferSize);
bg_buffer->data = static_cast<float *>(malloc(buffer_size_in_bytes));
bg_buffer->pos = 0 ;
/*
if (vringbuffer == NULL) {
vringbuffer = vringbuffer_create(JC_MAX(4, seconds_to_buffers(min_buffer_time)),
JC_MAX(4, seconds_to_buffers(max_buffer_time)),
Expand All @@ -371,6 +385,7 @@ void FileWriter::setBufferSize (int bufferSize) {
OUT
return ;
}
*/

// for (int i = 0 ; i < 32 ; i ++) {
// buffers[i] = static_cast<buffer_t *>(malloc(sizeof(buffer_t)));
Expand Down Expand Up @@ -405,7 +420,7 @@ void FileWriter::process_fill_buffer(float *in[], buffer_t *buffer, int i, int e

bool FileWriter::process_new_current_buffer(int frames_left){
IN
current_buffer=(buffer_t*)vringbuffer_get_writing(vringbuffer);
// current_buffer=(buffer_t*)vringbuffer_get_writing(vringbuffer);
// current_buffer = vringbuffer->for_writer1 ;
if(current_buffer==NULL){
total_overruns++;
Expand All @@ -421,7 +436,7 @@ bool FileWriter::process_new_current_buffer(int frames_left){

void FileWriter::send_buffer_to_disk_thread(buffer_t *buffer){
buffer->overruns = unreported_overruns;
vringbuffer_return_writing(vringbuffer,buffer);
// vringbuffer_return_writing(vringbuffer,buffer);
unreported_overruns = 0;
}

Expand Down Expand Up @@ -492,7 +507,7 @@ int FileWriter::process(int nframes, const float *arg) {

} else {
// LOGD("bufferUsed = MAX_STATIC_BUFFER, vringbuffer_return_writing") ;
vringbuffer_return_writing(vringbuffer,buffers);
// vringbuffer_return_writing(vringbuffer,buffers);
bufferUsed = 0;
}
} else {
Expand All @@ -503,13 +518,13 @@ int FileWriter::process(int nframes, const float *arg) {
// bg_buffer->pos = 0 ;

// if (false && buffer_write_index > 19 ) {
buffer_write_index = 0 ;
// buffer_write_index = 0 ;
// LOGD("[realtime id] %d", gettid ());
// process_new_current_buffer(nframes);
vringbuffer_return_writing(vringbuffer,bg_buffer);
// vringbuffer_return_writing(vringbuffer,bg_buffer);
// if (current_buffer != nullptr)
// LOGD("current buffer %d", current_buffer->pos);
bg_buffer->pos = 0 ;
// bg_buffer->pos = 0 ;
// }

// process_new_current_buffer(nframes) ;
Expand All @@ -520,10 +535,10 @@ int FileWriter::process(int nframes, const float *arg) {

// LOGD("testmest %d", current_buffer->data [0]);
// vringbuffer_return_writing(vringbuffer,current_buffer);
while (vringbuffer_writing_size(vringbuffer)==0)
usleep(2);
while(vringbuffer_reading_size(vringbuffer) > 0)
usleep(2);
// while (vringbuffer_writing_size(vringbuffer)==0)
// usleep(2);
// while(vringbuffer_reading_size(vringbuffer) > 0)
// usleep(2);

// jack_ringbuffer_read(vringbuffer->for_writer2,(char*)&current_buffer,sizeof(void*)); // Checking writer2 first since that memory is more likely to already be in the cache.
// jack_ringbuffer_read(vringbuffer->for_writer1,(char*)&current_buffer,sizeof(void*));
Expand Down Expand Up @@ -556,7 +571,7 @@ int FileWriter::process(int nframes, const float *arg) {
bg_buffer->pos = nframes;
lockFreeQueue.push(bg_buffer);
// LOGD("return writing [%d] %d", processed, nframes);
processed++;
// processed++;
// vringbuffer_return_writing(vringbuffer,bg_buffer);
// current_buffer->data = (float *) arg;
// current_buffer->pos = nframes;
Expand Down
5 changes: 4 additions & 1 deletion app/src/main/cpp/FileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <string>
#include <ctime>
#include <thread>
#include "logging_macros.h"
#include "sndfile.h"
#include "opus.h"
Expand Down Expand Up @@ -77,13 +78,14 @@ class FileWriter {
static bool ready ;
static FileType fileType;
bool buffer_interleaved = true ;
static vringbuffer_t * vringbuffer ;
// static vringbuffer_t * vringbuffer ;
static int jack_samplerate ;
static int buffer_size_in_bytes ;
static float min_buffer_time ,
max_buffer_time ;

float *empty_buffer;
static std::thread fileWriteThread ;

static int block_size;
int default_block_size = 384 ;
Expand Down Expand Up @@ -196,6 +198,7 @@ class FileWriter {

void setChannels(int channels);

void writeLoop();
};


Expand Down

0 comments on commit 1197105

Please sign in to comment.