Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
Use atomic counters in statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
RoEdAl committed Jun 21, 2024
1 parent 9944cab commit 1ca012f
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 141 deletions.
3 changes: 2 additions & 1 deletion src/at_command.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "channel.h"
#include "char_conv.h" /* char_to_hexstr_7bit() */
#include "error.h"
#include "helpers.h"
#include "pdu.h" /* build_pdu() */
#include "smsdb.h"

Expand Down Expand Up @@ -1296,7 +1297,7 @@ int at_enqueue_hangup(struct cpvt* cpvt, int call_idx, int release_cause)

if (cpvt == &pvt->sys_chan || CPVT_DIR_INCOMING(cpvt) || (cpvt->state != CALL_STATE_INIT && cpvt->state != CALL_STATE_DIALING)) {
/* FIXME: other channels may be in RELEASED or INIT state */
if (PVT_STATE(pvt, chansno) > 1) {
if (ast_atomic_fetch_uint32(&PVT_STATE(pvt, chansno)) > 1) {
DECLARE_AT_CMD(chld1x, "+CHLD=1%d");
static const at_queue_cmd_t cmd = ATQ_CMD_DECLARE_STFT(CMD_AT_CHLD_1x, RES_OK, AT_CMD(chld1x), ATQ_CMD_FLAG_DEFAULT, ATQ_CMD_TIMEOUT_LONG, 0);

Expand Down
24 changes: 12 additions & 12 deletions src/at_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ static void at_queue_remove(struct pvt* const pvt)
return;
}

PVT_STATE(pvt, at_tasks)--;
PVT_STATE(pvt, at_cmds) -= task->cmdsno - task->cindex;
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, at_tasks), 1);
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, at_cmds), task->cmdsno - task->cindex);

if (task->cmdsno == 1u) {
ast_debug(4, "[%s][%s] \xE2\x86\xB3 [%s] tasks:%lu \n", PVT_ID(pvt), at_cmd2str(task->cmds[0].cmd), at_res2str(task->cmds[0].res),
(unsigned long)PVT_STATE(pvt, at_tasks));
(unsigned long)ast_atomic_fetch_uint32(&PVT_STATE(pvt, at_tasks)));
} else if (task->cindex >= task->cmdsno) {
ast_debug(4, "[%s][%s] \xE2\x86\xB3 [%s] cmds:%u tasks:%lu\n", PVT_ID(pvt), at_cmd2str(task->cmds[0].cmd), at_res2str(task->cmds[0].res), task->cmdsno,
(unsigned long)PVT_STATE(pvt, at_tasks));
(unsigned long)ast_atomic_fetch_uint32(&PVT_STATE(pvt, at_tasks)));
} else {
ast_debug(3, "[%s][%s] \xE2\x86\xB3 [%s] cmds:%u/%u tasks:%lu\n", PVT_ID(pvt), at_cmd2str(task->cmds[0].cmd), at_res2str(task->cmds[0].res),
task->cindex, task->cmdsno, (unsigned long)PVT_STATE(pvt, at_tasks));
task->cindex, task->cmdsno, (unsigned long)ast_atomic_fetch_uint32(&PVT_STATE(pvt, at_tasks)));
}

at_queue_free(task);
Expand Down Expand Up @@ -92,8 +92,8 @@ at_queue_task_t* at_queue_add(struct cpvt* cpvt, const at_queue_cmd_t* cmds, uns
AST_LIST_INSERT_TAIL(&pvt->at_queue, e, entry);
}

PVT_STATE(pvt, at_tasks)++;
PVT_STATE(pvt, at_cmds) += cmdsno;
ast_atomic_fetchadd_uint32(&PVT_STATE(pvt, at_tasks), 1);
ast_atomic_fetchadd_uint32(&PVT_STATE(pvt, at_cmds), cmdsno);

if (e->cmdsno == 1u) {
ast_debug(4, "[%s][%s] \xE2\x86\xB5 [%s][%s] %s%s\n", PVT_ID(pvt), at_cmd2str(e->cmds[0].cmd), at_res2str(e->cmds[0].res),
Expand All @@ -114,8 +114,8 @@ static void at_queue_remove_cmd(struct pvt* pvt, at_res_t res)
}

if (task->at_once) {
task->cindex = task->cmdsno;
PVT_STATE(pvt, at_cmds) -= task->cmdsno;
task->cindex = task->cmdsno;
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, at_cmds), task->cmdsno);

if (task->cmds[0].res == res || (task->cmds[0].flags & ATQ_CMD_FLAG_IGNORE) || res == RES_TIMEOUT) {
at_queue_remove(pvt);
Expand All @@ -125,7 +125,7 @@ static void at_queue_remove_cmd(struct pvt* pvt, at_res_t res)
const unsigned index = task->cindex;

task->cindex++;
PVT_STATE(pvt, at_cmds)--;
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, at_cmds), -1);
if (task->cmds[index].res == res) {
ast_debug(6, "[%s][%s] \xE2\x8A\x9F result:[%s] cmd:%u/%u flags:%02x\n", PVT_ID(pvt), at_cmd2str(task->cmds[index].cmd), at_res2str(res),
task->cindex, task->cmdsno, task->cmds[index].flags);
Expand All @@ -145,8 +145,8 @@ static void at_queue_remove_task_at_once(struct pvt* const pvt)
at_queue_task_t* const task = AST_LIST_FIRST(&pvt->at_queue);

if (task && task->at_once) {
task->cindex = task->cmdsno;
PVT_STATE(pvt, at_cmds) -= task->cmdsno;
task->cindex = task->cmdsno;
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, at_cmds), 1);

if (!(task->cmds[0].flags & ATQ_CMD_FLAG_IGNORE)) {
at_queue_remove(pvt);
Expand Down
16 changes: 8 additions & 8 deletions src/at_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ static void handle_clcc(struct pvt* const pvt, const unsigned int call_idx, cons
pvt->cwaiting = 0;
pvt->ring = 0;

PVT_STAT(pvt, calls_answered[CPVT_DIRECTION(cpvt)])++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, calls_answered[CPVT_DIRECTION(cpvt)]), 1);
if (CPVT_TEST_FLAG(cpvt, CALL_FLAG_CONFERENCE)) {
at_enqueue_conference(cpvt);
}
Expand All @@ -1043,14 +1043,14 @@ static void handle_clcc(struct pvt* const pvt, const unsigned int call_idx, cons
pvt->dialing = 0;
pvt->cwaiting = 0;

PVT_STAT(pvt, in_calls)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, in_calls), 1);

if (pvt_enabled(pvt)) {
/* TODO: give dialplan level user tool for checking device is voice enabled or not */
if (start_pbx(pvt, number, call_idx, state)) {
PVT_STAT(pvt, in_pbx_fails)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, in_pbx_fails), 1);
} else {
PVT_STAT(pvt, in_calls_handled)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, in_calls_handled), 1);
if (!pvt->has_voice) {
ast_log(LOG_WARNING, "[%s] pbx started for device not voice capable\n", PVT_ID(pvt));
}
Expand All @@ -1064,18 +1064,18 @@ static void handle_clcc(struct pvt* const pvt, const unsigned int call_idx, cons
pvt->ring = 0;
pvt->dialing = 0;

PVT_STAT(pvt, cw_calls)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, cw_calls), 1);

if (dir == CALL_DIR_INCOMING) {
if (pvt_enabled(pvt)) {
/* TODO: give dialplan level user tool for checking device is voice enabled or not */
if (start_pbx(pvt, number, call_idx, state) == 0) {
PVT_STAT(pvt, in_calls_handled)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, in_calls_handled), 1);
if (!pvt->has_voice) {
ast_log(LOG_WARNING, "[%s] pbx started for device not voice capable\n", PVT_ID(pvt));
}
} else {
PVT_STAT(pvt, in_pbx_fails)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, in_pbx_fails), 1);
}
}
}
Expand Down Expand Up @@ -3047,7 +3047,7 @@ static void response_taskproc(struct pvt_taskproc_data* ptd)
ast_str_trim_blanks(&rtd->response);
}

PVT_STAT(rtd->ptd.pvt, at_responses)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(rtd->ptd.pvt, at_responses), 1);
if (at_response(rtd->ptd.pvt, &rtd->response, at_res)) {
ast_log(LOG_WARNING, "[%s] Fail to handle response\n", PVT_ID(rtd->ptd.pvt));
}
Expand Down
38 changes: 20 additions & 18 deletions src/chan_quectel.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ void pvt_disconnect(struct pvt* pvt)
if (!PVT_NO_CHANS(pvt)) {
struct cpvt* cpvt;
AST_LIST_TRAVERSE(&(pvt->chans), cpvt, entry) {
PVT_STATE(pvt, chan_count[cpvt->state])--;
PVT_STATE(pvt, chansno)--;
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, chan_count[cpvt->state]), 1);
ast_atomic_fetchsub_uint32(&PVT_STATE(pvt, chansno), 1);

at_hangup_immediately(cpvt, AST_CAUSE_NORMAL_UNSPECIFIED);
CPVT_SET_FLAG(cpvt, CALL_FLAG_DISCONNECTING);
Expand Down Expand Up @@ -1087,17 +1087,18 @@ const char* pvt_str_state(const struct pvt* pvt)
return state;
}

if (pvt->ring || PVT_STATE(pvt, chan_count[CALL_STATE_INCOMING])) {
if (pvt->ring || ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_INCOMING]))) {
state = "Ring";
} else if (pvt->cwaiting || PVT_STATE(pvt, chan_count[CALL_STATE_WAITING])) {
} else if (pvt->cwaiting || ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_WAITING]))) {
state = "Waiting";
} else if (pvt->dialing || (PVT_STATE(pvt, chan_count[CALL_STATE_INIT]) + PVT_STATE(pvt, chan_count[CALL_STATE_DIALING]) +
PVT_STATE(pvt, chan_count[CALL_STATE_ALERTING])) > 0) {
} else if (pvt->dialing || (ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_INIT])) +
ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_DIALING])) +
ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ALERTING]))) > 0) {
state = "Dialing";
} else if (PVT_STATE(pvt, chan_count[CALL_STATE_ACTIVE]) > 0) {
} else if (ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ACTIVE])) > 0) {
// state = "Active";
state = pvt_str_call_dir(pvt);
} else if (PVT_STATE(pvt, chan_count[CALL_STATE_ONHOLD]) > 0) {
} else if (ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ONHOLD])) > 0) {
state = "Held";
} else if (pvt->outgoing_sms || pvt->incoming_sms_index >= 0) {
state = "SMS";
Expand All @@ -1120,25 +1121,26 @@ struct ast_str* pvt_str_state_ex(const struct pvt* pvt)
if (state) {
ast_str_append(&buf, 0, "%s", state);
} else {
if (pvt->ring || PVT_STATE(pvt, chan_count[CALL_STATE_INCOMING])) {
if (pvt->ring || ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_INCOMING]))) {
ast_str_append(&buf, 0, "Ring");
}

if (pvt->dialing || (PVT_STATE(pvt, chan_count[CALL_STATE_INIT]) + PVT_STATE(pvt, chan_count[CALL_STATE_DIALING]) +
PVT_STATE(pvt, chan_count[CALL_STATE_ALERTING])) > 0) {
if (pvt->dialing ||
(ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_INIT])) + ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_DIALING])) +
ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ALERTING]))) > 0) {
ast_str_append(&buf, 0, "Dialing");
}

if (pvt->cwaiting || PVT_STATE(pvt, chan_count[CALL_STATE_WAITING])) {
if (pvt->cwaiting || ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_WAITING]))) {
ast_str_append(&buf, 0, "Waiting");
}

if (PVT_STATE(pvt, chan_count[CALL_STATE_ACTIVE]) > 0) {
ast_str_append(&buf, 0, "Active %u", PVT_STATE(pvt, chan_count[CALL_STATE_ACTIVE]));
if (ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ACTIVE])) > 0) {
ast_str_append(&buf, 0, "Active %u", ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ACTIVE])));
}

if (PVT_STATE(pvt, chan_count[CALL_STATE_ONHOLD]) > 0) {
ast_str_append(&buf, 0, "Held %u", PVT_STATE(pvt, chan_count[CALL_STATE_ONHOLD]));
if (ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ONHOLD])) > 0) {
ast_str_append(&buf, 0, "Held %u", ast_atomic_fetch_uint32(&PVT_STATE(pvt, chan_count[CALL_STATE_ONHOLD])));
}

if (pvt->incoming_sms_index >= 0) {
Expand Down Expand Up @@ -1501,8 +1503,8 @@ int pvt_direct_write(struct pvt* pvt, const char* buf, size_t count)
{
ast_debug(5, "[%s] [%s]\n", PVT_ID(pvt), tmp_esc_nstr(buf, count));

const size_t wrote = fd_write_all(pvt->data_fd, buf, count);
PVT_STAT(pvt, d_write_bytes) += wrote;
const size_t wrote = fd_write_all(pvt->data_fd, buf, count);
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, d_write_bytes), wrote);
if (wrote != count) {
ast_debug(1, "[%s][DATA] Write: %s\n", PVT_ID(pvt), strerror(errno));
}
Expand Down
11 changes: 5 additions & 6 deletions src/chan_quectel.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ typedef enum {

/* state */
typedef struct pvt_state {
uint32_t at_tasks; /*!< number of active tasks in at_queue */
uint32_t at_cmds; /*!< number of active commands in at_queue */
uint32_t chansno; /*!< number of channels in channels list */
uint8_t chan_count[CALL_STATES_NUMBER]; /*!< channel number grouped by state */
uint32_t at_tasks; /*!< number of active tasks in at_queue */
uint32_t at_cmds; /*!< number of active commands in at_queue */
uint32_t chansno; /*!< number of channels in channels list */
uint32_t chan_count[CALL_STATES_NUMBER]; /*!< channel number grouped by state */
} pvt_state_t;

#define PVT_STATE_T(state, name) ((state)->name)
Expand Down Expand Up @@ -83,7 +83,6 @@ typedef struct pvt_stat {
uint32_t in_pbx_fails; /*!< number of start_pbx fails */

uint32_t calls_answered[2]; /*!< number of outgoing and incoming/waiting calls answered */
uint32_t calls_duration[2]; /*!< seconds of outgoing and incoming/waiting calls */
} pvt_stat_t;

#define PVT_STAT_T(stat, name) ((stat)->name)
Expand Down Expand Up @@ -266,6 +265,6 @@ int pvt_taskproc_lock_and_execute(struct pvt_taskproc_data* ptd, void (*task_exe

struct ast_module* self_module();

#define PVT_NO_CHANS(pvt) (!PVT_STATE(pvt, chansno))
#define PVT_NO_CHANS(pvt) (!ast_atomic_fetch_uint32(&PVT_STATE(pvt, chansno)))

#endif /* CHAN_QUECTEL_H_INCLUDED */
38 changes: 19 additions & 19 deletions src/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ static int channel_call(struct ast_channel* channel, const char* dest, attribute
clir = -1;
}

PVT_STAT(pvt, out_calls)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, out_calls), 1);
if (at_enqueue_dial(cpvt, dest_num, clir)) {
ast_log(LOG_ERROR, "[%s] Error sending ATD command\n", PVT_ID(pvt));
return -1;
Expand Down Expand Up @@ -414,7 +414,7 @@ static void timing_write_tty(struct pvt* pvt, size_t frame_size)
mixb_read_upd(&pvt->write_mixb, frame_size);
change_audio_endianness_to_le(iov, iovcnt);
} else if (used > 0) {
PVT_STAT(pvt, write_tframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_tframes), 1);
msg = "[%s] write truncated frame\n";

iovcnt = mixb_read_all_iov(&pvt->write_mixb, iov);
Expand All @@ -426,7 +426,7 @@ static void timing_write_tty(struct pvt* pvt, size_t frame_size)
iovcnt++;
change_audio_endianness_to_le(iov, iovcnt);
} else {
PVT_STAT(pvt, write_sframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_sframes), 1);
msg = "[%s] write silence\n";

iov[0].iov_base = pvt_get_silence_buffer(pvt);
Expand All @@ -439,7 +439,7 @@ static void timing_write_tty(struct pvt* pvt, size_t frame_size)
}

if (iov_write(pvt, pvt->audio_fd, iov, iovcnt) >= 0) {
PVT_STAT(pvt, write_frames)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_frames), 1);
}
}

Expand Down Expand Up @@ -488,10 +488,10 @@ static struct ast_frame* channel_read_tty(struct cpvt* cpvt, struct pvt* pvt, si
write_conference(pvt, buf, res);
}

PVT_STAT(pvt, a_read_bytes) += res;
PVT_STAT(pvt, read_frames)++;
ast_atomic_fetchadd_uint64(&PVT_STAT(pvt, a_read_bytes), res);
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, read_frames), 1);
if (res < frame_size) {
PVT_STAT(pvt, read_sframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, read_sframes), 1);
}
}

Expand Down Expand Up @@ -566,10 +566,10 @@ static struct ast_frame* channel_read_uac(struct cpvt* cpvt, struct pvt* pvt, si
write_conference(pvt, buf, res);
}

PVT_STAT(pvt, a_read_bytes) += res * sizeof(int16_t);
PVT_STAT(pvt, read_frames)++;
ast_atomic_fetchadd_uint64(&PVT_STAT(pvt, a_read_bytes), res * sizeof(int16_t));
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, read_frames), 1);
if (res < frames) {
PVT_STAT(pvt, read_sframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, read_sframes), 1);
}
}

Expand Down Expand Up @@ -689,8 +689,8 @@ static int channel_write_tty(struct ast_channel* channel, struct ast_frame* f, s
if (count < (size_t)f->datalen) {
mixb_read_upd(&pvt->write_mixb, f->datalen - count);

PVT_STAT(pvt, write_rb_overflow_bytes) += f->datalen - count;
PVT_STAT(pvt, write_rb_overflow)++;
ast_atomic_fetchadd_uint64(&PVT_STAT(pvt, write_rb_overflow_bytes), f->datalen - count);
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_rb_overflow), 1);
}

mixb_write(&pvt->write_mixb, &cpvt->mixstream, f->data.ptr, f->datalen);
Expand All @@ -709,10 +709,10 @@ static int channel_write_tty(struct ast_channel* channel, struct ast_frame* f, s
const ssize_t res = iov_write(pvt, pvt->audio_fd, &iov, 1);

if (res >= 0) {
PVT_STAT(pvt, write_frames) += 1;
PVT_STAT(pvt, a_write_bytes) += res;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_frames), 1);
ast_atomic_fetchadd_uint64(&PVT_STAT(pvt, a_write_bytes), res);
if (res != f->datalen) {
PVT_STAT(pvt, write_tframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_tframes), 1);
}
}
}
Expand Down Expand Up @@ -779,10 +779,10 @@ static int channel_write_uac(struct ast_channel* attribute_unused(channel), stru

default:
if (res >= 0) {
PVT_STAT(pvt, write_frames) += 1;
PVT_STAT(pvt, a_write_bytes) += res * sizeof(int16_t);
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_frames), 1);
ast_atomic_fetchadd_uint64(&PVT_STAT(pvt, a_write_bytes), res * sizeof(int16_t));
if (res != samples) {
PVT_STAT(pvt, write_tframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_tframes), 1);
ast_log(LOG_WARNING, "[%s][ALSA][PLAYBACK] Write: %d/%d\n", PVT_ID(pvt), res, samples);
}
}
Expand Down Expand Up @@ -827,7 +827,7 @@ static int channel_write(struct ast_channel* channel, struct ast_frame* f)

if (f->datalen < frame_size) {
ast_debug(8, "[%s] Short voice frame: %d/%d, samples:%d\n", PVT_ID(pvt), f->datalen, (int)frame_size, f->samples);
PVT_STAT(pvt, write_tframes)++;
ast_atomic_fetchadd_uint32(&PVT_STAT(pvt, write_tframes), 1);
} else if (f->datalen > frame_size) {
ast_debug(8, "[%s] Large voice frame: %d/%d, samples: %d\n", PVT_ID(pvt), f->datalen, (int)frame_size, f->samples);
}
Expand Down
Loading

0 comments on commit 1ca012f

Please sign in to comment.