Skip to content

Commit

Permalink
coro: more graceful teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Jun 25, 2024
1 parent fc95334 commit 17a13f7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
12 changes: 8 additions & 4 deletions src/coro/Frame.zig
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ inline fn entrypoint(
.result = &res,
};
frame.fiber.getUserDataPtr().* = @intFromPtr(&frame);
scheduler.frames.append(&frame.link);
scheduler.frames.prepend(&frame.link);
out_frame.* = &frame;

debug("spawned: {}", .{frame});
Expand Down Expand Up @@ -164,9 +164,13 @@ pub fn complete(self: *@This(), mode: CompleteMode, comptime Result: type) Resul
}
}

const res: Result = @as(*Result, @ptrCast(@alignCast(self.result))).*;
self.deinit();
return res;
if (comptime Result != void) {
const res: Result = @as(*Result, @ptrCast(@alignCast(self.result))).*;
self.deinit();
return res;
} else {
self.deinit();
}
}

pub fn tryCancel(self: *@This()) bool {
Expand Down
22 changes: 12 additions & 10 deletions src/coro/Scheduler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub fn init(allocator: std.mem.Allocator, opts: InitOptions) aio.Error!@This() {
}

pub fn deinit(self: *@This()) void {
self.run(.cancel) catch @panic("unrecovable");
if (self.state == .tear_down) self.state = .helper_spawned;
self.run(.cancel) catch @panic("unrecovable"); // if all tasks aren't dead yet
var next = self.frames.first;
while (next) |node| {
next = node.next;
Expand Down Expand Up @@ -84,15 +85,16 @@ pub const CompleteMode = Frame.CompleteMode;

/// Run until all tasks are complete.
pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void {
while (self.state != .tear_down) {
if (mode == .cancel) {
var next = self.frames.first;
while (next) |node| {
next = node.next;
_ = node.data.cast().tryCancel();
}
if (mode == .cancel) {
// start canceling tasks starting from the most recent one
while (self.frames.first) |node| {
if (self.state == .tear_down) return error.Unexpected;
node.data.cast().complete(.cancel, void);
}
} else {
while (self.state != .tear_down) {
if (try self.tick(.blocking) == 0) break;
}
if (try self.tick(.blocking) == 0) break;
}
}

Expand All @@ -103,7 +105,7 @@ pub fn run(self: *@This(), mode: CompleteMode) aio.Error!void {
fn helper(self: *@This()) void {
Frame.yield(.reset_event);
const scope = std.log.scoped(.coro);
while (self.state != .tear_down) {
while (true) { // this task gets cleaned up from deinit
_ = self.tick(.blocking) catch |err| switch (err) {
error.NoDevice => unreachable,
error.SystemOutdated => unreachable,
Expand Down

0 comments on commit 17a13f7

Please sign in to comment.