-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISSUE-1619: Implement session flow control using new Proton window API #1647
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -34,7 +34,13 @@ typedef struct pn_link_t pn_link_t; | |||||
// These types are private to the AMQP adaptor: | ||||||
typedef struct qd_connection_t qd_connection_t; | ||||||
typedef struct qd_link_t qd_link_t; | ||||||
typedef struct qd_session_t qd_session_t; | ||||||
|
||||||
// Session windowing limits | ||||||
extern const size_t qd_session_max_in_win_user; // incoming window byte limit for user connections | ||||||
extern const size_t qd_session_max_in_win_trunk; // incoming window byte limit for inter-router connections | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
extern const size_t qd_session_max_outgoing_bytes; // limit to outgoing buffered data | ||||||
extern const size_t qd_session_low_outgoing_bytes; // low water mark to resume buffering outgoing data | ||||||
|
||||||
// For use by message.c | ||||||
|
||||||
|
@@ -46,6 +52,8 @@ pn_link_t *qd_link_pn(const qd_link_t *link); | |||||
bool qd_connection_strip_annotations_in(const qd_connection_t *c); | ||||||
uint64_t qd_connection_max_message_size(const qd_connection_t *c); | ||||||
void qd_connection_log_policy_denial(const qd_link_t *link, const char *text); | ||||||
qd_session_t *qd_link_get_session(const qd_link_t *link); | ||||||
size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn); | ||||||
|
||||||
// Used by the log module | ||||||
void qd_amqp_connection_set_tracing(bool enabled); | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -69,15 +69,34 @@ struct qd_session_t { | |||||
sys_atomic_t ref_count; | ||||||
pn_session_t *pn_session; | ||||||
qd_link_list_t q3_blocked_links; ///< Q3 blocked if !empty | ||||||
uint32_t remote_max_frame; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
}; | ||||||
|
||||||
// For outgoing session flow control. Never buffer more than out_window_limit bytes of data on the session before | ||||||
// returning control to the proactor. This prevents memory bloat and allows proactor to send buffered data in a | ||||||
// timely manner. The low watermark is used to unblock the session - do not resume writing to the session until the | ||||||
// amount of available capacity has grown to at least the low watermark. | ||||||
// Session window limits | ||||||
// | ||||||
// A session incoming window determines how many incoming frames the session will accept across all incoming links. This | ||||||
// places a limit on the number of incoming data bytes that have to be buffered on the session (connection max-frame * | ||||||
// max incoming window frames). The local session incoming window configuration is determined by the maxFrameSize and | ||||||
// maxSessionFrames configuration attributes of an AMQP listener/connector. | ||||||
// | ||||||
// The remote peers session window must be honored when writing output to a sending link. In addition we limit the | ||||||
// amount of outgoing data that can be buffered on a session before control is returned to Proton. This is necessary to | ||||||
// improve latency and allow capacity sharing among all links on the session. | ||||||
// | ||||||
const size_t qd_session_max_outgoing_bytes = 1048576; // max buffered bytes on a session | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
const size_t qd_session_low_outgoing_bytes = 524288; // low watermark for max buffered bytes | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
size_t out_window_limit; | ||||||
size_t out_window_low_watermark; | ||||||
}; | ||||||
const size_t qd_session_max_in_win_user = (size_t) 8388608; // AMQP application in window max bytes 8MB | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is not clear to me what AMQP application means There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ganeshmurthy "user connection"? Basically any connection via a listener with the role "normal". These would include connections from the control plane to do management stuff. All non router-2-router connections. |
||||||
const size_t qd_session_max_in_win_trunk = (size_t) 134217728; // inter-router in window max bytes 128MB | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ganeshmurthy but the maximum is not per connection, it is per session, right? Hence the variable prefix "qd_session_..." |
||||||
|
||||||
|
||||||
// Can we leverage the new Proton Session Window API? | ||||||
// | ||||||
#if (PN_VERSION_MAJOR > 0) || (PN_VERSION_MINOR > 39) | ||||||
#define USE_PN_SESSION_WINDOWS 1 | ||||||
#else | ||||||
#define USE_PN_SESSION_WINDOWS 0 | ||||||
#endif | ||||||
|
||||||
// Bug workaround to free Proton links when we hope they are no longer used! | ||||||
// Fingers crossed! :| | ||||||
|
@@ -439,7 +458,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event, | |||||
qd_conn->n_sessions++; | ||||||
} | ||||||
DEQ_INSERT_TAIL(qd_conn->child_sessions, qd_ssn); | ||||||
qd_policy_apply_session_settings(qd_ssn->pn_session, qd_conn); | ||||||
uint32_t in_window; | ||||||
qd_policy_get_session_settings(qd_conn, &in_window); | ||||||
qd_session_set_max_in_window(qd_ssn, in_window); | ||||||
pn_session_open(qd_ssn->pn_session); | ||||||
} | ||||||
} | ||||||
|
@@ -700,7 +721,7 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name, | |||||
DEQ_INSERT_TAIL(conn->child_sessions, qd_ssn); | ||||||
conn->qd_sessions[ssn_class] = qd_ssn; | ||||||
qd_session_incref(qd_ssn); | ||||||
pn_session_set_incoming_capacity(qd_ssn->pn_session, cf->incoming_capacity); | ||||||
qd_session_set_max_in_window(qd_ssn, cf->session_max_in_window); | ||||||
pn_session_open(qd_ssn->pn_session); | ||||||
} | ||||||
|
||||||
|
@@ -897,6 +918,9 @@ void qd_link_set_link_id(qd_link_t *link, uint64_t link_id) | |||||
qd_session_t *qd_session(pn_session_t *pn_ssn) | ||||||
{ | ||||||
assert(pn_ssn && qd_session_from_pn(pn_ssn) == 0); | ||||||
pn_connection_t *pn_conn = pn_session_connection(pn_ssn); | ||||||
pn_transport_t *pn_tport = pn_connection_transport(pn_conn); | ||||||
|
||||||
qd_session_t *qd_ssn = new_qd_session_t(); | ||||||
if (qd_ssn) { | ||||||
ZERO(qd_ssn); | ||||||
|
@@ -905,10 +929,8 @@ qd_session_t *qd_session(pn_session_t *pn_ssn) | |||||
qd_ssn->pn_session = pn_ssn; | ||||||
DEQ_INIT(qd_ssn->q3_blocked_links); | ||||||
pn_session_set_context(pn_ssn, qd_ssn); | ||||||
|
||||||
// @TODO(kgiusti) make these dependent on connection role | ||||||
qd_ssn->out_window_limit = QD_QLIMIT_Q3_UPPER * QD_BUFFER_SIZE; | ||||||
qd_ssn->out_window_low_watermark = QD_QLIMIT_Q3_LOWER * QD_BUFFER_SIZE; | ||||||
qd_ssn->remote_max_frame = pn_transport_get_remote_max_frame(pn_tport); | ||||||
assert(qd_ssn->remote_max_frame != 0); | ||||||
} | ||||||
return qd_ssn; | ||||||
} | ||||||
|
@@ -970,22 +992,38 @@ size_t qd_session_get_outgoing_capacity(const qd_session_t *qd_ssn) | |||||
{ | ||||||
assert(qd_ssn && qd_ssn->pn_session); | ||||||
|
||||||
// discount any data already written but not yet sent | ||||||
size_t buffered = pn_session_outgoing_bytes(qd_ssn->pn_session); | ||||||
if (buffered < qd_ssn->out_window_limit) { | ||||||
return qd_ssn->out_window_limit - buffered; | ||||||
} | ||||||
return 0; | ||||||
if (buffered >= qd_session_max_outgoing_bytes) | ||||||
return 0; // exceeded maximum buffered limit | ||||||
size_t avail = qd_session_max_outgoing_bytes - buffered; | ||||||
|
||||||
#if USE_PN_SESSION_WINDOWS | ||||||
// never exceed the remaining in window of the peer | ||||||
size_t limit = pn_session_remote_incoming_window(qd_ssn->pn_session); | ||||||
limit *= qd_ssn->remote_max_frame; | ||||||
return MIN(avail, limit); | ||||||
#else | ||||||
return avail; | ||||||
#endif | ||||||
} | ||||||
|
||||||
|
||||||
/** Return the session outgoing window low water mark | ||||||
/** Configure the sessions incoming window limit | ||||||
* | ||||||
* Blocked session can resume output once the available outgoing capacity reaches at least this value | ||||||
* @param qd_ssn Session to configure | ||||||
* @param in_window maximum incoming window in frames | ||||||
*/ | ||||||
size_t qd_session_get_outgoing_threshold(const qd_session_t *qd_ssn) | ||||||
void qd_session_set_max_in_window(qd_session_t *qd_ssn, uint32_t in_window) | ||||||
{ | ||||||
assert(qd_ssn); | ||||||
return qd_ssn->out_window_low_watermark; | ||||||
// older proton session windowing would stall so do not enable it | ||||||
#if USE_PN_SESSION_WINDOWS | ||||||
// Use new window configuration API to set the maximum in window and low water mark | ||||||
assert(in_window >= 2); | ||||||
int rc = pn_session_set_incoming_window_and_lwm(qd_ssn->pn_session, in_window, in_window / 2); | ||||||
(void) rc; | ||||||
assert(rc == 0); | ||||||
#endif | ||||||
} | ||||||
|
||||||
|
||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -25,6 +25,7 @@ | |||||
#include "dispatch_private.h" | ||||||
#include "entity.h" | ||||||
|
||||||
#include <qpid/dispatch/amqp_adaptor.h> | ||||||
#include <qpid/dispatch/log.h> | ||||||
#include <qpid/dispatch/tls_common.h> | ||||||
|
||||||
|
@@ -134,9 +135,6 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, | |||||
config->http = qd_entity_opt_bool(entity, "http", false); CHECK(); | ||||||
config->http_root_dir = qd_entity_opt_string(entity, "httpRootDir", 0); CHECK(); | ||||||
config->http = config->http || config->http_root_dir; /* httpRootDir implies http */ | ||||||
config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize"); CHECK(); | ||||||
config->max_sessions = qd_entity_get_long(entity, "maxSessions"); CHECK(); | ||||||
uint64_t ssn_frames = qd_entity_opt_long(entity, "maxSessionFrames", 0); CHECK(); | ||||||
config->idle_timeout_seconds = qd_entity_get_long(entity, "idleTimeoutSeconds"); CHECK(); | ||||||
if (is_listener) { | ||||||
config->initial_handshake_timeout_seconds = qd_entity_get_long(entity, "initialHandshakeTimeoutSeconds"); CHECK(); | ||||||
|
@@ -191,48 +189,49 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, | |||||
if (config->link_capacity == 0) | ||||||
config->link_capacity = 250; | ||||||
|
||||||
if (config->max_sessions == 0 || config->max_sessions > 32768) | ||||||
// Proton disallows > 32768 | ||||||
config->max_sessions = 32768; | ||||||
|
||||||
if (config->max_frame_size < QD_AMQP_MIN_MAX_FRAME_SIZE) | ||||||
// Silently promote the minimum max-frame-size | ||||||
// Proton will do this but the number is needed for the | ||||||
// incoming capacity calculation. | ||||||
config->max_frame_size = QD_AMQP_MIN_MAX_FRAME_SIZE; | ||||||
|
||||||
// | ||||||
// Given session frame count and max frame size, compute session incoming_capacity | ||||||
// On 64-bit systems the capacity has no practical limit. | ||||||
// On 32-bit systems the largest default capacity is half the process address space. | ||||||
// | ||||||
bool is_64bit = sizeof(size_t) == 8; | ||||||
#define MAX_32BIT_CAPACITY ((size_t)(2147483647)) | ||||||
if (ssn_frames == 0) { | ||||||
config->incoming_capacity = is_64bit ? MAX_32BIT_CAPACITY * (size_t)config->max_frame_size : MAX_32BIT_CAPACITY; | ||||||
} else { | ||||||
// Limited incoming frames. | ||||||
if (is_64bit) { | ||||||
// Specify this to proton by setting capacity to be | ||||||
// the product (max_frame_size * ssn_frames). | ||||||
config->incoming_capacity = (size_t)config->max_frame_size * (size_t)ssn_frames; | ||||||
// Proton does not support maxSessions > 32768 | ||||||
int64_t value = (int64_t) qd_entity_get_long(entity, "maxSessions"); CHECK(); | ||||||
if (value == 0) { | ||||||
value = 32768; // default | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
define QD_PN_MAX_SESSIONS in amqp.h ? |
||||||
} else if (value < 0 || value > 32768) { | ||||||
(void) qd_error(QD_ERROR_CONFIG, | ||||||
"Invalid maxSessions specified (%"PRId64"). Minimum value is 1 and maximum value is %i", | ||||||
value, 32768); | ||||||
goto error; | ||||||
} | ||||||
config->max_sessions = (uint32_t) value; | ||||||
|
||||||
// Ensure maxFrameSize is at least the minimum value required by the standard, | ||||||
// and it does not exceed the proton APIs max of INT32_MAX | ||||||
value = (int64_t) qd_entity_get_long(entity, "maxFrameSize"); CHECK(); | ||||||
if (value == 0) { | ||||||
value = 16384; // default | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} else if (value < QD_AMQP_MIN_MAX_FRAME_SIZE || value > INT32_MAX) { | ||||||
(void) qd_error(QD_ERROR_CONFIG, | ||||||
"Invalid maxFrameSize specified (%"PRId64"). Minimum value is %d and maximum value is %"PRIi32, | ||||||
value, QD_AMQP_MIN_MAX_FRAME_SIZE, INT32_MAX); | ||||||
goto error; | ||||||
} | ||||||
config->max_frame_size = (uint32_t) value; | ||||||
|
||||||
// Ensure that maxSessionFrames does not exceed the proton APIs max of INT32_MAX | ||||||
value = (int64_t) qd_entity_opt_long(entity, "maxSessionFrames", 0); CHECK(); | ||||||
if (value == 0) { | ||||||
// Use a sane default. Allow router to router links more capacity than AMQP application links | ||||||
if (strcmp(config->role, "normal") == 0) { | ||||||
value = qd_session_max_in_win_user / config->max_frame_size; | ||||||
} else { | ||||||
// 32-bit systems have an upper bound to the capacity | ||||||
uint64_t max_32bit_capacity = (uint64_t)MAX_32BIT_CAPACITY; | ||||||
uint64_t capacity = (uint64_t)config->max_frame_size * (uint64_t)ssn_frames; | ||||||
if (capacity <= max_32bit_capacity) { | ||||||
config->incoming_capacity = (size_t)capacity; | ||||||
} else { | ||||||
config->incoming_capacity = MAX_32BIT_CAPACITY; | ||||||
uint64_t actual_frames = max_32bit_capacity / (uint64_t)config->max_frame_size; | ||||||
|
||||||
qd_log(LOG_CONN_MGR, QD_LOG_WARNING, | ||||||
"Server configuration for I/O adapter entity name:'%s', host:'%s', port:'%s', " | ||||||
"requested maxSessionFrames truncated from %" PRId64 " to %" PRId64, | ||||||
config->name, config->host, config->port, ssn_frames, actual_frames); | ||||||
} | ||||||
value = qd_session_max_in_win_trunk / config->max_frame_size; | ||||||
} | ||||||
// Ensure the window is at least 2 frames to allow a non-zero low water mark | ||||||
value = MAX(value, 2); | ||||||
} else if (value < 2 || value > INT32_MAX) { | ||||||
(void) qd_error(QD_ERROR_CONFIG, | ||||||
"Invalid maxSessionFrames specified (%"PRId64"). Minimum value is 2 and maximum value is %"PRIi32, | ||||||
value, INT32_MAX); | ||||||
goto error; | ||||||
} | ||||||
config->session_max_in_window = (uint32_t) value; | ||||||
|
||||||
// | ||||||
// For now we are hardwiring this attribute to true. If there's an outcry from the | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -199,7 +199,7 @@ typedef struct qd_server_config_t { | |||||
int inter_router_cost; | ||||||
|
||||||
/** | ||||||
* The maximum size of an AMQP frame in octets. | ||||||
* The maximum size of an AMQP frame in octets. Frome maxFrameSize configuration attribute. | ||||||
*/ | ||||||
uint32_t max_frame_size; | ||||||
|
||||||
|
@@ -209,12 +209,14 @@ typedef struct qd_server_config_t { | |||||
uint32_t max_sessions; | ||||||
|
||||||
/** | ||||||
* The incoming capacity value is calculated to be (sessionMaxFrames * maxFrameSize). | ||||||
* In a round about way the calculation forces the AMQP Begin/incoming-capacity value | ||||||
* to equal the specified sessionMaxFrames value measured in units of transfer frames. | ||||||
* This calculation is done to satisfy proton pn_session_set_incoming_capacity(). | ||||||
* The session incoming window limit in frames. From maxSessionFrames configuration attribute | ||||||
* | ||||||
* The window indicates the maximum number of incoming *frames* that the session will buffer. This places a limit on | ||||||
* the amount of memory the router needs to reserve to accomodate data arriving from the peer session endpoint. | ||||||
* | ||||||
* The maximum amount of memory is computed as (max_frame_size * session_max_in_window) | ||||||
*/ | ||||||
size_t incoming_capacity; | ||||||
uint32_t session_max_in_window; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
/** | ||||||
* The idle timeout, in seconds. If the peer sends no data frames in this many seconds, the | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.