Skip to content

Commit

Permalink
Fix a weird bug with duplicate CQEs. (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Aug 23, 2023
1 parent 6fe874f commit 6925550
Showing 1 changed file with 63 additions and 23 deletions.
86 changes: 63 additions & 23 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

// Compile-time switches for diagnostic tracing to stderr. Both default to off (0).
enum {
	// Guards the general `if (DEBUG) fprintf(stderr, ...)` traces in this file.
	DEBUG = 0,

	// Guards the traces emitted by the completion/waiting acquire, cancel and release helpers.
	DEBUG_COMPLETION = 0,
};

static VALUE IO_Event_Selector_URing = Qnil;
Expand Down Expand Up @@ -165,6 +166,8 @@ struct IO_Event_Selector_URing_Completion * IO_Event_Selector_URing_Completion_a
IO_Event_List_clear(&completion->list);
}

if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Completion_acquire(%p, limit=%ld)\n", (void*)completion, selector->completions.limit);

waiting->completion = completion;
completion->waiting = waiting;

Expand All @@ -174,6 +177,8 @@ struct IO_Event_Selector_URing_Completion * IO_Event_Selector_URing_Completion_a
inline static
void IO_Event_Selector_URing_Completion_cancel(struct IO_Event_Selector_URing_Completion *completion)
{
if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Completion_cancel(%p)\n", (void*)completion);

if (completion->waiting) {
completion->waiting->completion = NULL;
completion->waiting = NULL;
Expand All @@ -183,13 +188,17 @@ void IO_Event_Selector_URing_Completion_cancel(struct IO_Event_Selector_URing_Co
// Hand a completion back to the selector so it can be reused.
//
// The completion is first unlinked from any waiter it is still paired with
// (via IO_Event_Selector_URing_Completion_cancel) and is then prepended to
// `selector->free_list`.
//
// @parameter selector The selector whose free list receives the completion.
// @parameter completion The completion being released.
inline static
void IO_Event_Selector_URing_Completion_release(struct IO_Event_Selector_URing *selector, struct IO_Event_Selector_URing_Completion *completion)
{
	if (DEBUG_COMPLETION) {
		fprintf(stderr, "IO_Event_Selector_URing_Completion_release(%p)\n", (void*)completion);
	}
	
	// Unlink before recycling, so no waiter keeps pointing at a free-listed completion.
	IO_Event_Selector_URing_Completion_cancel(completion);
	
	IO_Event_List_prepend(&selector->free_list, &completion->list);
}

inline static
void IO_Event_Selector_URing_Waiting_cancel(struct IO_Event_Selector_URing *selector, struct IO_Event_Selector_URing_Waiting *waiting)
void IO_Event_Selector_URing_Waiting_cancel(struct IO_Event_Selector_URing_Waiting *waiting)
{
if (DEBUG_COMPLETION) fprintf(stderr, "IO_Event_Selector_URing_Waiting_cancel(%p, %p)\n", (void*)waiting, (void*)waiting->completion);

if (waiting->completion) {
waiting->completion->waiting = NULL;
waiting->completion = NULL;
Expand All @@ -198,10 +207,13 @@ void IO_Event_Selector_URing_Waiting_cancel(struct IO_Event_Selector_URing *sele
waiting->fiber = 0;
}

// List type tag for completions: IO_Event_Selector_URing_Completion_initialize stores the
// address of this object in each completion's `list.type`.
struct IO_Event_List_Type IO_Event_Selector_URing_Completion_Type = {};

// Initialize a completion element.
//
// Sets up the embedded list node and tags it with IO_Event_Selector_URing_Completion_Type.
//
// @parameter element Untyped pointer to a `struct IO_Event_Selector_URing_Completion`.
void IO_Event_Selector_URing_Completion_initialize(void *element)
{
	struct IO_Event_Selector_URing_Completion *entry = (struct IO_Event_Selector_URing_Completion *)element;
	
	IO_Event_List_initialize(&entry->list);
	
	// Tag the node so it is identifiable as a completion.
	entry->list.type = &IO_Event_Selector_URing_Completion_Type;
}

void IO_Event_Selector_URing_Completion_free(void *element)
Expand Down Expand Up @@ -357,6 +369,12 @@ int io_uring_submit_now(struct IO_Event_Selector_URing *selector) {
}
}

// Debug hook called on a prepared submission queue entry before it is handed to
// `io_uring_submit_now` / `io_uring_submit_pending`.
//
// Despite the name, this function does not submit anything: when DEBUG is
// enabled it logs the entry's address, user data and opcode to stderr, and
// otherwise it does nothing.
//
// @parameter sqe The prepared submission queue entry to describe.
static
void IO_Event_Selector_URing_submit_sqe(struct io_uring_sqe *sqe)
{
	if (DEBUG) {
		// `%p` requires a `void *` argument, so cast explicitly like the other traces in this file.
		fprintf(stderr, "IO_Event_Selector_URing_submit_sqe(%p): user_data=%p opcode=%d\n", (void*)sqe, (void*)sqe->user_data, sqe->opcode);
	}
}

// Submit a pending operation. This does not submit the operation immediately, but instead defers it to the next call to `io_uring_submit_flush` or `io_uring_submit_now`. This is useful for operations that are not urgent, but should be used with care as it can lead to a deadlock if the submission queue is not flushed.
static
void io_uring_submit_pending(struct IO_Event_Selector_URing *selector) {
Expand Down Expand Up @@ -407,7 +425,7 @@ VALUE process_wait_ensure(VALUE _arguments) {

close(arguments->descriptor);

IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting);
IO_Event_Selector_URing_Waiting_cancel(arguments->waiting);

return Qnil;
}
Expand Down Expand Up @@ -442,6 +460,7 @@ VALUE IO_Event_Selector_URing_process_wait(VALUE self, VALUE fiber, VALUE _pid,
if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_process_wait:io_uring_prep_poll_add(%p)\n", (void*)fiber);
io_uring_prep_poll_add(sqe, descriptor, POLLIN|POLLHUP|POLLERR);
io_uring_sqe_set_data(sqe, completion);
IO_Event_Selector_URing_submit_sqe(sqe);
io_uring_submit_pending(selector);

return rb_ensure(process_wait_transfer, (VALUE)&process_wait_arguments, process_wait_ensure, (VALUE)&process_wait_arguments);
Expand Down Expand Up @@ -485,12 +504,18 @@ static
// Ensure handler for an io_wait operation.
//
// If the waiter is still paired with a completion, the operation has not been
// reaped yet, so an asynchronous cancel targeting that completion is submitted
// immediately. The waiter is then detached from its completion.
//
// @parameter _arguments A `struct io_wait_arguments *` smuggled through a VALUE.
// @returns Qnil.
VALUE io_wait_ensure(VALUE _arguments) {
	struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
	
	if (DEBUG) fprintf(stderr, "io_wait_ensure:io_uring_prep_cancel(waiting=%p, completion=%p)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion);
	
	// The order of operations is important here: the check below reads
	// `waiting->completion`, which IO_Event_Selector_URing_Waiting_cancel
	// clears, so the waiter must only be detached after the cancel is submitted.
	
	// If the operation is still in progress, cancel it:
	if (arguments->waiting->completion) {
		struct io_uring_sqe *sqe = io_get_sqe(arguments->selector);
		io_uring_prep_cancel(sqe, (void*)arguments->waiting->completion, 0);
		// The cancel request itself carries no user data, so its own CQE is skipped when completions are processed:
		io_uring_sqe_set_data(sqe, NULL);
		IO_Event_Selector_URing_submit_sqe(sqe);
		io_uring_submit_now(arguments->selector);
	}
	
	IO_Event_Selector_URing_Waiting_cancel(arguments->waiting);
	
	return Qnil;
}
Expand All @@ -502,6 +527,8 @@ VALUE io_wait_transfer(VALUE _arguments) {

IO_Event_Selector_fiber_transfer(selector->backend.loop, 0, NULL);

if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);

if (arguments->waiting->result) {
// We explicitly filter the resulting events based on the requested events.
// In some cases, poll will report events we didn't ask for.
Expand Down Expand Up @@ -531,6 +558,7 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(selector, &waiting);

io_uring_sqe_set_data(sqe, completion);
IO_Event_Selector_URing_submit_sqe(sqe);

// If we are going to wait, we assume that we are waiting for a while:
io_uring_submit_pending(selector);
Expand Down Expand Up @@ -581,10 +609,11 @@ io_read_submit(VALUE _arguments)
struct IO_Event_Selector_URing *selector = arguments->selector;
struct io_uring_sqe *sqe = io_get_sqe(selector);

if (DEBUG) fprintf(stderr, "io_read_submit:io_uring_prep_read(fiber=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, arguments->descriptor, arguments->buffer, arguments->length);
if (DEBUG) fprintf(stderr, "io_read_submit:io_uring_prep_read(waiting=%p, completion=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion, arguments->descriptor, arguments->buffer, arguments->length);

io_uring_prep_read(sqe, arguments->descriptor, arguments->buffer, arguments->length, io_seekable(arguments->descriptor));
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
IO_Event_Selector_URing_submit_sqe(sqe);
io_uring_submit_now(selector);

IO_Event_Selector_fiber_transfer(selector->backend.loop, 0, NULL);
Expand All @@ -600,16 +629,17 @@ io_read_ensure(VALUE _arguments)

struct io_uring_sqe *sqe = io_get_sqe(selector);

if (DEBUG) fprintf(stderr, "io_read_cancel:io_uring_prep_cancel(fiber=%p)\n", (void*)arguments->waiting);
if (DEBUG) fprintf(stderr, "io_read_ensure:io_uring_prep_cancel(waiting=%p, completion=%p)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion);

// If the operation has already completed, we don't need to cancel it:
if (!arguments->waiting->result) {
io_uring_prep_cancel(sqe, (void*)arguments->waiting, 0);
// If the operation is still in progress, cancel it:
if (arguments->waiting->completion) {
io_uring_prep_cancel(sqe, (void*)arguments->waiting->completion, 0);
io_uring_sqe_set_data(sqe, NULL);
IO_Event_Selector_URing_submit_sqe(sqe);
io_uring_submit_now(selector);
}

IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting);
IO_Event_Selector_URing_Waiting_cancel(arguments->waiting);

return Qnil;
}
Expand Down Expand Up @@ -704,10 +734,11 @@ io_write_submit(VALUE _argument)

struct io_uring_sqe *sqe = io_get_sqe(selector);

if (DEBUG) fprintf(stderr, "io_write_submit:io_uring_prep_write(fiber=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, arguments->descriptor, arguments->buffer, arguments->length);
if (DEBUG) fprintf(stderr, "io_write_submit:io_uring_prep_write(waiting=%p, completion=%p, descriptor=%d, buffer=%p, length=%ld)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion, arguments->descriptor, arguments->buffer, arguments->length);

io_uring_prep_write(sqe, arguments->descriptor, arguments->buffer, arguments->length, io_seekable(arguments->descriptor));
io_uring_sqe_set_data(sqe, arguments->waiting->completion);
IO_Event_Selector_URing_submit_sqe(sqe);
io_uring_submit_pending(selector);

IO_Event_Selector_fiber_transfer(selector->backend.loop, 0, NULL);
Expand All @@ -723,15 +754,17 @@ io_write_ensure(VALUE _argument)

struct io_uring_sqe *sqe = io_get_sqe(selector);

if (DEBUG) fprintf(stderr, "io_wait_rescue:io_uring_prep_cancel(%p)\n", (void*)arguments->waiting);
if (DEBUG) fprintf(stderr, "io_write_ensure:io_uring_prep_cancel(waiting=%p, completion=%p)\n", (void*)arguments->waiting, (void*)arguments->waiting->completion);

if (!arguments->waiting->result) {
io_uring_prep_cancel(sqe, (void*)arguments->waiting, 0);
// If the operation is still in progress, cancel it:
if (arguments->waiting->completion) {
io_uring_prep_cancel(sqe, (void*)arguments->waiting->completion, 0);
io_uring_sqe_set_data(sqe, NULL);
IO_Event_Selector_URing_submit_sqe(sqe);
io_uring_submit_now(selector);
}

IO_Event_Selector_URing_Waiting_cancel(arguments->selector, arguments->waiting);
IO_Event_Selector_URing_Waiting_cancel(arguments->waiting);

return Qnil;
}
Expand Down Expand Up @@ -829,6 +862,7 @@ VALUE IO_Event_Selector_URing_io_close(VALUE self, VALUE io) {

io_uring_prep_close(sqe, descriptor);
io_uring_sqe_set_data(sqe, NULL);
IO_Event_Selector_URing_submit_sqe(sqe);
io_uring_submit_now(selector);
} else {
close(descriptor);
Expand Down Expand Up @@ -919,32 +953,38 @@ unsigned select_process_completions(struct IO_Event_Selector_URing *selector) {
unsigned head;
struct io_uring_cqe *cqe;

if (DEBUG) fprintf(stderr, "select_process_completions...\n");

io_uring_for_each_cqe(ring, head, cqe) {
if (DEBUG) fprintf(stderr, "select_process_completions: cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data);

++completed;

// If the operation was cancelled, or the operation has no user data (fiber):
// If the operation was cancelled, or the operation has no user data:
if (cqe->user_data == 0 || cqe->user_data == LIBURING_UDATA_TIMEOUT) {
io_uring_cq_advance(ring, 1);
continue;
}

if (DEBUG) fprintf(stderr, "cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data);

struct IO_Event_Selector_URing_Completion *completion = (void*)cqe->user_data;
struct IO_Event_Selector_URing_Waiting *waiting = completion->waiting;

if (DEBUG) fprintf(stderr, "select_process_completions: completion=%p waiting=%p\n", (void*)completion, (void*)waiting);

if (waiting) {
waiting->result = cqe->res;
waiting->flags = cqe->flags;
}

io_uring_cq_advance(ring, 1);

if (waiting && waiting->fiber) {
assert(waiting->result != -ECANCELED);

IO_Event_Selector_fiber_transfer(waiting->fiber, 0, NULL);
}

// This marks the waiting operation as "complete":
IO_Event_Selector_URing_Completion_release(selector, completion);
io_uring_cq_advance(ring, 1);
}

if (DEBUG && completed > 0) fprintf(stderr, "select_process_completions(completed=%d)\n", completed);
Expand Down

0 comments on commit 6925550

Please sign in to comment.