From 2374d8a58c0d3bb0c105d0bf32bb322158e1d315 Mon Sep 17 00:00:00 2001 From: Alex Sherstinsky Date: Mon, 9 Oct 2023 10:43:27 -0700 Subject: [PATCH 01/12] [BUGFIX] Pin deepspeed<0.11, skip Horovod tests (#3700) Co-authored-by: Travis Addair --- .github/workflows/pytest.yml | 46 ++++++++++++++++++------------------ requirements_distributed.txt | 2 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index f65b587aa0f..71dfcf1c378 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -163,31 +163,31 @@ jobs: run: | RUN_PRIVATE=$IS_NOT_FORK LUDWIG_TEST_SUITE_TIMEOUT_S=5400 pytest -v --timeout 300 --durations 100 -m "$MARKERS and not slow and not combinatorial and not horovod or benchmark and not llm" --junitxml pytest.xml tests/regression_tests - # Skip Horovod installation for torch nightly. + # Skip Horovod and replace with DDP. # https://github.com/ludwig-ai/ludwig/issues/3468 - - name: Install Horovod if necessary - if: matrix.test-markers == 'distributed' && matrix.pytorch-version != 'nightly' - env: - HOROVOD_WITH_PYTORCH: 1 - HOROVOD_WITHOUT_MPI: 1 - HOROVOD_WITHOUT_TENSORFLOW: 1 - HOROVOD_WITHOUT_MXNET: 1 - run: | - pip install -r requirements_extra.txt - HOROVOD_BUILT=$(python -c "import horovod.torch; horovod.torch.nccl_built(); print('SUCCESS')" || true) - if [[ $HOROVOD_BUILT != "SUCCESS" ]]; then - pip uninstall -y horovod - pip install --no-cache-dir git+https://github.com/horovod/horovod.git@master - fi - horovodrun --check-build - shell: bash - - # Skip Horovod tests for torch nightly. 
+ # - name: Install Horovod if necessary + # if: matrix.test-markers == 'distributed' && matrix.pytorch-version != 'nightly' + # env: + # HOROVOD_WITH_PYTORCH: 1 + # HOROVOD_WITHOUT_MPI: 1 + # HOROVOD_WITHOUT_TENSORFLOW: 1 + # HOROVOD_WITHOUT_MXNET: 1 + # run: | + # pip install -r requirements_extra.txt + # HOROVOD_BUILT=$(python -c "import horovod.torch; horovod.torch.nccl_built(); print('SUCCESS')" || true) + # if [[ $HOROVOD_BUILT != "SUCCESS" ]]; then + # pip uninstall -y horovod + # pip install --no-cache-dir git+https://github.com/horovod/horovod.git@master + # fi + # horovodrun --check-build + # shell: bash + + # Skip Horovod tests and replace with DDP. # https://github.com/ludwig-ai/ludwig/issues/3468 - - name: Horovod Tests - if: matrix.test-markers == 'distributed' && matrix.pytorch-version != 'nightly' - run: | - RUN_PRIVATE=$IS_NOT_FORK LUDWIG_TEST_SUITE_TIMEOUT_S=5400 pytest -v --timeout 300 --durations 100 -m "$MARKERS and horovod and not slow and not combinatorial and not llm" --junitxml pytest.xml tests/ + # - name: Horovod Tests + # if: matrix.test-markers == 'distributed' && matrix.pytorch-version != 'nightly' + # run: | + # RUN_PRIVATE=$IS_NOT_FORK LUDWIG_TEST_SUITE_TIMEOUT_S=5400 pytest -v --timeout 300 --durations 100 -m "$MARKERS and horovod and not slow and not combinatorial and not llm" --junitxml pytest.xml tests/ - name: Upload Unit Test Results if: ${{ always() && !env.ACT }} diff --git a/requirements_distributed.txt b/requirements_distributed.txt index cf6a510ee9e..b44ef77eb94 100644 --- a/requirements_distributed.txt +++ b/requirements_distributed.txt @@ -9,7 +9,7 @@ GPUtil tblib awscli -deepspeed +deepspeed<0.11.0 # 10/7/2023: limiting version to prevent PyTorch installation issues # requirements for daft getdaft[ray] From c0946beca3727b95cb82eb0cafa281fd278ee7a3 Mon Sep 17 00:00:00 2001 From: Travis Addair Date: Mon, 9 Oct 2023 14:06:39 -0700 Subject: [PATCH 02/12] Unpin deepspeed following fix in v0.11.1 (#3706) --- 
requirements_distributed.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements_distributed.txt b/requirements_distributed.txt index b44ef77eb94..8eb6f4b7467 100644 --- a/requirements_distributed.txt +++ b/requirements_distributed.txt @@ -9,7 +9,7 @@ GPUtil tblib awscli -deepspeed<0.11.0 # 10/7/2023: limiting version to prevent PyTorch installation issues +deepspeed!=0.11.0 # https://github.com/microsoft/DeepSpeed/issues/4473 # requirements for daft getdaft[ray] From f84ee5c4643f424e60dd7cff3747e6c037980a49 Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 9 Oct 2023 17:07:09 -0400 Subject: [PATCH 03/12] Move on_epoch_end and epoch increment to after run_evaluation loop. (#3690) --- ludwig/trainers/trainer.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py index bf56a1c86f5..7f41c4376be 100644 --- a/ludwig/trainers/trainer.py +++ b/ludwig/trainers/trainer.py @@ -1098,7 +1098,8 @@ def _train_loop( """Completes up to one epoch through the data.""" self.distributed.zero_grad(self.optimizer) batch_idx = 0 - while not batcher.last_batch() and progress_tracker.steps < self.total_steps: + should_break = False + while not batcher.last_batch() and progress_tracker.steps < self.total_steps and not should_break: progress_tracker.learning_rate = self.optimizer.param_groups[0]["lr"] self.callback(lambda c: c.on_batch_start(self, progress_tracker, save_path)) @@ -1149,13 +1150,6 @@ def _train_loop( # batch duration measurements when using timer callbacks. self.callback(lambda c: c.on_batch_end(self, progress_tracker, save_path, sync_step=should_step)) - if batcher.last_batch(): - # We have completed an epoch, so we need to increment the epoch counter. It's important to do this here - # instead of outside of the train loop since it's possible the train loop will exit early due to - # early stopping, or step-based training. 
- progress_tracker.epoch += 1 - self.callback(lambda c: c.on_epoch_end(self, progress_tracker, save_path)) - if progress_tracker.steps % final_steps_per_checkpoint == 0: if not self.skip_all_evaluation: # Publishes metrics to MLFLow if there are any MLFlow callbacks. @@ -1189,10 +1183,12 @@ def _train_loop( if self.is_coordinator(): progress_tracker.save(os.path.join(save_path, TRAINING_PROGRESS_TRACKER_FILE_NAME)) - if should_break: - return should_break + # If this was the last batch, then increment the epoch counter and invoke the `on_epoch_end` callback. + if batcher.last_batch(): + progress_tracker.epoch += 1 + self.callback(lambda c: c.on_epoch_end(self, progress_tracker, save_path)) - return False + return should_break def train_online(self, dataset): self.dist_model.train() # Sets model training mode. From 9a95a78458dab50b583ec3d8d165b8b98d0c101a Mon Sep 17 00:00:00 2001 From: Infernaught <72055086+Infernaught@users.noreply.github.com> Date: Mon, 9 Oct 2023 17:07:31 -0400 Subject: [PATCH 04/12] Remove model_load_path from experiment (#3707) --- ludwig/api.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ludwig/api.py b/ludwig/api.py index 676dfe07d9d..8035a22fd51 100644 --- a/ludwig/api.py +++ b/ludwig/api.py @@ -1215,7 +1215,6 @@ def experiment( data_format: Optional[str] = None, experiment_name: str = "experiment", model_name: str = "run", - model_load_path: Optional[str] = None, model_resume_path: Optional[str] = None, eval_split: str = TEST, skip_save_training_description: bool = False, @@ -1264,9 +1263,6 @@ def experiment( the experiment. :param model_name: (str, default: `'run'`) name of the model that is being used. - :param model_load_path: (str, default: `None`) if this is specified the - loaded model will be used as initialization - (useful for transfer learning). :param model_resume_path: (str, default: `None`) resumes training of the model from the path specified. The config is restored. 
In addition to config, training statistics and loss for @@ -1347,7 +1343,6 @@ def experiment( data_format=data_format, experiment_name=experiment_name, model_name=model_name, - model_load_path=model_load_path, model_resume_path=model_resume_path, skip_save_training_description=skip_save_training_description, skip_save_training_statistics=skip_save_training_statistics, From 4b3ccad8566dd3cd54772d32ccb6ce9aba55ef7b Mon Sep 17 00:00:00 2001 From: Alex Sherstinsky Date: Mon, 9 Oct 2023 14:21:19 -0700 Subject: [PATCH 05/12] [FEATURE] Allow typehints without the quotes. (#3699) --- ludwig/backend/base.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/ludwig/backend/base.py b/ludwig/backend/base.py index c3cef78cfc6..77a14a0b104 100644 --- a/ludwig/backend/base.py +++ b/ludwig/backend/base.py @@ -14,11 +14,13 @@ # limitations under the License. # ============================================================================== +from __future__ import annotations + import time from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager -from typing import Any, Callable, Dict, Optional, Type, Union +from typing import Any, Callable, TYPE_CHECKING import numpy as np import pandas as pd @@ -48,14 +50,17 @@ from ludwig.utils.torch_utils import initialize_pytorch from ludwig.utils.types import DataFrame, Series +if TYPE_CHECKING: + from ludwig.trainers.base import BaseTrainer + @DeveloperAPI class Backend(ABC): def __init__( self, dataset_manager: DatasetManager, - cache_dir: Optional[str] = None, - credentials: Optional[Dict[str, Dict[str, Any]]] = None, + cache_dir: str | None = None, + credentials: dict[str, dict[str, Any]] | None = None, ): credentials = credentials or {} self._dataset_manager = dataset_manager @@ -84,7 +89,7 @@ def initialize_pytorch(self, *args, **kwargs): @contextmanager @abstractmethod - def create_trainer(self, **kwargs) -> 
"BaseTrainer": # noqa: F821 + def create_trainer(self, **kwargs) -> BaseTrainer: raise NotImplementedError() @abstractmethod @@ -110,7 +115,7 @@ def supports_multiprocessing(self): raise NotImplementedError() @abstractmethod - def read_binary_files(self, column: Series, map_fn: Optional[Callable] = None) -> Series: + def read_binary_files(self, column: Series, map_fn: Callable | None = None) -> Series: raise NotImplementedError() @property @@ -128,11 +133,11 @@ def get_available_resources(self) -> Resources: raise NotImplementedError() @abstractmethod - def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> Union[int, None]: + def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | None: raise NotImplementedError() @abstractmethod - def tune_batch_size(self, evaluator_cls: Type[BatchSizeEvaluator], dataset_len: int) -> int: + def tune_batch_size(self, evaluator_cls: type[BatchSizeEvaluator], dataset_len: int) -> int: """Returns best batch size (measured in samples / s) on the given evaluator. 
The evaluator class will need to be instantiated on each worker in the backend cluster, then call @@ -159,9 +164,7 @@ def supports_multiprocessing(self): return True @staticmethod - def read_binary_files( - column: pd.Series, map_fn: Optional[Callable] = None, file_size: Optional[int] = None - ) -> pd.Series: + def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_size: int | None = None) -> pd.Series: column = column.fillna(np.nan).replace([np.nan], [None]) # normalize NaNs to None sample_fname = column.head(1).values[0] @@ -201,7 +204,7 @@ def initialize(): def initialize_pytorch(*args, **kwargs): initialize_pytorch(*args, **kwargs) - def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> "BaseTrainer": # noqa: F821 + def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> BaseTrainer: from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry if model.type() == MODEL_LLM: @@ -229,7 +232,7 @@ def is_coordinator() -> bool: return True @staticmethod - def tune_batch_size(evaluator_cls: Type[BatchSizeEvaluator], dataset_len: int) -> int: + def tune_batch_size(evaluator_cls: type[BatchSizeEvaluator], dataset_len: int) -> int: evaluator = evaluator_cls() return evaluator.select_best_batch_size(dataset_len) @@ -272,7 +275,7 @@ def num_training_workers(self) -> int: def get_available_resources(self) -> Resources: return Resources(cpus=psutil.cpu_count(), gpus=torch.cuda.device_count()) - def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> Union[int, None]: + def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | None: # Every trial will be run with Pandas and NO Ray Datasets. Allow Ray Tune to use all the # trial resources it wants, because there is no Ray Datasets process to compete with it for CPUs. 
return None @@ -284,7 +287,7 @@ class DataParallelBackend(LocalPreprocessingMixin, Backend, ABC): def __init__(self, **kwargs): super().__init__(dataset_manager=PandasDatasetManager(self), **kwargs) - self._distributed: Optional[DistributedStrategy] = None + self._distributed: DistributedStrategy | None = None @abstractmethod def initialize(self): @@ -295,7 +298,7 @@ def initialize_pytorch(self, *args, **kwargs): *args, local_rank=self._distributed.local_rank(), local_size=self._distributed.local_size(), **kwargs ) - def create_trainer(self, **kwargs) -> "BaseTrainer": # noqa: F821 + def create_trainer(self, **kwargs) -> BaseTrainer: from ludwig.trainers.trainer import Trainer return Trainer(distributed=self._distributed, **kwargs) @@ -343,10 +346,10 @@ def get_available_resources(self) -> Resources: return Resources(cpus=cpus, gpus=gpus) - def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> Union[int, None]: + def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | None: # Return None since there is no Ray component return None - def tune_batch_size(self, evaluator_cls: Type[BatchSizeEvaluator], dataset_len: int) -> int: + def tune_batch_size(self, evaluator_cls: type[BatchSizeEvaluator], dataset_len: int) -> int: evaluator = evaluator_cls() return evaluator.select_best_batch_size(dataset_len) From 2772e9a36fffc1bc9f7d8403a0fbca1ca5b8258e Mon Sep 17 00:00:00 2001 From: connor-mccorm <97468934+connor-mccorm@users.noreply.github.com> Date: Tue, 10 Oct 2023 08:39:30 -0700 Subject: [PATCH 06/12] Add consumer complaints generation dataset (#3685) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Justin Zhao --- .../consumer_complaints_generation.yaml | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 ludwig/datasets/configs/consumer_complaints_generation.yaml diff --git a/ludwig/datasets/configs/consumer_complaints_generation.yaml 
b/ludwig/datasets/configs/consumer_complaints_generation.yaml new file mode 100644 index 00000000000..476773459ed --- /dev/null +++ b/ludwig/datasets/configs/consumer_complaints_generation.yaml @@ -0,0 +1,28 @@ +version: 1.0 +name: consumer_complaints_generation +download_urls: https://predibase-public-us-west-2.s3.us-west-2.amazonaws.com/datasets/consumer_complaints_gen_tutorial.csv +train_filenames: consumer_complaints_gen_tutorial.csv +description: | + The dataset contains different information of complaints that customers have made about multiple products and + services in the financial sector, such as Credit Reports, Student Loans, Money Transfer, etc. The date of each + complaint ranges from November 2011 to May 2019. The dataset has been modified to be used for text generation. + We have added a structured JSON field that contains a company generated response to the raised complaint. The idea + is to fine-tune an LLM to generate this output JSON field. +columns: + - name: Date received + type: Date + - name: Generated Company Response + type: text + - name: Complaint ID + type: number + - name: Issue + type: text + - name: Product + type: text + - name: Structured JSON Output + type: text + - name: Consumer complaint narrative + type: text +output_features: + - name: Structured JSON Output + type: text From 626d9fc8b49d79cfda042939405af2aaa62c73ac Mon Sep 17 00:00:00 2001 From: Infernaught <72055086+Infernaught@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:58:54 -0400 Subject: [PATCH 07/12] Set the metadata only during first training run (#3684) Co-authored-by: Justin Zhao --- ludwig/api.py | 10 ++++++++++ tests/integration_tests/test_api.py | 25 +++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/ludwig/api.py b/ludwig/api.py index 8035a22fd51..0afea35b1e0 100644 --- a/ludwig/api.py +++ b/ludwig/api.py @@ -444,6 +444,16 @@ def train( `(training_set, validation_set, test_set)`. 
`output_directory` filepath to where training results are stored. """ + # Only reset the metadata if the model has not been trained before + if self.training_set_metadata: + logger.warning( + "This model has been trained before. Its architecture has been defined by the original training set " + "(for example, the number of possible categorical outputs). The current training data will be mapped " + "to this architecture. If you want to change the architecture of the model, please concatenate your " + "new training data with the original and train a new model from scratch." + ) + training_set_metadata = self.training_set_metadata + if self._user_config.get(HYPEROPT): print_boxed("WARNING") logger.warning(HYPEROPT_WARNING) diff --git a/tests/integration_tests/test_api.py b/tests/integration_tests/test_api.py index 95879c8326f..e7a9d5102ba 100644 --- a/tests/integration_tests/test_api.py +++ b/tests/integration_tests/test_api.py @@ -740,3 +740,28 @@ def test_saved_weights_in_checkpoint(tmpdir): input_feature_encoder = saved_input_feature["encoder"] assert "saved_weights_in_checkpoint" in input_feature_encoder assert input_feature_encoder["saved_weights_in_checkpoint"] + + +def test_constant_metadata(tmpdir): + input_features = [category_feature(encoder={"vocab_size": 5})] + output_features = [category_feature(name="class", decoder={"vocab_size": 5}, output_feature=True)] + + data_csv1 = generate_data(input_features, output_features, os.path.join(tmpdir, "dataset1.csv")) + val_csv1 = shutil.copyfile(data_csv1, os.path.join(tmpdir, "validation1.csv")) + test_csv1 = shutil.copyfile(data_csv1, os.path.join(tmpdir, "test1.csv")) + + config = { + "input_features": input_features, + "output_features": output_features, + } + model = LudwigModel(config) + model.train(training_set=data_csv1, validation_set=val_csv1, test_set=test_csv1, output_directory=tmpdir) + metadata1 = model.training_set_metadata + + data_csv2 = generate_data(input_features, output_features, 
os.path.join(tmpdir, "dataset2.csv"), num_examples=10) + val_csv2 = shutil.copyfile(data_csv2, os.path.join(tmpdir, "validation2.csv")) + test_csv2 = shutil.copyfile(data_csv2, os.path.join(tmpdir, "test2.csv")) + model.train(training_set=data_csv2, validation_set=val_csv2, test_set=test_csv2, output_directory=tmpdir) + metadata2 = model.training_set_metadata + + assert metadata1 == metadata2 From 7c3a5492c60de495812058c10de94eab1c2a1dbf Mon Sep 17 00:00:00 2001 From: Martin Date: Wed, 11 Oct 2023 12:38:47 -0400 Subject: [PATCH 08/12] Add ability to upload Ludwig models to Predibase. (#3687) --- ludwig/cli.py | 2 +- ludwig/upload.py | 23 +++- ludwig/utils/upload_utils.py | 229 +++++++++++++++++++++++++++++++---- requirements_extra.txt | 3 + 4 files changed, 230 insertions(+), 27 deletions(-) diff --git a/ludwig/cli.py b/ludwig/cli.py index c89bc47f7cb..345a69d0e7d 100644 --- a/ludwig/cli.py +++ b/ludwig/cli.py @@ -56,7 +56,7 @@ def __init__(self): init_config Initialize a user config from a dataset and targets render_config Renders the fully populated config with all defaults set check_install Runs a quick training run on synthetic data to verify installation status - upload Push trained model artifacts to a registry (e.g., HuggingFace Hub) + upload Push trained model artifacts to a registry (e.g., Predibase, HuggingFace Hub) """, ) parser.add_argument("command", help="Subcommand to run") diff --git a/ludwig/upload.py b/ludwig/upload.py index 4cc86e488ad..fa626f3a1e1 100644 --- a/ludwig/upload.py +++ b/ludwig/upload.py @@ -4,7 +4,7 @@ from typing import Optional from ludwig.utils.print_utils import get_logging_level_registry -from ludwig.utils.upload_utils import HuggingFaceHub +from ludwig.utils.upload_utils import HuggingFaceHub, Predibase logger = logging.getLogger(__name__) @@ -12,6 +12,7 @@ def get_upload_registry(): return { "hf_hub": HuggingFaceHub, + "predibase": Predibase, } @@ -23,6 +24,8 @@ def upload_cli( private: bool = False, commit_message: str = 
"Upload trained [Ludwig](https://ludwig.ai/latest/) model weights", commit_description: Optional[str] = None, + dataset_file: Optional[str] = None, + dataset_name: Optional[str] = None, **kwargs, ) -> None: """Create an empty repo on the HuggingFace Hub and upload trained model artifacts to that repo. @@ -30,7 +33,7 @@ def upload_cli( Args: service (`str`): Name of the hosted model service to push the trained artifacts to. - Currently, this only supports `hf_hub`. + Currently, this only supports `hf_hub` and `predibase`. repo_id (`str`): A namespace (user or an organization) and a repo name separated by a `/`. @@ -49,10 +52,15 @@ def upload_cli( `f"Upload {path_in_repo} with huggingface_hub"` commit_description (`str` *optional*): The description of the generated commit + dataset_file (`str`, *optional*): + The path to the dataset file. Required if `service` is set to + `"predibase"` for new model repos. + dataset_name (`str`, *optional*): + The name of the dataset. Used by the `service` + `"predibase"`. 
""" model_service = get_upload_registry().get(service, "hf_hub") hub = model_service() - hub.login() hub.upload( repo_id=repo_id, model_path=model_path, @@ -60,6 +68,8 @@ def upload_cli( private=private, commit_message=commit_message, commit_description=commit_description, + dataset_file=dataset_file, + dataset_name=dataset_name, ) @@ -77,7 +87,7 @@ def cli(sys_argv): "service", help="Name of the model repository service.", default="hf_hub", - choices=["hf_hub"], + choices=["hf_hub", "predibase"], ) parser.add_argument( @@ -115,6 +125,11 @@ def cli(sys_argv): choices=["critical", "error", "warning", "info", "debug", "notset"], ) + parser.add_argument("-df", "--dataset_file", help="The location of the dataset file", default=None) + parser.add_argument( + "-dn", "--dataset_name", help="(Optional) The name of the dataset in the Provider", default=None + ) + args = parser.parse_args(sys_argv) args.logging_level = get_logging_level_registry()[args.logging_level] diff --git a/ludwig/utils/upload_utils.py b/ludwig/utils/upload_utils.py index 4dc193d6410..cc166170282 100644 --- a/ludwig/utils/upload_utils.py +++ b/ludwig/utils/upload_utils.py @@ -37,6 +37,8 @@ def upload( private: Optional[bool] = False, commit_message: Optional[str] = None, commit_description: Optional[str] = None, + dataset_file: Optional[str] = None, + dataset_name: Optional[str] = None, ) -> bool: """Abstract method to upload trained model artifacts to the target repository. @@ -68,9 +70,7 @@ def _validate_upload_parameters( trained model artifacts to the target repository. Args: - repo_id (str): The ID of the target repository. It must be a namespace (user or an organization) - and a repository name separated by a '/'. For example, if your HF username is 'johndoe' and you - want to create a repository called 'test', the repo_id should be 'johndoe/test'. + repo_id (str): The ID of the target repository. Each provider will verify their specific rules. 
model_path (str): The path to the directory containing the trained model artifacts. It should contain the model's weights, usually saved under 'model/model_weights'. repo_type (str, optional): The type of the repository. Not used in the base class, but subclasses @@ -85,18 +85,10 @@ def _validate_upload_parameters( implementations. Defaults to None. Raises: - AssertionError: If the repo_id does not have both a namespace and a repo name separated by a '/'. FileNotFoundError: If the model_path does not exist. Exception: If the trained model artifacts are not found at the expected location within model_path, or if the artifacts are not in the required format (i.e., 'pytorch_model.bin' or 'adapter_model.bin'). """ - # Validate repo_id has both a namespace and a repo name - assert "/" in repo_id, ( - "`repo_id` must be a namespace (user or an organization) and a repo name separated by a `/`." - " For example, if your HF username is `johndoe` and you want to create a repository called `test`, the" - " repo_id should be johndoe/test" - ) - # Make sure the model's save path is actually a valid path if not os.path.exists(model_path): raise FileNotFoundError(f"The path '{model_path}' does not exist.") @@ -110,21 +102,11 @@ def _validate_upload_parameters( "wrong during training where the model's weights were not saved." ) - # Make sure the model's saved artifacts either contain: - # 1. pytorch_model.bin -> regular model training, such as ECD or for LLMs - # 2. adapter_model.bin -> LLM fine-tuning using PEFT - files = set(os.listdir(trained_model_artifacts_path)) - if "pytorch_model.bin" not in files and "adapter_model.bin" not in files: - raise Exception( - f"Can't find model weights at {trained_model_artifacts_path}. Trained model weights should " - "either be saved as `pytorch_model.bin` for regular model training, or have `adapter_model.bin`" - "if using parameter efficient fine-tuning methods like LoRA." 
- ) - class HuggingFaceHub(BaseModelUpload): def __init__(self): self.api = None + self.login() def login(self): """Login to huggingface hub using the token stored in ~/.cache/huggingface/token and returns a HfApi client @@ -142,6 +124,68 @@ def login(self): self.api = hf_api + @staticmethod + def _validate_upload_parameters( + repo_id: str, + model_path: str, + repo_type: Optional[str] = None, + private: Optional[bool] = False, + commit_message: Optional[str] = None, + commit_description: Optional[str] = None, + ): + """Validate parameters before uploading trained model artifacts. + + This method checks if the input parameters meet the necessary requirements before uploading + trained model artifacts to the target repository. + + Args: + repo_id (str): The ID of the target repository. It must be a namespace (user or an organization) + and a repository name separated by a '/'. For example, if your HF username is 'johndoe' and you + want to create a repository called 'test', the repo_id should be 'johndoe/test'. + model_path (str): The path to the directory containing the trained model artifacts. It should contain + the model's weights, usually saved under 'model/model_weights'. + repo_type (str, optional): The type of the repository. Not used in the base class, but subclasses + may use it for specific repository implementations. Defaults to None. + private (bool, optional): Whether the repository should be private or not. Not used in the base class, + but subclasses may use it for specific repository implementations. Defaults to False. + commit_message (str, optional): A message to attach to the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. + commit_description (str, optional): A description of the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. 
Defaults to None. + + Raises: + ValueError: If the repo_id does not have both a namespace and a repo name separated by a '/'. + """ + # Validate repo_id has both a namespace and a repo name + if "/" not in repo_id: + raise ValueError( + "`repo_id` must be a namespace (user or an organization) and a repo name separated by a `/`." + " For example, if your HF username is `johndoe` and you want to create a repository called `test`, the" + " repo_id should be johndoe/test" + ) + BaseModelUpload._validate_upload_parameters( + repo_id, + model_path, + repo_type, + private, + commit_message, + commit_description, + ) + + trained_model_artifacts_path = os.path.join(model_path, "model", "model_weights") + # Make sure the model's saved artifacts either contain: + # 1. pytorch_model.bin -> regular model training, such as ECD or for LLMs + # 2. adapter_model.bin -> LLM fine-tuning using PEFT + files = set(os.listdir(trained_model_artifacts_path)) + if "pytorch_model.bin" not in files and "adapter_model.bin" not in files: + raise Exception( + f"Can't find model weights at {trained_model_artifacts_path}. Trained model weights should " + "either be saved as `pytorch_model.bin` for regular model training, or have `adapter_model.bin`" + "if using parameter efficient fine-tuning methods like LoRA." + ) + def upload( self, repo_id: str, @@ -150,6 +194,7 @@ def upload( private: Optional[bool] = False, commit_message: Optional[str] = None, commit_description: Optional[str] = None, + **kwargs, ) -> bool: """Create an empty repo on the HuggingFace Hub and upload trained model artifacts to that repo. 
@@ -205,3 +250,143 @@ def upload( return True return False + + +class Predibase(BaseModelUpload): + def __init__(self): + self.pc = None + self.login() + + def login(self): + """Login to Predibase using the token stored in the PREDIBASE_API_TOKEN environment variable and store a + PredibaseClient object on `self.pc` that can be used to interact with Predibase.""" + from predibase import PredibaseClient + + token = os.environ.get("PREDIBASE_API_TOKEN") + if token is None: + raise ValueError( + "Unable to find PREDIBASE_API_TOKEN environment variable. Please log into Predibase, " + "generate a token and use `export PREDIBASE_API_TOKEN=` to use Predibase" + ) + + try: + pc = PredibaseClient() + + # TODO: Check if subscription has expired + + self.pc = pc + except Exception as e: + raise Exception(f"Failed to login to Predibase: {e}") + return False + + return True + + @staticmethod + def _validate_upload_parameters( + repo_id: str, + model_path: str, + repo_type: Optional[str] = None, + private: Optional[bool] = False, + commit_message: Optional[str] = None, + commit_description: Optional[str] = None, + ): + """Validate parameters before uploading trained model artifacts. + + This method checks if the input parameters meet the necessary requirements before uploading + trained model artifacts to the target repository. + + Args: + repo_id (str): The ID of the target repository. It must be 255 characters or fewer. + model_path (str): The path to the directory containing the trained model artifacts. It should contain + the model's weights, usually saved under 'model/model_weights'. + repo_type (str, optional): The type of the repository. Not used in the base class, but subclasses + may use it for specific repository implementations. Defaults to None. + private (bool, optional): Whether the repository should be private or not. Not used in the base class, + but subclasses may use it for specific repository implementations. Defaults to False.
+ commit_message (str, optional): A message to attach to the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. + commit_description (str, optional): A description of the commit when uploading to version control + systems. Not used in the base class, but subclasses may use it for specific repository + implementations. Defaults to None. + + Raises: + ValueError: If the repo_id is too long. + """ + if len(repo_id) > 255: + raise ValueError("`repo_id` must be 255 characters or less.") + + BaseModelUpload._validate_upload_parameters( + repo_id, + model_path, + repo_type, + private, + commit_message, + commit_description, + ) + + def upload( + self, + repo_id: str, + model_path: str, + commit_description: Optional[str] = None, + dataset_file: Optional[str] = None, + dataset_name: Optional[str] = None, + **kwargs, + ) -> bool: + """Create an empty repo in Predibase and upload trained model artifacts to that repo. + + Args: + model_path (`str`): + The path of the saved model. This is the top level directory where + the model's weights as well as other associated training artifacts + are saved. + repo_id (`str`): + A repo name. + commit_description (`str` *optional*): + The description of the repo. + dataset_file (`str` *optional*): + The path to the dataset file. Required if `service` is set to + `"predibase"` for new model repos. + dataset_name (`str` *optional*): + The name of the dataset. Used by the `service` + `"predibase"`. Falls back to the filename.
+ """ + # Validate upload parameters are in the right format + Predibase._validate_upload_parameters( + repo_id, + model_path, + None, + False, + "", + commit_description, + ) + + # Upload the dataset to Predibase + try: + dataset = self.pc.upload_dataset(file_path=dataset_file, name=dataset_name) + except Exception as e: + raise RuntimeError("Failed to upload dataset to Predibase") from e + + # Create empty model repo using repo_name, but it is okay if it already exists. + try: + repo = self.pc.create_model_repo( + name=repo_id, + description=commit_description, + exists_ok=True, + ) + except Exception as e: + raise RuntimeError("Failed to create repo in Predibase") from e + + # Upload the zip file to Predibase + try: + self.pc.upload_model( + repo=repo, + model_path=model_path, + dataset=dataset, + ) + except Exception as e: + raise RuntimeError("Failed to upload model to Predibase") from e + + logger.info(f"Model uploaded to Predibase with repository name `{repo_id}`") + return True diff --git a/requirements_extra.txt b/requirements_extra.txt index 4be0e04497a..26fe48eb998 100644 --- a/requirements_extra.txt +++ b/requirements_extra.txt @@ -3,3 +3,6 @@ horovod[pytorch]>=0.24.0,!=0.26.0 # alternative to Dask modin[ray] + +# Allows users to upload +predibase>=2023.10.2 From 6d74d21d71ecc7c125598a93f600b95aa35e2811 Mon Sep 17 00:00:00 2001 From: Justin Date: Wed, 11 Oct 2023 13:25:38 -0400 Subject: [PATCH 09/12] Log additional per-GPU information in model metadata files and GPU utilization on tensorboard. 
(#3712) --- ludwig/api.py | 36 ++++++++++++++++++++++++++---------- ludwig/trainers/trainer.py | 8 ++++++++ 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/ludwig/api.py b/ludwig/api.py index 0afea35b1e0..c85fc67c3c5 100644 --- a/ludwig/api.py +++ b/ludwig/api.py @@ -2141,6 +2141,31 @@ def kfold_cross_validate( return kfold_cv_stats, kfold_split_indices +def _get_compute_description(backend) -> Dict: + """Returns the compute description for the backend.""" + compute_description = {"num_nodes": backend.num_nodes} + + if torch.cuda.is_available(): + # Assumption: All nodes are of the same instance type. + # TODO: fix for Ray where workers may be of different skus + compute_description.update( + { + "gpus_per_node": torch.cuda.device_count(), + "arch_list": torch.cuda.get_arch_list(), + "gencode_flags": torch.cuda.get_gencode_flags(), + "devices": {}, + } + ) + for i in range(torch.cuda.device_count()): + compute_description["devices"][i] = { + "gpu_type": torch.cuda.get_device_name(i), + "device_capability": torch.cuda.get_device_capability(i), + "device_properties": str(torch.cuda.get_device_properties(i)), + } + + return compute_description + + @PublicAPI def get_experiment_description( config, @@ -2184,15 +2209,6 @@ def get_experiment_description( description["config"] = config description["torch_version"] = torch.__version__ - - gpu_info = {} - if torch.cuda.is_available(): - # Assumption: All nodes are of the same instance type. 
- # TODO: fix for Ray where workers may be of different skus - gpu_info = {"gpu_type": torch.cuda.get_device_name(0), "gpus_per_node": torch.cuda.device_count()} - - compute_description = {"num_nodes": backend.num_nodes, **gpu_info} - - description["compute"] = compute_description + description["compute"] = _get_compute_description(backend) return description diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py index 7f41c4376be..10f5af9a4ca 100644 --- a/ludwig/trainers/trainer.py +++ b/ludwig/trainers/trainer.py @@ -487,6 +487,14 @@ def write_step_summary(cls, train_summary_writer, combined_loss, all_losses, ste / (1000**3), global_step=step, ) + + # Utilization. + # https://pytorch.org/docs/stable/generated/torch.cuda.utilization.html#torch.cuda.utilization + train_summary_writer.add_scalar( + f"cuda/device{i}/utilization", + torch.cuda.device(i).utilization(), + global_step=step, + ) train_summary_writer.flush() def is_cpu_training(self): From 90d4e42fd4db8eade8c2dc2efc092f6bc93dc689 Mon Sep 17 00:00:00 2001 From: Arnav Garg <106701836+arnavgarg1@users.noreply.github.com> Date: Wed, 11 Oct 2023 20:34:55 +0300 Subject: [PATCH 10/12] QoL: Only log generation config being used once at inference time (#3715) --- ludwig/models/llm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ludwig/models/llm.py b/ludwig/models/llm.py index bf43d025531..05acf8446bc 100644 --- a/ludwig/models/llm.py +++ b/ludwig/models/llm.py @@ -412,7 +412,7 @@ def generate( mask=None, ) -> Dict[str, torch.Tensor]: """Generates tokens using the model.""" - logger.info(f"For generating text, using: {self.generation}") + log_once(f"For generating text, using: {self.generation}") input_ids, _ = self._unpack_inputs(inputs) with torch.no_grad(): From 7bc00527218e4600ee6e4a674d501fa96cbeb031 Mon Sep 17 00:00:00 2001 From: Alex Sherstinsky Date: Wed, 11 Oct 2023 21:59:23 -0700 Subject: [PATCH 11/12] [MAINTENANCE] Adding typehint annotations in backend and data 
components and fixing mypy errors. (#3709) --- ludwig/backend/base.py | 57 +++++++++++++++++++++++------------ ludwig/data/dataset/base.py | 23 ++++++++------ ludwig/data/dataset/pandas.py | 33 ++++++++++++-------- 3 files changed, 70 insertions(+), 43 deletions(-) diff --git a/ludwig/backend/base.py b/ludwig/backend/base.py index 77a14a0b104..112a8afe35e 100644 --- a/ludwig/backend/base.py +++ b/ludwig/backend/base.py @@ -20,7 +20,7 @@ from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor from contextlib import contextmanager -from typing import Any, Callable, TYPE_CHECKING +from typing import Any, Callable, Generator, TYPE_CHECKING import numpy as np import pandas as pd @@ -89,7 +89,7 @@ def initialize_pytorch(self, *args, **kwargs): @contextmanager @abstractmethod - def create_trainer(self, **kwargs) -> BaseTrainer: + def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> Generator: raise NotImplementedError() @abstractmethod @@ -146,7 +146,9 @@ def tune_batch_size(self, evaluator_cls: type[BatchSizeEvaluator], dataset_len: raise NotImplementedError() @abstractmethod - def batch_transform(self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame: + def batch_transform( + self, df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None + ) -> DataFrame: """Applies `transform_fn` to every `batch_size` length batch of `df` and returns the result.""" raise NotImplementedError() @@ -171,7 +173,9 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si with ThreadPoolExecutor() as executor: # number of threads is inferred if isinstance(sample_fname, str): if map_fn is read_audio_from_path: # bypass torchaudio issue that no longer takes in file-like objects - result = executor.map(lambda path: map_fn(path) if path is not None else path, column.values) + result = executor.map( # type: ignore[misc] + lambda path: map_fn(path) if 
path is not None else path, column.values + ) else: result = executor.map( lambda path: get_bytes_obj_from_path(path) if path is not None else path, column.values @@ -186,7 +190,7 @@ def read_binary_files(column: pd.Series, map_fn: Callable | None = None, file_si return pd.Series(result, index=column.index, name=column.name) @staticmethod - def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str = None) -> DataFrame: + def batch_transform(df: DataFrame, batch_size: int, transform_fn: Callable, name: str | None = None) -> DataFrame: name = name or "Batch Transform" batches = to_batches(df, batch_size) transform = transform_fn() @@ -204,21 +208,11 @@ def initialize(): def initialize_pytorch(*args, **kwargs): initialize_pytorch(*args, **kwargs) - def create_trainer(self, config: BaseTrainerConfig, model: BaseModel, **kwargs) -> BaseTrainer: - from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry - - if model.type() == MODEL_LLM: - trainer_cls = get_from_registry(config.type, get_llm_trainers_registry()) - else: - trainer_cls = get_from_registry(model.type(), get_trainers_registry()) - - return trainer_cls(config=config, model=model, **kwargs) - @staticmethod def create_predictor(model: BaseModel, **kwargs): from ludwig.models.predictor import get_predictor_cls - return get_predictor_cls(model.type())(model, **kwargs) + return get_predictor_cls(model.type())(model, **kwargs) # type: ignore[call-arg] def sync_model(self, model): pass @@ -254,14 +248,16 @@ def is_coordinator() -> bool: class LocalBackend(LocalPreprocessingMixin, LocalTrainingMixin, Backend): BACKEND_TYPE = "local" + _shared_instance: LocalBackend + @classmethod - def shared_instance(cls): + def shared_instance(cls) -> LocalBackend: """Returns a shared singleton LocalBackend instance.""" if not hasattr(cls, "_shared_instance"): cls._shared_instance = cls() return cls._shared_instance - def __init__(self, **kwargs): + def __init__(self, 
**kwargs) -> None: super().__init__(dataset_manager=PandasDatasetManager(self), **kwargs) @property @@ -280,6 +276,22 @@ def max_concurrent_trials(self, hyperopt_config: HyperoptConfigDict) -> int | No # trial resources it wants, because there is no Ray Datasets process to compete with it for CPUs. return None + def create_trainer( + self, + config: BaseTrainerConfig, + model: BaseModel, + **kwargs, + ) -> BaseTrainer: # type: ignore[override] + from ludwig.trainers.registry import get_llm_trainers_registry, get_trainers_registry + + trainer_cls: type + if model.type() == MODEL_LLM: + trainer_cls = get_from_registry(config.type, get_llm_trainers_registry()) + else: + trainer_cls = get_from_registry(model.type(), get_trainers_registry()) + + return trainer_cls(config=config, model=model, **kwargs) + @DeveloperAPI class DataParallelBackend(LocalPreprocessingMixin, Backend, ABC): @@ -298,7 +310,12 @@ def initialize_pytorch(self, *args, **kwargs): *args, local_rank=self._distributed.local_rank(), local_size=self._distributed.local_size(), **kwargs ) - def create_trainer(self, **kwargs) -> BaseTrainer: + def create_trainer( + self, + config: BaseTrainerConfig, + model: BaseModel, + **kwargs, + ) -> BaseTrainer: # type: ignore[override] from ludwig.trainers.trainer import Trainer return Trainer(distributed=self._distributed, **kwargs) @@ -306,7 +323,7 @@ def create_trainer(self, **kwargs) -> BaseTrainer: def create_predictor(self, model: BaseModel, **kwargs): from ludwig.models.predictor import get_predictor_cls - return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) + return get_predictor_cls(model.type())(model, distributed=self._distributed, **kwargs) # type: ignore[call-arg] def sync_model(self, model): # Model weights are only saved on the coordinator, so broadcast diff --git a/ludwig/data/dataset/base.py b/ludwig/data/dataset/base.py index 729c0bc4569..ea548a0ea10 100644 --- a/ludwig/data/dataset/base.py +++ 
b/ludwig/data/dataset/base.py @@ -14,10 +14,13 @@ # limitations under the License. # ============================================================================== +from __future__ import annotations + import contextlib from abc import ABC, abstractmethod -from typing import Iterable, Optional +from typing import Iterable +from ludwig.data.batcher.base import Batcher from ludwig.distributed import DistributedStrategy from ludwig.features.base_feature import BaseFeature from ludwig.utils.defaults import default_random_seed @@ -26,7 +29,7 @@ class Dataset(ABC): @abstractmethod - def __len__(self): + def __len__(self) -> int: raise NotImplementedError() @contextlib.contextmanager @@ -38,36 +41,36 @@ def initialize_batcher( random_seed: int = default_random_seed, ignore_last: bool = False, distributed: DistributedStrategy = None, - ): + ) -> Batcher: raise NotImplementedError() @abstractmethod - def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: raise NotImplementedError() @abstractmethod - def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: raise NotImplementedError() @property - def in_memory_size_bytes(self): + def in_memory_size_bytes(self) -> int: raise NotImplementedError() class DatasetManager(ABC): @abstractmethod - def create(self, dataset, config, training_set_metadata): + def create(self, dataset, config, training_set_metadata) -> Dataset: raise NotImplementedError() @abstractmethod - def save(self, cache_path, dataset, config, training_set_metadata, tag): + def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset: raise NotImplementedError() @abstractmethod - def can_cache(self, skip_save_processed_input): + def can_cache(self, skip_save_processed_input) -> bool: raise NotImplementedError() @property 
@abstractmethod - def data_format(self): + def data_format(self) -> str: raise NotImplementedError() diff --git a/ludwig/data/dataset/pandas.py b/ludwig/data/dataset/pandas.py index 6cec6ec6bf8..279da884515 100644 --- a/ludwig/data/dataset/pandas.py +++ b/ludwig/data/dataset/pandas.py @@ -13,13 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== + +from __future__ import annotations + import contextlib -from typing import Iterable, Optional +from typing import Iterable, TYPE_CHECKING import numpy as np from pandas import DataFrame from ludwig.constants import PREPROCESSING, TRAINING +from ludwig.data.batcher.base import Batcher from ludwig.data.batcher.random_access import RandomAccessBatcher from ludwig.data.dataset.base import Dataset, DatasetManager from ludwig.data.sampler import DistributedSampler @@ -31,6 +35,9 @@ from ludwig.utils.fs_utils import download_h5 from ludwig.utils.misc_utils import get_proc_features +if TYPE_CHECKING: + from ludwig.backend.base import Backend + class PandasDataset(Dataset): def __init__(self, dataset, features, data_hdf5_fp): @@ -42,13 +49,13 @@ def __init__(self, dataset, features, data_hdf5_fp): dataset = load_hdf5(dataset) self.dataset = to_numpy_dataset(dataset) - def to_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: """Convert the dataset to a Pandas DataFrame.""" if features: return from_numpy_dataset({feature.feature_name: self.dataset[feature.proc_column] for feature in features}) return from_numpy_dataset(self.dataset) - def to_scalar_df(self, features: Optional[Iterable[BaseFeature]] = None) -> DataFrame: + def to_scalar_df(self, features: Iterable[BaseFeature] | None = None) -> DataFrame: return to_scalar_df(self.to_df(features)) def get(self, proc_column, idx=None): @@ -76,18 
+83,18 @@ def get(self, proc_column, idx=None): indices = indices[:, np.argsort(indices[1])] return im_data[indices[2, :]] - def get_dataset(self): + def get_dataset(self) -> dict[str, np.ndarray]: return self.dataset def __len__(self): return self.size @property - def processed_data_fp(self) -> Optional[str]: + def processed_data_fp(self) -> str | None: return self.data_hdf5_fp @property - def in_memory_size_bytes(self): + def in_memory_size_bytes(self) -> int: df = self.to_df() return df.memory_usage(deep=True).sum() if df is not None else 0 @@ -100,7 +107,7 @@ def initialize_batcher( ignore_last: bool = False, distributed: DistributedStrategy = None, augmentation_pipeline=None, - ): + ) -> Batcher: sampler = DistributedSampler( len(self), shuffle=should_shuffle, random_seed=random_seed, distributed=distributed ) @@ -115,21 +122,21 @@ def initialize_batcher( class PandasDatasetManager(DatasetManager): - def __init__(self, backend): - self.backend = backend + def __init__(self, backend: Backend): + self.backend: Backend = backend - def create(self, dataset, config, training_set_metadata): + def create(self, dataset, config, training_set_metadata) -> Dataset: return PandasDataset(dataset, get_proc_features(config), training_set_metadata.get(DATA_TRAIN_HDF5_FP)) - def save(self, cache_path, dataset, config, training_set_metadata, tag): + def save(self, cache_path, dataset, config, training_set_metadata, tag) -> Dataset: save_hdf5(cache_path, dataset) if tag == TRAINING: training_set_metadata[DATA_TRAIN_HDF5_FP] = cache_path return dataset - def can_cache(self, skip_save_processed_input): + def can_cache(self, skip_save_processed_input) -> bool: return self.backend.is_coordinator() and not skip_save_processed_input @property - def data_format(self): + def data_format(self) -> str: return "hdf5" From fd9147870fe2ea77ca75ff93c94519708abec73a Mon Sep 17 00:00:00 2001 From: Arnav Garg <106701836+arnavgarg1@users.noreply.github.com> Date: Thu, 12 Oct 2023 08:22:05 +0300 
Subject: [PATCH 12/12] QoL: Limit top-level trainer logging messages such as saving model or resuming model training to main coordinator process (#3718) --- ludwig/trainers/trainer.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py index 10f5af9a4ca..e4a0e875882 100644 --- a/ludwig/trainers/trainer.py +++ b/ludwig/trainers/trainer.py @@ -743,7 +743,8 @@ def run_evaluation( else: # There's no validation, so we save the model. if not self.skip_save_model: - logger.info("Saving model.\n") + if self.is_coordinator(): + logger.info("Saving model.\n") checkpoint_manager.save_best(progress_tracker.steps) self.callback(lambda c: c.on_save_best_checkpoint(self, progress_tracker, save_path)) @@ -836,7 +837,8 @@ def train( try: progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path) self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint) - logger.info("Resuming training from previous run.") + if self.is_coordinator(): + logger.info("Resuming training from previous run.") except Exception: # This may happen if model training is interrupted after the progress tracker is initialized # but before any real training progress is made. @@ -849,7 +851,8 @@ def train( ), output_features=output_features, ) - logger.info("Failed to resume training from previous run. Creating fresh model training run.") + if self.is_coordinator(): + logger.info("Failed to resume training from previous run. 
Creating fresh model training run.") else: progress_tracker = get_new_progress_tracker( batch_size=self.batch_size, @@ -858,7 +861,8 @@ def train( best_increase_batch_size_eval_metric=get_initial_validation_value(self.increase_batch_size_eval_metric), output_features=output_features, ) - logger.info("Creating fresh model training run.") + if self.is_coordinator(): + logger.info("Creating fresh model training run.") # Distributed: broadcast initial variable states from rank 0 to all other processes. # This is necessary to ensure consistent initialization of all workers when