diff --git a/src/adaptors/tcp_lite/tcp_lite.c b/src/adaptors/tcp_lite/tcp_lite.c index 0ae579c5a..0862bc8e8 100644 --- a/src/adaptors/tcp_lite/tcp_lite.c +++ b/src/adaptors/tcp_lite/tcp_lite.c @@ -390,7 +390,6 @@ static void free_connection_IO(void *context) } sys_atomic_destroy(&conn->core_activation); - sys_atomic_destroy(&conn->raw_opened); qd_timer_free(conn->close_timer); sys_mutex_free(&conn->activation_lock); free_tcplite_connection_t(conn); @@ -403,12 +402,12 @@ static void close_raw_connection_XSIDE_IO(tcplite_connection_t *conn) if (conn->state != XSIDE_CLOSING) { set_state_XSIDE_IO(conn, XSIDE_CLOSING); if (!!conn->raw_conn) { - CLEAR_ATOMIC_FLAG(&conn->raw_opened); pn_raw_connection_close(conn->raw_conn); drain_read_buffers_XSIDE_IO(conn->raw_conn); drain_write_buffers_XSIDE_IO(conn->raw_conn); sys_mutex_lock(&conn->activation_lock); + conn->raw_opened = false; pn_raw_connection_set_context(conn->raw_conn, 0); conn->raw_conn = 0; sys_mutex_unlock(&conn->activation_lock); @@ -964,7 +963,6 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li conn->common.parent = (tcplite_common_t*) cr; sys_atomic_init(&conn->core_activation, 0); - sys_atomic_init(&conn->raw_opened, 0); conn->listener_side = false; conn->state = CSIDE_RAW_CONNECTION_OPENING; @@ -1007,7 +1005,7 @@ static void handle_first_outbound_delivery_CSIDE(tcplite_connector_t *cr, qdr_li // of the new connection to handle raw connection events. // ISSUE-1202 - Set the conn->raw_opened flag before calling pn_proactor_raw_connect() // - SET_ATOMIC_FLAG(&conn->raw_opened); + conn->raw_opened = true; pn_proactor_raw_connect(tcplite_context->proactor, conn->raw_conn, cr->adaptor_config->host_port); } @@ -1026,7 +1024,7 @@ static void handle_outbound_delivery_CSIDE(tcplite_connection_t *conn, qdr_link_ // continued processing in the correct context. // sys_mutex_lock(&conn->activation_lock); - if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { + if (conn->raw_opened) { pn_raw_connection_wake(conn->raw_conn); } sys_mutex_unlock(&conn->activation_lock); @@ -1375,7 +1373,6 @@ void on_accept(qd_adaptor_listener_t *listener, pn_listener_t *pn_listener, void sys_mutex_init(&conn->activation_lock); sys_atomic_init(&conn->core_activation, 0); - sys_atomic_init(&conn->raw_opened, 1); conn->listener_side = true; conn->state = LSIDE_INITIAL; @@ -1419,7 +1416,7 @@ static void CORE_activate(void *context, qdr_connection_t *core_conn) case TL_CONNECTION: conn = (tcplite_connection_t*) common; sys_mutex_lock(&conn->activation_lock); - if (IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { + if (conn->raw_opened) { SET_ATOMIC_FLAG(&conn->core_activation); pn_raw_connection_wake(conn->raw_conn); } diff --git a/src/adaptors/tcp_lite/tcp_lite.h b/src/adaptors/tcp_lite/tcp_lite.h index 0f4868435..135651a0c 100644 --- a/src/adaptors/tcp_lite/tcp_lite.h +++ b/src/adaptors/tcp_lite/tcp_lite.h @@ -120,7 +120,6 @@ typedef struct tcplite_connection_t { pn_raw_connection_t *raw_conn; sys_mutex_t activation_lock; sys_atomic_t core_activation; - sys_atomic_t raw_opened; qd_timer_t *close_timer; qdr_link_t *inbound_link; qd_message_t *inbound_stream; @@ -145,6 +144,7 @@ typedef struct tcplite_connection_t { bool inbound_first_octet; bool outbound_first_octet; bool outbound_body_complete; + bool raw_opened; } tcplite_connection_t; diff --git a/src/cutthrough_utils.c b/src/cutthrough_utils.c index c6e1b4ec3..26f689859 100644 --- a/src/cutthrough_utils.c +++ b/src/cutthrough_utils.c @@ -64,9 +64,11 @@ static void activate_connection(qd_message_activation_t *activation, qd_directio case QD_ACTIVATION_TCP: { tcplite_connection_t *conn = safe_deref_tcplite_connection_t(activation->safeptr); - if (!!conn && IS_ATOMIC_FLAG_SET(&conn->raw_opened)) { + sys_mutex_lock(&conn->activation_lock); + if (!!conn && conn->raw_opened) { pn_raw_connection_wake(conn->raw_conn); } + sys_mutex_unlock(&conn->activation_lock); break; } }