Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
Use eventfd to terminate monitor thread
Browse files Browse the repository at this point in the history
  • Loading branch information
RoEdAl committed Jun 19, 2024
1 parent 4d9c2bb commit 44707f5
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/at_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -3053,7 +3053,7 @@ static void response_taskproc(struct pvt_taskproc_data* ptd)

if (at_queue_run(rtd->ptd.pvt)) {
ast_log(LOG_ERROR, "[%s] Fail to run command from queue\n", PVT_ID(rtd->ptd.pvt));
rtd->ptd.pvt->terminate_monitor = 1;
eventfd_signal(rtd->ptd.pvt->monitor_thread_event);
}
}

Expand Down
39 changes: 16 additions & 23 deletions src/chan_quectel.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,21 +658,18 @@ int pvt_taskproc_trylock_and_execute(struct pvt* pvt, void (*task_exe)(struct pv
return 0;
}

ao2_ref(pvt, 1);
if (ao2_trylock(pvt)) {
ao2_ref(pvt, -1);
ast_debug(4, "[%s] Task skipping: no lock\n", S_OR(task_name, "UNKNOWN"));
return 0;
}

if (pvt->terminate_monitor) {
ast_debug(5, "[%s][%s] Task skipping: monitor thread terminated\n", PVT_ID(pvt), S_OR(task_name, "UNKNOWN"));
ao2_unlock(pvt);
return 0;
}

ast_debug(5, "[%s][%s] Task executing\n", PVT_ID(pvt), S_OR(task_name, "UNKNOWN"));
task_exe(pvt);
ast_debug(6, "[%s][%s] Task executed\n", PVT_ID(pvt), S_OR(task_name, "UNKNOWN"));
ao2_unlock(pvt);
ao2_ref(pvt, -1);
return 0;
}

Expand All @@ -682,16 +679,11 @@ int pvt_taskproc_lock_and_execute(struct pvt_taskproc_data* ptd, void (*task_exe
return 0;
}

SCOPED_AO2LOCK(plock, ptd->pvt);

if (ptd->pvt->terminate_monitor) {
ast_debug(5, "[%s][%s] Task skipping: monitor thread terminated\n", PVT_ID(ptd->pvt), S_OR(task_name, "UNKNOWN"));
return 0;
}

AO2_REF_AND_LOCK(ptd->pvt);
ast_debug(5, "[%s][%s] Task executing\n", PVT_ID(ptd->pvt), S_OR(task_name, "UNKNOWN"));
task_exe(ptd);
ast_debug(6, "[%s][%s] Task executed\n", PVT_ID(ptd->pvt), S_OR(task_name, "UNKNOWN"));
AO2_UNLOCK_AND_UNREF(ptd->pvt);
return 0;
}

Expand Down Expand Up @@ -1236,16 +1228,17 @@ static struct pvt* pvt_create(const pvt_config_t* settings)
AST_LIST_HEAD_INIT_NOLOCK(&pvt->at_queue);
AST_LIST_HEAD_INIT_NOLOCK(&pvt->chans);

pvt->monitor_thread = AST_PTHREADT_NULL;
pvt->sys_chan.pvt = pvt;
pvt->sys_chan.state = CALL_STATE_RELEASED;
pvt->audio_fd = -1;
pvt->data_fd = -1;
pvt->gsm_reg_status = -1;
pvt->has_sms = SCONFIG(settings, msg_direct) ? 0 : 1;
pvt->incoming_sms_index = -1;
pvt->incoming_sms_type = RES_UNKNOWN;
pvt->desired_state = SCONFIG(settings, init_state);
pvt->monitor_thread = AST_PTHREADT_NULL;
pvt->monitor_thread_event = -1;
pvt->sys_chan.pvt = pvt;
pvt->sys_chan.state = CALL_STATE_RELEASED;
pvt->audio_fd = -1;
pvt->data_fd = -1;
pvt->gsm_reg_status = -1;
pvt->has_sms = SCONFIG(settings, msg_direct) ? 0 : 1;
pvt->incoming_sms_index = -1;
pvt->incoming_sms_type = RES_UNKNOWN;
pvt->desired_state = SCONFIG(settings, init_state);

ast_string_field_init(pvt, 14);
ast_string_field_set(pvt, provider_name, "NONE");
Expand Down
2 changes: 1 addition & 1 deletion src/chan_quectel.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ typedef struct pvt {

unsigned long channel_instance; /*!< number of channels created on this device */
pthread_t monitor_thread; /*!< monitor (at commands reader) thread handle */
int monitor_thread_event;

int audio_fd; /*!< audio descriptor */
snd_pcm_t* icard;
Expand Down Expand Up @@ -166,7 +167,6 @@ typedef struct pvt {
unsigned int prov_last_used :1; /*!< mark the last used device */
unsigned int sim_last_used :1; /*!< mark the last used device */

unsigned int terminate_monitor :1; /*!< non-zero if we want terminate monitor thread i.e. restart, stop, remove */
unsigned int has_subscriber_number:1; /*!< subscriber_number field is valid */
unsigned int must_remove :1; /*!< mean must removed from list: NOT FULLY THREADSAFE */

Expand Down
99 changes: 63 additions & 36 deletions src/monitor_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "at_read.h"
#include "chan_quectel.h"
#include "channel.h"
#include "eventfd.h"
#include "helpers.h"
#include "smsdb.h"
#include "tty.h"
Expand Down Expand Up @@ -70,10 +71,6 @@ static void handle_expired_reports(struct pvt* pvt)

static int handle_expired_reports_taskproc(void* tpdata) { return PVT_TASKPROC_TRYLOCK_AND_EXECUTE(tpdata, handle_expired_reports); }

static void restart_monitor(struct pvt* pvt) { pvt->terminate_monitor = 1; }

static int restart_monitor_taskproc(void* tpdata) { return PVT_TASKPROC_TRYLOCK_AND_EXECUTE(tpdata, restart_monitor); }

static void cmd_timeout(struct pvt* const pvt)
{
const struct at_queue_cmd* const ecmd = at_queue_head_cmd(pvt);
Expand All @@ -83,15 +80,15 @@ static void cmd_timeout(struct pvt* const pvt)

if (at_response(pvt, &pvt->empty_str, RES_TIMEOUT)) {
ast_log(LOG_ERROR, "[%s] Fail to handle response\n", PVT_ID(pvt));
pvt->terminate_monitor = 1;
eventfd_signal(pvt->monitor_thread_event);
return;
}

if (ecmd->flags & ATQ_CMD_FLAG_IGNORE) {
return;
}

pvt->terminate_monitor = 1;
eventfd_signal(pvt->monitor_thread_event);
}

static int cmd_timeout_taskproc(void* tpdata) { return PVT_TASKPROC_TRYLOCK_AND_EXECUTE(tpdata, cmd_timeout); }
Expand Down Expand Up @@ -165,6 +162,19 @@ static int check_dev_status(struct pvt* const pvt, struct ast_taskprocessor* tps
return 0;
}

static int at_wait_n(int* fds, int n, int* ms)
{
int exception;

const int outfd = ast_waitfor_n_fd(fds, n, ms, &exception);

if (outfd < 0) {
return 0;
}

return outfd;
}

static void monitor_threadproc_pvt(struct pvt* const pvt)
{
static const size_t RINGBUFFER_SIZE = 2 * 1024;
Expand All @@ -188,8 +198,8 @@ static void monitor_threadproc_pvt(struct pvt* const pvt)
}

/* 4 reduce locking time make copy of this readonly fields */
const int fd = pvt->data_fd;
at_clean_data(dev, fd, &rb);
int fd[2] = {pvt->data_fd, pvt->monitor_thread_event};
at_clean_data(dev, fd[0], &rb);

/* schedule initilization */
if (at_enqueue_initialization(&pvt->sys_chan)) {
Expand All @@ -206,8 +216,12 @@ static void monitor_threadproc_pvt(struct pvt* const pvt)
}

if (ao2_trylock(pvt)) { // pvt unlocked
int t = RESPONSE_READ_TIMEOUT;
if (!at_wait(fd, &t)) {
int t = RESPONSE_READ_TIMEOUT;
const int w = at_wait_n(fd, 2, &t);
if (w == fd[1]) {
eventfd_reset(pvt->monitor_thread_event);
goto e_restart;
} else if (w != fd[0]) {
if (ast_taskprocessor_push(tps, at_enqueue_ping_taskproc, pvt)) {
ast_debug(5, "[%s] Unable to handle timeout\n", dev);
}
Expand All @@ -218,11 +232,6 @@ static void monitor_threadproc_pvt(struct pvt* const pvt)
goto e_cleanup;
}

if (pvt->terminate_monitor) {
ast_log(LOG_NOTICE, "[%s] Stopping by %s request\n", dev, dev_state2str(pvt->desired_state));
goto e_restart;
}

int t;
int is_cmd_timeout = 1;
if (at_queue_timeout(pvt, &t)) {
Expand All @@ -234,32 +243,45 @@ static void monitor_threadproc_pvt(struct pvt* const pvt)
if (is_cmd_timeout) {
if (t <= 0) {
if (check_taskprocessor(tps, dev)) {
if (ast_taskprocessor_push(tps, restart_monitor_taskproc, pvt)) {
ast_debug(5, "[%s] Unable to restart monitor thread\n", dev);
}
eventfd_signal(pvt->monitor_thread_event);
}

if (ast_taskprocessor_push(tps, cmd_timeout_taskproc, pvt)) {
ast_debug(5, "[%s] Unable to handle timeout\n", dev);
}

t = UNHANDLED_COMMAND_TIMEOUT;
if (!at_wait(fd, &t)) {
t = UNHANDLED_COMMAND_TIMEOUT;
const int w = at_wait_n(fd, 2, &t);
if (w == fd[1]) {
eventfd_reset(pvt->monitor_thread_event);
goto e_restart;
} else if (w != fd[0]) {
continue;
}
} else if (!at_wait(fd, &t)) {
if (ast_taskprocessor_push(tps, cmd_timeout_taskproc, pvt)) {
ast_debug(5, "[%s] Unable to handle timeout\n", dev);
} else {
const int w = at_wait_n(fd, 2, &t);
if (w == fd[1]) {
eventfd_reset(pvt->monitor_thread_event);
goto e_restart;

} else if (w != fd[0]) {
if (ast_taskprocessor_push(tps, cmd_timeout_taskproc, pvt)) {
ast_debug(5, "[%s] Unable to handle timeout\n", dev);
}
continue;
}
continue;
}
} else {
t = RESPONSE_READ_TIMEOUT;
if (!at_wait(fd, &t)) {
t = RESPONSE_READ_TIMEOUT;
const int w = at_wait_n(fd, 2, &t);
if (w == fd[1]) {
eventfd_reset(pvt->monitor_thread_event);
goto e_restart;

} else if (w != fd[0]) {
if (check_taskprocessor(tps, dev)) {
if (ast_taskprocessor_push(tps, restart_monitor_taskproc, pvt)) {
ast_debug(5, "[%s] Unable to restart monitor thread\n", dev);
}
eventfd_signal(pvt->monitor_thread_event);
goto e_restart;
}

if (ast_taskprocessor_push(tps, at_enqueue_ping_taskproc, pvt)) {
Expand All @@ -271,7 +293,7 @@ static void monitor_threadproc_pvt(struct pvt* const pvt)
}

/* FIXME: access to device not locked */
int iovcnt = at_read(dev, fd, &rb);
int iovcnt = at_read(dev, fd[0], &rb);
if (iovcnt < 0) {
break;
}
Expand Down Expand Up @@ -310,8 +332,6 @@ static void monitor_threadproc_pvt(struct pvt* const pvt)
// TODO: send monitor event
ast_verb(3, "[%s] Error initializing channel\n", dev);
}
/* it real, unsolicited disconnect */
pvt->terminate_monitor = 0;

e_restart:
pvt_disconnect(pvt);
Expand All @@ -329,9 +349,17 @@ static void* monitor_threadproc(void* _pvt)
int pvt_monitor_start(struct pvt* pvt)
{
ao2_ref(pvt, 1);

const int monitor_thread_event = eventfd_create();
if (monitor_thread_event <= 0) {
return 0;
}

pvt->monitor_thread_event = monitor_thread_event;
if (ast_pthread_create_background(&pvt->monitor_thread, NULL, monitor_threadproc, pvt) < 0) {
ao2_ref(pvt, -1);
pvt->monitor_thread = AST_PTHREADT_NULL;
eventfd_close(&pvt->monitor_thread_event);
return 0;
}

Expand All @@ -344,15 +372,14 @@ void pvt_monitor_stop(struct pvt* pvt)
return;
}

pvt->terminate_monitor = 1;
pthread_kill(pvt->monitor_thread, SIGURG);
eventfd_signal(pvt->monitor_thread_event);

{
const pthread_t id = pvt->monitor_thread;
SCOPED_LOCK(pvtl, pvt, ao2_unlock, ao2_lock); // scoped UNlock
pthread_join(id, NULL);
}

pvt->terminate_monitor = 0;
pvt->monitor_thread = AST_PTHREADT_NULL;
pvt->monitor_thread = AST_PTHREADT_NULL;
eventfd_close(&pvt->monitor_thread_event);
}

0 comments on commit 44707f5

Please sign in to comment.