Skip to content

Commit

Permalink
Add support for io_uring to use tokens.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jul 30, 2023
1 parent 3f6f33c commit 9f9be40
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct IO_Event_Selector_URing {
struct io_uring ring;
size_t pending;
int blocked;
struct IO_Event_Token_List tokens;
};

void IO_Event_Selector_URing_Type_mark(void *_data)
Expand All @@ -60,6 +61,8 @@ void close_internal(struct IO_Event_Selector_URing *data) {
io_uring_queue_exit(&data->ring);
data->ring.ring_fd = -1;
}

IO_Event_Token_List_free(&data->tokens);
}

void IO_Event_Selector_URing_Type_free(void *_data)
Expand Down Expand Up @@ -97,6 +100,8 @@ VALUE IO_Event_Selector_URing_allocate(VALUE self) {
data->pending = 0;
data->blocked = 0;

IO_Event_Token_List_initialize(&data->tokens);

return instance;
}

Expand Down Expand Up @@ -329,8 +334,8 @@ int events_from_poll_flags(short flags) {

struct io_wait_arguments {
struct IO_Event_Selector_URing *data;
VALUE fiber;
short flags;
struct IO_Event_Token *token;
};

static
Expand All @@ -340,9 +345,9 @@ VALUE io_wait_rescue(VALUE _arguments, VALUE exception) {

struct io_uring_sqe *sqe = io_get_sqe(data);

if (DEBUG) fprintf(stderr, "io_wait_rescue:io_uring_prep_poll_remove(%p)\n", (void*)arguments->fiber);
if (DEBUG) fprintf(stderr, "io_wait_rescue:io_uring_prep_poll_remove(%p)\n", (void*)arguments->token);

io_uring_prep_poll_remove(sqe, (uintptr_t)arguments->fiber);
io_uring_prep_poll_remove(sqe, (uintptr_t)arguments->token);
io_uring_sqe_set_data(sqe, NULL);
io_uring_submit_now(data);

Expand All @@ -353,14 +358,14 @@ static
VALUE io_wait_transfer(VALUE _arguments) {
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
struct IO_Event_Selector_URing *data = arguments->data;

VALUE result = IO_Event_Selector_fiber_transfer(data->backend.loop, 0, NULL);
if (DEBUG) fprintf(stderr, "io_wait:IO_Event_Selector_fiber_transfer -> %d\n", RB_NUM2INT(result));

if (!RTEST(result)) {
return Qfalse;
}

// We explicitly filter the resulting events based on the requested events.
// In some cases, poll will report events we didn't ask for.
short flags = arguments->flags & NUM2INT(result);
Expand All @@ -380,15 +385,17 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_io_wait:io_uring_prep_poll_add(descriptor=%d, flags=%d, fiber=%p)\n", descriptor, flags, (void*)fiber);

io_uring_prep_poll_add(sqe, descriptor, flags);
io_uring_sqe_set_data(sqe, (void*)fiber);

struct IO_Event_Token *token = IO_Event_Token_wrap(&data->tokens, fiber);
io_uring_sqe_set_data(sqe, token);

// If we are going to wait, we assume that we are waiting for a while:
io_uring_submit_pending(data);

struct io_wait_arguments io_wait_arguments = {
.data = data,
.fiber = fiber,
.flags = flags
.flags = flags,
.token = token,
};

return rb_rescue(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_rescue, (VALUE)&io_wait_arguments);
Expand Down Expand Up @@ -740,7 +747,8 @@ int select_internal_without_gvl(struct select_arguments *arguments) {
}

static inline
unsigned select_process_completions(struct io_uring *ring) {
unsigned select_process_completions(struct IO_Event_Selector_URing *data) {
struct io_uring *ring = &data->ring;
unsigned completed = 0;
unsigned head;
struct io_uring_cqe *cqe;
Expand All @@ -749,19 +757,27 @@ unsigned select_process_completions(struct io_uring *ring) {
++completed;

// If the operation was cancelled, or the operation has no user data (fiber):
if (cqe->res == -ECANCELED || cqe->user_data == 0 || cqe->user_data == LIBURING_UDATA_TIMEOUT) {
if (cqe->user_data == 0 || cqe->user_data == LIBURING_UDATA_TIMEOUT) {
io_uring_cq_advance(ring, 1);
continue;
}

VALUE fiber = (VALUE)cqe->user_data;
VALUE result = RB_INT2NUM(cqe->res);

if (DEBUG) fprintf(stderr, "cqe res=%d user_data=%p\n", cqe->res, (void*)cqe->user_data);

VALUE fiber = IO_Event_Token_unwrap(&data->tokens, (void*)cqe->user_data);

// If the operation was cancelled, don't do anything:
if (cqe->res == -ECANCELED) {
io_uring_cq_advance(ring, 1);
continue;
}

VALUE result = RB_INT2NUM(cqe->res);
io_uring_cq_advance(ring, 1);

IO_Event_Selector_fiber_transfer(fiber, 1, &result);
if (fiber) {
IO_Event_Selector_fiber_transfer(fiber, 1, &result);
}
}

// io_uring_cq_advance(ring, completed);
Expand All @@ -780,7 +796,7 @@ VALUE IO_Event_Selector_URing_select(VALUE self, VALUE duration) {

int ready = IO_Event_Selector_queue_flush(&data->backend);

int result = select_process_completions(&data->ring);
int result = select_process_completions(data);

// If we:
// 1. Didn't process any ready fibers, and
Expand All @@ -802,7 +818,7 @@ VALUE IO_Event_Selector_URing_select(VALUE self, VALUE duration) {
}

// After waiting/flushing the SQ, check if there are any completions:
result = select_process_completions(&data->ring);
result = select_process_completions(data);
}

return RB_INT2NUM(result);
Expand Down

0 comments on commit 9f9be40

Please sign in to comment.