diff --git a/include/qpid/dispatch/amqp_adaptor.h b/include/qpid/dispatch/amqp_adaptor.h index 23f8e0ee8..2f9ee64a3 100644 --- a/include/qpid/dispatch/amqp_adaptor.h +++ b/include/qpid/dispatch/amqp_adaptor.h @@ -34,7 +34,13 @@ typedef struct pn_link_t pn_link_t; // These types are private to the AMQP adaptor: typedef struct qd_connection_t qd_connection_t; typedef struct qd_link_t qd_link_t; +typedef struct qd_session_t qd_session_t; +// Session windowing limits +extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections +extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections +extern const size_t qd_session_max_outgoing_bytes; // limit to outgoing buffered data +extern const size_t qd_session_low_outgoing_bytes; // low water mark to resume buffering outgoing data // For use by message.c @@ -46,6 +52,8 @@ pn_link_t *qd_link_pn(const qd_link_t *link); bool qd_connection_strip_annotations_in(const qd_connection_t *c); uint64_t qd_connection_max_message_size(const qd_connection_t *c); void qd_connection_log_policy_denial(const qd_link_t *link, const char *text); +qd_session_t *qd_link_get_session(const qd_link_t *link); +size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn); // Used by the log module void qd_amqp_connection_set_tracing(bool enabled); diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index c1b59d5f0..d4ff2224f 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -51,15 +51,6 @@ typedef struct qd_link_t qd_link_t; #define QD_QLIMIT_Q2_LOWER 32 // Re-enable link receive #define QD_QLIMIT_Q2_UPPER (QD_QLIMIT_Q2_LOWER * 2) // Disable link receive -// Q3 limits the number of bytes allowed to be buffered in an AMQP session's outgoing buffer. Once the Q3 upper limit -// is hit (read via pn_session_outgoing_bytes), pn_link_send will no longer be called for ALL outgoing links sharing the -// session. When enough outgoing bytes have been drained below the lower limit pn_link_sends will resume. Note that Q3 -// only applies to AMQP links. Non-AMQP (adaptor) link output is limited by the capacity of the raw connection buffer -// pool. - -#define QD_QLIMIT_Q3_LOWER (QD_QLIMIT_Q2_UPPER * 2) // in qd_buffer_t's -#define QD_QLIMIT_Q3_UPPER (QD_QLIMIT_Q3_LOWER * 2) - // Callback for status change (confirmed persistent, loaded-in-memory, etc.) typedef struct qd_message_t qd_message_t; diff --git a/python/skupper_router/management/skrouter.json b/python/skupper_router/management/skrouter.json index b4f568366..0cab8b9ef 100644 --- a/python/skupper_router/management/skrouter.json +++ b/python/skupper_router/management/skrouter.json @@ -821,7 +821,7 @@ }, "maxSessionFrames": { "type": "integer", - "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings, if specified, will overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 0 (unlimited window).", + "description": "Session incoming window measured in transfer frames for sessions created on this connection. This value sets a limit to the number of incoming frames the router will buffer before flow-control is enforced. Thus the maximum amount of memory required for holding incoming data is limited to (maxFrameSize * maxSessionFrames) bytes per session. If not explicitly set a default session window size that is optimized for the connection role is selected. Policy settings will not overwrite this value. maxSessionFrames has a minimum value of 2.", "required": false, "create": true }, @@ -983,7 +983,7 @@ }, "maxSessionFrames": { "type": "integer", - "description": "Session incoming window measured in transfer frames for sessions created on this connection. This is the number of transfer frames that may simultaneously be in flight for all links in the session. Setting this value to zero selects the default session window size. Policy settings will not overwrite this value. The numerical product of maxFrameSize and maxSessionFrames may not exceed 2^31-1. If (maxFrameSize x maxSessionFrames) exceeds 2^31-1 then maxSessionFrames is reduced to (2^31-1 / maxFrameSize). maxSessionFrames has a minimum value of 1. Defaults to 0 (unlimited window).", + "description": "Session incoming window measured in transfer frames for sessions created on this connection. This value sets a limit to the number of incoming frames the router will buffer before flow-control is enforced. Thus the maximum amount of memory required for holding incoming data is limited to (maxFrameSize * maxSessionFrames) bytes per session. If not explicitly set a default session window size that is optimized for the connection role is selected. Policy settings will not overwrite this value. maxSessionFrames has a minimum value of 2.", "required": false, "create": true }, diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 48e38c11c..afb256247 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1172,7 +1172,7 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link) qd_session_t *qd_ssn = qd_link_get_session(link); if (qd_session_is_q3_blocked(qd_ssn)) { // Q3 blocked - have we drained enough outgoing bytes? - if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_get_outgoing_threshold(qd_ssn)) { + if (qd_session_get_outgoing_capacity(qd_ssn) >= qd_session_low_outgoing_bytes) { // yes. We must now unblock all links that have been blocked by Q3 qd_link_list_t *blinks = qd_session_q3_blocked_links(qd_ssn); diff --git a/src/adaptors/amqp/container.c b/src/adaptors/amqp/container.c index 69abb07ba..38c73e571 100644 --- a/src/adaptors/amqp/container.c +++ b/src/adaptors/amqp/container.c @@ -69,15 +69,34 @@ struct qd_session_t { sys_atomic_t ref_count; pn_session_t *pn_session; qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty + uint32_t remote_max_frame; +}; - // For outgoing session flow control. Never buffer more than out_window_limit bytes of data on the session before - // returning control to the proactor. This prevents memory bloat and allows proactor to send buffered data in a - // timely manner. The low watermark is used to unblock the session - do not resume writing to the session until the - // amount of available capacity has grown to at least the low watermark. +// Session window limits +// +// A session incoming window determines how many incoming frames the session will accept across all incoming links. This +// places a limit on the number of incoming data bytes that have to be buffered on the session (connection max-frame * +// max incoming window frames). The local session incoming window configuration is determined by the maxFrameSize and +// maxSessionFrames configuration attributes of an AMQP listener/connector. +// +// The remote peers session window must be honored when writing output to a sending link. In addition we limit the +// amount of outgoing data that can be buffered on a session before control is returned to Proton. This is necessary to +// improve latency and allow capacity sharing among all links on the session. +// +const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session +const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes - size_t out_window_limit; - size_t out_window_low_watermark; -}; +const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB +const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB + + +// Can we leverage the new Proton Session Window API? +// +#if (PN_VERSION_MAJOR > 0) || (PN_VERSION_MINOR > 39) +#define USE_PN_SESSION_WINDOWS 1 +#else +#define USE_PN_SESSION_WINDOWS 0 +#endif // Bug workaround to free Proton links when we hope they are no longer used! // Fingers crossed! :| @@ -439,7 +458,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, qd_conn->n_sessions++; } DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn); - qd_policy_apply_session_settings(qd_ssn->pn_session, qd_conn); + uint32_t in_window; + qd_policy_get_session_settings(qd_conn, &in_window); + qd_session_set_max_in_window(qd_ssn, in_window); pn_session_open(qd_ssn->pn_session); } } @@ -700,7 +721,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn); conn->qd_sessions[ssn_class] = qd_ssn; qd_session_incref(qd_ssn); - pn_session_set_incoming_capacity(qd_ssn->pn_session, cf->incoming_capacity); + qd_session_set_max_in_window(qd_ssn, cf->session_max_in_window); pn_session_open(qd_ssn->pn_session); } @@ -897,6 +918,9 @@ void qd_link_set_link_id(qd_link_t *link, uint64_t link_id) qd_session_t *qd_session(pn_session_t *pn_ssn) { assert(pn_ssn && qd_session_from_pn(pn_ssn) == 0); + pn_connection_t *pn_conn = pn_session_connection(pn_ssn); + pn_transport_t *pn_tport = pn_connection_transport(pn_conn); + qd_session_t *qd_ssn = new_qd_session_t(); if (qd_ssn) { ZERO(qd_ssn); @@ -905,10 +929,8 @@ qd_session_t *qd_session(pn_session_t *pn_ssn) qd_ssn->pn_session = pn_ssn; DEQ_INIT(qd_ssn->q3_blocked_links); pn_session_set_context(pn_ssn, qd_ssn); - - // @TODO(kgiusti) make these dependent on connection role - qd_ssn->out_window_limit = QD_QLIMIT_Q3_UPPER * QD_BUFFER_SIZE; - qd_ssn->out_window_low_watermark = QD_QLIMIT_Q3_LOWER * QD_BUFFER_SIZE; + qd_ssn->remote_max_frame = pn_transport_get_remote_max_frame(pn_tport); + assert(qd_ssn->remote_max_frame != 0); } return qd_ssn; } @@ -970,22 +992,38 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn) { assert(qd_ssn && qd_ssn->pn_session); + // discount any data already written but not yet sent size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session); - if (buffered < qd_ssn->out_window_limit) { - return qd_ssn->out_window_limit - buffered; - } - return 0; + if (buffered >= qd_session_max_outgoing_bytes) + return 0; // exceeded maximum buffered limit + size_t avail = qd_session_max_outgoing_bytes - buffered; + +#if USE_PN_SESSION_WINDOWS + // never exceed the remaining in window of the peer + size_t limit = pn_session_remote_incoming_window(qd_ssn->pn_session); + limit *= qd_ssn->remote_max_frame; + return MIN(avail, limit); +#else + return avail; +#endif } -/** Return the session outgoing window low water mark +/** Configure the sessions incoming window limit * - * Blocked session can resume output once the available outgoing capacity reaches at least this value + * @param qd_ssn Session to configure + * @param in_window maximum incoming window in frames */ -size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn) +void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window) { - assert(qd_ssn); - return qd_ssn->out_window_low_watermark; + // older proton session windowing would stall so do not enable it +#if USE_PN_SESSION_WINDOWS + // Use new window configuration API to set the maximum in window and low water mark + assert(in_window >= 2); + int rc = pn_session_set_incoming_window_and_lwm(qd_ssn->pn_session, in_window, in_window / 2); + (void) rc; + assert(rc == 0); +#endif } diff --git a/src/adaptors/amqp/container.h b/src/adaptors/amqp/container.h index 757d71846..e7938c43b 100644 --- a/src/adaptors/amqp/container.h +++ b/src/adaptors/amqp/container.h @@ -107,14 +107,12 @@ uint64_t qd_link_link_id(const qd_link_t *link); void qd_link_set_link_id(qd_link_t *link, uint64_t link_id); struct qd_message_t; void qd_link_set_incoming_msg(qd_link_t *link, struct qd_message_t *msg); -qd_session_t *qd_link_get_session(const qd_link_t *link); void qd_session_incref(qd_session_t *qd_ssn); void qd_session_decref(qd_session_t *qd_ssn); bool qd_session_is_q3_blocked(const qd_session_t *qd_ssn); qd_link_list_t *qd_session_q3_blocked_links(qd_session_t *qd_ssn); -size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn); -size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn); +void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window); void qd_connection_release_sessions(qd_connection_t *qd_conn); diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index b7088338e..2e68e0459 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -700,8 +700,12 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { // Common transport configuration. // pn_transport_set_max_frame(tport, config->max_frame_size); - pn_transport_set_channel_max(tport, config->max_sessions - 1); pn_transport_set_idle_timeout(tport, config->idle_timeout_seconds * 1000); + // pn_transport_set_channel_max sets the maximum session *identifier*, not the total number of sessions. Thus Proton + // will allow sessions with identifiers [0..max_sessions], which is one greater than the value we pass to + // pn_transport_set_channel_max. So to limit the maximum number of simultaineous sessions to config->max_sessions we + // have to decrement it by one for Proton. + pn_transport_set_channel_max(tport, config->max_sessions - 1); } void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_connection_t *pn_conn, qd_connection_t *qd_conn); diff --git a/src/adaptors/amqp/server_config.c b/src/adaptors/amqp/server_config.c index 94543eb2b..b3c09950f 100644 --- a/src/adaptors/amqp/server_config.c +++ b/src/adaptors/amqp/server_config.c @@ -25,6 +25,7 @@ #include "dispatch_private.h" #include "entity.h" +#include #include #include @@ -134,9 +135,6 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, config->http = qd_entity_opt_bool(entity, "http", false); CHECK(); config->http_root_dir = qd_entity_opt_string(entity, "httpRootDir", 0); CHECK(); config->http = config->http || config->http_root_dir; /* httpRootDir implies http */ - config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK(); - config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK(); - uint64_t ssn_frames = qd_entity_opt_long(entity, "maxSessionFrames", 0); CHECK(); config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK(); if (is_listener) { config->initial_handshake_timeout_seconds = qd_entity_get_long(entity, "initialHandshakeTimeoutSeconds"); CHECK(); @@ -191,48 +189,49 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, if (config->link_capacity == 0) config->link_capacity = 250; - if (config->max_sessions == 0 || config->max_sessions > 32768) - // Proton disallows > 32768 - config->max_sessions = 32768; - - if (config->max_frame_size < QD_AMQP_MIN_MAX_FRAME_SIZE) - // Silently promote the minimum max-frame-size - // Proton will do this but the number is needed for the - // incoming capacity calculation. - config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE; - - // - // Given session frame count and max frame size, compute session incoming_capacity - // On 64-bit systems the capacity has no practical limit. - // On 32-bit systems the largest default capacity is half the process address space. - // - bool is_64bit = sizeof(size_t) == 8; -#define MAX_32BIT_CAPACITY ((size_t)(2147483647)) - if (ssn_frames == 0) { - config->incoming_capacity = is_64bit ? MAX_32BIT_CAPACITY * (size_t)config->max_frame_size : MAX_32BIT_CAPACITY; - } else { - // Limited incoming frames. - if (is_64bit) { - // Specify this to proton by setting capacity to be - // the product (max_frame_size * ssn_frames). - config->incoming_capacity = (size_t)config->max_frame_size * (size_t)ssn_frames; + // Proton does not support maxSessions > 32768 + int64_t value = (int64_t) qd_entity_get_long(entity, "maxSessions"); CHECK(); + if (value == 0) { + value = 32768; // default + } else if (value < 0 || value > 32768) { + (void) qd_error(QD_ERROR_CONFIG, + "Invalid maxSessions specified (%"PRId64"). Minimum value is 1 and maximum value is %i", + value, 32768); + goto error; + } + config->max_sessions = (uint32_t) value; + + // Ensure maxFrameSize is at least the minimum value required by the standard, + // and it does not exceed the proton APIs max of INT32_MAX + value = (int64_t) qd_entity_get_long(entity, "maxFrameSize"); CHECK(); + if (value == 0) { + value = 16384; // default + } else if (value < QD_AMQP_MIN_MAX_FRAME_SIZE || value > INT32_MAX) { + (void) qd_error(QD_ERROR_CONFIG, + "Invalid maxFrameSize specified (%"PRId64"). Minimum value is %d and maximum value is %"PRIi32, + value, QD_AMQP_MIN_MAX_FRAME_SIZE, INT32_MAX); + goto error; + } + config->max_frame_size = (uint32_t) value; + + // Ensure that maxSessionFrames does not exceed the proton APIs max of INT32_MAX + value = (int64_t) qd_entity_opt_long(entity, "maxSessionFrames", 0); CHECK(); + if (value == 0) { + // Use a sane default. Allow router to router links more capacity than AMQP application links + if (strcmp(config->role, "normal") == 0) { + value = qd_session_max_in_win_user / config->max_frame_size; } else { - // 32-bit systems have an upper bound to the capacity - uint64_t max_32bit_capacity = (uint64_t)MAX_32BIT_CAPACITY; - uint64_t capacity = (uint64_t)config->max_frame_size * (uint64_t)ssn_frames; - if (capacity <= max_32bit_capacity) { - config->incoming_capacity = (size_t)capacity; - } else { - config->incoming_capacity = MAX_32BIT_CAPACITY; - uint64_t actual_frames = max_32bit_capacity / (uint64_t)config->max_frame_size; - - qd_log(LOG_CONN_MGR, QD_LOG_WARNING, - "Server configuration for I/O adapter entity name:'%s', host:'%s', port:'%s', " - "requested maxSessionFrames truncated from %" PRId64 " to %" PRId64, - config->name, config->host, config->port, ssn_frames, actual_frames); - } + value = qd_session_max_in_win_trunk / config->max_frame_size; } + // Ensure the window is at least 2 frames to allow a non-zero low water mark + value = MAX(value, 2); + } else if (value < 2 || value > INT32_MAX) { + (void) qd_error(QD_ERROR_CONFIG, + "Invalid maxSessionFrames specified (%"PRId64"). Minimum value is 2 and maximum value is %"PRIi32, + value, INT32_MAX); + goto error; } + config->session_max_in_window = (uint32_t) value; // // For now we are hardwiring this attribute to true. If there's an outcry from the diff --git a/src/adaptors/amqp/server_config.h b/src/adaptors/amqp/server_config.h index a0d8ae846..73ac109ef 100644 --- a/src/adaptors/amqp/server_config.h +++ b/src/adaptors/amqp/server_config.h @@ -199,7 +199,7 @@ typedef struct qd_server_config_t { int inter_router_cost; /** - * The maximum size of an AMQP frame in octets. + * The maximum size of an AMQP frame in octets. Frome maxFrameSize configuration attribute. */ uint32_t max_frame_size; @@ -209,12 +209,14 @@ typedef struct qd_server_config_t { uint32_t max_sessions; /** - * The incoming capacity value is calculated to be (sessionMaxFrames * maxFrameSize). - * In a round about way the calculation forces the AMQP Begin/incoming-capacity value - * to equal the specified sessionMaxFrames value measured in units of transfer frames. - * This calculation is done to satisfy proton pn_session_set_incoming_capacity(). + * The session incoming window limit in frames. From maxSessionFrames configuration attribute + * + * The window indicates the maximum number of incoming *frames* that the session will buffer. This places a limit on + * the amount of memory the router needs to reserve to accomodate data arriving from the peer session endpoint. + * + * The maximum amount of memory is computed as (max_frame_size * session_max_in_window) */ - size_t incoming_capacity; + uint32_t session_max_in_window; /** * The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the diff --git a/src/message.c b/src/message.c index 8807e93db..3d6b65feb 100644 --- a/src/message.c +++ b/src/message.c @@ -1984,30 +1984,35 @@ uint32_t _compose_router_annotations(qd_message_pvt_t *msg, unsigned int ra_flag } -static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_content_t *content, pn_link_t *pnl, pn_session_t *pns, bool *q3_stalled) +static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_content_t *content, qd_link_t *link, bool *session_stalled) { - const size_t q3_upper = QD_BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; - bool notify_consumed = false; + pn_link_t *pnl = qd_link_pn(link); + size_t session_limit = qd_session_get_outgoing_capacity(qd_link_get_session(link)); + bool notify_consumed = false; - *q3_stalled = !IS_ATOMIC_FLAG_SET(&content->aborted) && (pn_session_outgoing_bytes(pns) >= q3_upper); - while (!*q3_stalled && (sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT != 0) { + *session_stalled = !IS_ATOMIC_FLAG_SET(&content->aborted) && session_limit == 0; + while (!*session_stalled && (sys_atomic_get(&content->uct_consume_slot) - sys_atomic_get(&content->uct_produce_slot)) % UCT_SLOT_COUNT != 0) { uint32_t use_slot = sys_atomic_get(&content->uct_consume_slot); qd_buffer_t *buf = DEQ_HEAD(content->uct_slots[use_slot]); - while (!!buf) { + while (!!buf && session_limit > 0) { DEQ_REMOVE_HEAD(content->uct_slots[use_slot]); if (!IS_ATOMIC_FLAG_SET(&content->aborted)) { ssize_t sent = pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf)); (void) sent; assert(sent == qd_buffer_size(buf)); + // (probably) ok to overflow the session limit a bit + session_limit = (sent >= session_limit) ? 0 : session_limit - sent; } qd_buffer_free(buf); buf = DEQ_HEAD(content->uct_slots[use_slot]); } - sys_atomic_set(&content->uct_consume_slot, (use_slot + 1) % UCT_SLOT_COUNT); - notify_consumed = true; - *q3_stalled = !IS_ATOMIC_FLAG_SET(&content->aborted) && (pn_session_outgoing_bytes(pns) >= q3_upper); + if (DEQ_IS_EMPTY(content->uct_slots[use_slot])) { + sys_atomic_set(&content->uct_consume_slot, (use_slot + 1) % UCT_SLOT_COUNT); + notify_consumed = true; + } + *session_stalled = !IS_ATOMIC_FLAG_SET(&content->aborted) && session_limit == 0; } if ((IS_ATOMIC_FLAG_SET(&content->aborted) || IS_ATOMIC_FLAG_SET(&content->receive_complete)) @@ -2029,21 +2034,22 @@ static void qd_message_send_cut_through(qd_message_pvt_t *msg, qd_message_conten ssize_t qd_message_send(qd_message_t *in_msg, qd_link_t *link, unsigned int ra_flags, - bool *q3_stalled) + bool *session_stalled) { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; qd_message_content_t *content = msg->content; pn_link_t *pnl = qd_link_pn(link); - pn_session_t *pns = pn_link_session(pnl); ssize_t bytes_sent = 0; - *q3_stalled = false; + CHECK_PROACTOR_CONNECTION(pn_session_connection(pn_link_session(pnl))); + + *session_stalled = false; if (msg->uct_started) { // // Perform the cut-through transfer from the message content to the outbound link // - qd_message_send_cut_through(msg, content, pnl, pns, q3_stalled); + qd_message_send_cut_through(msg, content, link, session_stalled); return 0; } @@ -2099,14 +2105,12 @@ ssize_t qd_message_send(qd_message_t *in_msg, qd_buffer_t *buf = msg->cursor.buffer; - qd_message_q2_unblocker_t q2_unblock = {0}; - const size_t q3_upper = QD_BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; - - CHECK_PROACTOR_CONNECTION(pn_session_connection(pns)); + qd_message_q2_unblocker_t q2_unblock = {0}; + size_t session_limit = qd_session_get_outgoing_capacity(qd_link_get_session(link)); while (!IS_ATOMIC_FLAG_SET(&content->aborted) && buf - && pn_session_outgoing_bytes(pns) < q3_upper) { + && session_limit > 0) { // This will send the remaining data in the buffer if any. There may be // zero bytes left to send if we stopped here last time and there was @@ -2114,6 +2118,7 @@ ssize_t qd_message_send(qd_message_t *in_msg, // size_t buf_size = qd_buffer_size(buf); int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf)); + num_bytes_to_send = MIN(num_bytes_to_send, session_limit); if (num_bytes_to_send > 0) { bytes_sent = pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send); } @@ -2137,8 +2142,9 @@ ssize_t qd_message_send(qd_message_t *in_msg, } else { msg->cursor.cursor += bytes_sent; + session_limit -= bytes_sent; - if (bytes_sent == num_bytes_to_send) { + if (msg->cursor.cursor == qd_buffer_cursor(buf)) { // // sent the whole buffer. // Can we move to the next buffer? Only if there is a next buffer @@ -2218,9 +2224,9 @@ ssize_t qd_message_send(qd_message_t *in_msg, && (!msg->cursor.buffer || ((msg->cursor.cursor - qd_buffer_base(msg->cursor.buffer) == qd_buffer_size(msg->cursor.buffer)) && !DEQ_NEXT(msg->cursor.buffer)))) { msg->uct_started = true; - qd_message_send_cut_through(msg, content, pnl, pns, q3_stalled); + qd_message_send_cut_through(msg, content, link, session_stalled); } else { - *q3_stalled = (pn_session_outgoing_bytes(pns) >= q3_upper); + *session_stalled = session_limit == 0; } return bytes_sent; diff --git a/src/policy.c b/src/policy.c index 3cfb22183..75214b720 100644 --- a/src/policy.c +++ b/src/policy.c @@ -652,17 +652,23 @@ bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn) // // -void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn) +void qd_policy_get_session_settings(qd_connection_t *qd_conn, uint32_t *in_window) { - size_t capacity; - if (qd_conn->policy_settings && qd_conn->policy_settings->spec.maxSessionWindow - && !qd_conn->policy_settings->spec.outgoingConnection) { - capacity = qd_conn->policy_settings->spec.maxSessionWindow; - } else { - const qd_server_config_t * cf = qd_connection_config(qd_conn); - capacity = cf->incoming_capacity; + const qd_server_config_t *cf = qd_connection_config(qd_conn); + + // Assume defaults will be used + *in_window = cf->session_max_in_window; + + if (qd_conn->policy_settings) { + const qd_policy_spec_t *spec = &qd_conn->policy_settings->spec; + if (!spec->outgoingConnection && spec->maxSessionWindow) { + // Policy configures the window *in bytes* but Proton uses *frames*. Convert to frames + uint32_t max_frame = spec->maxFrameSize ? spec->maxFrameSize : cf->max_frame_size; + *in_window = spec->maxSessionWindow / max_frame; + if (*in_window < 2) + *in_window = 2; + } } - pn_session_set_incoming_capacity(ssn, capacity); } // diff --git a/src/policy.h b/src/policy.h index 145277dc3..82c220a8c 100644 --- a/src/policy.h +++ b/src/policy.h @@ -143,12 +143,12 @@ bool qd_policy_lookup_vhost_alias( bool qd_policy_approve_amqp_session(pn_session_t *ssn, qd_connection_t *qd_conn); -/** Apply policy or default settings for a new session. +/** Retrieve policy or default settings for a new session on the given connection * - * @param[in] ssn proton session being set * @param[in] qd_conn dispatch connection with policy settings and counts + * @param[out] in_window set the the allowable maximum incoming session window (in frames) **/ -void qd_policy_apply_session_settings(pn_session_t *ssn, qd_connection_t *qd_conn); +void qd_policy_get_session_settings(qd_connection_t *qd_conn, uint32_t *in_window); /** Approve a new sender link based on connection's policy. diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 1110552f8..32809465c 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -509,6 +509,36 @@ def setUpClass(cls): ]) cls.routers.append(cls.tester.qdrouterd(name, config_24, wait=False, expect=Process.EXIT_FAIL)) + # Invalid maxSessions + name = "test-router-25" + config_25 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': name}), + ('listener', {'host': '0.0.0.0', + 'port': 9999, + 'maxSessions': 99999}) + ]) + cls.routers.append(cls.tester.qdrouterd(name, config_25, wait=False, expect=Process.EXIT_FAIL)) + + # Invalid maxFrameSize + name = "test-router-26" + config_26 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': name}), + ('listener', {'host': '0.0.0.0', + 'port': 9999, + 'maxFrameSize': -1}) + ]) + cls.routers.append(cls.tester.qdrouterd(name, config_26, wait=False, expect=Process.EXIT_FAIL)) + + # Invalid maxSessionFrames + name = "test-router-27" + config_27 = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': name}), + ('listener', {'host': '0.0.0.0', + 'port': 9999, + 'maxSessionFrames': 1}) + ]) + cls.routers.append(cls.tester.qdrouterd(name, config_27, wait=False, expect=Process.EXIT_FAIL)) + # Give some time for the test to write to the .out file. Without this, the tests execute too # fast and find that nothing has yet been written to the .out files. for router in cls.routers: @@ -644,6 +674,27 @@ def test_48_router_in_error(self): err = "version must be >= oldestValidVersion" self.routers[24].wait_log_message(err, timeout=1.0) + with open(self.routers[25].outfile + '.out', 'r') as out_file: + for line in out_file: + if "Invalid maxSessions" in line: + test_pass = True + break + self.assertTrue(test_pass) + + with open(self.routers[26].outfile + '.out', 'r') as out_file: + for line in out_file: + if "Invalid maxFrameSize" in line: + test_pass = True + break + self.assertTrue(test_pass) + + with open(self.routers[27].outfile + '.out', 'r') as out_file: + for line in out_file: + if "Invalid maxSessionFrames" in line: + test_pass = True + break + self.assertTrue(test_pass) + class OneRouterTest(TestCase): """System tests involving a single router""" diff --git a/tests/system_tests_protocol_settings.py b/tests/system_tests_protocol_settings.py index faaa889e6..8cf75d6aa 100644 --- a/tests/system_tests_protocol_settings.py +++ b/tests/system_tests_protocol_settings.py @@ -96,7 +96,7 @@ def setUpClass(cls): config = Qdrouterd.Config([ ('router', {'mode': 'standalone', 'id': 'QDR'}), - ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '500000'}), + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxSessions': '32768'}), ]) cls.router = cls.tester.qdrouterd(name, config) cls.router.wait_ready() @@ -105,11 +105,14 @@ def setUpClass(cls): def test_max_sessions_large(self): sniffer = ProtocolSettingsSniffer(self.router.addresses[0], "xxx") sniffer.run() + # note: remote_channel_max is the highest channel number supported, so + # there are actually [0..remote_channel_max] channels, which is 1 + # greater than the value of remote_channel_max self.assertEqual(32767, sniffer.remote_channel_max) class MaxFrameSmallTest(TestCase): - """System tests setting proton max-frame-size""" + """System tests setting proton minimum max-frame-size""" @classmethod def setUpClass(cls): """Start a router and a messenger""" @@ -118,7 +121,7 @@ def setUpClass(cls): config = Qdrouterd.Config([ ('router', {'mode': 'standalone', 'id': 'QDR'}), - ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '2'}), + ('listener', {'host': '0.0.0.0', 'port': cls.tester.get_port(), 'maxFrameSize': '512'}), ]) cls.router = cls.tester.qdrouterd(name, config) cls.router.wait_ready() diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py index 8d40f3b9c..810bf42fc 100644 --- a/tests/system_tests_two_routers.py +++ b/tests/system_tests_two_routers.py @@ -1906,5 +1906,75 @@ def run(self): Container(self).run() +class TwoRouterSessionWindowTest(TestCase): + """ + Verify that the session window on the inter-router connection does not + stall on large message transfer + """ + @classmethod + def setUpClass(cls): + super(TwoRouterSessionWindowTest, cls).setUpClass() + + # Configure a very small inter-router session window: max_frame * max_session_window + cls.max_frame = 512 + cls.max_session_window = 10 + + def router(name, extra_config): + config = [ + ('router', {'mode': 'interior', + 'id': name, + 'dataConnectionCount': 0}), + + ('listener', {'port': cls.tester.get_port()}), + + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + extra_config + + config = Qdrouterd.Config(config) + return cls.tester.qdrouterd(name, config, wait=False) + + inter_router_port = cls.tester.get_port() + + cls.RouterA = router('RouterA', + [ + ('listener', {'role': 'inter-router', + 'host': '0.0.0.0', + 'port': inter_router_port, + 'saslMechanisms': 'ANONYMOUS', + 'maxFrameSize': cls.max_frame, + 'maxSessionFrames': cls.max_session_window}), + ]) + + cls.RouterB = router('RouterB', + [ + ('connector', {'name': 'toRouterA', + 'role': 'inter-router', + 'port': inter_router_port, + 'maxFrameSize': cls.max_frame, + 'maxSessionFrames': cls.max_session_window}), + ]) + + cls.RouterA.wait_router_connected('RouterB') + cls.RouterB.wait_router_connected('RouterA') + + def test_large_transfer(self): + """ + Transfer AMQP messages that are much bigger than the session window on + the inter-router connection + """ + payload = "?" * (10 * self.max_frame * self.max_session_window) + msg = Message(body=payload) + rx_client = AsyncTestReceiver(address=self.RouterB.addresses[0], source="test/session") + tx_client = AsyncTestSender(address=self.RouterA.addresses[0], + target="test/session", message=msg, + count=10) + tx_client.wait() # until all messages are sent + for i in range(10): + rx_client.queue.get() # will raise error if no message available + rx_client.stop() + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/test-receiver.c b/tests/test-receiver.c index 75547e1ad..46377ffbd 100644 --- a/tests/test-receiver.c +++ b/tests/test-receiver.c @@ -24,6 +24,8 @@ #include "proton/message.h" #include "proton/session.h" #include "proton/proactor.h" +#include "proton/transport.h" +#include "proton/version.h" #include #include @@ -40,6 +42,10 @@ bool stop = false; bool verbose = false; bool debug_mode = false; +uint32_t in_session_window = 0; // 0 == use Proton default (frames) +uint32_t in_window_lwm = 0; // incoming session window low watermark (frames) 0 == use Proton default +uint32_t in_max_frame = 0; // 0 == use Proton default + int credit_window = 1000; char *source_address = "test-address"; // name of the source node to receive from char _addr[] = "127.0.0.1:5672"; @@ -88,12 +94,26 @@ static bool event_handler(pn_event_t *event) debug("new event=%s\n", pn_event_type_name(type)); switch (type) { - case PN_CONNECTION_INIT: { + case PN_CONNECTION_BOUND: { // Create and open all the endpoints needed to send a message // + pn_transport_t *tport = pn_connection_transport(pn_conn); in_message = pn_message(); + if (in_max_frame) { + pn_transport_set_max_frame(tport, in_max_frame); + } pn_connection_open(pn_conn); pn_ssn = pn_session(pn_conn); + if (in_session_window) { +#if (PN_VERSION_MAJOR > 0) || (PN_VERSION_MINOR > 39) + int rc = pn_session_set_incoming_window_and_lwm(pn_ssn, in_session_window, in_window_lwm); + if (rc != 0) { + fprintf(stderr, "Failed to set incoming window and low watermark\n"); + fflush(stderr); + abort(); + } +#endif + } pn_session_open(pn_ssn); pn_link = pn_receiver(pn_ssn, "MyReceiver"); pn_terminus_set_address(pn_link_source(pn_link), source_address); @@ -199,6 +219,9 @@ static void usage(void) printf("-E \tExit without cleanly closing the connection [off]\n"); printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose)); printf("-D \tPrint debug info [off]\n"); + printf("-F \tSet Incoming Max Frame (max 512, 0 == use internal default) [%"PRIu32" bytes]\n", in_max_frame); + printf("-W \tSet Session Incoming Window (min 2, 0 == use internal default) [%"PRIu32" frames]\n", in_session_window); + printf("-L \tSet Session Incoming Window Low Watermark (0 == use internal default) [%"PRIu32" frames]\n", in_window_lwm); exit(1); } @@ -208,7 +231,7 @@ int main(int argc, char** argv) /* command line options */ opterr = 0; int c; - while((c = getopt(argc, argv, "i:a:s:hdDw:c:E")) != -1) { + while((c = getopt(argc, argv, "i:a:s:hdDw:c:EF:W:L:")) != -1) { switch(c) { case 'h': usage(); break; case 'a': host_address = optarg; break; @@ -225,6 +248,21 @@ int main(int argc, char** argv) case 'E': drop_connection = true; break; case 'd': verbose = true; break; case 'D': debug_mode = true; break; + case 'F': + if (sscanf(optarg, "%"SCNu32, &in_max_frame) != 1 || in_max_frame < 512) + usage(); + break; + case 'W': + if (sscanf(optarg, "%"SCNu32, &in_session_window) != 1 || in_session_window < 2) + usage(); + break; + case 'L': + if (sscanf(optarg, "%"SCNu32, &in_window_lwm) != 1 || in_window_lwm > in_session_window) { + fprintf(stderr, "Session Incoming Window Low Watermark (%"PRIu32") must be <= Session Incoming Window (%"PRIu32")\n", + in_window_lwm, in_session_window); + usage(); + } + break; default: usage(); diff --git a/tests/test-sender.c b/tests/test-sender.c index 79e782175..2646d00e0 100644 --- a/tests/test-sender.c +++ b/tests/test-sender.c @@ -46,7 +46,7 @@ #define BODY_SIZE_SMALL 100L #define BODY_SIZE_MEDIUM ((long int)((4 * 1024) + 1)) #define BODY_SIZE_LARGE ((long int)((65 * 1024) + 1)) -#define BODY_SIZE_HUGE ((long int)((QD_BUFFER_DEFAULT_SIZE * QD_QLIMIT_Q3_UPPER * 3) + 1)) +#define BODY_SIZE_HUGE ((long int)((4 * 1024 * 1024) + 1)) #define DEFAULT_PRIORITY 4