Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
elliottt committed May 14, 2024
1 parent c39c72c commit f10ddef
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 124 deletions.
3 changes: 2 additions & 1 deletion lib/src/component/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ where
let mut buf = Vec::with_capacity(max_len);

let mut finished = true;
for item in iter.skip(usize::try_from(cursor).unwrap()) {
let skip_amt = usize::try_from(cursor).expect("u32 can fit in usize");
for item in iter.skip(skip_amt) {
let bytes = item.as_ref();

let needed = buf.len() + bytes.len() + 1;
Expand Down
4 changes: 2 additions & 2 deletions lib/src/component/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ impl log::Host for Session {
Ok(self.log_endpoint_handle(name).into())
}

async fn write(&mut self, h: log::Handle, msg: String) -> Result<u64, FastlyError> {
async fn write(&mut self, h: log::Handle, msg: String) -> Result<u32, FastlyError> {
let endpoint = self.log_endpoint(h.into())?;
let msg = msg.as_bytes();
endpoint.write_entry(&msg)?;
Ok(u64::try_from(msg.len()).unwrap())
Ok(u32::try_from(msg.len()).unwrap())
}
}
16 changes: 16 additions & 0 deletions lib/src/component/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
use {crate::linking::ComponentCtx, wasmtime::component};

/// This error type is used to classify two errors that can arise in a host-side implementation of
/// the fastly api:
///
/// * Application errors that are recoverable, and returned to the guest, and
/// * Traps that are expected to cause the guest to tear down immediately.
///
/// So a return type of `Result<T, FastlyError>` is morally equivalent to
/// `Result<Result<T, ApplicationError>, TrapError>`, but the former is much more pleasant to
/// program with.
///
/// We write explicit `From` impls for errors that we raise throughout the implementation of the
/// compute apis, so that we're able to make the choice between an application error and a trap.
pub enum FastlyError {
/// An application error, that will be communicated back to the guest through the
/// `fastly:api/types/error` type.
FastlyError(anyhow::Error),

/// An trap, which will cause wasmtime to immediately terminate the guest.
Trap(anyhow::Error),
}

Expand Down
26 changes: 7 additions & 19 deletions lib/src/component/secret_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@ use {
super::fastly::api::secret_store,
super::FastlyError,
crate::{
body::Body,
error::Error,
object_store::{ObjectKey, ObjectStoreError},
secret_store::SecretLookup,
session::Session,
wiggle_abi::SecretStoreError,
error::Error, secret_store::SecretLookup, session::Session, wiggle_abi::SecretStoreError,
},
};

Expand All @@ -27,19 +22,12 @@ impl secret_store::Host for Session {
store: secret_store::StoreHandle,
key: String,
) -> Result<Option<secret_store::SecretHandle>, FastlyError> {
let store = self.get_obj_store_key(store.into()).unwrap();
let key = ObjectKey::new(&key)?;
match self.obj_lookup(store, &key) {
Ok(obj) => {
let new_handle = self.insert_body(Body::from(obj));
Ok(Some(new_handle.into()))
}
// Don't write to the invalid handle as the SDK will return Ok(None)
// if the object does not exist. We need to return `Ok(())` here to
// make sure Viceroy does not crash
Err(ObjectStoreError::MissingObject) => Ok(None),
Err(err) => Err(err.into()),
}
let store_name = self.secret_store_name(store.into()).ok_or_else(|| {
FastlyError::from(SecretStoreError::InvalidSecretStoreHandle(store.into()))
})?;
Ok(self
.secret_handle(&store_name, &key)
.map(secret_store::SecretHandle::from))
}

async fn plaintext(
Expand Down
209 changes: 110 additions & 99 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ impl ExecuteCtx {
let config = &configure_wasmtime(is_component, profiling_strategy);
let engine = Engine::new(config)?;
let instance_pre = if is_component {
assert_eq!(
unknown_import_behavior,
UnknownImportBehavior::LinkError,
"Wasm components do not support unknown import behaviors other than link-time errors",
);
if unknown_import_behavior != UnknownImportBehavior::LinkError {
return Err(Error::Other(anyhow::anyhow!(
"Wasm components do not support unknown import behaviors other than link-time errors"
)));
}

warn!(
"
Expand Down Expand Up @@ -418,118 +418,126 @@ impl ExecuteCtx {
self.secret_stores.clone(),
);

if let Instance::Component(instance_pre) = self.instance_pre.as_ref() {
let req = session.downstream_request();
let body = session.downstream_request_body();

let mut store = ComponentCtx::create_store(&self, session, None, |_| {})
.map_err(ExecutionError::Context)?;

let (compute, _instance) = compute::Compute::instantiate_pre(&mut store, instance_pre)
.await
.map_err(ExecutionError::Instantiation)?;

let result = compute
.fastly_api_reactor()
.call_serve(&mut store, req.into(), body.into())
.await
.map_err(ExecutionError::Typechecking)?;
let guest_profile_path = self.guest_profile_path.as_deref().map(|path| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
path.join(format!("{}-{}.json", now, req_id))
});

let outcome = match result {
Ok(()) => Ok(()),
Err(()) => {
event!(Level::ERROR, "WebAssembly exited with an error");
Err(ExecutionError::WasmTrap(anyhow::Error::msg("failed")))
match self.instance_pre.as_ref() {
Instance::Component(instance_pre) => {
if self.guest_profile_path.is_some() {
warn!("Components do not currently support the guest profiler");
}
};

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();
let req = session.downstream_request();
let body = session.downstream_request_body();

let request_duration = Instant::now().duration_since(start_timestamp);
let mut store = ComponentCtx::create_store(&self, session, None, |_| {})
.map_err(ExecutionError::Context)?;

info!(
"request completed using {} of WebAssembly heap",
bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64),
);
let (compute, _instance) =
compute::Compute::instantiate_pre(&mut store, instance_pre)
.await
.map_err(ExecutionError::Instantiation)?;

info!("request completed in {:.0?}", request_duration);
let result = compute
.fastly_api_reactor()
.call_serve(&mut store, req.into(), body.into())
.await
.map_err(ExecutionError::Typechecking)?;

return outcome;
}
let outcome = match result {
Ok(()) => Ok(()),
Err(()) => {
event!(Level::ERROR, "WebAssembly exited with an error");
Err(ExecutionError::WasmTrap(anyhow::Error::msg("failed")))
}
};

let (module, instance_pre) = self.instance_pre.unwrap_module();
// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();

let guest_profile_path = self.guest_profile_path.as_deref().map(|path| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
path.join(format!("{}-{}.json", now, req_id))
});
let profiler = self.guest_profile_path.is_some().then(|| {
let program_name = "main";
GuestProfiler::new(
program_name,
EPOCH_INTERRUPTION_PERIOD,
vec![(program_name.to_string(), module.clone())],
)
});
let request_duration = Instant::now().duration_since(start_timestamp);

// We currently have to postpone linking and instantiation to the guest task
// due to wasmtime limitations, in particular the fact that `Instance` is not `Send`.
// However, the fact that the module itself is created within `ExecuteCtx::new`
// means that the heavy lifting happens only once.
let mut store =
create_store(&self, session, profiler, |_| {}).map_err(ExecutionError::Context)?;
info!(
"request completed using {} of WebAssembly heap",
bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64),
);

let instance = instance_pre
.instantiate_async(&mut store)
.await
.map_err(ExecutionError::Instantiation)?;
info!("request completed in {:.0?}", request_duration);

// Pull out the `_start` function, which by convention with WASI is the main entry point for
// an application.
let main_func = instance
.get_typed_func::<(), ()>(&mut store, "_start")
.map_err(ExecutionError::Typechecking)?;
outcome
}

// Invoke the entrypoint function, which may or may not send a downstream response.
let outcome = match main_func.call_async(&mut store, ()).await {
Ok(_) => Ok(()),
Err(e) => {
if let Some(exit) = e.downcast_ref::<I32Exit>() {
if exit.0 == 0 {
Ok(())
} else {
event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
Err(ExecutionError::WasmTrap(e))
Instance::Module(module, instance_pre) => {
let profiler = self.guest_profile_path.is_some().then(|| {
let program_name = "main";
GuestProfiler::new(
program_name,
EPOCH_INTERRUPTION_PERIOD,
vec![(program_name.to_string(), module.clone())],
)
});

// We currently have to postpone linking and instantiation to the guest task
// due to wasmtime limitations, in particular the fact that `Instance` is not `Send`.
// However, the fact that the module itself is created within `ExecuteCtx::new`
// means that the heavy lifting happens only once.
let mut store = create_store(&self, session, profiler, |_| {})
.map_err(ExecutionError::Context)?;

let instance = instance_pre
.instantiate_async(&mut store)
.await
.map_err(ExecutionError::Instantiation)?;

// Pull out the `_start` function, which by convention with WASI is the main entry point for
// an application.
let main_func = instance
.get_typed_func::<(), ()>(&mut store, "_start")
.map_err(ExecutionError::Typechecking)?;

// Invoke the entrypoint function, which may or may not send a downstream response.
let outcome = match main_func.call_async(&mut store, ()).await {
Ok(_) => Ok(()),
Err(e) => {
if let Some(exit) = e.downcast_ref::<I32Exit>() {
if exit.0 == 0 {
Ok(())
} else {
event!(Level::ERROR, "WebAssembly exited with error: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
} else {
event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
}
} else {
event!(Level::ERROR, "WebAssembly trapped: {:?}", e);
Err(ExecutionError::WasmTrap(e))
}
}
};
};

// If we collected a profile, write it to the file
write_profile(&mut store, guest_profile_path.as_ref());
// If we collected a profile, write it to the file
write_profile(&mut store, guest_profile_path.as_ref());

// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();
// Ensure the downstream response channel is closed, whether or not a response was
// sent during execution.
store.data_mut().close_downstream_response_sender();

let request_duration = Instant::now().duration_since(start_timestamp);
let request_duration = Instant::now().duration_since(start_timestamp);

info!(
"request completed using {} of WebAssembly heap",
bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64)
);
info!(
"request completed using {} of WebAssembly heap",
bytesize::ByteSize::b(store.data().limiter().memory_allocated as u64)
);

info!("request completed in {:.0?}", request_duration);
info!("request completed in {:.0?}", request_duration);

outcome
outcome
}
}
}

pub async fn run_main(self, program_name: &str, args: &[String]) -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -639,7 +647,10 @@ impl Drop for ExecuteCtx {
}
}

fn configure_wasmtime(component: bool, profiling_strategy: ProfilingStrategy) -> wasmtime::Config {
fn configure_wasmtime(
allow_components: bool,
profiling_strategy: ProfilingStrategy,
) -> wasmtime::Config {
use wasmtime::{
Config, InstanceAllocationStrategy, PoolingAllocationConfig, WasmBacktraceDetails,
};
Expand Down Expand Up @@ -687,7 +698,7 @@ fn configure_wasmtime(component: bool, profiling_strategy: ProfilingStrategy) ->
pooling_allocation_config,
));

if component {
if allow_components {
config.wasm_component_model(true);
}

Expand Down
11 changes: 9 additions & 2 deletions lib/src/linking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl ComponentCtx {
///
/// [ctx]: ../wiggle_abi/struct.ExecuteCtx.html
/// [store]: https://docs.rs/wasmtime/latest/wasmtime/struct.Store.html
pub(crate) fn create_store<'a>(
pub(crate) fn create_store(
ctx: &ExecuteCtx,
session: Session,
guest_profiler: Option<GuestProfiler>,
Expand Down Expand Up @@ -155,7 +155,7 @@ impl WasmCtx {
///
/// [ctx]: ../wiggle_abi/struct.ExecuteCtx.html
/// [store]: https://docs.rs/wasmtime/latest/wasmtime/struct.Store.html
pub(crate) fn create_store<'a>(
pub(crate) fn create_store(
ctx: &ExecuteCtx,
session: Session,
guest_profiler: Option<GuestProfiler>,
Expand Down Expand Up @@ -192,6 +192,13 @@ pub(crate) fn create_store<'a>(
fn make_wasi_ctx(ctx: &ExecuteCtx, session: &Session) -> WasiCtxBuilder {
let mut wasi_ctx = WasiCtxBuilder::new();

wasi_ctx.preopened_dir(
"tmp",
"/kv-store",
wasmtime_wasi::DirPerms::all(),
wasmtime_wasi::FilePerms::all(),
).unwrap();

// Viceroy provides the same `FASTLY_*` environment variables that the production
// Compute platform provides:

Expand Down
2 changes: 1 addition & 1 deletion lib/wit/deps/fastly/compute.wit
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ interface log {

endpoint-get: func(name: string) -> result<handle, error>;

write: func(h: handle, msg: string) -> result<u64, error>;
write: func(h: handle, msg: string) -> result<u32, error>;
}

/*
Expand Down

0 comments on commit f10ddef

Please sign in to comment.