Skip to content

Commit

Permalink
LibWeb: Implement min option for ReadableStreamBYOBReader.read()
Browse files Browse the repository at this point in the history
When the min option is given the read will only be fulfilled when there
are min or more elements available in the readable byte stream.

When the min option is not given the default value for min is 1.
  • Loading branch information
kennethmyhra authored and awesomekling committed Jul 11, 2024
1 parent 3850214 commit 907dc84
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
This -⌄
will not be processed as one chunk
This -> will be processed as one chunk
Stream closed
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<script src="../include.js"></script>
<script>
const CHUNK1 = "This -⌄";
const CHUNK2 = " will not be processed as one chunk";
const CHUNK3 = "This ->";
const CHUNK4 = " will be processed as one chunk";
const CHUNK5 = "WHF!";

// A read is fulfilled when there are N or more elements available in the stream.
// We start off by allowing a read to be fulfilled with 1 element available.
let minElementsAvailable = 1;
let pullCount = 0;
const readableStream = new ReadableStream({
start(controller) {

},
pull(controller) {
++pullCount;
const encoder = new TextEncoder();
if (pullCount === 1) {
controller.enqueue(encoder.encode(CHUNK1));
} else if (pullCount === 2) {
controller.enqueue(encoder.encode(CHUNK2));
} else if (pullCount === 3) {
controller.enqueue(encoder.encode(CHUNK3));
} else if (pullCount === 4) {
controller.enqueue(encoder.encode(CHUNK4));
// FIXME: Move/remove this controller.close() when we resolve the FIXME right below.
controller.close();
}
// FIXME: This should have been fulfilled since we are closing the controller at this point, currently we do not.
//} else if (pullCount === 5) {
// controller.enqueue(encoder.encode(CHUNK5));
// controller.close();
//}

// Now a read will only be fulfilled when at least 8 elements is available in the stream.
if (pullCount === 2) {
minElementsAvailable = 8;
}
},
type: "bytes",
});

async function readStream(stream) {
const reader = stream.getReader({ mode: "byob" });

let buffer = new ArrayBuffer(200);
let offset = 0;
let byteLength = 40;
while (true) {
let result = await reader.read(new Uint8Array(buffer, offset, byteLength), { min: minElementsAvailable });
if (result.done) {
println("Stream closed");
break;
}
println(new TextDecoder().decode(result.value));

buffer = result.value.buffer;
offset += result.value.byteLength;
}
}

asyncTest(async done => {
await readStream(readableStream);
done();
});
</script>
69 changes: 40 additions & 29 deletions Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ WebIDL::ExceptionOr<ReadableStreamPair> readable_byte_stream_tee(JS::Realm& real
auto read_into_request = realm.heap().allocate_without_realm<ByteStreamTeeBYOBReadRequest>(realm, stream, params, cancel_promise, *byob_branch, *other_branch, for_branch2);

// 5. Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
readable_stream_byob_reader_read(params->reader.get<JS::NonnullGCPtr<ReadableStreamBYOBReader>>(), view, read_into_request);
readable_stream_byob_reader_read(params->reader.get<JS::NonnullGCPtr<ReadableStreamBYOBReader>>(), view, 1, read_into_request);
});

// 17. Let pull1Algorithm be the following steps:
Expand Down Expand Up @@ -1656,34 +1656,36 @@ void readable_byte_stream_controller_fill_head_pull_into_descriptor(ReadableByte
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue
bool readable_byte_stream_controller_fill_pull_into_descriptor_from_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor)
{
// 1. Let elementSize be pullIntoDescriptor.[[elementSize]].
auto element_size = pull_into_descriptor.element_size;

// 2. Let currentAlignedBytes be pullIntoDescriptor’s bytes filled − (pullIntoDescriptor’s bytes filled mod elementSize).
auto current_aligned_bytes = pull_into_descriptor.bytes_filled - (pull_into_descriptor.bytes_filled % pull_into_descriptor.element_size);

// 3. Let maxBytesToCopy be min(controller.[[queueTotalSize]], pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
// 1. Let maxBytesToCopy be min(controller.[[queueTotalSize]], pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
auto max_bytes_to_copy = min(controller.queue_total_size(), pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled);

// 4. Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
// 2. Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
u64 max_bytes_filled = pull_into_descriptor.bytes_filled + max_bytes_to_copy;

// 5. Let maxAlignedBytes be maxBytesFilled − (maxBytesFilled mod elementSize).
auto max_aligned_bytes = max_bytes_filled - (max_bytes_filled % element_size);

// 6. Let totalBytesToCopyRemaining be maxBytesToCopy.
// 3. Let totalBytesToCopyRemaining be maxBytesToCopy.
auto total_bytes_to_copy_remaining = max_bytes_to_copy;

// 7. Let ready be false.
// 4. Let ready be false.
bool ready = false;

// 8. If maxAlignedBytes > currentAlignedBytes,
if (max_aligned_bytes > current_aligned_bytes) {
// 5. Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
VERIFY(pull_into_descriptor.bytes_filled < pull_into_descriptor.minimum_fill);

// 6. Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
auto remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size;

// 7. Let maxAlignedBytes be maxBytesFilled − remainderBytes.
auto max_aligned_bytes = max_bytes_filled - remainder_bytes;

// 8. If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
if (max_aligned_bytes >= pull_into_descriptor.minimum_fill) {
// 1. Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
total_bytes_to_copy_remaining = max_aligned_bytes - pull_into_descriptor.bytes_filled;

// 2. Set ready to true.
ready = true;

// NOTE: A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head of the queue, so the underlying source can keep filling it.
}

// 9. Let queue be controller.[[queue]].
Expand Down Expand Up @@ -1735,8 +1737,8 @@ bool readable_byte_stream_controller_fill_pull_into_descriptor_from_queue(Readab
// 2. Assert: pullIntoDescriptor’s bytes filled > 0.
VERIFY(pull_into_descriptor.bytes_filled > 0);

// 3. Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s element size.
VERIFY(pull_into_descriptor.bytes_filled < pull_into_descriptor.element_size);
// 3. Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
VERIFY(pull_into_descriptor.bytes_filled < pull_into_descriptor.minimum_fill);
}

// 12. Return ready.
Expand Down Expand Up @@ -1789,7 +1791,7 @@ JS::Value readable_byte_stream_controller_convert_pull_into_descriptor(JS::Realm
// 3. Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
VERIFY(bytes_filled <= pull_into_descriptor.byte_length);

// 4. Assert: bytesFilled mod elementSize is 0.
// 4. Assert: the remainder after dividing bytesFilled by elementSize is 0.
VERIFY(bytes_filled % element_size == 0);

// 5. Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
Expand All @@ -1800,7 +1802,7 @@ JS::Value readable_byte_stream_controller_convert_pull_into_descriptor(JS::Realm
}

// https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into
void readable_byte_stream_controller_pull_into(ReadableByteStreamController& controller, WebIDL::ArrayBufferView& view, ReadIntoRequest& read_into_request)
void readable_byte_stream_controller_pull_into(ReadableByteStreamController& controller, WebIDL::ArrayBufferView& view, u64 min, ReadIntoRequest& read_into_request)
{
auto& vm = controller.vm();
auto& realm = controller.realm();
Expand Down Expand Up @@ -1832,7 +1834,16 @@ void readable_byte_stream_controller_pull_into(ReadableByteStreamController& con
}
}

// 5. Let byteOffset be view.[[ByteOffset]].
// 5. Let minimumFill be min × elementSize.
u64 minimum_fill = min * element_size;

// 6. Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
VERIFY(minimum_fill <= view.byte_length());

// 7. Assert: the remainder after dividing minimumFill by elementSize is 0.
VERIFY(minimum_fill % element_size == 0);

// 8. Let byteOffset be view.[[ByteOffset]].
auto byte_offset = view.byte_offset();

// 6. Let byteLength be view.[[ByteLength]].
Expand Down Expand Up @@ -1862,6 +1873,7 @@ void readable_byte_stream_controller_pull_into(ReadableByteStreamController& con
.byte_offset = byte_offset,
.byte_length = byte_length,
.bytes_filled = 0,
.minimum_fill = minimum_fill,
.element_size = element_size,
.view_constructor = *ctor,
.reader_type = ReaderType::Byob,
Expand Down Expand Up @@ -1935,7 +1947,7 @@ void readable_byte_stream_controller_pull_into(ReadableByteStreamController& con
}

// https://streams.spec.whatwg.org/#readable-stream-byob-reader-read
void readable_stream_byob_reader_read(ReadableStreamBYOBReader& reader, WebIDL::ArrayBufferView& view, ReadIntoRequest& read_into_request)
void readable_stream_byob_reader_read(ReadableStreamBYOBReader& reader, WebIDL::ArrayBufferView& view, u64 min, ReadIntoRequest& read_into_request)
{
// 1. Let stream be reader.[[stream]].
auto stream = reader.stream();
Expand All @@ -1952,7 +1964,7 @@ void readable_stream_byob_reader_read(ReadableStreamBYOBReader& reader, WebIDL::
}
// 5. Otherwise, perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, readIntoRequest).
else {
readable_byte_stream_controller_pull_into(*stream->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>(), view, read_into_request);
readable_byte_stream_controller_pull_into(*stream->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>(), view, min, read_into_request);
}
}

Expand Down Expand Up @@ -2261,8 +2273,7 @@ WebIDL::ExceptionOr<void> readable_byte_stream_controller_respond_in_readable_st
}

// 4. If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
// FIXME: Support minimum fill.
if (pull_into_descriptor.bytes_filled < pull_into_descriptor.element_size)
if (pull_into_descriptor.bytes_filled < pull_into_descriptor.minimum_fill)
return {};

// NOTE: A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head of the queue, so the underlying source can keep filling it.
Expand Down Expand Up @@ -2722,8 +2733,8 @@ WebIDL::ExceptionOr<void> readable_byte_stream_controller_close(ReadableByteStre
// 1. Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
auto& first_pending_pull_into = controller.pending_pull_intos().first();

// 2. If firstPendingPullInto’s bytes filled > 0,
if (first_pending_pull_into.bytes_filled > 0) {
// 2. If the remainder after dividing firstPendingPullInto’s bytes filled by firstPendingPullInto’s element size is not 0,
if (first_pending_pull_into.bytes_filled % first_pending_pull_into.element_size != 0) {
// 1. Let e be a new TypeError exception.
auto error = JS::TypeError::create(realm, "Cannot close controller in the middle of processing a write request"sv);

Expand Down Expand Up @@ -3454,8 +3465,8 @@ void readable_byte_stream_controller_commit_pull_into_descriptor(ReadableStream&

// 4. If stream.[[state]] is "closed",
if (stream.is_closed()) {
// 1. Assert: pullIntoDescriptor’s bytes filled is 0.
VERIFY(pull_into_descriptor.bytes_filled == 0);
// 1. Assert: the remainder after dividing pullIntoDescriptor’s bytes filled by pullIntoDescriptor’s element size is 0.
VERIFY(pull_into_descriptor.bytes_filled % pull_into_descriptor.element_size == 0);

// 2. Set done to true.
done = true;
Expand Down
5 changes: 3 additions & 2 deletions Userland/Libraries/LibWeb/Streams/AbstractOperations.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <LibWeb/WebIDL/CallbackType.h>
#include <LibWeb/WebIDL/ExceptionOr.h>
#include <LibWeb/WebIDL/Promise.h>
#include <LibWeb/WebIDL/Types.h>

namespace Web::Streams {

Expand Down Expand Up @@ -61,8 +62,8 @@ void readable_stream_reader_generic_release(ReadableStreamGenericReaderMixin&);
void readable_stream_default_reader_error_read_requests(ReadableStreamDefaultReader&, JS::Value error);
void readable_stream_byob_reader_error_read_into_requests(ReadableStreamBYOBReader&, JS::Value error);
JS::Value readable_byte_stream_controller_convert_pull_into_descriptor(JS::Realm&, PullIntoDescriptor const&);
void readable_byte_stream_controller_pull_into(ReadableByteStreamController&, WebIDL::ArrayBufferView&, ReadIntoRequest&);
void readable_stream_byob_reader_read(ReadableStreamBYOBReader&, WebIDL::ArrayBufferView&, ReadIntoRequest&);
void readable_byte_stream_controller_pull_into(ReadableByteStreamController&, WebIDL::ArrayBufferView&, u64 min, ReadIntoRequest&);
void readable_stream_byob_reader_read(ReadableStreamBYOBReader&, WebIDL::ArrayBufferView&, u64 min, ReadIntoRequest&);
void readable_byte_stream_controller_fill_head_pull_into_descriptor(ReadableByteStreamController const&, u64 size, PullIntoDescriptor&);

void readable_stream_default_reader_read(ReadableStreamDefaultReader&, ReadRequest&);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ void ReadableByteStreamController::pull_steps(JS::NonnullGCPtr<ReadRequest> read
.byte_offset = 0,
.byte_length = *m_auto_allocate_chunk_size,
.bytes_filled = 0,
.minimum_fill = 1,
.element_size = 1,
.view_constructor = *realm.intrinsics().uint8_array_constructor(),
.reader_type = ReaderType::Default,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct PullIntoDescriptor {
// A nonnegative integer number of bytes that have been written into the buffer so far
u64 bytes_filled;

// https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill
// A positive integer representing the minimum number of bytes that must be written into the buffer before the associated read() request may be fulfilled. By default, this equals the element size.
u64 minimum_fill;

// https://streams.spec.whatwg.org/#pull-into-descriptor-element-size
// A positive integer representing the number of bytes that can be written into the buffer at a time, using views of the type described by the view constructor
u64 element_size;
Expand Down
40 changes: 33 additions & 7 deletions Userland/Libraries/LibWeb/Streams/ReadableStreamBYOBReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class BYOBReaderReadIntoRequest : public ReadIntoRequest {
JS_DEFINE_ALLOCATOR(BYOBReaderReadIntoRequest);

// https://streams.spec.whatwg.org/#byob-reader-read
JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::ArrayBufferView>& view)
JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::ArrayBufferView>& view, ReadableStreamBYOBReaderReadOptions options)
{
auto& realm = this->realm();

Expand All @@ -129,16 +129,42 @@ JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}

// 4. If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
// 4. If options["min"] is 0, return a promise rejected with a TypeError exception.
if (options.min == 0) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::TypeError, "options[\"min\'] cannot have a value of 0."sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}

// 5. If view has a [[TypedArrayName]] internal slot,
if (view->is_typed_array_base()) {
auto const& typed_array = *view->bufferable_object().get<JS::NonnullGCPtr<JS::TypedArrayBase>>();

// 1. If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
if (options.min > typed_array.array_length().length()) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::RangeError, "options[\"min\"] cannot be larger than the length of the view."sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
}

// 6. Otherwise (i.e., it is a DataView),
if (view->is_data_view()) {
// 1. If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
if (options.min > view->byte_length()) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::RangeError, "options[\"min\"] cannot be larger than the length of the view."sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
}

// 7. If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
if (!m_stream) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::TypeError, "Cannot read from an empty stream"sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}

// 5. Let promise be a new promise.
// 8. Let promise be a new promise.
auto promise_capability = WebIDL::create_promise(realm);

// 6. Let readIntoRequest be a new read-into request with the following items:
// 9. Let readIntoRequest be a new read-into request with the following items:
// chunk steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → false ]».
// close steps, given chunk
Expand All @@ -147,10 +173,10 @@ JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::
// Reject promise with e.
auto read_into_request = heap().allocate_without_realm<BYOBReaderReadIntoRequest>(realm, promise_capability);

// 7. Perform ! ReadableStreamBYOBReaderRead(this, view, readIntoRequest).
readable_stream_byob_reader_read(*this, *view, *read_into_request);
// 10. Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
readable_stream_byob_reader_read(*this, *view, options.min, *read_into_request);

// 8. Return promise.
// 11. Return promise.
return JS::NonnullGCPtr { verify_cast<JS::Promise>(*promise_capability->promise()) };
}
}
8 changes: 7 additions & 1 deletion Userland/Libraries/LibWeb/Streams/ReadableStreamBYOBReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
#include <LibWeb/Bindings/PlatformObject.h>
#include <LibWeb/Forward.h>
#include <LibWeb/Streams/ReadableStreamGenericReader.h>
#include <LibWeb/WebIDL/Types.h>

namespace Web::Streams {

// https://streams.spec.whatwg.org/#dictdef-readablestreambyobreaderreadoptions
struct ReadableStreamBYOBReaderReadOptions {
WebIDL::UnsignedLongLong min = 1;
};

// https://streams.spec.whatwg.org/#read-into-request
class ReadIntoRequest : public JS::Cell {
JS_CELL(ReadIntoRequest, JS::Cell);
Expand Down Expand Up @@ -45,7 +51,7 @@ class ReadableStreamBYOBReader final

virtual ~ReadableStreamBYOBReader() override = default;

JS::NonnullGCPtr<JS::Promise> read(JS::Handle<WebIDL::ArrayBufferView>&);
JS::NonnullGCPtr<JS::Promise> read(JS::Handle<WebIDL::ArrayBufferView>&, ReadableStreamBYOBReaderReadOptions options = {});

void release_lock();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
interface ReadableStreamBYOBReader {
constructor(ReadableStream stream);

Promise<ReadableStreamReadResult> read(ArrayBufferView view);
Promise<ReadableStreamReadResult> read(ArrayBufferView view, optional ReadableStreamBYOBReaderReadOptions options = {});

undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;

dictionary ReadableStreamBYOBReaderReadOptions {
[EnforceRange] unsigned long long min = 1;
};

0 comments on commit 907dc84

Please sign in to comment.