Skip to content

Commit

Permalink
Fixes skupperproject#1700: refactor the AMQP link lifecycle
Browse files Browse the repository at this point in the history
This change removes some of the old link attach routing logic and
attempts to clean up the link API.

The logic that used to track the exchange of Attach/Detach
performatives has been simplified. The various counters and booleans
maintained by the qdr_link_t structure for tracking this exchange has
been reduced to a mask/flag implementation similar to Protons
implementation of endpoint state.

This patch refactors the link detach adaptor API to be more like the
existing AMQP connection API: there is now an explict API call to
release the link instance at the end of its lifecycle.

The adaptor API is modified by separating the AMQP detach handling
from the release of the link instance. The old qdr_link_detach()
adaptor function has been refactored into two functions:
qdr_link_detach_received() and qdr_link_close().

The qdr_link_detach_received() call is made by the AMQP adaptor when a
Detach Peformative has been received by the peer. It is only used by
the AMQP adaptor.

The new qdr_link_closed() API call is made by all adaptors when the
link instance is destroyed. This is similar to the existing
qdr_connection_closed() call but for links. It is used by all adaptors
to indicate to the core that the link is no longer in use and can be
cleaned up. In the case of the AMQP adaptor this call will be made
after the link detach handshake has completed.

Test coverage by the test-sender AMQP client has been increased by
adding a clean connection close function.
  • Loading branch information
kgiusti committed Dec 17, 2024
1 parent f2b8092 commit 0d0caab
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 241 deletions.
31 changes: 21 additions & 10 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -653,12 +653,6 @@ void qdr_terminus_set_dnp_address_iterator(qdr_terminus_t *term, qd_iterator_t *
******************************************************************************
*/

typedef enum {
QD_DETACHED, // Protocol detach
QD_CLOSED, // Protocol close
QD_LOST // Connection or session closed
} qd_detach_type_t;

/**
* qdr_link_set_context
*
Expand Down Expand Up @@ -810,15 +804,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

/**
* qdr_link_detach
* qdr_link_detach_received
*
* This function is invoked when a link detach arrives.
* This function is invoked when a link detach performative arrives from the remote peer. This may the first detach
* (peer-initiated link detach) or in response to a detach sent by the router (second detach).
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param dt The type of detach that occurred.
* @param error The link error from the detach frame or 0 if none.
*/
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error);


/**
* qdr_link_closed
*
* This function is invoked by the adaptor when the link has fully closed. This will be the last call made by the
* adaptor for this link. This may be called as a result of a successful detach handshake or due to link loss. This will
* also be called during adaptor shutdown on any outstanding links.
*
* The core may free the qdr_link_t by this call. The adaptor MUST NOT reference the qdr_link_t on return from this
* call.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake.
*/
void qdr_link_closed(qdr_link_t *link, bool forced);


/**
* qdr_link_deliver
Expand Down
96 changes: 46 additions & 50 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv)
return ref ? (qdr_delivery_t*) ref->ref : 0;
}

// clean up all qdr_delivery/pn_delivery bindings for the link
//
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link)
{
qd_link_ref_list_t *list = qd_link_get_ref_list(link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// this will remove and release the ref
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}
}


// read the delivery-state set by the remote endpoint
//
Expand Down Expand Up @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt)
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
if (!link)
return 0;
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
Expand Down Expand Up @@ -1257,29 +1238,55 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det
}
}

qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
// Notify the core that a detach has been received

qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link);
if (rlink) {
//
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer.

if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is
// the second detach!
qd_link_set_context(link, 0);
qdr_link_set_context(rlink, 0);
}

qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach(rlink, dt, error);
pn_condition_t *cond = pn_link_remote_condition(pn_link);
qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach_received(rlink, error);
}

return 0;
}


/**
* Link closed handler
*
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the
* link has not properly closed (detach handshake completed).
*/
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
{
assert(qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
if (qdr_link) {
// Notify core that this link no longer exists
qdr_link_set_context(qdr_link, 0);
qd_link_set_context(qd_link, 0);
qdr_link_closed(qdr_link, forced);
// This will cause the core to free qdr_link at some point so:
qdr_link = 0;
}
}

static void bind_connection_context(qdr_connection_t *qdrc, void* token)
{
qd_connection_t *conn = (qd_connection_t*) token;
Expand Down Expand Up @@ -1776,8 +1783,8 @@ static const qd_node_type_t router_node = {"router", 0,
AMQP_outgoing_link_handler,
AMQP_conn_wake_handler,
AMQP_link_detach_handler,
AMQP_link_closed_handler,
AMQP_link_attach_handler,
qd_link_abandoned_deliveries_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
Expand Down Expand Up @@ -1920,7 +1927,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
return;

pn_link_t *pn_link = qd_link_pn(qlink);
if (!pn_link)
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached
return;

if (error) {
Expand All @@ -1945,17 +1952,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

//
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor
// linkage. If this is the response attach then there will be no further proton link events to send to the core so
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so
// we can notify the core when the second (response) detach arrives
//
qdr_link_set_context(link, 0);
if (!first) {
qd_link_set_context(qlink, 0);
}

qd_link_close(qlink);
}

Expand Down
56 changes: 32 additions & 24 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct qd_link_t {
ALLOC_DEFINE_SAFE(qd_link_t);
ALLOC_DEFINE(qd_link_ref_t);

static void qd_link_free(qd_link_t *);


/** Encapsulates a proton session */
struct qd_session_t {
DEQ_LINKS(qd_session_t);
Expand Down Expand Up @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void


// The given connection has dropped. There will be no further link events for this connection so manually clean up all
// links
// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed.
//
static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log)
{
pn_link_t *pn_link = pn_link_head(conn, 0);
Expand All @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p
if (print_log)
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end",
pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced
qd_link_free(qd_link);
}

Expand Down Expand Up @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link)
// cleanup any inbound message that has not been forwarded
qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg);
if (msg) {
qd_nullify_safe_ptr(&link->incoming_msg);
qd_message_free(msg);
}
}
Expand All @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link)
static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
//
// Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
// Close all links. These links are not being properly 'detached'. They are being orphaned.
//
if (qd_conn)
qd_conn->closed = true;
Expand Down Expand Up @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
// Remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
// Remote has closed the session. Check for any child links and forcibly close them since there will be
// no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton
// will free all child pn_link_t when it frees the session.
pn_link = pn_link_head(conn, 0);
while (pn_link) {
pn_link_t *next_link = pn_link_next(pn_link, 0);
Expand All @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true);
qd_link_free(qd_link);
}
}
Expand Down Expand Up @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (pn_link_is_sender(pn_link)) {
Expand All @@ -609,25 +609,35 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
}

container->ntype->link_detach_handler(container->qd_router, qd_link, dt);
// notify arrival of inbound detach
container->ntype->link_detach_handler(container->qd_router, qd_link);

if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
// link fully closed
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
} else { // no qd_link, manually detach or free
if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
pn_link_close(pn_link);
} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}

} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
}
break;

case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free((qd_link_t *) pn_link_get_context(pn_link));
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
}
add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why???
}
break;

Expand Down Expand Up @@ -775,16 +785,14 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
}


void qd_link_free(qd_link_t *link)
static void qd_link_free(qd_link_t *link)
{
if (!link) return;

sys_mutex_lock(&amqp_adaptor.container->lock);
DEQ_REMOVE(amqp_adaptor.container->links, link);
sys_mutex_unlock(&amqp_adaptor.container->lock);

amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link);

cleanup_link(link);
free_qd_link_t(link);
}
Expand Down
2 changes: 0 additions & 2 deletions src/adaptors/amqp/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ qd_container_t *qd_container(qd_router_t *router, const qd_node_type_t *node_typ
void qd_container_free(qd_container_t *container);

qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char *name, qd_session_class_t);
void qd_link_free(qd_link_t *link);

/**
* List of reference in the qd_link used to track abandoned deliveries
Expand Down Expand Up @@ -98,7 +97,6 @@ pn_terminus_t *qd_link_target(qd_link_t *link);
pn_terminus_t *qd_link_remote_source(qd_link_t *link);
pn_terminus_t *qd_link_remote_target(qd_link_t *link);
void qd_link_close(qd_link_t *link);
void qd_link_free(qd_link_t *link);
void qd_link_q2_restart_receive(const qd_alloc_safe_ptr_t context);
void qd_link_q3_block(qd_link_t *link);
void qd_link_q3_unblock(qd_link_t *link);
Expand Down
15 changes: 10 additions & 5 deletions src/adaptors/amqp/node_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ typedef struct qd_router_t qd_router_t;
typedef bool (*qd_container_delivery_handler_t) (qd_router_t *, qd_link_t *link);
typedef void (*qd_container_disposition_handler_t) (qd_router_t *, qd_link_t *link, pn_delivery_t *pnd);
typedef int (*qd_container_link_handler_t) (qd_router_t *, qd_link_t *link);
typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link, qd_detach_type_t dt);
typedef int (*qd_container_link_detach_handler_t) (qd_router_t *, qd_link_t *link);
typedef void (*qd_container_link_closed_handler_t) (qd_router_t *, qd_link_t *link, bool forced);
typedef void (*qd_container_node_handler_t) (qd_router_t *);
typedef int (*qd_container_conn_handler_t) (qd_router_t *, qd_connection_t *conn, void *context);
typedef void (*qd_container_link_abandoned_deliveries_handler_t) (qd_router_t *, qd_link_t *link);

/**
* A set of Node handlers for deliveries, links and container events.
Expand Down Expand Up @@ -57,15 +57,20 @@ struct qd_node_type_t {
/** Invoked when an activated connection is available for writing. */
qd_container_conn_handler_t writable_handler;

/** Invoked when a link is detached. */
/** Invoked when link detached is received. */
qd_container_link_detach_handler_t link_detach_handler;

/** The last callback issued for the given qd_link_t. The adaptor must clean up all state related to the qd_link_t
* as it will be freed on return from this call. The forced flag is set to true if the link is being forced closed
* due to the parent connection/session closing or on shutdown.
*/
qd_container_link_closed_handler_t link_closed_handler;

///@}

/** Invoked when a link we created was opened by the peer */
qd_container_link_handler_t link_attach_handler;

qd_container_link_abandoned_deliveries_handler_t link_abandoned_deliveries_handler;

/** Invoked when a link receives a flow event */
qd_container_link_handler_t link_flow_handler;

Expand Down
Loading

0 comments on commit 0d0caab

Please sign in to comment.