From 5a8fb8caf028ffbd7a6a5c4c2522fb0f66166e51 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 11 Jul 2024 22:14:04 -0700 Subject: [PATCH] feat: add standalone partition assignment operation (#2556) This also fixes up the tqdm progress bar for partition assignment so that it has a definite end. This also makes one small tweak to the indices builder, moving the column argument into the builder constructor, since this argument will be shared by all methods in the class. --- python/python/benchmarks/test_index.py | 22 +++++- python/python/lance/indices.py | 95 +++++++++++++++++++++----- python/python/lance/vector.py | 34 +++++---- python/python/tests/test_indices.py | 36 +++++++--- 4 files changed, 145 insertions(+), 42 deletions(-) diff --git a/python/python/benchmarks/test_index.py b/python/python/benchmarks/test_index.py index 3a0cadd994..2d15807266 100644 --- a/python/python/benchmarks/test_index.py +++ b/python/python/benchmarks/test_index.py @@ -101,11 +101,29 @@ def test_optimize_index( @pytest.mark.benchmark(group="optimize_index") @pytest.mark.parametrize("num_partitions", [100, 300]) def test_train_ivf(test_large_dataset, benchmark, num_partitions): - builder = IndicesBuilder(test_large_dataset) + builder = IndicesBuilder(test_large_dataset, "vector") benchmark.pedantic( builder.train_ivf, - args=["vector"], kwargs={"num_partitions": num_partitions}, iterations=1, rounds=1, ) + + +# Pre-computing partition assigment only makes sense on CUDA and so this benchmark runs +# only on CUDA. +@pytest.mark.benchmark(group="assign_partitions") +@pytest.mark.parametrize("num_partitions", [100, 300]) +def test_partition_assignment(test_large_dataset, benchmark, num_partitions): + from lance.dependencies import torch + + try: + if not torch.cuda.is_available(): + return + except: # noqa: E722 + return + builder = IndicesBuilder(test_large_dataset, "vector") + ivf = builder.train_ivf(num_partitions=num_partitions) + benchmark.pedantic( + builder.assign_ivf_partitions, args=[ivf, None, "cuda"], iterations=1, rounds=1 + ) diff --git a/python/python/lance/indices.py b/python/python/lance/indices.py index d77649447c..039d7deada 100644 --- a/python/python/lance/indices.py +++ b/python/python/lance/indices.py @@ -4,6 +4,7 @@ import warnings from typing import TYPE_CHECKING, Optional, Union +import numpy as np import pyarrow as pa from lance.file import LanceFileReader, LanceFileWriter @@ -153,12 +154,25 @@ class IndicesBuilder: use the `create_index` method on the dataset object. """ - def __init__(self, dataset): + def __init__(self, dataset, column: str): + """ + Create an index builder for the given vector column + + Parameters + ---------- + + dataset: LanceDataset + the dataset containing the data + column: str + The vector column to index, must be a fixed size list of floats + or 1-dimensional fixed-shape tensor column. + """ self.dataset = dataset + self.column = self._normalize_column(column) + self.dimension = self.dataset.schema.field(self.column[0]).type.list_size def train_ivf( self, - column, num_partitions=None, *, distance_type="l2", @@ -209,7 +223,6 @@ def train_ivf( possible minima. In these cases we must terminate or run forever. The max_iters parameter defines a cutoff at which we terminate training. 
""" - column = self._normalize_column(column) num_rows = self.dataset.count_rows() num_partitions = self._determine_num_partitions(num_partitions, num_rows) self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows) @@ -217,11 +230,10 @@ def train_ivf( self._verify_ivf_params(num_partitions) if accelerator is None: - dimension = self.dataset.schema.field(column[0]).type.list_size ivf_centroids = indices.train_ivf_model( self.dataset._ds, - column[0], - dimension, + self.column[0], + self.dimension, num_partitions, distance_type, sample_rate, @@ -234,7 +246,7 @@ def train_ivf( ivf_centroids, _ = train_ivf_centroids_on_accelerator( self.dataset, - column[0], + self.column[0], num_partitions, distance_type, accelerator, @@ -251,7 +263,6 @@ def train_ivf( def train_pq( self, - column, ivf_model: IvfModel, num_subvectors=None, *, @@ -270,9 +281,6 @@ def train_pq( Parameters ---------- - column: str - The vector column to quantize, must be a fixed size list of floats - or 1-dimensional fixed-shape tensor column. ivf_model: IvfModel The IVF model to use to partition the vectors into clusters. This is needed because PQ is trained on residuals from the IVF model. @@ -291,17 +299,15 @@ def train_pq( max_iters: int This parameter is used in the same way as in the IVF model. """ - column = self._normalize_column(column) num_rows = self.dataset.count_rows() - dimension = self.dataset.schema.field(column[0]).type.list_size - self.dataset.schema.field(column[0]).type.list_size - num_subvectors = self._normalize_pq_params(num_subvectors, dimension) + self.dataset.schema.field(self.column[0]).type.list_size + num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension) self._verify_pq_sample_rate(num_rows, sample_rate) distance_type = ivf_model.distance_type pq_codebook = indices.train_pq_model( self.dataset._ds, - column[0], - dimension, + self.column[0], + self.dimension, num_subvectors, distance_type, sample_rate, @@ -310,6 +316,61 @@ def train_pq( ) return PqModel(num_subvectors, pq_codebook) + def assign_ivf_partitions( + self, + ivf_model: IvfModel, + accelerator: Union[str, "torch.Device"], + *, + output_uri: Optional[str] = None, + ) -> str: + """ + Calculates which IVF partition each vector belongs to. This searches the + IVF centroids and assigns the closest centroid to the vector. The result is + stored in a Lance dataset located at output_uri. The schema of the + partition assignment dataset is: + + row_id: uint64 + partition: uint32 + + Note: There is no advantage to separately computing the partition assignment + without an accelerator. If you are not using an accelerator then you should + skip this method and proceed without precomputed partition assignments. + + Parameters + ---------- + ivf_model: IvfModel + An IvfModel, previously created by ``train_ivf`` which the data will be + assigned to. + accelerator: Union[str, torch.Device] + An optional accelerator to use to offload computation to specialized + hardware. Currently supported values are the same as those in ``train_ivf`` + output_uri: Optional[str], default None + Destination Lance dataset where the partition assignments will be written + Can be None in which case a random directory will be used. 
+ + Returns + ------- + str + The path of the partition assignment dataset (will be equal to + output_uri unless the value is None) + """ + from .dependencies import torch + from .torch.kmeans import KMeans + from .vector import compute_partitions + + centroids = torch.from_numpy( + np.stack(ivf_model.centroids.to_numpy(zero_copy_only=False)) + ).to(accelerator) + kmeans = KMeans( + ivf_model.num_partitions, + metric=ivf_model.distance_type, + device=accelerator, + centroids=centroids, + ) + return compute_partitions( + self.dataset, self.column[0], kmeans, dst_dataset_uri=output_uri + ) + def _determine_num_partitions(self, num_partitions: Optional[int], num_rows: int): if num_partitions is None: return round(math.sqrt(num_rows)) diff --git a/python/python/lance/vector.py b/python/python/lance/vector.py index dc7de7d09d..2a8df599b9 100644 --- a/python/python/lance/vector.py +++ b/python/python/lance/vector.py @@ -8,7 +8,6 @@ import logging import re import tempfile -from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Iterable, Literal, Optional, Union import pyarrow as pa @@ -19,6 +18,8 @@ from .dependencies import numpy as np if TYPE_CHECKING: + from pathlib import Path + from . import LanceDataset @@ -191,7 +192,7 @@ def compute_partitions( column: str, kmeans: Any, # KMeans batch_size: int = 10240, - spill_dir: Union[str, Path] = None, + dst_dataset_uri: Union[str, Path] = None, ) -> str: """Compute partitions for each row using GPU kmeans and spill to disk. @@ -205,8 +206,9 @@ def compute_partitions( KMeans model to use to compute partitions. batch_size: int, default 10240 The batch size used to read the dataset. - spill_dir: Path - The path to store the partitions. + dst_dataset_uri: Union[str, Path], optional + The path to store the partitions. 
If not specified a random + directory is used instead Returns ------- @@ -215,6 +217,8 @@ def compute_partitions( """ from lance.torch.data import LanceDataset as PytorchLanceDataset + num_rows = dataset.count_rows() + torch_ds = PytorchLanceDataset( dataset, batch_size=batch_size, @@ -228,6 +232,8 @@ def compute_partitions( ] ) + progress = tqdm(total=num_rows) + def _partition_assignment() -> Iterable[pa.RecordBatch]: with torch.no_grad(): for batch in torch_ds: @@ -254,27 +260,25 @@ def _partition_assignment() -> Iterable[pa.RecordBatch]: len(part_batch) - len(ids), ) + progress.update(part_batch.num_rows) yield part_batch - rbr = pa.RecordBatchReader.from_batches( - output_schema, tqdm(_partition_assignment()) - ) - - if spill_dir is None: - spill_dir = tempfile.mkdtemp() - - spill_uri = Path(spill_dir) / "precomputed_partitions.lance" + rbr = pa.RecordBatchReader.from_batches(output_schema, _partition_assignment()) + if dst_dataset_uri is None: + dst_dataset_uri = tempfile.mkdtemp() ds = write_dataset( rbr, - spill_uri, + dst_dataset_uri, schema=output_schema, max_rows_per_file=dataset.count_rows(), + use_legacy_format=True, ) assert len(ds.get_fragments()) == 1 files = ds.get_fragments()[0].data_files() assert len(files) == 1 - logging.info("Saved recomputed partitions to %s", spill_uri.absolute()) + progress.close() - return str(spill_uri) + logging.info("Saved precomputed partitions to %s", dst_dataset_uri) + return str(dst_dataset_uri) diff --git a/python/python/tests/test_indices.py b/python/python/tests/test_indices.py index e04a83f79a..5dc943e78b 100644 --- a/python/python/tests/test_indices.py +++ b/python/python/tests/test_indices.py @@ -20,7 +20,7 @@ def gen_dataset(tmpdir, datatype=np.float32): def test_ivf_centroids(tmpdir): ds = gen_dataset(tmpdir) - ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16) + ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16) assert ivf.distance_type == "l2" assert len(ivf.centroids) == 100 @@ -34,7 +34,7 @@ def test_ivf_centroids(tmpdir): @pytest.mark.cuda def test_ivf_centroids_cuda(tmpdir): ds = gen_dataset(tmpdir) - ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16, accelerator="cuda") + ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, accelerator="cuda") assert ivf.distance_type == "l2" assert len(ivf.centroids) == 100 @@ -43,7 +43,7 @@ def test_ivf_centroids_cuda(tmpdir): def test_ivf_centroids_column_type(tmpdir): def check(column_type, typename): ds = gen_dataset(tmpdir / typename, column_type) - ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16) + ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16) assert len(ivf.centroids) == 100 ivf.save(str(tmpdir / f"ivf_{typename}")) reloaded = IvfModel.load(str(tmpdir / f"ivf_{typename}")) @@ -58,8 +58,8 @@ def test_ivf_centroids_distance_type(tmpdir): ds = gen_dataset(tmpdir) def check(distance_type): - ivf = IndicesBuilder(ds).train_ivf( - "vectors", sample_rate=16, distance_type=distance_type + ivf = IndicesBuilder(ds, "vectors").train_ivf( + sample_rate=16, distance_type=distance_type ) assert ivf.distance_type == distance_type ivf.save(str(tmpdir / "ivf")) @@ -74,21 +74,21 @@ def check(distance_type): def test_num_partitions(tmpdir): ds = gen_dataset(tmpdir) - ivf = IndicesBuilder(ds).train_ivf("vectors", sample_rate=16, num_partitions=10) + ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16, num_partitions=10) assert ivf.num_partitions == 10 @pytest.fixture def ds_with_ivf(tmpdir): ds = gen_dataset(tmpdir) - ivf = 
IndicesBuilder(ds).train_ivf("vectors", sample_rate=16) + ivf = IndicesBuilder(ds, "vectors").train_ivf(sample_rate=16) return ds, ivf def test_gen_pq(tmpdir, ds_with_ivf): ds, ivf = ds_with_ivf - pq = IndicesBuilder(ds).train_pq("vectors", ivf, sample_rate=16) + pq = IndicesBuilder(ds, "vectors").train_pq(ivf, sample_rate=16) assert pq.dimension == 128 assert pq.num_subvectors == 8 @@ -96,3 +96,23 @@ def test_gen_pq(tmpdir, ds_with_ivf): reloaded = PqModel.load(str(tmpdir / "pq")) assert pq.dimension == reloaded.dimension assert pq.codebook == reloaded.codebook + + +@pytest.mark.cuda +def test_assign_partitions(tmpdir): + ds = gen_dataset(tmpdir) + builder = IndicesBuilder(ds, "vectors") + + ivf = builder.train_ivf(sample_rate=16, num_partitions=20) + partitions_uri = builder.assign_ivf_partitions(ivf, accelerator="cuda") + + partitions = lance.dataset(partitions_uri) + found_row_ids = set() + for batch in partitions.to_batches(): + row_ids = batch["row_id"] + for row_id in row_ids: + found_row_ids.add(row_id) + part_ids = batch["partition"] + for part_id in part_ids: + assert part_id.as_py() < 20 + assert len(found_row_ids) == ds.count_rows()
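
Usage sketch: a minimal end-to-end example of the standalone partition assignment
flow this patch introduces, mirroring test_assign_partitions above. The dataset
URI, the column name "vectors", and the partition count are illustrative
assumptions, and the accelerator argument assumes a CUDA device is available;
without an accelerator there is no benefit to precomputing assignments.

    import lance
    from lance.indices import IndicesBuilder

    # Hypothetical dataset containing a fixed-size-list vector column "vectors".
    ds = lance.dataset("/tmp/vectors.lance")

    # The vector column is now passed to the builder constructor and shared by
    # all of its methods (train_ivf, train_pq, assign_ivf_partitions).
    builder = IndicesBuilder(ds, "vectors")

    # Train the IVF model, offloading the k-means training to the accelerator.
    ivf = builder.train_ivf(num_partitions=100, accelerator="cuda")

    # Precompute partition assignments. The result is written to a Lance dataset
    # with columns row_id (uint64) and partition (uint32); its URI is returned.
    partitions_uri = builder.assign_ivf_partitions(ivf, accelerator="cuda")

    # The assignments can be inspected like any other Lance dataset.
    assignments = lance.dataset(partitions_uri)
    for batch in assignments.to_batches():
        print(batch["row_id"], batch["partition"])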