Skip to content

Commit

Permalink
validate serialization and improve errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Sep 4, 2024
1 parent e6af7cd commit b1c998b
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ parking_lot = { workspace = true }
rand = { workspace = true }
rustc-hash = { workspace = true }
serde = { workspace = true }
serde_path_to_error = { workspace = true }
smallvec = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
Expand Down
11 changes: 11 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use serde::{Deserialize, Serialize};
use turbo_tasks::{
event::{Event, EventListener},
registry,
util::SharedError,
CellId, KeyValuePair, TaskId, TypedSharedReference, ValueTypeId,
};
Expand Down Expand Up @@ -247,6 +248,13 @@ impl CachedDataItem {
}
}

pub fn is_optional(&self) -> bool {
match self {
CachedDataItem::CellData { .. } => true,
_ => false,
}
}

pub fn new_scheduled(description: impl Fn() -> String + Sync + Send + 'static) -> Self {
CachedDataItem::InProgress {
value: InProgressState::Scheduled {
Expand Down Expand Up @@ -310,6 +318,9 @@ impl CachedDataItemValue {
pub fn is_persistent(&self) -> bool {
match self {
CachedDataItemValue::Output { value } => !value.is_transient(),
CachedDataItemValue::CellData { value } => {
registry::get_value_type(value.0).is_serializable()
}
_ => true,
}
}
Expand Down
75 changes: 69 additions & 6 deletions turbopack/crates/turbo-tasks-backend/src/lmdb_backing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
};

use anyhow::{anyhow, Context, Result};
use bincode::Options;
use lmdb::{Database, DatabaseFlags, Environment, EnvironmentFlags, Transaction, WriteFlags};
use turbo_tasks::{backend::CachedTaskType, KeyValuePair, TaskId};

Expand Down Expand Up @@ -145,7 +146,8 @@ impl BackingStorage for LmdbBackingStorage {
WriteFlags::empty(),
)
.with_context(|| anyhow!("Unable to write next free task id"))?;
let operations = bincode::serialize(&operations)?;
let operations = bincode::serialize(&operations)
.with_context(|| anyhow!("Unable to serialize operations"))?;
tx.put(
self.meta_db,
&IntKey::new(META_KEY_OPERATIONS),
Expand All @@ -163,7 +165,20 @@ impl BackingStorage for LmdbBackingStorage {
Entry::Vacant(entry) => {
let mut map = HashMap::new();
if let Ok(old_data) = tx.get(self.data_db, &IntKey::new(*task)) {
let old_data: Vec<CachedDataItem> = bincode::deserialize(old_data)?;
let old_data: Vec<CachedDataItem> = match bincode::deserialize(old_data) {
Ok(d) => d,
Err(_) => serde_path_to_error::deserialize(
&mut bincode::Deserializer::from_slice(
old_data,
bincode::DefaultOptions::new()
.with_fixint_encoding()
.allow_trailing_bytes(),
),
)
.with_context(|| {
anyhow!("Unable to deserialize old value of {task}: {old_data:?}")
})?,
};
for item in old_data {
let (key, value) = item.into_key_and_value();
map.insert(key, value);
Expand All @@ -179,13 +194,61 @@ impl BackingStorage for LmdbBackingStorage {
}
}
for (task_id, data) in updated_items {
let vec: Vec<CachedDataItem> = data
let mut vec: Vec<CachedDataItem> = data
.into_iter()
.map(|(key, value)| CachedDataItem::from_key_and_value(key, value))
.collect();
let value = bincode::serialize(&vec).with_context(|| {
anyhow!("Unable to serialize data items for {task_id}: {vec:#?}")
})?;
let value = match bincode::serialize(&vec) {
// Ok(value) => value,
Ok(_) | Err(_) => {
let mut error = Ok(());
vec.retain(|item| {
let mut buf = Vec::<u8>::new();
let mut serializer = bincode::Serializer::new(
&mut buf,
bincode::DefaultOptions::new()
.with_fixint_encoding()
.allow_trailing_bytes(),
);
if let Err(err) = serde_path_to_error::serialize(item, &mut serializer) {
if item.is_optional() {
println!("Skipping non-serializable optional item: {item:?}");
} else {
error = Err(err).context({
anyhow!(
"Unable to serialize data item for {task_id}: {item:#?}"
)
});
}
false
} else {
let deserialize: Result<CachedDataItem, _> =
serde_path_to_error::deserialize(
&mut bincode::Deserializer::from_slice(
&buf,
bincode::DefaultOptions::new()
.with_fixint_encoding()
.allow_trailing_bytes(),
),
);
if let Err(err) = deserialize {
println!(
"Data item would not be deserializable {task_id}: \
{err:?}\n{item:#?}"
);
false
} else {
true
}
}
});
error?;

bincode::serialize(&vec).with_context(|| {
anyhow!("Unable to serialize data items for {task_id}: {vec:#?}")
})?
}
};
tx.put(
self.data_db,
&IntKey::new(*task_id),
Expand Down
6 changes: 6 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/utils/chunked_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ impl<T> ChunkedVec<T> {
self.chunks.push(chunk);
}

pub fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
for item in iter {
self.push(item);
}
}

pub fn into_iter(self) -> impl Iterator<Item = T> {
let len = self.len();
ExactSizeIter {
Expand Down
4 changes: 4 additions & 0 deletions turbopack/crates/turbo-tasks/src/value_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl ValueType {
}
}

pub fn is_serializable(&self) -> bool {
self.any_serialization.is_some()
}

pub fn get_magic_deserialize_seed(&self) -> Option<MagicAnyDeserializeSeed> {
self.magic_serialization.map(|s| s.1)
}
Expand Down

0 comments on commit b1c998b

Please sign in to comment.