Skip to content

Commit

Permalink
posix: fix darwin build
Browse files Browse the repository at this point in the history
  • Loading branch information
Cloudef committed Feb 5, 2025
1 parent 1d8a1c6 commit 9b165ea
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions src/aio/Posix.zig
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ const needs_kludge = switch (builtin.target.os.tag) {
};

const single_threaded = builtin.single_threaded or aio.options.max_threads == 1;
const needs_source = !single_threaded or needs_kludge;

tqueue: TimerQueue, // timer queue implementing linux -like timers
pfd: if (!single_threaded) FixedArrayList(std.posix.pollfd, u32) else FixedArrayList(std.posix.pollfd, u16), // current fds that we must poll for wakeup
pfd: if (needs_source) FixedArrayList(std.posix.pollfd, u32) else FixedArrayList(std.posix.pollfd, u16), // current fds that we must poll for wakeup
pid: FixedArrayList(aio.Id, u16), // maps pfd to id
posix_pool: if (!single_threaded) DynamicThreadPool else void, // thread pool for performing operations, not all operations will be performed here
kludge_pool: if (needs_kludge) DynamicThreadPool else void, // thread pool for performing operations which can't be polled for readiness
kludge_pool: if (needs_kludge and !builtin.single_threaded) DynamicThreadPool else void, // thread pool for performing operations which can't be polled for readiness
pending: std.DynamicBitSetUnmanaged, // operation is pending on readiness fd (poll)
in_flight: std.DynamicBitSetUnmanaged, // operation is executing and can't be canceled
source: if (!single_threaded) EventSource else void, // when threaded operations finish, they signal it using this event source
source: if (needs_source) EventSource else void, // when threaded operations finish, they signal it using this event source
signaled: bool = false, // some operations have signaled immediately, optimization to avoid running poll when not required
uringlator: Uringlator,

Expand All @@ -42,9 +43,9 @@ pub fn isSupported(_: []const Operation) bool {
pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
var tqueue = try TimerQueue.init(allocator);
errdefer tqueue.deinit();
var pfd = switch (single_threaded) {
true => try FixedArrayList(std.posix.pollfd, u16).init(allocator, n),
false => try FixedArrayList(std.posix.pollfd, u32).init(allocator, @as(u32, @intCast(n)) + 1),
var pfd = switch (needs_source) {
true => try FixedArrayList(std.posix.pollfd, u32).init(allocator, @as(u32, @intCast(n)) + 1),
false => try FixedArrayList(std.posix.pollfd, u16).init(allocator, n),
};
errdefer pfd.deinit(allocator);
var pid = try FixedArrayList(aio.Id, u16).init(allocator, n);
Expand All @@ -61,7 +62,7 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
},
};
errdefer if (!single_threaded) posix_pool.deinit();
var kludge_pool = switch (needs_kludge) {
var kludge_pool = switch (needs_kludge and !builtin.single_threaded) {
// Kludge threads are used when operation cannot be polled for readiness.
// One example is macos's /dev/tty which can only be queried for readiness using select/pselect.
// <https://lists.apple.com/archives/Darwin-dev/2006/Apr/msg00066.html>
Expand All @@ -77,16 +78,16 @@ pub fn init(allocator: std.mem.Allocator, n: u16) aio.Error!@This() {
},
false => {},
};
errdefer if (needs_kludge) kludge_pool.deinit();
errdefer if (needs_kludge and !builtin.single_threaded) kludge_pool.deinit();
var pending_set = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer pending_set.deinit(allocator);
var in_flight_set = try std.DynamicBitSetUnmanaged.initEmpty(allocator, n);
errdefer in_flight_set.deinit(allocator);
var uringlator = try Uringlator.init(allocator, n);
errdefer uringlator.deinit(allocator);
var source = if (!single_threaded) try EventSource.init() else {};
errdefer if (!single_threaded) source.deinit();
if (!single_threaded) {
var source = if (needs_source) try EventSource.init() else {};
errdefer if (needs_source) source.deinit();
if (needs_source) {
pfd.add(.{ .fd = source.fd, .events = std.posix.POLL.IN, .revents = 0 }) catch unreachable;
}
return .{
Expand All @@ -106,12 +107,12 @@ pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
self.uringlator.shutdown(self);
self.tqueue.deinit();
if (!single_threaded) self.posix_pool.deinit();
if (needs_kludge) self.kludge_pool.deinit();
if (needs_kludge and !builtin.single_threaded) self.kludge_pool.deinit();
self.pfd.deinit(allocator);
self.pid.deinit(allocator);
self.pending.deinit(allocator);
self.in_flight.deinit(allocator);
if (!single_threaded) self.source.deinit();
if (needs_source) self.source.deinit();
self.uringlator.deinit(allocator);
self.* = undefined;
}
Expand Down Expand Up @@ -161,24 +162,24 @@ pub fn complete(self: *@This(), mode: aio.Dynamic.CompletionMode, handler: anyty
off = pid;
handled += 1;

if (!single_threaded and pfd.fd == self.source.fd) {
if (needs_source and pfd.fd == self.source.fd) {
std.debug.assert(pid == 0);
std.debug.assert(pfd.revents & std.posix.POLL.NVAL == 0);
std.debug.assert(pfd.revents & std.posix.POLL.ERR == 0);
std.debug.assert(pfd.revents & std.posix.POLL.HUP == 0);
self.source.waitNonBlocking() catch break;
self.signaled = true; // threaded operation finished
} else {
std.debug.assert(pid >= @intFromBool(!single_threaded));
const id = self.pid.constSlice()[pid - @intFromBool(!single_threaded)];
std.debug.assert(pid >= @intFromBool(needs_source));
const id = self.pid.constSlice()[pid - @intFromBool(needs_source)];
const readiness = self.uringlator.ops.getOne(.readiness, id);
std.debug.assert(pfd.fd == readiness.fd);
std.debug.assert(pfd.events == readinessToPollEvents(readiness));
std.debug.assert(self.pending.isSet(id.slot));

// do not poll this fd again
self.pfd.swapRemove(@truncate(pid));
self.pid.swapRemove(@truncate(pid - @intFromBool(!single_threaded)));
self.pid.swapRemove(@truncate(pid - @intFromBool(needs_source)));

const op_type = self.uringlator.ops.getOne(.type, id);
if (pfd.revents & std.posix.POLL.ERR != 0 or pfd.revents & std.posix.POLL.NVAL != 0) {
Expand Down Expand Up @@ -408,8 +409,7 @@ pub fn uringlator_start(self: *@This(), id: aio.Id, op_type: Operation) !void {
const state = self.uringlator.ops.getOnePtr(.state, id);
const result = self.uringlator.ops.getOne(.out_result, id);
const readiness = self.uringlator.ops.getOne(.readiness, id);
if (needs_kludge and tag == .read_tty) {
@branchHint(.unlikely);
if (needs_kludge and !builtin.single_threaded and tag == .read_tty) {
try self.kludge_pool.spawn(blockingPosixExecutor, .{ self, tag, state.toOp(tag, result), id, readiness, .thread_safe });
}
if (comptime builtin.target.os.tag == .wasi) {
Expand Down Expand Up @@ -503,7 +503,7 @@ pub fn uringlator_cancel(self: *@This(), id: aio.Id, op_type: Operation, err: Op
},
else => if (self.pending.isSet(id.slot) and !self.in_flight.isSet(id.slot)) {
const readiness = self.uringlator.ops.getOnePtr(.readiness, id);
const off = @intFromBool(!single_threaded);
const off = @intFromBool(needs_source);
for (self.pfd.constSlice()[off..], self.pid.constSlice(), off..) |pfd, pid, idx| {
if (pid != id) continue;
std.debug.assert(pfd.fd == readiness.fd);
Expand Down

0 comments on commit 9b165ea

Please sign in to comment.