From 77b915645df668d3a1bb522946bac381443e4f85 Mon Sep 17 00:00:00 2001 From: Jari Vetoniemi Date: Wed, 29 Jan 2025 11:18:09 +0900 Subject: [PATCH] uringlator/posix: fix a bunch of issues found by asserting Introducing heavy asserting of the state of both the uringlator and posix code found quite a few issues that were hidden, mainly related to cancelation and dangling operations. --- src/aio/Posix.zig | 18 ++++++++++------ src/aio/uringlator.zig | 48 +++++++++++++++++++++++++----------------- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/src/aio/Posix.zig b/src/aio/Posix.zig index 5f2eb91..27658b2 100644 --- a/src/aio/Posix.zig +++ b/src/aio/Posix.zig @@ -157,7 +157,7 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anyty var off: usize = 0; again: while (off < self.pfd.len) { - for (self.pfd.constSlice()[off..], off..) |*pfd, pid| { + for (self.pfd.constSlice()[off..], off..) |pfd, pid| { off = pid; if (pfd.revents == 0) continue; if (pfd.fd == self.source.fd) { @@ -173,11 +173,12 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anyty const readiness = self.uringlator.ops.getOne(.readiness, id); std.debug.assert(pfd.fd == readiness.fd); std.debug.assert(pfd.events == readinessToPollEvents(readiness)); - defer { - // do not poll this fd again - self.pfd.swapRemove(@truncate(pid)); - self.pid.swapRemove(@truncate(pid - 1)); - } + std.debug.assert(self.pending.isSet(id.slot)); + + // do not poll this fd again + self.pfd.swapRemove(@truncate(pid)); + self.pid.swapRemove(@truncate(pid - 1)); + const op_type = self.uringlator.ops.getOne(.type, id); if (pfd.revents & std.posix.POLL.ERR != 0 or pfd.revents & std.posix.POLL.NVAL != 0) { if (pfd.revents & std.posix.POLL.ERR != 0) { @@ -199,12 +200,14 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anyty break :again; } } + // start it for real this time if (self.uringlator.ops.getOne(.next, id) != id) { Uringlator.debug("ready: 
{}: {} => {}", .{ id, op_type, self.uringlator.ops.getOne(.next, id) }); } else { Uringlator.debug("ready: {}: {}", .{ id, op_type }); } + try self.uringlator_start(id, op_type); break :again; } @@ -348,6 +351,7 @@ pub fn uringlator_dequeue(self: *@This(), id: aio.Id, comptime op_type: Operatio pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void { if (self.pending.isSet(id.slot)) { self.in_flight.set(id.slot); + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null); switch (op_type) { .poll => self.uringlator.finish(self, id, error.Success, .thread_unsafe), .timeout => { @@ -491,6 +495,7 @@ pub fn uringlator_cancel(self: *@This(), id: aio.Id, op_type: Operation, err: Op self.pid.swapRemove(@truncate(idx - 1)); break; } + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null); self.uringlator.finish(self, id, err, .thread_unsafe); return true; }, @@ -508,6 +513,7 @@ pub fn uringlator_complete(self: *@This(), id: aio.Id, op_type: Operation, _: Op }, else => {}, } + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null); readiness.* = .{}; } diff --git a/src/aio/uringlator.zig b/src/aio/uringlator.zig index 464efdf..4d4717f 100644 --- a/src/aio/uringlator.zig +++ b/src/aio/uringlator.zig @@ -290,7 +290,6 @@ pub fn Uringlator(BackendOperation: type) type { const tag = self.ops.getOne(.type, id); self.queued.swapRemove(@intCast(idx)); - self.started.set(id.slot); try self.start(tag, id, backend); // start linked timeout immediately as well if there's one @@ -305,31 +304,38 @@ pub fn Uringlator(BackendOperation: type) type { return true; } + fn cancel(self: *@This(), id: aio.Id, err: Operation.Error, backend: anytype) error{ NotFound, InProgress }!void { + _ = try self.ops.lookup(id); + if (self.started.isSet(id.slot)) { + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), id) == null); + if (!backend.uringlator_cancel(id, 
self.ops.getOne(.type, id), err)) { + return error.InProgress; + } + } else { + self.queued.swapRemoveNeedle(id) catch unreachable; + self.finish(backend, id, err, .thread_unsafe); + } + } + fn start(self: *@This(), op_type: Operation, id: aio.Id, backend: anytype) aio.Error!void { if (self.ops.getOne(.next, id) != id) { debug("perform: {}: {} => {}", .{ id, op_type, self.ops.getOne(.next, id) }); } else { debug("perform: {}: {}", .{ id, op_type }); } + std.debug.assert(!self.started.isSet(id.slot)); + self.started.set(id.slot); + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), id) == null); switch (op_type) { .nop => self.finish(backend, id, error.Success, .thread_unsafe), .cancel => blk: { const state = self.ops.getOne(.state, id); - _ = self.ops.lookup(state.cancel.id) catch { - self.finish(backend, id, error.NotFound, .thread_unsafe); + std.debug.assert(state.cancel.id != id); + self.cancel(state.cancel.id, error.Canceled, backend) catch |err| { + self.finish(backend, id, err, .thread_unsafe); break :blk; }; - if (self.started.isSet(id.slot)) { - if (!backend.uringlator_cancel(state.cancel.id, self.ops.getOne(.type, state.cancel.id), error.Canceled)) { - self.finish(backend, id, error.InProgress, .thread_unsafe); - } else { - self.finish(backend, id, error.Success, .thread_unsafe); - } - } else { - self.queued.swapRemoveNeedle(state.cancel.id) catch unreachable; - self.finish(backend, state.cancel.id, error.Canceled, .thread_unsafe); - self.finish(backend, id, error.Success, .thread_unsafe); - } + self.finish(backend, id, error.Success, .thread_unsafe); }, else => |tag| try backend.uringlator_start(id, tag), } @@ -342,9 +348,12 @@ pub fn Uringlator(BackendOperation: type) type { for (finished) |res| { _ = self.ops.lookup(res.id) catch continue; // raced const op_type = self.ops.getOne(.type, res.id); + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), res.id) == null); var failure = res.failure; if (failure != 
error.Canceled) { + std.debug.assert(self.started.isSet(res.id.slot)); + std.debug.assert(!self.link_lock.isSet(res.id.slot)); switch (op_type) { .link_timeout => { const cid = self.ops.getOne(.prev, res.id); @@ -362,9 +371,8 @@ pub fn Uringlator(BackendOperation: type) type { _ = self.ops.lookup(cid) catch break :blk .not_found; std.debug.assert(self.started.isSet(cid.slot)); std.debug.assert(!self.link_lock.isSet(cid.slot)); - const cid_type = self.ops.getOne(.type, cid); self.ops.setOne(.next, cid, cid); // sever the link - _ = backend.uringlator_cancel(cid, cid_type, error.Canceled); + self.cancel(cid, error.Canceled, backend) catch {}; // ^ even if the operation is not in cancelable state anymore // the backend will still wait for it to complete // however, the operation chain will be severed @@ -413,19 +421,21 @@ pub fn Uringlator(BackendOperation: type) type { std.debug.assert(!self.link_lock.isSet(next.slot)); _ = switch (link) { .unlinked => unreachable, // inconsistent state - .soft => backend.uringlator_cancel(next, .link_timeout, error.Success), - .hard => backend.uringlator_cancel(next, .link_timeout, error.Success), + .soft => self.cancel(next, error.Success, backend) catch {}, + .hard => self.cancel(next, error.Success, backend) catch {}, }; } } else if ((link == .soft or op_type == .link_timeout) and failure != error.Success) { _ = self.ops.lookup(next) catch unreachable; // inconsistent state std.debug.assert(!self.started.isSet(next.slot)); std.debug.assert(self.link_lock.isSet(next.slot)); - self.finish(backend, next, error.Canceled, .thread_unsafe); + std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), next) != null); + self.cancel(next, error.Canceled, backend) catch unreachable; } else { _ = self.ops.lookup(next) catch unreachable; // inconsistent state std.debug.assert(!self.started.isSet(next.slot)); std.debug.assert(self.link_lock.isSet(next.slot)); + std.debug.assert(std.mem.indexOfScalar(aio.Id, 
self.queued.constSlice(), next) != null); self.link_lock.unset(next.slot); } }