diff --git a/CMakeLists.txt b/CMakeLists.txt index dc3395853..a53efc64e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -141,6 +141,14 @@ elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetB set(EVENT_LOOP_DEFINE "KQUEUE") set(USE_S2N ON) +elseif(CMAKE_SYSTEM_NAME STREQUAL "QNX") + file(GLOB AWS_IO_OS_SRC + "source/qnx/*.c" + "source/posix/*.c" + ) + set(EVENT_LOOP_DEFINE "IONOTIFY") + set(USE_S2N ON) + list(APPEND PLATFORM_LIBS "socket") endif() if (BYO_CRYPTO) @@ -231,7 +239,7 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}-config.cmake" DESTINATION "${LIBRARY_DIRECTORY}/${PROJECT_NAME}/cmake/" COMPONENT Development) -if (NOT CMAKE_CROSSCOMPILING) +if (NOT CMAKE_CROSSCOMPILING OR AWS_BUILD_QNX_TESTS) if (BUILD_TESTING) add_subdirectory(tests) endif() diff --git a/include/aws/io/io.h b/include/aws/io/io.h index 011e1a779..d39d42cf2 100644 --- a/include/aws/io/io.h +++ b/include/aws/io/io.h @@ -14,12 +14,37 @@ AWS_PUSH_SANE_WARNING_LEVEL #define AWS_C_IO_PACKAGE_ID 1 +struct aws_io_handle; + +struct aws_event_loop; + +/** + * Results of the I/O operation(s) performed on the aws_io_handle. + */ +struct aws_io_handle_io_op_result { + size_t read_bytes; + size_t written_bytes; + /** Error codes representing generic errors happening on I/O handles. */ + int error_code; + /** Error codes specific to reading operations. */ + int read_error_code; + /** Error codes specific to writing operations. */ + int write_error_code; +}; + +typedef void(aws_io_handle_update_io_results_fn)( + struct aws_event_loop *, + struct aws_io_handle *, + const struct aws_io_handle_io_op_result *); + struct aws_io_handle { union { int fd; void *handle; } data; void *additional_data; + /* Optional callback to return results of I/O operations performed on this handle. */ + aws_io_handle_update_io_results_fn *update_io_result; }; enum aws_io_message_type { diff --git a/source/posix/pipe.c b/source/posix/pipe.c index f727b021c..f5a72c8d6 100644 --- a/source/posix/pipe.c +++ b/source/posix/pipe.c @@ -278,9 +278,22 @@ int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_b if (read_val < 0) { int errno_value = errno; /* Always cache errno before potential side-effect */ if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { + if (read_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; + read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); + } return aws_raise_error(AWS_IO_READ_WOULD_BLOCK); } return s_raise_posix_error(errno_value); + } else if (read_val == 0) { + if (read_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + io_op_result.error_code = AWS_IO_SOCKET_CLOSED; + read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result); + } } /* Success */ @@ -454,6 +467,14 @@ static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) { if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { /* The pipe is no longer writable. 
Bail out */ write_impl->is_writable = false; + + if (write_impl->handle.update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; + write_impl->handle.update_io_result(write_impl->event_loop, &write_impl->handle, &io_op_result); + } + return; } diff --git a/source/posix/socket.c b/source/posix/socket.c index dbbf62657..1c1b5a617 100644 --- a/source/posix/socket.c +++ b/source/posix/socket.c @@ -476,6 +476,14 @@ static void s_socket_connect_event( "id=%p fd=%d: spurious event, waiting for another notification.", (void *)socket_args->socket, handle->data.fd); + + if (handle->update_io_result) { + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; + handle->update_io_result(event_loop, handle, &io_op_result); + } + return; } @@ -955,6 +963,9 @@ static void s_socket_accept_event( AWS_LOGF_DEBUG( AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd); + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) { int in_fd = 0; while (socket_impl->continue_accept && in_fd != -1) { @@ -966,12 +977,14 @@ static void s_socket_accept_event( int errno_value = errno; /* Always cache errno before potential side-effect */ if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) { + io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK; break; } int aws_error = aws_socket_get_error(socket); aws_raise_error(aws_error); s_on_connection_error(socket, aws_error); + io_op_result.read_error_code = aws_error; break; } @@ -1064,6 +1077,10 @@ static void s_socket_accept_event( } } + if (handle->update_io_result) { + handle->update_io_result(event_loop, handle, &io_op_result); + } + AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p fd=%d: finished processing incoming connections, " @@ -1632,6 +1649,9 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc bool parent_request_failed = false; bool pushed_to_written_queue = false; + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); + /* if a close call happens in the middle, this queue will have been cleaned out from under us. 
*/ while (!aws_linked_list_empty(&socket_impl->write_queue)) { struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue); @@ -1660,6 +1680,7 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc if (errno_value == EAGAIN) { AWS_LOGF_TRACE( AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd); + io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK; break; } @@ -1672,6 +1693,7 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc aws_error = AWS_IO_SOCKET_CLOSED; aws_raise_error(aws_error); purge = true; + io_op_result.write_error_code = aws_error; break; } @@ -1684,9 +1706,12 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc errno_value); aws_error = s_determine_socket_error(errno_value); aws_raise_error(aws_error); + io_op_result.write_error_code = aws_error; break; } + io_op_result.written_bytes += (size_t)written; + size_t remaining_to_write = write_request->cursor_cpy.len; aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written); @@ -1732,6 +1757,10 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task); } + if (socket->io_handle.update_io_result) { + socket->io_handle.update_io_result(socket->event_loop, &socket->io_handle, &io_op_result); + } + /* Only report error if aws_socket_write() invoked this function and its write_request failed */ if (!parent_request_failed) { return AWS_OP_SUCCESS; diff --git a/source/qnx/ionotify_event_loop.c b/source/qnx/ionotify_event_loop.c new file mode 100644 index 000000000..a36c0eadd --- /dev/null +++ b/source/qnx/ionotify_event_loop.c @@ -0,0 +1,1202 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. 
+ */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include + +static void s_destroy(struct aws_event_loop *event_loop); +static int s_run(struct aws_event_loop *event_loop); +static int s_stop(struct aws_event_loop *event_loop); +static int s_wait_for_stop_completion(struct aws_event_loop *event_loop); +static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task); +static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos); +static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task); +static int s_subscribe_to_io_events( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + aws_event_loop_on_event_fn *on_event, + void *user_data); +static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle); +static void s_free_io_event_resources(void *user_data); +static bool s_is_on_callers_thread(struct aws_event_loop *event_loop); + +static void aws_event_loop_thread(void *args); + +static struct aws_event_loop_vtable s_vtable = { + .destroy = s_destroy, + .run = s_run, + .stop = s_stop, + .wait_for_stop_completion = s_wait_for_stop_completion, + .schedule_task_now = s_schedule_task_now, + .schedule_task_future = s_schedule_task_future, + .cancel_task = s_cancel_task, + .subscribe_to_io_events = s_subscribe_to_io_events, + .unsubscribe_from_io_events = s_unsubscribe_from_io_events, + .free_io_event_resources = s_free_io_event_resources, + .is_on_callers_thread = s_is_on_callers_thread, +}; + +enum aws_ionotify_event_loop_state { AIELS_BASE_UNINITIALIZED, AIELS_LOOP_STOPPED, AIELS_LOOP_STARTED }; + +enum aws_ionotify_subscription_state { AISS_INITIATED, AISS_SIGEVENT_REGISTERED }; + +struct aws_ionotify_event_loop { + struct aws_allocator *allocator; + struct aws_event_loop base; + + struct aws_task_scheduler scheduler; + struct aws_thread thread_created_on; + struct aws_thread_options thread_options; + aws_thread_id_t thread_joined_to; + struct aws_atomic_var running_thread_id; + struct aws_atomic_var event_loop_state; + + /* Channel to receive I/O events. Resource managers open connections to this channel to send their events. */ + int io_events_channel_id; + /* Connection to the events channel opened by the event loop. It's used by ionotify and some event loop logic (e.g. + * cross-thread and I/O results notifications) to send pulses to the events channel. */ + int pulse_connection_id; + struct aws_mutex task_pre_queue_mutex; + struct aws_linked_list task_pre_queue; + struct aws_task stop_task; + struct aws_atomic_var stop_task_ptr; + bool should_continue; + /* ionotify forces to choose one of the following as user-provided data associated with each received event: + * 1. A pointer. But events won't contain the triggered flags (i.e. your code has to figure out itself if it was + * _NOTIFY_COND_INPUT or _NOTIFY_COND_HUP). + * 2. Some bits of a special field of type int (28 bits on x86_64). QNX will use the remaining bits (4 bits in + * QNX 8.0) in this field to specify the types of the triggered events. + * + * Since event loop must know the types of received I/O events, the second options is used. 28-bit IDs are mapped to + * each subscribed aws_io_handle. The mapping is stored in this hash table. */ + struct aws_hash_table handles; + int last_handle_id; +}; + +/* Data associated with a subscribed I/O handle. 
*/ +struct aws_ionotify_handle_data { + struct aws_allocator *alloc; + struct aws_io_handle *handle; + struct aws_event_loop *event_loop; + aws_event_loop_on_event_fn *on_event; + int events_subscribed; + int events_to_resubscribe; + /* A QNX event notification can use only 4 bits for I/O event types (input data, output data, out-of-band data, and + * extended flag indicating that additional events happened). So, the latest_io_event_types field contains these + * additional event types converted to CRT event loop domain (enum aws_io_event_type). */ + int latest_io_event_types; + /* Connection opened on the events channel. Used to send pulses to the main event loop. */ + int pulse_connection_id; + struct sigevent event; + void *user_data; + struct aws_task subscribe_task; + struct aws_task resubscribe_task; + struct aws_task cleanup_task; + /* ID with a value that can fit into pulse user data field (only _NOTIFY_DATA_MASK bits can be used). */ + int handle_id; + /* False when handle is unsubscribed, but this struct hasn't been cleaned up yet. */ + bool is_subscribed; + enum aws_ionotify_subscription_state subscription_state; +}; + +/* SI_NOTIFY is a QNX special sigev code requesting resource managers to return active event type along with the event + * itself. */ +static short IO_EVENT_PULSE_SIGEV_CODE = SI_NOTIFY; +static short CROSS_THREAD_PULSE_SIGEV_CODE = _PULSE_CODE_MINAVAIL; +static short IO_EVENT_KICKSTART_SIGEV_CODE = _PULSE_CODE_MINAVAIL + 1; +static short IO_EVENT_UPDATE_ERROR_SIGEV_CODE = _PULSE_CODE_MINAVAIL + 2; + +static void s_destroy_ionotify_event_loop(struct aws_ionotify_event_loop *ionotify_event_loop); + +/* Setup edge triggered ionotify with a scheduler. */ +struct aws_event_loop *aws_event_loop_new_default_with_options( + struct aws_allocator *allocator, + const struct aws_event_loop_options *options) { + AWS_PRECONDITION(options); + AWS_PRECONDITION(options->clock); + + struct aws_ionotify_event_loop *ionotify_event_loop = + aws_mem_calloc(allocator, 1, sizeof(struct aws_ionotify_event_loop)); + ionotify_event_loop->allocator = allocator; + struct aws_event_loop *base_event_loop = &ionotify_event_loop->base; + + aws_atomic_store_int_explicit( + &ionotify_event_loop->event_loop_state, AIELS_BASE_UNINITIALIZED, aws_memory_order_relaxed); + + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered ionotify", (void *)base_event_loop); + if (aws_event_loop_init_base(&ionotify_event_loop->base, allocator, options->clock)) { + goto error; + } + + aws_atomic_store_int_explicit(&ionotify_event_loop->event_loop_state, AIELS_LOOP_STOPPED, aws_memory_order_relaxed); + + if (options->thread_options) { + ionotify_event_loop->thread_options = *options->thread_options; + } else { + ionotify_event_loop->thread_options = *aws_default_thread_options(); + } + + /* initialize thread id to NULL, it should be updated when the event loop thread starts. */ + aws_atomic_init_ptr(&ionotify_event_loop->running_thread_id, NULL); + + aws_linked_list_init(&ionotify_event_loop->task_pre_queue); + ionotify_event_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT; + aws_atomic_init_ptr(&ionotify_event_loop->stop_task_ptr, NULL); + + if (aws_thread_init(&ionotify_event_loop->thread_created_on, allocator)) { + goto error; + } + + /* Setup QNX channel to receive events from resource managers. 
*/ + ionotify_event_loop->io_events_channel_id = ChannelCreate(0); + int errno_value = errno; /* Always cache errno before potential side-effect */ + if (ionotify_event_loop->io_events_channel_id == -1) { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, "id=%p: ChannelCreate failed with errno %d\n", (void *)base_event_loop, errno_value); + goto error; + } + AWS_LOGF_DEBUG( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Opened QNX channel with ID %d", + (void *)base_event_loop, + ionotify_event_loop->io_events_channel_id); + + /* Open connection over the QNX channel for sending pulses. */ + int owner_pid = 0; /* PID of the owner of the channel, 0 means the calling process. */ + ionotify_event_loop->pulse_connection_id = + ConnectAttach(0 /* reserved */, owner_pid, ionotify_event_loop->io_events_channel_id, _NTO_SIDE_CHANNEL, 0); + errno_value = errno; /* Always cache errno before potential side-effect */ + if (ionotify_event_loop->pulse_connection_id == -1) { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: ConnectAttach failed with errno %d\n", + (void *)ionotify_event_loop, + errno_value); + goto error; + } + + if (aws_task_scheduler_init(&ionotify_event_loop->scheduler, allocator)) { + AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: aws_task_scheduler_init failed\n", (void *)base_event_loop); + goto error; + } + + if (aws_hash_table_init(&ionotify_event_loop->handles, allocator, 32, aws_hash_ptr, aws_ptr_eq, NULL, NULL)) { + goto error; + } + + ionotify_event_loop->should_continue = false; + + ionotify_event_loop->base.impl_data = ionotify_event_loop; + ionotify_event_loop->base.vtable = &s_vtable; + + return &ionotify_event_loop->base; + +error: + s_destroy_ionotify_event_loop(ionotify_event_loop); + return NULL; +} + +static void s_destroy_ionotify_event_loop(struct aws_ionotify_event_loop *ionotify_event_loop) { + struct aws_event_loop *base_event_loop = &ionotify_event_loop->base; + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)base_event_loop); + + int event_loop_state = (int)aws_atomic_load_int(&ionotify_event_loop->event_loop_state); + + if (event_loop_state == AIELS_LOOP_STARTED) { + /* we don't know if stop() has been called by someone else, + * just call stop() again and wait for event-loop to finish. */ + aws_event_loop_stop(base_event_loop); + s_wait_for_stop_completion(base_event_loop); + } + + if (aws_hash_table_is_valid(&ionotify_event_loop->handles)) { + aws_hash_table_clean_up(&ionotify_event_loop->handles); + } + + if (aws_task_scheduler_is_valid(&ionotify_event_loop->scheduler)) { + /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. 
*/ + ionotify_event_loop->thread_joined_to = aws_thread_current_thread_id(); + aws_atomic_store_ptr(&ionotify_event_loop->running_thread_id, &ionotify_event_loop->thread_joined_to); + aws_task_scheduler_clean_up(&ionotify_event_loop->scheduler); + + while (!aws_linked_list_empty(&ionotify_event_loop->task_pre_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&ionotify_event_loop->task_pre_queue); + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED); + } + } + + if (ionotify_event_loop->pulse_connection_id != 0 && ionotify_event_loop->pulse_connection_id != -1) { + int rc = ConnectDetach(ionotify_event_loop->pulse_connection_id); + int errno_value = errno; + if (rc == -1) { + AWS_LOGF_WARN( + AWS_LS_IO_EVENT_LOOP, + "id=%p: ConnectDetach failed with errno %d", + (void *)base_event_loop, + errno_value); + } + } + + if (ionotify_event_loop->io_events_channel_id != 0 && ionotify_event_loop->io_events_channel_id != -1) { + int rc = ChannelDestroy(ionotify_event_loop->io_events_channel_id); + int errno_value = errno; + if (rc == -1) { + AWS_LOGF_WARN( + AWS_LS_IO_EVENT_LOOP, + "id=%p: ChannelDestroy failed with errno %d", + (void *)base_event_loop, + errno_value); + } + } + + aws_thread_clean_up(&ionotify_event_loop->thread_created_on); + + if (event_loop_state != AIELS_BASE_UNINITIALIZED) { + aws_event_loop_clean_up_base(base_event_loop); + } + + aws_mem_release(ionotify_event_loop->allocator, ionotify_event_loop); +} + +static void s_destroy(struct aws_event_loop *event_loop) { + s_destroy_ionotify_event_loop(event_loop->impl_data); +} + +static int s_run(struct aws_event_loop *event_loop) { + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + int current_state = (int)aws_atomic_load_int(&ionotify_event_loop->event_loop_state); + if (current_state != AIELS_LOOP_STOPPED) { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Failed to start event-loop thread: event loop state is %d", + (void *)event_loop, + current_state); + return AWS_OP_ERR; + } + + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop); + + ionotify_event_loop->should_continue = true; + aws_thread_increment_unjoined_count(); + if (aws_thread_launch( + &ionotify_event_loop->thread_created_on, + &aws_event_loop_thread, + event_loop, + &ionotify_event_loop->thread_options)) { + + aws_thread_decrement_unjoined_count(); + AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Thread creation failed.", (void *)event_loop); + ionotify_event_loop->should_continue = false; + return AWS_OP_ERR; + } + + aws_atomic_store_int(&ionotify_event_loop->event_loop_state, AIELS_LOOP_STARTED); + + return AWS_OP_SUCCESS; +} + +static void s_stop_task(struct aws_task *task, void *args, enum aws_task_status status) { + (void)task; + struct aws_event_loop *event_loop = args; + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + /* now okay to reschedule stop tasks. */ + aws_atomic_store_ptr(&ionotify_event_loop->stop_task_ptr, NULL); + if (status == AWS_TASK_STATUS_RUN_READY) { + /* this allows the event loop to invoke the callback once the event loop has completed. 
*/
+        ionotify_event_loop->should_continue = false;
+    }
+}
+
+static int s_stop(struct aws_event_loop *event_loop) {
+    struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+
+    void *expected_ptr = NULL;
+    bool update_succeeded = aws_atomic_compare_exchange_ptr(
+        &ionotify_event_loop->stop_task_ptr, &expected_ptr, &ionotify_event_loop->stop_task);
+    if (!update_succeeded) {
+        /* the stop task is already scheduled. */
+        return AWS_OP_SUCCESS;
+    }
+    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread", (void *)event_loop);
+    aws_task_init(&ionotify_event_loop->stop_task, s_stop_task, event_loop, "ionotify_event_loop_stop");
+    s_schedule_task_now(event_loop, &ionotify_event_loop->stop_task);
+
+    return AWS_OP_SUCCESS;
+}
+
+static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
+    struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+    int result = aws_thread_join(&ionotify_event_loop->thread_created_on);
+    aws_thread_decrement_unjoined_count();
+    aws_atomic_store_int(&ionotify_event_loop->event_loop_state, AIELS_LOOP_STOPPED);
+    return result;
+}
+
+static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
+    struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+
+    /* if the event loop and the caller are on the same thread, just schedule and be done with it. */
+    if (s_is_on_callers_thread(event_loop)) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p: Scheduling task %p in-thread for timestamp %llu",
+            (void *)event_loop,
+            (void *)task,
+            (unsigned long long)run_at_nanos);
+        if (run_at_nanos == 0) {
+            /* zero denotes "now" task */
+            aws_task_scheduler_schedule_now(&ionotify_event_loop->scheduler, task);
+        } else {
+            aws_task_scheduler_schedule_future(&ionotify_event_loop->scheduler, task, run_at_nanos);
+        }
+        return;
+    }
+
+    AWS_LOGF_TRACE(
+        AWS_LS_IO_EVENT_LOOP,
+        "id=%p: Scheduling task %p cross-thread for timestamp %llu",
+        (void *)event_loop,
+        (void *)task,
+        (unsigned long long)run_at_nanos);
+    task->timestamp = run_at_nanos;
+
+    aws_mutex_lock(&ionotify_event_loop->task_pre_queue_mutex);
+    bool is_first_task = aws_linked_list_empty(&ionotify_event_loop->task_pre_queue);
+    aws_linked_list_push_back(&ionotify_event_loop->task_pre_queue, &task->node);
+    aws_mutex_unlock(&ionotify_event_loop->task_pre_queue_mutex);
+
+    /* If the list was not empty, we already sent a cross-thread pulse. No need to send it again. */
+    if (is_first_task) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p: Waking up event-loop thread by sending pulse to connection ID %d",
+            (void *)event_loop,
+            ionotify_event_loop->pulse_connection_id);
+        /* The pulse itself is enough for cross-thread notifications. */
+        int user_data_value = 0;
+        int rc =
+            MsgSendPulse(ionotify_event_loop->pulse_connection_id, -1, CROSS_THREAD_PULSE_SIGEV_CODE, user_data_value);
+        int errno_value = errno;
+        if (rc == -1) {
+            /* The task was scheduled, but we couldn't notify the main loop about it. According to QNX docs, the
+             * inability to send a pulse indicates that there is no available memory left for the process. Failing to
+             * notify the loop is a minor concern in such a scenario. So, just log the error.
*/ + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Failed to send cross-thread pulse with errno %d", + (void *)event_loop, + errno_value); + } + } +} + +static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) { + s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */); +} + +static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) { + s_schedule_task_common(event_loop, task, run_at_nanos); +} + +static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Cancelling task %p", (void *)event_loop, (void *)task); + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + aws_task_scheduler_cancel_task(&ionotify_event_loop->scheduler, task); +} + +/* Map aws_ionotify_handle_data to internal ID. */ +static int s_add_handle( + struct aws_ionotify_event_loop *ionotify_event_loop, + struct aws_ionotify_handle_data *handle_data) { + AWS_FATAL_ASSERT(s_is_on_callers_thread(handle_data->event_loop)); + + /* Special constant, _NOTIFY_DATA_MASK, limits the maximum value that can be used as user data in I/O events. */ + int max_handle_id = _NOTIFY_DATA_MASK; + + if (AWS_UNLIKELY(aws_hash_table_get_entry_count(&ionotify_event_loop->handles) == (size_t)max_handle_id)) { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Maximum number of registered handles reached", + (void *)handle_data->event_loop); + return AWS_OP_ERR; + } + + struct aws_hash_element *elem = NULL; + int next_handle_id = ionotify_event_loop->last_handle_id; + int was_created = 0; + do { + ++next_handle_id; + if (next_handle_id > max_handle_id) { + next_handle_id = 1; + } + aws_hash_table_create(&ionotify_event_loop->handles, (void *)next_handle_id, &elem, &was_created); + /* next_handle_id is already present in the hash table, skip it. */ + if (was_created == 0) { + elem = NULL; + } + } while (elem == NULL); + + handle_data->handle_id = next_handle_id; + ionotify_event_loop->last_handle_id = next_handle_id; + elem->value = handle_data; + + return AWS_OP_SUCCESS; +} + +struct aws_ionotify_handle_data *s_find_handle( + struct aws_event_loop *event_loop, + struct aws_ionotify_event_loop *ionotify_event_loop, + int handle_id) { + AWS_FATAL_ASSERT(s_is_on_callers_thread(event_loop)); + (void)event_loop; + struct aws_ionotify_handle_data *handle_data = NULL; + struct aws_hash_element *elem = NULL; + aws_hash_table_find(&ionotify_event_loop->handles, (void *)handle_id, &elem); + if (elem != NULL) { + handle_data = elem->value; + } + return handle_data; +} + +static void s_remove_handle( + struct aws_event_loop *event_loop, + struct aws_ionotify_event_loop *ionotify_event_loop, + int handle_id) { + AWS_FATAL_ASSERT(s_is_on_callers_thread(event_loop)); + (void)event_loop; + aws_hash_table_remove(&ionotify_event_loop->handles, (void *)handle_id, NULL, NULL); +} + +static void s_process_ionotify( + struct aws_ionotify_event_loop *ionotify_event_loop, + struct aws_ionotify_handle_data *handle_data, + int event_mask) { + + struct aws_event_loop *event_loop = &ionotify_event_loop->base; + + /* Arm resource manager associated with a given file descriptor in edge-triggered mode. + * After this call, a corresponding resource manager starts sending events. 
*/
+    int rc = ionotify(handle_data->handle->data.fd, _NOTIFY_ACTION_EDGEARM, event_mask, &handle_data->event);
+    int errno_value = errno;
+    if (rc == -1) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: Failed to subscribe to I/O events, errno %d",
+            (void *)event_loop,
+            handle_data->handle->data.fd,
+            errno_value);
+        handle_data->on_event(event_loop, handle_data->handle, AWS_IO_EVENT_TYPE_ERROR, handle_data->user_data);
+        return;
+    }
+
+    /* ionotify can return active conditions if they are among the specified ones. Send a notification to kick-start
+     * processing the fd if it already has desired conditions. */
+
+    /* The user-provided field has no space for extended conditions, so set a field in aws_ionotify_handle_data. */
+    if (rc & (_NOTIFY_CONDE_ERR | _NOTIFY_CONDE_NVAL)) {
+        handle_data->latest_io_event_types |= AWS_IO_EVENT_TYPE_ERROR;
+    }
+    if (rc & _NOTIFY_CONDE_HUP) {
+        handle_data->latest_io_event_types |= AWS_IO_EVENT_TYPE_CLOSED;
+    }
+
+    if ((rc & (_NOTIFY_COND_OBAND | _NOTIFY_COND_INPUT | _NOTIFY_COND_OUTPUT)) || handle_data->latest_io_event_types) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: Sending a kick-start pulse because fd has desired I/O conditions (rc is %d)",
+            (void *)event_loop,
+            handle_data->handle->data.fd,
+            rc);
+        /* Set the _NOTIFY_COND_MASK low bits to the handle ID, the same as ionotify does, so the main loop can
+         * process all pulses in a unified manner. */
+        int kick_start_event_mask = rc & _NOTIFY_COND_MASK;
+        kick_start_event_mask |= handle_data->handle_id;
+        int send_rc = MsgSendPulse(
+            ionotify_event_loop->pulse_connection_id, -1, IO_EVENT_KICKSTART_SIGEV_CODE, kick_start_event_mask);
+        if (send_rc == -1) {
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_EVENT_LOOP,
+                "id=%p fd=%d: Failed to send a kick-start pulse",
+                (void *)event_loop,
+                handle_data->handle->data.fd);
+        }
+
+        /* The QNX resource manager for POSIX pipes has a bug/undocumented behavior where, under specific conditions,
+         * it stops sending requested events. Below are more details.
+         *
+         * First, a quote from the ionotify docs for _NOTIFY_ACTION_EDGEARM:
+         *     Conditions are considered as met only if a change occurs since the last call to
+         *     ionotify(..., _NOTIFY_ACTION_EDGEARM, ...). Met conditions are returned; a notification is armed for
+         *     unmet conditions.
+         *
+         * Now, the issue. If ionotify arms the writing end of the pipe while it has buffer space for data, the return
+         * code contains the _NOTIFY_COND_OUTPUT event. This is expected and correct behavior. According to the docs,
+         * the writing end of the pipe should not be armed in the resource manager in such a case. However, after
+         * that, the resource manager stops returning _NOTIFY_COND_OUTPUT for the writing end altogether (i.e. the
+         * follow-up ionotify calls do not return _NOTIFY_COND_OUTPUT). It seems the resource manager actually arms
+         * the writing end, but does it incorrectly.
+         *
+         * Disarming the met conditions fixes the issue.
+         *
+         * NOTE: Sockets are not affected by this issue. Since disarming non-armed conditions shouldn't cause any
+         * side effects, perform it for all handles. */
+        int active_events = rc & (_NOTIFY_COND_OBAND | _NOTIFY_COND_INPUT | _NOTIFY_COND_OUTPUT);
+        if (active_events) {
+            rc = ionotify(handle_data->handle->data.fd, _NOTIFY_ACTION_EDGEARM, active_events, NULL);
+            if (rc == -1) {
+                AWS_LOGF_ERROR(
+                    AWS_LS_IO_EVENT_LOOP,
+                    "id=%p fd=%d: Failed to disarm events",
+                    (void *)event_loop,
+                    handle_data->handle->data.fd);
+            }
+        }
+    }
+}
+
+/* Scheduled task that performs the actual subscription using ionotify.
*/ +static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) { + (void)task; + + /* If task was cancelled, nothing to do. */ + if (status == AWS_TASK_STATUS_CANCELED) { + return; + } + + struct aws_ionotify_handle_data *handle_data = user_data; + struct aws_event_loop *event_loop = handle_data->event_loop; + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p fd=%d: Subscribing to events, event mask is %d", + (void *)event_loop, + handle_data->handle->data.fd, + handle_data->events_subscribed); + + /* Map aws_ionotify_handle_data to ID. This ID will be returned with the I/O events from ionotify. */ + if (s_add_handle(ionotify_event_loop, handle_data)) { + return; + } + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p fd=%d: Mapped fd to handle ID %d", + (void *)event_loop, + handle_data->handle->data.fd, + handle_data->handle_id); + /* I/O events from ionotify will be delivered as pulses with a user-defined 28-bit ID. + * SIGEV_PULSE_PRIO_INHERIT means the thread that receives the pulse will run at the initial priority of the + * process. */ + short pulse_priority = SIGEV_PULSE_PRIO_INHERIT; + short pulse_sigev_code = IO_EVENT_PULSE_SIGEV_CODE; + SIGEV_PULSE_INT_INIT( + &handle_data->event, + handle_data->pulse_connection_id, + pulse_priority, + pulse_sigev_code, + handle_data->handle_id); + + /* From the iomgr.h header: + * If extended conditions are requested, and they need to be returned in an armed event, the negative of the + * satisfied conditions are returned in (io_notify_t).i.event.sigev_code. + * Extended conditions are the ones starting with _NOTIFY_CONDE_. + * For that feature to work, special bits in the event structure must be set. */ + handle_data->event.sigev_notify |= SIGEV_FLAG_CODE_UPDATEABLE; + SIGEV_MAKE_UPDATEABLE(&handle_data->event); + + /* The application must register the event by calling MsgRegisterEvent() with the fd processed in ionotify(). + * See: + * https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/i/ionotify.html + * https://www.qnx.com/developers/docs/8.0/com.qnx.doc.neutrino.lib_ref/topic/m/msgregisterevent.html + * + * It's enough to register an event only once and then reuse it on followup ionotify rearming calls. + * NOTE: If you create a new sigevent for the same file descriptor, with the same flags, you HAVE to register + * it again. */ + int rc = MsgRegisterEvent(&handle_data->event, handle_data->handle->data.fd); + int errno_value = errno; + if (rc == -1) { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p fd=%d: Failed to register sigevent, errno %d", + (void *)event_loop, + handle_data->handle->data.fd, + errno_value); + /* With sigevent not registered in the system, I/O events can't be delivered to the event loop. Notify about + * error via a callback and stop subscribing. */ + handle_data->on_event(event_loop, handle_data->handle, AWS_IO_EVENT_TYPE_ERROR, handle_data->user_data); + return; + } + + handle_data->subscription_state = AISS_SIGEVENT_REGISTERED; + + handle_data->is_subscribed = true; + + /* Everyone is always registered for errors. 
*/ + int event_mask = _NOTIFY_COND_EXTEN | _NOTIFY_CONDE_ERR | _NOTIFY_CONDE_HUP | _NOTIFY_CONDE_NVAL; + if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) { + event_mask |= _NOTIFY_COND_INPUT; + event_mask |= _NOTIFY_COND_OBAND; + } + if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) { + event_mask |= _NOTIFY_COND_OUTPUT; + } + + s_process_ionotify(ionotify_event_loop, handle_data, event_mask); +} + +static void s_resubscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) { + (void)task; + + /* If task was cancelled, nothing to do. */ + if (status == AWS_TASK_STATUS_CANCELED) { + return; + } + + struct aws_ionotify_handle_data *handle_data = user_data; + struct aws_event_loop *event_loop = handle_data->event_loop; + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + if (!handle_data->is_subscribed) { + return; + } + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p fd=%d: Resubscribing to events, event mask is %d", + (void *)event_loop, + handle_data->handle->data.fd, + handle_data->events_to_resubscribe); + + handle_data->is_subscribed = true; + + int event_mask = 0; + if (handle_data->events_to_resubscribe & AWS_IO_EVENT_TYPE_READABLE) { + event_mask |= _NOTIFY_COND_INPUT; + event_mask |= _NOTIFY_COND_OBAND; + } + if (handle_data->events_to_resubscribe & AWS_IO_EVENT_TYPE_WRITABLE) { + event_mask |= _NOTIFY_COND_OUTPUT; + } + + handle_data->events_to_resubscribe = 0; + + s_process_ionotify(ionotify_event_loop, handle_data, event_mask); +} + +static void s_process_io_result( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + const struct aws_io_handle_io_op_result *io_op_result) { + + AWS_FATAL_ASSERT(s_is_on_callers_thread(event_loop)); + + AWS_FATAL_ASSERT(handle->additional_data); + struct aws_ionotify_handle_data *handle_data = handle->additional_data; + + if (!handle_data->is_subscribed) { + return; + } + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p fd=%d: Processing I/O operation result: status %d (%s); read status %d (%s); write status %d (%s)", + (void *)event_loop, + handle->data.fd, + io_op_result->error_code, + aws_error_str(io_op_result->error_code), + io_op_result->read_error_code, + aws_error_str(io_op_result->read_error_code), + io_op_result->write_error_code, + aws_error_str(io_op_result->write_error_code)); + + /* ionotify requires resubscribing to events that it delivers. We cannot simply resubscribe on an incoming event, + * because QNX resource managers arm a condition (e.g. socket-is-readable) only if it's not currently present on fd. + * So, we do it here, on getting AWS_IO_READ_WOULD_BLOCK from I/O operation. */ + int events_to_resubscribe = 0; + if (io_op_result->read_error_code == AWS_IO_READ_WOULD_BLOCK) { + events_to_resubscribe |= AWS_IO_EVENT_TYPE_READABLE; + } + if (io_op_result->write_error_code == AWS_IO_READ_WOULD_BLOCK) { + events_to_resubscribe |= AWS_IO_EVENT_TYPE_WRITABLE; + } + + /* Rearm resource manager. */ + if (events_to_resubscribe != 0) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p fd=%d: Got EWOULDBLOCK, rearming fd", (void *)event_loop, handle->data.fd); + /* Mark a newly appeared event for resubscribing and reschedule the resubscribing task in case it's already + * scheduled. 
*/
+        handle_data->events_to_resubscribe |= events_to_resubscribe;
+        struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+        aws_task_scheduler_cancel_task(&ionotify_event_loop->scheduler, &handle_data->resubscribe_task);
+        aws_task_scheduler_schedule_now(&ionotify_event_loop->scheduler, &handle_data->resubscribe_task);
+    }
+
+    /* Even though the event loop arms QNX resource managers for all possible error conditions, when an actual error
+     * occurs, in most cases QNX just delivers "fd is READABLE/WRITABLE, ah, and also there is some extended
+     * condition, no idea what it is". The extended condition bit can't be used as an error indicator just by itself,
+     * because it's also set in success paths (e.g. on accepting a new connection). So, the event loop relies on
+     * getting errors from I/O operations.
+     * The latest_io_event_types field is used because there is no space for this info in a pulse object. */
+    if (io_op_result->error_code == AWS_IO_SOCKET_CLOSED) {
+        handle_data->latest_io_event_types = AWS_IO_EVENT_TYPE_CLOSED;
+    }
+
+    /* Notify the event loop of error conditions. */
+    if (handle_data->latest_io_event_types != 0) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: fd errored, sending UPDATE_ERROR pulse",
+            (void *)event_loop,
+            handle_data->handle->data.fd);
+        struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+        int send_rc = MsgSendPulse(
+            ionotify_event_loop->pulse_connection_id, -1, IO_EVENT_UPDATE_ERROR_SIGEV_CODE, handle_data->handle_id);
+        int errno_value = errno;
+        if (send_rc == -1) {
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_EVENT_LOOP,
+                "id=%p fd=%d: Failed to send UPDATE_ERROR pulse, errno %d",
+                (void *)event_loop,
+                handle_data->handle->data.fd,
+                errno_value);
+        }
+    }
+}
+
+struct ionotify_io_op_results {
+    struct aws_io_handle_io_op_result io_op_result;
+    struct aws_event_loop *event_loop;
+    struct aws_io_handle *handle;
+};
+
+static void s_update_io_result_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
+    struct ionotify_io_op_results *ionotify_io_op_results = user_data;
+    struct aws_event_loop *event_loop = ionotify_io_op_results->event_loop;
+
+    aws_mem_release(event_loop->alloc, task);
+
+    /* If task was cancelled, nothing to do. */
+    if (status == AWS_TASK_STATUS_CANCELED) {
+        aws_mem_release(event_loop->alloc, ionotify_io_op_results);
+        return;
+    }
+
+    s_process_io_result(event_loop, ionotify_io_op_results->handle, &ionotify_io_op_results->io_op_result);
+
+    aws_mem_release(event_loop->alloc, ionotify_io_op_results);
+}
+
+/* This callback is called by I/O operations to notify about their results.
*/ +static void s_update_io_result( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + const struct aws_io_handle_io_op_result *io_op_result) { + + if (!s_is_on_callers_thread(event_loop)) { + /* Move processing I/O operation results to the event loop thread if the operation is performed in another + * thread.*/ + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p fd=%d: Got I/O operation result from another thread", + (void *)event_loop, + handle->data.fd); + struct aws_task *task = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct aws_task)); + struct ionotify_io_op_results *ionotify_io_op_results = + aws_mem_calloc(event_loop->alloc, 1, sizeof(struct ionotify_io_op_results)); + ionotify_io_op_results->event_loop = event_loop; + ionotify_io_op_results->handle = handle; + memcpy(&ionotify_io_op_results->io_op_result, io_op_result, sizeof(struct aws_io_handle_io_op_result)); + aws_task_init(task, s_update_io_result_task, ionotify_io_op_results, "ionotify_event_loop_resubscribe_ct"); + s_schedule_task_now(event_loop, task); + return; + } + + s_process_io_result(event_loop, handle, io_op_result); +} + +static int s_subscribe_to_io_events( + struct aws_event_loop *event_loop, + struct aws_io_handle *handle, + int events, + aws_event_loop_on_event_fn *on_event, + void *user_data) { + + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p fd=%d: Initiate subscription to events", (void *)event_loop, handle->data.fd); + struct aws_ionotify_handle_data *handle_data = + aws_mem_calloc(event_loop->alloc, 1, sizeof(struct aws_ionotify_handle_data)); + handle->additional_data = handle_data; + + handle_data->alloc = event_loop->alloc; + handle_data->handle = handle; + handle_data->event_loop = event_loop; + handle_data->on_event = on_event; + handle_data->events_subscribed = events; + handle_data->pulse_connection_id = ionotify_event_loop->pulse_connection_id; + handle_data->user_data = user_data; + handle_data->handle->update_io_result = s_update_io_result; + + aws_task_init(&handle_data->resubscribe_task, s_resubscribe_task, handle_data, "ionotify_event_loop_resubscribe"); + + aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "ionotify_event_loop_subscribe"); + s_schedule_task_now(event_loop, &handle_data->subscribe_task); + + return AWS_OP_SUCCESS; +} + +static void s_free_io_event_resources(void *user_data) { + struct aws_ionotify_handle_data *handle_data = user_data; + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "Releasing aws_ionotify_handle_data at %p", user_data); + aws_mem_release(handle_data->alloc, (void *)handle_data); +} + +static void s_unsubscribe_cleanup_task(struct aws_task *task, void *arg, enum aws_task_status status) { + (void)task; + (void)status; + struct aws_ionotify_handle_data *handle_data = (struct aws_ionotify_handle_data *)arg; + + if (handle_data->subscription_state == AISS_SIGEVENT_REGISTERED) { + int rc = MsgUnregisterEvent(&handle_data->event); + int errno_value = errno; + if (rc == -1) { + /* Not much can be done at this point, just log the error. 
*/
+            AWS_LOGF_ERROR(
+                AWS_LS_IO_EVENT_LOOP,
+                "id=%p fd=%d: Failed to unregister sigevent, errno %d",
+                (void *)handle_data->event_loop,
+                handle_data->handle->data.fd,
+                errno_value);
+        }
+    }
+
+    s_free_io_event_resources(handle_data);
+}
+
+static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
+    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p fd=%d: Unsubscribing from events", (void *)event_loop, handle->data.fd);
+
+    struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+
+    AWS_FATAL_ASSERT(handle->additional_data);
+    struct aws_ionotify_handle_data *handle_data = handle->additional_data;
+
+    /* Disarm the resource manager for the given fd. */
+    int event_mask = _NOTIFY_COND_EXTEN | _NOTIFY_CONDE_ERR | _NOTIFY_CONDE_HUP | _NOTIFY_CONDE_NVAL;
+    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
+        event_mask |= _NOTIFY_COND_INPUT;
+        event_mask |= _NOTIFY_COND_OBAND;
+    }
+    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
+        event_mask |= _NOTIFY_COND_OUTPUT;
+    }
+    int rc = ionotify(handle_data->handle->data.fd, _NOTIFY_ACTION_EDGEARM, event_mask, NULL);
+    int errno_value = errno;
+    if (rc == -1) {
+        AWS_LOGF_ERROR(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: Failed to unsubscribe from events, errno %d",
+            (void *)event_loop,
+            handle_data->handle->data.fd,
+            errno_value);
+    }
+
+    /* We can't clean up yet, because there may be scheduled tasks and more events to process for this handle. Mark
+     * it as unsubscribed and schedule a cleanup task instead. */
+    handle_data->is_subscribed = false;
+
+    AWS_LOGF_TRACE(
+        AWS_LS_IO_EVENT_LOOP,
+        "id=%p fd=%d: Removing from handles map using ID %d",
+        (void *)event_loop,
+        handle->data.fd,
+        handle_data->handle_id);
+    s_remove_handle(event_loop, ionotify_event_loop, handle_data->handle_id);
+
+    handle->additional_data = NULL;
+    handle->update_io_result = NULL;
+
+    /* There might be pending tasks referencing handle_data, so schedule a cleanup task.
*/ + aws_task_init( + &handle_data->cleanup_task, s_unsubscribe_cleanup_task, handle_data, "ionotify_event_loop_unsubscribe_cleanup"); + s_schedule_task_now(event_loop, &handle_data->cleanup_task); + + return AWS_OP_SUCCESS; +} + +static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) { + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + aws_thread_id_t *thread_id = aws_atomic_load_ptr(&ionotify_event_loop->running_thread_id); + return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id()); +} + +static void s_process_task_pre_queue(struct aws_event_loop *event_loop) { + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Processing cross-thread tasks", (void *)event_loop); + + struct aws_linked_list task_pre_queue; + aws_linked_list_init(&task_pre_queue); + + aws_mutex_lock(&ionotify_event_loop->task_pre_queue_mutex); + aws_linked_list_swap_contents(&ionotify_event_loop->task_pre_queue, &task_pre_queue); + aws_mutex_unlock(&ionotify_event_loop->task_pre_queue_mutex); + + while (!aws_linked_list_empty(&task_pre_queue)) { + struct aws_linked_list_node *node = aws_linked_list_pop_front(&task_pre_queue); + struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node); + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Task %p pulled to event-loop, scheduling now.", + (void *)event_loop, + (void *)task); + /* Timestamp 0 is used to denote "now" tasks */ + if (task->timestamp == 0) { + aws_task_scheduler_schedule_now(&ionotify_event_loop->scheduler, task); + } else { + aws_task_scheduler_schedule_future(&ionotify_event_loop->scheduler, task, task->timestamp); + } + } +} + +/** + * This just calls MsgReceive(). + * + * We broke this out into its own function so that the stacktrace clearly shows + * what this thread is doing. We've had a lot of cases where users think this + * thread is deadlocked because it's stuck here. We want it to be clear + * that it's doing nothing on purpose. It's waiting for events to happen... + */ +AWS_NO_INLINE +static rcvid_t aws_event_loop_listen_for_io_events( + int io_events_channel_id, + const uint64_t *timeout, + struct _pulse *pulse, + int *errno_value) { + /* Event of type SIGEV_UNBLOCK makes the timed-out kernel call fail with an error of ETIMEDOUT. 
*/
+    struct sigevent notify;
+    SIGEV_UNBLOCK_INIT(&notify);
+    int rc = TimerTimeout(CLOCK_MONOTONIC, _NTO_TIMEOUT_RECEIVE, &notify, timeout, NULL);
+    if (rc == -1) {
+        *errno_value = errno;
+        return rc;
+    }
+    rcvid_t rcvid = MsgReceive(io_events_channel_id, pulse, sizeof(*pulse), NULL);
+    if (rcvid == -1) {
+        *errno_value = errno;
+    }
+    return rcvid;
+}
+
+static void s_aws_ionotify_cleanup_aws_lc_thread_local_state(void *user_data) {
+    (void)user_data;
+    aws_cal_thread_clean_up();
+}
+
+static void s_process_pulse(struct aws_event_loop *event_loop, const struct _pulse *pulse) {
+
+    int user_data = pulse->value.sival_int;
+
+    int handle_id = user_data & _NOTIFY_DATA_MASK;
+    if (handle_id == 0) {
+        AWS_LOGF_ERROR(AWS_LS_IO_EVENT_LOOP, "id=%p: Got pulse with empty handle ID, ignoring it", (void *)event_loop);
+        return;
+    }
+
+    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Got pulse for handle ID %d", (void *)event_loop, handle_id);
+
+    struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data;
+    struct aws_ionotify_handle_data *handle_data = s_find_handle(event_loop, ionotify_event_loop, handle_id);
+    if (handle_data == NULL) {
+        /* This situation is totally OK when the corresponding fd is already unsubscribed. */
+        AWS_LOGF_DEBUG(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p: No mapped data found for handle ID %d, fd must be already unsubscribed",
+            (void *)event_loop,
+            handle_id);
+        return;
+    }
+
+    if (!handle_data->is_subscribed) {
+        return;
+    }
+
+    AWS_LOGF_TRACE(
+        AWS_LS_IO_EVENT_LOOP,
+        "id=%p fd=%d: Processing pulse with code %d",
+        (void *)event_loop,
+        handle_data->handle->data.fd,
+        pulse->code);
+    int event_mask = 0;
+    if (pulse->value.sival_int & _NOTIFY_COND_OBAND) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: fd got out-of-band data",
+            (void *)event_loop,
+            handle_data->handle->data.fd);
+        event_mask |= AWS_IO_EVENT_TYPE_READABLE;
+    }
+    if (pulse->value.sival_int & _NOTIFY_COND_INPUT) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP, "id=%p fd=%d: fd is readable", (void *)event_loop, handle_data->handle->data.fd);
+        event_mask |= AWS_IO_EVENT_TYPE_READABLE;
+    }
+    if (pulse->value.sival_int & _NOTIFY_COND_OUTPUT) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP, "id=%p fd=%d: fd is writable", (void *)event_loop, handle_data->handle->data.fd);
+        event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
+    }
+    if (pulse->value.sival_int & _NOTIFY_COND_EXTEN) {
+        /* "If extended conditions are requested, and they need to be returned in an armed event, the negative of the
+         * satisfied conditions are returned in (io_notify_t).i.event.sigev_code" - a quote from iomgr.h.
+         * The pulse.code value is changed whenever the fd has the _NOTIFY_COND_EXTEN flag. However, no bit
+         * corresponding to any extended flag (or its negation) is ever set in this field. */
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: fd has extended condition, pulse code is %d",
+            (void *)event_loop,
+            handle_data->handle->data.fd,
+            pulse->code);
+    }
+
+    if (handle_data->latest_io_event_types) {
+        AWS_LOGF_TRACE(
+            AWS_LS_IO_EVENT_LOOP,
+            "id=%p fd=%d: latest_io_event_types is non-empty",
+            (void *)event_loop,
+            handle_data->handle->data.fd);
+        event_mask |= handle_data->latest_io_event_types;
+        /* Reset additional I/O event types so they are not processed twice.
*/ + handle_data->latest_io_event_types = 0; + } + + handle_data->on_event(event_loop, handle_data->handle, event_mask, handle_data->user_data); +} + +static void aws_event_loop_thread(void *args) { + struct aws_event_loop *event_loop = args; + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Main loop started", (void *)event_loop); + struct aws_ionotify_event_loop *ionotify_event_loop = event_loop->impl_data; + + /* set thread id to the thread of the event loop */ + aws_atomic_store_ptr(&ionotify_event_loop->running_thread_id, &ionotify_event_loop->thread_created_on.thread_id); + + aws_thread_current_at_exit(s_aws_ionotify_cleanup_aws_lc_thread_local_state, NULL); + + /* Default timeout is 100 seconds. */ + static uint64_t DEFAULT_TIMEOUT_NS = 100ULL * AWS_TIMESTAMP_NANOS; + + uint64_t timeout = DEFAULT_TIMEOUT_NS; + + AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Default timeout %" PRIu64, (void *)event_loop, timeout); + + /* Until stop is called: + * - Call MsgReceive. If a task is scheduled, or a file descriptor has activity, it will return. + * - Process all I/O events. + * - Run all scheduled tasks. + * - Process queued subscription cleanups. + */ + while (ionotify_event_loop->should_continue) { + bool should_process_cross_thread_tasks = false; + + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: Waiting for a maximum of %" PRIu64 " ns", (void *)event_loop, timeout); + struct _pulse pulse; + int errno_value; + rcvid_t rcvid = aws_event_loop_listen_for_io_events( + ionotify_event_loop->io_events_channel_id, &timeout, &pulse, &errno_value); + aws_event_loop_register_tick_start(event_loop); + + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Wake up with rcvid %ld\n", (void *)event_loop, rcvid); + if (rcvid == 0) { + if (pulse.code == CROSS_THREAD_PULSE_SIGEV_CODE) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: MsgReceive got cross-thread pulse", (void *)event_loop); + should_process_cross_thread_tasks = true; + } else { + s_process_pulse(event_loop, &pulse); + } + } else if (rcvid > 0) { + AWS_LOGF_WARN(AWS_LS_IO_EVENT_LOOP, "id=%p: Received QNX message, ignoring it\n", (void *)event_loop); + } else { + if (errno_value == ETIMEDOUT) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Woke up by timeout\n", (void *)event_loop); + } else { + AWS_LOGF_ERROR( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Listening for I/O events failed with errno %d", + (void *)event_loop, + errno_value); + } + } + + /* Run scheduled tasks. */ + if (should_process_cross_thread_tasks) { + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Processing prequeued tasks", (void *)event_loop); + s_process_task_pre_queue(event_loop); + } + + uint64_t now_ns = 0; + event_loop->clock(&now_ns); + AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Running scheduled tasks", (void *)event_loop); + aws_task_scheduler_run_all(&ionotify_event_loop->scheduler, now_ns); + + /* Set timeout for next MsgReceive call. + * If clock fails, or scheduler has no tasks, use default timeout. */ + bool use_default_timeout = false; + + if (event_loop->clock(&now_ns)) { + use_default_timeout = true; + } + + uint64_t next_run_time_ns; + if (!aws_task_scheduler_has_tasks(&ionotify_event_loop->scheduler, &next_run_time_ns)) { + use_default_timeout = true; + } + + if (use_default_timeout) { + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, "id=%p: No more scheduled tasks using default timeout.", (void *)event_loop); + timeout = DEFAULT_TIMEOUT_NS; + } else { + timeout = (next_run_time_ns > now_ns) ? 
(next_run_time_ns - now_ns) : 0; + AWS_LOGF_TRACE( + AWS_LS_IO_EVENT_LOOP, + "id=%p: Detected more scheduled tasks with the next occurring at %" PRIu64 + ", using timeout of %" PRIu64, + (void *)event_loop, + next_run_time_ns, + timeout); + } + + aws_event_loop_register_tick_end(event_loop); + } + + AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: Exiting main loop", (void *)event_loop); + /* set thread id back to NULL. This should be updated again in destroy, before tasks are canceled. */ + aws_atomic_store_ptr(&ionotify_event_loop->running_thread_id, NULL); +} diff --git a/source/s2n/s2n_tls_channel_handler.c b/source/s2n/s2n_tls_channel_handler.c index 355a64b1b..5aded10bd 100644 --- a/source/s2n/s2n_tls_channel_handler.c +++ b/source/s2n/s2n_tls_channel_handler.c @@ -101,6 +101,7 @@ AWS_STATIC_STRING_FROM_LITERAL(s_rhel_path, "/etc/pki/tls/certs"); AWS_STATIC_STRING_FROM_LITERAL(s_android_path, "/system/etc/security/cacerts"); AWS_STATIC_STRING_FROM_LITERAL(s_free_bsd_path, "/usr/local/share/certs"); AWS_STATIC_STRING_FROM_LITERAL(s_net_bsd_path, "/etc/openssl/certs"); +AWS_STATIC_STRING_FROM_LITERAL(s_qnx_path, "/usr/ssl/certs"); AWS_IO_API const char *aws_determine_default_pki_dir(void) { /* debian variants; OpenBSD (although the directory doesn't exist by default) */ @@ -128,6 +129,11 @@ AWS_IO_API const char *aws_determine_default_pki_dir(void) { return aws_string_c_str(s_net_bsd_path); } + /* QNX */ + if (aws_path_exists(s_qnx_path)) { + return aws_string_c_str(s_qnx_path); + } + return NULL; } @@ -137,6 +143,7 @@ AWS_STATIC_STRING_FROM_LITERAL(s_open_suse_ca_file_path, "/etc/ssl/ca-bundle.pem AWS_STATIC_STRING_FROM_LITERAL(s_open_elec_ca_file_path, "/etc/pki/tls/cacert.pem"); AWS_STATIC_STRING_FROM_LITERAL(s_modern_rhel_ca_file_path, "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"); AWS_STATIC_STRING_FROM_LITERAL(s_openbsd_ca_file_path, "/etc/ssl/cert.pem"); +AWS_STATIC_STRING_FROM_LITERAL(s_qnx_ca_file_path, "/usr/ssl/certs/cacert.pem"); AWS_IO_API const char *aws_determine_default_pki_ca_file(void) { /* debian variants */ @@ -169,6 +176,11 @@ AWS_IO_API const char *aws_determine_default_pki_ca_file(void) { return aws_string_c_str(s_openbsd_ca_file_path); } + /* QNX */ + if (aws_path_exists(s_qnx_ca_file_path)) { + return aws_string_c_str(s_qnx_ca_file_path); + } + return NULL; } diff --git a/source/socket_channel_handler.c b/source/socket_channel_handler.c index e8c9c5499..eb8f47568 100644 --- a/source/socket_channel_handler.c +++ b/source/socket_channel_handler.c @@ -140,6 +140,8 @@ static void s_do_read(struct socket_handler *socket_handler) { if (max_to_read == 0) { return; } + struct aws_io_handle_io_op_result io_op_result; + AWS_ZERO_STRUCT(io_op_result); size_t total_read = 0; size_t read = 0; @@ -153,10 +155,12 @@ static void s_do_read(struct socket_handler *socket_handler) { if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) { last_error = aws_last_error(); aws_mem_release(message->allocator, message); + io_op_result.read_error_code = last_error; break; } total_read += read; + io_op_result.read_bytes += read; AWS_LOGF_TRACE( AWS_LS_IO_SOCKET_HANDLER, "id=%p: read %llu from socket", @@ -166,6 +170,7 @@ static void s_do_read(struct socket_handler *socket_handler) { if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) { last_error = aws_last_error(); aws_mem_release(message->allocator, message); + io_op_result.read_error_code = last_error; break; } } @@ -183,6 +188,7 @@ static void s_do_read(struct socket_handler 
*socket_handler) {
     AWS_ASSERT(last_error != 0);
 
     if (last_error != AWS_IO_READ_WOULD_BLOCK) {
+        io_op_result.read_error_code = last_error;
         aws_channel_shutdown(socket_handler->slot->channel, last_error);
     } else {
         AWS_LOGF_TRACE(
@@ -190,6 +196,12 @@ static void s_do_read(struct socket_handler *socket_handler) {
             "id=%p: out of data to read on socket. "
             "Waiting on event-loop notification.",
             (void *)socket_handler->slot->handler);
+        io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
+    }
+
+    if (socket_handler->socket->io_handle.update_io_result) {
+        socket_handler->socket->io_handle.update_io_result(
+            socket_handler->socket->event_loop, &socket_handler->socket->io_handle, &io_op_result);
     }
     return;
 }
@@ -206,6 +218,11 @@ static void s_do_read(struct socket_handler *socket_handler) {
         &socket_handler->read_task_storage, s_read_task, socket_handler, "socket_handler_re_read");
         aws_channel_schedule_task_now(socket_handler->slot->channel, &socket_handler->read_task_storage);
     }
+
+    if (socket_handler->socket->io_handle.update_io_result) {
+        socket_handler->socket->io_handle.update_io_result(
+            socket_handler->socket->event_loop, &socket_handler->socket->io_handle, &io_op_result);
+    }
 }
 
 /* the socket is either readable or errored out. If it's readable, kick off s_do_read() to do its thing. */
diff --git a/tests/event_loop_test.c b/tests/event_loop_test.c
index e86448c8b..9e51ffc4e 100644
--- a/tests/event_loop_test.c
+++ b/tests/event_loop_test.c
@@ -313,6 +313,7 @@ AWS_TEST_CASE(event_loop_completion_events, s_test_event_loop_completion_events)
 
 #else /* !AWS_USE_IO_COMPLETION_PORTS */
 
+#    include <errno.h>
 #    include <unistd.h>
 
 int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);
@@ -835,6 +836,14 @@ static int s_state_read_until_blocked(struct thread_tester *tester) {
     uint8_t buffer[512];
     while (simple_pipe_read(&tester->read_handle, buffer, sizeof(buffer)) > 0) {
     }
+    if (errno == EAGAIN) {
+        if (tester->read_handle.update_io_result != NULL) {
+            struct aws_io_handle_io_op_result io_op_result;
+            AWS_ZERO_STRUCT(io_op_result);
+            io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
+            tester->read_handle.update_io_result(tester->event_loop, &tester->read_handle, &io_op_result);
+        }
+    }
 
     return AWS_OP_SUCCESS;
 }
diff --git a/tests/pipe_test.c b/tests/pipe_test.c
index 053c5aefd..e057fd87a 100644
--- a/tests/pipe_test.c
+++ b/tests/pipe_test.c
@@ -429,6 +429,11 @@ static void s_on_readable_event(struct aws_pipe_read_end *read_end, int error_co
             }
             s_signal_done_on_read_end_closed(state);
         }
+    } else if (error_code == AWS_ERROR_SUCCESS) {
+        /* Some event loop implementations (only QNX, to be fair) can't detect that a pipe has closed one of its ends
+         * without performing an operation on the other end. So, this read operation notifies the event loop that the
+         * writing end is closed. */
+        aws_pipe_read(&state->read_end, &state->buffers.dst, NULL);
     }
 
     return;
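
Note on the update_io_result hook introduced in include/aws/io/io.h (an illustration only, not part of the diff): any code path that drains a nonblocking fd can report its outcome back to the owning event loop the same way the pipe.c and socket.c hunks above do. Below is a minimal sketch of that pattern; the helper name s_read_and_report is hypothetical, and only the types and error codes added by this diff are assumed.

#include <aws/common/zero.h>
#include <aws/io/io.h>

#include <errno.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical helper: perform one nonblocking read() and forward the outcome
 * to the event loop through the optional update_io_result callback. */
static int s_read_and_report(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    uint8_t *buffer,
    size_t buffer_size) {

    ssize_t read_val = read(handle->data.fd, buffer, buffer_size);

    struct aws_io_handle_io_op_result io_op_result;
    AWS_ZERO_STRUCT(io_op_result);

    if (read_val > 0) {
        io_op_result.read_bytes = (size_t)read_val;
    } else if (read_val == 0) {
        /* EOF: the peer closed its end; mirrors the pipe.c hunk above. */
        io_op_result.error_code = AWS_IO_SOCKET_CLOSED;
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* The ionotify event loop rearms the fd when it sees this code. */
        io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
    }

    /* The callback is optional: event loops other than ionotify leave it NULL,
     * so non-QNX platforms pay no extra cost. */
    if (handle->update_io_result) {
        handle->update_io_result(event_loop, handle, &io_op_result);
    }

    return read_val >= 0 ? AWS_OP_SUCCESS : AWS_OP_ERR;
}

This is also why the hunks above fill in aws_io_handle_io_op_result even on partial success: the ionotify loop is edge-triggered, so it only rearms a condition once an I/O operation reports that the fd would block.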