Skip to content

Commit

Permalink
Showing 1 changed file with 70 additions and 27 deletions.
97 changes: 70 additions & 27 deletions crates/containerd-shim-wasm/src/sandbox/containerd/client.rs
Original file line number Diff line number Diff line change
@@ -11,8 +11,10 @@ use containerd_client::services::v1::leases_client::LeasesClient;
use containerd_client::services::v1::{
Container, DeleteContentRequest, GetContainerRequest, GetImageRequest, Image, Info,
InfoRequest, ReadContentRequest, UpdateRequest, WriteAction, WriteContentRequest,
WriteContentResponse,
};
use containerd_client::tonic::transport::Channel;
use containerd_client::tonic::Streaming;
use containerd_client::{tonic, with_namespace};
use futures::TryStreamExt;
use oci_spec::image::{Arch, ImageManifest, MediaType, Platform};
@@ -30,6 +32,10 @@ use crate::sandbox::oci::{self, WasmLayer};
use crate::with_lease;

static PRECOMPILE_PREFIX: &str = "runwasi.io/precompiled";
// 16MB is the default maximum gRPC message size for gRPC in containerd:
// https://github.com/containerd/containerd/blob/main/defaults/defaults.go
// Conservatively set the max to 15MB to leave room for message overhead
static MAX_WRITE_CHUNK_SIZE_BYTES: i64 = 1024 * 1024 * 15;

pub struct Client {
inner: Channel,
@@ -154,17 +160,17 @@ impl Client {

// Send write request with Stat action to containerd to let it know that we are going to write content
// if the content is already there, it will return early with AlreadyExists
log::debug!("Sending stat request to containerd");
let req = WriteContentRequest {
r#ref: reference.clone(),
action: WriteAction::Stat.into(),
total: len,
expected: expected.clone(),
..Default::default()
};
tx.send(req)
.await
.map_err(|err| ShimError::Containerd(err.to_string()))?;

// Create stream for the channel
let request_stream = ReceiverStream::new(rx);
let request_stream =
with_lease!(request_stream, self.namespace, lease.lease_id.clone());
@@ -176,6 +182,8 @@ impl Client {
}
Err(e) => return Err(ShimError::Containerd(e.to_string())),
};

// Get initial Stat response
let response = response_stream
.message()
.await
@@ -186,42 +194,56 @@ impl Client {
expected
))
})?;
log::debug!(
"Starting to write content for layer {} with current status response {:?}",
expected,
response
);

// There is a scenario where the content might have been removed manually
// but the content isn't removed from the containerd file system yet.
// In this case if we re-add it at before its removed from file system
// we don't need to copy the content again. Container tells us it found the blob
// by returning the offset of the content that was found.
let data_to_write = data[response.offset as usize..].to_vec();
// Separate the content into chunks and send a write request for each chunk.
let mut offset = response.offset;
while offset < len {
let end = (offset + MAX_WRITE_CHUNK_SIZE_BYTES).min(len);
let chunk = &data[offset as usize..end as usize];

let write_request = WriteContentRequest {
action: WriteAction::Write.into(),
// Ignore size verification of each chunk
total: 0,
offset,
data: chunk.to_vec(),
..Default::default()
};
let response =
send_message(write_request, &mut response_stream, &tx, &expected).await?;
log::debug!(
"Writing content for layer {} at offset {} got response: {:?}",
expected,
offset,
response
);
offset = end;
}

// Send a final empty commit request to end the transaction
let commit_request = WriteContentRequest {
action: WriteAction::Commit.into(),
total: len,
offset: response.offset,
offset: len,
expected: expected.clone(),
labels,
data: data_to_write,
data: Vec::new(),
..Default::default()
};
log::debug!(
"Sending commit request to containerd with response: {:?}",
let response =
send_message(commit_request, &mut response_stream, &tx, &expected).await?;
log::info!(
"Validating final response after writing content for layer {}: {:?}",
expected,
response
);
tx.send(commit_request)
.await
.map_err(|err| ShimError::Containerd(format!("commit request error: {}", err)))?;
let response = response_stream
.message()
.await
.map_err(|err| ShimError::Containerd(format!("response stream error: {}", err)))?
.ok_or_else(|| {
ShimError::Containerd(format!(
"no response received after write request for {}",
expected.clone()
))
})?;

log::debug!("Validating response");
// client should validate that all bytes were written and that the digest matches
// Client should validate that all bytes were written and that the digest matches
if response.offset != len {
return Err(ShimError::Containerd(format!(
"failed to write all bytes, expected {} got {}",
@@ -530,6 +552,27 @@ fn is_wasm_layer(media_type: &MediaType, supported_layer_types: &[&str]) -> bool
supported
}

async fn send_message(
request: WriteContentRequest,
response_stream: &mut Streaming<WriteContentResponse>,
tx: &mpsc::Sender<WriteContentRequest>,
digest: &str,
) -> Result<WriteContentResponse> {
tx.send(request)
.await
.map_err(|err| ShimError::Containerd(format!("commit request error: {}", err)))?;
response_stream
.message()
.await
.map_err(|err| ShimError::Containerd(format!("response stream error: {}", err)))?
.ok_or_else(|| {
ShimError::Containerd(format!(
"no response received after write content request for {}",
digest
))
})
}

#[cfg(test)]
mod tests {
use std::path::PathBuf;

0 comments on commit 9f2c474

Please sign in to comment.