From 87c3105823fd4348af20e10b2a62f806140b28fd Mon Sep 17 00:00:00 2001 From: kcbanner Date: Sat, 28 Oct 2023 17:59:07 -0400 Subject: [PATCH 1/3] iocp: Add support for JobObjects watcher: Add support for watching processes using IOCP --- src/backend/iocp.zig | 126 +++++++++++++++++++++++++++++++--- src/watcher/process.zig | 146 ++++++++++++++++++++++++++++++++++++++-- src/windows.zig | 138 +++++++++++++++++++++++++++++++++++++ 3 files changed, 396 insertions(+), 14 deletions(-) diff --git a/src/backend/iocp.zig b/src/backend/iocp.zig index 73fa7a2..f5ec9df 100644 --- a/src/backend/iocp.zig +++ b/src/backend/iocp.zig @@ -313,14 +313,43 @@ pub const Loop = struct { // Go through the entries and perform completions callbacks. for (entries[0..count]) |entry| { - // We retrieve the Completion from the OVERLAPPED pointer as we know it's a part of - // the Completion struct. - const overlapped_ptr: ?*windows.OVERLAPPED = @as(?*windows.OVERLAPPED, @ptrCast(entry.lpOverlapped)); - if (overlapped_ptr == null) { - // Probably an async wakeup - continue; - } - var completion = @fieldParentPtr(Completion, "overlapped", overlapped_ptr.?); + const completion: *Completion = if (entry.lpCompletionKey == 0) completion: { + // We retrieve the Completion from the OVERLAPPED pointer as we know it's a part of + // the Completion struct. + const overlapped_ptr: ?*windows.OVERLAPPED = @as(?*windows.OVERLAPPED, @ptrCast(entry.lpOverlapped)); + if (overlapped_ptr == null) { + // Probably an async wakeup + continue; + } + + break :completion @fieldParentPtr(Completion, "overlapped", overlapped_ptr.?); + } else completion: { + // JobObjects are a special case where the OVERLAPPED_ENTRY fields are interpreted differently. + // When JOBOBJECT_ASSOCIATE_COMPLETION_PORT is used, lpOverlapped actually contains the message + // value, and not the address of the overlapped structure. The Completion pointer is passed + // as the completion key instead. 
+ const completion: *Completion = @ptrFromInt(entry.lpCompletionKey); + const message_type: windows.exp.JOB_OBJECT_MSG_TYPE = @enumFromInt(entry.dwNumberOfBytesTransferred); + completion.result = switch (message_type) { + inline .JOB_OBJECT_MSG_END_OF_JOB_TIME, + .JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT, + .JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO, + .JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT, + .JOB_OBJECT_MSG_SILO_TERMINATED, + => |t| .{ .job_object = @unionInit(JobObjectResult, @tagName(t), {}) }, + inline .JOB_OBJECT_MSG_END_OF_PROCESS_TIME, + .JOB_OBJECT_MSG_NEW_PROCESS, + .JOB_OBJECT_MSG_EXIT_PROCESS, + .JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, + .JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT, + .JOB_OBJECT_MSG_JOB_MEMORY_LIMIT, + .JOB_OBJECT_MSG_NOTIFICATION_LIMIT, + => |t| .{ .job_object = @unionInit(JobObjectResult, @tagName(t), @intFromPtr(entry.lpOverlapped)) }, + else => .{ .job_object = JobObjectError.UnknownMessageType }, + }; + + break :completion completion; + }; wait_rem -|= 1; @@ -699,6 +728,27 @@ pub const Loop = struct { self.asyncs.push(completion); break :action .{ .async_wait = {} }; }, + + .job_object => |*v| action: { + if (!v.associated) { + var port = windows.exp.JOBOBJECT_ASSOCIATE_COMPLETION_PORT{ + .CompletionKey = @intFromPtr(completion), + .CompletionPort = self.iocp_handle, + }; + + windows.exp.SetInformationJobObject( + v.job, + .JobObjectAssociateCompletionPortInformation, + &port, + @sizeOf(windows.exp.JOBOBJECT_ASSOCIATE_COMPLETION_PORT), + ) catch |err| break :action .{ .result = .{ .job_object = err } }; + + v.associated = true; + } + + break :action .{ .submitted = {} }; + }, + }; switch (action) { @@ -1071,6 +1121,9 @@ pub const Completion = struct { }, .async_wait => .{ .async_wait = {} }, + + .job_object => self.result.?, + }; } @@ -1137,6 +1190,10 @@ pub const OperationType = enum { /// Wait for an async event to be posted. 
async_wait, + + /// Receive a notification from a job object associated with a completion port + job_object, + }; /// All the supported operations of this event loop. These are always @@ -1225,6 +1282,15 @@ pub const Operation = union(OperationType) { async_wait: struct { wakeup: std.atomic.Atomic(bool) = .{ .value = false }, }, + + job_object: struct { + job: windows.HANDLE, + userdata: ?*anyopaque, + + /// Tracks if the job has been associated with the completion port. + /// Do not use this, it is used internally. + associated: bool = false, + }, }; /// The result type based on the operation type. For a callback, the @@ -1246,6 +1312,7 @@ pub const Result = union(OperationType) { timer: TimerError!TimerTrigger, cancel: CancelError!void, async_wait: AsyncError!void, + job_object: JobObjectError!JobObjectResult, }; pub const CancelError = error{ @@ -1306,6 +1373,49 @@ pub const TimerTrigger = enum { cancel, }; +pub const JobObjectError = error{ + UnknownMessageType, + Unexpected, +}; + +pub const JobObjectResult = union(windows.exp.JOB_OBJECT_MSG_TYPE) { + /// Time limit was reached + JOB_OBJECT_MSG_END_OF_JOB_TIME: void, + + /// Process exceed its time limit. Value is the process ID. + JOB_OBJECT_MSG_END_OF_PROCESS_TIME: usize, + + /// Active process limit was exceeded + JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT: void, + + /// Active process count is zero + JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: void, + + /// Process has been added to the job. Value is the process ID. + JOB_OBJECT_MSG_NEW_PROCESS: usize, + + /// Process exited. Value is the process ID. + JOB_OBJECT_MSG_EXIT_PROCESS: usize, + + /// Process exited abnormally. Value is the process ID. + JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS: usize, + + /// Process exceeded its memory limit. Value is the process ID. + JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT: usize, + + /// Process exceeded the job-wide memory limit. Value is the process ID. + JOB_OBJECT_MSG_JOB_MEMORY_LIMIT: usize, + + /// Resource limit was exceeded. 
Value is the process ID. + JOB_OBJECT_MSG_NOTIFICATION_LIMIT: usize, + + /// Undocumented + JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT: void, + + /// Undocumented + JOB_OBJECT_MSG_SILO_TERMINATED: void, +}; + /// ReadBuffer are the various options for reading. pub const ReadBuffer = union(enum) { /// Read into this slice. diff --git a/src/watcher/process.zig b/src/watcher/process.zig index c6053f5..3506133 100644 --- a/src/watcher/process.zig +++ b/src/watcher/process.zig @@ -14,9 +14,10 @@ pub fn Process(comptime xev: type) type { .kqueue => ProcessKqueue(xev), + .iocp => ProcessIocp(xev), + // Unsupported .wasi_poll => struct {}, - .iocp => struct {}, }; } @@ -130,7 +131,7 @@ fn ProcessPidFd(comptime xev: type) type { } /// Common tests - pub usingnamespace ProcessTests(xev, Self); + pub usingnamespace ProcessTests(xev, Self, &.{ "sh", "-c", "exit 0" }, &.{ "sh", "-c", "exit 42" }); }; } @@ -200,17 +201,150 @@ fn ProcessKqueue(comptime xev: type) type { } /// Common tests - pub usingnamespace ProcessTests(xev, Self); + pub usingnamespace ProcessTests(xev, Self, &.{ "sh", "-c", "exit 0" }, &.{ "sh", "-c", "exit 42" }); + }; +} + +const windows = @import("../windows.zig"); +fn ProcessIocp(comptime xev: type) type { + return struct { + const Self = @This(); + + pub const WaitError = xev.Sys.JobObjectError; + + job: windows.HANDLE, + process: windows.HANDLE, + + pub fn init(process: os.pid_t) !Self { + const current_process = windows.kernel32.GetCurrentProcess(); + + // Duplicate the process handle so we don't rely on the caller keeping it alive + var dup_process: windows.HANDLE = undefined; + const dup_result = windows.kernel32.DuplicateHandle( + current_process, + process, + current_process, + &dup_process, + 0, + windows.FALSE, + windows.DUPLICATE_SAME_ACCESS, + ); + if (dup_result == 0) return windows.unexpectedError(windows.kernel32.GetLastError()); + + const job = try windows.exp.CreateJobObject(null, null); + errdefer _ = windows.kernel32.CloseHandle(job); + + // 
Setting this limit information is required so that we only get process exit + // notifications for this process - without it we would get notifications for + // all of its child processes as well. + + var extended_limit_information = std.mem.zeroInit(windows.exp.JOBOBJECT_EXTENDED_LIMIT_INFORMATION, .{ + .BasicLimitInformation = std.mem.zeroInit(windows.exp.JOBOBJECT_BASIC_LIMIT_INFORMATION, .{ + .LimitFlags = windows.exp.JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK, + }), + }); + + try windows.exp.SetInformationJobObject( + job, + .JobObjectExtendedLimitInformation, + &extended_limit_information, + @sizeOf(@TypeOf(extended_limit_information)), + ); + + try windows.exp.AssignProcessToJobObject(job, dup_process); + + return .{ + .process = dup_process, + .job = job, + }; + } + + pub fn deinit(self: *Self) void { + _ = windows.kernel32.CloseHandle(self.job); + _ = windows.kernel32.CloseHandle(self.process); + } + + pub fn wait( + self: Self, + loop: *xev.Loop, + c: *xev.Completion, + comptime Userdata: type, + userdata: ?*Userdata, + comptime cb: *const fn ( + ud: ?*Userdata, + l: *xev.Loop, + c: *xev.Completion, + r: WaitError!u32, + ) xev.CallbackAction, + ) void { + c.* = .{ + .op = .{ + .job_object = .{ + .job = self.job, + .userdata = self.process, + }, + }, + .userdata = userdata, + .callback = (struct { + fn callback( + ud: ?*anyopaque, + l_inner: *xev.Loop, + c_inner: *xev.Completion, + r: xev.Result, + ) xev.CallbackAction { + if (r.job_object) |result| { + switch (result) { + .JOB_OBJECT_MSG_EXIT_PROCESS, + .JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, + => |pid| { + // Don't need to check PID as we've specified JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK + _ = pid; + + var exit_code: windows.DWORD = undefined; + const process: windows.HANDLE = @ptrCast(c_inner.op.job_object.userdata); + const has_code = windows.kernel32.GetExitCodeProcess(process, &exit_code) != 0; + if (!has_code) std.log.warn("unable to get exit code for process={}", .{windows.kernel32.GetLastError()}); + + 
return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + if (has_code) exit_code else WaitError.Unexpected, + }); + }, + else => return .rearm, + } + } else |err| { + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + err, + }); + } + } + }).callback, + }; + loop.add(c); + } + + /// Common tests + pub usingnamespace ProcessTests(xev, Self, &.{ "cmd.exe", "/C", "exit 0" }, &.{ "cmd.exe", "/C", "exit 42" }); }; } -fn ProcessTests(comptime xev: type, comptime Impl: type) type { +fn ProcessTests( + comptime xev: type, + comptime Impl: type, + comptime argv_0: []const []const u8, + comptime argv_42: []const []const u8, +) type { return struct { test "process wait" { const testing = std.testing; const alloc = testing.allocator; - var child = std.ChildProcess.init(&.{ "sh", "-c", "exit 0" }, alloc); + var child = std.ChildProcess.init(argv_0, alloc); try child.spawn(); var loop = try xev.Loop.init(.{}); @@ -243,7 +377,7 @@ fn ProcessTests(comptime xev: type, comptime Impl: type) type { const testing = std.testing; const alloc = testing.allocator; - var child = std.ChildProcess.init(&.{ "sh", "-c", "exit 42" }, alloc); + var child = std.ChildProcess.init(argv_42, alloc); try child.spawn(); var loop = try xev.Loop.init(.{}); diff --git a/src/windows.zig b/src/windows.zig index fdfd678..225a859 100644 --- a/src/windows.zig +++ b/src/windows.zig @@ -5,6 +5,100 @@ pub usingnamespace std.os.windows; /// Namespace containing missing utils from std pub const exp = struct { + pub const JOBOBJECT_ASSOCIATE_COMPLETION_PORT = extern struct { + CompletionKey: windows.ULONG_PTR, + CompletionPort: windows.HANDLE, + }; + + pub const JOBOBJECT_BASIC_LIMIT_INFORMATION = extern struct { + PerProcessUserTimeLimit: windows.LARGE_INTEGER, + PerJobUserTimeLimit: windows.LARGE_INTEGER, + LimitFlags: windows.DWORD, + MinimumWorkingSetSize: windows.SIZE_T, + MaximumWorkingSetSize: windows.SIZE_T, + 
ActiveProcessLimit: windows.DWORD, + Affinity: windows.ULONG_PTR, + PriorityClass: windows.DWORD, + SchedulingClass: windows.DWORD, + }; + + pub const IO_COUNTERS = extern struct { + ReadOperationCount: windows.ULONGLONG, + WriteOperationCount: windows.ULONGLONG, + OtherOperationCount: windows.ULONGLONG, + ReadTransferCount: windows.ULONGLONG, + WriteTransferCount: windows.ULONGLONG, + OtherTransferCount: windows.ULONGLONG, + }; + + pub const JOBOBJECT_EXTENDED_LIMIT_INFORMATION = extern struct { + BasicLimitInformation: JOBOBJECT_BASIC_LIMIT_INFORMATION, + IoInfo: IO_COUNTERS, + ProcessMemoryLimit: windows.SIZE_T, + JobMemoryLimit: windows.SIZE_T, + PeakProcessMemoryUsed: windows.SIZE_T, + PeakJobMemoryUsed: windows.SIZE_T, + }; + + pub const JOB_OBJECT_LIMIT_ACTIVE_PROCESS = 0x00000008; + pub const JOB_OBJECT_LIMIT_AFFINITY = 0x00000010; + pub const JOB_OBJECT_LIMIT_BREAKAWAY_OK = 0x00000800; + pub const JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION = 0x00000400; + pub const JOB_OBJECT_LIMIT_JOB_MEMORY = 0x00000200; + pub const JOB_OBJECT_LIMIT_JOB_TIME = 0x00000004; + pub const JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000; + pub const JOB_OBJECT_LIMIT_PRESERVE_JOB_TIME = 0x00000040; + pub const JOB_OBJECT_LIMIT_PRIORITY_CLASS = 0x00000020; + pub const JOB_OBJECT_LIMIT_PROCESS_MEMORY = 0x00000100; + pub const JOB_OBJECT_LIMIT_PROCESS_TIME = 0x00000002; + pub const JOB_OBJECT_LIMIT_SCHEDULING_CLASS = 0x00000080; + pub const JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK = 0x00001000; + pub const JOB_OBJECT_LIMIT_SUBSET_AFFINITY = 0x00004000; + pub const JOB_OBJECT_LIMIT_WORKINGSET = 0x00000001; + + pub const JOBOBJECT_INFORMATION_CLASS = enum(c_int) { + JobObjectAssociateCompletionPortInformation = 7, + JobObjectBasicLimitInformation = 2, + JobObjectBasicUIRestrictions = 4, + JobObjectCpuRateControlInformation = 15, + JobObjectEndOfJobTimeInformation = 6, + JobObjectExtendedLimitInformation = 9, + JobObjectGroupInformation = 11, + JobObjectGroupInformationEx = 14, + 
JobObjectLimitViolationInformation2 = 34, + JobObjectNetRateControlInformation = 32, + JobObjectNotificationLimitInformation = 12, + JobObjectNotificationLimitInformation2 = 33, + JobObjectSecurityLimitInformation = 5, + }; + + pub const JOB_OBJECT_MSG_TYPE = enum(windows.DWORD) { + JOB_OBJECT_MSG_END_OF_JOB_TIME = 1, + JOB_OBJECT_MSG_END_OF_PROCESS_TIME = 2, + JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT = 3, + JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO = 4, + JOB_OBJECT_MSG_NEW_PROCESS = 6, + JOB_OBJECT_MSG_EXIT_PROCESS = 7, + JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS = 8, + JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT = 9, + JOB_OBJECT_MSG_JOB_MEMORY_LIMIT = 10, + JOB_OBJECT_MSG_NOTIFICATION_LIMIT = 11, + JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT = 12, + JOB_OBJECT_MSG_SILO_TERMINATED = 13, + _, + }; + + pub const kernel32 = struct { + pub extern "kernel32" fn CreateJobObjectA(lpSecurityAttributes: ?*windows.SECURITY_ATTRIBUTES, lpName: ?windows.LPCSTR) callconv(windows.WINAPI) windows.HANDLE; + pub extern "kernel32" fn AssignProcessToJobObject(hJob: windows.HANDLE, hProcess: windows.HANDLE) callconv(windows.WINAPI) windows.BOOL; + pub extern "kernel32" fn SetInformationJobObject( + hJob: windows.HANDLE, + JobObjectInformationClass: JOBOBJECT_INFORMATION_CLASS, + lpJobObjectInformation: windows.LPVOID, + cbJobObjectInformationLength: windows.DWORD, + ) callconv(windows.WINAPI) windows.BOOL; + }; + pub const CreateFileError = error{} || std.os.UnexpectedError; pub fn CreateFile( @@ -74,4 +168,48 @@ pub const exp = struct { }; } } + + pub const CreateJobObjectError = error{AlreadyExists} || std.os.UnexpectedError; + pub fn CreateJobObject( + lpSecurityAttributes: ?*windows.SECURITY_ATTRIBUTES, + lpName: ?windows.LPCSTR, + ) !windows.HANDLE { + const handle = kernel32.CreateJobObjectA(lpSecurityAttributes, lpName); + return switch (windows.kernel32.GetLastError()) { + .SUCCESS => handle, + .ALREADY_EXISTS => CreateJobObjectError.AlreadyExists, + else => |err| windows.unexpectedError(err), + }; + } + + 
pub fn AssignProcessToJobObject(hJob: windows.HANDLE, hProcess: windows.HANDLE) std.os.UnexpectedError!void { + const result: windows.BOOL = kernel32.AssignProcessToJobObject(hJob, hProcess); + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + return switch (err) { + else => windows.unexpectedError(err), + }; + } + } + + pub fn SetInformationJobObject( + hJob: windows.HANDLE, + JobObjectInformationClass: JOBOBJECT_INFORMATION_CLASS, + lpJobObjectInformation: windows.LPVOID, + cbJobObjectInformationLength: windows.DWORD, + ) std.os.UnexpectedError!void { + const result: windows.BOOL = kernel32.SetInformationJobObject( + hJob, + JobObjectInformationClass, + lpJobObjectInformation, + cbJobObjectInformationLength, + ); + + if (result == windows.FALSE) { + const err = windows.kernel32.GetLastError(); + return switch (err) { + else => windows.unexpectedError(err), + }; + } + } }; From d71ddeb19c6697875c2e0e68a49d04c05bc26199 Mon Sep 17 00:00:00 2001 From: kcbanner Date: Tue, 31 Oct 2023 01:05:32 -0400 Subject: [PATCH 2/3] iocp: fix a race condition in the process watcher Previously, there was a race condition in the IOCP process watcher where if the process exited before wait() was called, it would hang indefinitely. I addressed this by adding a new result type for when a job is associated with the completion port, and calling the callback with this result so that the user code has a chance to check any invariants. Changes: - Call the completion callback when the job is associated with the IOCP - Add a test that tests the case of the process exiting before the wait() call - Rework job object handling to not require @unionInit. 
The user code can now interpret the message data however they want --- src/backend/iocp.zig | 79 ++++++++++++----------------------------- src/watcher/process.zig | 71 ++++++++++++++++++++++++++++++++---- src/windows.zig | 3 ++ 3 files changed, 89 insertions(+), 64 deletions(-) diff --git a/src/backend/iocp.zig b/src/backend/iocp.zig index f5ec9df..dcf5e0a 100644 --- a/src/backend/iocp.zig +++ b/src/backend/iocp.zig @@ -329,25 +329,12 @@ pub const Loop = struct { // value, and not the address of the overlapped structure. The Completion pointer is passed // as the completion key instead. const completion: *Completion = @ptrFromInt(entry.lpCompletionKey); - const message_type: windows.exp.JOB_OBJECT_MSG_TYPE = @enumFromInt(entry.dwNumberOfBytesTransferred); - completion.result = switch (message_type) { - inline .JOB_OBJECT_MSG_END_OF_JOB_TIME, - .JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT, - .JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO, - .JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT, - .JOB_OBJECT_MSG_SILO_TERMINATED, - => |t| .{ .job_object = @unionInit(JobObjectResult, @tagName(t), {}) }, - inline .JOB_OBJECT_MSG_END_OF_PROCESS_TIME, - .JOB_OBJECT_MSG_NEW_PROCESS, - .JOB_OBJECT_MSG_EXIT_PROCESS, - .JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, - .JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT, - .JOB_OBJECT_MSG_JOB_MEMORY_LIMIT, - .JOB_OBJECT_MSG_NOTIFICATION_LIMIT, - => |t| .{ .job_object = @unionInit(JobObjectResult, @tagName(t), @intFromPtr(entry.lpOverlapped)) }, - else => .{ .job_object = JobObjectError.UnknownMessageType }, - }; - + completion.result = .{ .job_object = .{ + .message = .{ + .type = @enumFromInt(entry.dwNumberOfBytesTransferred), + .value = @intFromPtr(entry.lpOverlapped), + }, + } }; break :completion completion; }; @@ -744,11 +731,18 @@ pub const Loop = struct { ) catch |err| break :action .{ .result = .{ .job_object = err } }; v.associated = true; + const action = completion.callback(completion.userdata, self, completion, .{ .job_object = .{ .associated = {} } }); + switch (action) { 
+ .disarm => { + completion.flags.state = .dead; + return; + }, + .rearm => break :action .{ .submitted = {} }, + } } break :action .{ .submitted = {} }; }, - }; switch (action) { @@ -1123,7 +1117,6 @@ pub const Completion = struct { .async_wait => .{ .async_wait = {} }, .job_object => self.result.?, - }; } @@ -1193,7 +1186,6 @@ pub const OperationType = enum { /// Receive a notification from a job object associated with a completion port job_object, - }; /// All the supported operations of this event loop. These are always @@ -1378,42 +1370,15 @@ pub const JobObjectError = error{ Unexpected, }; -pub const JobObjectResult = union(windows.exp.JOB_OBJECT_MSG_TYPE) { - /// Time limit was reached - JOB_OBJECT_MSG_END_OF_JOB_TIME: void, - - /// Process exceed its time limit. Value is the process ID. - JOB_OBJECT_MSG_END_OF_PROCESS_TIME: usize, +pub const JobObjectResult = union(enum) { + // The job object was associated with the completion port + associated: void, - /// Active process limit was exceeded - JOB_OBJECT_MSG_ACTIVE_PROCESS_LIMIT: void, - - /// Active process count is zero - JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: void, - - /// Process has been added to the job. Value is the process ID. - JOB_OBJECT_MSG_NEW_PROCESS: usize, - - /// Process exited. Value is the process ID. - JOB_OBJECT_MSG_EXIT_PROCESS: usize, - - /// Process exited abnormally. Value is the process ID. - JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS: usize, - - /// Process exceeded its memory limit. Value is the process ID. - JOB_OBJECT_MSG_PROCESS_MEMORY_LIMIT: usize, - - /// Process exceeded the job-wide memory limit. Value is the process ID. - JOB_OBJECT_MSG_JOB_MEMORY_LIMIT: usize, - - /// Resource limit was exceeded. Value is the process ID. 
- JOB_OBJECT_MSG_NOTIFICATION_LIMIT: usize, - - /// Undocumented - JOB_OBJECT_MSG_JOB_CYCLE_TIME_LIMIT: void, - - /// Undocumented - JOB_OBJECT_MSG_SILO_TERMINATED: void, + /// A message was received on the completion port for this job object + message: struct { + type: windows.exp.JOB_OBJECT_MSG_TYPE, + value: usize, + }, }; /// ReadBuffer are the various options for reading. diff --git a/src/watcher/process.zig b/src/watcher/process.zig index 3506133..f79fb59 100644 --- a/src/watcher/process.zig +++ b/src/watcher/process.zig @@ -294,25 +294,47 @@ fn ProcessIocp(comptime xev: type) type { ) xev.CallbackAction { if (r.job_object) |result| { switch (result) { - .JOB_OBJECT_MSG_EXIT_PROCESS, - .JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, - => |pid| { - // Don't need to check PID as we've specified JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK - _ = pid; + .associated => { + // There was a period of time between when the job object was created + // and when it was associated with the completion port. We may have + // missed a notification, so check if it's still alive. 
var exit_code: windows.DWORD = undefined; const process: windows.HANDLE = @ptrCast(c_inner.op.job_object.userdata); const has_code = windows.kernel32.GetExitCodeProcess(process, &exit_code) != 0; if (!has_code) std.log.warn("unable to get exit code for process={}", .{windows.kernel32.GetLastError()}); + if (exit_code == windows.exp.STILL_ACTIVE) return .rearm; return @call(.always_inline, cb, .{ common.userdataValue(Userdata, ud), l_inner, c_inner, - if (has_code) exit_code else WaitError.Unexpected, + exit_code, }); }, - else => return .rearm, + .message => |message| { + switch (message.type) { + .JOB_OBJECT_MSG_EXIT_PROCESS, + .JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, + => { + // Don't need to check the message value (PID) as we've + // specified JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK + + var exit_code: windows.DWORD = undefined; + const process: windows.HANDLE = @ptrCast(c_inner.op.job_object.userdata); + const has_code = windows.kernel32.GetExitCodeProcess(process, &exit_code) != 0; + if (!has_code) std.log.warn("unable to get exit code for process={}", .{windows.kernel32.GetLastError()}); + + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + if (has_code) exit_code else WaitError.Unexpected, + }); + }, + else => return .rearm, + } + }, } } else |err| { return @call(.always_inline, cb, .{ @@ -405,5 +427,40 @@ fn ProcessTests( try loop.run(.until_done); try testing.expectEqual(@as(u32, 42), code.?); } + + test "process wait on a process that already exited" { + const testing = std.testing; + const alloc = testing.allocator; + + var child = std.ChildProcess.init(argv_0, alloc); + try child.spawn(); + + var loop = try xev.Loop.init(.{}); + defer loop.deinit(); + + var p = try Impl.init(child.id); + defer p.deinit(); + + _ = try child.wait(); + + // Wait + var code: ?u32 = null; + var c_wait: xev.Completion = undefined; + p.wait(&loop, &c_wait, ?u32, &code, (struct { + fn callback( + ud: ?*?u32, + _: *xev.Loop, + _: 
*xev.Completion, + r: Impl.WaitError!u32, + ) xev.CallbackAction { + ud.?.* = r catch unreachable; + return .disarm; + } + }).callback); + + // Wait for wake + try loop.run(.until_done); + try testing.expectEqual(@as(u32, 0), code.?); + } }; } diff --git a/src/windows.zig b/src/windows.zig index 225a859..fcf82ce 100644 --- a/src/windows.zig +++ b/src/windows.zig @@ -5,6 +5,9 @@ pub usingnamespace std.os.windows; /// Namespace containing missing utils from std pub const exp = struct { + pub const STATUS_PENDING = 0x00000103; + pub const STILL_ACTIVE = STATUS_PENDING; + pub const JOBOBJECT_ASSOCIATE_COMPLETION_PORT = extern struct { CompletionKey: windows.ULONG_PTR, CompletionPort: windows.HANDLE, From 908155edf4f106b7e63b8370fb2429dd4a114782 Mon Sep 17 00:00:00 2001 From: kcbanner Date: Tue, 31 Oct 2023 02:03:53 -0400 Subject: [PATCH 3/3] iocp process watcher: instead of setting limits on the job, compare PIDs --- src/backend/iocp.zig | 1 - src/watcher/process.zig | 47 ++++++++++++++--------------------------- src/windows.zig | 1 + 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/src/backend/iocp.zig b/src/backend/iocp.zig index dcf5e0a..e986513 100644 --- a/src/backend/iocp.zig +++ b/src/backend/iocp.zig @@ -1366,7 +1366,6 @@ pub const TimerTrigger = enum { }; pub const JobObjectError = error{ - UnknownMessageType, Unexpected, }; diff --git a/src/watcher/process.zig b/src/watcher/process.zig index f79fb59..f9228c5 100644 --- a/src/watcher/process.zig +++ b/src/watcher/process.zig @@ -234,28 +234,11 @@ fn ProcessIocp(comptime xev: type) type { const job = try windows.exp.CreateJobObject(null, null); errdefer _ = windows.kernel32.CloseHandle(job); - // Setting this limit information is required so that we only get process exit - // notifications for this process - without it we would get notifications for - // all of its child processes as well. 
- - var extended_limit_information = std.mem.zeroInit(windows.exp.JOBOBJECT_EXTENDED_LIMIT_INFORMATION, .{ - .BasicLimitInformation = std.mem.zeroInit(windows.exp.JOBOBJECT_BASIC_LIMIT_INFORMATION, .{ - .LimitFlags = windows.exp.JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK, - }), - }); - - try windows.exp.SetInformationJobObject( - job, - .JobObjectExtendedLimitInformation, - &extended_limit_information, - @sizeOf(@TypeOf(extended_limit_information)), - ); - try windows.exp.AssignProcessToJobObject(job, dup_process); return .{ - .process = dup_process, .job = job, + .process = dup_process, }; } @@ -313,27 +296,29 @@ fn ProcessIocp(comptime xev: type) type { }); }, .message => |message| { - switch (message.type) { + const result_inner = switch (message.type) { .JOB_OBJECT_MSG_EXIT_PROCESS, .JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS, - => { - // Don't need to check the message value (PID) as we've - // specified JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK + => b: { + const process: windows.HANDLE = @ptrCast(c_inner.op.job_object.userdata); + const pid = windows.exp.kernel32.GetProcessId(process); + if (pid == 0) break :b WaitError.Unexpected; + if (message.value != pid) return .rearm; var exit_code: windows.DWORD = undefined; - const process: windows.HANDLE = @ptrCast(c_inner.op.job_object.userdata); const has_code = windows.kernel32.GetExitCodeProcess(process, &exit_code) != 0; if (!has_code) std.log.warn("unable to get exit code for process={}", .{windows.kernel32.GetLastError()}); - - return @call(.always_inline, cb, .{ - common.userdataValue(Userdata, ud), - l_inner, - c_inner, - if (has_code) exit_code else WaitError.Unexpected, - }); + break :b if (has_code) exit_code else WaitError.Unexpected; }, else => return .rearm, - } + }; + + return @call(.always_inline, cb, .{ + common.userdataValue(Userdata, ud), + l_inner, + c_inner, + result_inner + }); }, } } else |err| { diff --git a/src/windows.zig b/src/windows.zig index fcf82ce..a8bb380 100644 --- a/src/windows.zig +++ 
b/src/windows.zig @@ -92,6 +92,7 @@ pub const exp = struct { }; pub const kernel32 = struct { + pub extern "kernel32" fn GetProcessId(Process: windows.HANDLE) callconv(windows.WINAPI) windows.DWORD; pub extern "kernel32" fn CreateJobObjectA(lpSecurityAttributes: ?*windows.SECURITY_ATTRIBUTES, lpName: ?windows.LPCSTR) callconv(windows.WINAPI) windows.HANDLE; pub extern "kernel32" fn AssignProcessToJobObject(hJob: windows.HANDLE, hProcess: windows.HANDLE) callconv(windows.WINAPI) windows.BOOL; pub extern "kernel32" fn SetInformationJobObject(