Skip to content

Commit

Permalink
wasi: introduce platform.Select and use it for poll_oneoff (#1346)
Browse files Browse the repository at this point in the history
The PR introduces the `platform.Select()` API, wrapping `select(2)` on POSIX and emulated in some cases on Windows. RATIONALE.md contains a full explanation of the approach followed in `poll_oneoff` to handle Stdin and the other types of file descriptors, and the clock subscriptions.

It also introduces an abstraction (`StdioFilePoller`) to allow the simulation of different scenarios (waiting for input, input ready, timeout expired, etc.) when unit-testing interactive input.

This closes #1317.

Signed-off-by: Edoardo Vacchi <[email protected]>
  • Loading branch information
evacchi authored Apr 18, 2023
1 parent ab78591 commit ea33606
Show file tree
Hide file tree
Showing 23 changed files with 1,257 additions and 104 deletions.
90 changes: 90 additions & 0 deletions RATIONALE.md
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,96 @@ See https://github.com/WebAssembly/stack-switching/discussions/38
See https://github.com/WebAssembly/wasi-threads#what-can-be-skipped
See https://slinkydeveloper.com/Kubernetes-controllers-A-New-Hope/

## poll_oneoff

`poll_oneoff` is a WASI API for waiting for I/O events on multiple handles.
It is conceptually similar to the POSIX `poll(2)` syscall.
The name is not `poll`, because it references [“the fact that this function is not efficient
when used repeatedly with the same large set of handles”][poll_oneoff].

We chose to support this API in a handful of cases that work for regular files
and standard input. We currently do not support other types of file descriptors such
as socket handles.

### Clock Subscriptions

As detailed above in [sys.Nanosleep](#sysnanosleep), `poll_oneoff` handles
relative clock subscriptions. In our implementation we use `sys.Nanosleep()`
for this purpose in most cases, except when polling for interactive input
from `os.Stdin` (see more details below).

### FdRead and FdWrite Subscriptions

When subscribing a file descriptor (except `Stdin`) for reads or writes,
the implementation will generally return immediately with success, unless
the file descriptor is unknown. The file descriptor is not checked further
for new incoming data. Any timeout is cancelled, and the API call is able
to return, unless there are subscriptions to `Stdin`: these are handled
separately.

### FdRead and FdWrite Subscription to Stdin

Subscribing `Stdin` for reads (writes make no sense and cause an error),
requires extra care: wazero allows to configure a custom reader for `Stdin`.

In general, if a custom reader is found, the behavior will be the same
as for regular file descriptors: data is assumed to be present and
a success is written back to the result buffer.

However, if the reader is detected to read from `os.Stdin`,
a special code path is followed, invoking `platform.Select()`.

`platform.Select()` is a wrapper for `select(2)` on POSIX systems,
and it is mocked for a handful of cases also on Windows.

### Select on POSIX

On POSIX systems,`select(2)` allows to wait for incoming data on a file
descriptor, and block until either data becomes available or the timeout
expires. It is not surprising that `select(2)` and `poll(2)` have lot in common:
the main difference is how the file descriptor parameters are passed.

Usage of `platform.Select()` is only reserved for the standard input case, because

1. it is really only necessary to handle interactive input: otherwise,
there is no way in Go to peek from Standard Input without actually
reading (and thus consuming) from it;

2. if `Stdin` is connected to a pipe, it is ok in most cases to return
with success immediately;

3. `platform.Select()` is currently a blocking call, irrespective of goroutines,
because the underlying syscall is; thus, it is better to limit its usage.

So, if the subscription is for `os.Stdin` and the handle is detected
to correspond to an interactive session, then `platform.Select()` will be
invoked with a the `Stdin` handle *and* the timeout.

This also means that in this specific case, the timeout is uninterruptible,
unless data becomes available on `Stdin` itself.

### Select on Windows

On Windows the `platform.Select()` is much more straightforward,
and it really just replicates the behavior found in the general cases
for `FdRead` subscriptions: in other words, the subscription to `Stdin`
is immediately acknowledged.

The implementation also support a timeout, but in this case
it relies on `time.Sleep()`, which notably, as compared to the POSIX
case, interruptible and compatible with goroutines.

However, because `Stdin` subscriptions are always acknowledged
without wait and because this code path is always followed only
when at least one `Stdin` subscription is present, then the
timeout is effectively always handled externally.

In any case, the behavior of `platform.Select` on Windows
is sensibly different from the behavior on POSIX platforms;
we plan to refine and further align it in semantics in the future.

[poll_oneoff]: https://github.com/WebAssembly/wasi-poll#why-is-the-function-called-poll_oneoff

## Signed encoding of integer global constant initializers

wazero treats integer global constant initializers signed as their interpretation is not known at declaration time. For
Expand Down
190 changes: 143 additions & 47 deletions imports/wasi_snapshot_preview1/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package wasi_snapshot_preview1

import (
"context"
"io/fs"
"syscall"
"time"

"github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/internal/platform"
internalsys "github.com/tetratelabs/wazero/internal/sys"
"github.com/tetratelabs/wazero/internal/wasip1"
"github.com/tetratelabs/wazero/internal/wasm"
Expand Down Expand Up @@ -42,6 +43,13 @@ var pollOneoff = newHostFunc(
"in", "out", "nsubscriptions", "result.nevents",
)

type event struct {
eventType byte
userData []byte
errno wasip1.Errno
outOffset uint32
}

func pollOneoffFn(ctx context.Context, mod api.Module, params []uint64) syscall.Errno {
in := uint32(params[0])
out := uint32(params[1])
Expand All @@ -60,6 +68,11 @@ func pollOneoffFn(ctx context.Context, mod api.Module, params []uint64) syscall.
return syscall.EFAULT
}
outBuf, ok := mem.Read(out, nsubscriptions*32)
// zero-out all buffer before writing
for i := range outBuf {
outBuf[i] = 0
}

if !ok {
return syscall.EFAULT
}
Expand All @@ -72,88 +85,171 @@ func pollOneoffFn(ctx context.Context, mod api.Module, params []uint64) syscall.

// Loop through all subscriptions and write their output.

// Extract FS context, used in the body of the for loop for FS access.
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (stdin subscribers).
var stdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
clockEvents := uint32(0)
// Count of all the non-clock subscribers that have been already written back to outBuf.
readySubs := uint32(0)

// Layout is subscription_u: Union
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u
for i := uint32(0); i < nsubscriptions; i++ {
inOffset := i * 48
outOffset := i * 32

eventType := inBuf[inOffset+8] // +8 past userdata
var errno syscall.Errno // errno for this specific event (1-byte)
// +8 past userdata +8 contents_offset
argBuf := inBuf[inOffset+8+8:]
userData := inBuf[inOffset : inOffset+8]

evt := &event{
eventType: eventType,
userData: userData,
errno: wasip1.ErrnoSuccess,
outOffset: outOffset,
}

switch eventType {
case wasip1.EventTypeClock: // handle later
// +8 past userdata +8 contents_offset
errno = processClockEvent(ctx, mod, inBuf[inOffset+8+8:])
case wasip1.EventTypeFdRead, wasip1.EventTypeFdWrite:
// +8 past userdata +8 contents_offset
errno = processFDEvent(mod, eventType, inBuf[inOffset+8+8:])
clockEvents++
newTimeout, err := processClockEvent(argBuf)
if err != 0 {
return err
}
// Min timeout.
if newTimeout < timeout {
timeout = newTimeout
}
// Ack the clock event to the outBuf.
writeEvent(outBuf, evt)
case wasip1.EventTypeFdRead:
fd := le.Uint32(argBuf)
if fd == internalsys.FdStdin {
// if the fd is Stdin, do not ack yet,
// append to a slice for delayed evaluation.
stdinSubs = append(stdinSubs, evt)
} else {
evt.errno = processFDEventRead(fsc, fd)
writeEvent(outBuf, evt)
readySubs++
}
case wasip1.EventTypeFdWrite:
fd := le.Uint32(argBuf)
evt.errno = processFDEventWrite(fsc, fd)
readySubs++
writeEvent(outBuf, evt)
default:
return syscall.EINVAL
}
}

// If there are subscribers with data ready, we have already written them to outBuf,
// and we don't need to wait for the timeout: clear it.
if readySubs != 0 {
timeout = 0
}

// If there are stdin subscribers, check for data with given timeout.
if len(stdinSubs) > 0 {
reader := getStdioFileReader(mod)
// Wait for the timeout to expire, or for some data to become available on Stdin.
stdinReady, err := reader.Poll(timeout)
if err != nil {
return platform.UnwrapOSError(err)
}
if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range stdinSubs {
readySubs++
evt := stdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
}
} else {
// No subscribers, just wait for the given timeout.
sysCtx := mod.(*wasm.ModuleInstance).Sys
sysCtx.Nanosleep(int64(timeout))
}

// Write the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
copy(outBuf, inBuf[inOffset:inOffset+8]) // userdata
if errno != 0 {
outBuf[outOffset+8] = byte(wasip1.ToErrno(errno)) // uint16, but safe as < 255
} else { // special case ass ErrnoSuccess is zero
outBuf[outOffset+8] = 0
if readySubs != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) {
return syscall.EFAULT
}
outBuf[outOffset+9] = 0
le.PutUint32(outBuf[outOffset+10:], uint32(eventType))
// TODO: When FD events are supported, write outOffset+16
}

return 0
}

// processClockEvent supports only relative name events, as that's what's used
// to implement sleep in various compilers including Rust, Zig and TinyGo.
func processClockEvent(_ context.Context, mod api.Module, inBuf []byte) syscall.Errno {
func processClockEvent(inBuf []byte) (time.Duration, syscall.Errno) {
_ /* ID */ = le.Uint32(inBuf[0:8]) // See below
timeout := le.Uint64(inBuf[8:16]) // nanos if relative
_ /* precision */ = le.Uint64(inBuf[16:24]) // Unused
flags := le.Uint16(inBuf[24:32])

var err syscall.Errno
// subclockflags has only one flag defined: subscription_clock_abstime
switch flags {
case 0: // relative time
case 1: // subscription_clock_abstime
return syscall.ENOTSUP
err = syscall.ENOTSUP
default: // subclockflags has only one flag defined.
return syscall.EINVAL
err = syscall.EINVAL
}

// https://linux.die.net/man/3/clock_settime says relative timers are
// unaffected. Since this function only supports relative timeout, we can
// skip name ID validation and use a single sleep function.
if err != 0 {
return 0, err
} else {
// https://linux.die.net/man/3/clock_settime says relative timers are
// unaffected. Since this function only supports relative timeout, we can
// skip name ID validation and use a single sleep function.

sysCtx := mod.(*wasm.ModuleInstance).Sys
sysCtx.Nanosleep(int64(timeout))
return 0
return time.Duration(timeout), 0
}
}

// processFDEvent returns a validation error or syscall.ENOTSUP as file or socket
// subscriptions are not yet supported.
func processFDEvent(mod api.Module, eventType byte, inBuf []byte) syscall.Errno {
fd := le.Uint32(inBuf)
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// processFDEventRead returns ErrnoSuccess if the file exists and ErrnoBadf otherwise.
func processFDEventRead(fsc *internalsys.FSContext, fd uint32) wasip1.Errno {
if _, ok := fsc.LookupFile(fd); ok {
return wasip1.ErrnoSuccess
} else {
return wasip1.ErrnoBadf
}
}

// Choose the best error, which falls back to unsupported, until we support
// files.
errno := syscall.ENOTSUP
if eventType == wasip1.EventTypeFdRead {
if f, ok := fsc.LookupFile(fd); ok {
st, _ := f.Stat()
// if fd is a pipe, then it is not a char device (a tty)
if st.Mode&fs.ModeCharDevice != 0 {
errno = syscall.EBADF
}
} else {
errno = syscall.EBADF
}
} else if eventType == wasip1.EventTypeFdWrite && internalsys.WriterForFile(fsc, fd) == nil {
errno = syscall.EBADF
// processFDEventWrite returns ErrnoNotsup if the file exists and ErrnoBadf otherwise.
func processFDEventWrite(fsc *internalsys.FSContext, fd uint32) wasip1.Errno {
if internalsys.WriterForFile(fsc, fd) == nil {
return wasip1.ErrnoBadf
}
return wasip1.ErrnoNotsup
}

// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
copy(outBuf[evt.outOffset:], evt.userData) // userdata
outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[evt.outOffset+9] = 0
le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType))
// TODO: When FD events are supported, write outOffset+16
}

return errno
// getStdioFileReader extracts a StdioFileReader for FdStdin from the given api.Module instance.
// and panics if this is not possible.
func getStdioFileReader(mod api.Module) *internalsys.StdioFileReader {
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
if file, ok := fsc.LookupFile(internalsys.FdStdin); ok {
if reader, typeOk := file.File.(*internalsys.StdioFileReader); typeOk {
return reader
}
}
panic("unexpected error: Stdin must always be a StdioFileReader")
}
Loading

0 comments on commit ea33606

Please sign in to comment.