diff --git a/src/crystal/system/unix/epoll/event_loop.cr b/src/crystal/system/unix/epoll/event_loop.cr index 93ff09a8d7b2..58e5ac880384 100644 --- a/src/crystal/system/unix/epoll/event_loop.cr +++ b/src/crystal/system/unix/epoll/event_loop.cr @@ -97,19 +97,19 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop pd = Evented.arena.get(index) if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0 - pd.value.@readers.consume_each { |event| unsafe_resume_io(event) } - pd.value.@writers.consume_each { |event| unsafe_resume_io(event) } + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } + pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } return end if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP - pd.value.@readers.consume_each { |event| unsafe_resume_io(event) } + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN - pd.value.@readers.ready { |event| unsafe_resume_io(event) } + pd.value.@readers.ready_one { |event| unsafe_resume_io(event) } end if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT - pd.value.@writers.ready { |event| unsafe_resume_io(event) } + pd.value.@writers.ready_one { |event| unsafe_resume_io(event) } end end diff --git a/src/crystal/system/unix/evented/event.cr b/src/crystal/system/unix/evented/event.cr index 1cfc4a58f04c..ad8fbd5a8276 100644 --- a/src/crystal/system/unix/evented/event.cr +++ b/src/crystal/system/unix/evented/event.cr @@ -30,7 +30,7 @@ struct Crystal::Evented::Event # True if an IO event has timed out (i.e. we're past `#wake_at`). getter? timed_out : Bool = false - # The event can be added into different lists. See `Waiters` and `Timers`. + # The event can be added to `Waiters` lists. include PointerLinkedList::Node def initialize(@type : Type, @fiber, @index = nil, timeout : Time::Span? 
= nil) diff --git a/src/crystal/system/unix/evented/event_loop.cr b/src/crystal/system/unix/evented/event_loop.cr index 1bb552de2972..8a5594ff4af4 100644 --- a/src/crystal/system/unix/evented/event_loop.cr +++ b/src/crystal/system/unix/evented/event_loop.cr @@ -291,11 +291,11 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop io.__evloop_data = Arena::INVALID_INDEX Evented.arena.free(index) do |pd| - pd.value.@readers.consume_each do |event| + pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event)) end - pd.value.@writers.consume_each do |event| + pd.value.@writers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event)) end @@ -313,7 +313,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop end private def wait_readable(io, timeout = nil) : Nil - wait(:io_read, io, :readers, timeout) { raise IO::TimeoutError.new("Read timed out") } + wait_readable(io, timeout) { raise IO::TimeoutError.new("Read timed out") } end private def wait_readable(io, timeout = nil, &) : Nil @@ -321,7 +321,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop end private def wait_writable(io, timeout = nil) : Nil - wait(:io_write, io, :writers, timeout) { raise IO::TimeoutError.new("Write timed out") } + wait_writable(io, timeout) { raise IO::TimeoutError.new("Write timed out") } end private def wait_writable(io, timeout = nil, &) : Nil @@ -364,12 +364,6 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop else Fiber.suspend end - - {% if flag?(:preview_mt) %} - # we can safely reset readyness here, since we're about to retry the - # actual syscall - %pd.value.@{{waiters.id}}.@ready.set(false, :relaxed) - {% end %} end private def check_open(io : IO) diff --git a/src/crystal/system/unix/evented/waiters.cr b/src/crystal/system/unix/evented/waiters.cr index c98bdcb12ee0..25a0125670c8 100644 --- a/src/crystal/system/unix/evented/waiters.cr +++ 
b/src/crystal/system/unix/evented/waiters.cr @@ -4,19 +4,31 @@ # Thread safe: mutations are protected with a lock, and race conditions are # handled through the ready atomic. struct Crystal::Evented::Waiters - @ready = Atomic(Bool).new(false) + {% if flag?(:preview_mt) %} + @ready = false + @closed = false + {% end %} @lock = SpinLock.new @list = PointerLinkedList(Event).new + # Adds an event to the waiting list. May return false immediately if another + # thread marked the list as ready in parallel; returns true otherwise. def add(event : Pointer(Event)) : Bool {% if flag?(:preview_mt) %} - # check for readiness since another thread running the evloop might be - # trying to dequeue an event while we're waiting on the lock (failure to - # notice notice the IO is ready) - return false if ready? - @lock.sync do - return false if ready? + if @closed + # another thread closed the fd or we received a fd error or hup event: + # the fd will never block again + return false + end + + if @ready + # another thread readied the fd before the current thread got to add + # the event: don't block and reset @ready for the next loop + @ready = false + return false + end + + @list.push(event) end {% else %} @@ -26,21 +38,13 @@ struct Crystal::Evented::Waiters true end - def delete(event) : Nil + def delete(event : Pointer(Event)) : Nil @lock.sync { @list.delete(event) } end - def consume_each(&) : Nil - @lock.sync do - @list.consume_each { |event| yield event } - end - end - - def ready? : Bool - @ready.swap(false, :relaxed) - end - - def ready(& : Pointer(Event) -> Bool) : Nil + # Removes one pending event or marks the list as ready when there are no + # pending events (we got notified of readiness before a thread enqueued). 
+ def ready_one(& : Pointer(Event) -> Bool) : Nil @lock.sync do {% if flag?(:preview_mt) %} # loop until the block succesfully processes an event (it may have to @@ -51,7 +55,7 @@ struct Crystal::Evented::Waiters else # no event queued but another thread may be waiting for the lock to # add an event: set as ready to resolve the race condition - @ready.set(true, :relaxed) + @ready = true return end end @@ -62,4 +66,16 @@ struct Crystal::Evented::Waiters {% end %} end end + + # Dequeues all pending events and marks the list as closed. This must be + # called when a fd is closed or an error or hup event occurred. + def ready_all(& : Pointer(Event) ->) : Nil + @lock.sync do + @list.consume_each { |event| yield event } + + {% if flag?(:preview_mt) %} + @closed = true + {% end %} + end + end end diff --git a/src/crystal/system/unix/kqueue/event_loop.cr b/src/crystal/system/unix/kqueue/event_loop.cr index 95702d56e293..5c04d47b128e 100644 --- a/src/crystal/system/unix/kqueue/event_loop.cr +++ b/src/crystal/system/unix/kqueue/event_loop.cr @@ -123,8 +123,8 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF # apparently some systems may report EOF on write with EVFILT_READ instead # of EVFILT_WRITE, so let's wake all waiters: - pd.value.@readers.consume_each { |event| unsafe_resume_io(event) } - pd.value.@writers.consume_each { |event| unsafe_resume_io(event) } + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } + pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } return end @@ -132,16 +132,16 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop when LibC::EVFILT_READ if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR # OPTIMIZE: pass errno (kevent.data) through PollDescriptor - pd.value.@readers.consume_each { |event| unsafe_resume_io(event) } + pd.value.@readers.ready_all { |event| unsafe_resume_io(event) } else - pd.value.@readers.ready { |event| 
unsafe_resume_io(event) } + pd.value.@readers.ready_one { |event| unsafe_resume_io(event) } end when LibC::EVFILT_WRITE if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR # OPTIMIZE: pass errno (kevent.data) through PollDescriptor - pd.value.@writers.consume_each { |event| unsafe_resume_io(event) } + pd.value.@writers.ready_all { |event| unsafe_resume_io(event) } else - pd.value.@writers.ready { |event| unsafe_resume_io(event) } + pd.value.@writers.ready_one { |event| unsafe_resume_io(event) } end end end