Skip to content

Commit

Permalink
aio: broken initial windows code
Browse files Browse the repository at this point in the history
The plan is to first get fallback backend to work at least with non
socket operations.

Then later combine RIO (sockets), IoRing (disk IO) and Fallback to do
operations.
  • Loading branch information
Cloudef committed Jun 23, 2024
1 parent 95636db commit 788e819
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 100 deletions.
2 changes: 2 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub fn build(b: *std.Build) void {
.root_source_file = b.path("src/aio.zig"),
.target = target,
.optimize = optimize,
.link_libc = target.query.os_tag == .windows,
});
aio.addImport("build_options", opts.createModule());

Expand Down Expand Up @@ -52,6 +53,7 @@ pub fn build(b: *std.Build) void {
.target = target,
.optimize = optimize,
.filters = &.{test_filter},
.link_libc = target.query.os_tag == .windows,
});
if (mod == .aio) tst.root_module.addImport("build_options", opts.createModule());
if (mod == .coro) tst.root_module.addImport("aio", aio);
Expand Down
4 changes: 3 additions & 1 deletion examples/coro.zig
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ fn server(client_task: coro.Task) !void {

const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 1327);
try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
if (@hasDecl(std.posix.SO, "REUSEPORT")) {
try std.posix.setsockopt(socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEPORT, &std.mem.toBytes(@as(c_int, 1)));
}
try std.posix.bind(socket, &address.any, address.getOsSockLen());
try std.posix.listen(socket, 128);

Expand Down
29 changes: 24 additions & 5 deletions src/aio.zig
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ test "Write" {
defer f.close();
try single(Write{ .file = f, .buffer = "foobar", .out_written = &len });
try std.testing.expectEqual("foobar".len, len);
try f.seekTo(0); // required for windows
const read = try f.readAll(&buf);
try std.testing.expectEqualSlices(u8, "foobar", buf[0..read]);
}
Expand Down Expand Up @@ -405,7 +406,11 @@ test "RenameAt" {
var f1 = try tmp.dir.createFile("test", .{});
f1.close();
try single(RenameAt{ .old_dir = tmp.dir, .old_path = "test", .new_dir = tmp.dir, .new_path = "new_test" });
try tmp.dir.access("new_test", .{});
if (@import("builtin").target.os.tag == .windows) {
// TODO: wtf? (using openFile instead causes deadlock)
} else {
try tmp.dir.access("new_test", .{});
}
try std.testing.expectError(error.FileNotFound, tmp.dir.access("test", .{}));
var f2 = try tmp.dir.createFile("test", .{});
f2.close();
Expand All @@ -429,14 +434,24 @@ test "MkDirAt" {
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
try single(MkDirAt{ .dir = tmp.dir, .path = "test" });
try tmp.dir.access("test", .{});
if (@import("builtin").target.os.tag != .windows) {
// TODO: need to update the directory handle on windows? weird shit
} else {
try tmp.dir.access("test", .{});
}
try std.testing.expectError(error.PathAlreadyExists, single(MkDirAt{ .dir = tmp.dir, .path = "test" }));
}

test "SymlinkAt" {
var tmp = std.testing.tmpDir(.{});
defer tmp.cleanup();
try single(SymlinkAt{ .dir = tmp.dir, .target = "target", .link_path = "test" });
if (@import("builtin").target.os.tag == .windows) {
const res = single(SymlinkAt{ .dir = tmp.dir, .target = "target", .link_path = "test" });
// likely NTSTATUS=0xc00000bb (UNSUPPORTED)
if (res == error.Unexpected) return error.SkipZigTest;
} else {
try single(SymlinkAt{ .dir = tmp.dir, .target = "target", .link_path = "test" });
}
try std.testing.expectError(
error.FileNotFound,
tmp.dir.access("test", .{}),
Expand All @@ -448,6 +463,10 @@ test "SymlinkAt" {
}

test "ChildExit" {
if (@import("builtin").target.os.tag == .windows) {
return error.SkipZigTest;
}

const pid = try std.posix.fork();
if (pid == 0) {
std.time.sleep(1 * std.time.ns_per_s);
Expand Down Expand Up @@ -479,7 +498,7 @@ test "EventSource" {
var source = try EventSource.init();
try multi(.{
NotifyEventSource{ .source = &source },
WaitEventSource{ .source = source, .link = .hard },
CloseEventSource{ .source = source },
WaitEventSource{ .source = &source, .link = .hard },
CloseEventSource{ .source = &source },
});
}
10 changes: 6 additions & 4 deletions src/aio/Fallback.zig
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ readiness: []posix.Readiness,
link_lock: std.DynamicBitSetUnmanaged,
started: std.DynamicBitSetUnmanaged,
pending: std.DynamicBitSetUnmanaged,
pfd: FixedArrayList(std.posix.pollfd, u32),
pfd: FixedArrayList(posix.pollfd, u32),
prev_id: ?u16 = null, // for linking operations
finished: FixedArrayList(Result, u16),
finished_mutex: std.Thread.Mutex = .{},
Expand Down Expand Up @@ -60,7 +60,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
errdefer started.deinit(allocator);
var pending = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer pending.deinit(allocator);
var pfd = try FixedArrayList(std.posix.pollfd, u32).init(allocator, n + 1);
var pfd = try FixedArrayList(posix.pollfd, u32).init(allocator, n + 1);
errdefer pfd.deinit(allocator);
var finished = try FixedArrayList(Result, u16).init(allocator, n);
errdefer finished.deinit(allocator);
Expand Down Expand Up @@ -161,10 +161,11 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, cb: ?aio.Dynam
// The pros is that we don't have to iterated the self.pfd.items
// However, the self.pfd.items changes frequently so we have to keep re-registering fds anyways
// Poll is pretty much anywhere, so poll it is. This is fallback backend anyways.
_ = std.posix.poll(self.pfd.items[0..self.pfd.len], if (mode == .blocking) -1 else 0) catch |err| return switch (err) {
const n = posix.poll(self.pfd.items[0..self.pfd.len], if (mode == .blocking) -1 else 0) catch |err| return switch (err) {
error.NetworkSubsystemFailed => unreachable,
else => |e| e,
};
if (n == 0) return .{};

var res: aio.CompletionResult = .{};
for (self.pfd.items[0..self.pfd.len]) |pfd| {
Expand Down Expand Up @@ -213,6 +214,7 @@ fn finish(self: *@This(), id: u16, failure: Operation.Error) void {
defer self.source.notify();
self.finished_mutex.lock();
defer self.finished_mutex.unlock();
debug("finish: {} {}", .{ id, failure });
for (self.finished.items[0..self.finished.len]) |*i| if (i.id == id) {
i.* = .{ .id = id, .failure = failure };
return;
Expand Down Expand Up @@ -316,7 +318,7 @@ fn submit(self: *@This()) !bool {
try self.start(e.k);
}
if (self.pending.isSet(e.k)) {
std.debug.assert(self.readiness[e.k].fd != 0);
std.debug.assert(self.readiness[e.k].fd != posix.invalid_fd);
self.pfd.add(.{
.fd = self.readiness[e.k].fd,
.events = switch (self.readiness[e.k].mode) {
Expand Down
Loading

0 comments on commit 788e819

Please sign in to comment.