[tune/train] Restore Tuner and results properly from moved storage path #40647

Merged 16 commits on Oct 30, 2023. Changes shown from 8 commits.
6 changes: 3 additions & 3 deletions python/ray/tune/BUILD
@@ -312,9 +312,9 @@ py_test(
)

py_test(
name = "test_trial_relative_logdir",
size = "medium",
srcs = ["tests/test_trial_relative_logdir.py"],
name = "test_trial",
size = "small",
srcs = ["tests/test_trial.py"],
deps = [":tune_lib"],
tags = ["team:ml", "exclusive"],
)
12 changes: 11 additions & 1 deletion python/ray/tune/analysis/experiment_analysis.py
@@ -1,9 +1,11 @@
import copy
import fnmatch
import io
import json
import logging
from numbers import Number
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import pyarrow.fs
@@ -122,12 +124,20 @@ def _load_trials(self) -> List[Trial]:
with self._fs.open_input_stream(self._experiment_json_fs_path) as f:
experiment_state = json.loads(f.readall(), cls=TuneFunctionDecoder)

experiment_fs_path = Path(self._experiment_fs_path)

trials = []
trial_states = experiment_state["trial_data"]
for trial_json_state, trial_runtime_metadata in trial_states:
trial = Trial.from_json_state(trial_json_state, stub=True)
trial.restore_run_metadata(trial_runtime_metadata)
# TODO(justinvyu): [handle_moved_storage_path]

new_storage = copy.copy(trial.storage)
@woshiyyya (Member) commented on Oct 27, 2023:

What would happen if an experiment directory was originally on S3 and is then moved to disk? Will we automatically detect this and build a new filesystem object?

@justinvyu (Contributor, Author) replied:

Tuner.restore("local_path") would pass in a local storage path, which would be resolved into a local filesystem. I should add a test for this.
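As a rough illustration of that resolution (assuming pyarrow's `FileSystem.from_uri` semantics; the exact helper Ray uses internally may differ, and the bucket/paths below are placeholders), a cloud URI and a plain local path resolve to different filesystem objects:

```python
import pyarrow.fs

# A cloud URI resolves to a remote filesystem implementation.
s3_fs, s3_path = pyarrow.fs.FileSystem.from_uri("s3://bucket/ray_results/exp_dir")
print(type(s3_fs).__name__)  # S3FileSystem

# A plain local path resolves to a LocalFileSystem, which is what a
# Tuner.restore("local_path") call would end up working against.
local_fs, local_path = pyarrow.fs.FileSystem.from_uri("/tmp/ray_results/exp_dir")
print(type(local_fs).__name__)  # LocalFileSystem
```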

new_storage.storage_fs_path = experiment_fs_path.parent.as_posix()
new_storage.storage_filesystem = self._fs
new_storage.experiment_dir_name = experiment_fs_path.name
trial.set_storage(new_storage)

trials.append(trial)
return trials

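As a usage sketch of what the rebuilt storage contexts enable (mirroring the test updated later in this PR; the paths below are hypothetical), results can be loaded directly from the moved experiment directory:

```python
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.result_grid import ResultGrid

# Hypothetical: the experiment was moved from its original storage_path
# to /data/new_ray_results/new_exp_dir before loading.
restore_path = "/data/new_ray_results/new_exp_dir"
results = ResultGrid(ExperimentAnalysis(restore_path))

# Tracked checkpoints now resolve against the new location.
print(results[0].checkpoint.path)
```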
10 changes: 7 additions & 3 deletions python/ray/tune/execution/tune_controller.py
@@ -437,9 +437,13 @@ def restore_from_dir(self) -> List[Trial]:
# Ex: moved local/cloud experiment directory

# Propagate updated storage ctx properties to the trial's restored copy.
# TODO(justinvyu): [handle_moved_storage_path]
trial.storage.storage_path = self._storage.storage_path
trial.storage.experiment_dir_name = self._storage.experiment_dir_name
new_storage = copy.copy(trial.storage)
new_storage.storage_filesystem = self._storage.storage_filesystem
new_storage.storage_fs_path = self._storage.storage_fs_path
new_storage.experiment_dir_name = self._storage.experiment_dir_name
# ATTN: `trial.set_storage` is used intentionally, since it
# also updates the absolute paths and filesystem of tracked checkpoints.
trial.set_storage(new_storage)

# Avoid creating logdir in client mode for returned trial results,
# since the dir might not be creatable locally.
30 changes: 30 additions & 0 deletions python/ray/tune/experiment/trial.py
@@ -699,6 +699,36 @@ def set_experiment_tag(self, experiment_tag):
self.experiment_tag = experiment_tag
self.invalidate_json_state()

def set_storage(self, new_storage: StorageContext):
"""Updates the storage context of the trial.

If the `storage_path` or `experiment_dir_name` has changed, then this setter
also updates the paths of all checkpoints tracked by the checkpoint manager.
This enables restoration from a checkpoint if the user moves the directory.
"""
original_storage = self.storage

checkpoint_manager = self.run_metadata.checkpoint_manager

for checkpoint_result in checkpoint_manager.best_checkpoint_results:
checkpoint_result.checkpoint = Checkpoint(
path=checkpoint_result.checkpoint.path.replace(
original_storage.trial_fs_path, new_storage.trial_fs_path, 1
),
filesystem=new_storage.storage_filesystem,
@justinvyu (Contributor, Author) commented:

Updating the storage filesystem means that we require resuming from an experiment directory that contains everything -- it's not possible to start training on S3, download everything except the checkpoints to local, then restore from local.

(Contributor) replied:

I think this makes sense... let's document this somewhere?

@woshiyyya (Member) commented on Oct 28, 2023:

Makes sense. Given that checkpoints are stored in the trial directory (and thus in the experiment directory), users will generally download everything together, right?

@justinvyu (Contributor, Author) replied:

I think so. However, I think the reason people do this is that loading results from the cloud is not so easy.

)
latest_checkpoint_result = checkpoint_manager.latest_checkpoint_result
if latest_checkpoint_result:
latest_checkpoint_result.checkpoint = Checkpoint(
path=latest_checkpoint_result.checkpoint.path.replace(
original_storage.trial_fs_path, new_storage.trial_fs_path, 1
),
filesystem=new_storage.storage_filesystem,
)

self.storage = new_storage
self.invalidate_json_state()

@property
def num_failures(self):
return self.run_metadata.num_failures
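Following up on the review thread above: since restoring assumes the experiment directory being pointed at contains everything (checkpoints included), one way to bring a cloud experiment to local disk before restoring is to copy the whole directory first. A minimal sketch using `pyarrow.fs.copy_files`, with a hypothetical bucket and paths, assuming S3 credentials are already configured:

```python
import pyarrow.fs

# Hypothetical locations.
source = "my-bucket/ray_results/exp_dir"  # S3 key prefix (no scheme)
destination = "/tmp/ray_results/exp_dir"  # local target directory

# Copy the entire experiment directory, checkpoints included, so a later
# local restore finds every file it expects.
pyarrow.fs.copy_files(
    source,
    destination,
    source_filesystem=pyarrow.fs.S3FileSystem(),
    destination_filesystem=pyarrow.fs.LocalFileSystem(),
)
```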
1 change: 1 addition & 0 deletions python/ray/tune/result_grid.py
@@ -254,6 +254,7 @@ def num_terminated(self):
def _populate_exception(trial: Trial) -> Optional[Union[TuneError, RayTaskError]]:
if trial.status == Trial.TERMINATED:
return None
# TODO(justinvyu): [populate_exception] for storage_path != None
if trial.pickled_error_file and os.path.exists(trial.pickled_error_file):
with open(trial.pickled_error_file, "rb") as f:
e = cloudpickle.load(f)
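One possible way to close the `[populate_exception]` gap noted above (not what this PR implements) would be to read the pickled error through the trial's storage filesystem rather than a local path. The `error.pkl` filename and the helper below are assumptions for illustration only:

```python
import os

import pyarrow.fs
from ray import cloudpickle


def _load_pickled_error_from_storage(trial):
    """Sketch: load a trial's pickled error via its storage filesystem."""
    # Assumed layout: the error is pickled as "error.pkl" inside the
    # trial directory on the storage filesystem.
    error_fs_path = os.path.join(trial.storage.trial_fs_path, "error.pkl")
    fs = trial.storage.storage_filesystem
    if fs.get_file_info(error_fs_path).type == pyarrow.fs.FileType.NotFound:
        return None
    with fs.open_input_stream(error_fs_path) as f:
        return cloudpickle.loads(f.readall())
```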
5 changes: 0 additions & 5 deletions python/ray/tune/tests/test_result_grid.py
@@ -209,11 +209,6 @@ def train_fn(config):
assert isinstance(result_grid.errors[0], RuntimeError)


def test_result_grid_moved_experiment_path(ray_start_2_cpus, tmpdir):
# TODO(justinvyu): [handle_moved_storage_path]
pytest.skip("Not implemented yet.")
@justinvyu (Contributor, Author) commented on lines -212 to -214:

This test got merged into the tuner restore test.



if __name__ == "__main__":
import sys

python/ray/tune/tests/test_trial.py
@@ -2,6 +2,9 @@
import sys
import pytest

from ray.train import Checkpoint
from ray.train._internal.session import _TrainingResult
from ray.train._internal.storage import StorageContext
from ray.tune.experiment import Trial

from ray.train.tests.util import mock_storage_context
@@ -27,9 +30,38 @@ def test_load_trial_from_json_state():
assert new_trial.get_json_state()[0] == json_state


def test_change_trial_local_dir(tmpdir):
# TODO(justinvyu): [handle_moved_storage_path]
pytest.skip("Changing the storage path is not supported yet.")
def test_set_storage(tmp_path):
"""Test that setting the trial's storage context will update the tracked
checkpoint paths."""
original_storage = mock_storage_context()
trial = Trial(
"MockTrainable",
stub=True,
trial_id="abcd1234",
storage=original_storage,
)

result_1 = _TrainingResult(
checkpoint=Checkpoint.from_directory(original_storage.checkpoint_fs_path),
metrics={},
)
trial.on_checkpoint(result_1)

result_2 = _TrainingResult(
checkpoint=Checkpoint.from_directory(original_storage.checkpoint_fs_path),
metrics={},
)
trial.on_checkpoint(result_2)

new_storage = StorageContext(
storage_path=tmp_path / "new_storage_path",
experiment_dir_name="new_name",
trial_dir_name="new_trial",
)
trial.set_storage(new_storage)

assert result_1.checkpoint.path.startswith(new_storage.trial_fs_path)
assert result_2.checkpoint.path.startswith(new_storage.trial_fs_path)


def test_trial_logdir_length():
52 changes: 26 additions & 26 deletions python/ray/tune/tests/test_tuner_restore.py
@@ -25,6 +25,7 @@
_upload_to_fs_path,
)
from ray.tune import Callback, Trainable
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.execution.experiment_state import _find_newest_experiment_checkpoint
from ray.tune.experiment import Trial
from ray.tune.result_grid import ResultGrid
@@ -743,8 +744,6 @@ def create_trainable_with_params():
assert not results.errors


# TODO(justinvyu): [handle_moved_storage_path]
@pytest.mark.skip("Restoring from a moved storage path is not supported yet.")
@pytest.mark.parametrize("use_tune_run", [True, False])
def test_tuner_restore_from_moved_experiment_path(
ray_start_2_cpus, tmp_path, use_tune_run
@@ -755,10 +754,10 @@
fail_marker = tmp_path / "fail_marker"
fail_marker.write_text("", encoding="utf-8")

old_local_dir = tmp_path / "ray_results"
old_storage_path = tmp_path / "ray_results"
old_exp_name = "exp_dir"

new_local_dir = tmp_path / "new_ray_results"
new_storage_path = tmp_path / "new_ray_results"
new_exp_name = "new_exp_dir"

# Initial training run (that errors out in the middle)
@@ -770,25 +769,35 @@
),
run_config=RunConfig(
name=old_exp_name,
storage_path=str(old_local_dir),
storage_path=str(old_storage_path),
checkpoint_config=CheckpointConfig(num_to_keep=num_to_keep),
),
param_space={
"failing_hanging": (fail_marker, None),
},
)
results = tuner.fit()
assert len(results.errors) == 1
tuner.fit()

# Move experiment from `tmp_path/ray_results/exp_dir`
# to `tmp_path/new_ray_results/new_exp_dir`, changing both `storage_path` and
# the experiment `name`
shutil.move(str(old_storage_path), str(new_storage_path))
os.rename(
str(new_storage_path / old_exp_name), str(new_storage_path / new_exp_name)
)

# Check that the results can be read from the new location.
restore_path = str(new_storage_path / new_exp_name)
results = ResultGrid(ExperimentAnalysis(restore_path))

# TODO(justinvyu): [populate_exception] for storage_path != None
# assert len(results.errors) == 1
training_iteration = results[0].metrics["training_iteration"]
@justinvyu (Contributor, Author) commented on lines +794 to 796:

This doesn't work right now because it looks for the error locally under ~/ray_results rather than at the storage path. (This has always been a gap in functionality.)

assert (
training_iteration == 1
), f"Should only have 1 train.report before erroring, got {training_iteration}"

# Move experiment from `tmp_path/ray_results/exp_dir`
# to `tmp_path/moved_ray_results/new_exp_dir`, changing both `local_dir` and
# the experiment `name`
shutil.move(str(old_local_dir), str(new_local_dir))
os.rename(str(new_local_dir / old_exp_name), str(new_local_dir / new_exp_name))
assert results[0].checkpoint.path.endswith("checkpoint_000000")
assert "new_exp_dir" in results[0].checkpoint.path

del tuner
# Remove fail_marker so that the restored Tuner doesn't error again
@@ -799,18 +808,18 @@
analysis = tune.run(
_train_fn_sometimes_failing,
name=new_exp_name,
storage_path=str(new_local_dir),
storage_path=str(new_storage_path),
resume="AUTO+ERRORED",
)
results = ResultGrid(analysis)
else:
restore_path = str(new_local_dir / new_exp_name)
tuner = Tuner.restore(
restore_path, trainable=_train_fn_sometimes_failing, resume_errored=True
)
results = tuner.fit()

assert len(results.errors) == 0

# Check that we restored iter=1, then made 2 calls to train.report -> iter=3
training_iteration = results[0].metrics["training_iteration"]
assert training_iteration == 3, training_iteration
Expand All @@ -819,21 +828,12 @@ def test_tuner_restore_from_moved_experiment_path(
assert results[0].checkpoint
assert len(results[0].best_checkpoints) == num_to_keep
checkpoint_dirs = [
path
for path in os.listdir(results[0].log_dir)
if path.startswith("checkpoint_")
path for path in os.listdir(results[0].path) if path.startswith("checkpoint_")
]
assert sorted(checkpoint_dirs) == ["checkpoint_000001", "checkpoint_000002"]

# Make sure that we did not create a logdir in the old location
assert not old_local_dir.exists()


# TODO(justinvyu): [handle_moved_storage_path]
@pytest.mark.skip("Restoring from a moved storage path is not supported yet.")
def test_tuner_restore_from_moved_cloud_uri(ray_start_2_cpus, tmp_path):
"""Test that restoring an experiment that was moved to a new remote URI
resumes and continues saving new results at that URI."""
assert not old_storage_path.exists()


def test_custom_searcher_and_scheduler_restore(ray_start_2_cpus, tmpdir):
3 changes: 2 additions & 1 deletion python/ray/tune/trainable/metadata.py
@@ -3,6 +3,7 @@
from numbers import Number
from typing import Tuple, Optional

from ray.train._internal.checkpoint_manager import _CheckpointManager
from ray.tune.utils.serialization import TuneFunctionEncoder, TuneFunctionDecoder


@@ -36,7 +37,7 @@ def __init__(self, n_steps: Tuple[int] = (5, 10)):
self.metric_n_steps = {}

# Checkpoints
self.checkpoint_manager = None
self.checkpoint_manager: Optional[_CheckpointManager] = None

self._cached_json = None
