From 19a773da47bf64e82caa213d96e2aa73e3d6da94 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 11 Dec 2023 09:55:11 +0100 Subject: [PATCH 1/5] Allow cheaply cloning a StoreReader to enable user control over block cache usage. --- src/store/reader.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/store/reader.rs b/src/store/reader.rs index 16125a1475..f2d652b561 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -40,6 +40,15 @@ struct BlockCache { } impl BlockCache { + fn new(cache_num_blocks: usize) -> Self { + Self { + cache: NonZeroUsize::new(cache_num_blocks) + .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))), + cache_hits: Default::default(), + cache_misses: Default::default(), + } + } + fn get_from_cache(&self, pos: usize) -> Option { if let Some(block) = self .cache @@ -128,17 +137,23 @@ impl StoreReader { Ok(StoreReader { decompressor: footer.decompressor, data: data_file, - cache: BlockCache { - cache: NonZeroUsize::new(cache_num_blocks) - .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))), - cache_hits: Default::default(), - cache_misses: Default::default(), - }, + cache: BlockCache::new(cache_num_blocks), skip_index: Arc::new(skip_index), space_usage, }) } + /// Clones the given store reader with an independent block cache of the given size. + pub fn fork_cache(&self, cache_num_blocks: usize) -> Self { + Self { + decompressor: self.decompressor, + data: self.data.clone(), + cache: BlockCache::new(cache_num_blocks), + skip_index: Arc::clone(&self.skip_index), + space_usage: self.space_usage.clone(), + } + } + pub(crate) fn block_checkpoints(&self) -> impl Iterator + '_ { self.skip_index.checkpoints() } From 49a913f6f85a5f0933e9d185947580533a0d0055 Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 11 Dec 2023 09:59:59 +0100 Subject: [PATCH 2/5] Expose which documents cache together to user code. 
--- src/store/mod.rs | 2 +- src/store/reader.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/store/mod.rs b/src/store/mod.rs index 7fbd8c1e5c..79c1221042 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -38,7 +38,7 @@ mod writer; pub use self::compressors::{Compressor, ZstdCompressor}; pub use self::decompressors::Decompressor; pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY; -pub use self::reader::{CacheStats, StoreReader}; +pub use self::reader::{CacheKey, CacheStats, StoreReader}; pub use self::writer::StoreWriter; mod store_compressor; diff --git a/src/store/reader.rs b/src/store/reader.rs index f2d652b561..7cac8ec40e 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -90,6 +90,10 @@ impl BlockCache { } } +/// Opaque cache key which indicates which documents are cached together. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct CacheKey(usize); + #[derive(Debug, Default)] /// CacheStats for the `StoreReader`. pub struct CacheStats { @@ -167,6 +171,20 @@ impl StoreReader { self.cache.stats() } + /// Returns the cache key for a given document + /// + /// These keys are opaque and are not used with the public API, + /// but having the same cache key means that the documents + /// will only require one I/O and decompression operation + /// when retrieved from the same store reader consecutively. + /// + /// Note that looking up the cache key of a document + /// will not yet pull anything into the block cache. + pub fn cache_key(&self, doc_id: DocId) -> crate::Result { + let checkpoint = self.block_checkpoint(doc_id)?; + Ok(CacheKey(checkpoint.byte_range.start)) + } + /// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the /// document. 
/// From 3d48ce80c584142b099d994ef109a82e8e5f632a Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 11 Dec 2023 11:00:59 +0100 Subject: [PATCH 3/5] Add Searcher::docs_async which efficiently fetches multiple documents by grouping them by segment and block. --- src/core/searcher.rs | 103 +++++++++++++++++++++++++++++++++++++++++++ src/core/tests.rs | 61 ++++++++++++++++++++++++- 2 files changed, 162 insertions(+), 2 deletions(-) diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 3f989696ce..dc54da8717 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -1,4 +1,6 @@ use std::collections::BTreeMap; +#[cfg(feature = "quickwit")] +use std::future::Future; use std::sync::Arc; use std::{fmt, io}; @@ -112,6 +114,107 @@ impl Searcher { store_reader.get_async(doc_address.doc_id).await } + /// Fetches multiple documents in an asynchronous manner. + /// + /// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times, + /// as it groups overlapping requests to segments and blocks and avoids concurrent requests + /// trashing the caches of each other. However, it does so using intermediate data structures + /// and independent block caches so it will be slower if documents from very few blocks are + /// fetched which would have fit into the global block cache. + /// + /// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`) + /// or in parallel (e.g. using `JoinSet`) as fits best with the given use case, i.e. whether + /// it is predominately I/O-bound or rather CPU-bound. + /// + /// Note that any blocks brought into any of the per-segment-and-block groups will not be pulled + /// into the global block cache and hence not be available for subsequent calls. + /// + /// Note that there is no synchronous variant of this method as the same degree of efficiency + /// can be had by accessing documents in address order. 
+ /// + /// # Example + /// + /// ```rust,no_run + /// # use futures::executor::block_on; + /// # use futures::stream::{FuturesUnordered, StreamExt}; + /// # + /// # use tantivy::schema::Schema; + /// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError}; + /// # + /// # let index = Index::create_in_ram(Schema::builder().build()); + /// # let searcher = index.reader()?.searcher(); + /// # + /// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0)); + /// # + /// let mut groups: FuturesUnordered<_> = searcher + /// .docs_async::(doc_addresses)? + /// .collect(); + /// + /// let mut docs = Vec::new(); + /// + /// block_on(async { + /// while let Some(group) = groups.next().await { + /// docs.extend(group?); + /// } + /// + /// Ok::<_, TantivyError>(()) + /// })?; + /// # + /// # Ok::<_, TantivyError>(()) + /// ``` + #[cfg(feature = "quickwit")] + pub fn docs_async( + &self, + doc_addresses: impl IntoIterator, + ) -> crate::Result< + impl Iterator>>> + '_, + > { + use rustc_hash::FxHashMap; + + use crate::store::CacheKey; + use crate::{DocId, SegmentOrdinal}; + + let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec> = Default::default(); + + for doc_address in doc_addresses { + let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize]; + let cache_key = store_reader.cache_key(doc_address.doc_id)?; + + groups + .entry((doc_address.segment_ord, cache_key)) + .or_default() + .push(doc_address.doc_id); + } + + let futures = groups + .into_iter() + .map(|((segment_ord, _cache_key), doc_ids)| { + // Each group fetches documents from exactly one block and + // therefore gets an independent block cache of size one. 
+ let store_reader = self.inner.store_readers[segment_ord as usize].fork_cache(1); + + async move { + let mut docs = Vec::new(); + + for doc_id in doc_ids { + let doc = store_reader.get_async(doc_id).await?; + + docs.push(( + DocAddress { + segment_ord, + doc_id, + }, + doc, + )); + } + + Ok(docs) + } + }); + + Ok(futures) + } + /// Access the schema associated with the index of this searcher. pub fn schema(&self) -> &Schema { &self.inner.schema diff --git a/src/core/tests.rs b/src/core/tests.rs index e215c31f46..41313784b0 100644 --- a/src/core/tests.rs +++ b/src/core/tests.rs @@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() { json_term_writer.set_fast_value(75u64); let postings = inv_idx .read_postings( - &json_term_writer.term(), + json_term_writer.term(), IndexRecordOption::WithFreqsAndPositions, ) .unwrap() @@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() { json_term_writer.set_fast_value(75u64); let mut postings = inv_idx .read_postings( - &json_term_writer.term(), + json_term_writer.term(), IndexRecordOption::WithFreqsAndPositions, ) .unwrap() @@ -474,3 +474,60 @@ fn test_non_text_json_term_freq_bitpacked() { assert_eq!(postings.term_freq(), 1u32); } } + +#[cfg(feature = "quickwit")] +#[test] +fn test_get_many_docs() -> crate::Result<()> { + use futures::executor::block_on; + use futures::stream::{FuturesUnordered, StreamExt}; + + use crate::schema::{OwnedValue, STORED}; + use crate::{DocAddress, TantivyError}; + + let mut schema_builder = Schema::builder(); + let num_field = schema_builder.add_u64_field("num", STORED); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + let mut index_writer: IndexWriter = index.writer_for_tests()?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + for i in 0..10u64 { + let doc = doc!(num_field=>i); + index_writer.add_document(doc)?; + } + + index_writer.commit()?; + let segment_ids = index.searchable_segment_ids()?; + 
index_writer.merge(&segment_ids).wait().unwrap(); + + let searcher = index.reader()?.searcher(); + assert_eq!(searcher.num_docs(), 10); + + let doc_addresses = (0..10).map(|i| DocAddress::new(0, i)); + + let mut groups: FuturesUnordered<_> = searcher + .docs_async::(doc_addresses)? + .collect(); + + let mut doc_nums = Vec::new(); + + block_on(async { + while let Some(group) = groups.next().await { + for (_doc_address, doc) in group? { + let num_value = doc.get_first(num_field).unwrap(); + + if let OwnedValue::U64(num) = num_value { + doc_nums.push(*num); + } else { + panic!("Expected u64 value"); + } + } + } + + Ok::<_, TantivyError>(()) + })?; + + doc_nums.sort(); + assert_eq!(doc_nums, (0..10).collect::>()); + + Ok(()) +} From 0361a1edaac93a962976b6648fc9d5190ae0c85a Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 11 Dec 2023 11:04:18 +0100 Subject: [PATCH 4/5] Do not expose StoreReader::fork_cache and ::cache_key in the public API if only Searcher::docs_async uses them. --- src/store/mod.rs | 4 +++- src/store/reader.rs | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/store/mod.rs b/src/store/mod.rs index 79c1221042..0dd86f69b7 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -37,8 +37,10 @@ mod reader; mod writer; pub use self::compressors::{Compressor, ZstdCompressor}; pub use self::decompressors::Decompressor; +#[cfg(feature = "quickwit")] +pub(crate) use self::reader::CacheKey; pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY; -pub use self::reader::{CacheKey, CacheStats, StoreReader}; +pub use self::reader::{CacheStats, StoreReader}; pub use self::writer::StoreWriter; mod store_compressor; diff --git a/src/store/reader.rs b/src/store/reader.rs index 7cac8ec40e..0c0de9c128 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -92,7 +92,7 @@ impl BlockCache { /// Opaque cache key which indicates which documents are cached together. 
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct CacheKey(usize); +pub(crate) struct CacheKey(usize); #[derive(Debug, Default)] /// CacheStats for the `StoreReader`. @@ -148,7 +148,8 @@ impl StoreReader { } /// Clones the given store reader with an independent block cache of the given size. - pub fn fork_cache(&self, cache_num_blocks: usize) -> Self { + #[cfg(feature = "quickwit")] + pub(crate) fn fork_cache(&self, cache_num_blocks: usize) -> Self { Self { decompressor: self.decompressor, data: self.data.clone(), @@ -180,7 +181,8 @@ impl StoreReader { /// /// Note that looking up the cache key of a document /// will not yet pull anything into the block cache. - pub fn cache_key(&self, doc_id: DocId) -> crate::Result { + #[cfg(feature = "quickwit")] + pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result { let checkpoint = self.block_checkpoint(doc_id)?; Ok(CacheKey(checkpoint.byte_range.start)) } From d1177fe22f54b893361e5f5650273f6283fd696c Mon Sep 17 00:00:00 2001 From: Adam Reichold Date: Mon, 11 Dec 2023 11:32:39 +0100 Subject: [PATCH 5/5] Opportunistically seed forked block caches from current one. --- src/core/searcher.rs | 5 +++-- src/store/reader.rs | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/core/searcher.rs b/src/core/searcher.rs index dc54da8717..3bc359a784 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -188,10 +188,11 @@ impl Searcher { let futures = groups .into_iter() - .map(|((segment_ord, _cache_key), doc_ids)| { + .map(|((segment_ord, cache_key), doc_ids)| { // Each group fetches documents from exactly one block and // therefore gets an independent block cache of size one. 
- let store_reader = self.inner.store_readers[segment_ord as usize].fork_cache(1); + let store_reader = + self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]); async move { let mut docs = Vec::new(); diff --git a/src/store/reader.rs b/src/store/reader.rs index 0c0de9c128..cf3e136637 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -148,15 +148,26 @@ impl StoreReader { } /// Clones the given store reader with an independent block cache of the given size. + /// + /// `cache_keys` is used to seed the forked cache from the current cache + /// if some blocks are already available. #[cfg(feature = "quickwit")] - pub(crate) fn fork_cache(&self, cache_num_blocks: usize) -> Self { - Self { + pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self { + let forked = Self { decompressor: self.decompressor, data: self.data.clone(), cache: BlockCache::new(cache_num_blocks), skip_index: Arc::clone(&self.skip_index), space_usage: self.space_usage.clone(), + }; + + for &CacheKey(pos) in cache_keys { + if let Some(block) = self.cache.get_from_cache(pos) { + forked.cache.put_into_cache(pos, block); + } } + + forked } pub(crate) fn block_checkpoints(&self) -> impl Iterator + '_ {