diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 71dfcf1c378..4fcd11af74d 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -30,7 +30,7 @@ jobs: torchscript-version: 1.10.2 ray-version: 2.2.0 - python-version: "3.9" - pytorch-version: 2.0.0 + pytorch-version: 2.1.0 torchscript-version: 1.10.2 ray-version: 2.3.0 - python-version: "3.10" @@ -208,6 +208,7 @@ jobs: - "integration_tests_c" - "integration_tests_d" - "integration_tests_e" + - "integration_tests_f" env: AWS_ACCESS_KEY_ID: ${{ secrets.LUDWIG_TESTS_AWS_ACCESS_KEY_ID }} diff --git a/.vscode/settings.json b/.vscode/settings.json index fba148f3ce7..bd803cd8d52 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,10 +3,15 @@ 120 ], "editor.formatOnSave": true, - "python.formatting.provider": "black", - "python.linting.enabled": true, - "python.linting.flake8Enabled": true, - "python.linting.flake8Args": [ + "[python]": { + "editor.defaultFormatter": "ms-python.black-formatter", + "editor.formatOnSave": true + }, + "black-formatter.args": [ + "--line-length", + "120" + ], + "flake8.args": [ "--config=setup.cfg" ], "python.testing.unittestEnabled": false, diff --git a/README.md b/README.md index 37ab95b84d5..85e32f22df3 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ Ludwig is a **low-code** framework for building **custom** AI models like **LLMs Key features: - 🛠 **Build custom models with ease:** a declarative YAML configuration file is all you need to train a state-of-the-art LLM on your data. Support for multi-task and multi-modality learning. Comprehensive config validation detects invalid parameter combinations and prevents runtime failures. -- ⚡ **Optimized for scale and efficiency:** automatic batch size selection, distributed training ([DDP](https://pytorch.org/tutorials/beginner/ddp_series_theory.html), [DeepSpeed](https://github.com/microsoft/DeepSpeed)), parameter efficient fine-tuning ([PEFT](https://github.com/huggingface/peft)), 4-bit quantization (QLoRA), and larger-than-memory datasets. +- ⚡ **Optimized for scale and efficiency:** automatic batch size selection, distributed training ([DDP](https://pytorch.org/tutorials/beginner/ddp_series_theory.html), [DeepSpeed](https://github.com/microsoft/DeepSpeed)), parameter efficient fine-tuning ([PEFT](https://github.com/huggingface/peft)), 4-bit quantization (QLoRA), paged and 8-bit optimizers, and larger-than-memory datasets. - 📐 **Expert level control:** retain full control of your models down to the activation functions. Support for hyperparameter optimization, explainability, and rich metric visualizations. - 🧱 **Modular and extensible:** experiment with different model architectures, tasks, features, and modalities with just a few parameter changes in the config. Think building blocks for deep learning. - 🚢 **Engineered for production:** prebuilt [Docker](https://hub.docker.com/u/ludwigai) containers, native support for running with [Ray](https://www.ray.io/) on [Kubernetes](https://github.com/ray-project/kuberay), export models to [Torchscript](https://pytorch.org/docs/stable/jit.html) and [Triton](https://developer.nvidia.com/triton-inference-server), upload to [HuggingFace](https://huggingface.co/models) with one command. @@ -52,8 +52,13 @@ pip install ludwig[full] Want to take a quick peak at some of the Ludwig 0.8 features? 
Check out this Colab Notebook 🚀 [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1lB4ALmEyvcMycE3Mlnsd7I3bc0zxvk39) -For a full tutorial, check out the official [getting started guide](https://ludwig-ai.github.io/ludwig-docs/latest/getting_started/), -or take a look at end-to-end [Examples](https://ludwig-ai.github.io/ludwig-docs/latest/examples). +Looking to fine-tune Llama-2 or Mistral? Check out these notebooks: + +1. Fine-Tune Llama-2-7b: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1r4oSEwRJpYKBPM0M0RSh0pBEYK_gBKbe) +1. Fine-Tune Llama-2-13b: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1zmSEzqZ7v4twBrXagj1TE_C--RNyVAyu) +1. Fine-Tune Mistral-7b: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1i_8A1n__b7ljRWHzIsAdhO7u7r49vUm4) + +For a full tutorial, check out the official [getting started guide](https://ludwig-ai.github.io/ludwig-docs/latest/getting_started/), or take a look at end-to-end [Examples](https://ludwig-ai.github.io/ludwig-docs/latest/examples). ## Large Language Model Fine-Tuning diff --git a/ludwig/backend/base.py b/ludwig/backend/base.py index 77a14a0b104..112a8afe35e 100644 --- a/ludwig/backend/base.py +++ b/ludwig/backend/base.py @@ -20,7 +20,7 @@ from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager -from typing import Any, Callable, TYPE_CHECKING +from typing import Any, Callable, Generator, TYPE_CHECKING import numpy as np import pandas as pd @@ -89,7 +89,7 @@ def initialize_pytorch(self, *args, **kwargs): @contextmanager @abstractmethod - def create_trainer(self, **kwargs) -> BaseTrainer: + def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> Generator: raise NotImplementedError() @abstractmethod @@ -146,7 +146,9 @@ def tune_batch_size(self, evaluator_cls: type[BatchSizeEvaluator], dataset_len: raise NotImplementedError() @abstractmethod - def batch_transform(self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame: + def batch_transform( + self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None + ) -> DataFrame: """Applies `transform_fn` to every `batch_size` length batch of `df` and returns the result.""" raise NotImplementedError() @@ -171,7 +173,9 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si with ThreadPoolExecutor() as executor: # number of threads is inferred if isinstance(sample_fname, str): if map_fn is read_audio_from_path: # bypass torchaudio issue that no longer takes in file-like objects - result = executor.map(lambda path: map_fn(path) if path is not None else path, column.values) + result = executor.map( # type: ignore[misc] + lambda path: map_fn(path) if path is not None else path, column.values + ) else: result = executor.map( lambda path: get_bytes_obj_from_path(path) if path is not None else path, column.values @@ -186,7 +190,7 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si return pd.Series(result, index=column.index, name=column.name) @staticmethod - def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame: + def batch_transform(df: DataFrame, batch_size: int, transform_fn: 
Callable, name: str | None = None) -> DataFrame: name = name or "Batch Transform" batches = to_batches(df, batch_size) transform = transform_fn() @@ -204,21 +208,11 @@ def initialize(): def initialize_pytorch(*args, **kwargs): initialize_pytorch(*args, **kwargs) - def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> BaseTrainer: - from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry - - if model.type() == MODEL_LLM: - trainer_cls = get_from_registry(config.type, get_llm_trainers_registry()) - else: - trainer_cls = get_from_registry(model.type(), get_trainers_registry()) - - return trainer_cls(config=config, model=model, **kwargs) - @staticmethod def create_predictor(model: BaseModel, **kwargs): from ludwig.models.predictor import get_predictor_cls - return get_predictor_cls(model.type())(model, **kwargs) + return get_predictor_cls(model.type())(model, **kwargs) # type: ignore[call-arg] def sync_model(self, model): pass @@ -254,14 +248,16 @@ def is_coordinator() -> bool: class LocalBackend(LocalPreprocessingMixin, LocalTrainingMixin, Backend): BACKEND_TYPE = "local" + _shared_instance: LocalBackend + @classmethod - def shared_instance(cls): + def shared_instance(cls) -> LocalBackend: """Returns a shared singleton LocalBackend instance.""" if not hasattr(cls, "_shared_instance"): cls._shared_instance = cls() return cls._shared_instance - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: super().__init__(dataset_manager=PandasDatasetManager(self), **kwargs) @property @@ -280,6 +276,22 @@ def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | No # trial resources it wants, because there is no Ray Datasets process to compete with it for CPUs. return None + def create_trainer( + self, + config: BaseTrainerConfig, + model: BaseModel, + **kwargs, + ) -> BaseTrainer: # type: ignore[override] + from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry + + trainer_cls: type + if model.type() == MODEL_LLM: + trainer_cls = get_from_registry(config.type, get_llm_trainers_registry()) + else: + trainer_cls = get_from_registry(model.type(), get_trainers_registry()) + + return trainer_cls(config=config, model=model, **kwargs) + @DeveloperAPI class DataParallelBackend(LocalPreprocessingMixin, Backend, ABC): @@ -298,7 +310,12 @@ def initialize_pytorch(self, *args, **kwargs): *args, local_rank=self._distributed.local_rank(), local_size=self._distributed.local_size(), **kwargs ) - def create_trainer(self, **kwargs) -> BaseTrainer: + def create_trainer( + self, + config: BaseTrainerConfig, + model: BaseModel, + **kwargs, + ) -> BaseTrainer: # type: ignore[override] from ludwig.trainers.trainer import Trainer return Trainer(distributed=self._distributed, **kwargs) @@ -306,7 +323,7 @@ def create_trainer(self, **kwargs) -> BaseTrainer: def create_predictor(self, model: BaseModel, **kwargs): from ludwig.models.predictor import get_predictor_cls - return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) + return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) # type: ignore[call-arg] def sync_model(self, model): # Model weights are only saved on the coordinator, so broadcast diff --git a/ludwig/config_validation/checks.py b/ludwig/config_validation/checks.py index d6ed2cc1b60..2a353b98dde 100644 --- a/ludwig/config_validation/checks.py +++ b/ludwig/config_validation/checks.py @@ -581,7 +581,7 @@ def 
check_llm_finetuning_adaption_prompt_parameters(config: "ModelConfig"): if config.adapter.type != "adaption_prompt": return - from peft.tuners.adaption_prompt import TRANSFORMERS_MODEL_CONFIG + from peft.tuners.adaption_prompt.config import TRANSFORMERS_MODEL_CONFIG # Adaption Config is currently only supported for Llama model types model_config = _get_llm_model_config(config.base_model) @@ -718,3 +718,11 @@ def check_prompt_requirements(config: "ModelConfig") -> None: # noqa: F821 "A template must contain at least one reference to a column or the sample keyword {__sample__} for " "a JSON-serialized representation of non-output feature columns." ) + + +@register_config_check +def check_sample_ratio_and_size_compatible(config: "ModelConfig") -> None: + sample_ratio = config.preprocessing.sample_ratio + sample_size = config.preprocessing.sample_size + if sample_size is not None and sample_ratio < 1.0: + raise ConfigValidationError("sample_size cannot be used when sample_ratio < 1.0") diff --git a/ludwig/constants.py b/ludwig/constants.py index dec3e91bd0c..b813f5ce0f3 100644 --- a/ludwig/constants.py +++ b/ludwig/constants.py @@ -197,6 +197,8 @@ EPOCHS = "epochs" BATCH_SIZE = "batch_size" EVAL_BATCH_SIZE = "eval_batch_size" +EFFECTIVE_BATCH_SIZE = "effective_batch_size" +MAX_BATCH_SIZE = "max_batch_size" DEFAULT_BATCH_SIZE = "auto" FALLBACK_BATCH_SIZE = 128 # The smallest batch size that is supported on Ludwig. diff --git a/ludwig/contribs/wandb.py b/ludwig/contribs/wandb.py index 17f1011d327..9d86d51e178 100644 --- a/ludwig/contribs/wandb.py +++ b/ludwig/contribs/wandb.py @@ -67,3 +67,6 @@ def on_visualize_figure(self, fig): logger.info("wandb.on_visualize_figure() called...") if wandb.run: wandb.log({"figure": fig}) + + def on_train_end(self, output_directory): + wandb.finish() diff --git a/ludwig/data/batcher/test_batcher.py b/ludwig/data/batcher/test_batcher.py new file mode 100644 index 00000000000..69eef2a2d40 --- /dev/null +++ b/ludwig/data/batcher/test_batcher.py @@ -0,0 +1,113 @@ +import logging + +import pandas as pd +import yaml + +from ludwig.api import LudwigModel +from ludwig.data.dataset.pandas import PandasDataset + + +def test_pandas_size(): + df = pd.DataFrame( + {"name": ["joe", "janice", "sara"], "mask": ["green", "black", "pink"], "weapon": ["stick", "gun", "gun"]} + ) + config = yaml.safe_load( + """ + model_type: llm + base_model: HuggingFaceH4/tiny-random-LlamaForCausalLM + input_features: + - name: name + type: text + preprocessing: + max_sequence_length: 256 + column: name + output_features: + - name: weapon + type: text + preprocessing: + max_sequence_length: 256 + column: weapon + preprocessing: + split: + type: random + probabilities: + - 1 + - 0 + - 0 + """ + ) + model = LudwigModel(config=config, logging_level=logging.INFO) + data = model.preprocess(df, skip_save_processed_input=False) + training_set = data[0] + assert training_set.size == len(df) + + # Check if string loading works as well + # data[0].data_hdf5_fp is the string filepath to the cached data from preprocessing + data_from_str = PandasDataset(data[0].data_hdf5_fp, data[0].features, None) + assert data_from_str.size == len(df) + + +def test_pandas_batcher_use_all_samples(): + df = pd.DataFrame( + {"name": ["joe", "janice", "sara"], "mask": ["green", "black", "pink"], "weapon": ["stick", "gun", "gun"]} + ) + config = yaml.safe_load( + """ + model_type: llm + base_model: HuggingFaceH4/tiny-random-LlamaForCausalLM + input_features: + - name: name + type: text + preprocessing: + max_sequence_length: 
256 + column: name + output_features: + - name: weapon + type: text + preprocessing: + max_sequence_length: 256 + column: weapon + preprocessing: + split: + type: random + probabilities: + - 1 + - 0 + - 0 + """ + ) + model = LudwigModel(config=config, logging_level=logging.INFO) + data = model.preprocess(df, skip_save_processed_input=False) + training_set = data[0] + features = training_set.dataset.keys() + + batches = [] + with training_set.initialize_batcher(batch_size=1) as batcher: + while not batcher.last_batch(): + batch = batcher.next_batch() + batches.append(batch) + assert (len(batches)) == training_set.size + + # Check to see if all items are used exactly once + for feature in features: + for i in range(len(training_set.dataset[feature])): + # Each of the arrays in the line below should contain the vector representation of a feature of sample i + assert (batches[i][feature].squeeze() == training_set.dataset[feature][i].squeeze()).all() + + # Check if string loading works as well + batches = [] + # data[0].data_hdf5_fp is the string filepath to the cached data from preprocessing + data_from_str = PandasDataset(data[0].data_hdf5_fp, data[0].features, None) + features = data_from_str.dataset.keys() + + with data_from_str.initialize_batcher(batch_size=1) as batcher: + while not batcher.last_batch(): + batch = batcher.next_batch() + batches.append(batch) + assert (len(batches)) == data_from_str.size + + # Check to see if all items are used exactly once + for feature in features: + for i in range(len(data_from_str.dataset[feature])): + # Each of the arrays in the line below should contain the vector representation of a feature of sample i + assert (batches[i][feature].squeeze() == data_from_str.dataset[feature][i].squeeze()).all() diff --git a/ludwig/data/dataset/base.py b/ludwig/data/dataset/base.py index 729c0bc4569..ea548a0ea10 100644 --- a/ludwig/data/dataset/base.py +++ b/ludwig/data/dataset/base.py @@ -14,10 +14,13 @@ # limitations under the License. 
# ============================================================================== +from __future__ import annotations + import contextlib from abc import ABC, abstractmethod -from typing import Iterable, Optional +from typing import Iterable +from ludwig.data.batcher.base import Batcher from ludwig.distributed import DistributedStrategy from ludwig.features.base_feature import BaseFeature from ludwig.utils.defaults import default_random_seed @@ -26,7 +29,7 @@ class Dataset(ABC): @abstractmethod - def __len__(self): + def __len__(self) -> int: raise NotImplementedError() @contextlib.contextmanager @@ -38,36 +41,36 @@ def initialize_batcher( random_seed: int = default_random_seed, ignore_last: bool = False, distributed: DistributedStrategy = None, - ): + ) -> Batcher: raise NotImplementedError() @abstractmethod - def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: raise NotImplementedError() @abstractmethod - def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: raise NotImplementedError() @property - def in_memory_size_bytes(self): + def in_memory_size_bytes(self) -> int: raise NotImplementedError() class DatasetManager(ABC): @abstractmethod - def create(self, dataset, config, training_set_metadata): + def create(self, dataset, config, training_set_metadata) -> Dataset: raise NotImplementedError() @abstractmethod - def save(self, cache_path, dataset, config, training_set_metadata, tag): + def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset: raise NotImplementedError() @abstractmethod - def can_cache(self, skip_save_processed_input): + def can_cache(self, skip_save_processed_input) -> bool: raise NotImplementedError() @property @abstractmethod - def data_format(self): + def data_format(self) -> str: raise NotImplementedError() diff --git a/ludwig/data/dataset/pandas.py b/ludwig/data/dataset/pandas.py index 6cec6ec6bf8..152cdc555ab 100644 --- a/ludwig/data/dataset/pandas.py +++ b/ludwig/data/dataset/pandas.py @@ -13,13 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ============================================================================== + +from __future__ import annotations + import contextlib -from typing import Iterable, Optional +from typing import Iterable, TYPE_CHECKING import numpy as np from pandas import DataFrame from ludwig.constants import PREPROCESSING, TRAINING +from ludwig.data.batcher.base import Batcher from ludwig.data.batcher.random_access import RandomAccessBatcher from ludwig.data.dataset.base import Dataset, DatasetManager from ludwig.data.sampler import DistributedSampler @@ -31,24 +35,27 @@ from ludwig.utils.fs_utils import download_h5 from ludwig.utils.misc_utils import get_proc_features +if TYPE_CHECKING: + from ludwig.backend.base import Backend + class PandasDataset(Dataset): def __init__(self, dataset, features, data_hdf5_fp): self.features = features self.data_hdf5_fp = data_hdf5_fp - self.size = len(dataset) if isinstance(dataset, str): dataset = load_hdf5(dataset) self.dataset = to_numpy_dataset(dataset) + self.size = len(list(self.dataset.values())[0]) - def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: """Convert the dataset to a Pandas DataFrame.""" if features: return from_numpy_dataset({feature.feature_name: self.dataset[feature.proc_column] for feature in features}) return from_numpy_dataset(self.dataset) - def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: return to_scalar_df(self.to_df(features)) def get(self, proc_column, idx=None): @@ -76,18 +83,18 @@ def get(self, proc_column, idx=None): indices = indices[:, np.argsort(indices[1])] return im_data[indices[2, :]] - def get_dataset(self): + def get_dataset(self) -> dict[str, np.ndarray]: return self.dataset def __len__(self): return self.size @property - def processed_data_fp(self) -> Optional[str]: + def processed_data_fp(self) -> str | None: return self.data_hdf5_fp @property - def in_memory_size_bytes(self): + def in_memory_size_bytes(self) -> int: df = self.to_df() return df.memory_usage(deep=True).sum() if df is not None else 0 @@ -100,7 +107,7 @@ def initialize_batcher( ignore_last: bool = False, distributed: DistributedStrategy = None, augmentation_pipeline=None, - ): + ) -> Batcher: sampler = DistributedSampler( len(self), shuffle=should_shuffle, random_seed=random_seed, distributed=distributed ) @@ -115,21 +122,21 @@ def initialize_batcher( class PandasDatasetManager(DatasetManager): - def __init__(self, backend): - self.backend = backend + def __init__(self, backend: Backend): + self.backend: Backend = backend - def create(self, dataset, config, training_set_metadata): + def create(self, dataset, config, training_set_metadata) -> Dataset: return PandasDataset(dataset, get_proc_features(config), training_set_metadata.get(DATA_TRAIN_HDF5_FP)) - def save(self, cache_path, dataset, config, training_set_metadata, tag): + def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset: save_hdf5(cache_path, dataset) if tag == TRAINING: training_set_metadata[DATA_TRAIN_HDF5_FP] = cache_path return dataset - def can_cache(self, skip_save_processed_input): + def can_cache(self, skip_save_processed_input) -> bool: return self.backend.is_coordinator() and not skip_save_processed_input @property - def data_format(self): + def data_format(self) -> str: return "hdf5" diff --git a/ludwig/data/preprocessing.py 
b/ludwig/data/preprocessing.py index 8d36a8a8a66..b42b015c279 100644 --- a/ludwig/data/preprocessing.py +++ b/ludwig/data/preprocessing.py @@ -1202,15 +1202,8 @@ def build_dataset( if mode == "training": sample_ratio = global_preprocessing_parameters["sample_ratio"] - if sample_ratio < 1.0: - if not df_engine.partitioned and len(dataset_df) * sample_ratio < 1: - raise ValueError( - f"sample_ratio {sample_ratio} is too small for dataset of length {len(dataset_df)}. " - f"Please increase sample_ratio or use a larger dataset." - ) - - logger.debug(f"sample {sample_ratio} of data") - dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + sample_size = global_preprocessing_parameters["sample_size"] + dataset_df = _get_sampled_dataset_df(dataset_df, df_engine, sample_ratio, sample_size, random_seed) # If persisting DataFrames in memory is enabled, we want to do this after # each batch of parallel ops in order to avoid redundant computation @@ -1399,6 +1392,29 @@ def embed_fixed_features( return results +def _get_sampled_dataset_df(dataset_df, df_engine, sample_ratio, sample_size, random_seed): + df_len = len(dataset_df) + if sample_ratio < 1.0: + if not df_engine.partitioned and df_len * sample_ratio < 1: + raise ValueError( + f"sample_ratio {sample_ratio} is too small for dataset of length {df_len}. " + f"Please increase sample_ratio or use a larger dataset." + ) + + logger.debug(f"sample {sample_ratio} of data") + dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + + if sample_size: + if sample_size < df_len: + # Cannot use 'n' parameter when using dask DataFrames -- only 'frac' is supported + sample_ratio = sample_size / df_len + dataset_df = dataset_df.sample(frac=sample_ratio, random_state=random_seed) + else: + logger.warning("sample_size is larger than dataset size, ignoring sample_size") + + return dataset_df + + def get_features_with_cacheable_fixed_embeddings( feature_configs: List[FeatureConfigDict], metadata: TrainingSetMetadataDict ) -> List[FeatureConfigDict]: @@ -1889,7 +1905,7 @@ def preprocess_for_training( if dataset is None and training_set is None: raise ValueError("No training data is provided!") - # preload ludwig datasets + # preload ludwig and HF datasets dataset, training_set, validation_set, test_set = load_dataset_uris( dataset, training_set, validation_set, test_set, backend ) @@ -2243,7 +2259,7 @@ def preprocess_for_prediction( if isinstance(dataset, Dataset): return dataset, training_set_metadata - # preload ludwig datasets + # preload ludwig and HF datasets dataset, _, _, _ = load_dataset_uris(dataset, None, None, None, backend) # determine data format if not provided or auto diff --git a/ludwig/datasets/__init__.py b/ludwig/datasets/__init__.py index b0d5b5236a7..16795366f41 100644 --- a/ludwig/datasets/__init__.py +++ b/ludwig/datasets/__init__.py @@ -5,13 +5,13 @@ from collections import OrderedDict from functools import lru_cache from io import BytesIO -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Tuple, Union import yaml from ludwig.api_annotations import DeveloperAPI, PublicAPI from ludwig.backend.base import Backend -from ludwig.constants import AUDIO, BINARY, CATEGORY, IMAGE, NUMBER, TEXT, TYPE +from ludwig.constants import AUDIO, BINARY, CATEGORY, IMAGE, NUMBER, TEST, TEXT, TRAIN, TYPE, VALIDATION from ludwig.data.cache.types import CacheableDataframe from ludwig.datasets import configs from ludwig.datasets.dataset_config import 
DatasetConfig @@ -24,6 +24,8 @@ from ludwig.utils.types import DataFrame URI_PREFIX = "ludwig://" +HF_PREFIX = "hf://" +SPLITS = [TRAIN, VALIDATION, TEST] def _load_dataset_config(config_filename: str): @@ -81,16 +83,19 @@ def load_dataset_uris( Returns the input unmodified for any non-Ludwig datasets. """ - dataset_out = dataset - training_set_out = training_set - validation_set_out = validation_set - test_set_out = test_set + + dataset_out, training_set_out, validation_set_out, test_set_out = dataset, training_set, validation_set, test_set + # Check that any of the datasets begin with the `hf://` prefix denoting a Hugging Face dataset URI + # Hugging Face datasets should follow the naming convention `hf://<hf_id>--<hf_subsample>` + if _is_hf(dataset, training_set): + return _load_hf_datasets(dataset, training_set, validation_set, test_set, backend) # Check that any of the datasets begin with the `ludwig://` prefix denoting a Ludwig dataset URI if dataset is not None: if isinstance(dataset, str) and dataset.startswith(URI_PREFIX): dataset_out = _load_cacheable_dataset(dataset, backend) - elif training_set is not None: + return dataset_out, training_set_out, validation_set_out, test_set_out + if training_set is not None: train_df = test_df = val_df = None training_set_checksum = None if isinstance(training_set, str) and training_set.startswith(URI_PREFIX): @@ -118,9 +123,97 @@ def load_dataset_uris( else: test_set_out = _load_cacheable_dataset(test_set, backend) + return dataset_out, training_set_out, validation_set_out, test_set_out + + +def _is_hf(dataset, training_set): + dataset_is_hf = dataset is not None and isinstance(dataset, str) and dataset.startswith(HF_PREFIX) + training_set_is_hf = ( + training_set is not None and isinstance(training_set, str) and training_set.startswith(HF_PREFIX) + ) + return dataset_is_hf or training_set_is_hf + + +def _load_hf_datasets( + dataset: Optional[Union[str, DataFrame]], + training_set: Optional[Union[str, DataFrame]], + validation_set: Optional[Union[str, DataFrame]], + test_set: Optional[Union[str, DataFrame]], + backend: Backend, +) -> Tuple[ + Optional[CacheableDataframe], + Optional[CacheableDataframe], + Optional[CacheableDataframe], + Optional[CacheableDataframe], +]: + """Loads and returns any Hugging Face datasets as CacheableDataframes. + + Returns the input unmodified for any non-HF datasets.
+ """ + dataset_out = dataset + training_set_out = training_set + validation_set_out = validation_set + test_set_out = test_set + + # Check that any of the datasets begin with the `hf://` prefix denoting a Hugging Face dataset URI + # Hugging Face datasets should follow the naming convention `hf://--` + if dataset is not None: + if isinstance(dataset, str) and dataset.startswith(HF_PREFIX): + dataset_out = _load_cacheable_hf_dataset(dataset, backend) + return dataset_out, training_set_out, validation_set_out, test_set_out + + # Because of the conditional logic (_is_hf) in load_dataset_uris, if the above block is not triggered, then + # training_set must be a string that starts with HF_PREFIX + train_df = test_df = val_df = None + loader = get_dataset("hugging_face") + hf_id, hf_subsample = _get_hf_dataset_and_subsample(training_set) + train_df, val_df, test_df = loader.load(hf_id, hf_subsample, split=True) # Call hugging_face loader + train_df = backend.df_engine.from_pandas(train_df) + training_set_out = CacheableDataframe(df=train_df, name=training_set, checksum=None) + + if isinstance(validation_set, str) and validation_set.startswith(HF_PREFIX): + if validation_set == training_set: + # Reuse the loaded DF from the training split + val_df = backend.df_engine.from_pandas(val_df) + validation_set_out = CacheableDataframe(df=val_df, name=validation_set, checksum=None) + else: # This handles an edge case -- NOT EXPECTED USER BEHAVIOR + logging.warn( + "A Hugging Face validation set has been passed in that is different from the test set. " + "This is not recommended." + ) + validation_set_out = _load_cacheable_hf_dataset(validation_set, backend, split_set=VALIDATION) + + if isinstance(test_set, str) and test_set.startswith(HF_PREFIX): + if test_set == training_set: + # Reuse the loaded DF from the training split + test_df = backend.df_engine.from_pandas(test_df) + test_set_out = CacheableDataframe(df=test_df, name=test_set, checksum=None) + else: # This handles an edge case -- NOT EXPECTED USER BEHAVIOR + logging.warn( + "A Hugging Face test set has been passed in that is different from the training set. " + "This is not recommended." + ) + test_set_out = _load_cacheable_hf_dataset(test_set, backend, split_set=TEST) + return dataset_out, training_set_out, validation_set_out, test_set_out +def _load_cacheable_hf_dataset( + dataset: str, backend: Backend, split_set: Optional[Literal["train", "validation", "test"]] = None +) -> CacheableDataframe: + loader = get_dataset("hugging_face") + hf_id, hf_subsample = _get_hf_dataset_and_subsample(dataset) + if split_set: + train_df, validation_df, test_df = loader.load(hf_id, hf_subsample, split=True) + df = [train_df, validation_df, test_df][ + SPLITS.index(split_set) + ] # split_set should be one of TRAIN, VALIDATION, or TEST + else: + df = loader.load(hf_id, hf_subsample, split=False) + df = backend.df_engine.from_pandas(df) + return CacheableDataframe(df=df, name=dataset, checksum=None) + + def _load_cacheable_dataset(dataset: str, backend: Backend) -> CacheableDataframe: dataset_name = dataset[len(URI_PREFIX) :] loader = get_dataset(dataset_name) @@ -142,6 +235,9 @@ def get_datasets_output_features( """Returns a dictionary with the output features for each dataset. Optionally, you can pass a dataset name which will then cause the function to return a dictionary with the output features for that dataset. + Because Hugging Face Datasets are loaded dynamically through a shared connector, they don't have fixed output + features. 
As such, we exclude Hugging Face datasets here. + :param dataset: (str) name of the dataset :param include_competitions: (bool) whether to include the output features from kaggle competition datasets :param include_data_modalities: (bool) whether to include the data modalities associated with the prediction task @@ -150,12 +246,18 @@ def get_datasets_output_features( """ ordered_configs = OrderedDict(sorted(_get_dataset_configs().items())) competition_datasets = [] + hugging_face_datasets = [] for name, config in ordered_configs.items(): if not include_competitions and config.kaggle_competition: competition_datasets.append(name) continue + if config.name == "hugging_face": + # There is no output_features attribute for hugging_face datasets + hugging_face_datasets.append(name) + continue + ordered_configs[name] = {"name": config.name, "output_features": config.output_features} if include_data_modalities: @@ -180,6 +282,8 @@ def get_datasets_output_features( for competition in competition_datasets: del ordered_configs[competition] + del ordered_configs["hugging_face"] + return ordered_configs @@ -201,13 +305,34 @@ def download_dataset(dataset_name: str, output_dir: str = "."): def get_buffer(dataset_name: str, kaggle_username: str = None, kaggle_key: str = None) -> BytesIO: """Returns a byte buffer for the specified dataset.""" try: - dataset = get_dataset(dataset_name).load(kaggle_username=kaggle_username, kaggle_key=kaggle_key) + if dataset_name.startswith(HF_PREFIX): + hf_id, hf_subsample = _get_hf_dataset_and_subsample(dataset_name) + dataset = get_dataset("hugging_face").load(hf_id, hf_subsample) + else: + dataset = get_dataset(dataset_name).load(kaggle_username=kaggle_username, kaggle_key=kaggle_key) buffer = BytesIO(dataset.to_parquet()) return buffer except Exception as e: logging.error(logging.ERROR, f"Failed to upload dataset {dataset_name}: {e}") +def _get_hf_dataset_and_subsample(dataset_name: str) -> Tuple[str, Optional[str]]: + """Returns the Hugging Face ID and subsample name from the dataset name. 
+ + The dataset name should follow the format "{HF_PREFIX}{hf_id}--{hf_subsample}" + + Examples (Dataset Name --> HF ID; HF subsample): + "hf://wikisql" --> "wikisql"; None + "hf://ColumbiaNLP/FLUTE" --> "ColumbiaNLP/FLUTE"; None + "hf://mstz/adult--income" --> "mstz/adult"; "income" + """ + dataset_name = dataset_name[len(HF_PREFIX) :] + dataset_name = dataset_name.split("--") + if len(dataset_name) == 1: + return dataset_name[0], None + return dataset_name[0], dataset_name[1] + + def cli(sys_argv): parser = argparse.ArgumentParser( description="This command downloads and lists Ludwig-ready datasets.", diff --git a/ludwig/datasets/configs/hugging_face.yaml b/ludwig/datasets/configs/hugging_face.yaml new file mode 100644 index 00000000000..0fbf423a018 --- /dev/null +++ b/ludwig/datasets/configs/hugging_face.yaml @@ -0,0 +1,5 @@ +version: 1.0 +name: hugging_face +loader: hugging_face.HFLoader +description: | + Hugging Face Datasets diff --git a/ludwig/datasets/loaders/dataset_loader.py b/ludwig/datasets/loaders/dataset_loader.py index 9573e4a7bb2..fe89d5b787c 100644 --- a/ludwig/datasets/loaders/dataset_loader.py +++ b/ludwig/datasets/loaders/dataset_loader.py @@ -451,6 +451,30 @@ def load_unprocessed_dataframe(self, file_paths: List[str]) -> pd.DataFrame: _list_of_strings(self.config.validation_filenames), root_dir=self.raw_dataset_dir ) test_paths = _glob_multiple(_list_of_strings(self.config.test_filenames), root_dir=self.raw_dataset_dir) + dataframes = [] + if self.config.name == "hugging_face": + dataframes = self._get_dataframe_with_fixed_splits_from_hf() + else: + dataframes = self._get_dataframe_with_fixed_splits( + train_paths, validation_paths, test_paths, dataset_paths, file_paths + ) + return pd.concat(dataframes, ignore_index=True) + + def _get_dataframe_with_fixed_splits_from_hf(self): + dataframes = [] + splits = ["train", "validation", "test"] + data_dict = self.load_hf_to_dict( + self.config.huggingface_dataset_id, self.config.huggingface_subset + ) # This function is defined in the Hugging Face dataloader + for split_type in splits: + if split_type in data_dict: + # We don't have to do anything if split not in data_dict because we just concatenate the dataframes + # in the end anyway. + data_dict[split_type][SPLIT] = splits.index(split_type) # Add "split" column (0, 1, or 2) + dataframes.append(data_dict[split_type]) + return dataframes + + def _get_dataframe_with_fixed_splits(self, train_paths, validation_paths, test_paths, dataset_paths, file_paths): dataframes = [] if len(train_paths) > 0: train_df = self.load_files_to_dataframe(train_paths) @@ -464,12 +488,13 @@ def load_unprocessed_dataframe(self, file_paths: List[str]) -> pd.DataFrame: test_df = self.load_files_to_dataframe(test_paths) test_df[SPLIT] = 2 dataframes.append(test_df) - # If we have neither train/validation/test files nor dataset_paths in the config, use data files in root dir. + # If we have neither train/validation/test files nor dataset_paths in the config, + # use data files in root dir. if len(dataset_paths) == len(dataframes) == 0: dataset_paths = file_paths if len(dataset_paths) > 0: dataframes.append(self.load_files_to_dataframe(dataset_paths)) - return pd.concat(dataframes, ignore_index=True) + return dataframes def transform_dataframe(self, dataframe: pd.DataFrame) -> pd.DataFrame: """Transforms a dataframe of the entire dataset. 
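For orientation, here is a minimal usage sketch of the `hf://` dataset URIs wired up above. This is an editorial illustration, not part of the patch: the config path is a placeholder, and the dataset ID reuses the `mstz/adult--income` example from the docstring above.

```python
from ludwig.api import LudwigModel

# Placeholder config; any valid Ludwig config for the chosen dataset works here.
model = LudwigModel(config="config.yaml")

# URIs of the form hf://{hf_id}--{hf_subsample} are resolved by load_dataset_uris()
# via the hugging_face loader before preprocessing, e.g. subset "income" of "mstz/adult".
train_stats, preprocessed_data, output_directory = model.train(dataset="hf://mstz/adult--income")
```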
diff --git a/ludwig/datasets/loaders/hugging_face.py b/ludwig/datasets/loaders/hugging_face.py new file mode 100644 index 00000000000..2b900c73255 --- /dev/null +++ b/ludwig/datasets/loaders/hugging_face.py @@ -0,0 +1,80 @@ +# Copyright (c) 2022 Predibase, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +import logging +from typing import Dict + +import datasets +import pandas as pd + +from ludwig.constants import TEST, TRAIN, VALIDATION +from ludwig.datasets.loaders.dataset_loader import DatasetLoader + +SPLITS = [TRAIN, VALIDATION, TEST] +logger = logging.getLogger(__name__) + + +class HFLoader(DatasetLoader): + """HFLoader differs from all other DatasetLoaders because of how it loads data through the Hugging Face + datasets API instead of saving any files to the cache. + + The config for HFLoader contains two unique parameters, huggingface_dataset_id and huggingface_subsample, that + identify which dataset and which subsample of that dataset to load in. + """ + + def load_hf_to_dict(self, hf_id: str, hf_subsample: str) -> Dict[str, pd.DataFrame]: + """Returns a map of split -> pd.DataFrame for the given HF dataset.""" + dataset_dict: Dict[str, "datasets.arrow_dataset.Dataset"] = datasets.load_dataset( + path=hf_id, name=hf_subsample + ) # noqa + pandas_dict = {} + for split in dataset_dict: + # Convert from HF DatasetDict type to a dictionary of pandas dataframes + pandas_dict[split] = dataset_dict[split].to_pandas() + return pandas_dict + + def load(self, hf_id, hf_subsample, split=False) -> pd.DataFrame: + """When load() is called, HFLoader calls the datasets API to return all of the data in a HuggingFace + DatasetDict, converts it to a dictionary of pandas dataframes, and returns either three dataframes + containing train, validation, and test data or one dataframe that is the concatenation of all three + depending on whether `split` is set to True or False. + + Note that some datasets may not provide a validation set or a test set. In this case: + - If split is True, the DataFrames corresponding to the missing sets are initialized to be empty + - If split is False, the "split" column in the resulting DataFrame will reflect the fact that there is no + validation/test split (i.e., there will be no 1s/2s) + + A train set should always be provided by Hugging Face. + """ + self.config.huggingface_dataset_id = hf_id + self.config.huggingface_subsample = hf_subsample + pandas_dict = self.load_hf_to_dict( + hf_id=hf_id, + hf_subsample=hf_subsample, + ) + if split: # For each split, either return the appropriate dataframe or an empty dataframe + for spl in SPLITS: + if spl not in pandas_dict: + logger.warning(f"No {spl} set found in provided Hugging Face dataset. 
Skipping {spl} set.") + train_df = pandas_dict[TRAIN] if TRAIN in pandas_dict else pd.DataFrame() + validation_df = pandas_dict[VALIDATION] if VALIDATION in pandas_dict else pd.DataFrame() + test_df = pandas_dict[TEST] if TEST in pandas_dict else pd.DataFrame() + + return train_df, validation_df, test_df + else: + dataset_list = [] + for spl in pandas_dict: + pandas_dict[spl]["split"] = SPLITS.index(spl) # Add a column containing 0s, 1s, and 2s denoting splits + dataset_list.append(pandas_dict[spl]) + return pd.concat(dataset_list) diff --git a/ludwig/distributed/base.py b/ludwig/distributed/base.py index d9e00573b0b..3649c9e30c0 100644 --- a/ludwig/distributed/base.py +++ b/ludwig/distributed/base.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import contextlib from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TYPE_CHECKING, Union +from typing import Any, Callable, TYPE_CHECKING import torch from torch import nn @@ -31,9 +33,9 @@ class DistributedStrategy(ABC): def prepare( self, model: nn.Module, - trainer_config: "ECDTrainerConfig", + trainer_config: ECDTrainerConfig, base_learning_rate: float, - ) -> Tuple[nn.Module, Optimizer]: + ) -> tuple[nn.Module, Optimizer]: """Modifies the model to support distributed training and creates the optimizer. Args: @@ -49,7 +51,7 @@ def prepare( def prepare_for_inference(self, model: nn.Module) -> nn.Module: return model - def to_device(self, model: "BaseModel", device: Optional[torch.device] = None) -> nn.Module: + def to_device(self, model: BaseModel, device: torch.device | None = None) -> nn.Module: return model.to_device(device if device is not None else get_torch_device()) def backward(self, loss: torch.Tensor, model: nn.Module): @@ -104,7 +106,7 @@ def sync_optimizer(self, optimizer: Optimizer): pass @abstractmethod - def broadcast_object(self, v: Any, name: Optional[str] = None) -> Any: + def broadcast_object(self, v: Any, name: str | None = None) -> Any: pass @abstractmethod @@ -128,17 +130,17 @@ def is_available(cls) -> bool: @classmethod @abstractmethod - def gather_all_tensors_fn(cls) -> Optional[Callable]: + def gather_all_tensors_fn(cls) -> Callable | None: pass @classmethod @abstractmethod - def get_ray_trainer_backend(cls, **kwargs) -> Optional[Any]: + def get_ray_trainer_backend(cls, **kwargs) -> Any | None: pass @classmethod @abstractmethod - def get_trainer_cls(cls, backend_config: "BackendConfig") -> Tuple[Type["DataParallelTrainer"], Dict[str, Any]]: + def get_trainer_cls(cls, backend_config: BackendConfig) -> tuple[type[DataParallelTrainer], dict[str, Any]]: pass @abstractmethod @@ -178,19 +180,19 @@ def create_checkpoint_handle( self, dist_model: nn.Module, model: nn.Module, - optimizer: Optional[Optimizer] = None, - scheduler: Optional["LRScheduler"] = None, - ) -> "Checkpoint": + optimizer: Optimizer | None = None, + scheduler: LRScheduler | None = None, + ) -> Checkpoint: from ludwig.utils.checkpoint_utils import MultiNodeCheckpoint return MultiNodeCheckpoint(self, model, optimizer, scheduler) @classmethod - def extract_model_for_serialization(cls, model: nn.Module) -> Union[nn.Module, Tuple[nn.Module, List[Dict]]]: + def extract_model_for_serialization(cls, model: nn.Module) -> nn.Module | tuple[nn.Module, list[dict]]: return model @classmethod - def replace_model_from_serialization(cls, state: Union[nn.Module, Tuple[nn.Module, List[Dict]]]) -> nn.Module: + def replace_model_from_serialization(cls, state: nn.Module | tuple[nn.Module, list[dict]]) -> nn.Module: 
assert isinstance(state, nn.Module) return state @@ -199,9 +201,9 @@ class LocalStrategy(DistributedStrategy): def prepare( self, model: nn.Module, - trainer_config: "ECDTrainerConfig", + trainer_config: ECDTrainerConfig, base_learning_rate: float, - ) -> Tuple[nn.Module, Optimizer]: + ) -> tuple[nn.Module, Optimizer]: return model, create_optimizer(model, trainer_config.optimizer, base_learning_rate) def size(self) -> int: @@ -231,7 +233,7 @@ def sync_model(self, model: nn.Module): def sync_optimizer(self, optimizer: Optimizer): pass - def broadcast_object(self, v: Any, name: Optional[str] = None) -> Any: + def broadcast_object(self, v: Any, name: str | None = None) -> Any: return v def wait_optimizer_synced(self, optimizer: Optimizer): @@ -252,15 +254,15 @@ def is_available(cls) -> bool: return False @classmethod - def gather_all_tensors_fn(cls) -> Optional[Callable]: + def gather_all_tensors_fn(cls) -> Callable | None: return None @classmethod - def get_ray_trainer_backend(cls, **kwargs) -> Optional[Any]: + def get_ray_trainer_backend(cls, **kwargs) -> Any | None: return None @classmethod - def get_trainer_cls(cls, backend_config: "BackendConfig") -> Tuple[Type["DataParallelTrainer"], Dict[str, Any]]: + def get_trainer_cls(cls, backend_config: BackendConfig) -> tuple[type[DataParallelTrainer], dict[str, Any]]: raise ValueError("Cannot construct a trainer from a local strategy.") def shutdown(self): diff --git a/ludwig/explain/captum.py b/ludwig/explain/captum.py index 6194defd88f..081568e18f7 100644 --- a/ludwig/explain/captum.py +++ b/ludwig/explain/captum.py @@ -279,9 +279,11 @@ def get_input_tensors( :return: A list of variables, one for each input feature. Shape of each variable is [batch size, embedding size]. """ - # Ignore sample_ratio from the model config, since we want to explain all the data. + # Ignore sample_ratio and sample_size from the model config, since we want to explain all the data. 
sample_ratio_bak = model.config_obj.preprocessing.sample_ratio + sample_size_bak = model.config_obj.preprocessing.sample_size model.config_obj.preprocessing.sample_ratio = 1.0 + model.config_obj.preprocessing.sample_size = None config = model.config_obj.to_dict() training_set_metadata = copy.deepcopy(model.training_set_metadata) @@ -302,8 +304,9 @@ def get_input_tensors( callbacks=model.callbacks, ) - # Restore sample_ratio + # Restore sample_ratio and sample_size model.config_obj.preprocessing.sample_ratio = sample_ratio_bak + model.config_obj.preprocessing.sample_size = sample_size_bak # Make sure the number of rows in the preprocessed dataset matches the number of rows in the input data assert ( diff --git a/ludwig/models/llm.py b/ludwig/models/llm.py index 05acf8446bc..9582a6c33b1 100644 --- a/ludwig/models/llm.py +++ b/ludwig/models/llm.py @@ -21,6 +21,7 @@ from ludwig.utils.llm_utils import ( add_left_padding, generate_merged_ids, + get_context_len, pad_target_tensor_for_fine_tuning, realign_target_and_prediction_tensors_for_inference, remove_left_padding, @@ -126,13 +127,7 @@ def __init__( self.curr_device = next(self.model.parameters()).device logger.info("Done.") - # Determines the maximum length of the context (input + output tokens) - if hasattr(self.model_config, "max_sequence_length"): - self.context_len = self.model_config.max_sequence_length - elif hasattr(self.model_config, "max_position_embeddings"): - self.context_len = self.model_config.max_position_embeddings - else: - self.context_len = 2048 + self.context_len = get_context_len(self.model_config) # TODO(Arnav): This needs be more flexible to account for RoPE Scaling # When merging input IDs and target IDs for LLM fine-tuning, we want to make sure that the merged tensor is diff --git a/ludwig/schema/features/preprocessing/text.py b/ludwig/schema/features/preprocessing/text.py index 27b51f60ba1..ec4230fd098 100644 --- a/ludwig/schema/features/preprocessing/text.py +++ b/ludwig/schema/features/preprocessing/text.py @@ -89,7 +89,7 @@ class BaseTextPreprocessingConfig(BasePreprocessingConfig): ) lowercase: bool = schema_utils.Boolean( - default=True, + default=False, description="If true, converts the string to lowercase before tokenizing.", parameter_metadata=FEATURE_METADATA[TEXT][PREPROCESSING]["lowercase"], ) @@ -205,7 +205,7 @@ class TextOutputPreprocessingConfig(BaseTextPreprocessingConfig): ) lowercase: bool = schema_utils.Boolean( - default=True, + default=False, description="If true, converts the string to lowercase before tokenizing.", parameter_metadata=FEATURE_METADATA[TEXT][PREPROCESSING]["lowercase"], ) diff --git a/ludwig/schema/metadata/configs/preprocessing.yaml b/ludwig/schema/metadata/configs/preprocessing.yaml index 688f2084732..a29d2ece63a 100644 --- a/ludwig/schema/metadata/configs/preprocessing.yaml +++ b/ludwig/schema/metadata/configs/preprocessing.yaml @@ -44,6 +44,22 @@ sample_ratio: expected_impact: 2 suggested_values: Depends on data size ui_display_name: Sample Ratio +sample_size: + default_value_reasoning: + The default value is None because we do not want to shrink + the dataset by default, and we do not know the size of an arbitrary dataset. + By setting the default to None, we fall back on the sample_ratio to determine + the size of the dataset. + description_implications: + Decreases the amount of data you are inputting into + the model. Could be useful if you have more data than you need and you are + concerned with computational costs. 
More useful than sample_ratio if you + know the exact number of samples you want to train on instead of knowing the proportion. + example_value: + - 1000 + expected_impact: 2 + suggested_values: Depends on data size + ui_display_name: Sample Size column: expected_impact: 3 ui_display_name: Split Column diff --git a/ludwig/schema/model_types/base.py b/ludwig/schema/model_types/base.py index b3ebc154ffb..410aa5c454e 100644 --- a/ludwig/schema/model_types/base.py +++ b/ludwig/schema/model_types/base.py @@ -30,7 +30,7 @@ sanitize_and_filter_combiner_entities_, set_derived_feature_columns_, set_hyperopt_defaults_, - set_llm_tokenizers, + set_llm_parameters, set_preprocessing_parameters, set_tagger_decoder_parameters, set_validation_parameters, @@ -69,8 +69,8 @@ def __post_init__(self): set_tagger_decoder_parameters(self) sanitize_and_filter_combiner_entities_(self) - # Set preprocessing parameters for text features for LLM model type - set_llm_tokenizers(self) + # Reconcile LLM parameters + set_llm_parameters(self) # Reconcile conflicting preprocessing parameters set_preprocessing_parameters(self) diff --git a/ludwig/schema/model_types/utils.py b/ludwig/schema/model_types/utils.py index a38a3d53409..b8550d06838 100644 --- a/ludwig/schema/model_types/utils.py +++ b/ludwig/schema/model_types/utils.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Mapping, Set, TYPE_CHECKING from marshmallow import ValidationError +from transformers import AutoConfig from ludwig.api_annotations import DeveloperAPI from ludwig.constants import ( @@ -17,6 +18,7 @@ INPUT_FEATURES, LOSS, MODEL_ECD, + MODEL_LLM, OUTPUT_FEATURES, PARAMETERS, PREPROCESSING, @@ -28,9 +30,11 @@ from ludwig.features.feature_utils import compute_feature_hash from ludwig.schema.features.utils import output_config_registry from ludwig.schema.hyperopt.scheduler import BaseHyperbandSchedulerConfig +from ludwig.schema.llms.generation import LLMGenerationConfig from ludwig.schema.trainer import ECDTrainerConfig from ludwig.types import HyperoptConfigDict, ModelConfigDict from ludwig.utils.data_utils import get_sanitized_feature_name +from ludwig.utils.llm_utils import get_context_len if TYPE_CHECKING: from ludwig.schema.model_types.base import ModelConfig @@ -299,16 +303,24 @@ def set_tagger_decoder_parameters(config: "ModelConfig") -> None: output_feature.reduce_input = None -def set_llm_tokenizers(config: "ModelConfig") -> None: +def set_llm_parameters(config: "ModelConfig") -> None: + if config.model_type != MODEL_LLM: + return + + # Set preprocessing parameters for text features for LLM model type + _set_llm_tokenizers(config) + + # Set max_new_tokens in generation config to the max sequence length of the output features + _set_generation_max_new_tokens(config) + + +def _set_llm_tokenizers(config: "ModelConfig") -> None: """Sets the tokenizers for the LLM model to the pretrained model name or path. This ensures that they use the correct shared vocabulary from the tokenizer. This also ensures padding is correctly set to left padding to prevent the LLM from trying to continue to sequence based on the right padding tokens, which might exist based on sequence length. 
""" - if config.model_type != "llm": - return - pretrained_model_name_or_path = config.base_model if not isinstance(pretrained_model_name_or_path, str) or pretrained_model_name_or_path is None: raise ValueError("Must set `base_model` when using the LLM model.") @@ -337,6 +349,62 @@ def set_llm_tokenizers(config: "ModelConfig") -> None: output_feature.decoder.fallback_label = output_feature.preprocessing.fallback_label +def _get_maximum_possible_sequence_length(config: "ModelConfig", default_max_sequence_length: int) -> int: + """Returns the maximum possible sequence length for the LLM model based on the model config.""" + max_possible_sequence_length = default_max_sequence_length + if config.output_features[0].preprocessing.max_sequence_length is not None: + # Note: We don't need to check for max between feature.preprocessing.max_sequence_length and + # defaults.text.preprocessing.max_sequence_length because the latter is only applied to input features. + max_possible_sequence_length = max( + default_max_sequence_length, config.output_features[0].preprocessing.max_sequence_length + ) + elif config.preprocessing.global_max_sequence_length is not None: + # This is not perfect since it includes tokens from both input + output features, but this at least + # ensures that max possible of the sequence length is used. It is very likely that the model learns + # to generate sequences than this value. + max_possible_sequence_length = max( + max_possible_sequence_length, config.preprocessing.global_max_sequence_length + ) + elif max_possible_sequence_length == default_max_sequence_length: + # It's possible that both max_sequence_length and global_max_sequence_length are not set, in which case + # we should fall back to the window size of the pretrained model. By this point, because of schema validation + # checks, we know that the base_model exists so we can safely grab the base model's config. + # TODO (Arnav): Figure out how to factor in rope scaling factor into this calculation. + model_config = AutoConfig.from_pretrained(config.base_model) + max_possible_sequence_length = get_context_len(model_config) + # Artifically leave a buffer of half the total model window size to trade off + # runtime while likely covering a majority of the max sequence length. + max_possible_sequence_length = max_possible_sequence_length // 2 + return max_possible_sequence_length + + +def _set_generation_max_new_tokens(config: "ModelConfig") -> None: + """Sets the max_new_tokens parameter in the generation config to the max sequence length of the output + features. + + This ensures that the generation config is set to the correct value for the LLM model type. + """ + _DEFAULT_MAX_SEQUENCE_LENGTH = LLMGenerationConfig().max_new_tokens + if config.generation.max_new_tokens != _DEFAULT_MAX_SEQUENCE_LENGTH: + # Max new tokens is explicitly set by user, so don't override + return + + if config.output_features[0].type != TEXT: + # This is trickier to set for other output features, so don't override for now. + # TODO: Add better support for category output features + return + + max_possible_sequence_length = _get_maximum_possible_sequence_length(config, _DEFAULT_MAX_SEQUENCE_LENGTH) + + logger.info( + f"Setting generation max_new_tokens to {max_possible_sequence_length} to correspond with the max " + "sequence length assigned to the output feature or the global max sequence length. This will ensure that " + "the correct number of tokens are generated at inference time. 
To override this behavior, set " + "`generation.max_new_tokens` to a different value in your Ludwig config." + ) + config.generation.max_new_tokens = max_possible_sequence_length + + @DeveloperAPI def contains_grid_search_parameters(hyperopt_config: HyperoptConfigDict) -> bool: """Returns True if any hyperopt parameter in the config is using the grid_search space.""" diff --git a/ludwig/schema/preprocessing.py b/ludwig/schema/preprocessing.py index 075c63fe4a3..4963f3783ba 100644 --- a/ludwig/schema/preprocessing.py +++ b/ludwig/schema/preprocessing.py @@ -18,6 +18,14 @@ class PreprocessingConfig(schema_utils.BaseMarshmallowConfig): parameter_metadata=PREPROCESSING_METADATA["sample_ratio"], ) + sample_size: int = schema_utils.NonNegativeInteger( + default=None, + allow_none=True, + description="The maximum number of samples from the dataset to use. Cannot be set if sample_ratio is set to be " + "< 1.0. If sample_ratio is set to 1.0, this will override the number of samples used.", + parameter_metadata=PREPROCESSING_METADATA["sample_size"], + ) + oversample_minority: float = schema_utils.NonNegativeFloat( default=None, allow_none=True, diff --git a/ludwig/schema/trainer.py b/ludwig/schema/trainer.py index 9750a566f64..8e9be989ff4 100644 --- a/ludwig/schema/trainer.py +++ b/ludwig/schema/trainer.py @@ -5,7 +5,17 @@ from packaging.version import parse as parse_version from ludwig.api_annotations import DeveloperAPI -from ludwig.constants import AUTO, LOSS, MAX_POSSIBLE_BATCH_SIZE, MODEL_ECD, MODEL_GBM, MODEL_LLM, TRAINING +from ludwig.constants import ( + AUTO, + EFFECTIVE_BATCH_SIZE, + LOSS, + MAX_BATCH_SIZE, + MAX_POSSIBLE_BATCH_SIZE, + MODEL_ECD, + MODEL_GBM, + MODEL_LLM, + TRAINING, +) from ludwig.error import ConfigValidationError from ludwig.schema import utils as schema_utils from ludwig.schema.lr_scheduler import LRSchedulerConfig, LRSchedulerDataclassField @@ -218,7 +228,7 @@ def __post_init__(self): "one of `batch_size` or `gradient_accumulation_steps` must be set to something other than 'auto', and " "consequently will be set following the formula given above." ), - parameter_metadata=TRAINER_METADATA[MODEL_ECD]["effective_batch_size"], + parameter_metadata=TRAINER_METADATA[MODEL_ECD][EFFECTIVE_BATCH_SIZE], field_options=[ schema_utils.PositiveInteger(default=128, description="", allow_none=False), schema_utils.StringOptions(options=["auto"], default="auto", allow_none=False), @@ -248,7 +258,7 @@ def __post_init__(self): "Auto batch size tuning and increasing batch size on plateau will be capped at this value. The default " "value is 2^40." ), - parameter_metadata=TRAINER_METADATA[MODEL_ECD]["max_batch_size"], + parameter_metadata=TRAINER_METADATA[MODEL_ECD][MAX_BATCH_SIZE], ) gradient_accumulation_steps: Union[int, str] = schema_utils.OneOfOptionsField( diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py index 10f5af9a4ca..1c91e916e2a 100644 --- a/ludwig/trainers/trainer.py +++ b/ludwig/trainers/trainer.py @@ -492,7 +492,7 @@ def write_step_summary(cls, train_summary_writer, combined_loss, all_losses, ste # https://pytorch.org/docs/stable/generated/torch.cuda.utilization.html#torch.cuda.utilization train_summary_writer.add_scalar( f"cuda/device{i}/utilization", - torch.cuda.device(i).utilization(), + torch.cuda.utilization(device=i), global_step=step, ) train_summary_writer.flush() @@ -743,7 +743,8 @@ def run_evaluation( else: # There's no validation, so we save the model.
if not self.skip_save_model: - logger.info("Saving model.\n") + if self.is_coordinator(): + logger.info("Saving model.\n") checkpoint_manager.save_best(progress_tracker.steps) self.callback(lambda c: c.on_save_best_checkpoint(self, progress_tracker, save_path)) @@ -836,7 +837,8 @@ def train( try: progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path) self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint) - logger.info("Resuming training from previous run.") + if self.is_coordinator(): + logger.info("Resuming training from previous run.") except Exception: # This may happen if model training is interrupted after the progress tracker is initialized # but before any real training progress is made. @@ -849,7 +851,8 @@ def train( ), output_features=output_features, ) - logger.info("Failed to resume training from previous run. Creating fresh model training run.") + if self.is_coordinator(): + logger.info("Failed to resume training from previous run. Creating fresh model training run.") else: progress_tracker = get_new_progress_tracker( batch_size=self.batch_size, @@ -858,7 +861,8 @@ def train( best_increase_batch_size_eval_metric=get_initial_validation_value(self.increase_batch_size_eval_metric), output_features=output_features, ) - logger.info("Creating fresh model training run.") + if self.is_coordinator(): + logger.info("Creating fresh model training run.") # Distributed: broadcast initial variable states from rank 0 to all other processes. # This is necessary to ensure consistent initialization of all workers when diff --git a/ludwig/utils/data_utils.py b/ludwig/utils/data_utils.py index 9942770af52..fae38b401ca 100644 --- a/ludwig/utils/data_utils.py +++ b/ludwig/utils/data_utils.py @@ -872,6 +872,8 @@ def figure_data_format_dataset(dataset): dataset = dataset.strip() if dataset.startswith("ludwig://"): return "ludwig" + if dataset.startswith("hf://"): + return "hf" dataset = dataset.lower() if dataset.endswith(".csv"): diff --git a/ludwig/utils/llm_utils.py b/ludwig/utils/llm_utils.py index 12c0a1f509f..1dd9d1df85a 100644 --- a/ludwig/utils/llm_utils.py +++ b/ludwig/utils/llm_utils.py @@ -5,6 +5,7 @@ import torch.nn.functional as F from bitsandbytes.nn.modules import Embedding from transformers import ( + AutoConfig, AutoModelForCausalLM, CodeLlamaTokenizer, CodeLlamaTokenizerFast, @@ -22,6 +23,9 @@ logger = logging.getLogger(__name__) +FALLBACK_CONTEXT_LEN = 2048 + + def set_pad_token(tokenizer: PreTrainedTokenizer): """Sets the pad token for the tokenizer if it is not already set. @@ -57,6 +61,45 @@ def set_pad_token(tokenizer: PreTrainedTokenizer): tokenizer.pad_token_id = tokenizer.eos_token_id +def get_context_len(model_config: AutoConfig): + """Determines the maximum length of the context (input + output tokens) based on the provided model + configuration. + + Args: + model_config (AutoConfig): The model configuration object containing information about the model's properties. + + Returns: + int: The maximum context length, which can be derived from the model configuration. If no relevant attribute + is found, the default value of 2048 is returned. + + This function examines the provided model configuration object to identify the attribute that specifies the maximum + context length. It checks for attributes in the following order of preference: + 1. 'max_sequence_length': If this attribute is present in the model configuration, its value is returned. + 2. 
'max_position_embeddings': If 'max_sequence_length' is not found but 'max_position_embeddings' is present, its + value is returned. + 3. 'n_positions': If neither 'max_sequence_length' nor 'max_position_embeddings' are found, and 'n_positions' is + present, its value is returned. + 4. Default: If none of the relevant attributes are present, the function returns a default value of 2048. + + Note: + - The maximum context length is important for defining the size of input and output sequences in a model. + + Example Usage: + >>> config = AutoConfig.from_pretrained("bert-base-uncased") + >>> context_len = get_context_len(config) + >>> print(context_len) + 512 + """ + if hasattr(model_config, "max_sequence_length"): + return model_config.max_sequence_length + elif hasattr(model_config, "max_position_embeddings"): + return model_config.max_position_embeddings + elif hasattr(model_config, "n_positions"): + return model_config.n_positions + else: + return FALLBACK_CONTEXT_LEN + + def has_padding_token(input_tensor: torch.Tensor, tokenizer: PreTrainedTokenizer): """Checks if the input tensor contains any padding tokens. diff --git a/pytest.ini b/pytest.ini index 5a8d8487ecf..539a53b1674 100644 --- a/pytest.ini +++ b/pytest.ini @@ -12,5 +12,6 @@ markers = integration_tests_c: mark a test to be run as part of integration tests, group C. integration_tests_d: mark a test to be run as part of integration tests, group D. integration_tests_e: mark a test to be run as part of integration tests, group E. + integration_tests_f: mark a test to be run as part of integration tests, group F. filterwarnings = ignore::DeprecationWarning diff --git a/requirements.txt b/requirements.txt index f8f92939821..0c3deeebf5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,3 +56,6 @@ pyxlsb>=1.0.8 # excel pyarrow # parquet lxml # html html5lib # html + +# requirement for loading hugging face datasets +datasets diff --git a/requirements_distributed.txt b/requirements_distributed.txt index 8eb6f4b7467..47f070386ab 100644 --- a/requirements_distributed.txt +++ b/requirements_distributed.txt @@ -3,7 +3,7 @@ dask[dataframe]<2023.4.0 pyarrow # requirements for ray -ray[default,data,serve,tune]>=2.2.0,<2.5 +ray[default,data,serve,tune]>=2.2.0,<2.4 tensorboardX<2.3 GPUtil tblib diff --git a/requirements_llm.txt b/requirements_llm.txt index c036a7e7e3a..8ae6f0299dd 100644 --- a/requirements_llm.txt +++ b/requirements_llm.txt @@ -3,4 +3,7 @@ faiss-cpu accelerate loralib -peft>=0.4.0 + +# Temporarily pin PEFT to PEFT master for Mistral-7b support: +# https://github.com/ludwig-ai/ludwig/issues/3724 +peft @ git+https://github.com/huggingface/peft.git@07f2b82 diff --git a/tests/conftest.py b/tests/conftest.py index 144c8c889f4..00dc1912f76 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,7 +38,13 @@ TEST_SUITE_TIMEOUT_S = int(os.environ.get("LUDWIG_TEST_SUITE_TIMEOUT_S", 3600)) -explicit_int_markers = {"integration_tests_a", "integration_tests_b", "integration_tests_c", "integration_tests_d"} +explicit_int_markers = { + "integration_tests_a", + "integration_tests_b", + "integration_tests_c", + "integration_tests_d", + "integration_tests_e", +} def pytest_sessionstart(session): @@ -48,7 +54,7 @@ def pytest_sessionstart(session): def pytest_collection_modifyitems(config, items): for item in items: if all(False for x in item.iter_markers() if x.name in explicit_int_markers): - item.add_marker("integration_tests_e") + item.add_marker("integration_tests_f") @pytest.fixture(autouse=True) diff --git 
a/tests/integration_tests/test_automl.py b/tests/integration_tests/test_automl.py index 9eb7ce7026c..4ed11229aaf 100644 --- a/tests/integration_tests/test_automl.py +++ b/tests/integration_tests/test_automl.py @@ -26,11 +26,12 @@ ray = pytest.importorskip("ray") -import dask.dataframe as dd # noqa -from ray.tune.experiment.trial import Trial # noqa +import dask.dataframe as dd # noqa E402 +from ray.tune.experiment.trial import Trial # noqa E402 -from ludwig.automl import auto_train, create_auto_config, train_with_config # noqa -from ludwig.hyperopt.execution import RayTuneExecutor # noqa +from ludwig.automl import auto_train, create_auto_config, train_with_config # noqa E402 +from ludwig.automl.automl import OUTPUT_DIR # noqa E402 +from ludwig.hyperopt.execution import RayTuneExecutor # noqa E402 pytestmark = [pytest.mark.distributed, pytest.mark.integration_tests_c] @@ -290,10 +291,12 @@ def test_train_with_config(time_budget, test_data_tabular_large, ray_cluster_2cp @pytest.mark.distributed def test_auto_train(test_data_tabular_large, ray_cluster_2cpu, tmpdir): _, ofeatures, dataset_csv = test_data_tabular_large + local_output_directory_path: str = f"{str(tmpdir)}/{OUTPUT_DIR}" results = auto_train( dataset=dataset_csv, target=ofeatures[0][NAME], time_limit_s=120, + output_directory=local_output_directory_path, user_config={"hyperopt": {"executor": {"num_samples": 2}}}, ) diff --git a/tests/integration_tests/test_preprocessing.py b/tests/integration_tests/test_preprocessing.py index 9f9812c6d85..b8a330a88ea 100644 --- a/tests/integration_tests/test_preprocessing.py +++ b/tests/integration_tests/test_preprocessing.py @@ -162,6 +162,108 @@ def test_sample_ratio_deterministic(backend, tmpdir, ray_cluster_2cpu): assert test_set_1.to_df().compute().equals(test_set_2.to_df().compute()) +@pytest.mark.parametrize( + "backend", + [ + pytest.param("local", id="local"), + pytest.param("ray", id="ray", marks=pytest.mark.distributed), + ], +) +def test_sample_size(backend, tmpdir, ray_cluster_2cpu): + num_examples = 100 + sample_size = 25 + + input_features = [sequence_feature(encoder={"reduce_output": "sum"}), audio_feature(folder=tmpdir)] + output_features = [category_feature(decoder={"vocab_size": 5}, reduce_input="sum")] + data_csv = generate_data( + input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=num_examples + ) + config = { + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, + TRAINER: { + EPOCHS: 2, + }, + PREPROCESSING: {"sample_size": sample_size}, + } + + model = LudwigModel(config, backend=backend) + train_set, val_set, test_set, training_set_metadata = model.preprocess( + data_csv, + skip_save_processed_input=True, + ) + + count = len(train_set) + len(val_set) + len(test_set) + assert sample_size == count + + # Check that sample size is disabled when doing preprocessing for prediction + dataset, _ = preprocess_for_prediction( + model.config_obj.to_dict(), + dataset=data_csv, + training_set_metadata=training_set_metadata, + split=FULL, + include_outputs=True, + backend=model.backend, + ) + assert "sample_size" in model.config_obj.preprocessing.to_dict() + assert len(dataset) == num_examples + + +@pytest.mark.parametrize( + "backend", + [ + pytest.param("local", id="local"), + pytest.param("ray", id="ray", marks=pytest.mark.distributed), + ], +) +def test_sample_size_deterministic(backend, tmpdir, ray_cluster_2cpu): + """Ensures that the sampled dataset is the same when using a random seed. 
+
+    model.preprocess returns a PandasDataset object when using the local backend, and returns a RayDataset object when
+    using the Ray backend.
+    """
+    num_examples = 100
+    sample_size = 30
+
+    input_features = [binary_feature()]
+    output_features = [category_feature()]
+    data_csv = generate_data(
+        input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=num_examples
+    )
+
+    config = {
+        INPUT_FEATURES: input_features,
+        OUTPUT_FEATURES: output_features,
+        PREPROCESSING: {"sample_size": sample_size},
+    }
+
+    model1 = LudwigModel(config, backend=backend)
+    train_set_1, val_set_1, test_set_1, _ = model1.preprocess(
+        data_csv,
+        skip_save_processed_input=True,
+    )
+
+    model2 = LudwigModel(config, backend=backend)
+    train_set_2, val_set_2, test_set_2, _ = model2.preprocess(
+        data_csv,
+        skip_save_processed_input=True,
+    )
+
+    # Ensure sizes are the same
+    assert sample_size == len(train_set_1) + len(val_set_1) + len(test_set_1)
+    assert sample_size == len(train_set_2) + len(val_set_2) + len(test_set_2)
+
+    # Ensure actual rows are the same
+    if backend == "local":
+        assert train_set_1.to_df().equals(train_set_2.to_df())
+        assert val_set_1.to_df().equals(val_set_2.to_df())
+        assert test_set_1.to_df().equals(test_set_2.to_df())
+    else:
+        assert train_set_1.to_df().compute().equals(train_set_2.to_df().compute())
+        assert val_set_1.to_df().compute().equals(val_set_2.to_df().compute())
+        assert test_set_1.to_df().compute().equals(test_set_2.to_df().compute())
+
+
 def test_strip_whitespace_category(csv_filename, tmpdir):
     data_csv_path = os.path.join(tmpdir, csv_filename)
@@ -835,17 +937,17 @@ def test_fill_with_mode_different_df_engine(tmpdir, csv_filename, df_engine, ray
 You are a helpful chatbot. USER: {__sample__}: {country}, {year:.2f} ASSISTANT: """
-expected_task_sample = """instruction: predict the output feature. return only values in {true, false}
+expected_task_sample = """Instruction: predict the output feature. Return only values in {true, false}
 ###
-examples:
+Examples:
 ###
-input: foo bar
-output: true
+Input: foo bar
+Output: true
 ###
-input: baz quc
-output: false
+Input: baz quc
+Output: false
 ###
-input:"""
+Input:"""
 
 
 @pytest.mark.llm
@@ -871,7 +973,7 @@ def test_fill_with_mode_different_df_engine(tmpdir, csv_filename, df_engine, ray
             category_feature(name="country"),
             number_feature(name="year"),
         ],
-        ("you are a helpful chatbot. user: "),
+        ("You are a helpful chatbot. 
USER: "), ), ], ids=["task_sample", "multi_col"], @@ -886,7 +988,7 @@ def test_prompt_template(input_features, expected, model_type, backend, tmpdir, data_df = pd.read_csv(data_csv) raw_values = [data_df[input_features[i][COLUMN]].values.tolist() for i in range(len(input_features))] - # Only use the first input featuere (text) and discard the others, which are only used for data gen + # Only use the first input feature (text) and discard the others, which are only used for data gen input_features = input_features[:1] config = { MODEL_TYPE: model_type, @@ -930,7 +1032,7 @@ def test_prompt_template(input_features, expected, model_type, backend, tmpdir, # Test formatting in parametrize uses 2 decimal places of precision raw_text = f"{v:.2f}" else: - raw_text = str(v).lower() + raw_text = str(v) assert raw_text in decoded, f"'{raw_text}' not in '{decoded}'" diff --git a/tests/integration_tests/test_torchscript.py b/tests/integration_tests/test_torchscript.py index b62f078c6ed..a598c15f9af 100644 --- a/tests/integration_tests/test_torchscript.py +++ b/tests/integration_tests/test_torchscript.py @@ -52,6 +52,7 @@ ) +@pytest.mark.integration_tests_e @pytest.mark.parametrize("should_load_model", [True, False]) @pytest.mark.parametrize("model_type", ["ecd", "gbm"]) def test_torchscript(tmpdir, csv_filename, should_load_model, model_type): @@ -219,6 +220,7 @@ def test_torchscript(tmpdir, csv_filename, should_load_model, model_type): assert np.all(original_predictions_df[predictions_column_name] == restored_predictions) +@pytest.mark.integration_tests_e def test_torchscript_e2e_tabular(csv_filename, tmpdir): data_csv_path = os.path.join(tmpdir, csv_filename) # Configure features to be tested: @@ -275,6 +277,7 @@ def test_torchscript_e2e_tabular(csv_filename, tmpdir): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_binary_only(csv_filename, tmpdir): data_csv_path = os.path.join(tmpdir, csv_filename) @@ -297,6 +300,7 @@ def test_torchscript_e2e_binary_only(csv_filename, tmpdir): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_tabnet_combiner(csv_filename, tmpdir): data_csv_path = os.path.join(tmpdir, csv_filename) # Configure features to be tested: @@ -330,6 +334,7 @@ def test_torchscript_e2e_tabnet_combiner(csv_filename, tmpdir): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_audio(csv_filename, tmpdir): data_csv_path = os.path.join(tmpdir, csv_filename) audio_dest_folder = os.path.join(tmpdir, "generated_audio") @@ -354,6 +359,7 @@ def test_torchscript_e2e_audio(csv_filename, tmpdir): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path, tolerance=1e-6) +@pytest.mark.integration_tests_e @pytest.mark.parametrize( "kwargs", [ @@ -381,6 +387,7 @@ def test_torchscript_e2e_image(tmpdir, csv_filename, kwargs): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_text(tmpdir, csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [ @@ -405,6 +412,7 @@ def test_torchscript_e2e_text(tmpdir, csv_filename): torch.torch_version.TorchVersion(torchtext.__version__) < (0, 14, 0), reason="requires torchtext 0.14.0 or higher", ) +@pytest.mark.integration_tests_e def test_torchscript_e2e_text_hf_tokenizer(tmpdir, 
csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [text_feature(encoder={"vocab_size": 3, "type": "bert"})] @@ -426,6 +434,7 @@ def test_torchscript_e2e_text_hf_tokenizer(tmpdir, csv_filename): torch.torch_version.TorchVersion(torchtext.__version__) < (0, 14, 0), reason="requires torchtext 0.14.0 or higher", ) +@pytest.mark.integration_tests_e def test_torchscript_e2e_text_hf_tokenizer_truncated_sequence(tmpdir, csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [text_feature(encoder={"vocab_size": 3, "type": "bert"}, preprocessing={"max_sequence_length": 3})] @@ -443,6 +452,7 @@ def test_torchscript_e2e_text_hf_tokenizer_truncated_sequence(tmpdir, csv_filena validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_sequence(tmpdir, csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [ @@ -462,6 +472,7 @@ def test_torchscript_e2e_sequence(tmpdir, csv_filename): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_timeseries(tmpdir, csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [ @@ -481,6 +492,7 @@ def test_torchscript_e2e_timeseries(tmpdir, csv_filename): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_h3(tmpdir, csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [ @@ -500,6 +512,7 @@ def test_torchscript_e2e_h3(tmpdir, csv_filename): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e def test_torchscript_e2e_date(tmpdir, csv_filename): data_csv_path = os.path.join(tmpdir, csv_filename) input_features = [ @@ -519,6 +532,7 @@ def test_torchscript_e2e_date(tmpdir, csv_filename): validate_torchscript_outputs(tmpdir, config, backend, training_data_csv_path) +@pytest.mark.integration_tests_e @pytest.mark.parametrize("vector_type", [torch.Tensor, List[torch.Tensor]]) def test_torchscript_preproc_vector_alternative_type(tmpdir, csv_filename, vector_type): data_csv_path = os.path.join(tmpdir, csv_filename) @@ -575,6 +589,7 @@ def transform_vector_list(vector_list, vector_type): assert utils.is_all_close(feature_values, feature_values_expected), f"feature: {feature_name}" +@pytest.mark.integration_tests_e @pytest.mark.parametrize("padding", ["left", "right"]) @pytest.mark.parametrize("fill_value", ["", "1.0"]) def test_torchscript_preproc_timeseries_alternative_type(tmpdir, csv_filename, padding, fill_value): @@ -635,6 +650,7 @@ def transform_timeseries_from_str_list_to_tensor_list(timeseries_list): assert utils.is_all_close(feature_values, feature_values_expected), f'feature "{feature_name}" value mismatch.' 
+@pytest.mark.integration_tests_e @pytest.mark.parametrize( "feature", [ @@ -698,6 +714,7 @@ def test_torchscript_preproc_with_nans(tmpdir, csv_filename, feature): @pytest.mark.skipif(torch.cuda.device_count() == 0, reason="test requires at least 1 gpu") @pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires gpu support") +@pytest.mark.integration_tests_e @pytest.mark.distributed @pytest.mark.parametrize( "feature_fn", @@ -763,6 +780,7 @@ def test_torchscript_preproc_gpu(tmpdir, csv_filename, feature_fn): @pytest.mark.skipif(torch.cuda.device_count() == 0, reason="test requires at least 1 gpu") @pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires gpu support") +@pytest.mark.integration_tests_e @pytest.mark.distributed @pytest.mark.parametrize( "feature_fn", diff --git a/tests/integration_tests/test_trainer.py b/tests/integration_tests/test_trainer.py index ce28e47fd1f..29151b878ae 100644 --- a/tests/integration_tests/test_trainer.py +++ b/tests/integration_tests/test_trainer.py @@ -11,7 +11,16 @@ from ludwig.api import LudwigModel from ludwig.callbacks import Callback -from ludwig.constants import BATCH_SIZE, MAX_BATCH_SIZE_DATASET_FRACTION, TRAINER +from ludwig.constants import ( + BATCH_SIZE, + EFFECTIVE_BATCH_SIZE, + EPOCHS, + EVAL_BATCH_SIZE, + INPUT_FEATURES, + MAX_BATCH_SIZE_DATASET_FRACTION, + OUTPUT_FEATURES, + TRAINER, +) from ludwig.distributed import init_dist_strategy from tests.integration_tests.utils import ( binary_feature, @@ -67,8 +76,8 @@ def on_trainer_train_teardown(self, trainer, progress_tracker, save_path, is_coo def test_tune_learning_rate(tmpdir): config = { - "input_features": [text_feature(), binary_feature()], - "output_features": [binary_feature()], + INPUT_FEATURES: [text_feature(), binary_feature()], + OUTPUT_FEATURES: [binary_feature()], TRAINER: { "train_steps": 1, BATCH_SIZE: 128, @@ -77,7 +86,7 @@ def test_tune_learning_rate(tmpdir): } csv_filename = os.path.join(tmpdir, "training.csv") - data_csv = generate_data(config["input_features"], config["output_features"], csv_filename) + data_csv = generate_data(config[INPUT_FEATURES], config[OUTPUT_FEATURES], csv_filename) val_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "validation.csv")) test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) @@ -88,9 +97,9 @@ def test_tune_learning_rate(tmpdir): @pytest.mark.parametrize("is_cpu", [True, False]) -@pytest.mark.parametrize("effective_batch_size", ["auto", 256]) -@pytest.mark.parametrize("eval_batch_size", ["auto", None, 128]) -def test_tune_batch_size_and_lr(tmpdir, eval_batch_size, effective_batch_size, is_cpu): +@pytest.mark.parametrize(EFFECTIVE_BATCH_SIZE, ["auto", 256]) +@pytest.mark.parametrize(EVAL_BATCH_SIZE, ["auto", None, 128]) +def test_ecd_tune_batch_size_and_lr(tmpdir, eval_batch_size, effective_batch_size, is_cpu): input_features = [sequence_feature(encoder={"reduce_output": "sum"})] output_features = [ category_feature(decoder={"vocab_size": 2}, reduce_input="sum"), @@ -106,19 +115,19 @@ def test_tune_batch_size_and_lr(tmpdir, eval_batch_size, effective_batch_size, i test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) trainer = { - "epochs": 2, - "effective_batch_size": effective_batch_size, - "batch_size": "auto", + EPOCHS: 2, + EFFECTIVE_BATCH_SIZE: effective_batch_size, + BATCH_SIZE: "auto", "gradient_accumulation_steps": "auto", "learning_rate": "auto", } if eval_batch_size: - trainer["eval_batch_size"] = eval_batch_size + trainer[EVAL_BATCH_SIZE] = 
eval_batch_size config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": {"type": "concat", "output_size": 14}, TRAINER: trainer, } @@ -190,11 +199,11 @@ def test_scale_lr(learning_rate_scaling, expected_lr, tmpdir, ray_cluster_2cpu): data_csv = generate_data(input_features, output_features, csv_filename) config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": {"type": "concat", "output_size": 14}, TRAINER: { - "epochs": 2, + EPOCHS: 2, BATCH_SIZE: 128, "learning_rate": base_lr, "learning_rate_scaling": learning_rate_scaling, @@ -214,11 +223,11 @@ def test_changing_parameters_on_plateau(tmpdir): val_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "validation.csv")) test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": {"type": "concat", "output_size": 14}, TRAINER: { - "epochs": 2, + EPOCHS: 2, BATCH_SIZE: 128, "learning_rate": 1.0, "reduce_learning_rate_on_plateau": 1, @@ -234,8 +243,8 @@ def test_changing_parameters_on_plateau(tmpdir): def test_lightgbm_dataset_partition(ray_cluster_2cpu): # Create a LightGBM model with a Ray backend config = { - "input_features": [{"name": "in_column", "type": "binary"}], - "output_features": [{"name": "out_column", "type": "binary"}], + INPUT_FEATURES: [{"name": "in_column", "type": "binary"}], + OUTPUT_FEATURES: [{"name": "out_column", "type": "binary"}], "model_type": "gbm", # Disable feature filtering to avoid having no features due to small test dataset, # see https://stackoverflow.com/a/66405983/5222402 @@ -299,13 +308,13 @@ def test_mixed_precision(tmpdir): test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) trainer = { - "epochs": 2, + EPOCHS: 2, "use_mixed_precision": True, } config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": {"type": "concat", "output_size": 14}, TRAINER: trainer, } @@ -330,13 +339,13 @@ def test_compile(tmpdir): test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) trainer = { - "epochs": 2, + EPOCHS: 2, "compile": True, } config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": {"type": "concat", "output_size": 14}, TRAINER: trainer, } @@ -359,14 +368,14 @@ def test_gradient_accumulation(gradient_accumulation_steps: int, tmpdir): test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) trainer = { - "epochs": 2, - "batch_size": 8, + EPOCHS: 2, + BATCH_SIZE: 8, "gradient_accumulation_steps": gradient_accumulation_steps, } config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": {"type": "concat", "output_size": 14}, TRAINER: trainer, } @@ -390,12 +399,12 @@ def test_enable_gradient_checkpointing(tmpdir, caplog): test_csv = shutil.copyfile(data_csv, os.path.join(tmpdir, "test.csv")) config = { - "input_features": input_features, - "output_features": output_features, + INPUT_FEATURES: input_features, + OUTPUT_FEATURES: output_features, "combiner": 
{"type": "concat", "output_size": 14}, TRAINER: { "train_steps": 2, - "batch_size": 8, + BATCH_SIZE: 8, "enable_gradient_checkpointing": True, }, } diff --git a/tests/ludwig/config_validation/test_checks.py b/tests/ludwig/config_validation/test_checks.py index 44630167fda..613a29a3fd3 100644 --- a/tests/ludwig/config_validation/test_checks.py +++ b/tests/ludwig/config_validation/test_checks.py @@ -489,3 +489,35 @@ def test_check_prompt_requirements(): config["prompt"] = {"task": "Some task", "template": "{__task__}"} ModelConfig.from_dict(config) + + +def test_check_sample_ratio_and_size_compatible(): + config = { + "input_features": [binary_feature()], + "output_features": [binary_feature()], + "model_type": "ecd", + } + ModelConfig.from_dict( + { + "input_features": [binary_feature()], + "output_features": [binary_feature()], + "model_type": "ecd", + } + ) + + config["preprocessing"] = {"sample_size": 10} + ModelConfig.from_dict(config) + + config["preprocessing"]["sample_ratio"] = 1 + ModelConfig.from_dict(config) + + config["preprocessing"]["sample_ratio"] = 0.1 + with pytest.raises(ConfigValidationError): + ModelConfig.from_dict(config) + + config["preprocessing"]["sample_size"] = 0 + with pytest.raises(ConfigValidationError): + ModelConfig.from_dict(config) + + del config["preprocessing"]["sample_size"] + ModelConfig.from_dict(config) diff --git a/tests/ludwig/datasets/test_datasets.py b/tests/ludwig/datasets/test_datasets.py index cd0765f8513..e400c6d2ab5 100644 --- a/tests/ludwig/datasets/test_datasets.py +++ b/tests/ludwig/datasets/test_datasets.py @@ -239,3 +239,20 @@ def test_ad_hoc_dataset_download(tmpdir, dataset_name, size): df = ludwig_dataset.load() assert df is not None assert len(df) >= size + + +def test_hf_dataset_loading(): + import datasets + + loader = ludwig.datasets.get_dataset("hugging_face") + data = loader.load("JeremyAlain/123_test", "data_0") + hf_data = datasets.load_dataset(path="JeremyAlain/123_test", name="data_0") + + assert len(data) == hf_data["train"].num_rows + + train, val, test = loader.load("neil-code/dialogsum-test", None, split=True) + hf_data = datasets.load_dataset(path="neil-code/dialogsum-test") + + assert len(train) == hf_data["train"].num_rows + assert len(val) == hf_data["validation"].num_rows + assert len(test) == hf_data["test"].num_rows diff --git a/tests/ludwig/schema/test_model_config.py b/tests/ludwig/schema/test_model_config.py index 054a274b378..21e2883b989 100644 --- a/tests/ludwig/schema/test_model_config.py +++ b/tests/ludwig/schema/test_model_config.py @@ -950,3 +950,62 @@ def test_llm_quantization_backend_compatibility(): ModelConfig.from_dict(config) ray.shutdown() + + +class TestMaxNewTokensOverride: + def test_max_new_tokens_override_no_changes_to_max_new_tokens(self): + """Tests that the default value for max_new_tokens is respected when explicitly set in the config.""" + config = { + MODEL_TYPE: MODEL_LLM, + BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM", + INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}], + # Default value for generation.max_sequence_length is 32 + OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text"}], + "generation": {"max_new_tokens": 64}, + } + + config_obj = ModelConfig.from_dict(config) + assert config_obj.generation.max_new_tokens == 64 + + def test_max_new_tokens_override_large_max_sequence_length(self): + """Tests that the default value for max_new_tokens is overridden when max_sequence_length is set to a large + value than the default max_new_tokens.""" + config = { + MODEL_TYPE: 
MODEL_LLM, + BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM", + INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}], + # Default value for generation.max_sequence_length is 32 + OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text", "preprocessing": {"max_sequence_length": 100}}], + } + + config_obj = ModelConfig.from_dict(config) + assert config_obj.generation.max_new_tokens == 100 + + def test_max_new_tokens_override_large_global_max_sequence_length(self): + """Tests that the default value for max_new_tokens is overridden when global_max_sequence_length is set to + a larger value than the default max_new_tokens.""" + config = { + MODEL_TYPE: MODEL_LLM, + BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM", + INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}], + # Default value for generation.max_sequence_length is 32 + OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text"}], + PREPROCESSING: {"global_max_sequence_length": 100}, + } + + config_obj = ModelConfig.from_dict(config) + assert config_obj.generation.max_new_tokens == 100 + + def test_max_new_tokens_override_fallback_to_model_window_size(self): + config = { + MODEL_TYPE: MODEL_LLM, + BASE_MODEL: "HuggingFaceH4/tiny-random-LlamaForCausalLM", + INPUT_FEATURES: [{NAME: "text_input", TYPE: "text"}], + # Default value for generation.max_sequence_length is 32 + OUTPUT_FEATURES: [{NAME: "text_output", TYPE: "text"}], + } + + config_obj = ModelConfig.from_dict(config) + # Base model context length is 2048 tokens by default + # Since we fallback to setting max_new_tokens to the model context length / 2, we expect it to be 1024 tokens + assert config_obj.generation.max_new_tokens == 1024 diff --git a/tests/ludwig/utils/test_llm_utils.py b/tests/ludwig/utils/test_llm_utils.py index e652297d285..d79264bf26a 100644 --- a/tests/ludwig/utils/test_llm_utils.py +++ b/tests/ludwig/utils/test_llm_utils.py @@ -1,13 +1,15 @@ import pytest import torch -from transformers import AutoTokenizer +from transformers import AutoConfig, AutoTokenizer from ludwig.constants import LOGITS, PREDICTIONS, PROBABILITIES from ludwig.utils.llm_utils import ( add_left_padding, create_attention_mask, + FALLBACK_CONTEXT_LEN, find_last_matching_index, generate_merged_ids, + get_context_len, has_padding_token, pad_target_tensor_for_fine_tuning, realign_target_and_prediction_tensors_for_inference, @@ -57,6 +59,29 @@ def test_set_pad_token_already_exists(): assert tokenizer.pad_token_id == 1 +class TestSetContextLen: + def test_max_sequence_length(self): + # Test when 'max_sequence_length' is present in the model configuration + config = AutoConfig.from_pretrained("huggyllama/llama-7b") + assert get_context_len(config) == config.max_sequence_length + + def test_max_position_embeddings(self): + # Test when 'max_position_embeddings' is present in the model configuration + config = AutoConfig.from_pretrained("huggyllama/llama-7b") + del config.max_sequence_length + assert get_context_len(config) == config.max_position_embeddings + + def test_n_positions(self): + # Test when 'n_positions' is present in the model configuration + config = AutoConfig.from_pretrained("hf-internal-testing/tiny-random-GPTJForCausalLM") + assert get_context_len(config) == config.n_positions + + def test_default_value(self): + config = AutoConfig.from_pretrained("hf-internal-testing/tiny-random-GPTJForCausalLM") + del config.n_positions + assert get_context_len(config) == FALLBACK_CONTEXT_LEN + + def test_has_padding_token_with_padding_tokens(tokenizer): input_sentence = "This is an 
example sentence." input_ids = tokenizer([input_sentence]) diff --git a/tests/ludwig/utils/test_strings_utils.py b/tests/ludwig/utils/test_strings_utils.py index 1d23fde7817..14afce1f88b 100644 --- a/tests/ludwig/utils/test_strings_utils.py +++ b/tests/ludwig/utils/test_strings_utils.py @@ -78,7 +78,7 @@ def test_create_vocabulary_chars(): ) vocab = vocabulary.vocab - assert len(vocab) == 24 + assert len(vocab) == 27 assert vocab[strings_utils.SpecialSymbol.START.value] == strings_utils.START_SYMBOL assert vocab[strings_utils.SpecialSymbol.STOP.value] == strings_utils.STOP_SYMBOL assert vocab[strings_utils.SpecialSymbol.PADDING.value] == strings_utils.PADDING_SYMBOL @@ -231,13 +231,13 @@ def test_create_vocabulary_idf(compute_idf: bool): # "sentence" and "and" should be next, as they appear in two docs each assert idf_sorted[1][0] > idf_sorted[0][0] - assert idf_sorted[1][1] == {"sentence", "and"} + assert idf_sorted[1][1] == {"sentence", "And"} # finally, every token that only appears once assert idf_sorted[2][0] > idf_sorted[1][0] assert idf_sorted[2][1] == { ",", - "i", + "I", "'", "one", "very", @@ -246,7 +246,7 @@ def test_create_vocabulary_idf(compute_idf: bool): "m", "!", "last", - "hello", + "Hello", "a", "another", }