Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor dlq queueing and locking #496

Draft
wants to merge 4 commits into
base: dev
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 63 additions & 67 deletions src/dlq.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
/* The queue is a linked list of these. */

static struct dlq_item_s *queue_head = NULL; /* Head of linked list for queue. */
static struct dlq_item_s *queue_tail = NULL; /* Tail of linked list for queue. */
int queue_length = 0; /* Count of items in queue */

#if __WIN32__

Expand All @@ -75,8 +77,6 @@ static pthread_mutex_t dlq_mutex; /* Critical section for updating queues. */

static pthread_cond_t wake_up_cond; /* Notify received packet processing thread when queue not empty. */

static pthread_mutex_t wake_up_mutex; /* Required by cond_wait. */

static volatile int recv_thread_is_waiting = 0;

#endif
Expand Down Expand Up @@ -117,7 +117,8 @@ void dlq_init (void)
dw_printf ("dlq_init ( )\n");
#endif

queue_head = NULL;
queue_head = queue_tail = NULL;
queue_length = 0;


#if DEBUG
Expand All @@ -129,13 +130,6 @@ void dlq_init (void)
InitializeCriticalSection (&dlq_cs);
#else
int err;
err = pthread_mutex_init (&wake_up_mutex, NULL);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_init: pthread_mutex_init err=%d", err);
perror ("");
exit (EXIT_FAILURE);
}
err = pthread_mutex_init (&dlq_mutex, NULL);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
Expand Down Expand Up @@ -314,9 +308,6 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev

static void append_to_queue (struct dlq_item_s *pnew)
{
struct dlq_item_s *plast;
int queue_length = 0;

if ( ! was_init) {
dlq_init ();
}
Expand All @@ -341,30 +332,19 @@ static void append_to_queue (struct dlq_item_s *pnew)
#endif

if (queue_head == NULL) {
queue_head = pnew;
assert (queue_tail == NULL);
queue_head = queue_tail = pnew;
queue_length = 1;
} else {
assert (queue_tail != NULL);
queue_tail->nextp = pnew;
queue_tail = pnew;
queue_length++;
assert (queue_length > 1);
}
else {
queue_length = 2; /* head + new one */
plast = queue_head;
while (plast->nextp != NULL) {
plast = plast->nextp;
queue_length++;
}
plast->nextp = pnew;
}


#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#else
err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock err=%d", err);
perror ("");
exit (1);
}
#endif
#if DEBUG1
text_color_set(DW_COLOR_DEBUG);
Expand Down Expand Up @@ -416,7 +396,7 @@ static void append_to_queue (struct dlq_item_s *pnew)
* and blocking on a write.
*/

if (queue_length > 10) {
if (queue_length > 15) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("Received frame queue is out of control. Length=%d.\n", queue_length);
dw_printf ("Reader thread is probably frozen.\n");
Expand All @@ -431,29 +411,21 @@ static void append_to_queue (struct dlq_item_s *pnew)
#else
if (recv_thread_is_waiting) {

err = pthread_mutex_lock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_lock wu err=%d", err);
perror ("");
exit (1);
}

err = pthread_cond_signal (&wake_up_cond);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_cond_signal err=%d", err);
perror ("");
exit (1);
}
}

err = pthread_mutex_unlock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif

Expand Down Expand Up @@ -1011,9 +983,29 @@ int dlq_wait_while_empty (double timeout)
dlq_init ();
}

#if DEBUG1
text_color_set(DW_COLOR_DEBUG);
dw_printf ("dlq dlq_wait_while_empty: enter critical section\n");
#endif
#if __WIN32__
EnterCriticalSection (&dlq_cs);
#else
int err;
err = pthread_mutex_lock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_lock err=%d", err);
perror ("");
exit (1);
}
#endif

if (queue_head == NULL) {

#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#endif

#if DEBUG
text_color_set(DW_COLOR_DEBUG);
dw_printf ("dlq_wait_while_empty (): prepare to SLEEP...\n");
Expand All @@ -1037,45 +1029,41 @@ int dlq_wait_while_empty (double timeout)
else {
WaitForSingleObject (wake_up_event, INFINITE);
}
} else {
#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#endif
}

#else
int err;

err = pthread_mutex_lock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_lock wu err=%d", err);
perror ("");
exit (1);
}

recv_thread_is_waiting = 1;
if (timeout != 0.0) {
struct timespec abstime;

abstime.tv_sec = (time_t)(long)timeout;
abstime.tv_nsec = (long)((timeout - (long)abstime.tv_sec) * 1000000000.0);

err = pthread_cond_timedwait (&wake_up_cond, &wake_up_mutex, &abstime);
err = pthread_cond_timedwait (&wake_up_cond, &dlq_mutex, &abstime);
if (err == ETIMEDOUT) {
timed_out_result = 1;
}
}
else {
err = pthread_cond_wait (&wake_up_cond, &wake_up_mutex);
err = pthread_cond_wait (&wake_up_cond, &dlq_mutex);
}
recv_thread_is_waiting = 0;

err = pthread_mutex_unlock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif
}

err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif

#if DEBUG
text_color_set(DW_COLOR_DEBUG);
Expand Down Expand Up @@ -1133,6 +1121,14 @@ struct dlq_item_s *dlq_remove (void)
if (queue_head != NULL) {
result = queue_head;
queue_head = queue_head->nextp;
queue_length--;
if (queue_head == NULL) {
assert (queue_length == 0);
queue_tail = NULL;
}
if (queue_length == 1) {
assert (queue_head == queue_tail);
}
}

#if __WIN32__
Expand Down
Loading