[CI] Ensure that mlflow callback cleans up background-saving threads on trainer teardown. (#3683)
justinxzhao authored Oct 2, 2023
1 parent 3a0e2b2 commit e46a989
Showing 3 changed files with 31 additions and 9 deletions.
35 changes: 28 additions & 7 deletions ludwig/contribs/mlflow/__init__.py
@@ -161,15 +161,26 @@ def on_trainer_train_setup(self, trainer, save_path, is_coordinator):
     def on_eval_end(self, trainer, progress_tracker, save_path):
         if progress_tracker.steps not in self.logged_steps:
             self.logged_steps.add(progress_tracker.steps)
-            self.save_fn((progress_tracker.log_metrics(), progress_tracker.steps, save_path, True))  # Why True?
+            # Adds a tuple to the logging queue.
+            # True is passed to indicate that the background saving loop should continue.
+            self.save_fn((progress_tracker.log_metrics(), progress_tracker.steps, save_path, True))

     def on_trainer_train_teardown(self, trainer, progress_tracker, save_path, is_coordinator):
         if is_coordinator:
             if progress_tracker.steps not in self.logged_steps:
                 self.logged_steps.add(progress_tracker.steps)
-                self.save_fn((progress_tracker.log_metrics(), progress_tracker.steps, save_path, False))  # Why False?
-            if self.save_thread is not None:
-                self.save_thread.join()
+                # Adds a tuple to the logging queue.
+                # False is passed to indicate that the background saving loop should break.
+                self.save_fn((progress_tracker.log_metrics(), progress_tracker.steps, save_path, False))
+            # False ensures that the background saving loop breaks.
+            # TODO(Justin): This should probably live in on_ludwig_end, once that's implemented.
+            self.save_fn((None, None, None, False))
+
+            # Close the save_thread.
+            if self.save_thread is not None:
+                self.save_thread.join()
+                # if self.save_thread.is_alive():
+                #     logger.warning("MLFlow save thread timed out and did not close properly.")

     def on_visualize_figure(self, fig):
         # TODO: need to also include a filename for this figure
@@ -205,10 +216,15 @@ def __setstate__(self, d):


 def _log_mlflow_loop(q: queue.Queue, log_artifacts: bool = True):
+    """The save_fn for the background thread that logs to MLFlow when save_in_background is True."""
     should_continue = True
     while should_continue:
         elem = q.get()
         log_metrics, steps, save_path, should_continue = elem
+        if log_metrics is None:
+            # Break out of the loop if we're not going to log anything.
+            break
+
         mlflow.log_metrics(log_metrics, step=steps)

         if not q.empty():
@@ -221,9 +237,14 @@ def _log_mlflow_loop(q: queue.Queue, log_artifacts: bool = True):


 def _log_mlflow(log_metrics, steps, save_path, should_continue, log_artifacts: bool = True):
-    mlflow.log_metrics(log_metrics, step=steps)
-    if log_artifacts:
-        _log_model(save_path)
+    """The save_fn for the MlflowCallback.
+
+    This is used when save_in_background is False.
+    """
+    if log_metrics is not None:
+        mlflow.log_metrics(log_metrics, step=steps)
+        if log_artifacts:
+            _log_model(save_path)


 def _log_artifacts(output_directory):
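
Note: the sketch below distills the queue-plus-sentinel shutdown pattern this commit applies to the MLflow callback. It is a standalone illustration under assumed scaffolding, not Ludwig code: print() stands in for mlflow.log_metrics, and the metric values, step numbers, and paths are invented for the example.

import queue
import threading

def _log_loop(q: queue.Queue) -> None:
    # Consumer, mirroring _log_mlflow_loop: drain the queue until the sentinel
    # arrives (log_metrics is None) or a producer sets should_continue=False.
    should_continue = True
    while should_continue:
        log_metrics, steps, save_path, should_continue = q.get()
        if log_metrics is None:
            break  # sentinel (None, None, None, False): nothing left to log
        print(f"step {steps}: {log_metrics} (artifacts at {save_path})")

q: queue.Queue = queue.Queue()
save_thread = threading.Thread(target=_log_loop, args=(q,))
save_thread.start()

# During training (cf. on_eval_end): True keeps the loop alive.
q.put(({"loss": 0.51}, 100, "/tmp/run", True))

# At teardown (cf. on_trainer_train_teardown): flush final metrics, then push
# the sentinel and join so no background thread outlives the trainer.
q.put(({"loss": 0.42}, 200, "/tmp/run", False))
q.put((None, None, None, False))
save_thread.join()

The unconditional sentinel matters because the False-flagged tuple above it is only enqueued when the final step has not already been logged (the logged_steps guard in the callback); without the sentinel, the loop could block forever in q.get() and the join() would hang the trainer, which is the CI symptom this commit fixes.
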
1 change: 1 addition & 0 deletions tests/integration_tests/test_contrib_aim.py
@@ -12,6 +12,7 @@
 TEST_SCRIPT = os.path.join(os.path.dirname(__file__), "scripts", "run_train_aim.py")


+@pytest.mark.skip(reason="Aim integration not compatible with Aim 4.0.")
 @pytest.mark.distributed
 def test_contrib_experiment(csv_filename, tmpdir):
     aim_test_path = os.path.join(tmpdir, "results")
4 changes: 2 additions & 2 deletions tests/integration_tests/test_torchscript.py
@@ -25,7 +25,7 @@

 from ludwig.api import LudwigModel
 from ludwig.backend import RAY
-from ludwig.constants import BATCH_SIZE, COMBINER, LOGITS, NAME, PREDICTIONS, PROBABILITIES, TRAINER
+from ludwig.constants import BATCH_SIZE, COMBINER, EVAL_BATCH_SIZE, LOGITS, NAME, PREDICTIONS, PROBABILITIES, TRAINER
 from ludwig.data.preprocessing import preprocess_for_prediction
 from ludwig.features.number_feature import numeric_transformation_registry
 from ludwig.globals import TRAIN_SET_METADATA_FILE_NAME
@@ -415,7 +415,7 @@ def test_torchscript_e2e_text_hf_tokenizer(tmpdir, csv_filename):
     config = {
         "input_features": input_features,
         "output_features": output_features,
-        TRAINER: {"epochs": 2, BATCH_SIZE: 128},
+        TRAINER: {"epochs": 2, BATCH_SIZE: 128, EVAL_BATCH_SIZE: 128},
     }
     training_data_csv_path = generate_data(input_features, output_features, data_csv_path)
