NFC tracker: prepare for variable chunk size on the write side
Hide WRITE_SNAPSHOT_GRANULARITY behind a function, and rename the
constant to `DEFAULT_WRITE_SNAPSHOT_GRANULARITY`, to enforce use of
the function.

The Tracker object now keeps track of its snapshot granularity (i.e.,
chunk size, except for the last chunk, which may be shorter) as a
field, and pipes it all the way to the serialised ManifestV1.

Manual inspection shows that the remaining references (many of which
also transitively turned into functions) are performance heuristics
(copier.rs, unzstd.rs); those now apply the current default as a
minimum value, to avoid behaving too strangely if the actual chunk
size is eventually configured much smaller than expected.

TESTED=it builds
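
A minimal sketch of the "default as minimum" behaviour described above (the `configured` parameter is hypothetical; no configuration mechanism exists yet, so the accessor currently always returns the default):

```rust
/// Heuristic call sites take the larger of the compiled-in default
/// and the runtime granularity: a much smaller configured chunk size
/// cannot drag performance thresholds below today's behaviour, while
/// a larger one scales them up.
fn heuristic_granularity(configured: u64) -> u64 {
    const DEFAULT: u64 = 1 << 16; // DEFAULT_WRITE_SNAPSHOT_GRANULARITY
    DEFAULT.max(configured)
}

fn main() {
    assert_eq!(heuristic_granularity(4 << 10), 1 << 16); // 4 KiB config: the floor wins
    assert_eq!(heuristic_granularity(1 << 20), 1 << 20); // 1 MiB config: scales up
}
```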
pkhuong committed Jan 22, 2024
1 parent bc46072 commit c5b2bdf
Showing 4 changed files with 41 additions and 25 deletions.
9 changes: 6 additions & 3 deletions src/copier.rs
@@ -205,10 +205,13 @@ const CHUNK_COMPRESSION_LEVEL: i32 = 0;
/// long spans of trivially compressible 0s.
const FAST_COMPRESSION_LEVEL: i32 = 1;

-/// When the raw data is at least this large, convert default (0)
+/// When the raw data is *more than* this large, convert default (0)
/// compression to `FAST_COMPRESSION_LEVEL`: we assume that's not
/// sqlite db data, and probably incompressible.
-const FAST_COMPRESSION_AUTO_SIZE: usize = (crate::tracker::WRITE_SNAPSHOT_GRANULARITY as usize) + 1;
+fn fast_compression_auto_limit() -> usize {
+    crate::tracker::DEFAULT_WRITE_SNAPSHOT_GRANULARITY
+        .max(crate::tracker::write_snapshot_granularity()) as usize
+}

lazy_static::lazy_static! {
static ref RECENT_WORK: Mutex<RecentWorkSet> = Mutex::new(RecentWorkSet::new(COPY_REQUEST_MEMORY, COPY_REQUEST_JITTER));
@@ -840,7 +843,7 @@ async fn copy_file(
// maximum we expect for a db chunk, assume it's probably
// fingerprints (incompressible), and override that by telling
// zstd to optimise for speed over quality.
-let level = if level == 0 && bytes.len() >= FAST_COMPRESSION_AUTO_SIZE {
+let level = if level == 0 && bytes.len() > fast_compression_auto_limit() {
FAST_COMPRESSION_LEVEL
} else {
level
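A standalone sketch of the level-selection logic changed above (a simplification, not the crate's API; `auto_limit` stands in for the value of `fast_compression_auto_limit()`):

```rust
const FAST_COMPRESSION_LEVEL: i32 = 1;

/// Level 0 means "use zstd's default"; payloads strictly larger than
/// the largest plausible db chunk are assumed incompressible, so we
/// trade compression ratio for speed.
fn effective_level(requested: i32, payload_len: usize, auto_limit: usize) -> i32 {
    if requested == 0 && payload_len > auto_limit {
        FAST_COMPRESSION_LEVEL
    } else {
        requested
    }
}

fn main() {
    let limit = 1 << 16; // 64 KiB, the default granularity
    assert_eq!(effective_level(0, limit, limit), 0); // exactly one chunk: default level
    assert_eq!(effective_level(0, limit + 1, limit), 1); // bigger than a chunk: fast path
    assert_eq!(effective_level(3, limit + 1, limit), 3); // explicit levels always win
}
```

The old code expressed the same exclusive boundary as `>= GRANULARITY + 1`; the rewrite folds the `+ 1` into a strict `>` comparison.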
25 changes: 18 additions & 7 deletions src/tracker/mod.rs
@@ -30,14 +30,14 @@ use crate::result::Result;
mod invariants;
mod snapshot_file_contents;

-/// We snapshot db files in 64KB content-addressed chunks.
+/// We snapshot db files in 64KB content-addressed chunks by default.
///
/// When changing this value, consider tweaking the set of well-known
/// chunks that writers do not need to publish. Currently, this set
/// only contains the chunk of 64 KiB (`loader::WELL_KNOWN_ZERO_CHUNK_SIZE`)
/// bytes. This set can grow backward compatibly, as long as readers
/// learn about well-known chunks before writers.
-pub(crate) const WRITE_SNAPSHOT_GRANULARITY: u64 = 1 << 16;
+pub(crate) const DEFAULT_WRITE_SNAPSHOT_GRANULARITY: u64 = 1 << 16;

/// Don't generate a base fingerprint chunk for a list of fingerprints
/// shorter than `BASE_CHUNK_MIN_LENGTH`.
@@ -58,6 +58,12 @@ const BASE_CHUNK_MIN_LENGTH: usize = 600;
/// essentially never the same.
const BUNDLED_CHUNK_OFFSETS: [u64; 1] = [0];

+/// We snapshot db files in content-addressed chunks of this many
+/// bytes.
+pub(crate) fn write_snapshot_granularity() -> u64 {
+    DEFAULT_WRITE_SNAPSHOT_GRANULARITY
+}
+
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum MutationState {
Clean, // No mutation since the last snapshot
@@ -106,6 +112,9 @@ pub(crate) struct Tracker {
// this db's manifest. Keeping this chunk alive guarantees we can
// find it in the global cache, and thus avoids useless GETs.
recent_base_chunk: Option<Arc<crate::loader::Chunk>>,

+    // Base size for content-addressable chunks.
+    snapshot_granularity: u64,
}

impl Tracker {
@@ -166,6 +175,7 @@ impl Tracker {
backing_file_state: MutationState::Unknown,
previous_version_id: Vec::new(),
recent_base_chunk: None,
+snapshot_granularity: write_snapshot_granularity(),
}))
}

@@ -278,9 +288,10 @@

self.backing_file_state = MutationState::Dirty;

+let snapshot_granularity = write_snapshot_granularity();
if !buf.is_null()
-&& count == WRITE_SNAPSHOT_GRANULARITY
-&& (offset % WRITE_SNAPSHOT_GRANULARITY) == 0
+&& count == snapshot_granularity
+&& (offset % snapshot_granularity) == 0
&& !self.should_bundle_chunk_at_offset(offset)
{
// When sqlite fires off a write that's exactly
@@ -304,12 +315,12 @@

self.dirty_chunks.insert(offset, value);
} else if count > 0 {
-let min = offset / WRITE_SNAPSHOT_GRANULARITY;
-let max = offset.saturating_add(count - 1) / WRITE_SNAPSHOT_GRANULARITY;
+let min = offset / snapshot_granularity;
+let max = offset.saturating_add(count - 1) / snapshot_granularity;

for chunk_index in min..=max {
self.dirty_chunks
-.insert(WRITE_SNAPSHOT_GRANULARITY * chunk_index, None);
+.insert(snapshot_granularity * chunk_index, None);
}
}
}
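As a sanity check on the dirty-range arithmetic above, a self-contained sketch (not the crate's internal code) of which chunk base offsets a write marks dirty:

```rust
/// All chunk base offsets touched by a write of `count > 0` bytes at
/// `offset`, for snapshot granularity `g`.
fn dirty_chunk_bases(offset: u64, count: u64, g: u64) -> Vec<u64> {
    let min = offset / g;
    // `count - 1` points at the last byte written; saturating_add
    // guards against overflow for offsets near u64::MAX.
    let max = offset.saturating_add(count - 1) / g;
    (min..=max).map(|index| index * g).collect()
}

fn main() {
    let g = 1u64 << 16; // 64 KiB default granularity
    // A 10-byte write straddling the first chunk boundary dirties two chunks.
    assert_eq!(dirty_chunk_bases(g - 5, 10, g), vec![0, g]);
    // A whole, aligned chunk write dirties exactly one.
    assert_eq!(dirty_chunk_bases(g, g, g), vec![g]);
}
```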
24 changes: 12 additions & 12 deletions src/tracker/snapshot_file_contents.rs
@@ -28,7 +28,6 @@ use crate::result::Result;
use super::MutationState;
use super::Tracker;
use super::BASE_CHUNK_MIN_LENGTH;
-use super::WRITE_SNAPSHOT_GRANULARITY;

/// What should we do with the current base chunk fingerprint list?
enum BaseChunkAction {
@@ -374,8 +373,9 @@
.metadata()
.map_err(|e| chain_error!(e, "failed to stat file", ?self.path))?
.len();
-let num_chunks = len / WRITE_SNAPSHOT_GRANULARITY
-    + (if (len % WRITE_SNAPSHOT_GRANULARITY) > 0 {
+let snapshot_granularity = self.snapshot_granularity;
+let num_chunks = len / snapshot_granularity
+    + (if (len % snapshot_granularity) > 0 {
1
} else {
0
@@ -398,7 +398,7 @@
let grown = (fprints.len() as u64) < num_chunks;
let wrote_past_end = self
.dirty_chunks
-.range(fprints.len() as u64 * WRITE_SNAPSHOT_GRANULARITY..=u64::MAX)
+.range(fprints.len() as u64 * snapshot_granularity..=u64::MAX)
.next()
.is_some();
let delta = (grown || wrote_past_end) as u64;
@@ -418,8 +418,8 @@
// And make sure list's size matches the file's.
chunk_fprints.resize(num_chunks as usize, Fingerprint::new(0, 0));

-// Box this allocation to avoid a 64KB stack allocation.
-let mut buf = Box::new([0u8; WRITE_SNAPSHOT_GRANULARITY as usize]);
+// Box this allocation to avoid a large stack allocation.
+let mut buf = vec![0u8; snapshot_granularity as usize].into_boxed_slice();

let mut num_snapshotted: usize = 0;
let mut bundled_chunks = Vec::new();
@@ -434,9 +434,9 @@
let update = &mut |chunk_index, expected_fprint| -> Result<bool> {
num_snapshotted += 1;

-let begin = chunk_index * WRITE_SNAPSHOT_GRANULARITY;
-let end = if (len - begin) > WRITE_SNAPSHOT_GRANULARITY {
-    begin + WRITE_SNAPSHOT_GRANULARITY
+let begin = chunk_index * snapshot_granularity;
+let end = if (len - begin) > snapshot_granularity {
+    begin + snapshot_granularity
} else {
len
};
@@ -483,7 +483,7 @@
};

for (base, expected_fprint) in &self.dirty_chunks {
-let chunk_index = base / WRITE_SNAPSHOT_GRANULARITY;
+let chunk_index = base / snapshot_granularity;

// Everything greater than or equal to `backfill_begin`
// will be handled by the loop below. This avoids
@@ -514,7 +514,7 @@
if cfg!(not(feature = "test_vfs"))
&& !self
.dirty_chunks
-.contains_key(&(random_index * WRITE_SNAPSHOT_GRANULARITY))
+.contains_key(&(random_index * snapshot_granularity))
{
// We don't *have* to get these additional chunks, so
// we don't want to bubble up errors.
@@ -615,7 +615,7 @@
generated_by: crate::manifest_schema::generator_version_bytes(),
chunks: compressible,
bundled_chunks,
-base_chunk_size: None,
+base_chunk_size: Some(self.snapshot_granularity),
}),
};

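The chunk-count and per-chunk-bounds computations in this file are the usual ceiling-division pattern; a sketch under the same conventions (the last chunk may be shorter than the granularity):

```rust
/// Number of chunks needed to cover `len` bytes at granularity `g`.
fn num_chunks(len: u64, g: u64) -> u64 {
    len / g + if len % g > 0 { 1 } else { 0 }
}

/// Byte range `[begin, end)` of chunk `index`; the final chunk is
/// truncated to the file length.
fn chunk_bounds(len: u64, g: u64, index: u64) -> (u64, u64) {
    let begin = index * g;
    (begin, (begin + g).min(len))
}

fn main() {
    let g = 1u64 << 16;
    assert_eq!(num_chunks(0, g), 0);
    assert_eq!(num_chunks(g + 1, g), 2); // one full chunk plus a 1-byte tail
    // The trailing chunk of a 100,000-byte file is only 34,464 bytes.
    assert_eq!(chunk_bounds(100_000, g, 1), (65_536, 100_000));
}
```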
8 changes: 5 additions & 3 deletions src/unzstd.rs
@@ -7,8 +7,10 @@ const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
/// How big to set the initial capacity by default.
///
/// We expect to mostly decompress chunks of this size.
-const BOUNDED_VECTOR_SIZE_INITIAL_CAPACITY: usize =
-    crate::tracker::WRITE_SNAPSHOT_GRANULARITY as usize;
+fn bounded_vector_size_initial_capacity() -> usize {
+    crate::tracker::DEFAULT_WRITE_SNAPSHOT_GRANULARITY
+        .max(crate::tracker::write_snapshot_granularity()) as usize
+}

/// A `Writer` that dumps bytes to `dst` and fails instead of writing
/// more than `max` bytes.
@@ -27,7 +29,7 @@ impl BoundedVectorSink {
fn new(max: usize) -> Self {
Self {
dst: Some(Vec::with_capacity(
-max.clamp(0, BOUNDED_VECTOR_SIZE_INITIAL_CAPACITY),
+max.clamp(0, bounded_vector_size_initial_capacity()),
)),
max,
}
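One observation on the clamp above: for an unsigned `max`, `clamp(0, limit)` is just `min(limit)`, so the initial allocation is bounded by the expected chunk size without ever exceeding the sink's hard cap. A sketch of the idea:

```rust
/// Pick an initial buffer capacity: enough for a typical decompressed
/// chunk, but never more than the caller's hard limit `max`.
fn initial_capacity(max: usize, expected_chunk: usize) -> usize {
    max.min(expected_chunk) // same as max.clamp(0, expected_chunk) for usize
}

fn main() {
    let chunk = 1 << 16; // 64 KiB expected decompressed size
    assert_eq!(initial_capacity(usize::MAX, chunk), chunk); // effectively unbounded caller
    assert_eq!(initial_capacity(4_096, chunk), 4_096); // tiny payloads stay tiny
}
```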
