Skip to content

Commit

Permalink
Add tests for cancellation. (#79)
Browse files Browse the repository at this point in the history
* Add tests for cancellation.

* Rework "Interrupt" -> "Interruptable" tests.

* Updated pure Ruby Select implementation + tests.

* Improve coverage.
  • Loading branch information
ioquatix authored Aug 24, 2023
1 parent 807ea38 commit 9641de6
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 38 deletions.
1 change: 1 addition & 0 deletions .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
- macos

ruby:
- "3.0"
- "3.1"
- "3.2"
- "head"
Expand Down
62 changes: 61 additions & 1 deletion lib/io/event/selector/select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,66 @@ def again?(errno)
errno == EAGAIN or errno == EWOULDBLOCK
end

if Support.fiber_scheduler_v2?
if Support.fiber_scheduler_v3?
# Ruby 3.3+, full IO::Buffer support.

# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset into the buffer to read to.
def io_read(fiber, io, buffer, length, offset = 0)
total = 0

Selector.nonblock(io) do
while true
result = Fiber.blocking{buffer.read(io, 0, offset)}

if result < 0
if again?(result)
self.io_wait(fiber, io, IO::READABLE)
else
return result
end
elsif result == 0
break
else
total += result
break if total >= length
offset += result
end
end
end

return total
end

# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The offset into the buffer to write from.
def io_write(fiber, io, buffer, length, offset = 0)
total = 0

Selector.nonblock(io) do
while true
result = Fiber.blocking{buffer.write(io, 0, offset)}

if result < 0
if again?(result)
self.io_wait(fiber, io, IO::READABLE)
else
return result
end
elsif result == 0
break result
else
total += result
break if total >= length
offset += result
end
end
end

return total
end
elsif Support.fiber_scheduler_v2?
# Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.
def io_read(fiber, io, buffer, length, offset = 0)
total = 0

Expand Down Expand Up @@ -219,6 +278,7 @@ def io_write(fiber, io, buffer, length, offset = 0)
return total
end
elsif Support.fiber_scheduler_v1?
# Ruby <= 3.1, limited IO::Buffer support.
def io_read(fiber, _io, buffer, length, offset = 0)
io = IO.for_fd(_io.fileno, autoclose: false)
total = 0
Expand Down
11 changes: 11 additions & 0 deletions lib/io/event/support.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ def self.fiber_scheduler_v1?
def self.fiber_scheduler_v2?
IO.const_defined?(:Buffer) and Fiber.respond_to?(:blocking) and IO::Buffer.instance_method(:read).arity == -1
end

def self.fiber_scheduler_v3?
if fiber_scheduler_v2?
begin
IO::Buffer.new.slice(0, 0).write(STDOUT)
return true
rescue
return false
end
end
end
end
end
end
7 changes: 5 additions & 2 deletions test/io/event/selector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def transfer
]
end

it "can wait consecutively on two different io objects that share a fd" do
it "can wait consecutively on two different io objects that share the same file descriptor" do
fiber = Fiber.new do
events << :write1
remote.puts "Hello World"
Expand All @@ -252,11 +252,14 @@ def transfer
events << :new_io
fileno = local.fileno
local.close

new_local, new_remote = UNIXSocket.pair
# Make sure we attempt to wait on the same FD.

# Make sure we attempt to wait on the same file descriptor:
if new_remote.fileno == fileno
new_local, new_remote = new_remote, new_local
end

if new_local.fileno != fileno
warn "Could not create new IO object with same FD, test ineffective!"
end
Expand Down
88 changes: 88 additions & 0 deletions test/io/event/selector/cancellable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2021-2023, by Samuel Williams.
# Copyright, 2023, by Math Ieu.

require 'io/event'
require 'io/event/selector'
require 'socket'

require 'unix_socket'

Cancellable = Sus::Shared("cancellable") do
with 'a pipe' do
let(:pipe) {IO.pipe}
let(:input) {pipe.first}
let(:output) {pipe.last}

def after
super
input.close
output.close
end

it "can cancel reads" do
skip "Ignore"

reader = Fiber.new do
buffer = IO::Buffer.new(64)

10.times do
expect{selector.io_read(Fiber.current, input, buffer, 1)}.to raise_exception(Interrupt)
end
end

# Enter the `io_read` operation:
reader.transfer

while reader.alive?
reader.raise(Interrupt)
selector.select(0)
end
end

it "can cancel waits" do
reader = Fiber.new do
buffer = IO::Buffer.new(64)

10.times do
expect{selector.io_wait(Fiber.current, input, IO::READABLE)}.to raise_exception(Interrupt)
selector.io_read(Fiber.current, input, buffer, 1)
end
end

# Enter the `io_read` operation:
reader.transfer

while reader.alive?
reader.raise(Interrupt)
output.write(".")
selector.select(0.1)
end
end
end
end

IO::Event::Selector.constants.each do |name|
klass = IO::Event::Selector.const_get(name)

# Don't run the test if the selector doesn't support `io_read`/`io_write`:
next unless klass.instance_methods.include?(:io_read)

describe(klass, unique: name) do
def before
@loop = Fiber.current
@selector = subject.new(@loop)
end

def after
@selector&.close
end

attr :loop
attr :selector

it_behaves_like Cancellable
end
end
35 changes: 0 additions & 35 deletions test/io/event/selector/interrupt.rb

This file was deleted.

39 changes: 39 additions & 0 deletions test/io/event/selector/interruptable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

require 'io/event'
require 'io/event/selector'
require 'socket'

Interruptable = Sus::Shared("interruptable") do
it "can interrupt sleeping selector" do
result = nil

thread = Thread.new do
Thread.current.report_on_exception = false
selector = subject.new(Fiber.current)

Thread.handle_interrupt(::SignalException => :never) do
result = selector.select(nil)
end
end

# Wait for thread to enter the selector:
sleep(0.001) until thread.status == "sleep"

thread.raise(::Interrupt)

expect{thread.join}.to raise_exception(::Interrupt)
expect(result).to be == 0
end
end

IO::Event::Selector.constants.each do |name|
klass = IO::Event::Selector.const_get(name)

describe(klass, unique: name) do
it_behaves_like Interruptable
end
end

0 comments on commit 9641de6

Please sign in to comment.