Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement event loop based on QNX ionotify #669

Open
wants to merge 41 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
aa4f966
Provide I/O operation status back to event loop
sfodagain Aug 26, 2024
abe7747
Add flag for last io result
sfodagain Aug 26, 2024
ce39822
Merge branch 'main' into support-qnx
sfodagain Aug 27, 2024
8f7cbff
Add callback to io handle
sfodagain Aug 27, 2024
50bca0d
Revert unrelated changes
sfodagain Aug 27, 2024
29d9c04
Use shared sources
sfodagain Aug 27, 2024
ad1262d
fixup
sfodagain Aug 27, 2024
28e55e1
fixup
sfodagain Aug 27, 2024
c5f9e00
Use #if everywhere
sfodagain Aug 27, 2024
4ed5a87
Fix kqueue
sfodagain Aug 27, 2024
619319f
Fix pipe tests
sfodagain Aug 27, 2024
be85d89
Remove AWS_ASSERT, use AWS_ZERO_STRUCT
sfodagain Sep 3, 2024
739e0f6
Remove changes made to kqueue
sfodagain Sep 16, 2024
4036be9
Add ionotify event loop (#670)
sfod Sep 16, 2024
d709f28
Remove pipe test fix
sfodagain Sep 16, 2024
d8a8085
Copy posix stuff to qnx
sfodagain Sep 16, 2024
702312a
fixup
sfodagain Sep 17, 2024
eede270
fixup
sfodagain Sep 17, 2024
8bc5d1b
Fix pipe missing events issue
sfodagain Sep 17, 2024
692d125
Handle unsubscribing in a task
sfodagain Sep 17, 2024
c79772e
Handle is_subscribed only in resubscriptions
sfodagain Sep 17, 2024
2bddf95
Add aws_pipe_read to tests
sfodagain Sep 17, 2024
dd34f87
Fix race condition, fix pulse error code
sfodagain Sep 20, 2024
c4dceea
Use separate task for resubscribing
sfodagain Sep 20, 2024
29f900e
Add QNX paths
sfodagain Sep 20, 2024
a8d8366
Remove non-qnx specifics from posix copies
sfodagain Sep 23, 2024
6af39ec
Improve comments and logging
sfodagain Sep 23, 2024
9c35c72
Fix latest_io_event_types
sfodagain Sep 23, 2024
bc653f6
Fix format
sfodagain Sep 23, 2024
48134e8
Merge posix and qnx sources
sfodagain Sep 24, 2024
4b59301
Fix naming and comments
sfodagain Sep 25, 2024
1700c76
Use single destroy
sfodagain Sep 25, 2024
88aff4d
Use atomic for event loop state
sfodagain Sep 26, 2024
78b4d38
Use AWS_FATAL_ASSERT
sfodagain Sep 26, 2024
22f3e24
Fix log format string
sfodagain Sep 26, 2024
52110fb
Remove strerror
sfodagain Sep 26, 2024
93c0c39
Add MsgUnregisterEvent on unsubscribing
sfodagain Sep 26, 2024
477a071
Fix logs, comments, code style
sfodagain Sep 26, 2024
bda976c
Fix naming
sfodagain Sep 26, 2024
be74788
Refactor cleaning up and resubscribing
sfodagain Sep 29, 2024
5d9f58f
Add comment to processing io results
sfodagain Oct 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetB
set(EVENT_LOOP_DEFINE "KQUEUE")
set(USE_S2N ON)

elseif(CMAKE_SYSTEM_NAME STREQUAL "QNX")
file(GLOB AWS_IO_OS_SRC
"source/qnx/*.c"
"source/posix/*.c"
)
set(EVENT_LOOP_DEFINE "IONOTIFY")
set(USE_S2N ON)
list(APPEND PLATFORM_LIBS "socket")
endif()

if (BYO_CRYPTO)
Expand Down Expand Up @@ -231,7 +239,7 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}-config.cmake"
DESTINATION "${LIBRARY_DIRECTORY}/${PROJECT_NAME}/cmake/"
COMPONENT Development)

if (NOT CMAKE_CROSSCOMPILING)
if (NOT CMAKE_CROSSCOMPILING OR AWS_BUILD_QNX_TESTS)
if (BUILD_TESTING)
add_subdirectory(tests)
endif()
Expand Down
25 changes: 25 additions & 0 deletions include/aws/io/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,37 @@ AWS_PUSH_SANE_WARNING_LEVEL

#define AWS_C_IO_PACKAGE_ID 1

struct aws_io_handle;

struct aws_event_loop;

/**
* Results of the I/O operation(s) performed on the aws_io_handle.
*/
struct aws_io_handle_io_op_result {
size_t read_bytes;
size_t written_bytes;
/** Error codes representing generic errors happening on I/O handles. */
int error_code;
/** Error codes specific to reading operations. */
int read_error_code;
/** Error codes specific to writing operations. */
int write_error_code;
};

typedef void(aws_io_handle_update_io_results_fn)(
struct aws_event_loop *,
struct aws_io_handle *,
const struct aws_io_handle_io_op_result *);

struct aws_io_handle {
union {
int fd;
void *handle;
} data;
void *additional_data;
/* Optional callback to return results of I/O operations performed on this handle. */
aws_io_handle_update_io_results_fn *update_io_result;
};

enum aws_io_message_type {
Expand Down
21 changes: 21 additions & 0 deletions source/posix/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,22 @@ int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_b
if (read_val < 0) {
int errno_value = errno; /* Always cache errno before potential side-effect */
if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
if (read_impl->handle.update_io_result) {
struct aws_io_handle_io_op_result io_op_result;
AWS_ZERO_STRUCT(io_op_result);
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result);
}
return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
}
return s_raise_posix_error(errno_value);
} else if (read_val == 0) {
if (read_impl->handle.update_io_result) {
struct aws_io_handle_io_op_result io_op_result;
AWS_ZERO_STRUCT(io_op_result);
io_op_result.error_code = AWS_IO_SOCKET_CLOSED;
read_impl->handle.update_io_result(read_impl->event_loop, &read_impl->handle, &io_op_result);
}
}

/* Success */
Expand Down Expand Up @@ -454,6 +467,14 @@ static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) {
if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
/* The pipe is no longer writable. Bail out */
write_impl->is_writable = false;

if (write_impl->handle.update_io_result) {
struct aws_io_handle_io_op_result io_op_result;
AWS_ZERO_STRUCT(io_op_result);
io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK;
write_impl->handle.update_io_result(write_impl->event_loop, &write_impl->handle, &io_op_result);
}

return;
}

Expand Down
29 changes: 29 additions & 0 deletions source/posix/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,14 @@ static void s_socket_connect_event(
"id=%p fd=%d: spurious event, waiting for another notification.",
(void *)socket_args->socket,
handle->data.fd);

if (handle->update_io_result) {
struct aws_io_handle_io_op_result io_op_result;
AWS_ZERO_STRUCT(io_op_result);
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
handle->update_io_result(event_loop, handle, &io_op_result);
}

return;
}

Expand Down Expand Up @@ -955,6 +963,9 @@ static void s_socket_accept_event(
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);

struct aws_io_handle_io_op_result io_op_result;
AWS_ZERO_STRUCT(io_op_result);

if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
int in_fd = 0;
while (socket_impl->continue_accept && in_fd != -1) {
Expand All @@ -966,12 +977,14 @@ static void s_socket_accept_event(
int errno_value = errno; /* Always cache errno before potential side-effect */

if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
io_op_result.read_error_code = AWS_IO_READ_WOULD_BLOCK;
break;
}

int aws_error = aws_socket_get_error(socket);
aws_raise_error(aws_error);
s_on_connection_error(socket, aws_error);
io_op_result.read_error_code = aws_error;
break;
}

Expand Down Expand Up @@ -1064,6 +1077,10 @@ static void s_socket_accept_event(
}
}

if (handle->update_io_result) {
handle->update_io_result(event_loop, handle, &io_op_result);
}

AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET,
"id=%p fd=%d: finished processing incoming connections, "
Expand Down Expand Up @@ -1632,6 +1649,9 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
bool parent_request_failed = false;
bool pushed_to_written_queue = false;

struct aws_io_handle_io_op_result io_op_result;
AWS_ZERO_STRUCT(io_op_result);

/* if a close call happens in the middle, this queue will have been cleaned out from under us. */
while (!aws_linked_list_empty(&socket_impl->write_queue)) {
struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue);
Expand Down Expand Up @@ -1660,6 +1680,7 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
if (errno_value == EAGAIN) {
AWS_LOGF_TRACE(
AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd);
io_op_result.write_error_code = AWS_IO_READ_WOULD_BLOCK;
break;
}

Expand All @@ -1672,6 +1693,7 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
aws_error = AWS_IO_SOCKET_CLOSED;
aws_raise_error(aws_error);
purge = true;
io_op_result.write_error_code = aws_error;
break;
}

Expand All @@ -1684,9 +1706,12 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
errno_value);
aws_error = s_determine_socket_error(errno_value);
aws_raise_error(aws_error);
io_op_result.write_error_code = aws_error;
break;
}

io_op_result.written_bytes += (size_t)written;

size_t remaining_to_write = write_request->cursor_cpy.len;

aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written);
Expand Down Expand Up @@ -1732,6 +1757,10 @@ static int s_process_socket_write_requests(struct aws_socket *socket, struct soc
aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
}

if (socket->io_handle.update_io_result) {
socket->io_handle.update_io_result(socket->event_loop, &socket->io_handle, &io_op_result);
}

/* Only report error if aws_socket_write() invoked this function and its write_request failed */
if (!parent_request_failed) {
return AWS_OP_SUCCESS;
Expand Down
Loading