Skip to content

Commit

Permalink
Simplify queueing and locking
Browse files Browse the repository at this point in the history
  • Loading branch information
bjpetit committed Oct 31, 2023
1 parent 2260df1 commit 0338e90
Showing 1 changed file with 58 additions and 69 deletions.
127 changes: 58 additions & 69 deletions src/dlq.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
/* The queue is a linked list of these. */

static struct dlq_item_s *queue_head = NULL; /* Head of linked list for queue. */
static struct dlq_item_s *queue_tail = NULL; /* Tail of linked list for queue. */
int queue_length = 0; /* Count of items in queue */

#if __WIN32__

Expand All @@ -75,8 +77,6 @@ static pthread_mutex_t dlq_mutex; /* Critical section for updating queues. */

static pthread_cond_t wake_up_cond; /* Notify received packet processing thread when queue not empty. */

static pthread_mutex_t wake_up_mutex; /* Required by cond_wait. */

static volatile int recv_thread_is_waiting = 0;

#endif
Expand Down Expand Up @@ -117,7 +117,8 @@ void dlq_init (void)
dw_printf ("dlq_init ( )\n");
#endif

queue_head = NULL;
queue_head = queue_tail = NULL;
queue_length = 0;


#if DEBUG
Expand All @@ -129,13 +130,6 @@ void dlq_init (void)
InitializeCriticalSection (&dlq_cs);
#else
int err;
err = pthread_mutex_init (&wake_up_mutex, NULL);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_init: pthread_mutex_init err=%d", err);
perror ("");
exit (EXIT_FAILURE);
}
err = pthread_mutex_init (&dlq_mutex, NULL);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
Expand Down Expand Up @@ -258,7 +252,7 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev

/* Allocate a new queue item. */

pnew = (struct dlq_item_s *) calloc (sizeof(struct dlq_item_s), 1);
pnew = (struct dlq_item_s *) malloc (sizeof(struct dlq_item_s));
if (pnew == NULL) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("FATAL ERROR: Out of memory.\n");
Expand Down Expand Up @@ -314,9 +308,6 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev

static void append_to_queue (struct dlq_item_s *pnew)
{
struct dlq_item_s *plast;
int queue_length = 0;

if ( ! was_init) {
dlq_init ();
}
Expand All @@ -341,30 +332,16 @@ static void append_to_queue (struct dlq_item_s *pnew)
#endif

if (queue_head == NULL) {
queue_head = pnew;
queue_head = queue_tail = pnew;
queue_length = 1;
} else {
queue_tail->nextp = pnew;
queue_tail = pnew;
queue_length++;
}
else {
queue_length = 2; /* head + new one */
plast = queue_head;
while (plast->nextp != NULL) {
plast = plast->nextp;
queue_length++;
}
plast->nextp = pnew;
}


#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#else
err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock err=%d", err);
perror ("");
exit (1);
}
#endif
#if DEBUG1
text_color_set(DW_COLOR_DEBUG);
Expand Down Expand Up @@ -416,7 +393,7 @@ static void append_to_queue (struct dlq_item_s *pnew)
* and blocking on a write.
*/

if (queue_length > 10) {
if (queue_length > 15) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("Received frame queue is out of control. Length=%d.\n", queue_length);
dw_printf ("Reader thread is probably frozen.\n");
Expand All @@ -431,29 +408,21 @@ static void append_to_queue (struct dlq_item_s *pnew)
#else
if (recv_thread_is_waiting) {

err = pthread_mutex_lock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_lock wu err=%d", err);
perror ("");
exit (1);
}

err = pthread_cond_signal (&wake_up_cond);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_cond_signal err=%d", err);
perror ("");
exit (1);
}
}

err = pthread_mutex_unlock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif

Expand Down Expand Up @@ -1011,9 +980,29 @@ int dlq_wait_while_empty (double timeout)
dlq_init ();
}

#if DEBUG1
text_color_set(DW_COLOR_DEBUG);
dw_printf ("dlq dlq_wait_while_empty: enter critical section\n");
#endif
#if __WIN32__
EnterCriticalSection (&dlq_cs);
#else
int err;
err = pthread_mutex_lock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_lock err=%d", err);
perror ("");
exit (1);
}
#endif

if (queue_head == NULL) {

#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#endif

#if DEBUG
text_color_set(DW_COLOR_DEBUG);
dw_printf ("dlq_wait_while_empty (): prepare to SLEEP...\n");
Expand All @@ -1037,45 +1026,41 @@ int dlq_wait_while_empty (double timeout)
else {
WaitForSingleObject (wake_up_event, INFINITE);
}
} else {
#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#endif
}

#else
int err;

err = pthread_mutex_lock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_lock wu err=%d", err);
perror ("");
exit (1);
}

recv_thread_is_waiting = 1;
if (timeout != 0.0) {
struct timespec abstime;

abstime.tv_sec = (time_t)(long)timeout;
abstime.tv_nsec = (long)((timeout - (long)abstime.tv_sec) * 1000000000.0);

err = pthread_cond_timedwait (&wake_up_cond, &wake_up_mutex, &abstime);
err = pthread_cond_timedwait (&wake_up_cond, &dlq_mutex, &abstime);
if (err == ETIMEDOUT) {
timed_out_result = 1;
}
}
else {
err = pthread_cond_wait (&wake_up_cond, &wake_up_mutex);
err = pthread_cond_wait (&wake_up_cond, &dlq_mutex);
}
recv_thread_is_waiting = 0;

err = pthread_mutex_unlock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif
}

err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif

#if DEBUG
text_color_set(DW_COLOR_DEBUG);
Expand Down Expand Up @@ -1105,7 +1090,6 @@ struct dlq_item_s *dlq_remove (void)
{

struct dlq_item_s *result = NULL;
//int err;

#if DEBUG1
text_color_set(DW_COLOR_DEBUG);
Expand Down Expand Up @@ -1133,6 +1117,11 @@ struct dlq_item_s *dlq_remove (void)
if (queue_head != NULL) {
result = queue_head;
queue_head = queue_head->nextp;
queue_length--;

if (queue_head == NULL) {
queue_tail = NULL;
}
}

#if __WIN32__
Expand Down

0 comments on commit 0338e90

Please sign in to comment.