diff --git a/Cargo.lock b/Cargo.lock index 83d0a1e484..eccbd91907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2518,7 +2518,7 @@ dependencies = [ [[package]] name = "fsst" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "lance-datagen", @@ -3413,7 +3413,7 @@ dependencies = [ [[package]] name = "lance" -version = "0.22.1" +version = "0.23.0" dependencies = [ "all_asserts", "approx", @@ -3492,7 +3492,7 @@ dependencies = [ [[package]] name = "lance-arrow" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-buffer", @@ -3509,7 +3509,7 @@ dependencies = [ [[package]] name = "lance-core" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-buffer", @@ -3548,7 +3548,7 @@ dependencies = [ [[package]] name = "lance-datafusion" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -3576,7 +3576,7 @@ dependencies = [ [[package]] name = "lance-datagen" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -3593,7 +3593,7 @@ dependencies = [ [[package]] name = "lance-encoding" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrayref", "arrow", @@ -3640,7 +3640,7 @@ dependencies = [ [[package]] name = "lance-encoding-datafusion" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-buffer", @@ -3673,7 +3673,7 @@ dependencies = [ [[package]] name = "lance-file" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-arith", "arrow-array", @@ -3716,7 +3716,7 @@ dependencies = [ [[package]] name = "lance-index" -version = "0.22.1" +version = "0.23.0" dependencies = [ "approx", "arrow", @@ -3780,7 +3780,7 @@ dependencies = [ [[package]] name = "lance-io" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-arith", @@ -3824,7 +3824,7 @@ dependencies = [ [[package]] name = "lance-jni" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-schema", @@ -3846,7 +3846,7 @@ 
dependencies = [ [[package]] name = "lance-linalg" -version = "0.22.1" +version = "0.23.0" dependencies = [ "approx", "arrow-arith", @@ -3875,7 +3875,7 @@ dependencies = [ [[package]] name = "lance-table" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -3920,7 +3920,7 @@ dependencies = [ [[package]] name = "lance-test-macros" -version = "0.22.1" +version = "0.23.0" dependencies = [ "proc-macro2", "quote", @@ -3929,7 +3929,7 @@ dependencies = [ [[package]] name = "lance-testing" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index 7df7484079..41cf6f24fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ exclude = ["python"] resolver = "2" [workspace.package] -version = "0.22.1" +version = "0.23.0" edition = "2021" authors = ["Lance Devs "] license = "Apache-2.0" @@ -44,21 +44,21 @@ categories = [ rust-version = "1.80.1" [workspace.dependencies] -lance = { version = "=0.22.1", path = "./rust/lance" } -lance-arrow = { version = "=0.22.1", path = "./rust/lance-arrow" } -lance-core = { version = "=0.22.1", path = "./rust/lance-core" } -lance-datafusion = { version = "=0.22.1", path = "./rust/lance-datafusion" } -lance-datagen = { version = "=0.22.1", path = "./rust/lance-datagen" } -lance-encoding = { version = "=0.22.1", path = "./rust/lance-encoding" } -lance-encoding-datafusion = { version = "=0.22.1", path = "./rust/lance-encoding-datafusion" } -lance-file = { version = "=0.22.1", path = "./rust/lance-file" } -lance-index = { version = "=0.22.1", path = "./rust/lance-index" } -lance-io = { version = "=0.22.1", path = "./rust/lance-io" } -lance-jni = { version = "=0.22.1", path = "./java/core/lance-jni" } -lance-linalg = { version = "=0.22.1", path = "./rust/lance-linalg" } -lance-table = { version = "=0.22.1", path = "./rust/lance-table" } -lance-test-macros = { version = "=0.22.1", path = "./rust/lance-test-macros" } -lance-testing = { version = 
"=0.22.1", path = "./rust/lance-testing" } +lance = { version = "=0.23.0", path = "./rust/lance" } +lance-arrow = { version = "=0.23.0", path = "./rust/lance-arrow" } +lance-core = { version = "=0.23.0", path = "./rust/lance-core" } +lance-datafusion = { version = "=0.23.0", path = "./rust/lance-datafusion" } +lance-datagen = { version = "=0.23.0", path = "./rust/lance-datagen" } +lance-encoding = { version = "=0.23.0", path = "./rust/lance-encoding" } +lance-encoding-datafusion = { version = "=0.23.0", path = "./rust/lance-encoding-datafusion" } +lance-file = { version = "=0.23.0", path = "./rust/lance-file" } +lance-index = { version = "=0.23.0", path = "./rust/lance-index" } +lance-io = { version = "=0.23.0", path = "./rust/lance-io" } +lance-jni = { version = "=0.23.0", path = "./java/core/lance-jni" } +lance-linalg = { version = "=0.23.0", path = "./rust/lance-linalg" } +lance-table = { version = "=0.23.0", path = "./rust/lance-table" } +lance-test-macros = { version = "=0.23.0", path = "./rust/lance-test-macros" } +lance-testing = { version = "=0.23.0", path = "./rust/lance-testing" } approx = "0.5.1" # Note that this one does not include pyarrow arrow = { version = "53.2", optional = false, features = ["prettyprint"] } @@ -114,7 +114,7 @@ datafusion-physical-expr = { version = "44.0" } deepsize = "0.2.0" dirs = "5.0.0" either = "1.0" -fsst = { version = "=0.22.1", path = "./rust/lance-encoding/src/compression_algo/fsst" } +fsst = { version = "=0.23.0", path = "./rust/lance-encoding/src/compression_algo/fsst" } futures = "0.3" http = "1.1.0" hyperloglogplus = { version = "0.4.1", features = ["const-loop"] } diff --git a/java/core/pom.xml b/java/core/pom.xml index c36be91607..0b8f6d98db 100644 --- a/java/core/pom.xml +++ b/java/core/pom.xml @@ -8,7 +8,7 @@ com.lancedb lance-parent - 0.21.2 + 0.23.0 ../pom.xml diff --git a/java/pom.xml b/java/pom.xml index 4f6349cbd4..3cdfd51c99 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -6,7 +6,7 @@ com.lancedb 
lance-parent - 0.21.2 + 0.23.0 pom Lance Parent diff --git a/java/spark/pom.xml b/java/spark/pom.xml index 3eba9e07df..a7ed358adf 100644 --- a/java/spark/pom.xml +++ b/java/spark/pom.xml @@ -8,7 +8,7 @@ com.lancedb lance-parent - 0.21.2 + 0.23.0 ../pom.xml @@ -112,7 +112,7 @@ com.lancedb lance-core - 0.21.2 + 0.23.0 org.apache.spark diff --git a/python/Cargo.lock b/python/Cargo.lock index 87542e7a08..d75b34e64f 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -2166,7 +2166,7 @@ dependencies = [ [[package]] name = "fsst" -version = "0.22.1" +version = "0.23.0" dependencies = [ "rand", ] @@ -3019,7 +3019,7 @@ dependencies = [ [[package]] name = "lance" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-arith", @@ -3080,7 +3080,7 @@ dependencies = [ [[package]] name = "lance-arrow" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-buffer", @@ -3097,7 +3097,7 @@ dependencies = [ [[package]] name = "lance-core" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-buffer", @@ -3133,7 +3133,7 @@ dependencies = [ [[package]] name = "lance-datafusion" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -3159,7 +3159,7 @@ dependencies = [ [[package]] name = "lance-datagen" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -3174,7 +3174,7 @@ dependencies = [ [[package]] name = "lance-encoding" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrayref", "arrow", @@ -3212,7 +3212,7 @@ dependencies = [ [[package]] name = "lance-file" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-arith", "arrow-array", @@ -3246,7 +3246,7 @@ dependencies = [ [[package]] name = "lance-index" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -3301,7 +3301,7 @@ dependencies = [ [[package]] name = "lance-io" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-arith", @@ 
-3339,7 +3339,7 @@ dependencies = [ [[package]] name = "lance-linalg" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow-array", "arrow-ord", @@ -3362,7 +3362,7 @@ dependencies = [ [[package]] name = "lance-table" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", @@ -4512,7 +4512,7 @@ dependencies = [ [[package]] name = "pylance" -version = "0.22.1" +version = "0.23.0" dependencies = [ "arrow", "arrow-array", diff --git a/python/Cargo.toml b/python/Cargo.toml index 918a2adff7..12ba21e477 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pylance" -version = "0.22.1" +version = "0.23.0" edition = "2021" authors = ["Lance Devs "] rust-version = "1.65" diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e056e0f6b4..22f419de36 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -20,6 +20,7 @@ use lance_core::ROW_ADDR; use lance_datafusion::projection::ProjectionPlan; use lance_file::datatypes::populate_schema_dictionary; use lance_file::version::LanceFileVersion; +use lance_index::DatasetIndexExt; use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry}; use lance_io::object_writer::ObjectWriter; use lance_io::traits::WriteExt; @@ -38,7 +39,7 @@ use rowids::get_row_id_index; use serde::{Deserialize, Serialize}; use snafu::{location, Location}; use std::borrow::Cow; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -72,7 +73,10 @@ use self::transaction::{Operation, Transaction}; use self::write::write_fragments_internal; use crate::datatypes::Schema; use crate::error::box_error; -use crate::io::commit::{commit_detached_transaction, commit_new_dataset, commit_transaction}; +use crate::io::commit::{ + commit_detached_transaction, commit_new_dataset, commit_transaction, + detect_overlapping_fragments, +}; use 
crate::session::Session; use crate::utils::temporal::{timestamp_to_nanos, utc_now, SystemTime}; use crate::{Error, Result}; @@ -1301,6 +1305,45 @@ impl Dataset { .try_collect::>() .await?; + // Validate indices + let indices = self.load_indices().await?; + self.validate_indices(&indices)?; + + Ok(()) + } + + fn validate_indices(&self, indices: &[Index]) -> Result<()> { + // Make sure there are no duplicate ids + let mut index_ids = HashSet::new(); + for index in indices.iter() { + if !index_ids.insert(&index.uuid) { + return Err(Error::corrupt_file( + self.manifest_file.clone(), + format!( + "Duplicate index id {} found in dataset {:?}", + &index.uuid, self.base + ), + location!(), + )); + } + } + + // For each index name, make sure there is no overlap in fragment bitmaps + if let Err(err) = detect_overlapping_fragments(indices) { + let mut message = "Overlapping fragments detected in dataset.".to_string(); + for (index_name, overlapping_frags) in err.bad_indices { + message.push_str(&format!( + "\nIndex {:?} has overlapping fragments: {:?}", + index_name, overlapping_frags + )); + } + return Err(Error::corrupt_file( + self.manifest_file.clone(), + message, + location!(), + )); + }; + Ok(()) } @@ -4338,6 +4381,39 @@ mod tests { ); } + #[tokio::test] + async fn test_fix_v0_21_0_corrupt_fragment_bitmap() { + // In v0.21.0 and earlier, delta indices had a bug where the fragment bitmap + // could contain fragments that are part of other index deltas. 
+ + // Copy over table + let test_dir = copy_test_data_to_tmp("v0.21.0/bad_index_fragment_bitmap").unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + + let mut dataset = Dataset::open(test_uri).await.unwrap(); + + let validate_res = dataset.validate().await; + assert!(validate_res.is_err()); + assert_eq!(dataset.load_indices().await.unwrap()[0].name, "vector_idx"); + assert!(dataset.index_statistics("vector_idx").await.is_err()); + + // Force a migration + dataset.delete("false").await.unwrap(); + dataset.validate().await.unwrap(); + + let indices = dataset.load_indices().await.unwrap(); + assert_eq!(indices.len(), 2); + fn get_bitmap(meta: &Index) -> Vec { + meta.fragment_bitmap.as_ref().unwrap().iter().collect() + } + assert_eq!(get_bitmap(&indices[0]), vec![0]); + assert_eq!(get_bitmap(&indices[1]), vec![1]); + + let stats = dataset.index_statistics("vector_idx").await.unwrap(); + let stats: serde_json::Value = serde_json::from_str(&stats).unwrap(); + assert_eq!(stats["num_indexed_fragments"], 2); + } + #[rstest] #[tokio::test] async fn test_bfloat16_roundtrip( diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 7ac0b115d0..51f46ad312 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -590,23 +590,39 @@ impl DatasetIndexExt for Dataset { let index_type = indices[0].index_type().to_string(); let indexed_fragments_per_delta = self.indexed_fragments(index_name).await?; - let num_indexed_rows_per_delta = self.indexed_fragments(index_name).await? 
- .iter() - .map(|frags| { - frags.iter().map(|f| f.num_rows().expect("Fragment should have row counts, please upgrade lance and trigger a single right to fix this")).sum::() - }) - .collect::>(); + let num_indexed_rows_per_delta = indexed_fragments_per_delta + .iter() + .map(|frags| { + let mut sum = 0; + for frag in frags.iter() { + sum += frag.num_rows().ok_or_else(|| Error::Internal { + message: "Fragment should have row counts, please upgrade lance and \ + trigger a single write to fix this" + .to_string(), + location: location!(), + })?; + } + Ok(sum) + }) + .collect::>>()?; - let num_indexed_fragments = indexed_fragments_per_delta - .clone() - .into_iter() - .flatten() - .map(|f| f.id) - .collect::>() - .len(); + let mut fragment_ids = HashSet::new(); + for frags in indexed_fragments_per_delta.iter() { + for frag in frags.iter() { + if !fragment_ids.insert(frag.id) { + return Err(Error::Internal { + message: "Overlap in indexed fragments. Please upgrade to lance >= 0.23.0 \ + and trigger a single write to fix this" + .to_string(), + location: location!(), + }); + } + } + } + let num_indexed_fragments = fragment_ids.len(); let num_unindexed_fragments = self.fragments().len() - num_indexed_fragments; - let num_indexed_rows = num_indexed_rows_per_delta.iter().last().unwrap(); + let num_indexed_rows: usize = num_indexed_rows_per_delta.iter().cloned().sum(); let num_unindexed_rows = self.count_rows(None).await? 
- num_indexed_rows; let stats = json!({ @@ -1150,7 +1166,6 @@ mod tests { #[tokio::test] async fn test_optimize_delta_indices() { - let test_dir = tempdir().unwrap(); let dimensions = 16; let column_name = "vec"; let vec_field = Field::new( @@ -1186,8 +1201,7 @@ mod tests { schema.clone(), ); - let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = Dataset::write(reader, test_uri, None).await.unwrap(); + let mut dataset = Dataset::write(reader, "memory://", None).await.unwrap(); let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 10); dataset .create_index( @@ -1210,24 +1224,44 @@ mod tests { .await .unwrap(); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap(); + async fn get_stats(dataset: &Dataset, name: &str) -> serde_json::Value { + serde_json::from_str(&dataset.index_statistics(name).await.unwrap()).unwrap() + } + async fn get_meta(dataset: &Dataset, name: &str) -> Vec { + dataset + .load_indices() + .await + .unwrap() + .iter() + .filter(|m| m.name == name) + .cloned() + .collect() + } + fn get_bitmap(meta: &IndexMetadata) -> Vec { + meta.fragment_bitmap.as_ref().unwrap().iter().collect() + } + + let stats = get_stats(&dataset, "vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 0); assert_eq!(stats["num_indexed_rows"], 512); assert_eq!(stats["num_indexed_fragments"], 1); assert_eq!(stats["num_indices"], 1); + let meta = get_meta(&dataset, "vec_idx").await; + assert_eq!(meta.len(), 1); + assert_eq!(get_bitmap(&meta[0]), vec![0]); let reader = RecordBatchIterator::new(vec![record_batch].into_iter().map(Ok), schema.clone()); dataset.append(reader, None).await.unwrap(); - let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap(); + let stats = get_stats(&dataset, "vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 512); 
assert_eq!(stats["num_indexed_rows"], 512); assert_eq!(stats["num_indexed_fragments"], 1); assert_eq!(stats["num_unindexed_fragments"], 1); assert_eq!(stats["num_indices"], 1); + let meta = get_meta(&dataset, "vec_idx").await; + assert_eq!(meta.len(), 1); + assert_eq!(get_bitmap(&meta[0]), vec![0]); dataset .optimize_indices(&OptimizeOptions { @@ -1236,13 +1270,15 @@ mod tests { }) .await .unwrap(); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap(); + let stats = get_stats(&dataset, "vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 512); assert_eq!(stats["num_indexed_rows"], 512); assert_eq!(stats["num_indexed_fragments"], 1); assert_eq!(stats["num_unindexed_fragments"], 1); assert_eq!(stats["num_indices"], 1); + let meta = get_meta(&dataset, "vec_idx").await; + assert_eq!(meta.len(), 1); + assert_eq!(get_bitmap(&meta[0]), vec![0]); // optimize the other index dataset @@ -1252,22 +1288,26 @@ mod tests { }) .await .unwrap(); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap(); + let stats = get_stats(&dataset, "vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 512); assert_eq!(stats["num_indexed_rows"], 512); assert_eq!(stats["num_indexed_fragments"], 1); assert_eq!(stats["num_unindexed_fragments"], 1); assert_eq!(stats["num_indices"], 1); + let meta = get_meta(&dataset, "vec_idx").await; + assert_eq!(meta.len(), 1); + assert_eq!(get_bitmap(&meta[0]), vec![0]); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("other_vec_idx").await.unwrap()) - .unwrap(); + let stats = get_stats(&dataset, "other_vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 0); assert_eq!(stats["num_indexed_rows"], 1024); assert_eq!(stats["num_indexed_fragments"], 2); assert_eq!(stats["num_unindexed_fragments"], 0); assert_eq!(stats["num_indices"], 2); + let meta = get_meta(&dataset, "other_vec_idx").await; + 
assert_eq!(meta.len(), 2); + assert_eq!(get_bitmap(&meta[0]), vec![0]); + assert_eq!(get_bitmap(&meta[1]), vec![1]); dataset .optimize_indices(&OptimizeOptions { @@ -1276,15 +1316,17 @@ mod tests { }) .await .unwrap(); - let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap(); + let stats = get_stats(&dataset, "vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 0); assert_eq!(stats["num_indexed_rows"], 1024); assert_eq!(stats["num_indexed_fragments"], 2); assert_eq!(stats["num_unindexed_fragments"], 0); assert_eq!(stats["num_indices"], 2); + let meta = get_meta(&dataset, "vec_idx").await; + assert_eq!(meta.len(), 2); + assert_eq!(get_bitmap(&meta[0]), vec![0]); + assert_eq!(get_bitmap(&meta[1]), vec![1]); dataset .optimize_indices(&OptimizeOptions { @@ -1293,13 +1335,15 @@ mod tests { }) .await .unwrap(); - let stats: serde_json::Value = - serde_json::from_str(&dataset.index_statistics("vec_idx").await.unwrap()).unwrap(); + let stats = get_stats(&dataset, "vec_idx").await; assert_eq!(stats["num_unindexed_rows"], 0); assert_eq!(stats["num_indexed_rows"], 1024); assert_eq!(stats["num_indexed_fragments"], 2); assert_eq!(stats["num_unindexed_fragments"], 0); assert_eq!(stats["num_indices"], 1); + let meta = get_meta(&dataset, "vec_idx").await; + assert_eq!(meta.len(), 1); + assert_eq!(get_bitmap(&meta[0]), vec![0, 1]); } #[tokio::test] diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 49c4f1728f..d295b32587 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -71,15 +71,18 @@ pub async fn merge_indices<'a>( let unindexed = dataset.unindexed_fragments(&old_indices[0].name).await?; let mut frag_bitmap = RoaringBitmap::new(); - old_indices.iter().for_each(|idx| { - frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter()); - }); unindexed.iter().for_each(|frag| 
{ frag_bitmap.insert(frag.id as u32); }); let (new_uuid, indices_merged) = match indices[0].index_type() { it if it.is_scalar() => { + // There are no delta indices for scalar, so adding all indexed + // fragments to the new index. + old_indices.iter().for_each(|idx| { + frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter()); + }); + let index = dataset .open_scalar_index(&column.name, &old_indices[0].uuid.to_string()) .await?; @@ -104,6 +107,14 @@ pub async fn merge_indices<'a>( Ok((new_uuid, 1)) } it if it.is_vector() => { + let start_pos = old_indices + .len() + .saturating_sub(options.num_indices_to_merge); + let indices_to_merge = &old_indices[start_pos..]; + indices_to_merge.iter().for_each(|idx| { + frag_bitmap.extend(idx.fragment_bitmap.as_ref().unwrap().iter()); + }); + let new_data_stream = if unindexed.is_empty() { None } else { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index a280d81e6d..923e739a9d 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -504,11 +504,9 @@ async fn optimize_ivf_pq_indices( let mut ivf_mut = IvfModel::new(first_idx.ivf.centroids.clone().unwrap()); - let start_pos = if options.num_indices_to_merge > existing_indices.len() { - 0 - } else { - existing_indices.len() - options.num_indices_to_merge - }; + let start_pos = existing_indices + .len() + .saturating_sub(options.num_indices_to_merge); let indices_to_merge = existing_indices[start_pos..] .iter() diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index e8e77e4b41..64dff4a804 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -497,8 +497,16 @@ fn must_recalculate_fragment_bitmap(index: &Index, version: Option<&WriterVersio /// /// Indices might be missing `fragment_bitmap`, so this function will add it. 
async fn migrate_indices(dataset: &Dataset, indices: &mut [Index]) -> Result<()> { + let needs_recalculating = match detect_overlapping_fragments(indices) { + Ok(()) => vec![], + Err(BadFragmentBitmapError { bad_indices }) => { + bad_indices.into_iter().map(|(name, _)| name).collect() + } + }; for index in indices { - if must_recalculate_fragment_bitmap(index, dataset.manifest.writer_version.as_ref()) { + if needs_recalculating.contains(&index.name) + || must_recalculate_fragment_bitmap(index, dataset.manifest.writer_version.as_ref()) + { debug_assert_eq!(index.fields.len(), 1); let idx_field = dataset.schema().field_by_id(index.fields[0]).ok_or_else(|| Error::Internal { message: format!("Index with uuid {} referred to field with id {} which did not exist in dataset", index.uuid, index.fields[0]), location: location!() })?; // We need to calculate the fragments covered by the index @@ -517,6 +525,40 @@ async fn migrate_indices(dataset: &Dataset, indices: &mut [Index]) -> Result<()> Ok(()) } +pub(crate) struct BadFragmentBitmapError { + pub bad_indices: Vec<(String, Vec)>, +} + +/// Detect whether a given index has overlapping fragment bitmaps in its index +/// segments. 
+pub(crate) fn detect_overlapping_fragments( + indices: &[Index], +) -> std::result::Result<(), BadFragmentBitmapError> { + let index_names: HashSet<&str> = indices.iter().map(|i| i.name.as_str()).collect(); + let mut bad_indices = Vec::new(); // (index_name, overlapping_fragments) + for name in index_names { + let mut seen_fragment_ids = HashSet::new(); + let mut overlap = Vec::new(); + for index in indices.iter().filter(|i| i.name == name) { + if let Some(fragment_bitmap) = index.fragment_bitmap.as_ref() { + for fragment in fragment_bitmap { + if !seen_fragment_ids.insert(fragment) { + overlap.push(fragment); + } + } + } + } + if !overlap.is_empty() { + bad_indices.push((name.to_string(), overlap)); + } + } + if bad_indices.is_empty() { + Ok(()) + } else { + Err(BadFragmentBitmapError { bad_indices }) + } +} + pub(crate) async fn do_commit_detached_transaction( dataset: &Dataset, object_store: &ObjectStore, diff --git a/test_data/v0.21.0/bad_index_fragment_bitmap/_indices/ca9b1111-abfc-4fde-b4cc-8e667b84e65d/index.idx b/test_data/v0.21.0/bad_index_fragment_bitmap/_indices/ca9b1111-abfc-4fde-b4cc-8e667b84e65d/index.idx new file mode 100644 index 0000000000..a5e08e4bbc Binary files /dev/null and b/test_data/v0.21.0/bad_index_fragment_bitmap/_indices/ca9b1111-abfc-4fde-b4cc-8e667b84e65d/index.idx differ diff --git a/test_data/v0.21.0/bad_index_fragment_bitmap/_indices/dc833a6e-a710-48aa-af24-9ab80f30700c/index.idx b/test_data/v0.21.0/bad_index_fragment_bitmap/_indices/dc833a6e-a710-48aa-af24-9ab80f30700c/index.idx new file mode 100644 index 0000000000..019ec584ec Binary files /dev/null and b/test_data/v0.21.0/bad_index_fragment_bitmap/_indices/dc833a6e-a710-48aa-af24-9ab80f30700c/index.idx differ diff --git a/test_data/v0.21.0/bad_index_fragment_bitmap/_transactions/3-f68af88b-ea42-4fec-9feb-2b5bb3f48223.txn b/test_data/v0.21.0/bad_index_fragment_bitmap/_transactions/3-f68af88b-ea42-4fec-9feb-2b5bb3f48223.txn new file mode 100644 index 0000000000..c8a4c7c1fc Binary 
files /dev/null and b/test_data/v0.21.0/bad_index_fragment_bitmap/_transactions/3-f68af88b-ea42-4fec-9feb-2b5bb3f48223.txn differ diff --git a/test_data/v0.21.0/bad_index_fragment_bitmap/_versions/4.manifest b/test_data/v0.21.0/bad_index_fragment_bitmap/_versions/4.manifest new file mode 100644 index 0000000000..34ff0a1892 Binary files /dev/null and b/test_data/v0.21.0/bad_index_fragment_bitmap/_versions/4.manifest differ diff --git a/test_data/v0.21.0/bad_index_fragment_bitmap/data/0e45e8ed-1d98-4e07-a4a6-67ca3d194291.lance b/test_data/v0.21.0/bad_index_fragment_bitmap/data/0e45e8ed-1d98-4e07-a4a6-67ca3d194291.lance new file mode 100644 index 0000000000..7bcaf3cacd Binary files /dev/null and b/test_data/v0.21.0/bad_index_fragment_bitmap/data/0e45e8ed-1d98-4e07-a4a6-67ca3d194291.lance differ diff --git a/test_data/v0.21.0/bad_index_fragment_bitmap/data/c042e881-07a6-4a65-96b9-c3f31ea3bb47.lance b/test_data/v0.21.0/bad_index_fragment_bitmap/data/c042e881-07a6-4a65-96b9-c3f31ea3bb47.lance new file mode 100644 index 0000000000..783190b834 Binary files /dev/null and b/test_data/v0.21.0/bad_index_fragment_bitmap/data/c042e881-07a6-4a65-96b9-c3f31ea3bb47.lance differ diff --git a/test_data/v0.21.0/datagen.py b/test_data/v0.21.0/datagen.py new file mode 100644 index 0000000000..f0d2edcaf1 --- /dev/null +++ b/test_data/v0.21.0/datagen.py @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright The Lance Authors + +from datetime import timedelta + +import lance +import pyarrow as pa +import pyarrow.compute as pc + +# To generate the test file, we should be running this version of lance +assert lance.__version__ == "0.21.0" + +data = pa.table( + { + "vector": pa.FixedSizeListArray.from_arrays( + pc.random(16 * 256).cast(pa.float32()), 16 + ) + } +) +ds = lance.write_dataset(data, "bad_index_fragment_bitmap") +ds.create_index("vector", index_type="IVF_PQ", num_partitions=1, num_sub_vectors=1) + +data2 = pa.table( + { + "vector": 
pa.FixedSizeListArray.from_arrays( + pc.random(16 * 32).cast(pa.float32()), 16 + ) + } +) +ds.insert(data2) +ds.optimize.optimize_indices(num_indices_to_merge=0) + +ds.cleanup_old_versions(older_than=timedelta(0)) + +indices = ds.list_indices() +assert len(indices) == 2 +# There is overlap in fragment_ids, which is not allowed +assert indices[0]["fragment_ids"] == {0} +assert indices[1]["fragment_ids"] == {0, 1}