Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emulate non-blocking STDIN console on Windows #14947

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/crystal/system/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ module Crystal::System::FileDescriptor
# Also used in `IO::FileDescriptor#finalize`.
# def file_descriptor_close

# Returns `true` or `false` if this file descriptor pretends to block or not
# to block the caller thread regardless of the underlying internal file
# descriptor's implementation. Currently used by console STDIN on Windows.
straight-shoota marked this conversation as resolved.
Show resolved Hide resolved
private def emulated_blocking? : Bool?
end

private def system_read(slice : Bytes) : Int32
event_loop.read(self, slice)
end
Expand Down
64 changes: 63 additions & 1 deletion src/crystal/system/win32/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ require "c/consoleapi"
require "c/consoleapi2"
require "c/winnls"
require "crystal/system/win32/iocp"
require "crystal/system/thread"

module Crystal::System::FileDescriptor
# Platform-specific type to represent a file descriptor handle to the operating
Expand Down Expand Up @@ -76,13 +77,24 @@ module Crystal::System::FileDescriptor
bytes_written
end

def emulated_blocking? : Bool?
# reading from STDIN is done via a separate thread (see
# `ConsoleUtils.read_console` below)
handle = windows_handle
if LibC.GetConsoleMode(handle, out _) != 0
if handle == LibC.GetStdHandle(LibC::STD_INPUT_HANDLE)
return false
end
end
end

# :nodoc:
def system_blocking?
@system_blocking
end

private def system_blocking=(blocking)
unless blocking == @system_blocking
unless blocking == self.blocking
raise IO::Error.new("Cannot reconfigure `IO::FileDescriptor#blocking` after creation")
end
end
Expand Down Expand Up @@ -318,7 +330,11 @@ module Crystal::System::FileDescriptor
end
end

# `blocking` must be set to `true` because the underlying handles never
# support overlapped I/O; instead, `#emulated_blocking?` should return
# `false` for `STDIN` as it uses a separate thread
io = IO::FileDescriptor.new(handle.address, blocking: true)

# Set sync or flush_on_newline as described in STDOUT and STDERR docs.
# See https://crystal-lang.org/api/toplevel.html#STDERR
if console_handle
Expand Down Expand Up @@ -444,11 +460,57 @@ private module ConsoleUtils
end

private def self.read_console(handle : LibC::HANDLE, slice : Slice(UInt16)) : Int32
@@mtx.synchronize do
@@read_requests << ReadRequest.new(
handle: handle,
slice: slice,
iocp: Crystal::EventLoop.current.iocp,
completion_key: Crystal::IOCP::CompletionKey.new(:stdin_read, ::Fiber.current),
)
@@read_cv.signal
end

::Fiber.suspend

@@mtx.synchronize do
@@bytes_read.shift
end
end

private def self.read_console_blocking(handle : LibC::HANDLE, slice : Slice(UInt16)) : Int32
if 0 == LibC.ReadConsoleW(handle, slice, slice.size, out units_read, nil)
raise IO::Error.from_winerror("ReadConsoleW")
end
units_read.to_i32
end

record ReadRequest, handle : LibC::HANDLE, slice : Slice(UInt16), iocp : LibC::HANDLE, completion_key : Crystal::IOCP::CompletionKey

@@read_cv = ::Thread::ConditionVariable.new
@@read_requests = Deque(ReadRequest).new
@@bytes_read = Deque(Int32).new
@@mtx = ::Thread::Mutex.new
@@reader_thread = ::Thread.new { reader_loop }

private def self.reader_loop
while true
request = @@mtx.synchronize do
loop do
if entry = @@read_requests.shift?
break entry
end
@@read_cv.wait(@@mtx)
end
end

bytes = read_console_blocking(request.handle, request.slice)

@@mtx.synchronize do
@@bytes_read << bytes
LibC.PostQueuedCompletionStatus(request.iocp, LibC::JOB_OBJECT_MSG_EXIT_PROCESS, request.completion_key.object_id, nil)
end
end
end
end

# Enable UTF-8 console I/O for the duration of program execution
Expand Down
35 changes: 26 additions & 9 deletions src/crystal/system/win32/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ require "crystal/system/thread_linked_list"
module Crystal::IOCP
# :nodoc:
class CompletionKey
enum Tag
ProcessRun
StdinRead
end

property fiber : Fiber?
getter tag : Tag

def initialize(@tag : Tag, @fiber : Fiber? = nil)
end
end

def self.wait_queued_completions(timeout, alertable = false, &)
Expand Down Expand Up @@ -39,20 +48,19 @@ module Crystal::IOCP
# at the moment only `::Process#wait` uses a non-nil completion key; all
# I/O operations, including socket ones, do not set this field
case completion_key = Pointer(Void).new(entry.lpCompletionKey).as(CompletionKey?)
when Nil
in Nil
operation = OverlappedOperation.unbox(entry.lpOverlapped)
operation.schedule { |fiber| yield fiber }
else
case entry.dwNumberOfBytesTransferred
when LibC::JOB_OBJECT_MSG_EXIT_PROCESS, LibC::JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS
in CompletionKey
if completion_key_valid?(completion_key, entry.dwNumberOfBytesTransferred)
# if `Process` exits before a call to `#wait`, this fiber will be
# reset already
if fiber = completion_key.fiber
# this ensures the `::Process` doesn't keep an indirect reference to
# `::Thread.current`, as that leads to a finalization cycle
# this ensures existing references to `completion_key` do not keep
# an indirect reference to `::Thread.current`, as that leads to a
# finalization cycle
completion_key.fiber = nil

yield fiber
else
# the `Process` exits before a call to `#wait`; do nothing
end
end
end
Expand All @@ -61,6 +69,15 @@ module Crystal::IOCP
false
end

private def self.completion_key_valid?(completion_key, number_of_bytes_transferred)
case completion_key.tag
in .process_run?
number_of_bytes_transferred.in?(LibC::JOB_OBJECT_MSG_EXIT_PROCESS, LibC::JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS)
in .stdin_read?
true
end
end

class OverlappedOperation
enum State
STARTED
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/win32/process.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct Crystal::System::Process
@thread_id : LibC::DWORD
@process_handle : LibC::HANDLE
@job_object : LibC::HANDLE
@completion_key = IOCP::CompletionKey.new
@completion_key = IOCP::CompletionKey.new(:process_run)

@@interrupt_handler : Proc(::Process::ExitReason, Nil)?
@@interrupt_count = Crystal::AtomicSemaphore.new
Expand Down
8 changes: 8 additions & 0 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,15 @@ class IO::FileDescriptor < IO
Crystal::System::FileDescriptor.from_stdio(fd)
end

# Returns whether I/O operations on this file descriptor block the current
# thread. If false, operations might opt to suspend the current fiber instead.
#
# This might be different from the internal file descriptor. For example, when
# `STDIN` is a terminal on Windows, this returns `false` since the underlying
# blocking reads are done on a completely separate thread.
def blocking
emulated = emulated_blocking?
return emulated unless emulated.nil?
system_blocking?
end

Expand Down
8 changes: 8 additions & 0 deletions src/lib_c/x86_64-windows-msvc/c/ioapiset.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ lib LibC
dwMilliseconds : DWORD,
fAlertable : BOOL
) : BOOL

fun PostQueuedCompletionStatus(
completionPort : HANDLE,
dwNumberOfBytesTransferred : DWORD,
dwCompletionKey : ULONG_PTR,
lpOverlapped : OVERLAPPED*
) : BOOL

fun CancelIoEx(
hFile : HANDLE,
lpOverlapped : OVERLAPPED*
Expand Down
Loading