From 6ab6ba8c1d35cf43c6e373dabf2715aadc51aa16 Mon Sep 17 00:00:00 2001 From: Sabri Eyuboglu <32822771+seyuboglu@users.noreply.github.com> Date: Mon, 9 Aug 2021 17:03:18 -0400 Subject: [PATCH] Make column and DataPanel testing more modular (#118) Closes #109 The BlockManager (see #104) introduces a need for more robust DataPanel testing that tests DataPanels with a diverse set of columns. As we add more columns, we don't want to have to update the DataPanel tests for each new column. Instead, we should specify a TestBed for each column that plugs in to the DataPanel tests. Started this for NumpyArrayColumn with #108. Co-authored-by: Priya --- .coveragerc | 4 +- meerkat/__init__.py | 1 - meerkat/block/manager.py | 12 + meerkat/block/numpy_block.py | 4 +- meerkat/block/pandas_block.py | 3 + meerkat/block/tensor_block.py | 2 +- meerkat/columns/abstract.py | 28 +- meerkat/columns/cell_column.py | 7 + meerkat/columns/image_column.py | 44 +- meerkat/columns/lambda_column.py | 43 +- meerkat/columns/list_column.py | 3 + meerkat/columns/numpy_column.py | 25 +- meerkat/columns/pandas_column.py | 28 +- meerkat/columns/tensor_column.py | 6 + meerkat/datapanel.py | 30 +- meerkat/errors.py | 4 + meerkat/mixins/cloneable.py | 7 +- meerkat/mixins/inspect_fn.py | 12 +- meerkat/mixins/mapping.py | 2 - meerkat/nn/__init__.py | 12 +- tests/meerkat/columns/abstract.py | 280 ++++ tests/meerkat/columns/test_image_column.py | 279 ++++ tests/meerkat/columns/test_numpy_column.py | 293 ++-- tests/meerkat/columns/test_pandas_column.py | 456 +++--- tests/meerkat/columns/test_tensor_column.py | 344 ++--- tests/meerkat/test_datapanel.py | 1527 ++++++++----------- tests/testbeds.py | 118 +- 27 files changed, 1929 insertions(+), 1645 deletions(-) create mode 100644 tests/meerkat/columns/abstract.py create mode 100644 tests/meerkat/columns/test_image_column.py diff --git a/.coveragerc b/.coveragerc index 086c28ffe..551d9cbe7 100644 --- a/.coveragerc +++ b/.coveragerc @@ -12,4 +12,6 @@ exclude_lines = ignore_errors = True omit = tests/* - meerkat/contrib/* \ No newline at end of file + meerkat/contrib/* + meerkat/nn/* + setup.py \ No newline at end of file diff --git a/meerkat/__init__.py b/meerkat/__init__.py index 1d49d6ac6..0f75a8703 100644 --- a/meerkat/__init__.py +++ b/meerkat/__init__.py @@ -9,7 +9,6 @@ initialize_logging() -from meerkat import nn from meerkat.cells.abstract import AbstractCell from meerkat.cells.imagepath import ImagePath from meerkat.cells.spacy import LazySpacyCell, SpacyCell diff --git a/meerkat/block/manager.py b/meerkat/block/manager.py index 2fbdc07e6..8ca509375 100644 --- a/meerkat/block/manager.py +++ b/meerkat/block/manager.py @@ -277,6 +277,18 @@ def _repr_pandas_(self): dfs.append(pd.DataFrame({k: self[k]._repr_pandas_() for k in cols})) return pd.concat(objs=dfs, axis=1) + def view(self): + mgr = BlockManager() + for name, col in self._columns.items(): + mgr.add_column(col.view(), name) + return mgr + + def copy(self): + mgr = BlockManager() + for name, col in self._columns.items(): + mgr.add_column(col.copy(), name) + return mgr + def _serialize_block_index(index: BlockIndex) -> Union[Dict, str, int]: if not isinstance(index, (int, str, slice)): diff --git a/meerkat/block/numpy_block.py b/meerkat/block/numpy_block.py index 5f9806f89..74ed9ee1a 100644 --- a/meerkat/block/numpy_block.py +++ b/meerkat/block/numpy_block.py @@ -38,7 +38,7 @@ def signature(self) -> Hashable: dtype=self.data.dtype, ) - def _get_data(self, index: BlockIndex) -> np.ndarray: + def _get_data(self, index: BlockIndex, materialize: bool = True) -> np.ndarray: return self.data[:, index] @classmethod @@ -59,7 +59,7 @@ def from_data(cls, data: np.ndarray) -> Tuple[NumpyBlock, Mapping[str, BlockInde data = np.expand_dims(data, axis=1) block_index = 0 elif data.shape[1] == 1: - block_index = 0 + block_index = slice(0, 1) else: block_index = slice(0, data.shape[1]) diff --git a/meerkat/block/pandas_block.py b/meerkat/block/pandas_block.py index b3d8e3442..c0feb9fbc 100644 --- a/meerkat/block/pandas_block.py +++ b/meerkat/block/pandas_block.py @@ -84,6 +84,9 @@ def _convert_index(index): if isinstance(index, TensorColumn): # need to convert to numpy for boolean indexing return index.data.numpy() + if isinstance(index, pd.Series): + # need to convert to numpy for boolean indexing + return index.values from meerkat.columns.pandas_column import PandasSeriesColumn if isinstance(index, PandasSeriesColumn): diff --git a/meerkat/block/tensor_block.py b/meerkat/block/tensor_block.py index 1ca8d385c..ef29a3c1e 100644 --- a/meerkat/block/tensor_block.py +++ b/meerkat/block/tensor_block.py @@ -64,7 +64,7 @@ def from_data( data = torch.unsqueeze(data, dim=1) block_index = 0 elif data.shape[1] == 1: - block_index = 0 + block_index = slice(0, 1) else: block_index = slice(0, data.shape[1]) diff --git a/meerkat/columns/abstract.py b/meerkat/columns/abstract.py index 9e6f8ccc0..ec34749aa 100644 --- a/meerkat/columns/abstract.py +++ b/meerkat/columns/abstract.py @@ -73,9 +73,6 @@ def __str__(self): def streamlit(self): return self._repr_pandas_() - def _unpack_data(self, data): - return super(AbstractColumn, self)._unpack_data(data) - def _set_data(self, data): if self.is_blockable(): data = self._unpack_block_view(data) @@ -225,10 +222,9 @@ def __len__(self): return self.full_length() def full_length(self): - # Length of the underlying data stored in the column - if self._data is not None: - return len(self._data) - return 0 + if self._data is None: + return 0 + return len(self._data) def _repr_pandas_(self) -> pd.Series: raise NotImplementedError @@ -244,7 +240,7 @@ def _repr_html_(self): @capture_provenance() def filter( self, - function: Optional[Callable] = None, + function: Callable, with_indices=False, input_columns: Optional[Union[str, List[str]]] = None, is_batched_fn: bool = False, @@ -256,15 +252,11 @@ def filter( **kwargs, ) -> Optional[AbstractColumn]: """Filter the elements of the column using a function.""" - # Just return if the function is None - if function is None: - logger.info("`function` None, returning None.") - return None # Return if `self` has no examples if not len(self): - logger.info("Dataset empty, returning None.") - return None + logger.info("Dataset empty, returning it .") + return self # Get some information about the function function_properties = self._inspect_function( @@ -304,6 +296,14 @@ def concat(columns: Sequence[AbstractColumn]) -> None: # implement specific ones for ListColumn, NumpyColumn etc. raise NotImplementedError + def is_equal(self, other: AbstractColumn) -> bool: + """Tests whether two columns. + + Args: + other (AbstractColumn): [description] + """ + raise NotImplementedError() + def batch( self, batch_size: int = 1, diff --git a/meerkat/columns/cell_column.py b/meerkat/columns/cell_column.py index a353bd8f1..9dab2d7e1 100644 --- a/meerkat/columns/cell_column.py +++ b/meerkat/columns/cell_column.py @@ -60,3 +60,10 @@ def concat(columns: Sequence[CellColumn]): return columns[0].__class__.from_cells( list(tz.concat([c.data for c in columns])) ) + + def is_equal(self, other: AbstractColumn) -> bool: + return ( + (self.__class__ == other.__class__) + and (len(self) == len(other)) + and all([self.lz[idx] == other.lz[idx] for idx in range(len(self))]) + ) diff --git a/meerkat/columns/image_column.py b/meerkat/columns/image_column.py index 139a2d394..5f4802547 100644 --- a/meerkat/columns/image_column.py +++ b/meerkat/columns/image_column.py @@ -6,8 +6,9 @@ import pandas as pd from meerkat.cells.imagepath import ImagePath +from meerkat.columns.abstract import AbstractColumn from meerkat.columns.cell_column import CellColumn -from meerkat.columns.lambda_column import LambdaColumn +from meerkat.columns.lambda_column import LambdaCell, LambdaColumn from meerkat.columns.pandas_column import PandasSeriesColumn from meerkat.tools.lazy_loader import LazyLoader @@ -16,6 +17,32 @@ logger = logging.getLogger(__name__) +class ImageCell(LambdaCell): + def __init__( + self, + transform: callable = None, + loader: callable = None, + data: str = None, + ): + self.loader = self.default_loader if loader is None else loader + self.transform = transform + self._data = data + + def fn(self, filepath: str): + image = self.loader(filepath) + if self.transform is not None: + image = self.transform(image) + return image + + def __eq__(self, other): + return ( + (other.__class__ == self.__class__) + and (self.data == other.data) + and (self.transform == other.transform) + and (self.loader == other.loader) + ) + + class ImageColumn(LambdaColumn): def __init__( self, @@ -25,12 +52,13 @@ def __init__( *args, **kwargs, ): - super(ImageColumn, self).__init__( - PandasSeriesColumn.from_data(data), *args, **kwargs - ) + super(ImageColumn, self).__init__(PandasSeriesColumn(data), *args, **kwargs) self.loader = self.default_loader if loader is None else loader self.transform = transform + def _create_cell(self, data: object) -> ImageCell: + return ImageCell(data=data, loader=self.loader, transform=self.transform) + def fn(self, filepath: str): image = self.loader(filepath) if self.transform is not None: @@ -65,6 +93,14 @@ def _state_keys(cls) -> Collection: def _repr_pandas_(self) -> pd.Series: return "ImageCell(" + self.data.data.reset_index(drop=True) + ")" + def is_equal(self, other: AbstractColumn) -> bool: + return ( + (other.__class__ == self.__class__) + and (self.loader == other.loader) + and (self.transform == other.transform) + and self.data.is_equal(other.data) + ) + class ImageCellColumn(CellColumn): def __init__(self, *args, **kwargs): diff --git a/meerkat/columns/lambda_column.py b/meerkat/columns/lambda_column.py index 172c68e4f..d54e5158d 100644 --- a/meerkat/columns/lambda_column.py +++ b/meerkat/columns/lambda_column.py @@ -41,6 +41,13 @@ def get(self, *args, **kwargs): else: return self.fn(self.data) + def __eq__(self, other): + return ( + (other.__class__ == self.__class__) + and (self.data == other.data) + and (self.fn == other.fn) + ) + class LambdaColumn(AbstractColumn): def __init__( @@ -56,29 +63,35 @@ def __init__( self.fn = fn self._output_type = output_type - def __getattr__(self, name): - if not self._output_type: - raise AttributeError(name) + # TODO (Sabri): reconsider whether this is important functionality. it's not clear + # to me that this is that useful. + # def __getattr__(self, name): + # if not self._output_type: + # raise AttributeError(name) + + # data = self[:2] + # if not hasattr(data, name): + # raise AttributeError(name) - data = self[:2] - if not hasattr(data, name): - raise AttributeError(name) + # data = self[:] + # return data.__getattr__(name - data = self[:] - return data.__getattr__(name) + def _set(self, index, value): + raise ValueError("Cannot setitem on a `LambdaColumn`.") def fn(self, data: object): """Subclasses like `ImageColumn` should be able to implement their own version.""" raise NotImplementedError + def _create_cell(self, data: object) -> LambdaCell: + return LambdaCell(fn=self.fn, data=data) + def _get_cell(self, index: int, materialize: bool = True): if materialize: return self.fn(self._data._get(index, materialize=True)) else: - return LambdaCell( - fn=self.fn, data=self._data._get(index, materialize=False) - ) + return self._create_cell(data=self._data._get(index, materialize=False)) def _get_batch(self, indices: np.ndarray, materialize: bool = True): if materialize: @@ -141,6 +154,14 @@ def _write_data(self, path): # TODO (Sabri): avoid redundant writes in dataframes return self.data.write(os.path.join(path, "data")) + def is_equal(self, other: AbstractColumn) -> bool: + if other.__class__ != self.__class__: + return False + if self.fn != other.fn: + return False + + return self.data.is_equal(other.data) + @staticmethod def _read_data(path: str): # TODO (Sabri): make this work for dataframes underlying the lambda column diff --git a/meerkat/columns/list_column.py b/meerkat/columns/list_column.py index d3cdc2b04..b68a53d03 100644 --- a/meerkat/columns/list_column.py +++ b/meerkat/columns/list_column.py @@ -74,3 +74,6 @@ def concat(cls, columns: Sequence[ListColumn]): if issubclass(cls, CloneableMixin): return columns[0]._clone(data=data) return cls.from_list(data) + + def is_equal(self, other: AbstractColumn) -> bool: + return (self.__class__ == other.__class__) and self.data == other.data diff --git a/meerkat/columns/numpy_column.py b/meerkat/columns/numpy_column.py index 04ed34618..9580be7c2 100644 --- a/meerkat/columns/numpy_column.py +++ b/meerkat/columns/numpy_column.py @@ -15,7 +15,6 @@ from meerkat.block.abstract import BlockView from meerkat.block.numpy_block import NumpyBlock from meerkat.columns.abstract import AbstractColumn -from meerkat.mixins.cloneable import CloneableMixin from meerkat.writers.concat_writer import ConcatWriter Representer.add_representer(abc.ABCMeta, Representer.represent_name) @@ -44,7 +43,7 @@ class NumpyArrayColumn( def __init__( self, - data: Sequence = None, + data: Sequence, *args, **kwargs, ): @@ -54,25 +53,30 @@ def __init__( "Cannot create `NumpyArrayColumn` from a `BlockView` not " "referencing a `NumpyBlock`." ) - elif data is not None and not isinstance(data, np.memmap): + elif not isinstance(data, np.memmap): data = np.asarray(data) super(NumpyArrayColumn, self).__init__(data=data, *args, **kwargs) # TODO (sabri): need to support str here _HANDLED_TYPES = (np.ndarray, numbers.Number) - def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): + def __array_ufunc__(self, ufunc: np.ufunc, method, *inputs, **kwargs): out = kwargs.get("out", ()) for x in inputs + out: # Only support operations with instances of _HANDLED_TYPES. # Use ArrayLike instead of type(self) for isinstance to # allow subclasses that don't override __array_ufunc__ to # handle ArrayLike objects. - if not isinstance(x, self._HANDLED_TYPES + (NumpyArrayColumn,)): + if not isinstance(x, self._HANDLED_TYPES + (NumpyArrayColumn,)) and not ( + # support for at index + method == "at" + and isinstance(x, list) + ): return NotImplemented # Defer to the implementation of the ufunc on unwrapped values. inputs = tuple(x.data if isinstance(x, NumpyArrayColumn) else x for x in inputs) + if out: kwargs["out"] = tuple( x.data if isinstance(x, NumpyArrayColumn) else x for x in out @@ -87,7 +91,7 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): return None else: # one return value - return type(self)(data=result) + return self._clone(data=result) def __getattr__(self, name): try: @@ -143,9 +147,12 @@ def _read_data( @classmethod def concat(cls, columns: Sequence[NumpyArrayColumn]): data = np.concatenate([c.data for c in columns]) - if issubclass(cls, CloneableMixin): - return columns[0]._clone(data=data) - return cls.from_array(data) + return columns[0]._clone(data=data) + + def is_equal(self, other: AbstractColumn) -> bool: + if other.__class__ != self.__class__: + return False + return (self.data == other.data).all() @classmethod def get_writer(cls, mmap: bool = False, template: AbstractColumn = None): diff --git a/meerkat/columns/pandas_column.py b/meerkat/columns/pandas_column.py index 44b0e8a42..348e79c85 100644 --- a/meerkat/columns/pandas_column.py +++ b/meerkat/columns/pandas_column.py @@ -32,7 +32,6 @@ from meerkat.block.abstract import BlockView from meerkat.block.pandas_block import PandasBlock from meerkat.columns.abstract import AbstractColumn -from meerkat.mixins.cloneable import CloneableMixin Representer.add_representer(abc.ABCMeta, Representer.represent_name) @@ -143,7 +142,7 @@ def _set_data(self, data: object): "Cannot create `PandasSeriesColumn` from a `BlockView` not " "referencing a `PandasBlock`." ) - elif data is not None: + elif not isinstance(data, pd.Series): data = pd.Series(data) super(PandasSeriesColumn, self)._set_data(data) @@ -170,10 +169,10 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): if type(result) is tuple: # multiple return values - return tuple(type(self)(x) for x in result) + return tuple(type(self)(x) for x in result) # pragma: no cover elif method == "at": # no return value - return None + return None # pragma: no cover else: # one return value return type(self)(result) @@ -210,7 +209,7 @@ def _get(self, index, materialize: bool = True): return data def _set_cell(self, index, value): - self._data[index] = value + self._data.iloc[index] = value def _set_batch(self, indices, values): self._data.iloc[indices] = values @@ -218,9 +217,7 @@ def _set_batch(self, indices, values): @classmethod def concat(cls, columns: Sequence[PandasSeriesColumn]): data = pd.concat([c.data for c in columns]) - if issubclass(cls, CloneableMixin): - return columns[0]._clone(data=data) - return cls.from_array(data) + return columns[0]._clone(data=data) def _write_data(self, path: str) -> None: data_path = os.path.join(path, "data.pd") @@ -231,8 +228,6 @@ def _read_data( path: str, ): data_path = os.path.join(path, "data.pd") - if not os.path.exists(data_path): - data_path = path # Load in the data return pd.read_pickle(data_path) @@ -243,8 +238,19 @@ def _repr_pandas_(self) -> pd.Series: def to_tensor(self) -> torch.Tensor: """Use `column.to_tensor()` instead of `torch.tensor(column)`, which is very slow.""" + dtype = self.data.values.dtype + if not np.issubdtype(dtype, np.number): + raise ValueError( + f"Cannot convert `PandasSeriesColumn` with dtype={dtype} to tensor." + ) + # TODO (Sabri): understand why `torch.tensor(column)` is so slow - return torch.tensor(self.data) + return torch.tensor(self.data.values) + + def is_equal(self, other: AbstractColumn) -> bool: + if other.__class__ != self.__class__: + return False + return (self.data.values == other.data.values).all() def to_pandas(self) -> pd.Series: return self.data diff --git a/meerkat/columns/tensor_column.py b/meerkat/columns/tensor_column.py index e7587ebee..e0a5ece81 100644 --- a/meerkat/columns/tensor_column.py +++ b/meerkat/columns/tensor_column.py @@ -173,3 +173,9 @@ def _write_data(self, path: str) -> None: @staticmethod def _read_data(path: str) -> torch.Tensor: return torch.load(os.path.join(path, "data.pt")) + + def is_equal(self, other: AbstractColumn) -> bool: + return (other.__class__ == self.__class__) and (self.data == other.data).all() + + def to_tensor(self) -> torch.Tensor: + return self.data diff --git a/meerkat/datapanel.py b/meerkat/datapanel.py index f9259fb00..b6fe3aa96 100644 --- a/meerkat/datapanel.py +++ b/meerkat/datapanel.py @@ -358,6 +358,9 @@ def add_column( f"set `overwrite=True` to overwrite." ) + if name in self.all_columns: + self.remove_column(name) + column = AbstractColumn.from_data(data) assert len(column) == len(self), ( @@ -368,11 +371,8 @@ def add_column( # Add the column self.data[name] = column - if name not in self.all_columns: - self.all_columns.append(name) - if self._visible_columns is not None: - self.visible_columns.append(name) + self.visible_columns = self.visible_columns + [name] # Set features self._set_features() @@ -746,12 +746,6 @@ def _collate(self, batch: List): dp = self._clone(data=new_batch) return dp - def _copy_data(self) -> object: - return {name: column.copy() for name, column in self.data.items()} - - def _view_data(self) -> object: - return {name: column.view() for name, column in self.data.items()} - @staticmethod def _convert_to_batch_fn( function: Callable, with_indices: bool, materialize: bool = True, **kwargs @@ -861,7 +855,7 @@ def update( # Get some information about the function dp = self[input_columns] if input_columns is not None else self function_properties = dp._inspect_function( - function, with_indices, is_batched_fn, materialize=materialize + function, with_indices, is_batched_fn, materialize=materialize, **kwargs ) assert ( function_properties.dict_output @@ -973,6 +967,7 @@ def filter( with_indices, is_batched_fn=is_batched_fn, materialize=materialize, + **kwargs, ) assert function_properties.bool_output, "function must return boolean." @@ -1097,13 +1092,6 @@ def write( metadata_path = os.path.join(path, "meta.yaml") yaml.dump(metadata, open(metadata_path, "w")) - @classmethod - def from_state(cls, state: Dict, *args, **kwargs) -> DataPanel: - datapanel = super(DataPanel, cls).from_state(state, *args, **kwargs) - datapanel._create_logdir() - datapanel._set_features() - return datapanel - @classmethod def _state_keys(cls) -> set: """List of attributes that describe the state of the object.""" @@ -1113,3 +1101,9 @@ def _state_keys(cls) -> set: "_info", "_split", } + + def _view_data(self) -> object: + return self.data.view() + + def _copy_data(self) -> object: + return self.data.copy() diff --git a/meerkat/errors.py b/meerkat/errors.py index 4fb7bf254..74c1aae25 100644 --- a/meerkat/errors.py +++ b/meerkat/errors.py @@ -8,3 +8,7 @@ class ConcatError(ValueError): class ConsolidationError(ValueError): pass + + +class ExperimentalWarning(FutureWarning): + pass diff --git a/meerkat/mixins/cloneable.py b/meerkat/mixins/cloneable.py index ac506e4c2..600f82783 100644 --- a/meerkat/mixins/cloneable.py +++ b/meerkat/mixins/cloneable.py @@ -18,7 +18,7 @@ def __init__(self, *args, **kwargs): @classmethod def _state_keys(cls) -> set: - """ """ + """""" raise NotImplementedError() @classmethod @@ -37,7 +37,7 @@ def _clone(self, data: object = None): if isinstance(self, BlockableMixin) and self.is_blockable(): data = self._pack_block_view() else: - data = self.data + data = self._view_data() state = self._get_state(clone=True) @@ -49,6 +49,9 @@ def _clone(self, data: object = None): def _copy_data(self) -> object: raise NotImplementedError + def _view_data(self) -> object: + return self.data + def _get_state(self, clone: bool = False) -> dict: state = {key: getattr(self, key) for key in self._state_keys()} if clone: diff --git a/meerkat/mixins/inspect_fn.py b/meerkat/mixins/inspect_fn.py index 5c11590b1..5e1dbb19e 100644 --- a/meerkat/mixins/inspect_fn.py +++ b/meerkat/mixins/inspect_fn.py @@ -53,6 +53,8 @@ def _inspect_function( # lazy import to avoid circular dependency from meerkat.columns.abstract import AbstractColumn + from meerkat.columns.numpy_column import NumpyArrayColumn + from meerkat.columns.tensor_column import TensorColumn if isinstance(output, Mapping): # `function` returns a dict output @@ -75,8 +77,14 @@ def _inspect_function( elif ( isinstance(output, (bool, np.bool_)) - or (isinstance(output, np.ndarray) and output.dtype == np.bool) - or (isinstance(output, torch.Tensor) and output.dtype == torch.bool) + or ( + isinstance(output, (np.ndarray, NumpyArrayColumn)) + and output.dtype == bool + ) + or ( + isinstance(output, (torch.Tensor, TensorColumn)) + and output.dtype == torch.bool + ) ): # `function` returns a bool diff --git a/meerkat/mixins/mapping.py b/meerkat/mixins/mapping.py index 3bbfc2af7..d10bcb05c 100644 --- a/meerkat/mixins/mapping.py +++ b/meerkat/mixins/mapping.py @@ -32,8 +32,6 @@ def map( from meerkat.datapanel import DataPanel """Map a function over the elements of the column.""" - # Check if need to materialize: - # TODO(karan): figure out if we need materialize=False # Just return if the function is None if function is None: diff --git a/meerkat/nn/__init__.py b/meerkat/nn/__init__.py index b2c0b0fa5..ddd45c051 100644 --- a/meerkat/nn/__init__.py +++ b/meerkat/nn/__init__.py @@ -1,5 +1,15 @@ -"""Import nn module classes""" +"""Import nn module classes.""" # flake8: noqa +import warnings + +from meerkat.errors import ExperimentalWarning + +warnings.warn( + ExperimentalWarning( + "The `meerkat.nn` module is experimental and has limited test coverage. " + "Proceed with caution." + ) +) from meerkat.nn.embedding_column import EmbeddingColumn from meerkat.nn.huggingfacemodel import HuggingfaceModel diff --git a/tests/meerkat/columns/abstract.py b/tests/meerkat/columns/abstract.py new file mode 100644 index 000000000..aad1bbc99 --- /dev/null +++ b/tests/meerkat/columns/abstract.py @@ -0,0 +1,280 @@ +import os +import pickle +from functools import wraps +from itertools import product + +import numpy as np +import pandas as pd +import pytest + +from meerkat.datapanel import DataPanel + + +@pytest.fixture +def testbed(request, tmpdir): + testbed_class, config = request.param + return testbed_class(**config, tmpdir=tmpdir) + + +class AbstractColumnTestBed: + + DEFAULT_CONFIG = {} + + @classmethod + def get_params(cls, config: dict = None, params: dict = None, single: bool = False): + updated_config = cls.DEFAULT_CONFIG.copy() + if config is not None: + updated_config.update(config) + configs = [ + (cls, config) + for config in map( + dict, + product(*[[(k, v) for v in vs] for k, vs in updated_config.items()]), + ) + ] + if single: + configs = configs[:1] + if params is None: + return { + "argnames": "testbed", + "argvalues": configs, + "ids": [str(config) for config in configs], + } + else: + argvalues = list(product(configs, *params.values())) + return { + "argnames": "testbed," + ",".join(params.keys()), + "argvalues": argvalues, + "ids": [",".join(map(str, values)) for values in argvalues], + } + + @classmethod + @wraps(pytest.mark.parametrize) + def parametrize( + cls, config: dict = None, params: dict = None, single: bool = False + ): + return pytest.mark.parametrize( + **cls.get_params(config=config, params=params, single=single), + indirect=["testbed"] + ) + + @classmethod + def single(cls, tmpdir): + return cls(**cls.get_params(single=True)["argvalues"][0][1], tmpdir=tmpdir) + + def get_map_spec(self, key: str = "default"): + raise NotImplementedError() + + def get_data(self, index): + raise NotImplementedError() + + @staticmethod + def assert_data_equal(data1: np.ndarray, data2: np.ndarray): + raise NotImplementedError() + + +class TestAbstractColumn: + __test__ = False + testbed_class: type = None + column_class: type = None + + def test_getitem(self, testbed, index_type: type = np.array): + col = testbed.col + + testbed.assert_data_equal(testbed.get_data(1), col[1]) + + for index in [ + slice(2, 4, 1), + (np.arange(len(col)) % 2).astype(bool), + np.array([0, 3, 5, 6]), + ]: + col_index = index_type(index) if not isinstance(index, slice) else index + data = testbed.get_data(index) + result = col[col_index] + testbed.assert_data_equal(data, result.data) + + if type(result) == type(col): + # if the getitem returns a column of the same type, enforce that all the + # attributes were cloned over appropriately. We don't want to check this + # for columns that return columns of different type from getitem + # (e.g. LambdaColumn) + assert col._clone(data=data).is_equal(result) + + def _get_data_to_set(self, testbed, data_index): + raise NotImplementedError + + def test_set_item(self, testbed, index_type: type = np.array): + col = testbed.col + + for index in [ + 1, + slice(2, 4, 1), + (np.arange(len(col)) % 2).astype(bool), + np.array([0, 3, 5, 6]), + ]: + col_index = index_type(index) if isinstance(index, np.ndarray) else index + data_to_set = self._get_data_to_set(testbed, index) + col[col_index] = data_to_set + testbed.assert_data_equal(data_to_set, testbed.get_data(index)) + + def test_map_return_single( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool + ): + """`map`, single return,""" + col = testbed.col + map_spec = testbed.get_map_spec(batched=batched, materialize=materialize) + + def func(x): + out = map_spec["fn"](x) + return out + + result = col.map( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + output_type=map_spec.get("output_type", None), + ) + assert result.is_equal(map_spec["expected_result"]) + + def test_map_return_single_w_kwarg( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool + ): + """`map`, single return,""" + col = testbed.col + kwarg = 2 + map_spec = testbed.get_map_spec( + batched=batched, materialize=materialize, kwarg=kwarg + ) + + def func(x, k=0): + out = map_spec["fn"](x, k=k) + return out + + result = col.map( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + output_type=map_spec.get("output_type", None), + k=kwarg, + ) + assert result.is_equal(map_spec["expected_result"]) + + def test_map_return_multiple( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool = True + ): + """`map`, single return,""" + col = testbed.col + map_specs = { + "map1": testbed.get_map_spec( + batched=batched, materialize=materialize, salt=1 + ), + "map2": testbed.get_map_spec( + batched=batched, materialize=materialize, salt=2 + ), + } + + def func(x): + out = {key: map_spec["fn"](x) for key, map_spec in map_specs.items()} + return out + + result = col.map( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + output_type=list(map_specs.values())[0].get("output_type", None), + ) + assert isinstance(result, DataPanel) + for key, map_spec in map_specs.items(): + assert result[key].is_equal(map_spec["expected_result"]) + + def test_filter_1( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool = True + ): + """multiple_dim=False.""" + col = testbed.col + filter_spec = testbed.get_filter_spec(batched=batched, materialize=materialize) + + def func(x): + out = filter_spec["fn"](x) + return out + + result = col.filter( + func, batch_size=4, is_batched_fn=batched, materialize=materialize + ) + + assert result.is_equal(filter_spec["expected_result"]) + + def test_copy(self, testbed: AbstractColumnTestBed): + col, _ = testbed.col, testbed.data + col_copy = col.copy() + + assert isinstance(col_copy, self.column_class) + assert col.is_equal(col_copy) + + def test_pickle(self, testbed): + # important for dataloader + col = testbed.col + buf = pickle.dumps(col) + new_col = pickle.loads(buf) + + assert isinstance(new_col, self.column_class) + assert col.is_equal(new_col) + + def test_io(self, tmp_path, testbed: AbstractColumnTestBed): + # uses the tmp_path fixture which will provide a + # temporary directory unique to the test invocation, + # important for dataloader + col, _ = testbed.col, testbed.data + + path = os.path.join(tmp_path, "test") + col.write(path) + + new_col = self.column_class.read(path) + + assert isinstance(new_col, self.column_class) + assert col.is_equal(new_col) + + def test_head(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + length = 10 + result = testbed.col.head(length) + assert len(result) == length + assert result.is_equal(testbed.col.lz[:length]) + + def test_tail(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + length = 10 + result = testbed.col.tail(length) + assert len(result) == length + assert result.is_equal(testbed.col.lz[-length:]) + + def test_repr_html(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + testbed.col._repr_html_() + + def test_str(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + result = str(testbed.col) + assert isinstance(result, str) + + def test_repr(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + result = repr(testbed.col) + assert isinstance(result, str) + + def test_streamlit(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + testbed.col.streamlit() + + def test_repr_pandas(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + series = testbed.col._repr_pandas_() + assert isinstance(series, pd.Series) + + def test_to_pandas(self, tmpdir): + testbed = self.testbed_class.single(tmpdir=tmpdir) + series = testbed.col.to_pandas() + assert isinstance(series, pd.Series) diff --git a/tests/meerkat/columns/test_image_column.py b/tests/meerkat/columns/test_image_column.py new file mode 100644 index 000000000..39025eaf6 --- /dev/null +++ b/tests/meerkat/columns/test_image_column.py @@ -0,0 +1,279 @@ +"""Unittests for NumpyColumn.""" +from __future__ import annotations + +import os +from typing import List, Union + +import numpy as np +import pytest +import torch +import torchvision.datasets.folder as folder +from PIL import Image +from torchvision.transforms.functional import to_tensor + +from meerkat import ImageColumn +from meerkat.columns.abstract import AbstractColumn +from meerkat.columns.image_column import ImageCell +from meerkat.columns.lambda_column import LambdaCell +from meerkat.columns.list_column import ListColumn +from meerkat.columns.pandas_column import PandasSeriesColumn +from meerkat.columns.tensor_column import TensorColumn + +from .abstract import AbstractColumnTestBed, TestAbstractColumn + + +class ImageColumnTestBed(AbstractColumnTestBed): + + DEFAULT_CONFIG = {"transform": [True, False]} + + def __init__( + self, + tmpdir: str, + length: int = 16, + transform: bool = False, + seed: int = 123, + ): + self.image_paths = [] + self.image_arrays = [] + self.ims = [] + self.data = [] + + transform = to_tensor if transform else None + + for i in range(0, length): + self.image_paths.append(os.path.join(tmpdir, "{}.png".format(i))) + self.image_arrays.append((i * np.ones((4, 4, 3))).astype(np.uint8)) + im = Image.fromarray(self.image_arrays[-1]) + self.ims.append(im) + self.data.append(transform(im) if transform else im) + im.save(self.image_paths[-1]) + if transform is not None: + self.data = torch.stack(self.data) + self.transform = transform + self.col = ImageColumn.from_filepaths( + self.image_paths, transform=transform, loader=folder.default_loader + ) + + def get_map_spec( + self, + batched: bool = True, + materialize: bool = False, + kwarg: int = 0, + salt: int = 1, + ): + if not materialize: + if batched: + return {"fn": lambda x, k=0: x, "expected_result": self.col} + else: + # can't check for cell column equivalence because the `fn` is a bound + # method of different objects (since we perform batching then convert) + # non-batched fns to batched functions, so we call get + if self.transform is None: + return { + "fn": lambda x, k=0: x.get().rotate(45 + salt + k), + "expected_result": ListColumn( + [im.rotate(45 + salt + kwarg) for im in self.ims] + ), + } + else: + return { + "fn": lambda x, k=0: x.get() + salt + k, + "expected_result": TensorColumn( + torch.stack([self.transform(im) for im in self.ims]) + + salt + + kwarg + ), + } + else: + if self.transform is None: + return { + "fn": (lambda x, k=0: [im.rotate(45 + salt + k) for im in x]) + if batched + else (lambda x, k=0: x.rotate(45 + salt + k)), + "expected_result": ListColumn( + [im.rotate(45 + salt + kwarg) for im in self.ims] + ), + } + else: + return { + "fn": lambda x, k=0: x + salt + k, + "expected_result": TensorColumn( + torch.stack([self.transform(im) for im in self.ims]) + + salt + + kwarg + ), + } + + def get_filter_spec( + self, + batched: bool = True, + materialize: bool = False, + salt: int = 1, + kwarg: int = 0, + ): + if not materialize: + if batched: + return { + "fn": lambda x, k=0: [ + int(os.path.splitext(os.path.basename(cell.data))[0]) + < (4 + salt + k) + for cell in x.lz + ], + "expected_result": self.col.lz[: 4 + salt + kwarg], + } + else: + return { + "fn": ( + lambda x, k=0: int( + os.path.splitext(os.path.basename(x.data))[0] + ) + < (4 + salt + k) + ), + "expected_result": self.col.lz[: 4 + salt + kwarg], + } + else: + if self.transform is None: + return { + "fn": (lambda x, k=0: [im.rotate(45 + salt + k) for im in x]) + if batched + else (lambda x, k=0: x.rotate(45 + salt + k)), + "expected_result": ListColumn( + [im.rotate(45 + salt + kwarg) for im in self.ims] + ), + } + else: + return { + "fn": lambda x, k=0: ( + (x.mean(dim=[1, 2, 3]) if batched else x.mean()) > salt + k + ).to(bool), + "expected_result": self.col.lz[ + torch.stack([self.transform(im) for im in self.ims]) + .mean(dim=[1, 2, 3]) + .numpy() + > salt + kwarg + ], + } + + def get_data(self, index, materialize: bool = True): + if materialize: + if isinstance(index, int): + return self.data[index] + + if self.transform is not None: + return self.data[index] + else: + index = np.arange(len(self.data))[index] + return [self.data[idx] for idx in index] + else: + if isinstance(index, int): + return ImageCell( + data=self.image_paths[index], + loader=self.col.loader, + transform=self.col.transform, + ) + index = np.arange(len(self.data))[index] + return PandasSeriesColumn([self.image_paths[idx] for idx in index]) + + @staticmethod + def assert_data_equal( + data1: Union[Image.Image, AbstractColumn, List, torch.Tensor], + data2: Union[Image.Image, AbstractColumn, List, torch.Tensor], + ): + if isinstance(data1, Image.Image) or isinstance(data1, List): + assert data1 == data2 + elif isinstance(data1, AbstractColumn): + assert data1.is_equal(data2) + elif torch.is_tensor(data1): + assert (data1 == data2).all() + elif isinstance(data1, LambdaCell): + assert data1 == data2 + else: + raise ValueError( + "Cannot assert data equal between objects type:" + f" {type(data1), type(data2)}" + ) + + +@pytest.fixture +def testbed(request, tmpdir): + testbed_class, config = request.param + return testbed_class(**config, tmpdir=tmpdir) + + +class TestImageColumn(TestAbstractColumn): + + __test__ = True + testbed_class: type = ImageColumnTestBed + column_class: type = ImageColumn + + def _get_data_to_set(self, testbed, data_index): + return np.zeros_like(testbed.get_data(data_index)) + + @ImageColumnTestBed.parametrize(single=True, params={"index_type": [np.ndarray]}) + def test_set_item(self, testbed, index_type: type): + with pytest.raises(ValueError, match="Cannot setitem on a `LambdaColumn`."): + testbed.col[0] = 0 + + @ImageColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_getitem(self, testbed, index_type: type): + return super().test_getitem(testbed, index_type=index_type) + + @ImageColumnTestBed.parametrize( + config={"transform": [True]}, + params={"batched": [True, False], "materialize": [True, False]}, + ) + def test_filter_1( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool + ): + return super().test_filter_1(testbed, batched, materialize=materialize) + + @ImageColumnTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} + ) + def test_map_return_multiple( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool + ): + return super().test_map_return_multiple( + testbed, batched, materialize=materialize + ) + + @ImageColumnTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} + ) + def test_map_return_single( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool + ): + return super().test_map_return_single(testbed, batched, materialize) + + @ImageColumnTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} + ) + def test_map_return_single_w_kwarg( + self, testbed: AbstractColumnTestBed, batched: bool, materialize: bool + ): + return super().test_map_return_single_w_kwarg(testbed, batched, materialize) + + @ImageColumnTestBed.parametrize() + def test_copy(self, testbed: AbstractColumnTestBed): + return super().test_copy(testbed) + + @ImageColumnTestBed.parametrize() + def test_io(self, tmp_path, testbed): + # uses the tmp_path fixture which will provide a + # temporary directory unique to the test invocation, + # important for dataloader + col, _ = testbed.col, testbed.data + + path = os.path.join(tmp_path, "test") + col.write(path) + + new_col = self.column_class.read(path) + + assert isinstance(new_col, self.column_class) + # can't check if the functions are the same since they point to different + # methods + assert col.data.is_equal(new_col.data) + + @ImageColumnTestBed.parametrize() + def test_pickle(self, testbed): + super().test_pickle(testbed) diff --git a/tests/meerkat/columns/test_numpy_column.py b/tests/meerkat/columns/test_numpy_column.py index 31adbd4a0..f41750edf 100644 --- a/tests/meerkat/columns/test_numpy_column.py +++ b/tests/meerkat/columns/test_numpy_column.py @@ -1,19 +1,16 @@ -"""Unittests for NumpyColumn.""" -import os -import pickle -from functools import wraps -from itertools import product - import numpy as np import numpy.testing as np_test +import pandas as pd import pytest import torch from meerkat import NumpyArrayColumn -from meerkat.datapanel import DataPanel +from meerkat.block.tensor_block import TensorBlock + +from .abstract import AbstractColumnTestBed, TestAbstractColumn -class NumpyArrayColumnTestBed: +class NumpyArrayColumnTestBed(AbstractColumnTestBed): DEFAULT_CONFIG = { "num_dims": [1, 2, 3], @@ -22,9 +19,16 @@ class NumpyArrayColumnTestBed: } def __init__( - self, length: int = 16, num_dims: int = True, dim_length: int = 5, dtype="float" + self, + length: int = 16, + num_dims: int = True, + dim_length: int = 5, + dtype="float", + seed: int = 123, + tmpdir: str = None, ): - np.random.seed(123) + self.dtype = dtype + np.random.seed(seed) array = ( np.random.random((length, *[dim_length for _ in range(num_dims - 1)])) * 10 ) @@ -33,177 +37,150 @@ def __init__( self.col = NumpyArrayColumn.from_array(array) self.data = array - @classmethod - def get_params(cls, config: dict = None, params: dict = None): - updated_config = cls.DEFAULT_CONFIG.copy() - if config is not None: - updated_config.update(config) - configs = list( - map( - dict, - product(*[[(k, v) for v in vs] for k, vs in updated_config.items()]), - ) - ) - if params is None: - return "config", configs - else: - return "config," + ",".join(params.keys()), product( - configs, *params.values() - ) - - @classmethod - @wraps(pytest.mark.parametrize) - def parametrize(cls, config: dict = None, params: dict = None): - return pytest.mark.parametrize( - *NumpyArrayColumnTestBed.get_params(config=config, params=params) - ) - - -def test_from_array(): - # Build a dataset from a batch - array = np.random.rand(10, 3, 3) - col = NumpyArrayColumn.from_array(array) - - assert (col == array).all() - np_test.assert_equal(len(col), 10) - - -@NumpyArrayColumnTestBed.parametrize( - config={"num_dims": [2]}, - params={"batched": [True, False], "use_kwargs": [True, False]}, -) -def test_map_return_single(config, batched, use_kwargs): - """`map`, single return,""" - testbed = NumpyArrayColumnTestBed(**config) - col, array = testbed.col, testbed.data - - def func(x, bias=0): - out = x.mean(axis=-1) + bias - return out - - bias = 1 if use_kwargs else 0 - kwargs = {"bias": bias} if use_kwargs else {} - - result = col.map(func, batch_size=4, is_batched_fn=batched, **kwargs) - - assert isinstance(result, NumpyArrayColumn) - np_test.assert_equal(len(result), len(array)) - assert (result == array.mean(axis=-1) + bias).all() - - -@NumpyArrayColumnTestBed.parametrize( - config={"num_dims": [2]}, - params={"batched": [True, False], "use_kwargs": [True, False]}, -) -def test_map_return_multiple(config, batched, use_kwargs): - """`map`, multiple return.""" - testbed = NumpyArrayColumnTestBed(**config) - col, array = testbed.col, testbed.data - - def func(x, bias=0): - return {"mean": x.mean(axis=-1) + bias, "std": x.std(axis=-1) + bias} - - bias = 1 if use_kwargs else 0 - kwargs = {"bias": bias} if use_kwargs else {} - - result = col.map(func, batch_size=4, is_batched_fn=batched, **kwargs) - assert isinstance(result, DataPanel) - assert isinstance(result["std"], NumpyArrayColumn) - assert isinstance(result["mean"], NumpyArrayColumn) - np_test.assert_equal(len(result), len(array)) - assert (result["mean"] == array.mean(axis=-1) + bias).all() - assert (result["std"] == array.std(axis=-1) + bias).all() + def get_map_spec( + self, + batched: bool = True, + materialize: bool = False, + kwarg: int = 0, + salt: int = 1, + ): + return { + "fn": lambda x, k=0: x + salt + k, + "expected_result": NumpyArrayColumn.from_array( + self.col.data + salt + kwarg + ), + } + + def get_filter_spec( + self, + batched: bool = True, + materialize: bool = False, + kwarg: int = 0, + salt: int = 1, + ): + return { + "fn": lambda x, k=0: x > 3 + k + salt, + "expected_result": self.col[self.col.data > 3 + salt + kwarg], + } + def get_data(self, index, materialize=True): + return self.data[index] -@NumpyArrayColumnTestBed.parametrize() -def test_set_item_1(config): - testbed = NumpyArrayColumnTestBed(**config) - col, array = testbed.col, testbed.data - index = [0, 3] - not_index = [i for i in range(col.shape[0]) if i not in index] - col[index] = 0 + @staticmethod + def assert_data_equal(data1: np.ndarray, data2: np.ndarray): + assert (data1 == data2).all() - assert (col[not_index] == array[not_index]).all() - assert (col[index] == 0).all() +@pytest.fixture +def testbed(request, tmpdir): + testbed_class, config = request.param + return testbed_class(**config, tmpdir=tmpdir) -@NumpyArrayColumnTestBed.parametrize() -def test_set_item_2(config): - testbed = NumpyArrayColumnTestBed(**config) - col, array = testbed.col, testbed.data - index = 0 - not_index = [i for i in range(col.shape[0]) if i != index] - col[index] = 0 - assert (col[not_index] == array[not_index]).all() - assert (col[index] == 0).all() +class TestNumpyArrayColumn(TestAbstractColumn): + __test__ = True + testbed_class: type = NumpyArrayColumnTestBed + column_class: type = NumpyArrayColumn -@NumpyArrayColumnTestBed.parametrize( - config={"num_dims": [1]}, - params={"batched": [True, False], "use_kwargs": [True, False]}, -) -def test_filter_1(config, batched, use_kwargs): - """multiple_dim=False.""" - testbed = NumpyArrayColumnTestBed(**config) - col, array = testbed.col, testbed.data + def test_init_block(self): + block_view = TensorBlock(torch.zeros(10, 10))[0] + with pytest.raises(ValueError): + NumpyArrayColumn(block_view) - def func(x, thresh=20): - return x > thresh + def _get_data_to_set(self, testbed, data_index): + return np.zeros_like(testbed.get_data(data_index)) - thresh = 10 if use_kwargs else 20 - kwargs = {"thresh": thresh} if use_kwargs else {} + @NumpyArrayColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_set_item(self, testbed, index_type: type): + return super().test_set_item(testbed, index_type=index_type) - result = col.filter(func, batch_size=4, is_batched_fn=batched, **kwargs) - assert isinstance(result, NumpyArrayColumn) - assert len(result) == (array > thresh).sum() + @NumpyArrayColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_getitem(self, testbed, index_type: type): + return super().test_getitem(testbed, index_type=index_type) + @NumpyArrayColumnTestBed.parametrize( + config={"num_dims": [1], "dim_length": [1]}, params={"batched": [True, False]} + ) + def test_filter_1(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_filter_1(testbed, batched, materialize=True) -@NumpyArrayColumnTestBed.parametrize() -def test_pickle(config): - # important for dataloader - testbed = NumpyArrayColumnTestBed(**config) - col, _ = testbed.col, testbed.data - buf = pickle.dumps(col) - new_col = pickle.loads(buf) + @NumpyArrayColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_multiple(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_map_return_multiple(testbed, batched, materialize=True) - assert isinstance(new_col, NumpyArrayColumn) - assert (col == new_col).all() + @NumpyArrayColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_single(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_map_return_single(testbed, batched, materialize=True) + @NumpyArrayColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_single_w_kwarg( + self, testbed: AbstractColumnTestBed, batched: bool + ): + return super().test_map_return_single_w_kwarg( + testbed, batched, materialize=True + ) -@NumpyArrayColumnTestBed.parametrize() -def test_io(tmp_path, config): - # uses the tmp_path fixture which will provide a - # temporary directory unique to the test invocation, - # important for dataloader - testbed = NumpyArrayColumnTestBed(**config) - col, _ = testbed.col, testbed.data + @NumpyArrayColumnTestBed.parametrize() + def test_copy(self, testbed: AbstractColumnTestBed): + return super().test_copy(testbed) - path = os.path.join(tmp_path, "test") - col.write(path) + @NumpyArrayColumnTestBed.parametrize() + def test_io(self, tmp_path, testbed): + super().test_io(tmp_path, testbed) - new_col = NumpyArrayColumn.read(path) + @NumpyArrayColumnTestBed.parametrize() + def test_pickle(self, testbed): + super().test_pickle(testbed) - assert isinstance(new_col, NumpyArrayColumn) - assert (col == new_col).all() + @NumpyArrayColumnTestBed.parametrize() + def test_to_tensor(self, testbed): + col, _ = testbed.col, testbed.data + tensor = col.to_tensor() -@NumpyArrayColumnTestBed.parametrize() -def test_copy(config): - testbed = NumpyArrayColumnTestBed(**config) - col, _ = testbed.col, testbed.data - col_copy = col.copy() + assert torch.is_tensor(tensor) + assert (col == tensor.numpy()).all() - assert isinstance(col_copy, NumpyArrayColumn) - assert (col == col_copy).all() + def test_from_array(self): + # Build a dataset from a batch + array = np.random.rand(10, 3, 3) + col = NumpyArrayColumn.from_array(array) + assert (col == array).all() + np_test.assert_equal(len(col), 10) -@NumpyArrayColumnTestBed.parametrize() -def test_to_tensor(config): - testbed = NumpyArrayColumnTestBed(**config) - col, _ = testbed.col, testbed.data + @NumpyArrayColumnTestBed.parametrize() + def test_to_pandas(self, testbed): + series = testbed.col.to_pandas() - tensor = col.to_tensor() + assert isinstance(series, pd.Series) - assert torch.is_tensor(tensor) - assert (col == tensor.numpy()).all() + if testbed.col.shape == 1: + assert (series.values == testbed.col.data).all() + else: + for idx in range(len(testbed.col)): + assert (series.iloc[idx] == testbed.col[idx]).all() + + @NumpyArrayColumnTestBed.parametrize() + def test_repr_pandas(self, testbed): + series = testbed.col.to_pandas() + assert isinstance(series, pd.Series) + + def test_ufunc_out(self): + out = np.zeros(3) + a = NumpyArrayColumn([1, 2, 3]) + b = NumpyArrayColumn([1, 2, 3]) + result = np.add(a, b, out=out) + assert result.data is out + + def test_ufunc_at(self): + a = NumpyArrayColumn([1, 2, 3]) + result = np.add.at(a, [0, 1, 1], 1) + assert result is None + assert a.is_equal(NumpyArrayColumn([2, 4, 3])) + + def test_ufunc_unhandled(self): + a = NumpyArrayColumn([1, 2, 3]) + with pytest.raises(TypeError): + a == "a" diff --git a/tests/meerkat/columns/test_pandas_column.py b/tests/meerkat/columns/test_pandas_column.py index b729d9122..05ba41ae7 100644 --- a/tests/meerkat/columns/test_pandas_column.py +++ b/tests/meerkat/columns/test_pandas_column.py @@ -1,272 +1,194 @@ """Unittests for NumpyColumn.""" -import os -import pickle -from itertools import product + import numpy as np import pandas as pd import pytest - -from meerkat import NumpyArrayColumn, PandasSeriesColumn -from meerkat.columns.tensor_column import TensorColumn -from meerkat.datapanel import DataPanel - -from ...testbeds import MockAnyColumn, MockColumn, MockStrColumn - - -def test_str_accessor(): - testbed = MockStrColumn(col_type=PandasSeriesColumn) - col = testbed.col - - upper_col = col.str.upper() - assert isinstance(upper_col, PandasSeriesColumn) - assert ( - upper_col.values == np.array([f"ROW_{idx}" for idx in testbed.visible_rows]) - ).all() - - -def test_dt_accessor(): - testbed = MockAnyColumn( - data=[f"01/{idx+1}/2001" for idx in range(16)], - col_type=PandasSeriesColumn, - ) - col = testbed.col - col = pd.to_datetime(col) - day_col = col.dt.day - assert isinstance(day_col, PandasSeriesColumn) - assert (day_col.values == np.array(testbed.visible_rows) + 1).all() - - -def test_cat_accessor(): - categories = ["a", "b", "c", "d"] - testbed = MockAnyColumn( - data=categories * 4, - col_type=PandasSeriesColumn, - use_visible_rows=False, - ) - col = testbed.col.astype("category") - - assert (np.array(categories) == col.cat.categories.values).all() - - -@pytest.mark.parametrize( - "dtype,index_type", - product( - ["float", "int", "str"], [NumpyArrayColumn, PandasSeriesColumn, TensorColumn] - ), -) -def test_getitem(dtype, index_type): - """`map`, single return,""" - if dtype == "str": - testbed = MockStrColumn(col_type=PandasSeriesColumn) - else: - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - - col = testbed.col - - assert testbed.array[testbed.visible_rows[1]] == col[1] - - assert (testbed.array[testbed.visible_rows[2:4]] == col[2:4].values).all() - - bool_index = (np.arange(len(col)) % 2).astype(bool) - bool_index_col = index_type(bool_index) - assert (testbed.array[bool_index] == col[bool_index_col].values).all() - - -@pytest.mark.parametrize( - "dtype", - ["float", "int", "str"], -) -def test_ops(dtype): - """`map`, single return,""" - col = PandasSeriesColumn(["a", "b", "c", "d"]) - col == "a" - if dtype == "str": - testbed = MockStrColumn(col_type=PandasSeriesColumn) - else: - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - - col = testbed.col - - assert testbed.array[testbed.visible_rows[1]] == col[1] - - assert (testbed.array[testbed.visible_rows[2:4]] == col[2:4].values).all() - - -@pytest.mark.parametrize( - "dtype,batched,use_kwargs", - product(["float", "int"], [True, False], [True, False]), -) -def test_map_return_single(dtype, batched, use_kwargs): - """`map`, single return,""" - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - col, array = testbed.col, testbed.array - - def func(x, bias=1): - out = x + bias - return out - - bias = 2 if use_kwargs else 1 - kwargs = {"bias": bias} if use_kwargs else {} - - result = col.map( - func, - batch_size=2, - is_batched_fn=batched, - output_type=PandasSeriesColumn, - **kwargs, - ) - assert isinstance(result, PandasSeriesColumn) - assert len(result) == len(array[testbed.visible_rows]) - assert (result.values == array[testbed.visible_rows] + bias).all() - - -@pytest.mark.parametrize( - "dtype, batched, use_kwargs", - product(["float", "int"], [True, False], [True, False]), -) -def test_map_return_multiple(dtype, batched, use_kwargs): - """`map`, multiple return.""" - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - col, array = testbed.col, testbed.array - - def func(x, bias=1): - return {"a": x + bias, "b": x - bias} - - bias = 2 if use_kwargs else 1 - kwargs = {"bias": bias} if use_kwargs else {} - - result = col.map( - func, - batch_size=2, - is_batched_fn=batched, - output_type=PandasSeriesColumn, - **kwargs, - ) - assert isinstance(result, DataPanel) - assert len(result) == len(array[testbed.visible_rows]) - - assert isinstance(result["a"], PandasSeriesColumn) - assert (result["a"].values == array[testbed.visible_rows] + bias).all() - - assert isinstance(result["b"], PandasSeriesColumn) - assert (result["b"].values == array[testbed.visible_rows] - bias).all() - - -@pytest.mark.parametrize( - "dtype", - ["float", "int", "str"], -) -def test_set_item_1(dtype): - - if dtype == "str": - testbed = MockStrColumn(col_type=PandasSeriesColumn) - else: - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - - col = testbed.col - - index = [0, 3] - not_index = [i for i in range(col.shape[0]) if i not in index] - col[index] = 0 - assert (col[not_index] == testbed.array[testbed.visible_rows[not_index]]).all() - assert (col[index] == 0).all() - - -@pytest.mark.parametrize( - "dtype", - ["float", "int", "str"], -) -def test_set_item_2(dtype): - if dtype == "str": - testbed = MockStrColumn(col_type=PandasSeriesColumn) - else: - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - - col = testbed.col - - index = 1 - not_index = [i for i in range(col.shape[0]) if i != index] - col[index] = 0 - assert ( - col[not_index].values == testbed.array[testbed.visible_rows[not_index]] - ).all() - assert col[index] == 0 - - -@pytest.mark.parametrize( - "dtype,batched,use_kwargs", - product(["float", "int"], [True, False], [True, False]), -) -def test_filter_1(dtype, batched, use_kwargs): - """multiple_dim=False.""" - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - col, array = testbed.col, testbed.array - - def func(x, thresh=10): - return x > thresh - - thresh = 5 if use_kwargs else 10 - kwargs = {"thresh": thresh} if use_kwargs else {} - - result = col.filter(func, batch_size=4, is_batched_fn=batched, **kwargs) - assert isinstance(result, PandasSeriesColumn) - assert len(result) == (array[testbed.visible_rows] > thresh).sum() - - -@pytest.mark.parametrize( - "multiple_dim, dtype,use_visible_rows", - product([True, False], ["float", "int"], [True, False]), -) -def test_pickle(multiple_dim, dtype, use_visible_rows): - # important for dataloader - testbed = MockColumn( - dtype=dtype, use_visible_rows=use_visible_rows, col_type=PandasSeriesColumn - ) - col = testbed.col - buf = pickle.dumps(col) - new_col = pickle.loads(buf) - - assert isinstance(new_col, PandasSeriesColumn) - assert (col.values == new_col.values).all() - - -@pytest.mark.parametrize( - "dtype", - ["float", "int", "str"], -) -def test_io( - tmp_path, - dtype, -): - # uses the tmp_path fixture which will provide a - # temporary directory unique to the test invocation, - # important for dataloader - if dtype == "str": - testbed = MockStrColumn(col_type=PandasSeriesColumn) - else: - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - col = testbed.col - path = os.path.join(tmp_path, "test") - col.write(path) - - new_col = PandasSeriesColumn.read(path) - - assert isinstance(new_col, PandasSeriesColumn) - assert (col == new_col).all() - - -@pytest.mark.parametrize( - "dtype", - ["float", "int", "str"], -) -def test_copy(dtype): - if dtype == "str": - testbed = MockStrColumn(col_type=PandasSeriesColumn) - else: - testbed = MockColumn(dtype=dtype, col_type=PandasSeriesColumn) - col = testbed.col - col_copy = col.copy() - - assert isinstance(col_copy, PandasSeriesColumn) - assert (col == col_copy).all() +import torch + +from meerkat import PandasSeriesColumn +from meerkat.block.tensor_block import TensorBlock + +from .abstract import AbstractColumnTestBed, TestAbstractColumn + + +class PandasSeriesColumnTestBed(AbstractColumnTestBed): + + DEFAULT_CONFIG = { + "contiguous_index": [True, False], + "dtype": ["float", "int", "str"], + } + + def __init__( + self, + length: int = 16, + dtype="float", + contiguous_index: bool = True, + seed: int = 123, + tmpdir: str = None, + ): + self.dtype = dtype + np.random.seed(seed) + array = np.random.random(length) * 10 + series = pd.Series(array).astype(dtype) + if not contiguous_index: + series.index = np.arange(1, 1 + 2 * length, 2) + + self.col = PandasSeriesColumn(series) + self.data = series + + def get_map_spec( + self, + batched: bool = True, + materialize: bool = False, + salt: int = 1, + kwarg: int = 0, + ): + salt = salt if self.dtype != "str" else str(salt) + kwarg = kwarg if self.dtype != "str" else str(kwarg) + return { + "fn": lambda x, k=0: x + salt + (k if self.dtype != "str" else str(k)), + "expected_result": PandasSeriesColumn(self.col.data + salt + kwarg), + "output_type": PandasSeriesColumn, + } + + def get_filter_spec( + self, + batched: bool = True, + materialize: bool = False, + salt: int = 1, + kwarg: int = 0, + ): + salt = 3 + salt if self.dtype != "str" else str(3 + salt) + kwarg = kwarg if self.dtype != "str" else str(kwarg) + return { + "fn": lambda x, k=0: x > salt + (k if self.dtype != "str" else str(k)), + "expected_result": self.col[self.col.data > salt + kwarg], + } + + def get_data(self, index, materialize: bool = True): + return self.data.iloc[index] + + @staticmethod + def assert_data_equal(data1: pd.Series, data2: np.ndarray): + if isinstance(data1, pd.Series): + assert (data1.values == data2.values).all() + else: + assert data1 == data2 + + +@pytest.fixture +def testbed(request, tmpdir): + testbed_class, config = request.param + return testbed_class(**config, tmpdir=tmpdir) + + +class TestPandasSeriesColumn(TestAbstractColumn): + __test__ = True + testbed_class: type = PandasSeriesColumnTestBed + column_class: type = PandasSeriesColumn + + @PandasSeriesColumnTestBed.parametrize({"dtype": ["str"]}) + def test_str_accessor(self, testbed): + col = testbed.col + + new_col = col.str.split(".").str[0].astype(int) + assert isinstance(new_col, PandasSeriesColumn) + assert (new_col == testbed.data.astype(float).astype(int)).all() + + def test_dt_accessor(self): + col = PandasSeriesColumn( + data=[f"01/{idx+1}/2001" for idx in range(16)], + ) + col = pd.to_datetime(col) + day_col = col.dt.day + assert isinstance(day_col, PandasSeriesColumn) + assert (day_col.values == np.arange(16) + 1).all() + + def test_cat_accessor(self): + categories = ["a", "b", "c", "d"] + col = PandasSeriesColumn(data=categories * 4) + col = col.astype("category") + + assert (np.array(categories) == col.cat.categories.values).all() + + def test_init_block(self): + block_view = TensorBlock(torch.zeros(10, 10))[0] + with pytest.raises(ValueError): + PandasSeriesColumn(block_view) + + def _get_data_to_set(self, testbed, data_index): + if isinstance(data_index, int): + return 0 + return pd.Series(np.zeros_like(testbed.get_data(data_index).values)) + + @PandasSeriesColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_set_item(self, testbed, index_type: type): + return super().test_set_item(testbed, index_type=index_type) + + @PandasSeriesColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_getitem(self, testbed, index_type: type): + return super().test_getitem(testbed, index_type=index_type) + + @PandasSeriesColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_filter_1(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_filter_1(testbed, batched, materialize=True) + + @PandasSeriesColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_multiple(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_map_return_multiple(testbed, batched, materialize=True) + + @PandasSeriesColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_single(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_map_return_single(testbed, batched, materialize=True) + + @PandasSeriesColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_single_w_kwarg( + self, testbed: AbstractColumnTestBed, batched: bool + ): + return super().test_map_return_single_w_kwarg( + testbed, batched, materialize=True + ) + + @PandasSeriesColumnTestBed.parametrize() + def test_copy(self, testbed: AbstractColumnTestBed): + return super().test_copy(testbed) + + @PandasSeriesColumnTestBed.parametrize() + def test_io(self, tmp_path, testbed): + super().test_io(tmp_path, testbed) + + @PandasSeriesColumnTestBed.parametrize() + def test_pickle(self, testbed): + super().test_pickle(testbed) + + @PandasSeriesColumnTestBed.parametrize() + def test_to_tensor(self, testbed): + col, _ = testbed.col, testbed.data + if testbed.dtype == "str": + with pytest.raises(ValueError): + col.to_tensor() + else: + tensor = col.to_tensor() + + assert torch.is_tensor(tensor) + assert (col == tensor.numpy()).all() + + @PandasSeriesColumnTestBed.parametrize() + def test_to_pandas(self, testbed): + col, _ = testbed.col, testbed.data + series = col.to_pandas() + assert isinstance(series, pd.Series) + assert (col.data.values == series.values).all() + + @PandasSeriesColumnTestBed.parametrize() + def test_repr_pandas(self, testbed): + series = testbed.col.to_pandas() + assert isinstance(series, pd.Series) + + def test_ufunc_out(self): + out = np.zeros(3) + a = PandasSeriesColumn([1, 2, 3]) + b = PandasSeriesColumn([1, 2, 3]) + np.add(a, b, out=out) + assert (out == np.array([2, 4, 6])).all() diff --git a/tests/meerkat/columns/test_tensor_column.py b/tests/meerkat/columns/test_tensor_column.py index 7e836069a..ab3bcddc4 100644 --- a/tests/meerkat/columns/test_tensor_column.py +++ b/tests/meerkat/columns/test_tensor_column.py @@ -1,232 +1,164 @@ -"""Unittests for NumpyColumn.""" -import os -import pickle -from itertools import product - import numpy as np -import numpy.testing as np_test +import pandas as pd import pytest import torch -from meerkat.columns.tensor_column import TensorColumn -from meerkat.datapanel import DataPanel - - -def _get_data(multiple_dim: bool = True, dtype="float", use_visible_rows=False): - if multiple_dim: - array = np.array( - [ - [ - [0.5565041, 1.51486395, 0], - [123, 0.60526485, 0.7246723], - ], - [ - [0.3156991, 0.82733837, 45], - [0.71086498, 0, 0], - ], - [ - [0, 0.17152445, 0.06989294], - [0.59578079, 0.03738921, 0], - ], - [ - [0.49596023, 0, 0.56062833], - [0.31457122, 0.19126629, 16], - ], - ] - * 4 # shape (16, 2, 3) +from meerkat import TensorColumn +from meerkat.block.numpy_block import NumpyBlock + +from .abstract import AbstractColumnTestBed, TestAbstractColumn + + +class TensorColumnTestBed(AbstractColumnTestBed): + + DEFAULT_CONFIG = { + "num_dims": [1, 2, 3], + "dim_length": [1, 5], + "dtype": ["float", "int"], + } + + def __init__( + self, + length: int = 16, + num_dims: int = True, + dim_length: int = 5, + dtype="float", + seed: int = 123, + tmpdir: str = None, + ): + self.dtype = dtype + np.random.seed(seed) + array = ( + np.random.random((length, *[dim_length for _ in range(num_dims - 1)])) * 10 ) - else: - array = np.array([0.3969655, 23.26084479, 0, 123] * 4) - array = array.astype(dtype) - col = TensorColumn(array) + array = torch.tensor(array).to({"int": torch.int, "float": torch.float}[dtype]) + + self.col = TensorColumn(array) + self.data = array + + def get_map_spec( + self, + batched: bool = True, + materialize: bool = False, + salt: int = 1, + kwarg: int = 0, + ): + return { + "fn": lambda x, k=0: x + salt + k, + "expected_result": TensorColumn(self.col.data + salt + kwarg), + } - if use_visible_rows: - visible_rows = [0, 4, 6, 10] - col.visible_rows = visible_rows - array = array[visible_rows] + def get_filter_spec( + self, + batched: bool = True, + materialize: bool = False, + salt: int = 1, + kwarg: int = 0, + ): + return { + "fn": lambda x, k=0: ( + (x > 3 + salt + k).to(dtype=bool) if batched else (x > 3 + salt + k) + ), + "expected_result": self.col[self.col.data > 3 + salt + kwarg], + } - return col, array + def get_data(self, index, materialize=True): + return self.data[index] + @staticmethod + def assert_data_equal(data1: np.ndarray, data2: np.ndarray): + assert (data1 == data2).all() -def test_from_array(): - # Build a dataset from a batch - array = np.random.rand(10, 3, 3) - col = TensorColumn(array) - assert (col == array).all() - np_test.assert_equal(len(col), 10) +@pytest.fixture +def testbed(request, tmpdir): + testbed_class, config = request.param + return testbed_class(**config, tmpdir=tmpdir) -@pytest.mark.parametrize( - "dtype,batched,use_kwargs", - product(["float", "int"], [True, False], [True, False]), -) -def test_map_return_single(dtype, batched, use_kwargs): - """`map`, single return,""" - col, array = _get_data( - dtype=dtype, - ) +class TestTensorColumn(TestAbstractColumn): - def func(x, bias=0): - out = x.type(torch.FloatTensor).mean(axis=-1) + bias - return out + __test__ = True + testbed_class: type = TensorColumnTestBed + column_class: type = TensorColumn - bias = 1 if use_kwargs else 0 - kwargs = {"bias": bias} if use_kwargs else {} + def test_init_block(self): + block_view = NumpyBlock(np.zeros((10, 10)))[0] + with pytest.raises(ValueError): + TensorColumn(block_view) - result = col.map( - func, batch_size=4, is_batched_fn=batched, output_type=TensorColumn, **kwargs - ) - assert isinstance(result, TensorColumn) - np_test.assert_equal(len(result), len(array)) - assert np.allclose(result.numpy(), array.mean(axis=-1) + bias) - - -@pytest.mark.parametrize( - "dtype, batched, use_kwargs", - product(["float", "int"], [True, False], [True, False]), -) -def test_map_return_multiple(dtype, batched, use_kwargs): - """`map`, multiple return.""" - col, array = _get_data( - dtype=dtype, - ) + def _get_data_to_set(self, testbed, data_index): + return torch.zeros_like(testbed.get_data(data_index)) - def func(x, bias=0): - return { - "mean": x.type(torch.FloatTensor).mean(axis=-1) + bias, - "std": x.type(torch.FloatTensor).std(axis=-1) + bias, - } + @TensorColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_set_item(self, testbed, index_type: type): + return super().test_set_item(testbed, index_type=index_type) - bias = 1 if use_kwargs else 0 - kwargs = {"bias": bias} if use_kwargs else {} - - result = col.map(func, batch_size=4, is_batched_fn=batched, **kwargs) - assert isinstance(result, DataPanel) - assert isinstance(result["std"], TensorColumn) - assert isinstance(result["mean"], TensorColumn) - np_test.assert_equal(len(result), len(array)) - assert np.allclose(result["mean"].numpy(), array.mean(axis=-1) + bias) - assert np.allclose(result["std"].numpy(), array.std(axis=-1, ddof=1) + bias) - - -@pytest.mark.parametrize( - "multiple_dim,dtype", - product([True, False], ["float", "int"]), -) -def test_set_item_1(multiple_dim, dtype): - col, array = _get_data( - multiple_dim=multiple_dim, - dtype=dtype, - ) - index = [0, 3] - not_index = [i for i in range(col.shape[0]) if i not in index] - col[index] = 0 - - assert (col[not_index] == array[not_index]).all() - assert (col[index] == 0).all() - - -@pytest.mark.parametrize( - "multiple_dim,dtype", - product([True, False], ["float", "int"]), -) -def test_set_item_2( - multiple_dim, - dtype, -): - col, array = _get_data( - multiple_dim=multiple_dim, - dtype=dtype, - ) - index = 0 - not_index = [i for i in range(col.shape[0]) if i != index] - col[index] = 0 - - assert (col[not_index] == array[not_index]).all() - assert (col[index] == 0).all() - - -@pytest.mark.parametrize( - "multiple_dim, dtype", - product([True, False], ["float", "int"]), -) -def test_pickle( - multiple_dim, - dtype, -): - # important for dataloader - col, _ = _get_data( - multiple_dim=multiple_dim, - dtype=dtype, - ) - buf = pickle.dumps(col) - new_col = pickle.loads(buf) - - assert isinstance(new_col, TensorColumn) - assert (col == new_col).all() - - -@pytest.mark.parametrize( - "multiple_dim, dtype", - product([True, False], ["float", "int"]), -) -def test_io( - tmp_path, - multiple_dim, - dtype, -): - # uses the tmp_path fixture which will provide a - # temporary directory unique to the test invocation, - # important for dataloader - col, _ = _get_data( - multiple_dim=multiple_dim, - dtype=dtype, - ) - path = os.path.join(tmp_path, "test") - col.write(path) - - new_col = TensorColumn.read(path) - - assert isinstance(new_col, TensorColumn) - assert (col == new_col).all() - - -@pytest.mark.parametrize( - "multiple_dim,dtype", - product([True, False], ["float", "int"]), -) -def test_copy( - multiple_dim, - dtype, -): - col, _ = _get_data( - multiple_dim=multiple_dim, - dtype=dtype, + @TensorColumnTestBed.parametrize(params={"index_type": [np.array]}) + def test_getitem(self, testbed, index_type: type): + return super().test_getitem(testbed, index_type=index_type) + + @TensorColumnTestBed.parametrize( + config={"num_dims": [1], "dim_length": [1]}, params={"batched": [True, False]} ) - col_copy = col.copy() + def test_filter_1(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_filter_1(testbed, batched, materialize=True) + + @TensorColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_multiple(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_map_return_multiple(testbed, batched, materialize=True) + + @TensorColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_single(self, testbed: AbstractColumnTestBed, batched: bool): + return super().test_map_return_single(testbed, batched, materialize=True) + + @TensorColumnTestBed.parametrize(params={"batched": [True, False]}) + def test_map_return_single_w_kwarg( + self, testbed: AbstractColumnTestBed, batched: bool + ): + return super().test_map_return_single_w_kwarg( + testbed, batched, materialize=True + ) + + @TensorColumnTestBed.parametrize() + def test_copy(self, testbed: AbstractColumnTestBed): + return super().test_copy(testbed) + + @TensorColumnTestBed.parametrize() + def test_io(self, tmp_path, testbed): + super().test_io(tmp_path, testbed) + + @TensorColumnTestBed.parametrize() + def test_pickle(self, testbed): + super().test_pickle(testbed) - assert isinstance(col_copy, TensorColumn) - assert (col == col_copy).all() + @TensorColumnTestBed.parametrize() + def test_to_tensor(self, testbed): + col, _ = testbed.col, testbed.data + tensor = col.to_tensor() -def test_tensor_ops(): - """Test prototype tensor operations on tensor columns.""" - col = TensorColumn(torch.ones(4, 3)) + assert torch.is_tensor(tensor) + assert (col == tensor.numpy()).all() - assert torch.all(torch.sum(col, dim=1) == 3) - assert torch.all(col.sum() == torch.sum(col)) - assert torch.all(col.sum(dim=1) == torch.sum(col, dim=1)) + @TensorColumnTestBed.parametrize() + def test_to_pandas(self, testbed): + series = testbed.col.to_pandas() - assert torch.cat([col, col]).shape == (8, 3) - assert torch.vstack([col, col]).shape == (8, 3) - assert torch.cat([col, col, col], dim=1).shape == (4, 9) + assert isinstance(series, pd.Series) - assert torch.stack([col, col], dim=0).shape == (2, 4, 3) + if testbed.col.shape == 1: + assert (series.values == testbed.col.data).all() + else: + for idx in range(len(testbed.col)): + assert (series.iloc[idx] == testbed.col[idx]).all() - chunk1, chunk2 = torch.chunk(col, chunks=2, dim=0) - assert chunk1.shape == (2, 3) - assert chunk2.shape == (2, 3) + @TensorColumnTestBed.parametrize() + def test_repr_pandas(self, testbed): + series = testbed.col.to_pandas() + assert isinstance(series, pd.Series) - col_nd = TensorColumn(torch.ones(4, 3, 5, 6)) - assert col_nd.permute(3, 2, 1, 0).shape == col_nd.shape[::-1] + def test_ufunc_unhandled(self): + a = TensorColumn([1, 2, 3]) + with pytest.raises(TypeError): + a == "a" diff --git a/tests/meerkat/test_datapanel.py b/tests/meerkat/test_datapanel.py index 7f08cc707..d47063db6 100644 --- a/tests/meerkat/test_datapanel.py +++ b/tests/meerkat/test_datapanel.py @@ -1,913 +1,736 @@ """Unittests for Datasets.""" import os import tempfile +from functools import wraps from itertools import product +from typing import Dict, Sequence import numpy as np import pandas as pd import pytest import torch import ujson as json -from PIL.Image import Image from meerkat import NumpyArrayColumn from meerkat.columns.abstract import AbstractColumn -from meerkat.columns.image_column import ImageColumn -from meerkat.columns.lambda_column import LambdaCell from meerkat.columns.list_column import ListColumn from meerkat.columns.pandas_column import PandasSeriesColumn from meerkat.columns.tensor_column import TensorColumn from meerkat.datapanel import DataPanel -from ..testbeds import MockDatapanel +from .columns.test_image_column import ImageColumnTestBed +from .columns.test_numpy_column import NumpyArrayColumnTestBed +from .columns.test_pandas_column import PandasSeriesColumnTestBed -def _get_datapanel(*args, **kwargs): - test_bed = MockDatapanel(length=16, *args, **kwargs) - return test_bed.dp, test_bed.visible_rows, test_bed.visible_columns +class DataPanelTestBed: - -def test_from_batch(): - # Build a dataset from a batch - datapanel = DataPanel.from_batch( - { - "a": [1, 2, 3], - "b": [True, False, True], - "c": ["x", "y", "z"], - "d": [{"e": 2}, {"e": 3}, {"e": 4}], - "e": torch.ones(3), - "f": np.ones(3), - }, - ) - assert set(datapanel.column_names) == {"a", "b", "c", "d", "e", "f", "index"} - assert len(datapanel) == 3 - - -def test_from_jsonl(): - # Build jsonl file - temp_f = tempfile.NamedTemporaryFile() - data = { - "a": [3.4, 2.3, 1.2], - "b": [[7, 9], [4], [1, 2]], - "c": ["the walk", "the talk", "blah"], + DEFAULT_CONFIG = { + "consolidated": [True, False], } - with open(temp_f.name, "w") as out_f: - for idx in range(3): - to_write = {k: data[k][idx] for k in list(data.keys())} - out_f.write(json.dumps(to_write) + "\n") - - dp_new = DataPanel.from_jsonl(temp_f.name) - assert dp_new.column_names == ["a", "b", "c", "index"] - # Skip index column - for k in data: - if isinstance(dp_new[k], NumpyArrayColumn): - data_to_compare = dp_new[k]._data.tolist() - else: - data_to_compare = dp_new[k]._data - assert data_to_compare == data[k] - temp_f.close() - -def test_from_csv(): - temp_f = tempfile.NamedTemporaryFile() - data = { - "a": [3.4, 2.3, 1.2], - "b": ["alpha", "beta", "gamma"], - "c": ["the walk", "the talk", "blah"], + DEFAULT_COLUMN_CONFIGS = { + "np": {"testbed_class": NumpyArrayColumnTestBed, "n": 2}, + "pd": {"testbed_class": PandasSeriesColumnTestBed, "n": 2}, + "img": {"testbed_class": ImageColumnTestBed, "n": 2}, } - pd.DataFrame(data).to_csv(temp_f.name) - - dp_new = DataPanel.from_csv(temp_f.name) - assert dp_new.column_names == ["Unnamed: 0", "a", "b", "c", "index"] - # Skip index column - for k in data: - if isinstance(dp_new[k], PandasSeriesColumn): - data_to_compare = dp_new[k]._data.tolist() - else: - data_to_compare = dp_new[k]._data - assert data_to_compare == data[k] - - -def test_col_index_single(tmpdir): - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - - # str index => single column () - index = "a" - col = dp[index] - assert isinstance(col, AbstractColumn) - # enforce that a single column index returns a coreference - assert col is dp._data["a"] - - -def test_col_index_multiple(tmpdir): - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - - # str index => single column () - index = ["a", "b"] - new_dp = dp[index] - assert isinstance(new_dp, DataPanel) - # enforce that a column index multiple returns a view of the old datapanel - for col_name in index: - assert new_dp._data[col_name] is not dp._data[col_name] - assert new_dp._data[col_name].data is dp._data[col_name].data - - -def test_row_index_single(tmpdir): - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - - # int index => single row (dict) - index = 2 - row = dp[index] - assert isinstance(row["img"], Image) - assert (np.array(row["img"]) == test_bed.img_col.image_arrays[index]).all() - assert row["a"] == index - assert row["b"] == index - - -@pytest.mark.parametrize( - "index_type,consolidate", - product([NumpyArrayColumn, PandasSeriesColumn, TensorColumn], [True, False]), -) -def test_row_index_multiple(tmpdir, index_type, consolidate): - length = 16 - rows = np.arange(length) - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - if consolidate: - dp.consolidate() - # slice index => multiple row selection (DataPanel) - # tuple or list index => multiple row selection (DataPanel) - # np.array indeex => multiple row selection (DataPanel) - for rows, indices in ( - (dp[1:3], rows[1:3]), - (dp[[0, 2]], rows[[0, 2]]), - (dp[index_type(np.array((0,)))], rows[np.array((0,))]), - (dp[index_type(np.array((1, 1)))], rows[np.array((1, 1))]), - ( - dp[index_type(np.array((True, False) * (length // 2)))], - rows[np.array((True, False) * (length // 2))], - ), - # ( - # dp[index_type(dp["a"].data % 2 == 0)], - # rows[rows % 2 == 0], - # ), + def __init__( + self, + column_configs: Dict[str, AbstractColumn], + consolidated: bool = True, + length: int = 16, + tmpdir: str = None, ): - assert isinstance(rows["img"], ListColumn) - assert (rows["a"].data == indices).all() - assert (rows["b"].data == indices).all() - assert (rows["d"].data == torch.tensor(indices)).all() - assert (rows["e"].data.values == indices).all() - assert (rows["f"].data == 1).all() - assert len(rows["f"].shape) == 2 - assert (rows["g"].data == 1).all() - assert len(rows["g"].shape) == 2 - - -def test_row_lz_index_single(tmpdir): - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - - # int index => single row (dict) - index = 2 - row = dp.lz[index] - assert isinstance(row["img"], LambdaCell) - assert str(row["img"].data) == test_bed.img_col.image_paths[index] - assert row["a"] == index - assert row["b"] == index - - -@pytest.mark.parametrize( - "index_type,consolidate", - product([NumpyArrayColumn, PandasSeriesColumn, TensorColumn], [True, False]), -) -def test_row_lz_index_multiple(tmpdir, index_type, consolidate): - length = 16 - rows = np.arange(length) - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - if consolidate: - dp.consolidate() - # slice index => multiple row selection (DataPanel) - # tuple or list index => multiple row selection (DataPanel) - # np.array indeex => multiple row selection (DataPanel) - for rows, indices in ( - (dp.lz[1:3], rows[1:3]), - (dp.lz[[0, 2]], rows[[0, 2]]), - (dp.lz[index_type(np.array((0,)))], rows[np.array((0,))]), - (dp.lz[index_type(np.array((1, 1)))], rows[np.array((1, 1))]), - ( - dp.lz[index_type(np.array((True, False) * (length // 2)))], - rows[np.array((True, False) * (length // 2))], - ), - ( - dp.lz[index_type(dp["a"].data % 2 == 0)], - rows[rows % 2 == 0], - ), - ): - assert isinstance(rows["img"], ImageColumn) - assert list(map(lambda x: x, rows["img"].data)) == [ - test_bed.img_col.image_paths[i] for i in indices - ] - assert (rows["a"].data == indices).all() - assert (rows["b"].data == indices).all() - - -def test_col_indexing_view_copy_semantics(tmpdir): - testbed = MockDatapanel(length=16, include_image_column=True, tmpdir=tmpdir) - dp = testbed.dp - - # Columns (1): Indexing a single column (i.e. with a str) returns the underlying - # AbstractColumn object directly. In the example below col1 and col2 are - # coreferences of the same column. - for name in dp.columns: - dp[name] is dp[name] - - # Columns (2): Indexing multiple columns (i.e. with Sequence[str]) returns a view of - # the DataPanel holding views to the columns in the original DataPanel. This means - # the AbstractColumn objects held in the new DataPanel are the same AbstractColumn - # objects held in the original DataPanel. - view_dp = dp[["a", "b"]] - for name in view_dp.columns: - dp[name] is not view_dp[name] - dp[name].data is dp[name].data - - -def test_row_indexing_view_copy_semantics(tmpdir): - testbed = MockDatapanel(length=16, include_image_column=True, tmpdir=tmpdir) - dp = testbed.dp - - # slice index - dp2 = dp[:8] - col = "a" - assert isinstance(dp2[col], NumpyArrayColumn) - assert dp[col] is not dp2[col] - assert dp[col].data is not dp2[col].data - assert dp[col].data is dp2[col].data.base - - col = "d" - assert isinstance(dp2[col], TensorColumn) - assert dp[col] is not dp2[col] - assert dp[col].data is not dp2[col].data - # note `data_ptr` checks whether the tensors have the same memory address of the - # first element, so this would not work if the slice didn't start at 0 - assert dp[col].data.data_ptr() == dp2[col].data.data_ptr() - - col = "e" - assert isinstance(dp2[col], PandasSeriesColumn) - assert dp[col] is not dp2[col] - assert dp[col].data is not dp2[col].data - # TODO (sabri): Figure out pandas copying behavior, it's not clear how it works and - # this deserves a deeper investigation. - # assert dp[col].data.values.base is dp2[col].data.values.base - - # slice index - dp2 = dp[np.array([0, 1, 2, 5])] - col = "a" - assert isinstance(dp2[col], NumpyArrayColumn) - assert dp[col] is not dp2[col] - assert dp[col].data is not dp2[col].data - assert dp[col].data is not dp2[col].data.base - - col = "d" - assert isinstance(dp2[col], TensorColumn) - assert dp[col] is not dp2[col] - assert dp[col].data is not dp2[col].data - # note `data_ptr` checks whether the tensors have the same memory address of the - # first element, so this would not work if the slice didn't start at 0 - assert dp[col].data.data_ptr() != dp2[col].data.data_ptr() - - col = "e" - assert isinstance(dp2[col], PandasSeriesColumn) - assert dp[col] is not dp2[col] - assert dp[col].data is not dp2[col].data - assert dp[col].data.values is not dp2[col].data.values.base - - -@pytest.mark.parametrize( - "use_visible_columns, use_input_columns, num_workers", - product([True, False], [True, False], [0, 2]), -) -def test_map_1(use_visible_columns, use_input_columns, num_workers): - """`map`, mixed datapanel, single return, `is_batched_fn=True`""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns - ) - input_columns = ["a", "b"] if use_input_columns else None - - def func(x): - if use_input_columns: - assert x.visible_columns == ["a", "b"] - out = (x["a"] + np.array(x["b"])) * 2 - return out - - rows = np.arange(16) - result = dp.map( - func, - batch_size=4, - is_batched_fn=True, - num_workers=num_workers, - input_columns=input_columns, - ) - assert isinstance(result, NumpyArrayColumn) - assert len(result) == len(rows) - assert (result == np.array(rows) * 4).all() - - -@pytest.mark.parametrize("num_workers", [0, 2]) -def test_map_2(num_workers): - """`map`, mixed datapanel, return multiple, `is_batched_fn=True`""" - dp, visible_rows, visible_columns = _get_datapanel(use_visible_columns=False) - - def func(x): - out = { - "x": (x["a"] + np.array(x["b"])) * 2, - "y": np.array([x["c"][i]["a"] for i in range(len(x["c"]))]), - } - return out - - if visible_rows is None: - visible_rows = np.arange(16) - result = dp.map( - func, - batch_size=4, - is_batched_fn=True, - num_workers=num_workers, - ) - assert isinstance(result, DataPanel) - assert len(result["x"]) == len(visible_rows) - assert len(result["y"]) == len(visible_rows) - assert (result["x"] == np.array(visible_rows) * 4).all() - assert (result["y"] == np.ones(len(visible_rows)) * 2).all() - - -@pytest.mark.parametrize("use_output_type", ([True, False])) -def test_map_3(use_output_type): - """`map`, mixed datapanel, return multiple, `is_batched_fn=True`, - func has additional arguments, specify output types""" - dp, visible_rows, visible_columns = _get_datapanel(use_visible_columns=False) - - # func has additional arguments - def func(x, factor): - out = { - "x": (x["a"] + np.array(x["b"])) * factor, - "y": np.array([x["c"][i]["a"] for i in range(len(x["c"]))]), + self.column_testbeds = {} + + def _get_tmpdir(name): + path = os.path.join(tmpdir, name) + os.makedirs(path) + return path + + for name, config in column_configs.items(): + params = config["testbed_class"].get_params(**config.get("kwargs", {})) + self.column_testbeds.update( + { + f"{name}_{col_id}_{idx}": config["testbed_class"]( + **col_config[1], + seed=idx, + length=length, + tmpdir=_get_tmpdir(f"{name}_{col_id}_{idx}"), + ) + for idx in range(config["n"]) + for col_config, col_id in zip(params["argvalues"], params["ids"]) + } + ) + + self.columns = { + name: testbed.col for name, testbed in self.column_testbeds.items() } - return out - - if visible_rows is None: - visible_rows = np.arange(16) - - # Specify output_types for returned columns - output_type = ( - {"x": ListColumn, "y": PandasSeriesColumn} if use_output_type else None - ) + self.dp = DataPanel.from_batch(self.columns) - result = dp.map( - func, - batch_size=4, - is_batched_fn=True, - num_workers=0, - output_type=output_type, - factor=2, - ) - assert isinstance(result, DataPanel) - assert len(result["x"]) == len(visible_rows) - assert len(result["y"]) == len(visible_rows) - assert (result["x"] == np.array(visible_rows) * 4).all() - assert (result["y"] == np.ones(len(visible_rows)) * 2).all() - - -@pytest.mark.parametrize( - "use_visible_columns,use_input_columns,batched", - product([True, False], [True, False], [True, False]), -) -def test_update_1(use_visible_columns, use_input_columns, batched): - """`update`, mixed datapanel, return single, new columns.""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns, - ) - input_columns = ["a", "b"] if use_input_columns else None - - # mixed datapanel (i.e. has multiple colummn types) - def func(x): - if use_input_columns: - if batched: - assert set(x.all_columns) == set(input_columns) - else: - assert set(x.keys()) == set(input_columns) - out = {"x": (x["a"] + np.array(x["b"])) * 2} - return out - - if visible_rows is None: - visible_rows = np.arange(16) - - result = dp.update( - func, - batch_size=4, - is_batched_fn=batched, - num_workers=0, - input_columns=input_columns, - ) - assert isinstance(result, DataPanel) - assert set(result.visible_columns) == set(visible_columns + ["x"]) - assert len(result["x"]) == len(visible_rows) - assert (result["x"] == np.array(visible_rows) * 4).all() - - -@pytest.mark.parametrize( - "use_visible_columns,use_input_columns,batched", - product([True, False], [True, False], [True, False]), -) -def test_update_2(use_visible_columns, use_input_columns, batched): - """`update`, mixed datapanel, return multiple, new columns, - `is_batched_fn=True`""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns - ) + if consolidated: + self.dp.consolidate() - def func(x): - out = { - "x": (x["a"] + np.array(x["b"])) * 2, - "y": (x["a"] * 6), + @classmethod + def get_params( + cls, + config: dict = None, + column_configs: Sequence[Dict] = None, + params: dict = None, + ): + # produce all combinations of the config + updated_config = cls.DEFAULT_CONFIG.copy() + if config is not None: + updated_config.update(config) + configs = list( + map( + dict, + product(*[[(k, v) for v in vs] for k, vs in updated_config.items()]), + ) + ) + + # add the column_configs to every + if column_configs is None: + column_configs = cls.DEFAULT_COLUMN_CONFIGS.copy() + for config in configs: + config["column_configs"] = column_configs + + if params is None: + return { + "argnames": "testbed", + "argvalues": configs, + "ids": [str(config) for config in configs], + } + else: + argvalues = list(product(configs, *params.values())) + return { + "argnames": "testbed," + ",".join(params.keys()), + "argvalues": argvalues, + "ids": [",".join(map(str, values)) for values in argvalues], + } + + @classmethod + @wraps(pytest.mark.parametrize) + def parametrize(cls, config: dict = None, params: dict = None): + return pytest.mark.parametrize( + **cls.get_params(config=config, params=params), indirect=["testbed"] + ) + + +@pytest.fixture +def testbed(request, tmpdir): + config = request.param + return DataPanelTestBed(**config, tmpdir=tmpdir) + + +class TestDataPanel: + + testbed_class: type = DataPanelTestBed + dp_class: type = DataPanel + + @DataPanelTestBed.parametrize() + def test_col_index_single(self, testbed): + dp = testbed.dp + + # str index => single column () + for name in testbed.columns: + index = name + col = dp[index] + assert isinstance(col, AbstractColumn) + # enforce that a single column index returns a coreference + assert col is dp._data[index] + + @DataPanelTestBed.parametrize() + def test_col_index_multiple(self, testbed): + dp = testbed.dp + + # str index => single column () + columns = list(testbed.columns) + for excluded_column in columns: + index = [c for c in columns if c != excluded_column] + new_dp = dp[index] + assert isinstance(new_dp, DataPanel) + + # enforce that a column index multiple returns a view of the old datapanel + for col_name in index: + assert new_dp._data[col_name] is not dp._data[col_name] + assert new_dp._data[col_name].data is dp._data[col_name].data + + @DataPanelTestBed.parametrize() + def test_row_index_single(self, testbed): + dp = testbed.dp + + # int index => single row (dict) + index = 2 + row = dp[index] + assert isinstance(row, dict) + + for key, value in row.items(): + if key == "index": + # TODO(Sabri): remove this when we change the index functionality + continue + col_testbed = testbed.column_testbeds[key] + col_testbed.assert_data_equal(value, col_testbed.get_data(index)) + + @DataPanelTestBed.parametrize( + params={ + "index_type": [ + np.array, + # pd.Series, + # torch.Tensor, + NumpyArrayColumn, + PandasSeriesColumn, + TensorColumn, + ] } - return out - - if visible_rows is None: - visible_rows = np.arange(16) - - input_columns = ["a", "b"] if use_input_columns else None - result = dp.update( - func, - batch_size=4, - is_batched_fn=batched, - num_workers=0, - input_columns=input_columns, ) - assert isinstance(result, DataPanel) - assert set(result.visible_columns) == set(visible_columns + ["x", "y"]) - assert len(result["x"]) == len(visible_rows) - assert len(result["y"]) == len(visible_rows) - assert (result["x"] == np.array(visible_rows) * 4).all() - assert (result["y"] == np.array(visible_rows) * 6).all() - - -@pytest.mark.parametrize( - "use_visible_columns,use_input_columns,batched", - product([True, False], [True, False], [True, False]), -) -def test_update_3(use_visible_columns, use_input_columns, batched): - """`update`, mixed datapanel, return multiple, replace existing column, - `is_batched_fn=True`""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns - ) - - def func(x): - out = { - "a": (x["a"] + np.array(x["b"])) * 2, - "y": (x["a"] * 6), + def test_row_index_multiple(self, testbed, index_type): + dp = testbed.dp + rows = np.arange(len(dp)) + + # slice index => multiple row selection (DataPanel) + # tuple or list index => multiple row selection (DataPanel) + # np.array indeex => multiple row selection (DataPanel) + for rows, indices in ( + (dp[1:3], rows[1:3]), + (dp[[0, 2]], rows[[0, 2]]), + (dp[index_type(np.array((0,)))], rows[np.array((0,))]), + (dp[index_type(np.array((1, 1)))], rows[np.array((1, 1))]), + ( + dp[index_type(np.array((True, False) * (len(dp) // 2)))], + rows[np.array((True, False) * (len(dp) // 2))], + ), + ): + assert isinstance(rows, DataPanel) + for key, value in rows.items(): + if key == "index": + # TODO(Sabri): remove this when we change the index functionality + continue + col_testbed = testbed.column_testbeds[key] + data = col_testbed.get_data(indices) + col_testbed.assert_data_equal(value.data, data) + + if value.__class__ == dp[key].__class__: + # if the getitem returns a column of the same type, enforce that all + # attributes were cloned over appropriately. We don't want to check + # for columns that return columns of different type from getitem + # (e.g. LambdaColumn) + assert dp[key]._clone(data=data).is_equal(value) + + @DataPanelTestBed.parametrize() + def test_row_lz_index_single(self, testbed): + dp = testbed.dp + + # int index => single row (dict) + index = 2 + row = dp.lz[index] + assert isinstance(row, dict) + + for key, value in row.items(): + if key == "index": + # TODO(Sabri): remove this when we change the index functionality + continue + col_testbed = testbed.column_testbeds[key] + col_testbed.assert_data_equal( + value, col_testbed.get_data(index, materialize=False) + ) + + @DataPanelTestBed.parametrize( + params={ + "index_type": [ + np.array, + # pd.Series, + # torch.Tensor, + NumpyArrayColumn, + PandasSeriesColumn, + TensorColumn, + ] } - return out - - if visible_rows is None: - visible_rows = np.arange(16) - - input_columns = ["a", "b"] if use_input_columns else None - result = dp.update( - func, - batch_size=4, - is_batched_fn=batched, - num_workers=0, - input_columns=input_columns, ) - assert isinstance(result, DataPanel) - assert set(result.visible_columns) == set(visible_columns + ["y"]) - assert len(result["a"]) == len(visible_rows) - assert len(result["y"]) == len(visible_rows) - assert (result["a"] == np.array(visible_rows) * 4).all() - assert (result["y"] == np.array(visible_rows) * 6).all() - - -@pytest.mark.parametrize("use_output_type", ([True, False])) -def test_update_4(use_output_type): - """`update`, mixed datapanel, return multiple, `is_batched_fn=True`, - func has additional arguments, specify output types""" - dp, visible_rows, visible_columns = _get_datapanel(use_visible_columns=False) - - # func has additional arguments - def func(x, factor): - out = { - "x": (x["a"] + np.array(x["b"])) * factor, - "y": np.array([x["c"][i]["a"] for i in range(len(x["c"]))]), + def test_row_lz_index_multiple(self, testbed, index_type): + dp = testbed.dp + rows = np.arange(len(dp)) + + # slice index => multiple row selection (DataPanel) + # tuple or list index => multiple row selection (DataPanel) + # np.array indeex => multiple row selection (DataPanel) + for rows, indices in ( + (dp.lz[1:3], rows[1:3]), + (dp.lz[[0, 2]], rows[[0, 2]]), + (dp.lz[index_type(np.array((0,)))], rows[np.array((0,))]), + (dp.lz[index_type(np.array((1, 1)))], rows[np.array((1, 1))]), + ( + dp.lz[index_type(np.array((True, False) * (len(dp) // 2)))], + rows[np.array((True, False) * (len(dp) // 2))], + ), + ): + assert isinstance(rows, DataPanel) + for key, value in rows.items(): + if key == "index": + # TODO(Sabri): remove this when we change the index functionality + continue + col_testbed = testbed.column_testbeds[key] + data = col_testbed.get_data(indices, materialize=False) + col_testbed.assert_data_equal(value.data, data) + + # if the getitem returns a column of the same type, enforce that all the + # attributes were cloned over appropriately. We don't want to check this + # for columns that return columns of different type from getitem + # (e.g. LambdaColumn) + if value.__class__ == dp[key].__class__: + assert dp[key]._clone(data=data).is_equal(value) + + @DataPanelTestBed.parametrize() + def test_col_indexing_view_copy_semantics(self, testbed): + dp = testbed.dp + + # Columns (1): Indexing a single column (i.e. with a str) returns the underlying + # AbstractColumn object directly. In the example below col1 and col2 are + # coreferences of the same column. + for name in dp.columns: + dp[name] is dp[name] + + # Columns (2): Indexing multiple columns (i.e. with Sequence[str]) returns a + # view of the DataPanel holding views to the columns in the original DataPanel. + # This means the AbstractColumn objects held in the new DataPanel are the same + # AbstractColumn objects held in the original DataPanel. + columns = list(testbed.columns) + for excluded_column in columns: + index = [c for c in columns if c != excluded_column] + view_dp = dp[index] + for name in view_dp.columns: + dp[name] is not view_dp[name] + dp[name].data is dp[name].data + + def test_row_indexing_view_copy_semantics(self): + length = 16 + batch = { + "a": np.arange(length), + "b": ListColumn(np.arange(length)), + "c": [{"a": 2}] * length, + "d": torch.arange(length), + # offset the index to test robustness to nonstandard indices + "e": pd.Series(np.arange(length), index=np.arange(1, 1 + length)), + # test multidimensional + "f": np.ones((length, 5)).astype(int), + "g": torch.ones(length, 5).to(int), + } + dp = DataPanel.from_batch(batch) + + # slice index + dp2 = dp[:8] + col = "a" + assert isinstance(dp2[col], NumpyArrayColumn) + assert dp[col] is not dp2[col] + assert dp[col].data is not dp2[col].data + assert dp[col].data is dp2[col].data.base + + col = "d" + assert isinstance(dp2[col], TensorColumn) + assert dp[col] is not dp2[col] + assert dp[col].data is not dp2[col].data + # note `data_ptr` checks whether the tensors have the same memory address of the + # first element, so this would not work if the slice didn't start at 0 + assert dp[col].data.data_ptr() == dp2[col].data.data_ptr() + + col = "e" + assert isinstance(dp2[col], PandasSeriesColumn) + assert dp[col] is not dp2[col] + assert dp[col].data is not dp2[col].data + # TODO (sabri): Figure out pandas copying behavior, it's not clear how it works + # and this deserves a deeper investigation. + # assert dp[col].data.values.base is dp2[col].data.values.base + + # slice index + dp2 = dp[np.array([0, 1, 2, 5])] + col = "a" + assert isinstance(dp2[col], NumpyArrayColumn) + assert dp[col] is not dp2[col] + assert dp[col].data is not dp2[col].data + assert dp[col].data is not dp2[col].data.base + + col = "d" + assert isinstance(dp2[col], TensorColumn) + assert dp[col] is not dp2[col] + assert dp[col].data is not dp2[col].data + # note `data_ptr` checks whether the tensors have the same memory address of the + # first element, so this would not work if the slice didn't start at 0 + assert dp[col].data.data_ptr() != dp2[col].data.data_ptr() + + col = "e" + assert isinstance(dp2[col], PandasSeriesColumn) + assert dp[col] is not dp2[col] + assert dp[col].data is not dp2[col].data + assert dp[col].data.values is not dp2[col].data.values.base + + @DataPanelTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} + ) + def test_map_return_multiple( + self, testbed: DataPanelTestBed, batched: bool, materialize: bool + ): + dp = testbed.dp + map_specs = { + name: col_testbed.get_map_spec( + batched=batched, materialize=materialize, salt=1 + ) + for name, col_testbed in testbed.column_testbeds.items() } - return out - - if visible_rows is None: - visible_rows = np.arange(16) - - # Specify output_types for returned columns - output_type = ( - {"x": ListColumn, "y": PandasSeriesColumn} if use_output_type else None - ) - result = dp.update( - func, - batch_size=4, - is_batched_fn=True, - num_workers=0, - output_type=output_type, - factor=2, - ) - assert isinstance(result, DataPanel) - assert len(result["x"]) == len(visible_rows) - assert len(result["y"]) == len(visible_rows) - assert (result["x"] == np.array(visible_rows) * 4).all() - assert (result["y"] == np.ones(len(visible_rows)) * 2).all() - - -@pytest.mark.parametrize( - "use_visible_columns,batched", - product([True, False], [True, False]), -) -def test_filter_1(use_visible_columns, batched): - """`filter`, mixed datapanel.""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns + def func(x): + out = {key: map_spec["fn"](x[key]) for key, map_spec in map_specs.items()} + return out + + result = dp.map( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + output_type={ + key: map_spec["output_type"] + for key, map_spec in map_specs.items() + if "output_type" in map_spec + }, + ) + assert isinstance(result, DataPanel) + for key, map_spec in map_specs.items(): + assert result[key].is_equal(map_spec["expected_result"]) + + @DataPanelTestBed.parametrize( + params={ + "batched": [True, False], + "materialize": [True, False], + "num_workers": [0], + "use_kwargs": [True, False], + } ) + def test_map_return_single( + self, + testbed: DataPanelTestBed, + batched: bool, + materialize: bool, + num_workers: int, + use_kwargs: bool, + ): + dp = testbed.dp + kwargs = {"kwarg": 2} if use_kwargs else {} + name = list(testbed.column_testbeds.keys())[0] + map_spec = testbed.column_testbeds[name].get_map_spec( + batched=batched, materialize=materialize, salt=1, **kwargs + ) + + def func(x, kwarg=0): + out = map_spec["fn"](x[name], k=kwarg) + return out + + result = dp.map( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + num_workers=num_workers, + **kwargs, + ) + assert isinstance(result, AbstractColumn) + assert result.is_equal(map_spec["expected_result"]) + + @DataPanelTestBed.parametrize(config={"consolidated": [True]}) + def test_map_return_single_multi_worker( + self, + testbed: DataPanelTestBed, + ): + self.test_map_return_single( + testbed, batched=True, materialize=True, num_workers=2, use_kwargs=False + ) - def func(x): - return (x["a"] % 2) == 0 - - result = dp.filter(func, batch_size=4, is_batched_fn=batched, num_workers=0) - if visible_rows is None: - visible_rows = np.arange(16) - - assert isinstance(result, DataPanel) - new_len = (np.array(visible_rows) % 2 == 0).sum() - assert len(result) == new_len - for col in result._data.values(): - assert len(col) == new_len - - # old datapane unchanged - old_len = len(visible_rows) - assert len(dp) == old_len - for col in dp._data.values(): - # important to check that the column lengths are correct as well - assert len(col) == old_len - - assert result.visible_columns == dp.visible_columns - - -@pytest.mark.parametrize( - "use_visible_columns,batched", - product([True, False], [True, False]), -) -def test_filter_kwargs(use_visible_columns, batched): - """`filter`, mixed datapanel, func has additional arguments""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns + @DataPanelTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} ) + def test_map_update_new( + self, testbed: DataPanelTestBed, batched: bool, materialize: bool + ): + dp = testbed.dp + map_specs = { + name: col_testbed.get_map_spec( + batched=batched, materialize=materialize, salt=1 + ) + for name, col_testbed in testbed.column_testbeds.items() + } - def func(x, factor): - return (x["a"] % factor) == 0 + def func(x): + out = { + f"{key}_new": map_spec["fn"](x[key]) + for key, map_spec in map_specs.items() + } + return out + + result = dp.update( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + output_type={ + f"{key}_new": map_spec["output_type"] + for key, map_spec in map_specs.items() + if "output_type" in map_spec + }, + ) + assert set(result.columns) == set(dp.columns) | { + f"{key}_new" for key in dp.columns if key != "index" + } + assert isinstance(result, DataPanel) + for key, map_spec in map_specs.items(): + assert result[f"{key}_new"].is_equal(map_spec["expected_result"]) - result = dp.filter( - func, batch_size=4, is_batched_fn=batched, num_workers=0, factor=2 + @DataPanelTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} ) - if visible_rows is None: - visible_rows = np.arange(16) - - assert isinstance(result, DataPanel) - new_len = (np.array(visible_rows) % 2 == 0).sum() - assert len(result) == new_len - for col in result._data.values(): - assert len(col) == new_len - - # old datapane unchanged - old_len = len(visible_rows) - assert len(dp) == old_len - for col in dp._data.values(): - # important to check that the column lengths are correct as well - assert len(col) == old_len - - assert result.visible_columns == dp.visible_columns - - -def test_lz_map(tmpdir): - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - visible_rows = ( - np.arange(length) - if test_bed.visible_rows is None - else np.array(test_bed.visible_rows) - ) - - def func(x): - assert isinstance(x["img"], ImageColumn) - return [str(filepath) for filepath in x["img"].data] - - result = dp.map(func, materialize=False, num_workers=0, is_batched_fn=True) - - assert isinstance(result, ListColumn) - assert result.data == [test_bed.img_col.image_paths[i] for i in visible_rows] + def test_map_update_existing( + self, testbed: DataPanelTestBed, batched: bool, materialize: bool + ): + dp = testbed.dp + map_specs = { + name: col_testbed.get_map_spec( + batched=batched, materialize=materialize, salt=1 + ) + for name, col_testbed in testbed.column_testbeds.items() + } + def func(x): + out = { + f"{key}": map_spec["fn"](x[key]) for key, map_spec in map_specs.items() + } + return out + + result = dp.update( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + output_type={ + key: map_spec["output_type"] + for key, map_spec in map_specs.items() + if "output_type" in map_spec + }, + ) + assert set(result.columns) == set(dp.columns) + assert result.data is not dp.data + assert isinstance(result, DataPanel) + for key, map_spec in map_specs.items(): + assert result[key].is_equal(map_spec["expected_result"]) + + @DataPanelTestBed.parametrize( + params={"batched": [True, False], "materialize": [True, False]} + ) + def test_filter(self, testbed: DataPanelTestBed, batched: bool, materialize: bool): + dp = testbed.dp + name = list(testbed.column_testbeds.keys())[0] + filter_spec = testbed.column_testbeds[name].get_filter_spec( + batched=batched, materialize=materialize, salt=1 + ) + + def func(x): + out = filter_spec["fn"](x[name]) + return out + + result = dp.filter( + func, + batch_size=4, + is_batched_fn=batched, + materialize=materialize, + ) + assert isinstance(result, DataPanel) + result[name].is_equal(filter_spec["expected_result"]) + + def test_remove_column(self): + a = np.arange(16) + b = np.arange(16) * 2 + dp = DataPanel.from_batch({"a": a, "b": b}) + assert "a" in dp + dp.remove_column("a") + assert "a" not in dp + + def test_overwrite_column(self): + # make sure we remove the column when overwriting it + a = np.arange(16) + b = np.arange(16) * 2 + dp = DataPanel.from_batch({"a": a, "b": b}) + assert "a" in dp + assert dp[["a", "b"]]["a"]._data is a + # testing removal from block manager, so important to use non-blockable type + dp["a"] = ListColumn(range(16)) + assert dp[["a", "b"]]["a"]._data is not a + # check that there are no duplicate columns + assert set(dp.columns) == set(["a", "b", "index"]) + + @DataPanelTestBed.parametrize() + def test_io(self, testbed, tmp_path): + """`map`, mixed datapanel, return multiple, `is_batched_fn=True`""" + dp = testbed.dp + path = os.path.join(tmp_path, "test") + dp.write(path) + new_dp = DataPanel.read(path) + + assert isinstance(new_dp, DataPanel) + assert dp.columns == new_dp.columns + assert len(new_dp) == len(dp) + for name in dp.columns: + assert new_dp[name].is_equal(dp[name]) + + @DataPanelTestBed.parametrize() + def test_repr_html_(self, testbed): + testbed.dp._repr_html_() + + def test_append_columns(self): + length = 16 + batch = { + "a": np.arange(length), + "b": ListColumn(np.arange(length)), + "c": [{"a": 2}] * length, + "d": torch.arange(length), + # offset the index to test robustness to nonstandard indices + "e": pd.Series(np.arange(length), index=np.arange(1, 1 + length)), + # test multidimensional + "f": np.ones((length, 5)).astype(int), + "g": torch.ones(length, 5).to(int), + } + dp = DataPanel.from_batch(batch) -def test_lz_filter(tmpdir): - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - visible_rows = ( - np.arange(length) - if test_bed.visible_rows is None - else np.array(test_bed.visible_rows) - ) + out = dp.append(dp, axis="rows") - def func(x): - # see `MockImageColumn` for filepath naming logic - return (int(str(x["img"].data).split("/")[-1].split(".")[0]) % 2) == 0 - - result = dp.filter(func, is_batched_fn=False, num_workers=0, materialize=False) - - assert isinstance(result, DataPanel) - new_len = (np.array(visible_rows) % 2 == 0).sum() - assert len(result) == new_len - for col in result._data.values(): - assert len(col) == new_len - - # old datapane unchanged - old_len = len(visible_rows) - assert len(dp) == old_len - for col in dp._data.values(): - # important to check that the column lengths are correct as well - assert len(col) == old_len - - assert result.visible_columns == dp.visible_columns - - -def test_lz_update( - tmpdir, -): - """`update`, mixed datapanel, return single, new columns, - `is_batched_fn=True`""" - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - tmpdir=tmpdir, - ) - dp = test_bed.dp - visible_rows = ( - np.arange(length) - if test_bed.visible_rows is None - else np.array(test_bed.visible_rows) - ) + assert len(out) == len(dp) * 2 + assert isinstance(out, DataPanel) + assert set(out.visible_columns) == set(dp.visible_columns) + assert (out["a"].data == np.concatenate([np.arange(length)] * 2)).all() + assert out["b"].data == list(np.concatenate([np.arange(length)] * 2)) - def func(x): - out = {"x": str(x["img"].data)} - return out + @DataPanelTestBed.parametrize() + def test_tail(self, testbed): + dp = testbed.dp - result = dp.update( - func, batch_size=4, is_batched_fn=False, num_workers=0, materialize=False - ) - assert set(result.column_names) == set( - ["a", "b", "c", "d", "e", "f", "g", "x", "img", "index"] - ) - assert len(result["x"]) == len(visible_rows) - assert result["x"].data == [test_bed.img_col.image_paths[i] for i in visible_rows] - - -@pytest.mark.parametrize( - "use_visible_columns,batched", - product([True, False], [True, False]), -) -def test_filter_2(use_visible_columns, batched): - """`filter`, mixed datapanel.""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns - ) + new_dp = dp.tail(n=2) - def func(x): - return (x["a"] % 2) == 0 - - result = dp.filter(func, batch_size=4, is_batched_fn=batched) - if visible_rows is None: - visible_rows = np.arange(16) - - assert isinstance(result, DataPanel) - new_len = (np.array(visible_rows) % 2 == 0).sum() - assert len(result) == new_len - for col in result._data.values(): - assert len(col) == new_len - - # old datapane unchanged - old_len = len(visible_rows) - assert len(dp) == old_len - for col in dp._data.values(): - # important to check that the column lengths are correct as well - assert len(col) == old_len - - assert result.visible_columns == dp.visible_columns - - -def test_remove_column(): - a = np.arange(16) - b = np.arange(16) * 2 - dp = DataPanel.from_batch({"a": a, "b": b}) - assert "a" in dp - dp.remove_column("a") - assert "a" not in dp - - -def test_overwrite_column(): - # make sure we remove the column when overwriting it - a = np.arange(16) - b = np.arange(16) * 2 - dp = DataPanel.from_batch({"a": a, "b": b}) - assert "a" in dp - assert dp[["a", "b"]]["a"]._data is a - # testing removal from block manager, so important to use non-blockable type here - dp["a"] = ListColumn(range(16)) - assert dp[["a", "b"]]["a"]._data is not a - - -@pytest.mark.parametrize("use_visible_columns", [True, False]) -def test_io(tmp_path, use_visible_columns): - """`map`, mixed datapanel, return multiple, `is_batched_fn=True`""" - dp, visible_rows, visible_columns = _get_datapanel( - use_visible_columns=use_visible_columns - ) - path = os.path.join(tmp_path, "test") - dp.write(path) + assert isinstance(new_dp, DataPanel) + assert new_dp.visible_columns == dp.visible_columns + assert len(new_dp) == 2 - new_dp = DataPanel.read(path) + @DataPanelTestBed.parametrize() + def test_head(self, testbed): + dp = testbed.dp - assert isinstance(new_dp, DataPanel) - assert len(new_dp) == len(dp) - assert len(new_dp["a"]) == len(dp["a"]) - assert len(new_dp["b"]) == len(dp["b"]) - if not use_visible_columns: - assert len(new_dp["c"]) == len(dp["c"]) + new_dp = dp.head(n=2) - assert (dp["a"] == new_dp["a"]).all() - assert dp["b"].data == new_dp["b"].data + assert isinstance(new_dp, DataPanel) + assert new_dp.visible_columns == dp.visible_columns + assert len(new_dp) == 2 - assert dp.visible_columns == new_dp.visible_columns + class DataPanelSubclass(DataPanel): + """Mock class to test that ops on subclass returns subclass.""" + pass -def test_repr_html_(): - dp, visible_rows, visible_columns = _get_datapanel(use_visible_columns=False) - dp._repr_html_() + def test_subclass(self): + dp1 = self.DataPanelSubclass.from_dict( + {"a": np.arange(3), "b": ["may", "jun", "jul"]} + ) + dp2 = self.DataPanelSubclass.from_dict( + {"c": np.arange(3), "d": ["2021", "2022", "2023"]} + ) + assert isinstance(dp1.lz[np.asarray([0, 1])], self.DataPanelSubclass) + assert isinstance(dp1.lz[:2], self.DataPanelSubclass) + assert isinstance(dp1[:2], self.DataPanelSubclass) -@pytest.mark.parametrize( - "use_visible_columns", - product([True, False]), -) -def test_to_pandas(tmpdir, use_visible_columns): - import pandas as pd + assert isinstance( + dp1.merge(dp2, left_on="a", right_on="c"), self.DataPanelSubclass + ) + assert isinstance(dp1.append(dp1), self.DataPanelSubclass) - length = 16 - test_bed = MockDatapanel( - length=length, - include_image_column=True, - use_visible_columns=use_visible_columns, - tmpdir=tmpdir, - ) - dp = test_bed.dp + def test_from_csv(self): + temp_f = tempfile.NamedTemporaryFile() + data = { + "a": [3.4, 2.3, 1.2], + "b": ["alpha", "beta", "gamma"], + "c": ["the walk", "the talk", "blah"], + } + pd.DataFrame(data).to_csv(temp_f.name) + + dp_new = DataPanel.from_csv(temp_f.name) + assert dp_new.column_names == ["Unnamed: 0", "a", "b", "c", "index"] + # Skip index column + for k in data: + if isinstance(dp_new[k], PandasSeriesColumn): + data_to_compare = dp_new[k]._data.tolist() + else: + data_to_compare = dp_new[k]._data + assert data_to_compare == data[k] + + def test_from_jsonl(self): + # Build jsonl file + temp_f = tempfile.NamedTemporaryFile() + data = { + "a": [3.4, 2.3, 1.2], + "b": [[7, 9], [4], [1, 2]], + "c": ["the walk", "the talk", "blah"], + } + with open(temp_f.name, "w") as out_f: + for idx in range(3): + to_write = {k: data[k][idx] for k in list(data.keys())} + out_f.write(json.dumps(to_write) + "\n") + + dp_new = DataPanel.from_jsonl(temp_f.name) + assert dp_new.column_names == ["a", "b", "c", "index"] + # Skip index column + for k in data: + if isinstance(dp_new[k], NumpyArrayColumn): + data_to_compare = dp_new[k]._data.tolist() + else: + data_to_compare = dp_new[k]._data + assert data_to_compare == data[k] + temp_f.close() + + def test_from_batch(self): + # Build a dataset from a batch + datapanel = DataPanel.from_batch( + { + "a": [1, 2, 3], + "b": [True, False, True], + "c": ["x", "y", "z"], + "d": [{"e": 2}, {"e": 3}, {"e": 4}], + "e": torch.ones(3), + "f": np.ones(3), + }, + ) + assert set(datapanel.column_names) == {"a", "b", "c", "d", "e", "f", "index"} + assert len(datapanel) == 3 + + def test_to_pandas(self): + import pandas as pd + + length = 16 + batch = { + "a": np.arange(length), + "b": ListColumn(np.arange(length)), + "c": [{"a": 2}] * length, + "d": torch.arange(length), + # offset the index to test robustness to nonstandard indices + "e": pd.Series(np.arange(length), index=np.arange(1, 1 + length)), + # test multidimensional + "f": np.ones((length, 5)).astype(int), + "g": torch.ones(length, 5).to(int), + } + dp = DataPanel.from_batch(batch) - df = dp.to_pandas() - assert isinstance(df, pd.DataFrame) - assert list(df.columns) == dp.visible_columns - assert len(df) == len(dp) + df = dp.to_pandas() + assert isinstance(df, pd.DataFrame) + assert list(df.columns) == dp.visible_columns + assert len(df) == len(dp) - assert (df["a"].values == dp["a"].data).all() - assert list(df["b"]) == list(dp["b"].data) + assert (df["a"].values == dp["a"].data).all() + assert list(df["b"]) == list(dp["b"].data) - if not use_visible_columns: assert isinstance(df["c"][0], dict) - assert isinstance(df["img"][0], LambdaCell) - - assert isinstance(df["d"].values == dp["d"].numpy()).all() - assert isinstance(df["e"].values == dp["e"].numpy()).all() - - -@pytest.mark.parametrize( - "use_visible_columns", - product([True, False]), -) -def test_head(tmpdir, use_visible_columns): - length = 16 - test_bed = MockDatapanel( - length=length, - use_visible_columns=use_visible_columns, - tmpdir=tmpdir, - ) - dp = test_bed.dp - - new_dp = dp.head(n=2) - - assert isinstance(new_dp, DataPanel) - assert new_dp.visible_columns == dp.visible_columns - assert len(new_dp) == 2 - assert (new_dp["a"] == dp["a"][:2]).all() - - -@pytest.mark.parametrize( - "use_visible_columns", - product([True, False]), -) -def test_tail(tmpdir, use_visible_columns): - length = 16 - test_bed = MockDatapanel( - length=length, - use_visible_columns=use_visible_columns, - tmpdir=tmpdir, - ) - dp = test_bed.dp - - new_dp = dp.tail(n=2) - - assert isinstance(new_dp, DataPanel) - assert new_dp.visible_columns == dp.visible_columns - assert len(new_dp) == 2 - assert (new_dp["a"] == dp["a"][-2:]).all() - - -@pytest.mark.parametrize( - "use_visible_columns", - product([True, False], [True, False]), -) -def test_append_columns(use_visible_columns): - mock = MockDatapanel( - length=16, - use_visible_columns=use_visible_columns, - ) - - out = mock.dp.append(mock.dp, axis="rows") - - assert len(out) == len(mock.visible_rows) * 2 - assert isinstance(out, DataPanel) - assert set(out.visible_columns) == set(mock.visible_columns) - assert (out["a"].data == np.concatenate([mock.visible_rows] * 2)).all() - assert out["b"].data == list(np.concatenate([mock.visible_rows] * 2)) - - -class DataPanelSubclass(DataPanel): - """Mock class to test that ops on subclass returns subclass.""" - - pass - - -def test_subclass(): - dp1 = DataPanelSubclass.from_dict({"a": np.arange(3), "b": ["may", "jun", "jul"]}) - dp2 = DataPanelSubclass.from_dict( - {"c": np.arange(3), "d": ["2021", "2022", "2023"]} - ) - - assert isinstance(dp1.lz[np.asarray([0, 1])], DataPanelSubclass) - assert isinstance(dp1.lz[:2], DataPanelSubclass) - assert isinstance(dp1[:2], DataPanelSubclass) - assert isinstance(dp1.merge(dp2, left_on="a", right_on="c"), DataPanelSubclass) - assert isinstance(dp1.append(dp1), DataPanelSubclass) + assert (df["d"].values == dp["d"].numpy()).all() + assert (df["e"].values == dp["e"].values).all() diff --git a/tests/testbeds.py b/tests/testbeds.py index b08faf6c2..f61d4c221 100644 --- a/tests/testbeds.py +++ b/tests/testbeds.py @@ -1,101 +1,53 @@ """A collection of simple testbeds to build test cases.""" import os -from copy import deepcopy +from functools import wraps +from itertools import product from typing import Sequence import numpy as np import pandas as pd +import pytest import torch from PIL import Image from meerkat.columns.image_column import ImageCellColumn, ImageColumn from meerkat.columns.list_column import ListColumn from meerkat.datapanel import DataPanel -from meerkat.tools.identifier import Identifier - - -class MockTestBedv0: - """Simple mock dataset with 6 examples.""" - - def __init__(self): - # Create a fake batch of data - self.batch = { - "text": [ - "The man is walking.", - "The man is running.", - "The woman is sprinting.", - "The woman is resting.", - "The hobbit is flying.", - "The hobbit is swimming.", - ], - "label": [0, 0, 1, 1, 0, 0], - "z": [1, 0, 1, 0, 1, 0], - "fast": [False, True, True, False, False, False], - "metadata": [ - {"source": "real"}, - {"source": "real"}, - {"source": "real"}, - {"source": "real"}, - {"source": "fictional"}, - {"source": "fictional"}, - ], - } - # Create a fake dataset - self.dataset = DataPanel.from_batch( - self.batch, - identifier=Identifier(_name="MockDataPane", version="1.0"), - ) - # Keep a copy of the original - self.original_dataset = deepcopy(self.dataset) - - assert len(self.dataset) == 6 - - def test_attributes(self): - # Both datasets use the same cache files for backing - print(self.dataset.cache_files) - print(self.original_dataset.cache_files) - print(self.dataset.identifier) - - def problems(self): - # FIXME(karan): this shouldn't be happening: why is otherlabel disappearing here - with self.assertRaises(AssertionError): - # Create an additional integer column in the dataset - dataset = self.testbed.dataset.map(lambda x: {"otherlabel": x["label"] + 1}) - dataset_0_0 = self.cachedop(dataset, columns=["label"]) - self.assertTrue("otherlabel" in dataset_0_0.column_names) - - -class MockTestBedv1: - """Simple mock dataset with 4 examples containing pairs of sentences.""" - - def __init__(self): - # Create a fake dataset - self.dataset = DataPanel.from_batch( - { - "text_a": [ - "Before the actor slept, the senator ran.", - "The lawyer knew that the judges shouted.", - "If the actor slept, the judge saw the artist.", - "The lawyers resigned, or the artist slept.", - ], - "text_b": [ - "The actor slept.", - "The judges shouted.", - "The actor slept.", - "The artist slept.", - ], - "label": [0, 0, 1, 1], - "z": [1, 0, 1, 0], - "fast": [False, True, True, False], - }, - identifier=Identifier(_name="MockDataPane", version="2.0"), - ) - # Keep a copy of the original - self.original_dataset = deepcopy(self.dataset) +class AbstractColumnTestBed: - assert len(self.dataset) == 4 + DEFAULT_CONFIG = {} + + @classmethod + def get_params(cls, config: dict = None, params: dict = None): + updated_config = cls.DEFAULT_CONFIG.copy() + if config is not None: + updated_config.update(config) + configs = list( + map( + dict, + product(*[[(k, v) for v in vs] for k, vs in updated_config.items()]), + ) + ) + if params is None: + return { + "argnames": "config", + "argvalues": configs, + "ids": [str(config) for config in configs], + } + else: + argvalues = list(product(configs, *params.values())) + return { + "argnames": "config," + ",".join(params.keys()), + "argvalues": argvalues, + "ids": [",".join(map(str, values)) for values in argvalues], + } + + @classmethod + @wraps(pytest.mark.parametrize) + def parametrize(cls, config: dict = None, params: dict = None): + return pytest.mark.parametrize(**cls.get_params(config=config, params=params)) class MockDatapanel: