From 6dc13ce25f16e7f9234269a90c83dbf09e2f1a20 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 5 Sep 2024 17:18:11 +1200 Subject: [PATCH] A lot more tests. --- lib/protocol/http/body/streamable.rb | 32 +++++++++- lib/protocol/http/body/writable.rb | 30 ++++----- test/protocol/http/body/file.rb | 16 +++++ test/protocol/http/body/streamable.rb | 91 ++++++++++++++++++++++++++- test/protocol/http/body/wrapper.rb | 6 ++ test/protocol/http/body/writable.rb | 70 +++++++++++++++++++++ 6 files changed, 226 insertions(+), 19 deletions(-) diff --git a/lib/protocol/http/body/streamable.rb b/lib/protocol/http/body/streamable.rb index a63afd1..9e8f641 100644 --- a/lib/protocol/http/body/streamable.rb +++ b/lib/protocol/http/body/streamable.rb @@ -15,12 +15,27 @@ module Body # # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. class Streamable < Readable + class Closed < StandardError + end + def initialize(block, input = nil) @block = block @input = input @output = nil end + # Closing a stream indicates we are no longer interested in reading from it. + def close(error = nil) + if @input + @input.close + @input = nil + end + + if @output + @output.close(error) + end + end + attr :block class Output @@ -32,10 +47,20 @@ def initialize(input, block) @fiber = Fiber.new do |from| @from = from block.call(stream) + rescue Closed + # Ignore. + ensure @fiber = nil + + # No more chunks will be generated: + if from = @from + @from = nil + from.transfer(nil) + end end end + # Can be invoked by the block to write to the stream. def write(chunk) if from = @from @from = nil @@ -45,12 +70,13 @@ def write(chunk) end end - def close - @fiber = nil - + # Can be invoked by the block to close the stream. + def close(error = nil) if from = @from @from = nil from.transfer(nil) + elsif @fiber + @fiber.raise(error || Closed) end end diff --git a/lib/protocol/http/body/writable.rb b/lib/protocol/http/body/writable.rb index b64041e..c9bf874 100644 --- a/lib/protocol/http/body/writable.rb +++ b/lib/protocol/http/body/writable.rb @@ -32,7 +32,7 @@ def length @length end - # Stop generating output; cause the next call to write to fail with the given error. + # Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body. def close(error = nil) unless @closed @queue.close @@ -49,23 +49,17 @@ def closed? end def ready? - !@queue.empty? + !@queue.empty? || @queue.closed? end # Has the producer called #finish and has the reader consumed the nil token? def empty? - @finished + @queue.empty? && @queue.closed? end # Read the next available chunk. def read - return if @finished - - unless chunk = @queue.pop - @finished = true - end - - return chunk + @queue.pop end # Write a single chunk to the body. Signal completion by calling `#finish`. @@ -89,12 +83,18 @@ def inspect private def status - if @finished - 'finished' - elsif @closed - 'closing' + if @queue.empty? + if @queue.closed? + 'closed' + else + 'waiting' + end else - 'waiting' + if @queue.closed? + 'closing' + else + 'ready' + end end end end diff --git a/test/protocol/http/body/file.rb b/test/protocol/http/body/file.rb index c37a640..ead3ec8 100644 --- a/test/protocol/http/body/file.rb +++ b/test/protocol/http/body/file.rb @@ -9,6 +9,12 @@ let(:path) {File.expand_path('file_spec.txt', __dir__)} let(:body) {subject.open(path)} + with '#stream?' do + it "should be streamable" do + expect(body).to be(:stream?) + end + end + with '#join' do it "should read entire file" do expect(body.join).to be == "Hello World" @@ -70,4 +76,14 @@ expect(body.read).to be == "ll" end end + + with "#call" do + let(:output) {StringIO.new} + + it "can stream output" do + body.call(output) + + expect(output.string).to be == "Hello World" + end + end end diff --git a/test/protocol/http/body/streamable.rb b/test/protocol/http/body/streamable.rb index c37da9c..f367d16 100644 --- a/test/protocol/http/body/streamable.rb +++ b/test/protocol/http/body/streamable.rb @@ -14,7 +14,14 @@ end end - let(:body) {subject.new(block)} + let(:input) {nil} + let(:body) {subject.new(block, input)} + + with "#stream?" do + it "should be streamable" do + expect(body).to be(:stream?) + end + end with '#block' do it "should wrap block" do @@ -28,6 +35,37 @@ expect(body.read).to be == "World" expect(body.read).to be == nil end + + with "block that doesn't close" do + let(:block) do + proc do |stream| + stream.write("Hello") + stream.write("World") + end + end + + it "can read the body" do + expect(body.read).to be == "Hello" + expect(body.read).to be == "World" + expect(body.read).to be == nil + end + end + + with "a block that allows stream to escape" do + let(:block) do + proc do |stream| + @stream = stream + end + end + + it "can read the body" do + expect(body.read).to be == nil + + expect do + @stream.write("!") + end.to raise_exception(RuntimeError, message: be =~ /Stream is not being read!/) + end + end end with '#each' do @@ -44,6 +82,30 @@ body.call(stream) expect(stream.string).to be == "HelloWorld" end + + with "a block that raises an error" do + let(:block) do + proc do |stream| + raise "Oh no... a wild error appeared!" + end + end + + it "closes the stream if an error occurs" do + stream = StringIO.new + expect(body).to receive(:close) + + expect do + body.call(stream) + end.to raise_exception(RuntimeError, message: be =~ /Oh no... a wild error appeared!/) + end + end + end + + with '#close' do + it "can close the body" do + expect(body.read).to be == "Hello" + body.close + end end with "nested fiber" do @@ -59,4 +121,31 @@ expect(body.read).to be == "Hello" end end + + with "buffered input" do + let(:input) {Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"])} + + let(:block) do + proc do |stream| + while chunk = stream.read_partial + stream.write(chunk) + end + end + end + + it "can read from input" do + expect(body.read).to be == "Hello" + expect(body.read).to be == " " + expect(body.read).to be == "World" + end + + it "can stream to output" do + output = StringIO.new + stream = Protocol::HTTP::Body::Stream.new(input, output) + + body.call(stream) + + expect(output.string).to be == "Hello World" + end + end end diff --git a/test/protocol/http/body/wrapper.rb b/test/protocol/http/body/wrapper.rb index e77d949..991225a 100644 --- a/test/protocol/http/body/wrapper.rb +++ b/test/protocol/http/body/wrapper.rb @@ -14,6 +14,12 @@ let(:source) {Protocol::HTTP::Body::Buffered.new} let(:body) {subject.new(source)} + with '#stream?' do + it "should not be streamable" do + expect(body).not.to be(:stream?) + end + end + it "should proxy close" do expect(source).to receive(:close).and_return(nil) body.close diff --git a/test/protocol/http/body/writable.rb b/test/protocol/http/body/writable.rb index de94759..3c6ea4d 100644 --- a/test/protocol/http/body/writable.rb +++ b/test/protocol/http/body/writable.rb @@ -10,4 +10,74 @@ let(:body) {subject.new} it_behaves_like Protocol::HTTP::Body::AWritableBody + + with "#length" do + it "should be unspecified by default" do + expect(body.length).to be_nil + end + end + + with "#closed?" do + it "should not be closed by default" do + expect(body).not.to be(:closed?) + end + end + + with "#ready?" do + it "should be ready if chunks are available" do + expect(body).not.to be(:ready?) + + body.write("Hello") + + expect(body).to be(:ready?) + end + + it "should be ready if closed" do + body.close + + expect(body).to be(:ready?) + end + end + + with "#empty?" do + it "should be empty if closed with no pending chunks" do + expect(body).not.to be(:empty?) + + body.close + + expect(body).to be(:empty?) + end + + it "should become empty when pending chunks are read" do + body.write("Hello") + body.close + + expect(body).not.to be(:empty?) + body.read + expect(body).to be(:empty?) + end + + it "should not be empty if chunks are available" do + body.write("Hello") + expect(body).not.to be(:empty?) + end + end + + with "#write" do + it "should write chunks" do + body.write("Hello") + body.write("World") + + expect(body.read).to be == "Hello" + expect(body.read).to be == "World" + end + + it "can't write to closed body" do + body.close + + expect do + body.write("Hello") + end.to raise_exception(Protocol::HTTP::Body::Writable::Closed) + end + end end