Skip to content

Commit

Permalink
The following changes (as well as changes in shared-util) enable uAMQ…
Browse files Browse the repository at this point in the history
…P server mode when the client is AMQPlite.net.

1. When detecting an invalid header (e.g. due to protocol id 3 being sent as in the case of SASL, per spec the server must reply with its desired header then close the connection. Registering a sent complete callback and then invoke the error handler to clean up.

2. connection_listen opens the underlying IO. Since header detect io is already opened by the endpoint callback, the callbacks are not properly registered causing a prefetch on null later.

3. A link attach request needs to have a link attach response. This was not sent since the link was initialized already to be in HALF_ATTACH state. Remove initial Set and have the session state change callback take care of the attach.

4. Attach properties was not initialized, causing a crash on send attach later.

5. Flow sends incoming id, even though incoming id was not yet initialized in a flow message from client. This is not per spec, as the id is not initialized and thus random, thus client closes the connection. Ensure that incoming id is not added to the flow message on first send.

6. Likewise, on first received flow frame, per spec the window is calculated from the initial outgoing-id of the endpoint (in our case 0).

7. Adding the wsio_open changes required due to xio_open signature change.  Fixing sasl_io to honor the close callback.

All changes were tested using the local server and client samples as well as the amqpnodeserver for fgw. These changes depend on latest develop branch state in amqp-shared-c, so this change also forwards the commit id of the shared submodule.
  • Loading branch information
marcschier committed Mar 3, 2016
1 parent 1b13a97 commit 070db5c
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 124 deletions.
2 changes: 1 addition & 1 deletion inc/wsio.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ typedef struct WSIO_CONFIG_TAG

extern CONCRETE_IO_HANDLE wsio_create(void* io_create_parameters, LOGGER_LOG logger_log);
extern void wsio_destroy(CONCRETE_IO_HANDLE ws_io);
extern int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, ON_BYTES_RECEIVED on_bytes_received, ON_IO_ERROR on_io_error, void* callback_context);
extern int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context);
extern int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context);
extern int wsio_send(CONCRETE_IO_HANDLE ws_io, const void* buffer, size_t size, ON_SEND_COMPLETE on_send_complete, void* callback_context);
extern void wsio_dowork(CONCRETE_IO_HANDLE ws_io);
Expand Down
55 changes: 34 additions & 21 deletions src/header_detect_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ static void indicate_close_complete(HEADER_DETECT_IO_INSTANCE* header_detect_io_
}
}

static void on_underlying_io_error(void* context);
static void on_send_complete_close(void* context, IO_SEND_RESULT send_result)
{
on_underlying_io_error(context);
}

static void on_underlying_io_bytes_received(void* context, const unsigned char* buffer, size_t size)
{
HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)context;
Expand All @@ -69,7 +75,10 @@ static void on_underlying_io_bytes_received(void* context, const unsigned char*
case IO_STATE_WAIT_FOR_HEADER:
if (amqp_header[header_detect_io_instance->header_pos] != buffer[0])
{
header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
/* Send expected header, then close as per spec. We do not care if we fail */
(void)xio_send(header_detect_io_instance->underlying_io, amqp_header, sizeof(amqp_header), on_send_complete_close, context);

header_detect_io_instance->io_state = IO_STATE_NOT_OPEN;
indicate_open_complete(header_detect_io_instance, IO_OPEN_ERROR);
size = 0;
}
Expand Down Expand Up @@ -227,7 +236,12 @@ int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE
{
HEADER_DETECT_IO_INSTANCE* header_detect_io_instance = (HEADER_DETECT_IO_INSTANCE*)header_detect_io;

if (header_detect_io_instance->io_state == IO_STATE_OPEN)
if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN &&
header_detect_io_instance->io_state != IO_STATE_OPEN)
{
result = __LINE__;
}
else
{
header_detect_io_instance->on_bytes_received = on_bytes_received;
header_detect_io_instance->on_io_open_complete = on_io_open_complete;
Expand All @@ -236,25 +250,24 @@ int headerdetectio_open(CONCRETE_IO_HANDLE header_detect_io, ON_IO_OPEN_COMPLETE
header_detect_io_instance->on_io_open_complete_context = on_io_open_complete_context;
header_detect_io_instance->on_io_error_context = on_io_error_context;

result = 0;
}
else if (header_detect_io_instance->io_state != IO_STATE_NOT_OPEN)
{
result = __LINE__;
}
else
{
header_detect_io_instance->header_pos = 0;
header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO;

if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
{
result = __LINE__;
}
else
{
result = 0;
}
if (header_detect_io_instance->io_state == IO_STATE_OPEN)
{
indicate_open_complete(header_detect_io_instance, IO_OPEN_OK);
}
else
{
header_detect_io_instance->header_pos = 0;
header_detect_io_instance->io_state = IO_STATE_OPENING_UNDERLYING_IO;

if (xio_open(header_detect_io_instance->underlying_io, on_underlying_io_open_complete, header_detect_io_instance, on_underlying_io_bytes_received, header_detect_io_instance, on_underlying_io_error, header_detect_io_instance) != 0)
{
result = __LINE__;
}
else
{
result = 0;
}
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND
result->initial_delivery_count = 0;
result->max_message_size = 0;
result->is_underlying_session_begun = 0;
result->source = amqpvalue_clone(target);
result->attach_properties = NULL;
result->source = amqpvalue_clone(target);
result->target = amqpvalue_clone(source);
if (role == role_sender)
{
Expand Down Expand Up @@ -579,7 +580,6 @@ LINK_HANDLE link_create_from_endpoint(SESSION_HANDLE session, LINK_ENDPOINT_HAND
(void)strcpy(result->name, name);
result->on_link_state_changed = NULL;
result->callback_context = NULL;
set_link_state(result, LINK_STATE_HALF_ATTACHED);
result->link_endpoint = link_endpoint;
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/saslclientio.c
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ int saslclientio_open(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_OPEN_COMPLETE on_
return result;
}

int saslclientio_close(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context)
int saslclientio_close(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* on_io_close_complete_context)
{
int result = 0;

Expand All @@ -991,6 +991,9 @@ int saslclientio_close(CONCRETE_IO_HANDLE sasl_client_io, ON_IO_CLOSE_COMPLETE o
{
sasl_client_io_instance->io_state = IO_STATE_CLOSING;

sasl_client_io_instance->on_io_close_complete = on_io_close_complete;
sasl_client_io_instance->on_io_close_complete_context = on_io_close_complete_context;

/* Codes_SRS_SASLCLIENTIO_01_015: [saslclientio_close shall close the underlying io handle passed in saslclientio_create by calling xio_close.] */
if (xio_close(sasl_client_io_instance->underlying_io, on_underlying_io_close_complete, sasl_client_io_instance) != 0)
{
Expand Down
21 changes: 17 additions & 4 deletions src/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,11 @@ static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t p
{
end_session_with_error(session_instance, "amqp:internal-error", "Cannot create link endpoint");
}
else
else if (attach_get_handle(attach_handle, &new_link_endpoint->input_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
}
else
{
if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, new_link_endpoint, name, role, source, target))
{
Expand Down Expand Up @@ -493,8 +497,18 @@ static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t p
uint32_t remote_handle;
transfer_number flow_next_incoming_id;
uint32_t flow_incoming_window;

if (flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0)
{
/*
If the next-incoming-id field of the flow frame is not set,
then remote-incomingwindow is computed as follows:
initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)
*/
flow_next_incoming_id = session_instance->next_outgoing_id;
}

if ((flow_get_next_outgoing_id(flow_handle, &session_instance->next_incoming_id) != 0) ||
(flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0) ||
(flow_get_incoming_window(flow_handle, &flow_incoming_window) != 0))
{
flow_destroy(flow_handle);
Expand Down Expand Up @@ -1152,8 +1166,7 @@ int session_send_flow(LINK_ENDPOINT_HANDLE link_endpoint, FLOW_HANDLE flow)

result = 0;

if ((session_instance->session_state == SESSION_STATE_BEGIN_RCVD) ||
((session_instance->session_state == SESSION_STATE_MAPPED)))
if (session_instance->session_state == SESSION_STATE_BEGIN_RCVD)
{
if (flow_set_next_incoming_id(flow, session_instance->next_incoming_id) != 0)
{
Expand Down
38 changes: 22 additions & 16 deletions src/wsio.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ typedef struct PENDING_SOCKET_IO_TAG
typedef struct WSIO_INSTANCE_TAG
{
ON_BYTES_RECEIVED on_bytes_received;
ON_IO_OPEN_COMPLETE on_io_open_complete;
ON_IO_ERROR on_io_error;
LOGGER_LOG logger_log;
void* open_callback_context;
void* on_bytes_received_context;
ON_IO_OPEN_COMPLETE on_io_open_complete;
void* on_io_open_complete_context;
ON_IO_ERROR on_io_error;
void* on_io_error_context;
LOGGER_LOG logger_log;
IO_STATE io_state;
LIST_HANDLE pending_io_list;
struct lws_context* ws_context;
Expand All @@ -60,7 +62,7 @@ static void indicate_error(WSIO_INSTANCE* wsio_instance)
wsio_instance->io_state = IO_STATE_ERROR;
if (wsio_instance->on_io_error != NULL)
{
wsio_instance->on_io_error(wsio_instance->open_callback_context);
wsio_instance->on_io_error(wsio_instance->on_io_error_context);
}
}

Expand All @@ -70,7 +72,7 @@ static void indicate_open_complete(WSIO_INSTANCE* ws_io_instance, IO_OPEN_RESULT
if (ws_io_instance->on_io_open_complete != NULL)
{
/* Codes_SRS_WSIO_01_039: [The callback_context argument shall be passed to on_io_open_complete as is.] */
ws_io_instance->on_io_open_complete(ws_io_instance->open_callback_context, open_result);
ws_io_instance->on_io_open_complete(ws_io_instance->on_io_open_complete_context, open_result);
}
}

Expand Down Expand Up @@ -390,7 +392,7 @@ static int on_ws_callback(struct lws *wsi, enum lws_callback_reasons reason, voi
/* Codes_SRS_WSIO_01_084: [The bytes argument shall point to the received bytes as indicated by the LWS_CALLBACK_CLIENT_RECEIVE in argument.] */
/* Codes_SRS_WSIO_01_085: [The length argument shall be set to the number of received bytes as indicated by the LWS_CALLBACK_CLIENT_RECEIVE len argument.] */
/* Codes_SRS_WSIO_01_086: [The callback_context shall be set to the callback_context that was passed in wsio_open.] */
wsio_instance->on_bytes_received(wsio_instance->open_callback_context, in, len);
wsio_instance->on_bytes_received(wsio_instance->on_bytes_received_context, in, len);
}
}

Expand Down Expand Up @@ -528,11 +530,13 @@ CONCRETE_IO_HANDLE wsio_create(void* io_create_parameters, LOGGER_LOG logger_log
result = amqpalloc_malloc(sizeof(WSIO_INSTANCE));
if (result != NULL)
{
result->on_bytes_received = NULL;
result->on_io_open_complete = NULL;
result->on_io_error = NULL;
result->logger_log = logger_log;
result->open_callback_context = NULL;
result->on_bytes_received = NULL;
result->on_bytes_received_context = NULL;
result->on_io_open_complete = NULL;
result->on_io_open_complete_context = NULL;
result->on_io_error = NULL;
result->on_io_error_context = NULL;
result->logger_log = logger_log;
result->wsi = NULL;
result->ws_context = NULL;

Expand Down Expand Up @@ -678,7 +682,7 @@ void wsio_destroy(CONCRETE_IO_HANDLE ws_io)
}
}

int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, ON_BYTES_RECEIVED on_bytes_received, ON_IO_ERROR on_io_error, void* callback_context)
int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete, void* on_io_open_complete_context, ON_BYTES_RECEIVED on_bytes_received, void* on_bytes_received_context, ON_IO_ERROR on_io_error, void* on_io_error_context)
{
int result = 0;

Expand All @@ -698,9 +702,11 @@ int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete,
else
{
wsio_instance->on_bytes_received = on_bytes_received;
wsio_instance->on_bytes_received_context = on_bytes_received_context;
wsio_instance->on_io_open_complete = on_io_open_complete;
wsio_instance->on_io_open_complete_context = on_io_open_complete_context;
wsio_instance->on_io_error = on_io_error;
wsio_instance->open_callback_context = callback_context;
wsio_instance->on_io_error_context = on_io_error_context;

int ietf_version = -1; /* latest */
struct lws_context_creation_info info;
Expand Down Expand Up @@ -775,7 +781,7 @@ int wsio_open(CONCRETE_IO_HANDLE ws_io, ON_IO_OPEN_COMPLETE on_io_open_complete,
return result;
}

int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* callback_context)
int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_complete, void* on_io_close_complete_context)
{
int result = 0;

Expand Down Expand Up @@ -844,7 +850,7 @@ int wsio_close(CONCRETE_IO_HANDLE ws_io, ON_IO_CLOSE_COMPLETE on_io_close_comple
{
/* Codes_SRS_WSIO_01_047: [The callback on_io_close_complete shall be called after the close action has been completed in the context of wsio_close (wsio_close is effectively blocking).] */
/* Codes_SRS_WSIO_01_048: [The callback_context argument shall be passed to on_io_close_complete as is.] */
on_io_close_complete(callback_context);
on_io_close_complete(on_io_close_complete_context);
}

/* Codes_SRS_WSIO_01_044: [On success wsio_close shall return 0.] */
Expand Down
Loading

0 comments on commit 070db5c

Please sign in to comment.