-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
3f13c77
commit 5513d4e
Showing
5 changed files
with
225 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
"""A Hybrid EA-FEA Algorithm.""" | ||
from collections import Counter | ||
from typing import Callable, Final | ||
|
||
from numpy.random import Generator | ||
from pycommons.types import type_error | ||
|
||
from moptipy.algorithms.so.ffa.ffa_h import create_h, log_h | ||
from moptipy.api.algorithm import Algorithm1 | ||
from moptipy.api.operators import Op0, Op1 | ||
from moptipy.api.process import Process | ||
|
||
|
||
class EAFEAC(Algorithm1): | ||
"""An implementation of the EAFEA-C.""" | ||
|
||
def __init__(self, op0: Op0, op1: Op1, log_h_tbl: bool = False) -> None: | ||
""" | ||
Create the EAFEA-C. | ||
:param op0: the nullary search operator | ||
:param op1: the unary search operator | ||
:param log_h_tbl: should we log the H table? | ||
""" | ||
super().__init__("eafeaB", op0, op1) | ||
if not isinstance(log_h_tbl, bool): | ||
raise type_error(log_h_tbl, "log_h_tbl", bool) | ||
#: True if we should log the H table, False otherwise | ||
self.log_h_tbl: Final[bool] = log_h_tbl | ||
|
||
def solve(self, process: Process) -> None: | ||
""" | ||
Apply the EAFEA-B to an optimization problem. | ||
:param process: the black-box process object | ||
""" | ||
# Create records for old and new point in the search space. | ||
x_ea = process.create() # record for current solution of the EA | ||
x_fea = process.create() # record for current solution of the FEA | ||
x_new = process.create() # record for new solution | ||
|
||
# Obtain the random number generator. | ||
random: Final[Generator] = process.get_random() | ||
|
||
# Put function references in variables to save time. | ||
evaluate: Final[Callable] = process.evaluate # the objective | ||
should_terminate: Final[Callable] = process.should_terminate | ||
xcopy: Final[Callable] = process.copy # copy(dest, source) | ||
op0: Final[Callable] = self.op0.op0 # the nullary operator | ||
op1: Final[Callable] = self.op1.op1 # the unary operator | ||
|
||
h, ofs = create_h(process) # Allocate the h-table | ||
|
||
# Start at a random point in the search space and evaluate it. | ||
op0(random, x_ea) # Create 1 solution randomly and | ||
y_ea: int | float = evaluate(x_ea) + ofs # evaluate it. | ||
xcopy(x_fea, x_ea) # FEA and EA start with the same initial solution. | ||
y_fea: int | float = y_ea | ||
|
||
ea_max_no_lt_moves: int = 1 # maximum no-improvement moves for EA | ||
ea_no_lt_moves: int = 0 # current no-improvement moves | ||
use_ffa: bool = False # We start with the EA branch. | ||
|
||
while not should_terminate(): # Until we need to quit... | ||
# Sample and evaluate new solution. | ||
op1(random, x_new, x_fea if use_ffa else x_ea) | ||
y_new: int | float = evaluate(x_new) + ofs | ||
h[y_new] += 1 # type: ignore # Always update H. | ||
|
||
if use_ffa: # The FEA branch uses FFA. | ||
use_ffa = False # Always toggle use from FFA to EA. | ||
|
||
h[y_fea] += 1 # type: ignore # Update H for FEA solution. | ||
if h[y_new] <= h[y_fea]: # type: ignore # FEA acceptance. | ||
xcopy(x_ea, x_new) # Copy solution also to EA. | ||
x_fea, x_new = x_new, x_fea | ||
y_fea = y_ea = y_new | ||
|
||
else: # EA or RLS branch performs local search. | ||
h[y_ea] += 1 # type: ignore # Update H in *both* branches. | ||
|
||
if y_new <= y_ea: # The acceptance criterion of RLS / EA. | ||
if y_new < y_ea: # Check if we did an actual improvement. | ||
ea_no_lt_moves = 0 # non-improving moves counter = 0. | ||
xcopy(x_fea, x_new) # Copy solution over to FEA. | ||
y_fea = y_new # And store the objective value. | ||
else: # The move was *not* an improvement: | ||
ea_no_lt_moves += 1 # Increase non-improved counter. | ||
x_ea, x_new = x_new, x_ea # Accept new solution. | ||
y_ea = y_new # Store objective value. | ||
|
||
if ea_no_lt_moves >= ea_max_no_lt_moves: # Toggle: EA to FEA. | ||
ea_no_lt_moves = 0 # Reset non-improving move counter. | ||
ea_max_no_lt_moves += 1 # Increment limit by one. | ||
use_ffa = True # Toggle to FFA. | ||
|
||
if not self.log_h_tbl: | ||
return # we are done here | ||
|
||
# After we are done, we want to print the H-table. | ||
if h[y_ea] == 0: # type: ignore # Fix the H-table for the case | ||
h = Counter() # that only one FE was performed: In this case, | ||
h[y_ea] = 1 # make Counter with only a single 1 value inside. | ||
|
||
log_h(process, h, ofs) # log the H-table |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
"""Test the EAFEA-C.""" | ||
from pycommons.io.temp import temp_file | ||
|
||
from moptipy.algorithms.so.ffa.eafea_c import EAFEAC | ||
from moptipy.api.execution import Execution | ||
from moptipy.api.objective import Objective | ||
from moptipy.examples.bitstrings.onemax import OneMax | ||
from moptipy.examples.jssp.instance import Instance | ||
from moptipy.operators.bitstrings.op0_random import Op0Random | ||
from moptipy.operators.bitstrings.op1_flip1 import Op1Flip1 | ||
from moptipy.operators.bitstrings.op1_m_over_n_flip import Op1MoverNflip | ||
from moptipy.operators.permutations.op0_shuffle import Op0Shuffle | ||
from moptipy.operators.permutations.op1_swap2 import Op1Swap2 | ||
from moptipy.spaces.bitstrings import BitStrings | ||
from moptipy.spaces.permutations import Permutations | ||
from moptipy.tests.on_bitstrings import ( | ||
validate_algorithm_on_leadingones, | ||
validate_algorithm_on_onemax, | ||
) | ||
from moptipy.tests.on_jssp import validate_algorithm_on_jssp | ||
|
||
|
||
def __lb() -> int: | ||
"""A mock lower bound.""" | ||
return -1_000_000_000_000_000 | ||
|
||
|
||
def __ub() -> int: | ||
"""A mock upper bound.""" | ||
return 1_000_000_000_000_000 | ||
|
||
|
||
def test_fea1plus1_on_jssp() -> None: | ||
"""Validate the (1+1)-FEA on the JSSP.""" | ||
|
||
def create(instance: Instance, search_space: Permutations, | ||
objective: Objective) -> EAFEAC: | ||
assert isinstance(instance, Instance) | ||
assert isinstance(search_space, Permutations) | ||
assert isinstance(objective, Objective) | ||
return EAFEAC(Op0Shuffle(search_space), Op1Swap2(), False) | ||
|
||
validate_algorithm_on_jssp(create) | ||
|
||
|
||
def test_fea1plus1_on_onemax() -> None: | ||
"""Validate the (1+1)-FEA on the OneMax problem.""" | ||
|
||
def create(bs: BitStrings, objective: Objective) -> EAFEAC: | ||
assert isinstance(bs, BitStrings) | ||
assert isinstance(objective, Objective) | ||
return EAFEAC(Op0Random(), Op1MoverNflip(bs.dimension, 1, True), | ||
False) | ||
|
||
validate_algorithm_on_onemax(create) | ||
|
||
|
||
def test_fea1plus1_on_onemax_with_large_range() -> None: | ||
"""Validate the (1+1)-FEA on the OneMax problem.""" | ||
|
||
def create(bs: BitStrings, objective: Objective) -> EAFEAC: | ||
assert isinstance(bs, BitStrings) | ||
assert isinstance(objective, Objective) | ||
objective.lower_bound = __lb # type: ignore | ||
objective.upper_bound = __ub # type: ignore | ||
return EAFEAC(Op0Random(), Op1MoverNflip(bs.dimension, 1, True), True) | ||
|
||
validate_algorithm_on_onemax(create) | ||
|
||
|
||
def test_fea1plus1_on_leadingones() -> None: | ||
"""Validate the (1+1)-FEA on the LeadingOnes problem.""" | ||
|
||
def create(bs: BitStrings, objective: Objective) -> EAFEAC: | ||
assert isinstance(bs, BitStrings) | ||
assert isinstance(objective, Objective) | ||
return EAFEAC(Op0Random(), Op1MoverNflip(bs.dimension, 1, True), True) | ||
|
||
validate_algorithm_on_leadingones(create) | ||
|
||
|
||
def test_fea1plus1_on_leadingones_large_range() -> None: | ||
"""Validate the (1+1)-FEA on the LeadingOnes problem with larger bounds.""" | ||
|
||
def create(bs: BitStrings, objective: Objective) -> EAFEAC: | ||
assert isinstance(bs, BitStrings) | ||
assert isinstance(objective, Objective) | ||
objective.lower_bound = __lb # type: ignore | ||
objective.upper_bound = __ub # type: ignore | ||
return EAFEAC(Op0Random(), Op1MoverNflip(bs.dimension, 1, True)) | ||
|
||
validate_algorithm_on_leadingones(create) | ||
|
||
|
||
def test_h_log() -> None: | ||
"""Test whether the history table is properly logged.""" | ||
n = 10 | ||
space = BitStrings(n) | ||
problem = OneMax(n) | ||
algorithm = EAFEAC(Op0Random(), Op1Flip1(), True) | ||
|
||
with temp_file() as tf: | ||
ex = Execution() | ||
ex.set_solution_space(space) | ||
ex.set_objective(problem) | ||
ex.set_algorithm(algorithm) | ||
ex.set_rand_seed(1599) | ||
ex.set_log_file(tf) | ||
ex.set_max_fes(10) | ||
with ex.execute() as process: | ||
end_result = process.create() | ||
process.get_copy_of_best_y(end_result) | ||
|
||
lines = tf.read_all_str().splitlines() | ||
assert lines[-1] == "END_H" | ||
assert lines[-2] == "3;6;;9;;3" | ||
assert lines[-3] == "BEGIN_H" |