Skip to content

Commit

Permalink
coro/aio: bunch of fixes mostly related to cancel
Browse files Browse the repository at this point in the history
Coro is getting more battletested now that I'm actually writing some
examples with it. Cancelation should now work. Some error handling bugs
and lack of flexibility for coro in aio were addressed as well.
  • Loading branch information
Cloudef committed Jun 25, 2024
1 parent 3709bc7 commit 1d0e629
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 157 deletions.
3 changes: 2 additions & 1 deletion docs/pages/coro-context-switches.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
To yield running task to the caller use the following.
The function takes a enum value as a argument representing the yield state of the task.
Enum value that corresponds to the integer `0` is resevered to indicate non yield state.
Yielding may return the `error.Canceled` if the task has been canceled.

```zig
coro.yield(SomeEnum.value);
try coro.yield(SomeEnum.value);
```

The current yield state of a task can be checked with `state` method.
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/coro-io.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ outside a task then the call would be equal to calling the equal function from t
### Cancellations

Use `aio.Cancel` operation to cancel the currently running operations in a task.
The `out_error` of such operation will then be set as `error.OperationCanceled`.
The `out_error` of such operation will then be set as `error.Canceled`.

Alternatively it's possible to call `task.complete(.cancel);` to actively cancel a task and collect its partial result.
14 changes: 7 additions & 7 deletions examples/coro.zig
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub const std_options: std.Options = .{
.log_level = .debug,
};

fn server(lock: *coro.ResetEvent) !void {
fn server(startup: *coro.ResetEvent) !void {
var socket: std.posix.socket_t = undefined;
try coro.io.single(aio.Socket{
.domain = std.posix.AF.INET,
Expand All @@ -32,7 +32,7 @@ fn server(lock: *coro.ResetEvent) !void {
try std.posix.bind(socket, &address.any, address.getOsSockLen());
try std.posix.listen(socket, 128);

lock.set();
startup.set();

var client_sock: std.posix.socket_t = undefined;
try coro.io.single(aio.Accept{ .socket = socket, .out_socket = &client_sock });
Expand All @@ -54,7 +54,7 @@ fn server(lock: *coro.ResetEvent) !void {
});
}

fn client(lock: *coro.ResetEvent) !void {
fn client(startup: *coro.ResetEvent) !void {
var socket: std.posix.socket_t = undefined;
try coro.io.single(aio.Socket{
.domain = std.posix.AF.INET,
Expand All @@ -63,7 +63,7 @@ fn client(lock: *coro.ResetEvent) !void {
.out_socket = &socket,
});

lock.wait();
try startup.wait();

const address = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 1327);
try coro.io.single(aio.Connect{
Expand Down Expand Up @@ -95,8 +95,8 @@ pub fn main() !void {
defer _ = gpa.deinit();
var scheduler = try coro.Scheduler.init(gpa.allocator(), .{});
defer scheduler.deinit();
var lock: coro.ResetEvent = .{};
_ = try scheduler.spawn(client, .{&lock}, .{});
_ = try scheduler.spawn(server, .{&lock}, .{});
var startup: coro.ResetEvent = .{};
_ = try scheduler.spawn(client, .{&startup}, .{});
_ = try scheduler.spawn(server, .{&startup}, .{});
try scheduler.run(.wait);
}
40 changes: 28 additions & 12 deletions src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ pub const CompletionResult = struct {
/// Queue operations dynamically and complete them on demand
pub const Dynamic = struct {
pub const Uop = ops.Operation.Union;
pub const Callback = *const fn (uop: Uop, id: Id, failed: bool) void;
pub const QueueCallback = *const fn (uop: Uop, id: Id) void;
pub const CompletionCallback = *const fn (uop: Uop, id: Id, failed: bool) void;

io: IO,
/// Used by coro implementation
callback: ?Callback = null,

/// Used by the coro implementation
queue_callback: ?QueueCallback = null,
completion_callback: ?CompletionCallback = null,

pub inline fn init(allocator: std.mem.Allocator, n: u16) Error!@This() {
return .{ .io = try IO.init(allocator, n) };
Expand All @@ -70,14 +74,14 @@ pub const Dynamic = struct {
if (comptime ti == .Struct and ti.Struct.is_tuple) {
if (comptime operations.len == 0) @compileError("no work to be done");
var work = struct { ops: @TypeOf(operations) }{ .ops = operations };
return self.io.queue(operations.len, &work);
return self.io.queue(operations.len, &work, self.queue_callback);
} else if (comptime ti == .Array) {
if (comptime operations.len == 0) @compileError("no work to be done");
var work = struct { ops: @TypeOf(operations) }{ .ops = operations };
return self.io.queue(operations.len, &work);
return self.io.queue(operations.len, &work, self.queue_callback);
} else {
var work = struct { ops: @TypeOf(.{operations}) }{ .ops = .{operations} };
return self.io.queue(1, &work);
return self.io.queue(1, &work, self.queue_callback);
}
}

Expand All @@ -91,15 +95,15 @@ pub const Dynamic = struct {
/// Complete operations
/// Returns the number of completed operations, `0` if no operations were completed
pub inline fn complete(self: *@This(), mode: CompletionMode) Error!CompletionResult {
return self.io.complete(mode, self.callback);
return self.io.complete(mode, self.completion_callback);
}

/// Block until all opreations are complete
/// Returns the number of errors occured, 0 if there were no errors
pub inline fn completeAll(self: *@This()) Error!u16 {
var num_errors: u16 = 0;
while (true) {
const res = try self.io.complete(.blocking, self.callback);
const res = try self.io.complete(.blocking, self.completion_callback);
num_errors += res.num_errors;
if (res.num_completed == 0) break;
}
Expand Down Expand Up @@ -228,7 +232,18 @@ test "Nop" {
defer dynamic.deinit(std.testing.allocator);
try dynamic.queue(Nop{ .domain = @enumFromInt(255), .ident = 69, .userdata = 42 });
const Lel = struct {
fn cb(uop: Dynamic.Uop, _: Id, failed: bool) void {
fn queue(uop: Dynamic.Uop, _: Id) void {
switch (uop) {
.nop => |*op| {
std.debug.assert(255 == @intFromEnum(op.domain));
std.debug.assert(69 == op.ident);
std.debug.assert(42 == op.userdata);
},
else => @panic("nope"),
}
}

fn completion(uop: Dynamic.Uop, _: Id, failed: bool) void {
switch (uop) {
.nop => |*op| {
std.debug.assert(!failed);
Expand All @@ -240,7 +255,8 @@ test "Nop" {
}
}
};
dynamic.callback = Lel.cb;
dynamic.queue_callback = Lel.queue;
dynamic.completion_callback = Lel.completion;
try std.testing.expectEqual(0, dynamic.completeAll());
}

Expand Down Expand Up @@ -342,7 +358,7 @@ test "LinkTimeout" {
LinkTimeout{ .ns = 1 * std.time.ns_per_s, .out_error = &err2 },
});
try std.testing.expectEqual(2, num_errors);
try std.testing.expectEqual(error.OperationCanceled, err);
try std.testing.expectEqual(error.Canceled, err);
try std.testing.expectEqual(error.Expired, err2);
}
{
Expand Down Expand Up @@ -391,7 +407,7 @@ test "Cancel" {
try dynamic.queue(Cancel{ .id = id });
const num_errors = try dynamic.completeAll();
try std.testing.expectEqual(1, num_errors);
try std.testing.expectEqual(error.OperationCanceled, err);
try std.testing.expectEqual(error.Canceled, err);
try std.testing.expect(timer.lap() < std.time.ns_per_s);
}

Expand Down
24 changes: 12 additions & 12 deletions src/aio/Fallback.zig
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,19 @@ inline fn queueOperation(self: *@This(), op: anytype) aio.Error!u16 {
return id;
}

pub fn queue(self: *@This(), comptime len: u16, work: anytype) aio.Error!void {
pub fn queue(self: *@This(), comptime len: u16, work: anytype, cb: ?aio.Dynamic.QueueCallback) aio.Error!void {
if (comptime len == 1) {
_ = try self.queueOperation(&work.ops[0]);
const id = try self.queueOperation(&work.ops[0]);
if (cb) |f| f(self.ops.nodes[id].used, @enumFromInt(id));
} else {
var ids: std.BoundedArray(u16, len) = .{};
errdefer for (ids.constSlice()) |id| self.removeOp(id);
inline for (&work.ops) |*op| {
ids.append(try self.queueOperation(op)) catch unreachable;
}
inline for (&work.ops) |*op| ids.append(try self.queueOperation(op)) catch unreachable;
if (cb) |f| for (ids.constSlice()) |id| f(self.ops.nodes[id].used, @enumFromInt(id));
}
}

pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynamic.Callback) aio.Error!aio.CompletionResult {
pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynamic.CompletionCallback) aio.Error!aio.CompletionResult {
if (!try self.submit()) return .{};
defer self.pfd.reset();

Expand All @@ -166,16 +166,16 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynam
var res: aio.CompletionResult = .{};
for (self.pfd.items[0..self.pfd.len]) |pfd| {
if (pfd.revents == 0) continue;
std.debug.assert(pfd.revents & std.posix.POLL.NVAL == 0);
if (pfd.fd == self.source.fd) {
std.debug.assert(pfd.revents & std.posix.POLL.NVAL == 0);
std.debug.assert(pfd.revents & std.posix.POLL.ERR == 0);
std.debug.assert(pfd.revents & std.posix.POLL.HUP == 0);
self.source.wait();
res = self.handleFinished(cb);
} else {
var iter = self.ops.iterator();
while (iter.next()) |e| if (pfd.fd == self.readiness[e.k].fd) {
if (pfd.revents & std.posix.POLL.ERR != 0 or pfd.revents & std.posix.POLL.HUP != 0) {
if (pfd.revents & std.posix.POLL.ERR != 0 or pfd.revents & std.posix.POLL.HUP != 0 or pfd.revents & std.posix.POLL.NVAL != 0) {
self.finish(e.k, error.Unexpected);
continue;
}
Expand All @@ -195,7 +195,7 @@ pub fn immediate(comptime len: u16, work: anytype) aio.Error!u16 {
const allocator = sfb.get();
var wrk = try init(allocator, len);
defer wrk.deinit(allocator);
try wrk.queue(len, work);
try wrk.queue(len, work, null);
var n: u16 = len;
var num_errors: u16 = 0;
while (n > 0) {
Expand Down Expand Up @@ -226,7 +226,7 @@ fn cancel(self: *@This(), id: u16) enum { in_progress, not_found, ok } {
return .not_found;
}
// collect the result later
self.finish(id, error.OperationCanceled);
self.finish(id, error.Canceled);
return .ok;
}

Expand Down Expand Up @@ -346,7 +346,7 @@ fn completition(op: anytype, self: *@This(), res: Result) void {
}
}

fn handleFinished(self: *@This(), cb: ?aio.Dynamic.Callback) aio.CompletionResult {
fn handleFinished(self: *@This(), cb: ?aio.Dynamic.CompletionCallback) aio.CompletionResult {
{
self.finished_mutex.lock();
defer self.finished_mutex.unlock();
Expand All @@ -363,7 +363,7 @@ fn handleFinished(self: *@This(), cb: ?aio.Dynamic.Callback) aio.CompletionResul
debug("complete: {}: {} [OK]", .{ res.id, std.meta.activeTag(self.ops.nodes[res.id].used) });
}

if (self.ops.nodes[res.id].used == .link_timeout and res.failure == error.OperationCanceled) {
if (self.ops.nodes[res.id].used == .link_timeout and res.failure == error.Canceled) {
// special case
} else {
num_errors += @intFromBool(res.failure != error.Success);
Expand Down
Loading

0 comments on commit 1d0e629

Please sign in to comment.