RFC: Transparently fetch multiple documents grouped per block #2276

Open · wants to merge 5 commits into base: main
104 changes: 104 additions & 0 deletions src/core/searcher.rs
@@ -1,4 +1,6 @@
use std::collections::BTreeMap;
#[cfg(feature = "quickwit")]
use std::future::Future;
use std::sync::Arc;
use std::{fmt, io};

@@ -112,6 +114,108 @@ impl Searcher {
store_reader.get_async(doc_address.doc_id).await
}

/// Fetches multiple documents in an asynchronous manner.
///
/// This method is more efficient than calling [`doc_async`](Self::doc_async) multiple times,
/// as it groups requests by segment and block and thereby avoids concurrent requests
/// thrashing each other's caches. However, it does so using intermediate data structures
/// and independent block caches, so it will be slower when the fetched documents span only
/// a few blocks which would have fit into the global block cache.
///
/// The caller is expected to poll these futures concurrently (e.g. using `FuturesUnordered`)
/// or in parallel (e.g. using `JoinSet`), whichever fits the given use case best, i.e. whether
/// it is predominantly I/O-bound or rather CPU-bound; a sketch using `join_all` follows the
/// method below.
///
/// Note that any blocks fetched by the per-segment-and-block groups are not pulled into
/// the global block cache and hence will not be available there for subsequent calls.
///
/// Note that there is no synchronous variant of this method, as the same degree of
/// efficiency can be had by accessing documents in address order (a synchronous sketch
/// follows the example below).
///
/// # Example
///
/// ```rust,no_run
/// # use futures::executor::block_on;
/// # use futures::stream::{FuturesUnordered, StreamExt};
/// #
/// # use tantivy::schema::Schema;
/// # use tantivy::{DocAddress, Index, TantivyDocument, TantivyError};
/// #
/// # let index = Index::create_in_ram(Schema::builder().build());
/// # let searcher = index.reader()?.searcher();
/// #
/// # let doc_addresses = (0..10).map(|_| DocAddress::new(0, 0));
/// #
/// let mut groups: FuturesUnordered<_> = searcher
/// .docs_async::<TantivyDocument>(doc_addresses)?
/// .collect();
///
/// let mut docs = Vec::new();
///
/// block_on(async {
/// while let Some(group) = groups.next().await {
/// docs.extend(group?);
/// }
///
/// Ok::<_, TantivyError>(())
/// })?;
/// #
/// # Ok::<_, TantivyError>(())
/// ```
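For reference, the address-order access mentioned in the note above might look like the following minimal sketch of hypothetical caller code; it relies only on the existing synchronous `Searcher::doc` and assumes `DocAddress`'s derived ordering (segment ordinal first, then doc id):

```rust
use tantivy::{DocAddress, Searcher, TantivyDocument};

// Sketch: sorting by address keeps consecutive lookups within the same
// segment and block, so the global block cache serves each block with a
// single I/O and decompression.
fn docs_in_address_order(
    searcher: &Searcher,
    mut addresses: Vec<DocAddress>,
) -> tantivy::Result<Vec<(DocAddress, TantivyDocument)>> {
    addresses.sort_unstable();
    addresses
        .into_iter()
        .map(|addr| Ok((addr, searcher.doc::<TantivyDocument>(addr)?)))
        .collect()
}
```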
#[cfg(feature = "quickwit")]
pub fn docs_async<D: DocumentDeserialize>(
&self,
doc_addresses: impl IntoIterator<Item = DocAddress>,
) -> crate::Result<
impl Iterator<Item = impl Future<Output = crate::Result<Vec<(DocAddress, D)>>>> + '_,
adamreichold (Collaborator, Author):

The signature is admittedly somewhat unwieldy, even though it is relatively easy to use in practice, as the test shows. I wonder if I should add a no_run example showing usage with FuturesUnordered? (no_run to avoid making the example unreadable due to the index setup code; alternatively, the setup code could be hidden.)

adamreichold (Collaborator, Author), Dec 11, 2023:

Added a no_run example on how to use this with FuturesUnordered and block_on.
> {
use rustc_hash::FxHashMap;

use crate::store::CacheKey;
use crate::{DocId, SegmentOrdinal};

let mut groups: FxHashMap<(SegmentOrdinal, CacheKey), Vec<DocId>> = Default::default();

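// Group doc ids by segment and block so that each block is read and decompressed only once.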
for doc_address in doc_addresses {
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
let cache_key = store_reader.cache_key(doc_address.doc_id)?;

groups
.entry((doc_address.segment_ord, cache_key))
.or_default()
.push(doc_address.doc_id);
}

let futures = groups
.into_iter()
.map(|((segment_ord, cache_key), doc_ids)| {
// Each group fetches documents from exactly one block and
// therefore gets an independent block cache of size one.
let store_reader =
self.inner.store_readers[segment_ord as usize].fork_cache(1, &[cache_key]);

async move {
let mut docs = Vec::new();

for doc_id in doc_ids {
let doc = store_reader.get_async(doc_id).await?;

docs.push((
DocAddress {
segment_ord,
doc_id,
},
doc,
));
}

Ok(docs)
}
});

Ok(futures)
}
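To make the polling note concrete, here is a minimal sketch of hypothetical caller code that drives all per-block groups concurrently on a single thread via `futures::future::join_all` (assumes the `futures` crate and the `quickwit` feature, as in the doc example):

```rust
use futures::executor::block_on;
use futures::future::join_all;
use tantivy::{DocAddress, Searcher, TantivyDocument};

// Sketch: each group future resolves to one block's worth of documents;
// join_all polls them concurrently and the per-block batches are then
// flattened into a single vector.
fn fetch_grouped(
    searcher: &Searcher,
    addresses: Vec<DocAddress>,
) -> tantivy::Result<Vec<(DocAddress, TantivyDocument)>> {
    let groups = searcher.docs_async::<TantivyDocument>(addresses)?;
    let mut docs = Vec::new();
    for group in block_on(join_all(groups)) {
        docs.extend(group?);
    }
    Ok(docs)
}
```

`join_all` keeps everything on one task, which suits the I/O-bound case; for CPU-bound workloads, the doc comment suggests spawning each group onto a `JoinSet` instead so the work is spread across threads.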

/// Access the schema associated with the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.inner.schema
61 changes: 59 additions & 2 deletions src/core/tests.rs
@@ -424,7 +424,7 @@ fn test_non_text_json_term_freq() {
json_term_writer.set_fast_value(75u64);
let postings = inv_idx
.read_postings(
- &json_term_writer.term(),
+ json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
@@ -462,7 +462,7 @@ fn test_non_text_json_term_freq_bitpacked() {
json_term_writer.set_fast_value(75u64);
let mut postings = inv_idx
.read_postings(
- &json_term_writer.term(),
+ json_term_writer.term(),
IndexRecordOption::WithFreqsAndPositions,
)
.unwrap()
@@ -474,3 +474,60 @@
assert_eq!(postings.term_freq(), 1u32);
}
}

#[cfg(feature = "quickwit")]
#[test]
fn test_get_many_docs() -> crate::Result<()> {
use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};

use crate::schema::{OwnedValue, STORED};
use crate::{DocAddress, TantivyError};

let mut schema_builder = Schema::builder();
let num_field = schema_builder.add_u64_field("num", STORED);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
for i in 0..10u64 {
let doc = doc!(num_field=>i);
index_writer.add_document(doc)?;
}

index_writer.commit()?;
let segment_ids = index.searchable_segment_ids()?;
index_writer.merge(&segment_ids).wait().unwrap();

let searcher = index.reader()?.searcher();
assert_eq!(searcher.num_docs(), 10);

let doc_addresses = (0..10).map(|i| DocAddress::new(0, i));

let mut groups: FuturesUnordered<_> = searcher
.docs_async::<TantivyDocument>(doc_addresses)?
.collect();

let mut doc_nums = Vec::new();

block_on(async {
while let Some(group) = groups.next().await {
for (_doc_address, doc) in group? {
let num_value = doc.get_first(num_field).unwrap();

if let OwnedValue::U64(num) = num_value {
doc_nums.push(*num);
} else {
panic!("Expected u64 value");
}
}
}

Ok::<_, TantivyError>(())
})?;

doc_nums.sort();
assert_eq!(doc_nums, (0..10).collect::<Vec<u64>>());

Ok(())
}
2 changes: 2 additions & 0 deletions src/store/mod.rs
@@ -37,6 +37,8 @@ mod reader;
mod writer;
pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
#[cfg(feature = "quickwit")]
pub(crate) use self::reader::CacheKey;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;
58 changes: 52 additions & 6 deletions src/store/reader.rs
@@ -40,6 +40,15 @@ struct BlockCache {
}

impl BlockCache {
fn new(cache_num_blocks: usize) -> Self {
Self {
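// `NonZeroUsize::new` returns `None` for zero, so a capacity of zero disables caching entirely.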
cache: NonZeroUsize::new(cache_num_blocks)
.map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
cache_hits: Default::default(),
cache_misses: Default::default(),
}
}

fn get_from_cache(&self, pos: usize) -> Option<Block> {
if let Some(block) = self
.cache
@@ -81,6 +90,10 @@ impl BlockCache
}
}

/// Opaque cache key which indicates which documents are cached together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct CacheKey(usize);

#[derive(Debug, Default)]
/// CacheStats for the `StoreReader`.
pub struct CacheStats {
@@ -128,17 +141,35 @@ impl StoreReader {
Ok(StoreReader {
decompressor: footer.decompressor,
data: data_file,
- cache: BlockCache {
-     cache: NonZeroUsize::new(cache_num_blocks)
-         .map(|cache_num_blocks| Mutex::new(LruCache::new(cache_num_blocks))),
-     cache_hits: Default::default(),
-     cache_misses: Default::default(),
- },
+ cache: BlockCache::new(cache_num_blocks),
skip_index: Arc::new(skip_index),
space_usage,
})
}

/// Clones the given store reader with an independent block cache of the given size.
///
/// `cache_keys` is used to seed the forked cache from the current cache
/// if some blocks are already available.
#[cfg(feature = "quickwit")]
pub(crate) fn fork_cache(&self, cache_num_blocks: usize, cache_keys: &[CacheKey]) -> Self {
let forked = Self {
decompressor: self.decompressor,
data: self.data.clone(),
cache: BlockCache::new(cache_num_blocks),
skip_index: Arc::clone(&self.skip_index),
space_usage: self.space_usage.clone(),
};

for &CacheKey(pos) in cache_keys {
if let Some(block) = self.cache.get_from_cache(pos) {
forked.cache.put_into_cache(pos, block);
}
}

forked
}
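For illustration, a minimal crate-internal sketch of how forking and seeding fit together (hypothetical usage, not part of this diff; `get` is the existing synchronous accessor):

```rust
use crate::store::StoreReader;
use crate::{DocId, TantivyDocument};

// Sketch: fork a reader with a one-block cache, carrying over the block
// for `doc_id` if the parent has it cached; the fork then serves the
// document without touching the parent's LRU cache.
fn fetch_isolated(store_reader: &StoreReader, doc_id: DocId) -> crate::Result<TantivyDocument> {
    let cache_key = store_reader.cache_key(doc_id)?;
    let forked = store_reader.fork_cache(1, &[cache_key]);
    forked.get::<TantivyDocument>(doc_id)
}
```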

pub(crate) fn block_checkpoints(&self) -> impl Iterator<Item = Checkpoint> + '_ {
self.skip_index.checkpoints()
}
@@ -152,6 +183,21 @@
self.cache.stats()
}

/// Returns the cache key for a given document.
///
/// These keys are opaque and not exposed via the public API,
/// but documents having the same cache key
/// will only require one I/O and decompression operation
/// when retrieved from the same store reader consecutively.
///
/// Note that looking up the cache key of a document
/// will not yet pull anything into the block cache.
#[cfg(feature = "quickwit")]
pub(crate) fn cache_key(&self, doc_id: DocId) -> crate::Result<CacheKey> {
let checkpoint = self.block_checkpoint(doc_id)?;
Ok(CacheKey(checkpoint.byte_range.start))
}
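A small sketch of the properties just described (hypothetical crate-internal code; whether two doc ids share a block depends on how the store was written):

```rust
use crate::store::StoreReader;

// Sketch: documents in the same compressed block share a cache key; the
// lookups themselves only consult the skip index and load no block.
fn share_a_block(store_reader: &StoreReader) -> crate::Result<bool> {
    let first = store_reader.cache_key(0)?;
    let second = store_reader.cache_key(1)?;
    // Equal keys mean a single I/O and decompression serves both documents
    // when they are fetched consecutively from this reader.
    Ok(first == second)
}
```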

/// Get checkpoint for `DocId`. The checkpoint can be used to load a block containing the
/// document.
///