Skip to content

Commit

Permalink
Set the raw_opened flag to boolean instead of atomic. The introductio…
Browse files Browse the repository at this point in the history
…n of the new activation lock already protects raw_opened
  • Loading branch information
ganeshmurthy committed Oct 19, 2023
1 parent 525754e commit f763814
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 9 deletions.
11 changes: 4 additions & 7 deletions src/adaptors/tcp_lite/tcp_lite.c
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ static void free_connection_IO(void *context)
}

sys_atomic_destroy(&conn->core_activation);
sys_atomic_destroy(&conn->raw_opened);
qd_timer_free(conn->close_timer);
sys_mutex_free(&conn->activation_lock);
free_tcplite_connection_t(conn);
Expand All @@ -403,12 +402,12 @@ static void close_raw_connection_XSIDE_IO(tcplite_connection_t *conn)
if (conn->state != XSIDE_CLOSING) {
set_state_XSIDE_IO(conn, XSIDE_CLOSING);
if (!!conn->raw_conn) {
CLEAR_ATOMIC_FLAG(&conn->raw_opened);
pn_raw_connection_close(conn->raw_conn);
drain_read_buffers_XSIDE_IO(conn->raw_conn);
drain_write_buffers_XSIDE_IO(conn->raw_conn);

sys_mutex_lock(&conn->activation_lock);
conn->raw_opened = false;
pn_raw_connection_set_context(conn->raw_conn, 0);
conn->raw_conn = 0;
sys_mutex_unlock(&conn->activation_lock);
Expand Down Expand Up @@ -964,7 +963,6 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li
conn->common.parent = (tcplite_common_t*) cr;

sys_atomic_init(&conn->core_activation, 0);
sys_atomic_init(&conn->raw_opened, 0);

conn->listener_side = false;
conn->state = CSIDE_RAW_CONNECTION_OPENING;
Expand Down Expand Up @@ -1007,7 +1005,7 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li
// of the new connection to handle raw connection events.
// ISSUE-1202 - Set the conn->raw_opened flag before calling pn_proactor_raw_connect()
//
SET_ATOMIC_FLAG(&conn->raw_opened);
conn->raw_opened = true;
pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port);
}

Expand All @@ -1026,7 +1024,7 @@ static void handle_outbound_delivery_CSIDE(tcplite_connection_t *conn, qdr_link_
// continued processing in the correct context.
//
sys_mutex_lock(&conn->activation_lock);
if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) {
if (conn->raw_opened) {
pn_raw_connection_wake(conn->raw_conn);
}
sys_mutex_unlock(&conn->activation_lock);
Expand Down Expand Up @@ -1375,7 +1373,6 @@ void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void

sys_mutex_init(&conn->activation_lock);
sys_atomic_init(&conn->core_activation, 0);
sys_atomic_init(&conn->raw_opened, 1);

conn->listener_side = true;
conn->state = LSIDE_INITIAL;
Expand Down Expand Up @@ -1419,7 +1416,7 @@ static void CORE_activate(void *context, qdr_connection_t *core_conn)
case TL_CONNECTION:
conn = (tcplite_connection_t*) common;
sys_mutex_lock(&conn->activation_lock);
if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) {
if (conn->raw_opened) {
SET_ATOMIC_FLAG(&conn->core_activation);
pn_raw_connection_wake(conn->raw_conn);
}
Expand Down
2 changes: 1 addition & 1 deletion src/adaptors/tcp_lite/tcp_lite.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ typedef struct tcplite_connection_t {
pn_raw_connection_t *raw_conn;
sys_mutex_t activation_lock;
sys_atomic_t core_activation;
sys_atomic_t raw_opened;
qd_timer_t *close_timer;
qdr_link_t *inbound_link;
qd_message_t *inbound_stream;
Expand All @@ -145,6 +144,7 @@ typedef struct tcplite_connection_t {
bool inbound_first_octet;
bool outbound_first_octet;
bool outbound_body_complete;
bool raw_opened;
} tcplite_connection_t;


Expand Down
4 changes: 3 additions & 1 deletion src/cutthrough_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio

case QD_ACTIVATION_TCP: {
tcplite_connection_t *conn = safe_deref_tcplite_connection_t(activation->safeptr);
if (!!conn && IS_ATOMIC_FLAG_SET(&conn->raw_opened)) {
sys_mutex_lock(&conn->activation_lock);
if (!!conn && conn->raw_opened) {
pn_raw_connection_wake(conn->raw_conn);
}
sys_mutex_unlock(&conn->activation_lock);
break;
}
}
Expand Down

0 comments on commit f763814

Please sign in to comment.