diff --git a/src/adaptors/tcp/tcp_adaptor.c b/src/adaptors/tcp/tcp_adaptor.c index 812896999..8deac62a7 100644 --- a/src/adaptors/tcp/tcp_adaptor.c +++ b/src/adaptors/tcp/tcp_adaptor.c @@ -499,8 +499,14 @@ static void free_connection_IO(void *context) if (conn->common.parent->context_type == TL_LISTENER) { qd_tcp_listener_t *listener = (qd_tcp_listener_t*) conn->common.parent; sys_mutex_lock(&listener->lock); - DEQ_REMOVE(listener->connections, conn); listener->connections_closed++; + if (IS_ATOMIC_FLAG_SET(&listener->closing)) { + // Wake up the next conn on the list to get it closed + qd_tcp_connection_t *next_conn = DEQ_NEXT(conn); + if (!!next_conn) + pn_raw_connection_wake(next_conn->raw_conn); + } + DEQ_REMOVE(listener->connections, conn); sys_mutex_unlock(&listener->lock); // // Call listener decref when a connection associated with the listener is removed (DEQ_REMOVE(listener->connections, conn)) @@ -511,6 +517,12 @@ static void free_connection_IO(void *context) qd_tcp_connector_t *connector = (qd_tcp_connector_t*) conn->common.parent; sys_mutex_lock(&connector->lock); connector->connections_closed++; + if (IS_ATOMIC_FLAG_SET(&connector->closing)) { + // Wake up the next conn on the list to get it closed + qd_tcp_connection_t *next_conn = DEQ_NEXT(conn); + if (!!next_conn) + pn_raw_connection_wake(next_conn->raw_conn); + } DEQ_REMOVE(connector->connections, conn); sys_mutex_unlock(&connector->lock); // @@ -1222,6 +1234,9 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect conn->context.context = conn; conn->context.handler = on_connection_event_CSIDE_IO; + conn->raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(conn->raw_conn, &conn->context); + sys_mutex_lock(&connector->lock); DEQ_INSERT_TAIL(connector->connections, conn); connector->connections_opened++; @@ -1229,9 +1244,6 @@ static uint64_t handle_first_outbound_delivery_CSIDE(qd_tcp_connector_t *connect vflow_set_ref_from_record(conn->common.vflow, VFLOW_ATTRIBUTE_CONNECTOR, connector->common.vflow); sys_mutex_unlock(&connector->lock); - conn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(conn->raw_conn, &conn->context); - // // The raw connection establishment must be the last thing done in this function. // After this call, a separate IO thread may immediately be invoked in the context @@ -2047,18 +2059,19 @@ static void on_accept(qd_adaptor_listener_t *adaptor_listener, pn_listener_t *pn conn->context.context = conn; conn->context.handler = on_connection_event_LSIDE_IO; + conn->raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(conn->raw_conn, &conn->context); + + if (listener->protocol_observer) { + conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id); + } + sys_mutex_lock(&listener->lock); DEQ_INSERT_TAIL(listener->connections, conn); listener->connections_opened++; vflow_set_uint64(listener->common.vflow, VFLOW_ATTRIBUTE_FLOW_COUNT_L4, listener->connections_opened); sys_mutex_unlock(&listener->lock); - if (listener->protocol_observer) { - conn->observer_handle = qdpo_begin(listener->protocol_observer, conn->common.vflow, conn, conn->conn_id); - } - - conn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(conn->raw_conn, &conn->context); // Note: this will trigger the connection's event handler on another thread: pn_listener_raw_accept(pn_listener, conn->raw_conn); } @@ -2405,10 +2418,8 @@ QD_EXPORT void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl) sys_mutex_lock(&listener->lock); SET_ATOMIC_FLAG(&listener->closing); qd_tcp_connection_t *conn = DEQ_HEAD(listener->connections); - while (conn) { + if (conn) pn_raw_connection_wake(conn->raw_conn); - conn = DEQ_NEXT(conn); - } sys_mutex_unlock(&listener->lock); } // @@ -2447,10 +2458,8 @@ QD_EXPORT void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) sys_mutex_lock(&connector->lock); SET_ATOMIC_FLAG(&connector->closing); qd_tcp_connection_t *conn = DEQ_HEAD(connector->connections); - while (conn) { + if (conn) pn_raw_connection_wake(conn->raw_conn); - conn = DEQ_NEXT(conn); - } sys_mutex_unlock(&connector->lock); } //