Skip to content

Commit

Permalink
Make close-{readable,writable} symmetric in taking an optional error-…
Browse files Browse the repository at this point in the history
…context
  • Loading branch information
lukewagner committed Jan 28, 2025
1 parent d43430d commit 532ccf7
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 49 deletions.
47 changes: 18 additions & 29 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ class ReadableStream:
t: ValType
read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']]
cancel: Callable[[], None]
close: Callable[[]]
close: Callable[[Optional[ErrorContext]]]
closed: Callable[[], bool]
closed_with_error: Callable[[], Optional[ErrorContext]]
```
Expand Down Expand Up @@ -1125,7 +1125,7 @@ been returned:
def cancel(self):
self.reset_and_notify_pending()

def close(self, errctx = None):
def close(self, errctx):
if not self.closed_:
self.closed_ = True
self.errctx = errctx
Expand All @@ -1145,11 +1145,6 @@ necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is
implementing the stream, `cancel` always returns ownership of the buffer
immediately.

`close` takes an optional `error-context` value which can only be supplied to
the writable end of a stream via `stream.close-writable`. Thus, for the
readable end of the stream, `close` is effectively nullary, matching
`ReadableStream.close`.

Note that `cancel` and `close` notify in opposite directions:
* `cancel` *must* be called on a readable or writable end with an operation
pending, and thus `cancel` notifies the same end that called it.
Expand Down Expand Up @@ -1246,11 +1241,11 @@ class FutureEnd(StreamEnd):
assert(buffer.remain() == 1)
def on_copy_done_wrapper():
if buffer.remain() == 0:
self.stream.close()
self.stream.close(errctx = None)
on_copy_done()
ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
if ret == 'done' and buffer.remain() == 0:
self.stream.close()
self.stream.close(errctx = None)
return ret

class ReadableFutureEnd(FutureEnd):
Expand Down Expand Up @@ -3541,7 +3536,6 @@ def pack_copy_result(task, buffer, e):
return buffer.progress
else:
if (errctx := e.stream.closed_with_error()):
assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd))
errctxi = task.inst.error_contexts.add(errctx)
assert(errctxi != 0)
else:
Expand All @@ -3554,11 +3548,6 @@ The order of tests here indicates that, if some progress was made and then the
stream was closed, only the progress is reported and the `CLOSED` status is
left to be discovered next time.

As asserted here, `error-context`s are only possible on the *readable* end of a
stream or future (since, as defined below, only the *writable* end can close
the stream with an `error-context`). Thus, `error-context`s only flow in the
same direction as values, as an optional last value of the stream or future.


### 🔀 `canon {stream,future}.cancel-{read,write}`

Expand Down Expand Up @@ -3631,14 +3620,8 @@ caller can assume that ownership of the buffer has been returned.
For canonical definitions:
```wasm
(canon stream.close-readable $t (core func $f))
(canon future.close-readable $t (core func $f))
```
validation specifies:
* `$f` is given type `(func (param i32))`

and for canonical definitions:
```wasm
(canon stream.close-writable $t (core func $f))
(canon future.close-readable $t (core func $f))
(canon future.close-writable $t (core func $f))
```
validation specifies:
Expand All @@ -3649,14 +3632,14 @@ the given index from the current component instance's `waitable` table,
performing the guards and bookkeeping defined by
`{Readable,Writable}{Stream,Future}End.drop()` above.
```python
async def canon_stream_close_readable(t, task, i):
return await close(ReadableStreamEnd, t, task, i, 0)
async def canon_stream_close_readable(t, task, i, errctxi):
return await close(ReadableStreamEnd, t, task, i, errctxi)

async def canon_stream_close_writable(t, task, hi, errctxi):
return await close(WritableStreamEnd, t, task, hi, errctxi)

async def canon_future_close_readable(t, task, i):
return await close(ReadableFutureEnd, t, task, i, 0)
async def canon_future_close_readable(t, task, i, errctxi):
return await close(ReadableFutureEnd, t, task, i, errctxi)

async def canon_future_close_writable(t, task, hi, errctxi):
return await close(WritableFutureEnd, t, task, hi, errctxi)
Expand All @@ -3673,9 +3656,15 @@ async def close(EndT, t, task, hi, errctxi):
e.drop(errctx)
return []
```
Note that only the writable ends of streams and futures can be closed with a
final `error-context` value and thus `error-context`s only flow in the same
direction as values as an optional last value of the stream or future.
Passing a non-zero `errctxi` index indicates that this stream end is being
closed due to an error, with the given `error-context` providing information
that can be printed to aid in debugging. While, as explained above, the
*contents* of the `error-context` value are non-deterministic (and may, e.g.,
be empty), the presence or absence of an `error-context` value is semantically
meainingful for distinguishing between success or failure. Concretely, the
packed `i32` produced by `pack_copy_result` above and returned by
`{stream,future}.{read,write}` operations indicates success or failure by
whether the packed `error-context` index is `0` or not.


### 🔀 `canon error-context.new`
Expand Down
17 changes: 8 additions & 9 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ class ReadableStream:
t: ValType
read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']]
cancel: Callable[[], None]
close: Callable[[]]
close: Callable[[Optional[ErrorContext]]]
closed: Callable[[], bool]
closed_with_error: Callable[[], Optional[ErrorContext]]

Expand Down Expand Up @@ -672,7 +672,7 @@ def reset_and_notify_pending(self):
def cancel(self):
self.reset_and_notify_pending()

def close(self, errctx = None):
def close(self, errctx):
if not self.closed_:
self.closed_ = True
self.errctx = errctx
Expand Down Expand Up @@ -740,11 +740,11 @@ def close_after_copy(self, copy_op, buffer, on_copy_done):
assert(buffer.remain() == 1)
def on_copy_done_wrapper():
if buffer.remain() == 0:
self.stream.close()
self.stream.close(errctx = None)
on_copy_done()
ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
if ret == 'done' and buffer.remain() == 0:
self.stream.close()
self.stream.close(errctx = None)
return ret

class ReadableFutureEnd(FutureEnd):
Expand Down Expand Up @@ -2072,7 +2072,6 @@ def pack_copy_result(task, buffer, e):
return buffer.progress
else:
if (errctx := e.stream.closed_with_error()):
assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd))
errctxi = task.inst.error_contexts.add(errctx)
assert(errctxi != 0)
else:
Expand Down Expand Up @@ -2114,14 +2113,14 @@ async def cancel_copy(EndT, event_code, t, sync, task, i):

### 🔀 `canon {stream,future}.close-{readable,writable}`

async def canon_stream_close_readable(t, task, i):
return await close(ReadableStreamEnd, t, task, i, 0)
async def canon_stream_close_readable(t, task, i, errctxi):
return await close(ReadableStreamEnd, t, task, i, errctxi)

async def canon_stream_close_writable(t, task, hi, errctxi):
return await close(WritableStreamEnd, t, task, hi, errctxi)

async def canon_future_close_readable(t, task, i):
return await close(ReadableFutureEnd, t, task, i, 0)
async def canon_future_close_readable(t, task, i, errctxi):
return await close(ReadableFutureEnd, t, task, i, errctxi)

async def canon_future_close_writable(t, task, hi, errctxi):
return await close(WritableFutureEnd, t, task, hi, errctxi)
Expand Down
22 changes: 11 additions & 11 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1170,8 +1170,8 @@ async def core_func(task, args):
assert(ret == 4)
[ret] = await canon_stream_write(U8Type(), opts, task, wsi1, 0, 4)
assert(ret == 4)
[] = await canon_stream_close_readable(U8Type(), task, rsi1)
[] = await canon_stream_close_readable(U8Type(), task, rsi2)
[] = await canon_stream_close_readable(U8Type(), task, rsi1, 0)
[] = await canon_stream_close_readable(U8Type(), task, rsi2, 0)
[] = await canon_stream_close_writable(U8Type(), task, wsi1, 0)
[] = await canon_stream_close_writable(U8Type(), task, wsi2, 0)
return []
Expand Down Expand Up @@ -1261,7 +1261,7 @@ async def core_func(task, args):
assert(ret == 4)
[ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi1, 0, 4)
assert(ret == definitions.CLOSED)
[] = await canon_stream_close_readable(U8Type(), task, rsi1)
[] = await canon_stream_close_readable(U8Type(), task, rsi1, 0)
assert(mem[0:4] == b'\x05\x06\x07\x08')
[ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4)
assert(ret == 4)
Expand All @@ -1275,7 +1275,7 @@ async def core_func(task, args):
assert(mem[retp+4] == 4)
[ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4)
assert(ret == definitions.CLOSED)
[] = await canon_stream_close_readable(U8Type(), task, rsi2)
[] = await canon_stream_close_readable(U8Type(), task, rsi2, 0)
[ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi1, 0, 4)
assert(ret == 4)
[] = await canon_stream_close_writable(U8Type(), task, wsi1, 0)
Expand Down Expand Up @@ -1389,7 +1389,7 @@ async def core_func(task, args):
assert(event == EventCode.STREAM_READ)
assert(mem[retp+0] == rsi)
assert(mem[retp+4] == 2)
[] = await canon_stream_close_readable(U8Type(), task, rsi)
[] = await canon_stream_close_readable(U8Type(), task, rsi, 0)

[wsi] = await canon_stream_new(U8Type(), task)
assert(wsi == 1)
Expand Down Expand Up @@ -1511,7 +1511,7 @@ async def core_func2(task, args):
[ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2)
errctxi = 1
assert(ret == (definitions.CLOSED | errctxi))
[] = await canon_stream_close_readable(U8Type(), task, rsi)
[] = await canon_stream_close_readable(U8Type(), task, rsi, 0)
[] = await canon_waitable_set_drop(task, seti)
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
[] = await canon_error_context_drop(task, errctxi)
Expand Down Expand Up @@ -1603,7 +1603,7 @@ async def core_func2(task, args):
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
errctxi = 1
assert(ret == (definitions.CLOSED | errctxi))
[] = await canon_stream_close_readable(None, task, rsi)
[] = await canon_stream_close_readable(None, task, rsi, 0)
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
[] = await canon_error_context_drop(task, errctxi)
return []
Expand Down Expand Up @@ -1674,7 +1674,7 @@ async def core_func(task, args):
assert(ret == definitions.BLOCKED)
[ret] = await canon_stream_cancel_read(U8Type(), True, task, rsi)
assert(ret == 0)
[] = await canon_stream_close_readable(U8Type(), task, rsi)
[] = await canon_stream_close_readable(U8Type(), task, rsi, 0)

[ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp])
assert(ret == 0)
Expand All @@ -1693,7 +1693,7 @@ async def core_func(task, args):
assert(mem[retp+0] == rsi)
assert(mem[retp+4] == 2)
assert(mem[0:2] == b'\x07\x08')
[] = await canon_stream_close_readable(U8Type(), task, rsi)
[] = await canon_stream_close_readable(U8Type(), task, rsi, 0)
[] = await canon_waitable_set_drop(task, seti)

return []
Expand Down Expand Up @@ -1806,7 +1806,7 @@ async def core_func(task, args):
assert(mem[readp] == 43)

[] = await canon_future_close_writable(U8Type(), task, wfi, 0)
[] = await canon_future_close_readable(U8Type(), task, rfi)
[] = await canon_future_close_readable(U8Type(), task, rfi, 0)
[] = await canon_waitable_set_drop(task, seti)

[wfi] = await canon_future_new(U8Type(), task)
Expand All @@ -1831,7 +1831,7 @@ async def core_func(task, args):
assert(mem[readp] == 43)

[] = await canon_future_close_writable(U8Type(), task, wfi, 0)
[] = await canon_future_close_readable(U8Type(), task, rfi)
[] = await canon_future_close_readable(U8Type(), task, rfi, 0)

return []

Expand Down

0 comments on commit 532ccf7

Please sign in to comment.