diff --git a/ext/io/event/selector/uring.c b/ext/io/event/selector/uring.c index 65ca8f5d..d9be022f 100644 --- a/ext/io/event/selector/uring.c +++ b/ext/io/event/selector/uring.c @@ -34,6 +34,7 @@ enum { DEBUG = 0, + DEBUG_COMPLETION = 0, }; static VALUE IO_Event_Selector_URing = Qnil; @@ -165,6 +166,8 @@ struct IO_Event_Selector_URing_Completion * IO_Event_Selector_URing_Completion_a IO_Event_List_clear(&completion->list); } + if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Completion_acquire(%p, limit=%ld)\n", (void*)completion, selector->completions.limit); + waiting->completion = completion; completion->waiting = waiting; @@ -174,6 +177,8 @@ struct IO_Event_Selector_URing_Completion * IO_Event_Selector_URing_Completion_a inline static void IO_Event_Selector_URing_Completion_cancel(struct IO_Event_Selector_URing_Completion *completion) { + if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Completion_cancel(%p)\n", (void*)completion); + if (completion->waiting) { completion->waiting->completion = NULL; completion->waiting = NULL; @@ -183,13 +188,17 @@ void IO_Event_Selector_URing_Completion_cancel(struct IO_Event_Selector_URing_Co inline static void IO_Event_Selector_URing_Completion_release(struct IO_Event_Selector_URing *selector, struct IO_Event_Selector_URing_Completion *completion) { + if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Completion_release(%p)\n", (void*)completion); + IO_Event_Selector_URing_Completion_cancel(completion); IO_Event_List_prepend(&selector->free_list, &completion->list); } inline static -void IO_Event_Selector_URing_Waiting_cancel(struct IO_Event_Selector_URing *selector, struct IO_Event_Selector_URing_Waiting *waiting) +void IO_Event_Selector_URing_Waiting_cancel(struct IO_Event_Selector_URing_Waiting *waiting) { + if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Waiting_cancel(%p, %p)\n", (void*)waiting, (void*)waiting->completion); + if (waiting->completion) { waiting->completion->waiting = NULL; waiting->completion = NULL; @@ -198,10 +207,13 @@ void IO_Event_Selector_URing_Waiting_cancel(struct IO_Event_Selector_URing *sele waiting->fiber = 0; } +struct IO_Event_List_Type IO_Event_Selector_URing_Completion_Type = {}; + void IO_Event_Selector_URing_Completion_initialize(void *element) { struct IO_Event_Selector_URing_Completion *completion = element; IO_Event_List_initialize(&completion->list); + completion->list.type = &IO_Event_Selector_URing_Completion_Type; } void IO_Event_Selector_URing_Completion_free(void *element) @@ -357,6 +369,12 @@ int io_uring_submit_now(struct IO_Event_Selector_URing *selector) { } } +static +void IO_Event_Selector_URing_submit_sqe(struct io_uring_sqe *sqe) +{ + if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_submit_sqe(%p): user_data=%p opcode=%d\n", sqe, (void*)sqe->user_data, sqe->opcode); +} + // Submit a pending operation. This does not submit the operation immediately, but instead defers it to the next call to `io_uring_submit_flush` or `io_uring_submit_now`. This is useful for operations that are not urgent, but should be used with care as it can lead to a deadlock if the submission queue is not flushed. static void io_uring_submit_pending(struct IO_Event_Selector_URing *selector) { @@ -407,7 +425,7 @@ VALUE process_wait_ensure(VALUE _arguments) { close(arguments->descriptor); - IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting); + IO_Event_Selector_URing_Waiting_cancel(arguments->waiting); return Qnil; } @@ -442,6 +460,7 @@ VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid, if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber); io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR); io_uring_sqe_set_data(sqe, completion); + IO_Event_Selector_URing_submit_sqe(sqe); io_uring_submit_pending(selector); return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments); @@ -485,12 +504,18 @@ static VALUE io_wait_ensure(VALUE _arguments) { struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments; - // We may want to consider cancellation. Be aware that the order of operations is important here: - // io_uring_prep_cancel(sqe, (void*)arguments->waiting, 0); - // io_uring_sqe_set_data(sqe, NULL); - // io_uring_submit_now(selector); + if (DEBUG) fprintf(stderr, "io_wait_ensure:io_uring_prep_cancel(waiting=%p, completion=%p)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion); - IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting); + // If the operation is still in progress, cancel it: + if (arguments->waiting->completion) { + struct io_uring_sqe *sqe = io_get_sqe(arguments->selector); + io_uring_prep_cancel(sqe, (void*)arguments->waiting->completion, 0); + io_uring_sqe_set_data(sqe, NULL); + IO_Event_Selector_URing_submit_sqe(sqe); + io_uring_submit_now(arguments->selector); + } + + IO_Event_Selector_URing_Waiting_cancel(arguments->waiting); return Qnil; }; @@ -502,6 +527,8 @@ VALUE io_wait_transfer(VALUE _arguments) { IO_Event_Selector_fiber_transfer(selector->backend.loop, 0, NULL); + if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result); + if (arguments->waiting->result) { // We explicitly filter the resulting events based on the requested events. // In some cases, poll will report events we didn't ask for. @@ -531,6 +558,7 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(selector, &waiting); io_uring_sqe_set_data(sqe, completion); + IO_Event_Selector_URing_submit_sqe(sqe); // If we are going to wait, we assume that we are waiting for a while: io_uring_submit_pending(selector); @@ -581,10 +609,11 @@ io_read_submit(VALUE _arguments) struct IO_Event_Selector_URing *selector = arguments->selector; struct io_uring_sqe *sqe = io_get_sqe(selector); - if (DEBUG) fprintf(stderr, "io_read_submit:io_uring_prep_read(fiber=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, arguments->descriptor, arguments->buffer, arguments->length); + if (DEBUG) fprintf(stderr, "io_read_submit:io_uring_prep_read(waiting=%p, completion=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion, arguments->descriptor, arguments->buffer, arguments->length); io_uring_prep_read(sqe, arguments->descriptor, arguments->buffer, arguments->length, io_seekable(arguments->descriptor)); io_uring_sqe_set_data(sqe, arguments->waiting->completion); + IO_Event_Selector_URing_submit_sqe(sqe); io_uring_submit_now(selector); IO_Event_Selector_fiber_transfer(selector->backend.loop, 0, NULL); @@ -600,16 +629,17 @@ io_read_ensure(VALUE _arguments) struct io_uring_sqe *sqe = io_get_sqe(selector); - if (DEBUG) fprintf(stderr, "io_read_cancel:io_uring_prep_cancel(fiber=%p)\n", (void*)arguments->waiting); + if (DEBUG) fprintf(stderr, "io_read_ensure:io_uring_prep_cancel(waiting=%p, completion=%p)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion); - // If the operation has already completed, we don't need to cancel it: - if (!arguments->waiting->result) { - io_uring_prep_cancel(sqe, (void*)arguments->waiting, 0); + // If the operation is still in progress, cancel it: + if (arguments->waiting->completion) { + io_uring_prep_cancel(sqe, (void*)arguments->waiting->completion, 0); io_uring_sqe_set_data(sqe, NULL); + IO_Event_Selector_URing_submit_sqe(sqe); io_uring_submit_now(selector); } - IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting); + IO_Event_Selector_URing_Waiting_cancel(arguments->waiting); return Qnil; } @@ -704,10 +734,11 @@ io_write_submit(VALUE _argument) struct io_uring_sqe *sqe = io_get_sqe(selector); - if (DEBUG) fprintf(stderr, "io_write_submit:io_uring_prep_write(fiber=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, arguments->descriptor, arguments->buffer, arguments->length); + if (DEBUG) fprintf(stderr, "io_write_submit:io_uring_prep_write(waiting=%p, completion=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion, arguments->descriptor, arguments->buffer, arguments->length); io_uring_prep_write(sqe, arguments->descriptor, arguments->buffer, arguments->length, io_seekable(arguments->descriptor)); io_uring_sqe_set_data(sqe, arguments->waiting->completion); + IO_Event_Selector_URing_submit_sqe(sqe); io_uring_submit_pending(selector); IO_Event_Selector_fiber_transfer(selector->backend.loop, 0, NULL); @@ -723,15 +754,17 @@ io_write_ensure(VALUE _argument) struct io_uring_sqe *sqe = io_get_sqe(selector); - if (DEBUG) fprintf(stderr, "io_wait_rescue:io_uring_prep_cancel(%p)\n", (void*)arguments->waiting); + if (DEBUG) fprintf(stderr, "io_write_ensure:io_uring_prep_cancel(waiting=%p, completion=%p)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion); - if (!arguments->waiting->result) { - io_uring_prep_cancel(sqe, (void*)arguments->waiting, 0); + // If the operation is still in progress, cancel it: + if (arguments->waiting->completion) { + io_uring_prep_cancel(sqe, (void*)arguments->waiting->completion, 0); io_uring_sqe_set_data(sqe, NULL); + IO_Event_Selector_URing_submit_sqe(sqe); io_uring_submit_now(selector); } - IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting); + IO_Event_Selector_URing_Waiting_cancel(arguments->waiting); return Qnil; } @@ -829,6 +862,7 @@ VALUE IO_Event_Selector_URing_io_close(VALUE self, VALUE io) { io_uring_prep_close(sqe, descriptor); io_uring_sqe_set_data(sqe, NULL); + IO_Event_Selector_URing_submit_sqe(sqe); io_uring_submit_now(selector); } else { close(descriptor); @@ -919,32 +953,38 @@ unsigned select_process_completions(struct IO_Event_Selector_URing *selector) { unsigned head; struct io_uring_cqe *cqe; + if (DEBUG) fprintf(stderr, "select_process_completions...\n"); + io_uring_for_each_cqe(ring, head, cqe) { + if (DEBUG) fprintf(stderr, "select_process_completions: cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data); + ++completed; - // If the operation was cancelled, or the operation has no user data (fiber): + // If the operation was cancelled, or the operation has no user data: if (cqe->user_data == 0 || cqe->user_data == LIBURING_UDATA_TIMEOUT) { io_uring_cq_advance(ring, 1); continue; } - if (DEBUG) fprintf(stderr, "cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data); - struct IO_Event_Selector_URing_Completion *completion = (void*)cqe->user_data; struct IO_Event_Selector_URing_Waiting *waiting = completion->waiting; + if (DEBUG) fprintf(stderr, "select_process_completions: completion=%p waiting=%p\n", (void*)completion, (void*)waiting); + if (waiting) { waiting->result = cqe->res; waiting->flags = cqe->flags; } - io_uring_cq_advance(ring, 1); - if (waiting && waiting->fiber) { + assert(waiting->result != -ECANCELED); + IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL); } + // This marks the waiting operation as "complete": IO_Event_Selector_URing_Completion_release(selector, completion); + io_uring_cq_advance(ring, 1); } if (DEBUG && completed > 0) fprintf(stderr, "select_process_completions(completed=%d)\n", completed);