diff --git a/src/aio/Posix.zig b/src/aio/Posix.zig
index 5f2eb91..27658b2 100644
--- a/src/aio/Posix.zig
+++ b/src/aio/Posix.zig
@@ -157,7 +157,7 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anyty
 
     var off: usize = 0;
     again: while (off < self.pfd.len) {
-        for (self.pfd.constSlice()[off..], off..) |*pfd, pid| {
+        for (self.pfd.constSlice()[off..], off..) |pfd, pid| {
             off = pid;
             if (pfd.revents == 0) continue;
             if (pfd.fd == self.source.fd) {
@@ -173,11 +173,12 @@
                 const readiness = self.uringlator.ops.getOne(.readiness, id);
                 std.debug.assert(pfd.fd == readiness.fd);
                 std.debug.assert(pfd.events == readinessToPollEvents(readiness));
-                defer {
-                    // do not poll this fd again
-                    self.pfd.swapRemove(@truncate(pid));
-                    self.pid.swapRemove(@truncate(pid - 1));
-                }
+                std.debug.assert(self.pending.isSet(id.slot));
+
+                // do not poll this fd again
+                self.pfd.swapRemove(@truncate(pid));
+                self.pid.swapRemove(@truncate(pid - 1));
+
                 const op_type = self.uringlator.ops.getOne(.type, id);
                 if (pfd.revents & std.posix.POLL.ERR != 0 or pfd.revents & std.posix.POLL.NVAL != 0) {
                     if (pfd.revents & std.posix.POLL.ERR != 0) {
@@ -199,12 +200,14 @@
                         break :again;
                     }
                 }
 
+                // start it for real this time
                 if (self.uringlator.ops.getOne(.next, id) != id) {
                     Uringlator.debug("ready: {}: {} => {}", .{ id, op_type, self.uringlator.ops.getOne(.next, id) });
                 } else {
                     Uringlator.debug("ready: {}: {}", .{ id, op_type });
                 }
+                try self.uringlator_start(id, op_type);
                 break :again;
             }
         }
@@ -348,6 +351,7 @@ pub fn uringlator_dequeue(self: *@This(), id: aio.Id, comptime op_type: Operatio
 pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
     if (self.pending.isSet(id.slot)) {
         self.in_flight.set(id.slot);
+        std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null);
         switch (op_type) {
             .poll => self.uringlator.finish(self, id, error.Success, .thread_unsafe),
             .timeout => {
@@ -491,6 +495,7 @@ pub fn uringlator_cancel(self: *@This(), id: aio.Id, op_type: Operation, err: Op
                     self.pid.swapRemove(@truncate(idx - 1));
                     break;
                 }
+            std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null);
             self.uringlator.finish(self, id, err, .thread_unsafe);
             return true;
         },
@@ -508,6 +513,7 @@ pub fn uringlator_complete(self: *@This(), id: aio.Id, op_type: Operation, _: Op
         },
         else => {},
     }
+    std.debug.assert(std.mem.indexOfScalar(aio.Id, self.pid.constSlice(), id) == null);
     readiness.* = .{};
 }
 
diff --git a/src/aio/uringlator.zig b/src/aio/uringlator.zig
index 464efdf..4d4717f 100644
--- a/src/aio/uringlator.zig
+++ b/src/aio/uringlator.zig
@@ -290,7 +290,6 @@ pub fn Uringlator(BackendOperation: type) type {
 
             const tag = self.ops.getOne(.type, id);
             self.queued.swapRemove(@intCast(idx));
-            self.started.set(id.slot);
             try self.start(tag, id, backend);
 
             // start linked timeout immediately as well if there's one
@@ -305,31 +304,38 @@
             return true;
         }
+        fn cancel(self: *@This(), id: aio.Id, err: Operation.Error, backend: anytype) error{ NotFound, InProgress }!void {
+            _ = try self.ops.lookup(id);
+            if (self.started.isSet(id.slot)) {
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), id) == null);
+                if (!backend.uringlator_cancel(id, self.ops.getOne(.type, id), err)) {
+                    return error.InProgress;
+                }
+            } else {
+                self.queued.swapRemoveNeedle(id) catch unreachable;
+                self.finish(backend, id, err, .thread_unsafe);
+            }
+        }
+
         fn start(self: *@This(), op_type: Operation, id: aio.Id, backend: anytype) aio.Error!void {
             if (self.ops.getOne(.next, id) != id) {
                 debug("perform: {}: {} => {}", .{ id, op_type, self.ops.getOne(.next, id) });
             } else {
                 debug("perform: {}: {}", .{ id, op_type });
             }
+            std.debug.assert(!self.started.isSet(id.slot));
+            self.started.set(id.slot);
+            std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), id) == null);
             switch (op_type) {
                 .nop => self.finish(backend, id, error.Success, .thread_unsafe),
                 .cancel => blk: {
                     const state = self.ops.getOne(.state, id);
-                    _ = self.ops.lookup(state.cancel.id) catch {
-                        self.finish(backend, id, error.NotFound, .thread_unsafe);
+                    std.debug.assert(state.cancel.id != id);
+                    self.cancel(state.cancel.id, error.Canceled, backend) catch |err| {
+                        self.finish(backend, id, err, .thread_unsafe);
                         break :blk;
                     };
-                    if (self.started.isSet(id.slot)) {
-                        if (!backend.uringlator_cancel(state.cancel.id, self.ops.getOne(.type, state.cancel.id), error.Canceled)) {
-                            self.finish(backend, id, error.InProgress, .thread_unsafe);
-                        } else {
-                            self.finish(backend, id, error.Success, .thread_unsafe);
-                        }
-                    } else {
-                        self.queued.swapRemoveNeedle(state.cancel.id) catch unreachable;
-                        self.finish(backend, state.cancel.id, error.Canceled, .thread_unsafe);
-                        self.finish(backend, id, error.Success, .thread_unsafe);
-                    }
+                    self.finish(backend, id, error.Success, .thread_unsafe);
                 },
                 else => |tag| try backend.uringlator_start(id, tag),
             }
         }
@@ -342,9 +348,12 @@ pub fn Uringlator(BackendOperation: type) type {
             for (finished) |res| {
                 _ = self.ops.lookup(res.id) catch continue; // raced
                 const op_type = self.ops.getOne(.type, res.id);
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), res.id) == null);
 
                 var failure = res.failure;
                 if (failure != error.Canceled) {
+                    std.debug.assert(self.started.isSet(res.id.slot));
+                    std.debug.assert(!self.link_lock.isSet(res.id.slot));
                     switch (op_type) {
                         .link_timeout => {
                             const cid = self.ops.getOne(.prev, res.id);
@@ -362,9 +371,8 @@ pub fn Uringlator(BackendOperation: type) type {
                             _ = self.ops.lookup(cid) catch break :blk .not_found;
                             std.debug.assert(self.started.isSet(cid.slot));
                             std.debug.assert(!self.link_lock.isSet(cid.slot));
-                            const cid_type = self.ops.getOne(.type, cid);
                             self.ops.setOne(.next, cid, cid); // sever the link
-                            _ = backend.uringlator_cancel(cid, cid_type, error.Canceled);
+                            self.cancel(cid, error.Canceled, backend) catch {};
                             // ^ even if the operation is not in cancelable state anymore
                             // the backend will still wait for it to complete
                             // however, the operation chain will be severed
@@ -413,19 +421,21 @@ pub fn Uringlator(BackendOperation: type) type {
                     std.debug.assert(!self.link_lock.isSet(next.slot));
                     _ = switch (link) {
                         .unlinked => unreachable, // inconsistent state
-                        .soft => backend.uringlator_cancel(next, .link_timeout, error.Success),
-                        .hard => backend.uringlator_cancel(next, .link_timeout, error.Success),
+                        .soft => self.cancel(next, error.Success, backend) catch {},
+                        .hard => self.cancel(next, error.Success, backend) catch {},
                     };
                 }
             } else if ((link == .soft or op_type == .link_timeout) and failure != error.Success) {
                 _ = self.ops.lookup(next) catch unreachable; // inconsistent state
                 std.debug.assert(!self.started.isSet(next.slot));
                 std.debug.assert(self.link_lock.isSet(next.slot));
-                self.finish(backend, next, error.Canceled, .thread_unsafe);
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), next) != null);
+                self.cancel(next, error.Canceled, backend) catch unreachable;
             } else {
                 _ = self.ops.lookup(next) catch unreachable; // inconsistent state
                 std.debug.assert(!self.started.isSet(next.slot));
                 std.debug.assert(self.link_lock.isSet(next.slot));
+                std.debug.assert(std.mem.indexOfScalar(aio.Id, self.queued.constSlice(), next) != null);
                 self.link_lock.unset(next.slot);
             }
         }
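
The patch folds the two hand-rolled cancellation paths (the .cancel branch of start and the link/link-timeout handling after completion) into a single cancel helper with exactly two cases: an operation that is still queued is unqueued and finished synchronously, while an operation that has already started must be cancelled by the backend, which may refuse with error.InProgress. Below is a minimal sketch of that two-state pattern; Demo and backend_can_cancel are hypothetical stand-ins for illustration, not the zig-aio API.

    // Illustration only; names here are hypothetical, not zig-aio types.
    const std = @import("std");

    const Demo = struct {
        queued: std.ArrayListUnmanaged(u32) = .{}, // ids not yet started
        started: std.StaticBitSet(64) = std.StaticBitSet(64).initEmpty(),

        fn cancel(self: *Demo, id: u32, backend_can_cancel: bool) error{ NotFound, InProgress }!void {
            if (self.started.isSet(id)) {
                // invariant asserted throughout the patch:
                // a started operation is never still queued
                std.debug.assert(std.mem.indexOfScalar(u32, self.queued.items, id) == null);
                // already running; only the backend can stop it now
                if (!backend_can_cancel) return error.InProgress;
            } else {
                // still queued: unqueue and finish synchronously
                const idx = std.mem.indexOfScalar(u32, self.queued.items, id) orelse
                    return error.NotFound;
                _ = self.queued.swapRemove(idx);
            }
        }
    };

    test "queued ops cancel synchronously, started ops go through the backend" {
        var demo = Demo{};
        defer demo.queued.deinit(std.testing.allocator);
        try demo.queued.append(std.testing.allocator, 1);
        try demo.cancel(1, false); // queued: removed without backend help
        demo.started.set(2);
        try std.testing.expectError(error.InProgress, demo.cancel(2, false));
        try std.testing.expectError(error.NotFound, demo.cancel(3, false));
    }

Because the helper reports error.NotFound and error.InProgress instead of finishing the target itself, the .cancel branch of start can simply forward whatever the helper returns through self.finish(backend, id, err, .thread_unsafe) rather than duplicating the queued/started checks at every call site.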