From fed9b82cd5690fa00c7ecc6904aec9b772ffa219 Mon Sep 17 00:00:00 2001 From: Justin Date: Tue, 29 Aug 2023 18:10:53 -0400 Subject: [PATCH] Roll-forward with fixes: Fix interaction between scheduler.step() and gradient accumulation steps, refactor schedulers to use `LambdaLR`, and add cosine annealing LR scheduler as a decay method. (#3555) --- ludwig/modules/lr_scheduler.py | 109 +++++++++++++----- ludwig/schema/lr_scheduler.py | 28 ++++- ludwig/schema/metadata/configs/trainer.yaml | 14 ++- ludwig/trainers/trainer.py | 20 ++-- tests/ludwig/modules/test_lr_scheduler.py | 121 ++++++++++++++++++-- 5 files changed, 241 insertions(+), 51 deletions(-) diff --git a/ludwig/modules/lr_scheduler.py b/ludwig/modules/lr_scheduler.py index ef0f2d257e7..a796a73d410 100644 --- a/ludwig/modules/lr_scheduler.py +++ b/ludwig/modules/lr_scheduler.py @@ -1,9 +1,9 @@ import logging import math -from typing import Any, Dict +from typing import Any, Callable, Dict from torch.optim import Optimizer -from torch.optim.lr_scheduler import LambdaLR, ReduceLROnPlateau +from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, LambdaLR, ReduceLROnPlateau, SequentialLR from ludwig.constants import MINIMIZE, TRAINING, VALIDATION from ludwig.modules.metric_registry import get_metric_objective @@ -11,6 +11,8 @@ from ludwig.utils.metric_utils import TrainerMetric from ludwig.utils.trainer_utils import ProgressTracker +logger = logging.getLogger(__name__) + class ReduceLROnPLateauCappedDecreases(ReduceLROnPlateau): def __init__(self, optimizer: Optimizer, mode: str, reduce_limit: int, factor: float, patience: int): @@ -29,11 +31,12 @@ def step(self, metrics): def num_reduce_lr(self) -> int: return self._num_reduce_lr - def _reduce_lr(self, epoch): + def _reduce_lr(self, epoch=None): + """Overrides the base ReduceLROnPlateau implementation.""" self._num_reduce_lr += 1 - self.apply_lr(epoch) + self.apply_lr() - def apply_lr(self, epoch=None): + def apply_lr(self): if self._num_reduce_lr == 0: return @@ -43,8 +46,7 @@ def apply_lr(self, epoch=None): if old_lr - new_lr > self.eps: param_group["lr"] = new_lr if self.verbose: - epoch_str = ("%.2f" if isinstance(epoch, float) else "%.5d") % epoch - print("Epoch {}: reducing learning rate" " of group {} to {:.4e}.".format(epoch_str, i, new_lr)) + logger.info(f"From ReduceLROnPLateauCappedDecreases, reducing learning rate to {new_lr}") class LRScheduler: @@ -52,15 +54,15 @@ def __init__( self, config: LRSchedulerConfig, optimizer: Optimizer, - steps_per_checkpoint: int = 1000, - total_steps: int = 10000, + steps_per_checkpoint: int, + total_steps: int, ): self.config = config self.optimizer = optimizer # Scheduler updated each training step self.step_info = StepInfo(steps_per_checkpoint, total_steps, self.config) - self._train_scheduler = get_schedule_with_warmup(self.config, self.optimizer, self.step_info) + self._train_scheduler = get_schedule_with_warmup_and_decay(self.config, self.optimizer, self.step_info) # Scheduler updated each eval step self._eval_scheduler = None @@ -74,13 +76,8 @@ def __init__( patience=self.config.reduce_on_plateau_patience, ) - self.reset(steps_per_checkpoint, total_steps) - - def reset(self, steps_per_checkpoint: int, total_steps: int): - # Retain state but update number of steps for training - self.step_info.reset(steps_per_checkpoint, total_steps) - def step(self): + """Called every step of training.""" self._train_scheduler.step() if self._eval_scheduler is not None: @@ -90,6 +87,7 @@ def step(self): self._eval_scheduler.apply_lr() def eval_step(self, progress_tracker: ProgressTracker, validation_field: str): + """Called every checkpoint evaluation step.""" if self._eval_scheduler is None: # No reduce on plateau return @@ -140,14 +138,11 @@ class StepInfo: def __init__(self, steps_per_checkpoint: int, total_steps: int, config: LRSchedulerConfig): self.config = config - self.reset(steps_per_checkpoint, total_steps) - - def reset(self, steps_per_checkpoint: int, total_steps: int): self.steps_per_checkpoint = steps_per_checkpoint self.num_training_steps = total_steps if self.config.warmup_fraction > 0 and self.config.warmup_evaluations > 0: - logging.info( + logger.info( "Both `learning_rate_scheduler.warmup_fraction` and `learning_rate_scheduler.warmup_evaluations` " "provided. The larger of the two (as a function of the total training steps) will be used." ) @@ -160,20 +155,34 @@ def reset(self, steps_per_checkpoint: int, total_steps: int): self.num_warmup_steps = num_warmup_steps -def get_schedule_with_warmup( +def get_schedule_with_warmup_and_decay( config: LRSchedulerConfig, optimizer: Optimizer, step_info: StepInfo, ) -> LambdaLR: """Creates a learning rate scheduler that updates each training step.""" - decay_fn = decay_registry[config.decay] + schedulers = [] - def lr_lambda(current_step: int): - if current_step < step_info.num_warmup_steps: - return float(current_step) / float(max(1, step_info.num_warmup_steps)) - return decay_fn(current_step, step_info.num_training_steps, step_info.num_warmup_steps, config) + # Warmup scheduler. + if step_info.num_warmup_steps > 0: + warmup_scheduler = LambdaLR( + optimizer, + lambda current_step: float(current_step) / float(max(1, step_info.num_warmup_steps)), + ) + schedulers.append(warmup_scheduler) - return LambdaLR(optimizer, lr_lambda, last_epoch=-1) + # Decay scheduler. + decay = config.decay + decay_scheduler = decay_registry[decay](config, optimizer, step_info) + schedulers.append(decay_scheduler) + + if len(schedulers) == 1: + # Only one scheduler, so no need to wrap in a SequentialLR. + return schedulers[0] + + # Return a SequentialLR that applies the warmup and decay schedulers in order + # with the warmup scheduler only applied for the first num_warmup_steps steps. + return SequentialLR(optimizer, schedulers=schedulers, milestones=[step_info.num_warmup_steps]) def no_decay(current_step: int, num_training_steps: int, num_warmup_steps: int, config: LRSchedulerConfig): @@ -181,7 +190,11 @@ def no_decay(current_step: int, num_training_steps: int, num_warmup_steps: int, def linear_decay(current_step: int, num_training_steps: int, num_warmup_steps: int, config: LRSchedulerConfig): - return max(0.0, float(num_training_steps - current_step) / float(max(1, num_training_steps - num_warmup_steps))) + return max( + 0.0, + float(num_training_steps - num_warmup_steps - current_step) + / float(max(1, num_training_steps - num_warmup_steps)), + ) def exponential_decay(current_step: int, num_training_steps: int, num_warmup_steps: int, config: LRSchedulerConfig): @@ -194,8 +207,42 @@ def exponential_decay(current_step: int, num_training_steps: int, num_warmup_ste return math.pow(decay_rate, exponent) +def wrap_decay_fn(decay_fn: Callable) -> Callable: + def init_fn(config: LRSchedulerConfig, optimizer: Optimizer, step_info: StepInfo) -> LambdaLR: + return LambdaLR( + optimizer, + lambda current_step: decay_fn( + current_step, step_info.num_training_steps, step_info.num_warmup_steps, config + ), + ) + + return init_fn + + +def init_cosine_decay( + config: LRSchedulerConfig, + optimizer: Optimizer, + step_info: StepInfo, +) -> CosineAnnealingWarmRestarts: + t_0 = config.t_0 + if not t_0: + t_0 = step_info.steps_per_checkpoint + if not t_0: + # A scheduler may be initialized with dummy values like at the start of training. + # Ensure that t_0 != 0, as this causes an error to be raised. + t_0 = 1 + + return CosineAnnealingWarmRestarts( + optimizer, + T_0=t_0, + T_mult=config.t_mult or 1, + eta_min=config.eta_min or 0, + ) + + decay_registry = { - None: no_decay, - "linear": linear_decay, - "exponential": exponential_decay, + None: wrap_decay_fn(no_decay), + "linear": wrap_decay_fn(linear_decay), + "exponential": wrap_decay_fn(exponential_decay), + "cosine": init_cosine_decay, } diff --git a/ludwig/schema/lr_scheduler.py b/ludwig/schema/lr_scheduler.py index e102274c65d..3bfedab82bf 100644 --- a/ludwig/schema/lr_scheduler.py +++ b/ludwig/schema/lr_scheduler.py @@ -17,7 +17,7 @@ class LRSchedulerConfig(schema_utils.BaseMarshmallowConfig, ABC): """Configuration for learning rate scheduler parameters.""" decay: str = schema_utils.StringOptions( - options=["linear", "exponential"], + options=["linear", "exponential", "cosine"], default=None, allow_none=True, description="Turn on decay of the learning rate.", @@ -99,6 +99,32 @@ class LRSchedulerConfig(schema_utils.BaseMarshmallowConfig, ABC): parameter_metadata=TRAINER_METADATA[MODEL_ECD]["learning_rate_scheduler"]["reduce_eval_split"], ) + # Parameters for CosineAnnealingWarmRestarts scheduler + + t_0: int = schema_utils.PositiveInteger( + default=None, + allow_none=True, + description="Number of steps before the first restart for cosine annealing decay. If not specified, it" + " will be set to `steps_per_checkpoint`.", + parameter_metadata=TRAINER_METADATA[MODEL_ECD]["learning_rate_scheduler"]["t_0"], + ) + + t_mult: int = schema_utils.PositiveInteger( + default=1, + description="Period multiplier after each restart for cosine annealing decay. Defaults to 1, i.e.," + " restart every `t_0` steps. If set to a larger value, the period between restarts increases by that" + " multiplier. For e.g., if t_mult is 2, then the periods would be: t_0, 2*t_0, 2^2*t_0, 2^3*t_0, etc.", + parameter_metadata=TRAINER_METADATA[MODEL_ECD]["learning_rate_scheduler"]["t_mult"], + ) + + eta_min: float = schema_utils.FloatRange( + default=0, + min=0, + max=1, + description="Minimum learning rate allowed for cosine annealing decay. Default: 0.", + parameter_metadata=TRAINER_METADATA[MODEL_ECD]["learning_rate_scheduler"]["eta_min"], + ) + # TODO(travis): too much boilerplate here, we should find a way to abstract all this and only require specifying the # minimal amount needed for the new config object. diff --git a/ludwig/schema/metadata/configs/trainer.yaml b/ludwig/schema/metadata/configs/trainer.yaml index f97bf6b7905..59264676a09 100644 --- a/ludwig/schema/metadata/configs/trainer.yaml +++ b/ludwig/schema/metadata/configs/trainer.yaml @@ -520,7 +520,10 @@ ecd: suggested_values_reasoning: Starting with exponential decay is a safe place to start, as it is a "softer" decrease in the learning rate over time, as compared with linear, which is more steep after the initial drop. Linear decay is - most useful when the risk of catastrophic forgetting is very high (e.g, for fine-tuning pretrained models). + most useful when the risk of catastrophic forgetting is very high (e.g, for fine-tuning pretrained + models). Cosine annealing is a type of learning rate schedule that has the effect of starting with a + large learning rate that is relatively rapidly decreased to a minimum value before being increased + rapidly again. The resetting of the learning rate acts like a simulated restart of the learning process. If you observe your loss curves shooting up (even on the training set) in later epochs, increasing the decay rate may help mitigate this effect. ui_display_name: Decay @@ -600,6 +603,15 @@ ecd: reduce_eval_split: expected_impact: 1 ui_display_name: Reduce Eval Split + t_0: + expected_impact: 1 + ui_display_name: T_0 + t_mult: + expected_impact: 1 + ui_display_name: T_mult + eta_min: + expected_impact: 1 + ui_display_name: Eta Min gbm: learning_rate: commonly_used: true diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py index b278548f8ab..e20f40f8143 100644 --- a/ludwig/trainers/trainer.py +++ b/ludwig/trainers/trainer.py @@ -208,7 +208,7 @@ def prepare(self): base_learning_rate = self.config.learning_rate if self.distributed: lr_scale_fn = learning_rate_scale_fns[self.config.learning_rate_scaling] - base_learning_rate *= lr_scale_fn(self.distributed.size() * self.gradient_accumulation_steps) + base_learning_rate *= lr_scale_fn(self.distributed.size()) self.base_learning_rate = base_learning_rate self.dist_model, self.optimizer = self.distributed.prepare( @@ -216,7 +216,9 @@ def prepare(self): self.config, self.base_learning_rate, ) - self.scheduler = LRScheduler(self.config.learning_rate_scheduler, self.optimizer) + + # NOTE: This is a partially configured LRScheduler. It will be updated in the first call to train_step. + self.scheduler = LRScheduler(self.config.learning_rate_scheduler, self.optimizer, 0, 0) def train_step( self, inputs: Dict[str, torch.Tensor], targets: Dict[str, torch.Tensor], should_step: bool = True @@ -762,8 +764,13 @@ def train( final_steps_per_checkpoint = min(final_steps_per_checkpoint, self.total_steps) early_stopping_steps = final_steps_per_checkpoint * self.early_stop - # Update learning rate scheduler which depends on number of steps - self.scheduler.reset(final_steps_per_checkpoint, self.total_steps) + # Initialize the learning rate scheduler. + self.scheduler = LRScheduler( + self.config.learning_rate_scheduler, + self.optimizer, + steps_per_checkpoint=final_steps_per_checkpoint, + total_steps=self.total_steps, + ) if self.is_coordinator(): logger.info( @@ -944,9 +951,8 @@ def _train_loop( loss, all_losses = self.train_step(inputs, targets, should_step=should_step) logger.info(f"Train loss for step {progress_tracker.steps}: {loss:.3f}") - if should_step: - # Update LR schduler here instead of train loop to avoid updating during batch size tuning, etc. - self.scheduler.step() + # Update LR schduler here instead of train loop to avoid updating during batch size tuning, etc. + self.scheduler.step() if self.is_coordinator() and not self.skip_save_log: self.write_step_summary( diff --git a/tests/ludwig/modules/test_lr_scheduler.py b/tests/ludwig/modules/test_lr_scheduler.py index e19e786e722..94876250173 100644 --- a/tests/ludwig/modules/test_lr_scheduler.py +++ b/tests/ludwig/modules/test_lr_scheduler.py @@ -1,3 +1,5 @@ +import math + import numpy as np from torch.optim import SGD @@ -20,18 +22,36 @@ def test_lr_scheduler_warmup_decay(): const_optimizer = SGD(module.parameters(), lr=base_lr) const_config = LRSchedulerConfig(warmup_evaluations=0) - const_scheduler = LRScheduler(config=const_config, optimizer=const_optimizer) - const_scheduler.reset(steps_per_checkpoint, total_steps) + const_scheduler = LRScheduler( + config=const_config, + optimizer=const_optimizer, + steps_per_checkpoint=steps_per_checkpoint, + total_steps=total_steps, + ) linear_optimizer = SGD(module.parameters(), lr=base_lr) linear_config = LRSchedulerConfig(warmup_fraction=warmup_fraction, decay="linear") - linear_scheduler = LRScheduler(config=linear_config, optimizer=linear_optimizer) - linear_scheduler.reset(steps_per_checkpoint, total_steps) + linear_scheduler = LRScheduler( + config=linear_config, + optimizer=linear_optimizer, + steps_per_checkpoint=steps_per_checkpoint, + total_steps=total_steps, + ) exp_optimizer = SGD(module.parameters(), lr=base_lr) exp_config = LRSchedulerConfig(warmup_fraction=warmup_fraction, decay="exponential") - exp_scheduler = LRScheduler(config=exp_config, optimizer=exp_optimizer) - exp_scheduler.reset(steps_per_checkpoint, total_steps) + exp_scheduler = LRScheduler( + config=exp_config, optimizer=exp_optimizer, steps_per_checkpoint=steps_per_checkpoint, total_steps=total_steps + ) + + cosine_optimizer = SGD(module.parameters(), lr=base_lr) + cosine_config = LRSchedulerConfig(warmup_fraction=warmup_fraction, decay="cosine", t_0=steps_per_checkpoint) + cosine_scheduler = LRScheduler( + config=cosine_config, + optimizer=cosine_optimizer, + steps_per_checkpoint=steps_per_checkpoint, + total_steps=total_steps, + ) warmup_steps = total_steps * warmup_fraction for i in range(total_steps): @@ -48,17 +68,25 @@ def test_lr_scheduler_warmup_decay(): exp_scheduler.step() exp_lr = exp_optimizer.param_groups[0]["lr"] + cosine_scheduler.step() + cosine_lr = cosine_optimizer.param_groups[0]["lr"] + if step < warmup_steps: assert linear_lr == exp_lr, f"step: {step}" + assert linear_lr == cosine_lr, f"step: {step}" assert linear_lr < base_lr, f"step: {step}" elif step == warmup_steps: assert linear_lr == base_lr, f"step: {step}" + assert cosine_lr == base_lr, f"step: {step}" assert exp_lr < base_lr, f"step: {step}" else: assert linear_lr < base_lr, f"step: {step}" assert exp_lr < base_lr, f"step: {step}" + assert cosine_lr <= base_lr, f"step: {step}" assert linear_lr < exp_lr + assert exp_lr < cosine_lr + assert cosine_lr == base_lr def test_lr_scheduler_reduce_on_plateau(): @@ -77,7 +105,7 @@ def test_lr_scheduler_reduce_on_plateau(): reduce_on_plateau_patience=10, reduce_on_plateau_rate=0.1, ) - scheduler = LRScheduler(config=config, optimizer=optimizer) + scheduler = LRScheduler(config=config, optimizer=optimizer, steps_per_checkpoint=0, total_steps=0) progress_tracker = get_new_progress_tracker( batch_size=64, @@ -119,6 +147,75 @@ def test_lr_scheduler_reduce_on_plateau(): assert np.isclose(lr, 0.001) +def test_lr_scheduler_cosine_decay_fixed_period(): + total_steps = 10000 + steps_per_checkpoint = 1000 + base_lr = 1.0 + + module = NumberInputFeature(NumberInputFeatureConfig(name="num1", encoder=DenseEncoderConfig())) + + optimizer = SGD(module.parameters(), lr=base_lr) + config = LRSchedulerConfig(decay="cosine", t_0=steps_per_checkpoint, decay_rate=0, reduce_on_plateau=0) + scheduler = LRScheduler(config=config, optimizer=optimizer, steps_per_checkpoint=0, total_steps=0) + + curr_lr = base_lr + prev_lr = base_lr + num_restarts = 0 + for step in range(total_steps + 1): + # Cosine annealing formula + expected_lr = base_lr * 0.5 * (1 + math.cos(math.pi * (step % steps_per_checkpoint) / steps_per_checkpoint)) + assert np.isclose(curr_lr, expected_lr), f"step: {step}" + + if prev_lr < curr_lr: + # Since Cosine decay is periodic, we should see the learning rate + # decrease and then increase again. + num_restarts += 1 + + prev_lr = curr_lr + scheduler.step() + + curr_lr = optimizer.param_groups[0]["lr"] + + assert num_restarts == 10, f"num_restarts: {num_restarts}" + + +def test_lr_scheduler_cosine_decay_increasing_period(): + total_steps = 20000 + steps_per_checkpoint = 1000 + base_lr = 1.0 + + module = NumberInputFeature(NumberInputFeatureConfig(name="num1", encoder=DenseEncoderConfig())) + + optimizer = SGD(module.parameters(), lr=base_lr) + config = LRSchedulerConfig( + decay="cosine", + t_0=steps_per_checkpoint, + t_mult=2, + decay_rate=0, + reduce_on_plateau=0, + ) + scheduler = LRScheduler( + config=config, optimizer=optimizer, steps_per_checkpoint=steps_per_checkpoint, total_steps=total_steps + ) + + curr_lr = base_lr + prev_lr = base_lr + num_restarts = 0 + for _ in range(total_steps + 1): + if prev_lr < curr_lr: + # Since Cosine decay is periodic, we should see the learning rate + # decrease and then increase again. + num_restarts += 1 + + prev_lr = curr_lr + scheduler.step() + + curr_lr = optimizer.param_groups[0]["lr"] + + # 1000, 3000, 6000, 12000, 24000 (but we stop at 20000) + assert num_restarts == 4, f"num_restarts: {num_restarts}" + + def test_lr_scheduler_save_load(): steps_per_checkpoint = 10 total_steps = 100 @@ -130,8 +227,9 @@ def test_lr_scheduler_save_load(): optimizer = SGD(module.parameters(), lr=base_lr) config = LRSchedulerConfig(warmup_fraction=0.2, reduce_on_plateau=reduce_limit) - scheduler = LRScheduler(config=config, optimizer=optimizer) - scheduler.reset(steps_per_checkpoint, total_steps) + scheduler = LRScheduler( + config=config, optimizer=optimizer, steps_per_checkpoint=steps_per_checkpoint, total_steps=total_steps + ) progress_tracker = get_new_progress_tracker( batch_size=64, @@ -151,13 +249,14 @@ def test_lr_scheduler_save_load(): scheduler_state = scheduler.state_dict() optimizer2 = SGD(module.parameters(), lr=base_lr) - scheduler2 = LRScheduler(config=config, optimizer=optimizer2) + scheduler2 = LRScheduler( + config=config, optimizer=optimizer2, steps_per_checkpoint=steps_per_checkpoint, total_steps=total_steps + ) # Important: state needs to be loaded after init of optimizer and scheduler, otherwise # it can override loaded state optimizer2.load_state_dict(optimizer_state) scheduler2.load_state_dict(scheduler_state) - scheduler2.reset(steps_per_checkpoint, total_steps) lr = optimizer.param_groups[0]["lr"] assert lr == optimizer2.param_groups[0]["lr"]