From 77fdcba2c2662209bea69c214c32293fa7739f8c Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 30 Aug 2024 14:52:02 +0200 Subject: [PATCH] Fix a few bugs in buffers chaining management (particularly when buffer was full) and add a conditional mutex to protect from concurrent access --- src/buffer.c | 74 +++++++++++++++++++++++++++++----------------------- src/config.h | 4 +++ 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/buffer.c b/src/buffer.c index a0bf938..60945a2 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -6,6 +6,19 @@ #include "utils.h" #include "buffer.h" +#if BUFFER_USE_MUTEX +# include +# define PEER_MUTEX_INIT(X) static pthread_mutex_t X = PTHREAD_MUTEX_INITIALIZER +# define PEER_MUTEX_LOCK(X) pthread_mutex_lock(X) +# define PEER_MUTEX_UNLOCK(X) pthread_mutex_unlock(X) +#else +# define PEER_MUTEX_INIT(X) +# define PEER_MUTEX_LOCK(X) +# define PEER_MUTEX_UNLOCK(X) +#endif + +PEER_MUTEX_INIT(g_buffer_mutex); + Buffer* buffer_new(int size) { Buffer *rb; @@ -20,100 +33,95 @@ Buffer* buffer_new(int size) { } void buffer_clear(Buffer *rb) { - - rb->head = 0; - rb->tail = 0; + PEER_MUTEX_LOCK(&g_buffer_mutex); + if (rb) { + rb->head = 0; + rb->tail = 0; + } + PEER_MUTEX_UNLOCK(&g_buffer_mutex); } void buffer_free(Buffer *rb) { - + PEER_MUTEX_LOCK(&g_buffer_mutex); if (rb) { - free(rb->data); rb->data = NULL; - rb = NULL; + free(rb); // Now properly freeing the buffer itself } + PEER_MUTEX_UNLOCK(&g_buffer_mutex); } int buffer_push_tail(Buffer *rb, const uint8_t *data, int size) { + PEER_MUTEX_LOCK(&g_buffer_mutex); int free_space = (rb->size + rb->head - rb->tail - 1) % rb->size; - int align_size = ALIGN32(size + 4); if (align_size > free_space) { LOGE("no enough space"); + PEER_MUTEX_UNLOCK(&g_buffer_mutex); return -1; } int tail_end = (rb->tail + align_size) % rb->size; if (tail_end < rb->tail) { - + // Handle wrap-around if (rb->head < align_size) { - LOGE("no enough space"); + PEER_MUTEX_UNLOCK(&g_buffer_mutex); return -1; } int *p = (int*)(rb->data + rb->tail); *p = size; - memcpy(rb->data, data, size); - rb->tail = size; - + memcpy(rb->data + rb->tail + 4, data, rb->size - rb->tail - 4); + memcpy(rb->data, data + (rb->size - rb->tail - 4), tail_end); } else { - int *p = (int*)(rb->data + rb->tail); *p = size; memcpy(rb->data + rb->tail + 4, data, size); - rb->tail = tail_end; } + rb->tail = tail_end; + + PEER_MUTEX_UNLOCK(&g_buffer_mutex); return size; } uint8_t* buffer_peak_head(Buffer *rb, int *size) { - + PEER_MUTEX_LOCK(&g_buffer_mutex); if (!rb || rb->head == rb->tail) { - + (*size) = 0; + PEER_MUTEX_UNLOCK(&g_buffer_mutex); return NULL; } *size = *((int*)(rb->data + rb->head)); int align_size = ALIGN32(*size + 4); - int head_end = (rb->head + align_size) % rb->size; - if (head_end < rb->head) { - - return rb->data; + PEER_MUTEX_UNLOCK(&g_buffer_mutex); + if (head_end < rb->head) { + return rb->data; // Returns pointer to start of buffer in case of wrap-around } else { - return rb->data + (rb->head + 4); } } void buffer_pop_head(Buffer *rb) { + PEER_MUTEX_LOCK(&g_buffer_mutex); if (!rb || rb->head == rb->tail) { + PEER_MUTEX_UNLOCK(&g_buffer_mutex); return; } int *size = (int*)(rb->data + rb->head); - int align_size = ALIGN32(*size + 4); + rb->head = (rb->head + align_size) % rb->size; - int head_end = (rb->head + align_size) % rb->size; - - if (head_end < rb->head) { - - rb->head = *size; - - } else { - - rb->head = rb->head + align_size; - } + PEER_MUTEX_UNLOCK(&g_buffer_mutex); } - diff --git a/src/config.h b/src/config.h index 1c6a0c2..06f0d3e 100644 --- a/src/config.h +++ b/src/config.h @@ -25,4 +25,8 @@ //#define LOG_LEVEL LEVEL_DEBUG #define LOG_REDIRECT 0 +#ifndef BUFFER_USE_MUTEX +#define BUFFER_USE_MUTEX 0 +#endif + #endif // CONFIG_H_