diff --git a/python/ray/train/_internal/storage.py b/python/ray/train/_internal/storage.py
index d318dba0394e8..2135979fd5ec1 100644
--- a/python/ray/train/_internal/storage.py
+++ b/python/ray/train/_internal/storage.py
@@ -236,7 +236,6 @@ def _list_at_fs_path(fs: pyarrow.fs.FileSystem, fs_path: str) -> List[str]:
 
     If the path doesn't exist, returns an empty list.
     """
-
     selector = pyarrow.fs.FileSelector(fs_path, allow_not_found=True, recursive=False)
     return [
         os.path.relpath(file_info.path.lstrip("/"), start=fs_path.lstrip("/"))
@@ -391,7 +390,7 @@ class StorageContext:
         ...     storage_path=None,
         ...     experiment_dir_name="exp_name",
         ... )
-        >>> storage.storage_path  # Auto-resolved
+        >>> storage.storage_fs_path  # Auto-resolved
         '/tmp/ray_results'
         >>> storage.storage_local_path
         '/tmp/ray_results'
@@ -438,7 +437,7 @@ def __init__(
         # If `storage_path=None`, then set it to the local path.
         # Invariant: (`storage_filesystem`, `storage_path`) is the location where
         # *all* results can be accessed.
-        self.storage_path = storage_path or ray_storage_uri or self.storage_local_path
+        storage_path = storage_path or ray_storage_uri or self.storage_local_path
         self.experiment_dir_name = experiment_dir_name
         self.trial_dir_name = trial_dir_name
         self.current_checkpoint_index = current_checkpoint_index
@@ -447,7 +446,7 @@ def __init__(
         )
 
         self.storage_filesystem, self.storage_fs_path = get_fs_and_path(
-            self.storage_path, storage_filesystem
+            storage_path, storage_filesystem
         )
         self.storage_fs_path = Path(self.storage_fs_path).as_posix()
 
@@ -471,17 +470,16 @@ def __init__(
         self._check_validation_file()
 
     def __str__(self):
-        attrs = [
-            "storage_path",
-            "storage_local_path",
-            "storage_filesystem",
-            "storage_fs_path",
-            "experiment_dir_name",
-            "trial_dir_name",
-            "current_checkpoint_index",
-        ]
-        attr_str = "\n".join([f"  {attr}={getattr(self, attr)}" for attr in attrs])
-        return f"StorageContext<\n{attr_str}\n>"
+        return (
+            "StorageContext<\n"
+            f"  storage_filesystem='{self.storage_filesystem.type_name}',\n"
+            f"  storage_fs_path='{self.storage_fs_path}',\n"
+            f"  storage_local_path='{self.storage_local_path}',\n"
+            f"  experiment_dir_name='{self.experiment_dir_name}',\n"
+            f"  trial_dir_name='{self.trial_dir_name}',\n"
+            f"  current_checkpoint_index={self.current_checkpoint_index},\n"
+            ">"
+        )
 
     def _create_validation_file(self):
         """On the creation of a storage context, create a validation file at the
@@ -502,7 +500,7 @@ def _check_validation_file(self):
         )
         if not _exists_at_fs_path(fs=self.storage_filesystem, fs_path=valid_file):
             raise RuntimeError(
-                f"Unable to set up cluster storage at storage_path={self.storage_path}"
+                f"Unable to set up cluster storage with the following settings:\n{self}"
                 "\nCheck that all nodes in the cluster have read/write access "
                 "to the configured storage path."
             )
diff --git a/python/ray/train/base_trainer.py b/python/ray/train/base_trainer.py
index 9028fbfa7d6c3..7891dfccef507 100644
--- a/python/ray/train/base_trainer.py
+++ b/python/ray/train/base_trainer.py
@@ -231,6 +231,25 @@ def restore(
         that the head node crashes (e.g., OOM or some other runtime error)
         or the entire cluster goes down (e.g., network error affecting all nodes).
 
+        A run that has already completed successfully will not be resumed by this API.
+        To continue training from a successful run, launch a new run with the
+        ``Trainer(resume_from_checkpoint)`` API instead, passing in a
+        checkpoint from the previous run to start with.
+
+        .. note::
+
+            Restoring an experiment from a path that points to a *different*
+            location than the original experiment path is supported. However, Ray Train
+            assumes that the full experiment directory is available
+            (including checkpoints) so that it's possible to resume trials from their
+            latest state.
+
+            For example, if the original experiment was run locally and the results
+            were then uploaded to cloud storage, Ray Train expects the full contents
+            to be available in cloud storage when attempting to resume
+            via ``Trainer.restore("s3://...")``. The restored run will
+            continue writing results to the same cloud storage location.
+
         The following example can be paired with implementing job retry using
         :ref:`Ray Jobs ` to produce a Train experiment
         that will attempt to resume on both experiment-level and trial-level failures:
@@ -280,6 +299,9 @@ def training_loop(self):
             path: The path to the experiment directory of the training run to restore.
                 This can be a local path or a remote URI if the experiment was
                 uploaded to the cloud.
+            storage_filesystem: Custom ``pyarrow.fs.FileSystem``
+                corresponding to the ``path``. This may be necessary if the original
+                experiment passed in a custom filesystem.
             datasets: Re-specified datasets used in the original training run.
                 This must include all the datasets that were passed in the
                 original trainer constructor.
diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD
index d4845f2d675d4..e89169d0bb4f1 100644
--- a/python/ray/tune/BUILD
+++ b/python/ray/tune/BUILD
@@ -312,9 +312,9 @@ py_test(
 )
 
 py_test(
-    name = "test_trial_relative_logdir",
-    size = "medium",
-    srcs = ["tests/test_trial_relative_logdir.py"],
+    name = "test_trial",
+    size = "small",
+    srcs = ["tests/test_trial.py"],
     deps = [":tune_lib"],
     tags = ["team:ml", "exclusive"],
 )
diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py
index f21110918e9e1..2366f5c7e9e64 100644
--- a/python/ray/tune/analysis/experiment_analysis.py
+++ b/python/ray/tune/analysis/experiment_analysis.py
@@ -1,9 +1,11 @@
+import copy
 import fnmatch
 import io
 import json
 import logging
 from numbers import Number
 import os
+from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 import pyarrow.fs
@@ -122,12 +124,20 @@ def _load_trials(self) -> List[Trial]:
         with self._fs.open_input_stream(self._experiment_json_fs_path) as f:
             experiment_state = json.loads(f.readall(), cls=TuneFunctionDecoder)
 
+        experiment_fs_path = Path(self._experiment_fs_path)
+
         trials = []
         trial_states = experiment_state["trial_data"]
         for trial_json_state, trial_runtime_metadata in trial_states:
             trial = Trial.from_json_state(trial_json_state, stub=True)
             trial.restore_run_metadata(trial_runtime_metadata)
-            # TODO(justinvyu): [handle_moved_storage_path]
+
+            new_storage = copy.copy(trial.storage)
+            new_storage.storage_fs_path = experiment_fs_path.parent.as_posix()
+            new_storage.storage_filesystem = self._fs
+            new_storage.experiment_dir_name = experiment_fs_path.name
+            trial.set_storage(new_storage)
+
             trials.append(trial)
 
         return trials
diff --git a/python/ray/tune/execution/experiment_state.py b/python/ray/tune/execution/experiment_state.py
index 306521405df50..736b0dcb97ab1 100644
--- a/python/ray/tune/execution/experiment_state.py
+++ b/python/ray/tune/execution/experiment_state.py
@@ -1,8 +1,8 @@
 from collections import Counter
 from dataclasses import dataclass
+from pathlib import Path
 from typing import Callable, Dict, Optional, Tuple, Union
 
-import pyarrow
 import click
 import logging
 import os
@@ -11,7 +11,11 @@
 
 from ray.air._internal.remote_storage import list_at_uri
 from ray.air._internal.uri_utils import _join_path_or_uri
-from ray.train._internal.storage import StorageContext
+from ray.train._internal.storage import (
+    StorageContext,
+    _download_from_fs_path,
+    _list_at_fs_path,
+)
 from ray.tune.experiment import Trial
 from ray.tune.impl.out_of_band_serialize_dataset import out_of_band_serialize_dataset
 
@@ -341,26 +345,25 @@ def sync_up(self, force: bool = False, wait: bool = False) -> bool:
 
     def sync_down_experiment_state(self) -> None:
         fs = self._storage.storage_filesystem
-        file_infos = fs.get_file_info(
-            pyarrow.fs.FileSelector(self._storage.experiment_fs_path)
-        )
+        filepaths = _list_at_fs_path(fs=fs, fs_path=self._storage.experiment_fs_path)
         # TODO(ekl) we should refactor our restore code to read the necessary data
         # directly from the storage context. As a temporary hack, restore all the
         # serialized files from the root dir where other modules expect them to be.
-        matches = []
-        for file_info in file_infos:
-            name = os.path.basename(file_info.path)
-            if name.endswith(".json") or name.endswith(".pkl"):
-                matches.append(name)
-        for name in matches:
-            remote_path = os.path.join(self._storage.experiment_fs_path, name)
-            local_path = os.path.join(self._storage.experiment_local_path, name)
-            pyarrow.fs.copy_files(
-                remote_path,
-                local_path,
-                source_filesystem=self._storage.storage_filesystem,
-            )
-            logger.debug(f"Copied {matches} from:\n{remote_path}\n-> {local_path}")
+        matches = [
+            path
+            for path in filepaths
+            if path.endswith(".json") or path.endswith(".pkl")
+        ]
+        for relpath in matches:
+            fs_path = Path(self._storage.experiment_fs_path, relpath).as_posix()
+            local_path = Path(self._storage.experiment_local_path, relpath).as_posix()
+            _download_from_fs_path(fs=fs, fs_path=fs_path, local_path=local_path)
+        logger.debug(
+            f"Copied {matches} from:\n(fs, path) = "
+            f"({self._storage.storage_filesystem.type_name}, "
+            f"{self._storage.experiment_fs_path})\n"
+            f"-> {self._storage.experiment_local_path}"
+        )
 
     def _resume_auto(self) -> bool:
         experiment_local_path = self._storage.experiment_local_path
diff --git a/python/ray/tune/execution/tune_controller.py b/python/ray/tune/execution/tune_controller.py
index 74270ea8ba989..4d66ccaaf7d40 100644
--- a/python/ray/tune/execution/tune_controller.py
+++ b/python/ray/tune/execution/tune_controller.py
@@ -437,9 +437,13 @@ def restore_from_dir(self) -> List[Trial]:
             # Ex: moved local/cloud experiment directory
 
             # Propagate updated storage ctx properties to the trial's restored copy.
-            # TODO(justinvyu): [handle_moved_storage_path]
-            trial.storage.storage_path = self._storage.storage_path
-            trial.storage.experiment_dir_name = self._storage.experiment_dir_name
+            new_storage = copy.copy(trial.storage)
+            new_storage.storage_filesystem = self._storage.storage_filesystem
+            new_storage.storage_fs_path = self._storage.storage_fs_path
+            new_storage.experiment_dir_name = self._storage.experiment_dir_name
+            # ATTN: `trial.set_storage` is used intentionally, since it
+            # also updates the absolute paths and filesystem of tracked checkpoints.
+            trial.set_storage(new_storage)
 
             # Avoid creating logdir in client mode for returned trial results,
             # since the dir might not be creatable locally.
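Both restore paths above (``ExperimentAnalysis._load_trials`` and ``TuneController.restore_from_dir``) now follow the same pattern: copy the trial's restored ``StorageContext``, point it at the experiment's current filesystem and path, and pass it to ``Trial.set_storage`` so that tracked checkpoint paths are rewritten. A minimal sketch of that shared pattern, not part of the patch itself; the helper name ``_propagate_storage`` is hypothetical:

    import copy

    def _propagate_storage(trial, current_storage):
        # Clone the stale StorageContext restored from the old location ...
        new_storage = copy.copy(trial.storage)
        # ... point it at the experiment's *current* filesystem and path ...
        new_storage.storage_filesystem = current_storage.storage_filesystem
        new_storage.storage_fs_path = current_storage.storage_fs_path
        new_storage.experiment_dir_name = current_storage.experiment_dir_name
        # ... and let Trial.set_storage rewrite the tracked checkpoint paths.
        trial.set_storage(new_storage)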
diff --git a/python/ray/tune/experiment/trial.py b/python/ray/tune/experiment/trial.py
index e99e9385ee101..28adefa1792dc 100644
--- a/python/ray/tune/experiment/trial.py
+++ b/python/ray/tune/experiment/trial.py
@@ -699,6 +699,36 @@ def set_experiment_tag(self, experiment_tag):
         self.experiment_tag = experiment_tag
         self.invalidate_json_state()
 
+    def set_storage(self, new_storage: StorageContext):
+        """Updates the storage context of the trial.
+
+        If the `storage_path` or `experiment_dir_name` has changed, then this setter
+        also updates the paths of all checkpoints tracked by the checkpoint manager.
+        This enables restoration from a checkpoint if the user moves the directory.
+        """
+        original_storage = self.storage
+
+        checkpoint_manager = self.run_metadata.checkpoint_manager
+
+        for checkpoint_result in checkpoint_manager.best_checkpoint_results:
+            checkpoint_result.checkpoint = Checkpoint(
+                path=checkpoint_result.checkpoint.path.replace(
+                    original_storage.trial_fs_path, new_storage.trial_fs_path, 1
+                ),
+                filesystem=new_storage.storage_filesystem,
+            )
+        latest_checkpoint_result = checkpoint_manager.latest_checkpoint_result
+        if latest_checkpoint_result:
+            latest_checkpoint_result.checkpoint = Checkpoint(
+                path=latest_checkpoint_result.checkpoint.path.replace(
+                    original_storage.trial_fs_path, new_storage.trial_fs_path, 1
+                ),
+                filesystem=new_storage.storage_filesystem,
+            )
+
+        self.storage = new_storage
+        self.invalidate_json_state()
+
     @property
     def num_failures(self):
         return self.run_metadata.num_failures
diff --git a/python/ray/tune/result_grid.py b/python/ray/tune/result_grid.py
index ac30732b7bc6c..f30c2c288b362 100644
--- a/python/ray/tune/result_grid.py
+++ b/python/ray/tune/result_grid.py
@@ -254,6 +254,7 @@ def num_terminated(self):
 def _populate_exception(trial: Trial) -> Optional[Union[TuneError, RayTaskError]]:
     if trial.status == Trial.TERMINATED:
         return None
+    # TODO(justinvyu): [populate_exception] for storage_path != None
     if trial.pickled_error_file and os.path.exists(trial.pickled_error_file):
         with open(trial.pickled_error_file, "rb") as f:
             e = cloudpickle.load(f)
diff --git a/python/ray/tune/tests/test_result_grid.py b/python/ray/tune/tests/test_result_grid.py
index 9d275b48c0361..c5262eac1aaf8 100644
--- a/python/ray/tune/tests/test_result_grid.py
+++ b/python/ray/tune/tests/test_result_grid.py
@@ -209,11 +209,6 @@ def train_fn(config):
     assert isinstance(result_grid.errors[0], RuntimeError)
 
 
-def test_result_grid_moved_experiment_path(ray_start_2_cpus, tmpdir):
-    # TODO(justinvyu): [handle_moved_storage_path]
-    pytest.skip("Not implemented yet.")
-
-
 if __name__ == "__main__":
     import sys
 
diff --git a/python/ray/tune/tests/test_trial_relative_logdir.py b/python/ray/tune/tests/test_trial.py
similarity index 52%
rename from python/ray/tune/tests/test_trial_relative_logdir.py
rename to python/ray/tune/tests/test_trial.py
index e35656b54b793..f9986c77aa6bd 100644
--- a/python/ray/tune/tests/test_trial_relative_logdir.py
+++ b/python/ray/tune/tests/test_trial.py
@@ -2,6 +2,9 @@
 import sys
 
 import pytest
 
+from ray.train import Checkpoint
+from ray.train._internal.session import _TrainingResult
+from ray.train._internal.storage import StorageContext
 from ray.tune.experiment import Trial
 from ray.train.tests.util import mock_storage_context
@@ -27,9 +30,38 @@ def test_load_trial_from_json_state():
     assert new_trial.get_json_state()[0] == json_state
 
 
-def test_change_trial_local_dir(tmpdir):
-    # TODO(justinvyu): [handle_moved_storage_path]
-    pytest.skip("Changing the storage path is not supported yet.")
+def test_set_storage(tmp_path):
+    """Test that setting the trial's storage context will update the tracked
+    checkpoint paths."""
+    original_storage = mock_storage_context()
+    trial = Trial(
+        "MockTrainable",
+        stub=True,
+        trial_id="abcd1234",
+        storage=original_storage,
+    )
+
+    result_1 = _TrainingResult(
+        checkpoint=Checkpoint.from_directory(original_storage.checkpoint_fs_path),
+        metrics={},
+    )
+    trial.on_checkpoint(result_1)
+
+    result_2 = _TrainingResult(
+        checkpoint=Checkpoint.from_directory(original_storage.checkpoint_fs_path),
+        metrics={},
+    )
+    trial.on_checkpoint(result_2)
+
+    new_storage = StorageContext(
+        storage_path=tmp_path / "new_storage_path",
+        experiment_dir_name="new_name",
+        trial_dir_name="new_trial",
+    )
+    trial.set_storage(new_storage)
+
+    assert result_1.checkpoint.path.startswith(new_storage.trial_fs_path)
+    assert result_2.checkpoint.path.startswith(new_storage.trial_fs_path)
 
 
 def test_trial_logdir_length():
diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py
index 141cdee64a5bd..9957b8db57ebd 100644
--- a/python/ray/tune/tests/test_tuner_restore.py
+++ b/python/ray/tune/tests/test_tuner_restore.py
@@ -25,6 +25,7 @@
     _upload_to_fs_path,
 )
 from ray.tune import Callback, Trainable
+from ray.tune.analysis import ExperimentAnalysis
 from ray.tune.execution.experiment_state import _find_newest_experiment_checkpoint
 from ray.tune.experiment import Trial
 from ray.tune.result_grid import ResultGrid
@@ -743,22 +744,21 @@ def create_trainable_with_params():
     assert not results.errors
 
 
-# TODO(justinvyu): [handle_moved_storage_path]
-@pytest.mark.skip("Restoring from a moved storage path is not supported yet.")
-@pytest.mark.parametrize("use_tune_run", [True, False])
+@pytest.mark.parametrize("use_tune_run", [True])
 def test_tuner_restore_from_moved_experiment_path(
     ray_start_2_cpus, tmp_path, use_tune_run
 ):
     """Check that restoring a Tuner from a moved experiment directory works."""
     # Create a fail_marker dummy file that causes the first Tune run to fail and
     # the second run to succeed
+    os.environ["RAY_AIR_LOCAL_CACHE_DIR"] = str(tmp_path / "local_dir")
     fail_marker = tmp_path / "fail_marker"
     fail_marker.write_text("", encoding="utf-8")
 
-    old_local_dir = tmp_path / "ray_results"
+    old_storage_path = tmp_path / "ray_results"
    old_exp_name = "exp_dir"
 
-    new_local_dir = tmp_path / "new_ray_results"
+    new_storage_path = tmp_path / "new_ray_results"
     new_exp_name = "new_exp_dir"
 
     # Initial training run (that errors out in the middle)
@@ -770,25 +770,35 @@ def test_tuner_restore_from_moved_experiment_path(
         ),
         run_config=RunConfig(
             name=old_exp_name,
-            storage_path=str(old_local_dir),
+            storage_path=str(old_storage_path),
             checkpoint_config=CheckpointConfig(num_to_keep=num_to_keep),
         ),
         param_space={
             "failing_hanging": (fail_marker, None),
         },
     )
-    results = tuner.fit()
-    assert len(results.errors) == 1
+    tuner.fit()
+
+    # Move experiment from `tmp_path/ray_results/exp_dir`
+    # to `tmp_path/new_ray_results/new_exp_dir`, changing both `storage_path` and
+    # the experiment `name`
+    shutil.move(str(old_storage_path), str(new_storage_path))
+    os.rename(
+        str(new_storage_path / old_exp_name), str(new_storage_path / new_exp_name)
+    )
+
+    # Check that the results can be read from the new location.
+    restore_path = str(new_storage_path / new_exp_name)
+    results = ResultGrid(ExperimentAnalysis(restore_path))
+
+    # TODO(justinvyu): [populate_exception] for storage_path != None
+    # assert len(results.errors) == 1
     training_iteration = results[0].metrics["training_iteration"]
     assert (
         training_iteration == 1
     ), f"Should only have 1 train.report before erroring, got {training_iteration}"
-
-    # Move experiment from `tmp_path/ray_results/exp_dir`
-    # to `tmp_path/moved_ray_results/new_exp_dir`, changing both `local_dir` and
-    # the experiment `name`
-    shutil.move(str(old_local_dir), str(new_local_dir))
-    os.rename(str(new_local_dir / old_exp_name), str(new_local_dir / new_exp_name))
+    assert results[0].checkpoint.path.endswith("checkpoint_000000")
+    assert "new_exp_dir" in results[0].checkpoint.path
 
     del tuner
     # Remove fail_marker so that the restored Tuner doesn't error again
@@ -799,18 +809,18 @@ def test_tuner_restore_from_moved_experiment_path(
         analysis = tune.run(
             _train_fn_sometimes_failing,
             name=new_exp_name,
-            storage_path=str(new_local_dir),
+            storage_path=str(new_storage_path),
             resume="AUTO+ERRORED",
         )
         results = ResultGrid(analysis)
     else:
-        restore_path = str(new_local_dir / new_exp_name)
         tuner = Tuner.restore(
             restore_path, trainable=_train_fn_sometimes_failing, resume_errored=True
         )
         results = tuner.fit()
 
     assert len(results.errors) == 0
+    # Check that we restored iter=1, then made 2 calls to train.report -> iter=3
     training_iteration = results[0].metrics["training_iteration"]
     assert training_iteration == 3, training_iteration
 
@@ -819,21 +829,12 @@ def test_tuner_restore_from_moved_experiment_path(
     assert results[0].checkpoint
     assert len(results[0].best_checkpoints) == num_to_keep
     checkpoint_dirs = [
-        path
-        for path in os.listdir(results[0].log_dir)
-        if path.startswith("checkpoint_")
+        path for path in os.listdir(results[0].path) if path.startswith("checkpoint_")
     ]
     assert sorted(checkpoint_dirs) == ["checkpoint_000001", "checkpoint_000002"]
 
     # Make sure that we did not create a logdir in the old location
-    assert not old_local_dir.exists()
-
-
-# TODO(justinvyu): [handle_moved_storage_path]
-@pytest.mark.skip("Restoring from a moved storage path is not supported yet.")
-def test_tuner_restore_from_moved_cloud_uri(ray_start_2_cpus, tmp_path):
-    """Test that restoring an experiment that was moved to a new remote URI
-    resumes and continues saving new results at that URI."""
+    assert not old_storage_path.exists()
 
 
 def test_custom_searcher_and_scheduler_restore(ray_start_2_cpus, tmpdir):
diff --git a/python/ray/tune/trainable/metadata.py b/python/ray/tune/trainable/metadata.py
index 6e4807d6f0419..5b1ae11a818d2 100644
--- a/python/ray/tune/trainable/metadata.py
+++ b/python/ray/tune/trainable/metadata.py
@@ -3,6 +3,7 @@
 from numbers import Number
 from typing import Tuple, Optional
 
+from ray.train._internal.checkpoint_manager import _CheckpointManager
 from ray.tune.utils.serialization import TuneFunctionEncoder, TuneFunctionDecoder
 
 
@@ -36,7 +37,7 @@ def __init__(self, n_steps: Tuple[int] = (5, 10)):
         self.metric_n_steps = {}
 
         # Checkpoints
-        self.checkpoint_manager = None
+        self.checkpoint_manager: Optional[_CheckpointManager] = None
 
         self._cached_json = None
 
diff --git a/python/ray/tune/tuner.py b/python/ray/tune/tuner.py
index d730fa50f8b0c..66ac04eb7955c 100644
--- a/python/ray/tune/tuner.py
+++ b/python/ray/tune/tuner.py
@@ -207,12 +207,26 @@ def restore(
         their latest checkpoints. The latter will restart errored trials
         from scratch and prevent loading their last checkpoints.
 
+        .. note::
+
+            Restoring an experiment from a path that points to a *different*
+            location than the original experiment path is supported.
+            However, Ray Tune assumes that the full experiment directory is available
+            (including checkpoints) so that it's possible to resume trials from their
+            latest state.
+
+            For example, if the original experiment was run locally and the results
+            were then uploaded to cloud storage, Ray Tune expects the full
+            contents to be available in cloud storage when attempting to resume
+            via ``Tuner.restore("s3://...")``. The restored run will continue
+            writing results to the same cloud storage location.
+
         Args:
-            path: The path where the previous failed run is checkpointed.
+            path: The local or remote path of the experiment directory
+                for an interrupted or failed run.
+                Note that an experiment where all trials finished will not be resumed.
                 This information could be easily located near the end of the
                 console output of previous run.
-                Note: depending on whether ray client mode is used or not,
-                this path may or may not exist on your local machine.
             trainable: The trainable to use upon resuming the experiment.
                 This should be the same trainable that was used to initialize
                 the original Tuner.
@@ -230,6 +244,9 @@ def restore(
                 restore from their latest checkpoints.
             restart_errored: If True, will re-schedule errored trials but force
                 restarting them from scratch (no checkpoint will be loaded).
+            storage_filesystem: Custom ``pyarrow.fs.FileSystem``
+                corresponding to the ``path``. This may be necessary if the original
+                experiment passed in a custom filesystem.
         """
         # TODO(xwjiang): Add some comments to clarify the config behavior across
         # retored runs.
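To illustrate the workflow the two docstring notes above describe: after an experiment directory has been moved (for example, uploaded to cloud storage), it can be restored directly from its new location. A minimal sketch, not part of the patch; the bucket path is a placeholder and ``train_fn`` stands in for the trainable used in the original run:

    from ray import train, tune

    def train_fn(config):
        # Placeholder trainable; restoring requires the same trainable
        # that was passed to the original Tuner.
        train.report({"score": 1})

    tuner = tune.Tuner.restore(
        "s3://my-bucket/new-location/exp_name",  # the experiment's *new* path
        trainable=train_fn,
        resume_errored=True,
    )
    results = tuner.fit()  # new results keep being written to the new location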