uringlator/posix: fix bunch of issues found by asserting
Introducing heavy assertions on the state of both the uringlator and
posix code revealed quite a few hidden issues, mainly related to
cancelation and dangling operations.
Cloudef committed Jan 29, 2025
1 parent 36c1223 commit 77b9156
Showing 2 changed files with 41 additions and 25 deletions.
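
Note: the fixes in this commit lean entirely on std.debug.assert. In Zig, a failed assert reaches unreachable, which panics in Debug and ReleaseSafe builds and is compiled out in ReleaseFast/ReleaseSmall, so invariant checks this heavy cost nothing in release binaries. A minimal standalone illustration of the style (not code from this commit):

const std = @import("std");

pub fn main() void {
    var ids = [_]u32{ 1, 2, 3 };
    // "swap remove" the middle element by hand, shrinking the live slice
    ids[1] = ids[2];
    // invariant in the spirit of this commit: a removed id must no longer
    // be findable in the live slice; panics in Debug, elided in ReleaseFast
    std.debug.assert(std.mem.indexOfScalar(u32, ids[0..2], 2) == null);
}
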
18 changes: 12 additions & 6 deletions src/aio/Posix.zig
@@ -157,7 +157,7 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anytype
 
     var off: usize = 0;
     again: while (off < self.pfd.len) {
-        for (self.pfd.constSlice()[off..], off..) |*pfd, pid| {
+        for (self.pfd.constSlice()[off..], off..) |pfd, pid| {
             off = pid;
             if (pfd.revents == 0) continue;
             if (pfd.fd == self.source.fd) {
@@ -173,11 +173,12 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anytype
                 const readiness = self.uringlator.ops.getOne(.readiness, id);
                 std.debug.assert(pfd.fd == readiness.fd);
                 std.debug.assert(pfd.events == readinessToPollEvents(readiness));
-                defer {
-                    // do not poll this fd again
-                    self.pfd.swapRemove(@truncate(pid));
-                    self.pid.swapRemove(@truncate(pid - 1));
-                }
+                std.debug.assert(self.pending.isSet(id.slot));
+
+                // do not poll this fd again
+                self.pfd.swapRemove(@truncate(pid));
+                self.pid.swapRemove(@truncate(pid - 1));
+
                 const op_type = self.uringlator.ops.getOne(.type, id);
                 if (pfd.revents & std.posix.POLL.ERR != 0 or pfd.revents & std.posix.POLL.NVAL != 0) {
                     if (pfd.revents & std.posix.POLL.ERR != 0) {
@@ -199,12 +200,14 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anytype
                         break :again;
                     }
                 }
+
                 // start it for real this time
                 if (self.uringlator.ops.getOne(.next, id) != id) {
                     Uringlator.debug("ready: {}: {} => {}", .{ id, op_type, self.uringlator.ops.getOne(.next, id) });
                 } else {
                     Uringlator.debug("ready: {}: {}", .{ id, op_type });
                 }
+
                 try self.uringlator_start(id, op_type);
                 break :again;
             }
@@ -348,6 +351,7 @@ pub fn uringlator_dequeue(self: *@This(), id: aio.Id, comptime op_type: Operation
 pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
     if (self.pending.isSet(id.slot)) {
         self.in_flight.set(id.slot);
+        std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null);
         switch (op_type) {
             .poll => self.uringlator.finish(self, id, error.Success, .thread_unsafe),
             .timeout => {
Expand Down Expand Up @@ -491,6 +495,7 @@ pub fn uringlator_cancel(self: *@This(), id: aio.Id, op_type: Operation, err: Op
self.pid.swapRemove(@truncate(idx - 1));
break;
}
std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null);
self.uringlator.finish(self, id, err, .thread_unsafe);
return true;
},
@@ -508,6 +513,7 @@ pub fn uringlator_complete(self: *@This(), id: aio.Id, op_type: Operation, _: Operation.Error
         },
         else => {},
     }
+    std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null);
     readiness.* = .{};
 }
 
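Worth noting about the complete() change above: the old code removed the fd from self.pfd/self.pid inside a defer, which only runs at scope exit, after try self.uringlator_start(id, op_type) has already executed, so the new assertion in uringlator_start that the id is gone from self.pid would trip. Moving the removal before the call fixes the ordering. A standalone sketch of the pitfall, with a BoundedArray standing in for the real bookkeeping (names here are illustrative, not the repository's):

const std = @import("std");

// stand-in for the new assertion in uringlator_start
fn startOp(pid: []const u32, id: u32) void {
    // the id being started must already be gone from the poll bookkeeping
    std.debug.assert(std.mem.indexOfScalar(u32, pid, id) == null);
}

pub fn main() !void {
    var pid = try std.BoundedArray(u32, 8).init(0);
    try pid.append(42);

    // old ordering (buggy): a defer postpones the removal to scope exit,
    // so startOp would still see 42 and the assert would fire:
    //     defer _ = pid.swapRemove(0);
    //     startOp(pid.constSlice(), 42);

    // new ordering: remove first, then start
    _ = pid.swapRemove(0);
    startOp(pid.constSlice(), 42); // invariant holds
}
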
48 changes: 29 additions & 19 deletions src/aio/uringlator.zig
@@ -290,7 +290,6 @@ pub fn Uringlator(BackendOperation: type) type {
 
             const tag = self.ops.getOne(.type, id);
             self.queued.swapRemove(@intCast(idx));
-            self.started.set(id.slot);
             try self.start(tag, id, backend);
 
             // start linked timeout immediately as well if there's one
@@ -305,31 +304,38 @@ pub fn Uringlator(BackendOperation: type) type {
             return true;
         }
 
+        fn cancel(self: *@This(), id: aio.Id, err: Operation.Error, backend: anytype) error{ NotFound, InProgress }!void {
+            _ = try self.ops.lookup(id);
+            if (self.started.isSet(id.slot)) {
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), id) == null);
+                if (!backend.uringlator_cancel(id, self.ops.getOne(.type, id), err)) {
+                    return error.InProgress;
+                }
+            } else {
+                self.queued.swapRemoveNeedle(id) catch unreachable;
+                self.finish(backend, id, err, .thread_unsafe);
+            }
+        }
+
         fn start(self: *@This(), op_type: Operation, id: aio.Id, backend: anytype) aio.Error!void {
             if (self.ops.getOne(.next, id) != id) {
                 debug("perform: {}: {} => {}", .{ id, op_type, self.ops.getOne(.next, id) });
             } else {
                 debug("perform: {}: {}", .{ id, op_type });
             }
+            std.debug.assert(!self.started.isSet(id.slot));
+            self.started.set(id.slot);
+            std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), id) == null);
             switch (op_type) {
                 .nop => self.finish(backend, id, error.Success, .thread_unsafe),
                 .cancel => blk: {
                     const state = self.ops.getOne(.state, id);
-                    _ = self.ops.lookup(state.cancel.id) catch {
-                        self.finish(backend, id, error.NotFound, .thread_unsafe);
+                    std.debug.assert(state.cancel.id != id);
+                    self.cancel(state.cancel.id, error.Canceled, backend) catch |err| {
+                        self.finish(backend, id, err, .thread_unsafe);
                         break :blk;
                     };
-                    if (self.started.isSet(id.slot)) {
-                        if (!backend.uringlator_cancel(state.cancel.id, self.ops.getOne(.type, state.cancel.id), error.Canceled)) {
-                            self.finish(backend, id, error.InProgress, .thread_unsafe);
-                        } else {
-                            self.finish(backend, id, error.Success, .thread_unsafe);
-                        }
-                    } else {
-                        self.queued.swapRemoveNeedle(state.cancel.id) catch unreachable;
-                        self.finish(backend, state.cancel.id, error.Canceled, .thread_unsafe);
-                        self.finish(backend, id, error.Success, .thread_unsafe);
-                    }
+                    self.finish(backend, id, error.Success, .thread_unsafe);
                 },
                 else => |tag| try backend.uringlator_start(id, tag),
             }
@@ -342,9 +348,12 @@ pub fn Uringlator(BackendOperation: type) type {
             for (finished) |res| {
                 _ = self.ops.lookup(res.id) catch continue; // raced
                 const op_type = self.ops.getOne(.type, res.id);
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), res.id) == null);
 
                 var failure = res.failure;
                 if (failure != error.Canceled) {
+                    std.debug.assert(self.started.isSet(res.id.slot));
+                    std.debug.assert(!self.link_lock.isSet(res.id.slot));
                     switch (op_type) {
                         .link_timeout => {
                             const cid = self.ops.getOne(.prev, res.id);
@@ -362,9 +371,8 @@ pub fn Uringlator(BackendOperation: type) type {
                             _ = self.ops.lookup(cid) catch break :blk .not_found;
                             std.debug.assert(self.started.isSet(cid.slot));
                             std.debug.assert(!self.link_lock.isSet(cid.slot));
-                            const cid_type = self.ops.getOne(.type, cid);
                             self.ops.setOne(.next, cid, cid); // sever the link
-                            _ = backend.uringlator_cancel(cid, cid_type, error.Canceled);
+                            self.cancel(cid, error.Canceled, backend) catch {};
                             // ^ even if the operation is not in cancelable state anymore
                             // the backend will still wait for it to complete
                             // however, the operation chain will be severed
@@ -413,19 +421,21 @@ pub fn Uringlator(BackendOperation: type) type {
                     std.debug.assert(!self.link_lock.isSet(next.slot));
                     _ = switch (link) {
                         .unlinked => unreachable, // inconsistent state
-                        .soft => backend.uringlator_cancel(next, .link_timeout, error.Success),
-                        .hard => backend.uringlator_cancel(next, .link_timeout, error.Success),
+                        .soft => self.cancel(next, error.Success, backend) catch {},
+                        .hard => self.cancel(next, error.Success, backend) catch {},
                     };
                 }
             } else if ((link == .soft or op_type == .link_timeout) and failure != error.Success) {
                 _ = self.ops.lookup(next) catch unreachable; // inconsistent state
                 std.debug.assert(!self.started.isSet(next.slot));
                 std.debug.assert(self.link_lock.isSet(next.slot));
-                self.finish(backend, next, error.Canceled, .thread_unsafe);
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), next) != null);
+                self.cancel(next, error.Canceled, backend) catch unreachable;
             } else {
                 _ = self.ops.lookup(next) catch unreachable; // inconsistent state
                 std.debug.assert(!self.started.isSet(next.slot));
                 std.debug.assert(self.link_lock.isSet(next.slot));
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), next) != null);
                 self.link_lock.unset(next.slot);
             }
         }
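
The uringlator.zig side funnels every cancelation path (the explicit .cancel operation, link timeouts, and link severing) through the new private cancel helper, which distinguishes two states: a started operation must be canceled by the backend, which may refuse and leave it error.InProgress, while a merely queued operation is dropped from the queue and finished immediately. The old .cancel branch also appears to have tested self.started.isSet(id.slot) against the cancel operation's own id rather than the target's, exactly the kind of hidden bug the new asserts flush out. A reduced sketch of the helper's two-state decision, with plain arrays standing in for the real bitsets and tables (illustrative only, not the repository's types):

const std = @import("std");

const CancelError = error{ NotFound, InProgress };

// plain-array stand-in for the uringlator bookkeeping (bitsets and tables)
const Sketch = struct {
    queued: [4]bool = .{ false, false, false, false },
    started: [4]bool = .{ false, false, false, false },

    // mirrors the shape of the new helper: started ops go to the backend
    // (which may refuse), queued ops are dropped and finished on the spot
    fn cancel(self: *@This(), id: usize, backend_can_cancel: bool) CancelError!void {
        if (!self.queued[id] and !self.started[id]) return error.NotFound;
        if (self.started[id]) {
            std.debug.assert(!self.queued[id]); // started implies dequeued
            if (!backend_can_cancel) return error.InProgress;
            self.started[id] = false;
        } else {
            self.queued[id] = false; // never reached the backend
        }
    }
};

test "queued ops cancel immediately, started ops defer to the backend" {
    var s = Sketch{};
    s.queued[0] = true;
    try s.cancel(0, false); // queued: succeeds regardless of the backend

    s.started[1] = true;
    try std.testing.expectError(error.InProgress, s.cancel(1, false));
}

The self.ops.setOne(.next, cid, cid) line in the link_timeout path pairs with this: an operation linked to itself marks the chain as severed, so even when the backend cannot cancel immediately and must let the operation run to completion, nothing linked after it will be started.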
