diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 96c896b..030b621 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -91,10 +91,10 @@ jobs: - name: Install zig uses: goto-bus-stop/setup-zig@v2 with: - version: 0.11.0-dev.2154+2089b3f19 + version: 0.12.0-dev.256+8b74eae9c - name: test - run: zig build test -fsummary + run: zig build test --summary all - name: build all benchmarks and examples - run: zig build -Dexample -Dbench -fsummary + run: zig build -Dexample -Dbench --summary all diff --git a/src/backend/iocp.zig b/src/backend/iocp.zig index bcc69cb..b064876 100644 --- a/src/backend/iocp.zig +++ b/src/backend/iocp.zig @@ -186,7 +186,7 @@ pub const Loop = struct { // The list of entry that will be filled with a call to GetQueuedCompletionStatusEx. var entries: [128]windows.OVERLAPPED_ENTRY = undefined; - var wait_rem = @intCast(usize, wait); + var wait_rem = @as(usize, @intCast(wait)); // Handle all of our cancellations first because we may be able to stop submissions from // even happening if its still queued. Plus, cancellations sometimes add more to the @@ -315,7 +315,7 @@ pub const Loop = struct { for (entries[0..count]) |entry| { // We retrieve the Completion from the OVERLAPPED pointer as we know it's a part of // the Completion struct. - const overlapped_ptr: ?*windows.OVERLAPPED = @ptrCast(?*windows.OVERLAPPED, entry.lpOverlapped); + const overlapped_ptr: ?*windows.OVERLAPPED = @as(?*windows.OVERLAPPED, @ptrCast(entry.lpOverlapped)); if (overlapped_ptr == null) { // Probably an async wakeup continue; @@ -351,7 +351,7 @@ pub const Loop = struct { /// /// QueryPerformanceCounter is used to get the current timestamp. pub fn now(self: *Loop) i64 { - return @intCast(i64, self.cached_now); + return @as(i64, @intCast(self.cached_now)); } /// Update the cached time. @@ -464,11 +464,11 @@ pub const Loop = struct { var addr: std.os.sockaddr.storage = undefined; var addr_len: i32 = @sizeOf(std.os.sockaddr.storage); - std.debug.assert(windows.ws2_32.getsockname(asSocket(v.socket), @ptrCast(*std.os.sockaddr, &addr), &addr_len) == 0); + std.debug.assert(windows.ws2_32.getsockname(asSocket(v.socket), @as(*std.os.sockaddr, @ptrCast(&addr)), &addr_len) == 0); var socket_type: i32 = 0; var socket_type_bytes = std.mem.asBytes(&socket_type); - var opt_len: i32 = @intCast(i32, socket_type_bytes.len); + var opt_len: i32 = @as(i32, @intCast(socket_type_bytes.len)); std.debug.assert(windows.ws2_32.getsockopt(asSocket(v.socket), std.os.SOL.SOCKET, std.os.SO.TYPE, socket_type_bytes, &opt_len) == 0); v.internal_accept_socket = windows.WSASocketW(addr.family, socket_type, 0, null, 0, windows.ws2_32.WSA_FLAG_OVERLAPPED) catch |err| { @@ -485,7 +485,7 @@ pub const Loop = struct { &v.storage, 0, 0, - @intCast(u32, @sizeOf(std.os.sockaddr.storage)), + @as(u32, @intCast(@sizeOf(std.os.sockaddr.storage))), &discard, &completion.overlapped, ); @@ -506,7 +506,7 @@ pub const Loop = struct { .close => |v| .{ .result = .{ .close = windows.CloseHandle(v.fd) } }, .connect => |*v| action: { - const result = windows.ws2_32.connect(asSocket(v.socket), &v.addr.any, @intCast(i32, v.addr.getOsSockLen())); + const result = windows.ws2_32.connect(asSocket(v.socket), &v.addr.any, @as(i32, @intCast(v.addr.getOsSockLen()))); if (result != 0) { const err = windows.ws2_32.WSAGetLastError(); break :action switch (err) { @@ -529,6 +529,21 @@ pub const Loop = struct { }; }, + .pread => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset = @intCast(v.offset & 0xFFFF_FFFF_FFFF_FFFF); + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh = @intCast(v.offset >> 32); + break :action if (windows.exp.ReadFile(v.fd, buffer, &completion.overlapped)) |_| + .{ + .submitted = {}, + } + else |err| + .{ + .result = .{ .pread = err }, + }; + }, + .shutdown => |*v| .{ .result = .{ .shutdown = std.os.shutdown(asSocket(v.socket), v.how) } }, .write => |*v| action: { @@ -544,13 +559,28 @@ pub const Loop = struct { }; }, + .pwrite => |*v| action: { + self.associate_fd(completion.handle().?) catch unreachable; + var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.Offset = @intCast(v.offset & 0xFFFF_FFFF_FFFF_FFFF); + completion.overlapped.DUMMYUNIONNAME.DUMMYSTRUCTNAME.OffsetHigh = @intCast(v.offset >> 32); + break :action if (windows.exp.WriteFile(v.fd, buffer, &completion.overlapped)) |_| + .{ + .submitted = {}, + } + else |err| + .{ + .result = .{ .pwrite = err }, + }; + }, + .send => |*v| action: { self.associate_fd(completion.handle().?) catch unreachable; var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; - v.wsa_buffer = .{ .buf = @constCast(buffer.ptr), .len = @intCast(u32, buffer.len) }; + v.wsa_buffer = .{ .buf = @constCast(buffer.ptr), .len = @as(u32, @intCast(buffer.len)) }; const result = windows.ws2_32.WSASend( asSocket(v.fd), - @ptrCast([*]windows.ws2_32.WSABUF, &v.wsa_buffer), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), 1, null, 0, @@ -570,13 +600,13 @@ pub const Loop = struct { .recv => |*v| action: { self.associate_fd(completion.handle().?) catch unreachable; var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; - v.wsa_buffer = .{ .buf = buffer.ptr, .len = @intCast(u32, buffer.len) }; + v.wsa_buffer = .{ .buf = buffer.ptr, .len = @as(u32, @intCast(buffer.len)) }; var flags: u32 = 0; const result = windows.ws2_32.WSARecv( asSocket(v.fd), - @ptrCast([*]windows.ws2_32.WSABUF, &v.wsa_buffer), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), 1, null, &flags, @@ -596,15 +626,15 @@ pub const Loop = struct { .sendto => |*v| action: { self.associate_fd(completion.handle().?) catch unreachable; var buffer: []const u8 = if (v.buffer == .slice) v.buffer.slice else v.buffer.array.array[0..v.buffer.array.len]; - v.wsa_buffer = .{ .buf = @constCast(buffer.ptr), .len = @intCast(u32, buffer.len) }; + v.wsa_buffer = .{ .buf = @constCast(buffer.ptr), .len = @as(u32, @intCast(buffer.len)) }; const result = windows.ws2_32.WSASendTo( asSocket(v.fd), - @ptrCast([*]windows.ws2_32.WSABUF, &v.wsa_buffer), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), 1, null, 0, &v.addr.any, - @intCast(i32, v.addr.getOsSockLen()), + @as(i32, @intCast(v.addr.getOsSockLen())), &completion.overlapped, null, ); @@ -621,18 +651,18 @@ pub const Loop = struct { .recvfrom => |*v| action: { self.associate_fd(completion.handle().?) catch unreachable; var buffer: []u8 = if (v.buffer == .slice) v.buffer.slice else &v.buffer.array; - v.wsa_buffer = .{ .buf = buffer.ptr, .len = @intCast(u32, buffer.len) }; + v.wsa_buffer = .{ .buf = buffer.ptr, .len = @as(u32, @intCast(buffer.len)) }; var flags: u32 = 0; const result = windows.ws2_32.WSARecvFrom( asSocket(v.fd), - @ptrCast([*]windows.ws2_32.WSABUF, &v.wsa_buffer), + @as([*]windows.ws2_32.WSABUF, @ptrCast(&v.wsa_buffer)), 1, null, &flags, &v.addr, - @ptrCast(*i32, &v.addr_size), + @as(*i32, @ptrCast(&v.addr_size)), &completion.overlapped, null, ); @@ -727,7 +757,7 @@ pub const Loop = struct { } }, - inline .read, .write, .recv, .send, .sendto, .recvfrom => |*v| { + inline .read, .pread, .write, .pwrite, .recv, .send, .sendto, .recvfrom => |*v| { if (completion.flags.state == .active) { const result = windows.kernel32.CancelIoEx(asSocket(v.fd), &completion.overlapped); cancel_result.?.* = if (result == windows.FALSE) @@ -777,7 +807,7 @@ pub const Loop = struct { /// Convenience to convert from windows.HANDLE to windows.ws2_32.SOCKET (which are the same thing). inline fn asSocket(h: windows.HANDLE) windows.ws2_32.SOCKET { - return @ptrCast(windows.ws2_32.SOCKET, h); + return @as(windows.ws2_32.SOCKET, @ptrCast(h)); } /// A completion is a request to perform some work with the loop. @@ -854,7 +884,7 @@ pub const Completion = struct { fn handle(self: Completion) ?windows.HANDLE { return switch (self.op) { inline .accept => |*v| v.socket, - inline .read, .write, .recv, .send, .recvfrom, .sendto => |*v| v.fd, + inline .read, .pread, .write, .pwrite, .recv, .send, .recvfrom, .sendto => |*v| v.fd, else => null, }; } @@ -898,7 +928,20 @@ pub const Completion = struct { else => error.Unexpected, } }; } - break :r .{ .read = @intCast(usize, bytes_transferred) }; + break :r .{ .read = @as(usize, @intCast(bytes_transferred)) }; + }, + + .pread => |*v| r: { + var bytes_transferred: windows.DWORD = 0; + const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + break :r .{ .read = switch (err) { + windows.Win32Error.OPERATION_ABORTED => error.Canceled, + else => error.Unexpected, + } }; + } + break :r .{ .pread = @as(usize, @intCast(bytes_transferred)) }; }, .write => |*v| r: { @@ -911,7 +954,20 @@ pub const Completion = struct { else => error.Unexpected, } }; } - break :r .{ .write = @intCast(usize, bytes_transferred) }; + break :r .{ .write = @as(usize, @intCast(bytes_transferred)) }; + }, + + .pwrite => |*v| r: { + var bytes_transferred: windows.DWORD = 0; + const result = windows.kernel32.GetOverlappedResult(v.fd, &self.overlapped, &bytes_transferred, windows.FALSE); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + break :r .{ .write = switch (err) { + windows.Win32Error.OPERATION_ABORTED => error.Canceled, + else => error.Unexpected, + } }; + } + break :r .{ .pwrite = @as(usize, @intCast(bytes_transferred)) }; }, .send => |*v| r: { @@ -929,7 +985,7 @@ pub const Completion = struct { }, }; } - break :r .{ .send = @intCast(usize, bytes_transferred) }; + break :r .{ .send = @as(usize, @intCast(bytes_transferred)) }; }, .recv => |*v| r: { @@ -952,7 +1008,7 @@ pub const Completion = struct { const socket_type = t: { var socket_type: windows.DWORD = 0; var socket_type_bytes = std.mem.asBytes(&socket_type); - var opt_len: i32 = @intCast(i32, socket_type_bytes.len); + var opt_len: i32 = @as(i32, @intCast(socket_type_bytes.len)); // Here we assume the call will succeed because the socket should be valid. std.debug.assert(windows.ws2_32.getsockopt(asSocket(v.fd), std.os.SOL.SOCKET, std.os.SO.TYPE, socket_type_bytes, &opt_len) == 0); @@ -963,7 +1019,7 @@ pub const Completion = struct { break :r .{ .recv = error.EOF }; } - break :r .{ .recv = @intCast(usize, bytes_transferred) }; + break :r .{ .recv = @as(usize, @intCast(bytes_transferred)) }; }, .sendto => |*v| r: { @@ -981,7 +1037,7 @@ pub const Completion = struct { }, }; } - break :r .{ .sendto = @intCast(usize, bytes_transferred) }; + break :r .{ .sendto = @as(usize, @intCast(bytes_transferred)) }; }, .recvfrom => |*v| r: { @@ -999,7 +1055,7 @@ pub const Completion = struct { }, }; } - break :r .{ .recvfrom = @intCast(usize, bytes_transferred) }; + break :r .{ .recvfrom = @as(usize, @intCast(bytes_transferred)) }; }, .async_wait => .{ .async_wait = {} }, @@ -1036,12 +1092,18 @@ pub const OperationType = enum { /// Read read, + /// Pread + pread, + /// Shutdown all or part of a full-duplex connection. shutdown, /// Write write, + /// Pwrite + pwrite, + /// Send send, @@ -1079,18 +1141,24 @@ pub const Operation = union(OperationType) { internal_accept_socket: ?windows.HANDLE = null, }, + close: struct { + fd: windows.HANDLE, + }, + connect: struct { socket: windows.HANDLE, addr: std.net.Address, }, - close: struct { + read: struct { fd: windows.HANDLE, + buffer: ReadBuffer, }, - read: struct { + pread: struct { fd: windows.HANDLE, buffer: ReadBuffer, + offset: u64, }, shutdown: struct { @@ -1098,13 +1166,17 @@ pub const Operation = union(OperationType) { how: std.os.ShutdownHow = .both, }, - timer: Timer, - write: struct { fd: windows.HANDLE, buffer: WriteBuffer, }, + pwrite: struct { + fd: windows.HANDLE, + buffer: WriteBuffer, + offset: u64, + }, + send: struct { fd: windows.HANDLE, buffer: WriteBuffer, @@ -1132,6 +1204,8 @@ pub const Operation = union(OperationType) { wsa_buffer: windows.ws2_32.WSABUF = undefined, }, + timer: Timer, + cancel: struct { c: *Completion, }, @@ -1146,16 +1220,18 @@ pub const Operation = union(OperationType) { pub const Result = union(OperationType) { noop: void, accept: AcceptError!windows.HANDLE, - connect: ConnectError!void, close: CloseError!void, + connect: ConnectError!void, read: ReadError!usize, + pread: ReadError!usize, shutdown: ShutdownError!void, - timer: TimerError!TimerTrigger, write: WriteError!usize, + pwrite: WriteError!usize, send: WriteError!usize, recv: ReadError!usize, sendto: WriteError!usize, recvfrom: ReadError!usize, + timer: TimerError!TimerTrigger, cancel: CancelError!void, async_wait: AsyncError!void, }; @@ -1301,7 +1377,7 @@ test "iocp: stop" { fn callback(ud: ?*anyopaque, l: *xev.Loop, _: *xev.Completion, r: xev.Result) xev.CallbackAction { _ = l; _ = r; - const b = @ptrCast(*bool, ud.?); + const b = @as(*bool, @ptrCast(ud.?)); b.* = true; return .disarm; } @@ -1335,7 +1411,7 @@ test "iocp: timer" { ) xev.CallbackAction { _ = l; _ = r; - const b = @ptrCast(*bool, ud.?); + const b = @as(*bool, @ptrCast(ud.?)); b.* = true; return .disarm; } @@ -1353,7 +1429,7 @@ test "iocp: timer" { ) xev.CallbackAction { _ = l; _ = r; - const b = @ptrCast(*bool, ud.?); + const b = @as(*bool, @ptrCast(ud.?)); b.* = true; return .disarm; } @@ -1387,7 +1463,7 @@ test "iocp: timer reset" { r: xev.Result, ) xev.CallbackAction { _ = l; - const v = @ptrCast(*?TimerTrigger, ud.?); + const v = @as(*?TimerTrigger, @ptrCast(ud.?)); v.* = r.timer catch unreachable; return .disarm; } @@ -1429,7 +1505,7 @@ test "iocp: timer reset before tick" { r: xev.Result, ) xev.CallbackAction { _ = l; - const v = @ptrCast(*?TimerTrigger, ud.?); + const v = @as(*?TimerTrigger, @ptrCast(ud.?)); v.* = r.timer catch unreachable; return .disarm; } @@ -1467,7 +1543,7 @@ test "iocp: timer reset after trigger" { r: xev.Result, ) xev.CallbackAction { _ = l; - const v = @ptrCast(*?TimerTrigger, ud.?); + const v = @as(*?TimerTrigger, @ptrCast(ud.?)); v.* = r.timer catch unreachable; return .disarm; } @@ -1514,7 +1590,7 @@ test "iocp: timer cancellation" { r: xev.Result, ) xev.CallbackAction { _ = l; - const ptr = @ptrCast(*?TimerTrigger, @alignCast(@alignOf(?TimerTrigger), ud.?)); + const ptr: *?TimerTrigger = @ptrCast(@alignCast(ud.?)); ptr.* = r.timer catch unreachable; return .disarm; } @@ -1544,7 +1620,7 @@ test "iocp: timer cancellation" { _ = l; _ = c; _ = r.cancel catch unreachable; - const ptr = @ptrCast(*bool, @alignCast(@alignOf(bool), ud.?)); + const ptr: *bool = @ptrCast(@alignCast(ud.?)); ptr.* = true; return .disarm; } @@ -1575,7 +1651,7 @@ test "iocp: canceling a completed operation" { r: xev.Result, ) xev.CallbackAction { _ = l; - const ptr = @ptrCast(*?TimerTrigger, @alignCast(@alignOf(?TimerTrigger), ud.?)); + const ptr: *?TimerTrigger = @ptrCast(@alignCast(ud.?)); ptr.* = r.timer catch unreachable; return .disarm; } @@ -1605,7 +1681,7 @@ test "iocp: canceling a completed operation" { _ = l; _ = c; _ = r.cancel catch unreachable; - const ptr = @ptrCast(*bool, @alignCast(@alignOf(bool), ud.?)); + const ptr: *bool = @ptrCast(@alignCast(ud.?)); ptr.* = true; return .disarm; } @@ -1635,7 +1711,7 @@ test "iocp: file IO" { var loop = try Loop.init(.{}); defer loop.deinit(); - const utf16_file_name = (try windows.sliceToPrefixedFileW("test_watcher_file")).span(); + const utf16_file_name = (try windows.sliceToPrefixedFileW(null, "test_watcher_file")).span(); const f_handle = try windows.exp.CreateFile(utf16_file_name, windows.GENERIC_READ | windows.GENERIC_WRITE, 0, null, windows.OPEN_ALWAYS, windows.FILE_FLAG_OVERLAPPED, null); defer windows.exp.DeleteFile(utf16_file_name) catch {}; @@ -1690,7 +1766,7 @@ test "iocp: file IO" { ) xev.CallbackAction { _ = l; _ = c; - const ptr = @ptrCast(*usize, @alignCast(@alignOf(usize), ud.?)); + const ptr: *usize = @ptrCast(@alignCast(ud.?)); ptr.* = r.read catch unreachable; return .disarm; } @@ -1703,6 +1779,82 @@ test "iocp: file IO" { try testing.expectEqualSlices(u8, &write_buf, read_buf[0..read_len]); } +test "iocp: file IO with offset" { + const testing = std.testing; + + var loop = try Loop.init(.{}); + defer loop.deinit(); + + const utf16_file_name = (try windows.sliceToPrefixedFileW(null, "test_watcher_file")).span(); + + const f_handle = try windows.exp.CreateFile(utf16_file_name, windows.GENERIC_READ | windows.GENERIC_WRITE, 0, null, windows.OPEN_ALWAYS, windows.FILE_FLAG_OVERLAPPED, null); + defer windows.exp.DeleteFile(utf16_file_name) catch {}; + defer windows.CloseHandle(f_handle); + + // Perform a write and then a read + var write_buf = [_]u8{ 1, 1, 2, 3, 5, 8, 13 }; + var c_write: xev.Completion = .{ + .op = .{ + .pwrite = .{ + .fd = f_handle, + .buffer = .{ .slice = &write_buf }, + .offset = 1, + }, + }, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = ud; + _ = l; + _ = c; + _ = r.pwrite catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_write); + + // Wait for the write + try loop.run(.until_done); + + // Read + var read_buf: [128]u8 = undefined; + var read_len: usize = 0; + var c_read: xev.Completion = .{ + .op = .{ + .pread = .{ + .fd = f_handle, + .buffer = .{ .slice = &read_buf }, + .offset = 1, + }, + }, + .userdata = &read_len, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l: *xev.Loop, + c: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + _ = l; + _ = c; + const ptr: *usize = @ptrCast(@alignCast(ud.?)); + ptr.* = r.pread catch unreachable; + return .disarm; + } + }).callback, + }; + loop.add(&c_read); + + // Wait for the read + try loop.run(.until_done); + try testing.expectEqualSlices(u8, &write_buf, read_buf[0..read_len]); +} + test "iocp: socket accept/connect/send/recv/close" { const mem = std.mem; const net = std.net; @@ -1743,7 +1895,7 @@ test "iocp: socket accept/connect/send/recv/close" { ) xev.CallbackAction { _ = l; _ = c; - const conn = @ptrCast(*Result, @alignCast(@alignOf(Result), ud.?)); + const conn: *Result = @ptrCast(@alignCast(ud.?)); conn.* = r; return .disarm; } @@ -1772,7 +1924,7 @@ test "iocp: socket accept/connect/send/recv/close" { _ = l; _ = c; _ = r.connect catch unreachable; - const b = @ptrCast(*bool, ud.?); + const b = @as(*bool, @ptrCast(ud.?)); b.* = true; return .disarm; } @@ -1833,7 +1985,7 @@ test "iocp: socket accept/connect/send/recv/close" { ) xev.CallbackAction { _ = l; _ = c; - const ptr = @ptrCast(*usize, @alignCast(@alignOf(usize), ud.?)); + const ptr: *usize = @ptrCast(@alignCast(ud.?)); ptr.* = r.recv catch unreachable; return .disarm; } @@ -1865,7 +2017,7 @@ test "iocp: socket accept/connect/send/recv/close" { _ = l; _ = c; _ = r.shutdown catch unreachable; - const ptr = @ptrCast(*bool, @alignCast(@alignOf(bool), ud.?)); + const ptr: *bool = @ptrCast(@alignCast(ud.?)); ptr.* = true; return .disarm; } @@ -1895,7 +2047,7 @@ test "iocp: socket accept/connect/send/recv/close" { ) xev.CallbackAction { _ = l; _ = c; - const ptr = @ptrCast(*?bool, @alignCast(@alignOf(?bool), ud.?)); + const ptr: *?bool = @ptrCast(@alignCast(ud.?)); ptr.* = if (r.recv) |_| false else |err| switch (err) { error.EOF => true, else => false, @@ -1929,7 +2081,7 @@ test "iocp: socket accept/connect/send/recv/close" { _ = l; _ = c; _ = r.close catch unreachable; - const ptr = @ptrCast(*bool, @alignCast(@alignOf(bool), ud.?)); + const ptr: *bool = @ptrCast(@alignCast(ud.?)); ptr.* = true; return .disarm; } @@ -1956,7 +2108,7 @@ test "iocp: socket accept/connect/send/recv/close" { _ = l; _ = c; _ = r.close catch unreachable; - const ptr = @ptrCast(*bool, @alignCast(@alignOf(bool), ud.?)); + const ptr: *bool = @ptrCast(@alignCast(ud.?)); ptr.* = true; return .disarm; } @@ -2006,7 +2158,7 @@ test "iocp: recv cancellation" { ) xev.CallbackAction { _ = l; _ = c; - const ptr = @ptrCast(*Result, @alignCast(@alignOf(Result), ud.?)); + const ptr: *Result = @ptrCast(@alignCast(ud.?)); ptr.* = r; return .disarm; } @@ -2078,7 +2230,7 @@ test "iocp: accept cancellation" { ) xev.CallbackAction { _ = l; _ = c; - const conn = @ptrCast(*Result, @alignCast(@alignOf(Result), ud.?)); + const conn: *Result = @ptrCast(@alignCast(ud.?)); conn.* = r; return .disarm; } diff --git a/src/watcher/async.zig b/src/watcher/async.zig index 0ec0277..e2dc579 100644 --- a/src/watcher/async.zig +++ b/src/watcher/async.zig @@ -434,7 +434,7 @@ fn AsyncIOCP(comptime xev: type) type { r: xev.Result, ) xev.CallbackAction { return @call(.always_inline, cb, .{ - @ptrCast(?*Userdata, @alignCast(@max(1, @alignOf(Userdata)), ud)), + common.userdataValue(Userdata, ud), l_inner, c_inner, if (r.async_wait) |_| {} else |err| err, diff --git a/src/watcher/file.zig b/src/watcher/file.zig index 4f75682..04a5be2 100644 --- a/src/watcher/file.zig +++ b/src/watcher/file.zig @@ -30,7 +30,7 @@ pub fn File(comptime xev: type) type { const FdType = if (xev.backend == .iocp) os.windows.HANDLE else os.socket_t; /// The underlying file - fd: std.os.fd_t, + fd: FdType, pub usingnamespace stream.Stream(xev, Self, .{ .close = true, @@ -111,6 +111,7 @@ pub fn File(comptime xev: type) type { switch (xev.backend) { .io_uring, .wasi_poll, + .iocp, => {}, .epoll => { @@ -274,6 +275,7 @@ pub fn File(comptime xev: type) type { switch (xev.backend) { .io_uring, .wasi_poll, + .iocp, => {}, .epoll => { @@ -365,6 +367,8 @@ pub fn File(comptime xev: type) type { test "pread/pwrite" { // wasi: local files don't work with poll (always ready) if (builtin.os.tag == .wasi) return error.SkipZigTest; + // windows: std.fs.File is not opened with OVERLAPPED flag. + if (builtin.os.tag == .windows) return error.SkipZigTest; const testing = std.testing; diff --git a/src/watcher/tcp.zig b/src/watcher/tcp.zig index 6947656..5969817 100644 --- a/src/watcher/tcp.zig +++ b/src/watcher/tcp.zig @@ -1,4 +1,5 @@ const std = @import("std"); +const builtin = @import("builtin"); const assert = std.debug.assert; const os = std.os; const stream = @import("stream.zig"); @@ -246,7 +247,8 @@ pub fn TCP(comptime xev: type) type { // Retrieve bound port and initialize client var sock_len = address.getOsSockLen(); - try os.getsockname(server.fd, &address.any, &sock_len); + const fd = if (xev.backend == .iocp) @as(os.windows.ws2_32.SOCKET, @ptrCast(server.fd)) else server.fd; + try os.getsockname(fd, &address.any, &sock_len); const client = try Self.init(address); //const address = try std.net.Address.parseIp4("127.0.0.1", 3132); @@ -399,6 +401,8 @@ pub fn TCP(comptime xev: type) type { test "TCP: Queued writes" { // We have no way to get a socket in WASI from a WASI context. if (xev.backend == .wasi_poll) return error.SkipZigTest; + // Windows doesn't seem to respect the SNDBUF socket option. + if (builtin.os.tag == .windows) return error.SkipZigTest; const testing = std.testing; diff --git a/src/watcher/udp.zig b/src/watcher/udp.zig index bbba24f..162a435 100644 --- a/src/watcher/udp.zig +++ b/src/watcher/udp.zig @@ -249,7 +249,7 @@ fn UDPSendtoIOCP(comptime xev: type) type { /// Bind the address to the socket. pub fn bind(self: Self, addr: std.net.Address) !void { - var socket = @ptrCast(windows.ws2_32.SOCKET, self.fd); + var socket = @as(windows.ws2_32.SOCKET, @ptrCast(self.fd)); try os.setsockopt(socket, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); try os.bind(socket, &addr.any, addr.getOsSockLen()); } @@ -273,6 +273,7 @@ fn UDPSendtoIOCP(comptime xev: type) type { l: *xev.Loop, c: *xev.Completion, s: *State, + addr: std.net.Address, s: Self, b: xev.ReadBuffer, r: ReadError!usize, @@ -299,12 +300,13 @@ fn UDPSendtoIOCP(comptime xev: type) type { c_inner: *xev.Completion, r: xev.Result, ) xev.CallbackAction { - const s_inner = @ptrCast(?*State, @alignCast(@alignOf(State), ud)).?; + const s_inner: *State = @ptrCast(@alignCast(ud.?)); return @call(.always_inline, cb, .{ - @ptrCast(?*Userdata, @alignCast(@max(1, @alignOf(Userdata)), s_inner.userdata)), + common.userdataValue(Userdata, s_inner.userdata), l_inner, c_inner, s_inner, + std.net.Address.initPosix(@alignCast(&c_inner.op.recvfrom.addr)), initFd(c_inner.op.recvfrom.fd), c_inner.op.recvfrom.buffer, r.recvfrom, @@ -362,9 +364,9 @@ fn UDPSendtoIOCP(comptime xev: type) type { c_inner: *xev.Completion, r: xev.Result, ) xev.CallbackAction { - const s_inner = @ptrCast(?*State, @alignCast(@alignOf(State), ud)).?; + const s_inner: *State = @ptrCast(@alignCast(ud.?)); return @call(.always_inline, cb, .{ - @ptrCast(?*Userdata, @alignCast(@max(1, @alignOf(Userdata)), s_inner.userdata)), + common.userdataValue(Userdata, s_inner.userdata), l_inner, c_inner, s_inner, diff --git a/src/windows.zig b/src/windows.zig index 02b24fa..fdfd678 100644 --- a/src/windows.zig +++ b/src/windows.zig @@ -33,7 +33,7 @@ pub const exp = struct { overlapped: ?*windows.OVERLAPPED, ) windows.ReadFileError!?usize { var read: windows.DWORD = 0; - const result: windows.BOOL = windows.kernel32.ReadFile(handle, buffer.ptr, @intCast(windows.DWORD, buffer.len), &read, overlapped); + const result: windows.BOOL = windows.kernel32.ReadFile(handle, buffer.ptr, @as(windows.DWORD, @intCast(buffer.len)), &read, overlapped); if (result == windows.FALSE) { const err = windows.kernel32.GetLastError(); return switch (err) { @@ -42,7 +42,7 @@ pub const exp = struct { }; } - return @intCast(usize, read); + return @as(usize, @intCast(read)); } pub fn WriteFile( @@ -51,7 +51,7 @@ pub const exp = struct { overlapped: ?*windows.OVERLAPPED, ) windows.WriteFileError!?usize { var written: windows.DWORD = 0; - const result: windows.BOOL = windows.kernel32.WriteFile(handle, buffer.ptr, @intCast(windows.DWORD, buffer.len), &written, overlapped); + const result: windows.BOOL = windows.kernel32.WriteFile(handle, buffer.ptr, @as(windows.DWORD, @intCast(buffer.len)), &written, overlapped); if (result == windows.FALSE) { const err = windows.kernel32.GetLastError(); return switch (err) { @@ -60,7 +60,7 @@ pub const exp = struct { }; } - return @intCast(usize, written); + return @as(usize, @intCast(written)); } pub const DeleteFileError = error{} || std.os.UnexpectedError;