diff --git a/python/python/benchmarks/test_index.py b/python/python/benchmarks/test_index.py
index 3a0cadd994..2d15807266 100644
--- a/python/python/benchmarks/test_index.py
+++ b/python/python/benchmarks/test_index.py
@@ -101,11 +101,29 @@ def test_optimize_index(
 @pytest.mark.benchmark(group="optimize_index")
 @pytest.mark.parametrize("num_partitions", [100, 300])
 def test_train_ivf(test_large_dataset, benchmark, num_partitions):
-    builder = IndicesBuilder(test_large_dataset)
+    builder = IndicesBuilder(test_large_dataset, "vector")
     benchmark.pedantic(
         builder.train_ivf,
-        args=["vector"],
         kwargs={"num_partitions": num_partitions},
         iterations=1,
         rounds=1,
     )
+
+
+# Pre-computing partition assignment only makes sense on CUDA and so this benchmark runs
+# only on CUDA.
+@pytest.mark.benchmark(group="assign_partitions")
+@pytest.mark.parametrize("num_partitions", [100, 300])
+def test_partition_assignment(test_large_dataset, benchmark, num_partitions):
+    from lance.dependencies import torch
+
+    try:
+        if not torch.cuda.is_available():
+            return
+    except:  # noqa: E722
+        return
+    builder = IndicesBuilder(test_large_dataset, "vector")
+    ivf = builder.train_ivf(num_partitions=num_partitions)
+    benchmark.pedantic(
+        builder.assign_ivf_partitions, args=[ivf, "cuda"], iterations=1, rounds=1
+    )
""" - column = self._normalize_column(column) num_rows = self.dataset.count_rows() num_partitions = self._determine_num_partitions(num_partitions, num_rows) self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows) @@ -217,11 +230,10 @@ def train_ivf( self._verify_ivf_params(num_partitions) if accelerator is None: - dimension = self.dataset.schema.field(column[0]).type.list_size ivf_centroids = indices.train_ivf_model( self.dataset._ds, - column[0], - dimension, + self.column[0], + self.dimension, num_partitions, distance_type, sample_rate, @@ -234,7 +246,7 @@ def train_ivf( ivf_centroids, _ = train_ivf_centroids_on_accelerator( self.dataset, - column[0], + self.column[0], num_partitions, distance_type, accelerator, @@ -251,7 +263,6 @@ def train_ivf( def train_pq( self, - column, ivf_model: IvfModel, num_subvectors=None, *, @@ -270,9 +281,6 @@ def train_pq( Parameters ---------- - column: str - The vector column to quantize, must be a fixed size list of floats - or 1-dimensional fixed-shape tensor column. ivf_model: IvfModel The IVF model to use to partition the vectors into clusters. This is needed because PQ is trained on residuals from the IVF model. @@ -291,17 +299,15 @@ def train_pq( max_iters: int This parameter is used in the same way as in the IVF model. """ - column = self._normalize_column(column) num_rows = self.dataset.count_rows() - dimension = self.dataset.schema.field(column[0]).type.list_size - self.dataset.schema.field(column[0]).type.list_size - num_subvectors = self._normalize_pq_params(num_subvectors, dimension) + self.dataset.schema.field(self.column[0]).type.list_size + num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension) self._verify_pq_sample_rate(num_rows, sample_rate) distance_type = ivf_model.distance_type pq_codebook = indices.train_pq_model( self.dataset._ds, - column[0], - dimension, + self.column[0], + self.dimension, num_subvectors, distance_type, sample_rate, @@ -310,6 +316,61 @@ def train_pq( ) return PqModel(num_subvectors, pq_codebook) + def assign_ivf_partitions( + self, + ivf_model: IvfModel, + accelerator: Union[str, "torch.Device"], + *, + output_uri: Optional[str] = None, + ) -> str: + """ + Calculates which IVF partition each vector belongs to. This searches the + IVF centroids and assigns the closest centroid to the vector. The result is + stored in a Lance dataset located at output_uri. The schema of the + partition assignment dataset is: + + row_id: uint64 + partition: uint32 + + Note: There is no advantage to separately computing the partition assignment + without an accelerator. If you are not using an accelerator then you should + skip this method and proceed without precomputed partition assignments. + + Parameters + ---------- + ivf_model: IvfModel + An IvfModel, previously created by ``train_ivf`` which the data will be + assigned to. + accelerator: Union[str, torch.Device] + An optional accelerator to use to offload computation to specialized + hardware. Currently supported values are the same as those in ``train_ivf`` + output_uri: Optional[str], default None + Destination Lance dataset where the partition assignments will be written + Can be None in which case a random directory will be used. 
diff --git a/python/python/lance/vector.py b/python/python/lance/vector.py
index dc7de7d09d..2a8df599b9 100644
--- a/python/python/lance/vector.py
+++ b/python/python/lance/vector.py
@@ -8,7 +8,6 @@
 import logging
 import re
 import tempfile
-from pathlib import Path
 from typing import TYPE_CHECKING, Any, Dict, Iterable, Literal, Optional, Union
 
 import pyarrow as pa
@@ -19,6 +18,8 @@
 from .dependencies import numpy as np
 
 if TYPE_CHECKING:
+    from pathlib import Path
+
     from . import LanceDataset
 
 
@@ -191,7 +192,7 @@ def compute_partitions(
     column: str,
     kmeans: Any,  # KMeans
     batch_size: int = 10240,
-    spill_dir: Union[str, Path] = None,
+    dst_dataset_uri: Union[str, Path] = None,
 ) -> str:
     """Compute partitions for each row using GPU kmeans and spill to disk.
 
@@ -205,8 +206,9 @@ def compute_partitions(
         KMeans model to use to compute partitions.
     batch_size: int, default 10240
         The batch size used to read the dataset.
-    spill_dir: Path
-        The path to store the partitions.
+    dst_dataset_uri: Union[str, Path], optional
+        The path to store the partitions. If not specified, a random
+        directory is used instead.
 
     Returns
     -------
@@ -215,6 +217,8 @@ def compute_partitions(
     str
     """
     from lance.torch.data import LanceDataset as PytorchLanceDataset
 
+    num_rows = dataset.count_rows()
+
     torch_ds = PytorchLanceDataset(
         dataset,
         batch_size=batch_size,
@@ -228,6 +232,8 @@ def compute_partitions(
         ]
     )
 
+    progress = tqdm(total=num_rows)
+
     def _partition_assignment() -> Iterable[pa.RecordBatch]:
         with torch.no_grad():
             for batch in torch_ds:
@@ -254,27 +260,25 @@ def _partition_assignment() -> Iterable[pa.RecordBatch]:
                     len(part_batch) - len(ids),
                 )
 
+            progress.update(part_batch.num_rows)
             yield part_batch
 
-    rbr = pa.RecordBatchReader.from_batches(
-        output_schema, tqdm(_partition_assignment())
-    )
-
-    if spill_dir is None:
-        spill_dir = tempfile.mkdtemp()
-
-    spill_uri = Path(spill_dir) / "precomputed_partitions.lance"
+    rbr = pa.RecordBatchReader.from_batches(output_schema, _partition_assignment())
 
+    if dst_dataset_uri is None:
+        dst_dataset_uri = tempfile.mkdtemp()
     ds = write_dataset(
         rbr,
-        spill_uri,
+        dst_dataset_uri,
         schema=output_schema,
         max_rows_per_file=dataset.count_rows(),
+        use_legacy_format=True,
     )
     assert len(ds.get_fragments()) == 1
     files = ds.get_fragments()[0].data_files()
     assert len(files) == 1
 
-    logging.info("Saved recomputed partitions to %s", spill_uri.absolute())
+    progress.close()
 
-    return str(spill_uri)
+    logging.info("Saved precomputed partitions to %s", dst_dataset_uri)
+    return str(dst_dataset_uri)
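
A rough sketch of calling `compute_partitions` directly with the renamed `dst_dataset_uri` parameter (most callers will go through `IndicesBuilder.assign_ivf_partitions` instead). The centroids and paths below are made up for illustration:

```python
import lance
import numpy as np
import torch

from lance.torch.kmeans import KMeans
from lance.vector import compute_partitions

# Hypothetical dataset with a 128-dimensional "vectors" column
ds = lance.dataset("/tmp/vectors.lance")

# Centroids would normally come from a trained IVF model rather than random data.
centroids = torch.from_numpy(np.random.rand(256, 128).astype(np.float32)).to("cuda")
kmeans = KMeans(256, metric="l2", device="cuda", centroids=centroids)

# dst_dataset_uri replaces the old spill_dir argument; when omitted, a temporary
# directory is created and its path is returned.
uri = compute_partitions(ds, "vectors", kmeans, dst_dataset_uri="/tmp/partitions.lance")
```
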
diff --git a/python/python/tests/test_indices.py b/python/python/tests/test_indices.py
index e04a83f79a..5dc943e78b 100644
--- a/python/python/tests/test_indices.py
+++ b/python/python/tests/test_indices.py
@@ -20,7 +20,7 @@ def gen_dataset(tmpdir, datatype=np.float32):
 
 def test_ivf_centroids(tmpdir):
     ds = gen_dataset(tmpdir)
-    ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16)
+    ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
 
     assert ivf.distance_type == "l2"
     assert len(ivf.centroids) == 100
@@ -34,7 +34,7 @@ def test_ivf_centroids(tmpdir):
 @pytest.mark.cuda
 def test_ivf_centroids_cuda(tmpdir):
     ds = gen_dataset(tmpdir)
-    ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16, accelerator="cuda")
+    ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, accelerator="cuda")
 
     assert ivf.distance_type == "l2"
     assert len(ivf.centroids) == 100
@@ -43,7 +43,7 @@ def test_ivf_centroids_column_type(tmpdir):
 
     def check(column_type, typename):
         ds = gen_dataset(tmpdir / typename, column_type)
-        ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16)
+        ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
         assert len(ivf.centroids) == 100
         ivf.save(str(tmpdir / f"ivf_{typename}"))
         reloaded = IvfModel.load(str(tmpdir / f"ivf_{typename}"))
@@ -58,8 +58,8 @@ def test_ivf_centroids_distance_type(tmpdir):
     ds = gen_dataset(tmpdir)
 
     def check(distance_type):
-        ivf = IndicesBuilder(ds).train_ivf(
-            "vectors", sample_rate=16, distance_type=distance_type
+        ivf = IndicesBuilder(ds, "vectors").train_ivf(
+            sample_rate=16, distance_type=distance_type
         )
         assert ivf.distance_type == distance_type
         ivf.save(str(tmpdir / "ivf"))
@@ -74,21 +74,21 @@ def check(distance_type):
 
 
 def test_num_partitions(tmpdir):
     ds = gen_dataset(tmpdir)
-    ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16, num_partitions=10)
+    ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, num_partitions=10)
     assert ivf.num_partitions == 10
 
 
 @pytest.fixture
 def ds_with_ivf(tmpdir):
     ds = gen_dataset(tmpdir)
-    ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16)
+    ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16)
     return ds, ivf
 
 
 def test_gen_pq(tmpdir, ds_with_ivf):
     ds, ivf = ds_with_ivf
-    pq = IndicesBuilder(ds).train_pq("vectors", ivf, sample_rate=16)
+    pq = IndicesBuilder(ds, "vectors").train_pq(ivf, sample_rate=16)
     assert pq.dimension == 128
     assert pq.num_subvectors == 8
 
@@ -96,3 +96,23 @@ def test_gen_pq(tmpdir, ds_with_ivf):
     reloaded = PqModel.load(str(tmpdir / "pq"))
     assert pq.dimension == reloaded.dimension
     assert pq.codebook == reloaded.codebook
+
+
+@pytest.mark.cuda
+def test_assign_partitions(tmpdir):
+    ds = gen_dataset(tmpdir)
+    builder = IndicesBuilder(ds, "vectors")
+
+    ivf = builder.train_ivf(sample_rate=16, num_partitions=20)
+    partitions_uri = builder.assign_ivf_partitions(ivf, accelerator="cuda")
+
+    partitions = lance.dataset(partitions_uri)
+    found_row_ids = set()
+    for batch in partitions.to_batches():
+        row_ids = batch["row_id"]
+        for row_id in row_ids:
+            found_row_ids.add(row_id)
+        part_ids = batch["partition"]
+        for part_id in part_ids:
+            assert part_id.as_py() < 20
+    assert len(found_row_ids) == ds.count_rows()
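
The new `test_assign_partitions` test above also doubles as an example of how the partition assignment dataset can be consumed. A condensed sketch, assuming the path was returned by `assign_ivf_partitions` (the path shown is hypothetical):

```python
import lance

# Path returned by IndicesBuilder.assign_ivf_partitions (hypothetical)
partitions = lance.dataset("/tmp/partitions.lance")

for batch in partitions.to_batches():
    row_ids = batch["row_id"]      # uint64 row ids from the source dataset
    part_ids = batch["partition"]  # uint32 IVF partition id for each row
```
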