Skip to content

Commit

Permalink
Improved semantics for streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 4, 2024
1 parent 205f165 commit b5447c8
Show file tree
Hide file tree
Showing 14 changed files with 27 additions and 70 deletions.
2 changes: 1 addition & 1 deletion lib/protocol/http/body.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ In general, you read chunks of data from a body until it is empty and returns `n

Reading can also fail, for example if the body represents a streaming upload, and the connection is lost. In this case, the body will raise some kind of error.

If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body.
If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed).
6 changes: 0 additions & 6 deletions lib/protocol/http/body/deflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ def self.for(body, window_size = GZIP, level = DEFAULT_LEVEL)
self.new(body, Zlib::Deflate.new(level, window_size))
end

def stream?
# We might want to revisit this design choice.
# We could wrap the streaming body in a Deflate stream, but that would require an extra stream wrapper which we don't have right now. See also `Digestable#stream?`.
false
end

def read
return if @stream.finished?

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/digestable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ def etag(weak: false)
end
end

def stream?
false
end

def read
if chunk = super
@digest.update(chunk)
Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ def rewind
@remaining = @length
end

def stream?
false
end

def read
if @remaining > 0
amount = [@remaining, @block_size].min
Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/inflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ def self.for(body, encoding = GZIP)
self.new(body, Zlib::Inflate.new(encoding))
end

def stream?
false
end

def read
return if @stream.finished?

Expand Down
13 changes: 6 additions & 7 deletions lib/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ def ready?
false
end

# Whether the stream can be rewound using {rewind}.
def rewindable?
false
end

# Rewind the stream to the beginning.
# @returns [Boolean] Whether the stream was successfully rewound.
def rewind
false
end

# The total length of the body, if known.
# @returns [Integer | Nil] The total length of the body, or `nil` if the length is unknown.
def length
nil
end
Expand Down Expand Up @@ -83,12 +88,6 @@ def join
end
end

# Should the internal mechanism prefer to use {call}?
# @returns [Boolean]
def stream?
false
end

# Write the body to the given stream.
#
# If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs.
Expand All @@ -105,6 +104,7 @@ def call(stream)
end

# Read all remaining chunks into a buffered body and close the underlying input.
#
# @returns [Buffered] The buffered body.
def finish
# Internally, this invokes `self.each` which then invokes `self.close`.
Expand All @@ -115,7 +115,6 @@ def as_json(...)
{
class: self.class.name,
length: self.length,
stream: self.stream?,
ready: self.ready?,
empty: self.empty?
}
Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/rewindable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ def buffered
Buffered.new(@chunks)
end

def stream?
false
end

def read
if @index < @chunks.size
chunk = @chunks[@index]
Expand Down
8 changes: 0 additions & 8 deletions lib/protocol/http/body/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,6 @@ def to_json(...)
def inspect
@body.inspect
end

def stream?
@body.stream?
end

def call(stream)
@body.call(stream)
end
end
end
end
Expand Down
5 changes: 0 additions & 5 deletions test/protocol/http/body/deflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
let(:compressed_body) {Protocol::HTTP::Body::Deflate.for(body)}
let(:decompressed_body) {Protocol::HTTP::Body::Inflate.for(compressed_body)}

it "should not be a stream" do
expect(compressed_body).not.to be(:stream?)
expect(decompressed_body).not.to be(:stream?)
end

it "should round-trip data" do
body.write("Hello World!")
body.close
Expand Down
4 changes: 0 additions & 4 deletions test/protocol/http/body/digestable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
let(:source) {Protocol::HTTP::Body::Buffered.new}
let(:body) {subject.new(source)}

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

with '.wrap' do
let(:source) {Protocol::HTTP::Body::Buffered.wrap("HelloWorld")}
let(:message) {Protocol::HTTP::Request.new(nil, nil, 'GET', '/', nil, Protocol::HTTP::Headers.new, body)}
Expand Down
4 changes: 0 additions & 4 deletions test/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
let(:path) {File.expand_path('file_spec.txt', __dir__)}
let(:body) {subject.open(path)}

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

with '#join' do
it "should read entire file" do
expect(body.join).to be == "Hello World"
Expand Down
5 changes: 0 additions & 5 deletions test/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
expect(body).not.to be(:ready?)
end

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

with '#finish' do
it "should return empty buffered representation" do
expect(body.finish).to be(:empty?)
Expand Down Expand Up @@ -67,7 +63,6 @@
expect(body.as_json).to have_keys(
class: be == subject.name,
length: be_nil,
stream: be == false,
ready: be == false,
empty: be == false,
)
Expand Down
4 changes: 0 additions & 4 deletions test/protocol/http/body/rewindable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
let(:source) {Protocol::HTTP::Body::Buffered.new}
let(:body) {subject.new(source)}

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

it "can write and read data" do
3.times do |i|
source.write("Hello World #{i}")
Expand Down
30 changes: 20 additions & 10 deletions test/protocol/http/body/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
require 'protocol/http/body/wrapper'
require 'protocol/http/body/buffered'
require 'protocol/http/request'

require 'json'
require 'stringio'

describe Protocol::HTTP::Body::Wrapper do
let(:source) {Protocol::HTTP::Body::Buffered.new}
Expand All @@ -32,11 +34,6 @@
expect(body.length).to be == 1
end

it "should proxy stream?" do
expect(source).to receive(:stream?).and_return(true)
expect(body.stream?).to be == true
end

it "should proxy read" do
expect(source).to receive(:read).and_return("!")
expect(body.read).to be == "!"
Expand All @@ -47,11 +44,6 @@
expect(body.inspect).to be(:include?, "!")
end

it "should proxy call" do
expect(source).to receive(:call).and_return(nil)
body.call(nil)
end

with '.wrap' do
let(:message) {Protocol::HTTP::Request.new(nil, nil, 'GET', '/', nil, Protocol::HTTP::Headers.new, body)}

Expand Down Expand Up @@ -88,4 +80,22 @@
expect(JSON.dump(body)).to be == body.to_json
end
end

with "#each" do
it "should invoke close correctly" do
expect(body).to receive(:close)

body.each{}
end
end

with "#stream" do
let(:stream) {StringIO.new}

it "should invoke close correctly" do
expect(body).to receive(:close)

body.call(stream)
end
end
end

0 comments on commit b5447c8

Please sign in to comment.