Skip to content

Commit

Permalink
Fix: race condition in Waiters#add vs Waiters#consume_each
Browse files Browse the repository at this point in the history
One thread may close a file descriptor or run the evloop and receive an
error or hup event, which will wake up all pending fibers. Further
attempts should fail since the fd is closed, but another thread might
be trying to add a waiter, which may go unnoticed by `#consume_each`
that doesn't manipulate the `@ready` variable.

This patch resolves the race by distinguishing between ready (we want to
wake up one waiter) and closed (the fd will never block anymore).

Also simplifies the implementation to rely on the lock, instead of the
lock + atomics. We'll see later if the lock is an issue and how to
optimize in that case.
  • Loading branch information
ysbaddaden committed Oct 3, 2024
1 parent 2a84f73 commit 589993b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 42 deletions.
10 changes: 5 additions & 5 deletions src/crystal/system/unix/epoll/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,19 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop
pd = Evented.arena.get(index)

if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0
pd.value.@readers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@writers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
return
end

if (events & LibC::EPOLLRDHUP) == LibC::EPOLLRDHUP
pd.value.@readers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
elsif (events & LibC::EPOLLIN) == LibC::EPOLLIN
pd.value.@readers.ready { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) }
end

if (events & LibC::EPOLLOUT) == LibC::EPOLLOUT
pd.value.@writers.ready { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) }
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/crystal/system/unix/evented/event.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct Crystal::Evented::Event
# True if an IO event has timed out (i.e. we're past `#wake_at`).
getter? timed_out : Bool = false

# The event can be added into different lists. See `Waiters` and `Timers`.
# The event can be added to `Waiters` lists.
include PointerLinkedList::Node

def initialize(@type : Type, @fiber, @index = nil, timeout : Time::Span? = nil)
Expand Down
14 changes: 4 additions & 10 deletions src/crystal/system/unix/evented/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
io.__evloop_data = Arena::INVALID_INDEX

Evented.arena.free(index) do |pd|
pd.value.@readers.consume_each do |event|
pd.value.@readers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event))
end

pd.value.@writers.consume_each do |event|
pd.value.@writers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event))
end

Expand All @@ -313,15 +313,15 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
end

private def wait_readable(io, timeout = nil) : Nil
wait(:io_read, io, :readers, timeout) { raise IO::TimeoutError.new("Read timed out") }
wait_readable(io, timeout) { raise IO::TimeoutError.new("Read timed out") }
end

private def wait_readable(io, timeout = nil, &) : Nil
wait(:io_read, io, :readers, timeout) { yield }
end

private def wait_writable(io, timeout = nil) : Nil
wait(:io_write, io, :writers, timeout) { raise IO::TimeoutError.new("Write timed out") }
wait_writable(io, timeout) { raise IO::TimeoutError.new("Write timed out") }
end

private def wait_writable(io, timeout = nil, &) : Nil
Expand Down Expand Up @@ -364,12 +364,6 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop
else
Fiber.suspend
end

{% if flag?(:preview_mt) %}
# we can safely reset readiness here, since we're about to retry the
# actual syscall
%pd.value.@{{waiters.id}}.@ready.set(false, :relaxed)
{% end %}
end

private def check_open(io : IO)
Expand Down
56 changes: 36 additions & 20 deletions src/crystal/system/unix/evented/waiters.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,31 @@
# Thread safe: mutations are protected with a lock, and race conditions
# between add and ready/close are resolved via the ready and closed flags.
struct Crystal::Evented::Waiters
@ready = Atomic(Bool).new(false)
{% if flag?(:preview_mt) %}
@ready = false
@closed = false
{% end %}
@lock = SpinLock.new
@list = PointerLinkedList(Event).new

# Adds an event to the waiting list. May return false immediately if another
# thread marked the list as ready in parallel, returns true otherwise.
def add(event : Pointer(Event)) : Bool
{% if flag?(:preview_mt) %}
# check for readiness since another thread running the evloop might be
# trying to dequeue an event while we're waiting on the lock (failure to
# notice the IO is ready)
return false if ready?

@lock.sync do
return false if ready?
if @closed
# another thread closed the fd or we received a fd error or hup event:
# the fd will never block again
return false
end

if @ready
# another thread readied the fd before the current thread got to add
# the event: don't block and reset @ready for the next loop
@ready = false
return false
end

@list.push(event)
end
{% else %}
Expand All @@ -26,21 +38,13 @@ struct Crystal::Evented::Waiters
true
end

def delete(event) : Nil
def delete(event : Pointer(Event)) : Nil
@lock.sync { @list.delete(event) }
end

def consume_each(&) : Nil
@lock.sync do
@list.consume_each { |event| yield event }
end
end

def ready? : Bool
@ready.swap(false, :relaxed)
end

def ready(& : Pointer(Event) -> Bool) : Nil
# Removes one pending event or marks the list as ready when there are no
# pending events (we got notified of readiness before a thread enqueued).
def ready_one(& : Pointer(Event) -> Bool) : Nil
@lock.sync do
{% if flag?(:preview_mt) %}
# loop until the block successfully processes an event (it may have to
Expand All @@ -51,7 +55,7 @@ struct Crystal::Evented::Waiters
else
# no event queued but another thread may be waiting for the lock to
# add an event: set as ready to resolve the race condition
@ready.set(true, :relaxed)
@ready = true
return
end
end
Expand All @@ -62,4 +66,16 @@ struct Crystal::Evented::Waiters
{% end %}
end
end

# Dequeues all pending events and marks the list as closed. This must be
# called when a fd is closed or an error or hup event occurs.
def ready_all(& : Pointer(Event) ->) : Nil
@lock.sync do
@list.consume_each { |event| yield event }

{% if flag?(:preview_mt) %}
@closed = true
{% end %}
end
end
end
12 changes: 6 additions & 6 deletions src/crystal/system/unix/kqueue/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,25 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop
if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF
# apparently some systems may report EOF on write with EVFILT_READ instead
# of EVFILT_WRITE, so let's wake all waiters:
pd.value.@readers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@writers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
return
end

case kevent.value.filter
when LibC::EVFILT_READ
if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR
# OPTIMIZE: pass errno (kevent.data) through PollDescriptor
pd.value.@readers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_all { |event| unsafe_resume_io(event) }
else
pd.value.@readers.ready { |event| unsafe_resume_io(event) }
pd.value.@readers.ready_one { |event| unsafe_resume_io(event) }
end
when LibC::EVFILT_WRITE
if (kevent.value.fflags & LibC::EV_ERROR) == LibC::EV_ERROR
# OPTIMIZE: pass errno (kevent.data) through PollDescriptor
pd.value.@writers.consume_each { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_all { |event| unsafe_resume_io(event) }
else
pd.value.@writers.ready { |event| unsafe_resume_io(event) }
pd.value.@writers.ready_one { |event| unsafe_resume_io(event) }
end
end
end
Expand Down

0 comments on commit 589993b

Please sign in to comment.