diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index af38fdef..26c24b40 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -1052,7 +1052,7 @@ class ReadableStream: t: ValType read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']] cancel: Callable[[], None] - close: Callable[[]] + close: Callable[[Optional[ErrorContext]]] closed: Callable[[], bool] closed_with_error: Callable[[], Optional[ErrorContext]] ``` @@ -1125,7 +1125,7 @@ been returned: def cancel(self): self.reset_and_notify_pending() - def close(self, errctx = None): + def close(self, errctx): if not self.closed_: self.closed_ = True self.errctx = errctx @@ -1145,11 +1145,6 @@ necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is implementing the stream, `cancel` always returns ownership of the buffer immediately. -`close` takes an optional `error-context` value which can only be supplied to -the writable end of a stream via `stream.close-writable`. Thus, for the -readable end of the stream, `close` is effectively nullary, matching -`ReadableStream.close`. - Note that `cancel` and `close` notify in opposite directions: * `cancel` *must* be called on a readable or writable end with an operation pending, and thus `cancel` notifies the same end that called it. @@ -1246,11 +1241,11 @@ class FutureEnd(StreamEnd): assert(buffer.remain() == 1) def on_copy_done_wrapper(): if buffer.remain() == 0: - self.stream.close() + self.stream.close(errctx = None) on_copy_done() ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper) if ret == 'done' and buffer.remain() == 0: - self.stream.close() + self.stream.close(errctx = None) return ret class ReadableFutureEnd(FutureEnd): @@ -3541,7 +3536,6 @@ def pack_copy_result(task, buffer, e): return buffer.progress else: if (errctx := e.stream.closed_with_error()): - assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd)) errctxi = task.inst.error_contexts.add(errctx) assert(errctxi != 0) else: @@ -3554,11 +3548,6 @@ The order of tests here indicates that, if some progress was made and then the stream was closed, only the progress is reported and the `CLOSED` status is left to be discovered next time. -As asserted here, `error-context`s are only possible on the *readable* end of a -stream or future (since, as defined below, only the *writable* end can close -the stream with an `error-context`). Thus, `error-context`s only flow in the -same direction as values, as an optional last value of the stream or future. - ### 🔀 `canon {stream,future}.cancel-{read,write}` @@ -3631,14 +3620,8 @@ caller can assume that ownership of the buffer has been returned. For canonical definitions: ```wasm (canon stream.close-readable $t (core func $f)) -(canon future.close-readable $t (core func $f)) -``` -validation specifies: -* `$f` is given type `(func (param i32))` - -and for canonical definitions: -```wasm (canon stream.close-writable $t (core func $f)) +(canon future.close-readable $t (core func $f)) (canon future.close-writable $t (core func $f)) ``` validation specifies: @@ -3649,14 +3632,14 @@ the given index from the current component instance's `waitable` table, performing the guards and bookkeeping defined by `{Readable,Writable}{Stream,Future}End.drop()` above. ```python -async def canon_stream_close_readable(t, task, i): - return await close(ReadableStreamEnd, t, task, i, 0) +async def canon_stream_close_readable(t, task, i, errctxi): + return await close(ReadableStreamEnd, t, task, i, errctxi) async def canon_stream_close_writable(t, task, hi, errctxi): return await close(WritableStreamEnd, t, task, hi, errctxi) -async def canon_future_close_readable(t, task, i): - return await close(ReadableFutureEnd, t, task, i, 0) +async def canon_future_close_readable(t, task, i, errctxi): + return await close(ReadableFutureEnd, t, task, i, errctxi) async def canon_future_close_writable(t, task, hi, errctxi): return await close(WritableFutureEnd, t, task, hi, errctxi) @@ -3673,9 +3656,15 @@ async def close(EndT, t, task, hi, errctxi): e.drop(errctx) return [] ``` -Note that only the writable ends of streams and futures can be closed with a -final `error-context` value and thus `error-context`s only flow in the same -direction as values as an optional last value of the stream or future. +Passing a non-zero `errctxi` index indicates that this stream end is being +closed due to an error, with the given `error-context` providing information +that can be printed to aid in debugging. While, as explained above, the +*contents* of the `error-context` value are non-deterministic (and may, e.g., +be empty), the presence or absence of an `error-context` value is semantically +meainingful for distinguishing between success or failure. Concretely, the +packed `i32` produced by `pack_copy_result` above and returned by +`{stream,future}.{read,write}` operations indicates success or failure by +whether the packed `error-context` index is `0` or not. ### 🔀 `canon error-context.new` diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index f5d3e60d..0f198e1d 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -640,7 +640,7 @@ class ReadableStream: t: ValType read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']] cancel: Callable[[], None] - close: Callable[[]] + close: Callable[[Optional[ErrorContext]]] closed: Callable[[], bool] closed_with_error: Callable[[], Optional[ErrorContext]] @@ -672,7 +672,7 @@ def reset_and_notify_pending(self): def cancel(self): self.reset_and_notify_pending() - def close(self, errctx = None): + def close(self, errctx): if not self.closed_: self.closed_ = True self.errctx = errctx @@ -740,11 +740,11 @@ def close_after_copy(self, copy_op, buffer, on_copy_done): assert(buffer.remain() == 1) def on_copy_done_wrapper(): if buffer.remain() == 0: - self.stream.close() + self.stream.close(errctx = None) on_copy_done() ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper) if ret == 'done' and buffer.remain() == 0: - self.stream.close() + self.stream.close(errctx = None) return ret class ReadableFutureEnd(FutureEnd): @@ -2072,7 +2072,6 @@ def pack_copy_result(task, buffer, e): return buffer.progress else: if (errctx := e.stream.closed_with_error()): - assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd)) errctxi = task.inst.error_contexts.add(errctx) assert(errctxi != 0) else: @@ -2114,14 +2113,14 @@ async def cancel_copy(EndT, event_code, t, sync, task, i): ### 🔀 `canon {stream,future}.close-{readable,writable}` -async def canon_stream_close_readable(t, task, i): - return await close(ReadableStreamEnd, t, task, i, 0) +async def canon_stream_close_readable(t, task, i, errctxi): + return await close(ReadableStreamEnd, t, task, i, errctxi) async def canon_stream_close_writable(t, task, hi, errctxi): return await close(WritableStreamEnd, t, task, hi, errctxi) -async def canon_future_close_readable(t, task, i): - return await close(ReadableFutureEnd, t, task, i, 0) +async def canon_future_close_readable(t, task, i, errctxi): + return await close(ReadableFutureEnd, t, task, i, errctxi) async def canon_future_close_writable(t, task, hi, errctxi): return await close(WritableFutureEnd, t, task, hi, errctxi) diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 8af438d6..17c4bac0 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -1170,8 +1170,8 @@ async def core_func(task, args): assert(ret == 4) [ret] = await canon_stream_write(U8Type(), opts, task, wsi1, 0, 4) assert(ret == 4) - [] = await canon_stream_close_readable(U8Type(), task, rsi1) - [] = await canon_stream_close_readable(U8Type(), task, rsi2) + [] = await canon_stream_close_readable(U8Type(), task, rsi1, 0) + [] = await canon_stream_close_readable(U8Type(), task, rsi2, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi2, 0) return [] @@ -1261,7 +1261,7 @@ async def core_func(task, args): assert(ret == 4) [ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi1, 0, 4) assert(ret == definitions.CLOSED) - [] = await canon_stream_close_readable(U8Type(), task, rsi1) + [] = await canon_stream_close_readable(U8Type(), task, rsi1, 0) assert(mem[0:4] == b'\x05\x06\x07\x08') [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) assert(ret == 4) @@ -1275,7 +1275,7 @@ async def core_func(task, args): assert(mem[retp+4] == 4) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.CLOSED) - [] = await canon_stream_close_readable(U8Type(), task, rsi2) + [] = await canon_stream_close_readable(U8Type(), task, rsi2, 0) [ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi1, 0, 4) assert(ret == 4) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) @@ -1389,7 +1389,7 @@ async def core_func(task, args): assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi) assert(mem[retp+4] == 2) - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [wsi] = await canon_stream_new(U8Type(), task) assert(wsi == 1) @@ -1511,7 +1511,7 @@ async def core_func2(task, args): [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) errctxi = 1 assert(ret == (definitions.CLOSED | errctxi)) - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) @@ -1603,7 +1603,7 @@ async def core_func2(task, args): [ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2) errctxi = 1 assert(ret == (definitions.CLOSED | errctxi)) - [] = await canon_stream_close_readable(None, task, rsi) + [] = await canon_stream_close_readable(None, task, rsi, 0) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) return [] @@ -1674,7 +1674,7 @@ async def core_func(task, args): assert(ret == definitions.BLOCKED) [ret] = await canon_stream_cancel_read(U8Type(), True, task, rsi) assert(ret == 0) - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == 0) @@ -1693,7 +1693,7 @@ async def core_func(task, args): assert(mem[retp+0] == rsi) assert(mem[retp+4] == 2) assert(mem[0:2] == b'\x07\x08') - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1806,7 +1806,7 @@ async def core_func(task, args): assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) - [] = await canon_future_close_readable(U8Type(), task, rfi) + [] = await canon_future_close_readable(U8Type(), task, rfi, 0) [] = await canon_waitable_set_drop(task, seti) [wfi] = await canon_future_new(U8Type(), task) @@ -1831,7 +1831,7 @@ async def core_func(task, args): assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) - [] = await canon_future_close_readable(U8Type(), task, rfi) + [] = await canon_future_close_readable(U8Type(), task, rfi, 0) return []