Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.1.0 Forge Updates #45

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.2
current_version = 1.2.0
commit = True
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

## [1.2.0] - 2024-02-26
- **Create** - Added `destroy_on_create` option
- **Create** - Added `create_timeout` option
- **Common** - Moved all `n_list` functions to `get_nlist()`
- **Dependencies** - Updated dependencies and tested on latest versions

## [1.1.1] - 2024-02-08

### Changed
- **Create** - Set default boto3 session at beginning of create to resolve region bug

## [1.1.0] - 2024-02-07

### Added
- **Create**
  - Multi-AZ functionality
  - Spot retries
  - On-demand Failover

### Changed
- **Create** - Configurable spot strategy
- **Documentation** - Updated with new changes

## [1.0.2] - 2022-10-27

Expand Down
25 changes: 19 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@ readme = "README.md"
requires-python = ">=3.7"
license = "Apache-2.0"
authors = [
{name = "Nikhil Patel", email = "[email protected]"}
{name = "Nikhil Patel", email = "[email protected]"},
{name = "Gabriele Ron", email = "[email protected]"},
{name = "Joao Moreira", email = "[email protected]"}
]

maintainers = [
{name = "Nikhil Patel", email = "[email protected]"},
{name = "Gabriele Ron", email = "[email protected]"},
{name = "Joao Moreira", email = "[email protected]"}
]

keywords = [
"AWS",
"EC2",
Expand All @@ -19,6 +28,7 @@ keywords = [
"Cluster",
"Jupyter"
]

classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
Expand All @@ -34,18 +44,21 @@ classifiers = [
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
]

dynamic = ["version"]

dependencies = [
"boto3~=1.19.0",
"pyyaml~=5.3.0",
"schema~=0.7.0",
"boto3",
"pyyaml",
"schema",
]

[project.optional-dependencies]
test = [
"pytest~=7.1.0",
"pytest-cov~=4.0"
"pytest",
"pytest-cov"
]

dev = [
"bump2version~=1.0",
]
Expand Down
2 changes: 1 addition & 1 deletion src/forge/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.0.2"
__version__ = "1.2.0"

# Default values for forge's essential arguments
DEFAULT_ARG_VALS = {
Expand Down
29 changes: 29 additions & 0 deletions src/forge/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,35 @@ def get_ip(details, states):
return [(i['ip'], i['id']) for i in list(filter(lambda x: x['state'] in states, details))]


def get_nlist(config):
    """Build the instance names that belong to the configured service.

    A ``single`` service yields one name built from the first market entry.
    A ``cluster`` service yields the master name (first market entry) and,
    when ``rr_all`` is truthy, the worker name (last market entry). Any other
    service value yields an empty list.

    Parameters
    ----------
    config : dict
        Forge configuration data. ``name`` and ``service`` are required;
        ``date`` defaults to an empty string and ``market`` falls back to
        ``DEFAULT_ARG_VALS['market']``.

    Returns
    -------
    list
        List of instance names
    """
    # Lookup order is kept as-is so a missing required key raises the same
    # KeyError it always did.
    date = config.get('date', '')
    market = config.get('market', DEFAULT_ARG_VALS['market'])
    name = config['name']
    service = config['service']

    if service == "single":
        return [f'{name}-{market[0]}-{service}-{date}']

    if service != "cluster":
        # Unrecognised services have no instances to name.
        return []

    cluster_names = [f'{name}-{market[0]}-{service}-master-{date}']
    if config.get('rr_all'):
        # Workers are named after the last market entry, the master after the first.
        cluster_names.append(f'{name}-{market[-1]}-{service}-worker-{date}')
    return cluster_names


@contextlib.contextmanager
def key_file(secret_id, region, profile):
"""Safely retrieve a secret file from AWS for temporary use.
Expand Down
1 change: 1 addition & 0 deletions src/forge/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def check_env_yaml(env_yaml):
error='Invalid spot allocation strategy'),
Optional('on_demand_failover'): And(bool),
Optional('spot_retries'): And(Use(int), lambda x: x > 0),
Optional('create_timeout'): And(Use(int), lambda x: x > 0),
})
try:
validated = schema.validate(env_yaml)
Expand Down
21 changes: 17 additions & 4 deletions src/forge/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ def create_status(n, request, config):
create_time = fleet_details.get('CreateTime')
time_without_spot = 0
while current_status != 'fulfilled':
if config.get('create_timeout') and t > config['create_timeout']:
logger.error('Timeout of %s seconds hit for instance fulfillment; Aborting.', config['create_timeout'])
if destroy_flag:
destroy(config)
exit_callback(config, exit=True)

if current_status == 'pending_fulfillment':
time.sleep(10)
t += 10
Expand Down Expand Up @@ -664,6 +670,12 @@ def search_and_create(config, task, instance_details):
e = detail[0]
if e['state'] in ['running', 'stopped', 'stopping', 'pending']:
logger.info('%s is %s, the IP is %s', task, e['state'], e['ip'])

if config.get('destroy_on_create'):
logger.info('destroy_on_create true, destroying fleet.')
destroy(config)
create_template(n, config, task)
create_fleet(n, config, task, instance_details)
else:
if len(e['fleet_id']) != 0:
logger.info('Fleet is running without EC2, will recreate it.')
Expand Down Expand Up @@ -703,11 +715,12 @@ def _check(x, i):
return x[i] if x and x[i:i + 1] else None

for task in task_list:
task_worker_count = worker_count
if 'cluster-master' in task or 'single' in task:
task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 0), cpu=_check(cpu, 0), ratio=_check(ratio, 0))
worker_count = 1
task_worker_count = 1
elif 'cluster-worker' in task:
task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 1), cpu=_check(cpu, 1), ratio=_check(ratio, 1), workers=worker_count)
task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 1), cpu=_check(cpu, 1), ratio=_check(ratio, 1), workers=task_worker_count)
else:
logger.error("'%s' does not seem to be a valid cluster or single job.", task)
if destroy_flag:
Expand All @@ -717,8 +730,8 @@ def _check(x, i):
logger.debug('%s OVERRIDE DETAILS | RAM: %s out of %s | CPU: %s with ratio of %s', task, ram, total_ram, cpu, ram2cpu_ratio)

instance_details[task] = {
'total_capacity': worker_count or total_ram,
'capacity_unit': 'units' if worker_count else 'memory-mib',
'total_capacity': task_worker_count or total_ram,
'capacity_unit': 'units' if task_worker_count else 'memory-mib',
'override_instance_stats': {
'MemoryMiB': {'Min': task_ram[0], 'Max': task_ram[1]},
'VCpuCount': {'Min': task_cpu[0], 'Max': task_cpu[1]},
Expand Down
1 change: 1 addition & 0 deletions src/forge/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def add_job_args(parser):
common_grp.add_argument('--valid_time', '--valid-time', type=positive_int_arg)
common_grp.add_argument('--user_data', '--user-data', nargs='*')
common_grp.add_argument('--gpu', action='store_true', dest='gpu_flag')
common_grp.add_argument('--destroy_on_create', '--destroy-on-create', action='store_true', default=None)


def add_action_args(parser):
Expand Down
15 changes: 2 additions & 13 deletions src/forge/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import DEFAULT_ARG_VALS, REQUIRED_ARGS
from .parser import add_basic_args, add_general_args, add_env_args, add_action_args
from .common import ec2_ip, key_file, get_ip, exit_callback
from .common import ec2_ip, key_file, get_ip, get_nlist, exit_callback

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,11 +40,6 @@ def rsync(config):
config : dict
Forge configuration data
"""
name = config.get('name')
date = config.get('date', '')
market = config.get('market', DEFAULT_ARG_VALS['market'])
service = config.get('service')
rr_all = config.get('rr_all')

def _rsync(config, ip):
"""performs the rsync to a given ip
Expand Down Expand Up @@ -84,13 +79,7 @@ def _rsync(config, ip):
else:
logger.info('Rsync successful:\n%s', output)

n_list = []
if service == "cluster":
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')
if rr_all:
n_list.append(f'{name}-{market[-1]}-{service}-worker-{date}')
elif service == "single":
n_list.append(f'{name}-{market[0]}-{service}-{date}')
n_list = get_nlist(config)

for n in n_list:
try:
Expand Down
11 changes: 2 additions & 9 deletions src/forge/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from . import DEFAULT_ARG_VALS, REQUIRED_ARGS
from .exceptions import ExitHandlerException
from .parser import add_basic_args, add_general_args, add_env_args, add_action_args
from .common import ec2_ip, key_file, get_ip, destroy_hook, user_accessible_vars, FormatEmpty, exit_callback
from .common import ec2_ip, key_file, get_ip, destroy_hook, user_accessible_vars, FormatEmpty, exit_callback, get_nlist
from .destroy import destroy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,7 +77,6 @@ def _run(config, ip):
pem_secret = config['forge_pem_secret']
region = config['region']
profile = config.get('aws_profile')
env = config['forge_env']

with key_file(pem_secret, region, profile) as pem_path:
fmt = FormatEmpty()
Expand All @@ -94,13 +93,7 @@ def _run(config, ip):
)
return exc.returncode

n_list = []
if service == "cluster":
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')
if rr_all:
n_list.append(f'{name}-{market[-1]}-{service}-worker-{date}')
elif service == "single":
n_list.append(f'{name}-{market[0]}-{service}-{date}')
n_list = get_nlist(config)

for n in n_list:
try:
Expand Down
26 changes: 6 additions & 20 deletions src/forge/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import REQUIRED_ARGS
from .parser import add_basic_args, add_general_args, add_env_args
from .common import ec2_ip, get_ip, set_boto_session
from .common import ec2_ip, get_ip, set_boto_session, get_nlist

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,25 +75,11 @@ def start(config):
config : dict
Forge configuration data
"""
name = config['name']
date = config.get('date', '')
service = config['service']
market = config.get('market')

n_list = []
if service == "cluster":
if market[0] == 'spot':
logger.error('Master is a spot instance; you cannot start a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')

if market[-1] == 'spot':
logger.error('Worker is a spot fleet; you cannot start a spot fleet')
elif market[-1] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-worker-{date}')
elif service == "single":
if market[0] == 'spot':
logger.error('The instance is a spot instance; you cannot start a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-{date}')
if 'spot' in market:
logger.error('Master or worker is a spot instance; you cannot start a spot instance')
# sys.exit(1) # ToDo: Should we change the tests to reflect an exit or allow it to continue?

n_list = get_nlist({**config, 'rr_all': True})
start_fleet(n_list, config)
27 changes: 7 additions & 20 deletions src/forge/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import REQUIRED_ARGS
from .parser import add_basic_args, add_general_args, add_env_args
from .common import ec2_ip, get_ip, set_boto_session
from .common import ec2_ip, get_ip, set_boto_session, get_nlist

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,25 +70,12 @@ def stop(config):
config : dict
Forge configuration data
"""
name = config['name']
date = config.get('date', '')
service = config['service']
market = config.get('market')

n_list = []
if service == "cluster":
if market[0] == 'spot':
logger.error('Master is a spot instance; you cannot stop a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')

if market[-1] == 'spot':
logger.error('Worker is a spot fleet; you cannot stop a spot fleet')
elif market[-1] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-worker-{date}')
elif service == "single":
if market[0] == 'spot':
logger.error('The instance is a spot instance; you cannot stop a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-{date}')
if 'spot' in market:
logger.error('Master or worker is a spot instance; you cannot stop a spot instance')
# sys.exit(1) # ToDo: Should we change the tests to reflect an exit or allow it to continue?

n_list = get_nlist({**config, 'rr_all': True})

stop_fleet(n_list, config)
12 changes: 6 additions & 6 deletions src/forge/yaml_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def _get_type(x):
Optional('cpu'): And(list, error='Invalid CPU cores'),
Optional('destroy_after_success'): And(bool),
Optional('destroy_after_failure'): And(bool),
Optional('destroy_on_create'): And(bool),
Optional('disk'): And(Use(int), positive_int),
Optional('disk_device_name'): And(str, len, error='Invalid Device Name'),
Optional('forge_env'): And(str, len, error='Invalid Environment'),
Expand Down Expand Up @@ -191,7 +192,7 @@ def load_config(args):

logger.info('Checking config file: %s', args['yaml'])
logger.debug('Required User config is %s', user_config)
env = args['forge_env'] or user_config.get('forge_env')
env = args.get('forge_env') or user_config.get('forge_env')

if env is None:
logger.error("'forge_env' variable required.")
Expand All @@ -218,12 +219,11 @@ def load_config(args):
env_config.update(normalize_config(env_config))
check_keys(env_config['region'], env_config.get('aws_profile'))

additional_config_data = env_config.pop('additional_config', None)
additional_config_data = env_config.pop('additional_config', [])
additional_config = []
if additional_config_data:
for i in additional_config_data:
ADDITIONAL_KEYS.append(i['name'])
additional_config.append(i)
for i in additional_config_data:
ADDITIONAL_KEYS.append(i['name'])
additional_config.append(i)

logger.debug('Additional config options: %s', additional_config)

Expand Down
10 changes: 2 additions & 8 deletions tests/test_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,8 @@ def test_start_error_in_spot_instance(mock_start_fleet, caplog, service, markets
"service": service,
}
error_msg = ""
if service == "cluster":
if markets[0] == "spot":
error_msg = "Master is a spot instance; you cannot start a spot instance"
elif markets[1] == "spot":
error_msg = "Worker is a spot fleet; you cannot start a spot fleet"
else:
if markets[0] == "spot":
error_msg = "The instance is a spot instance; you cannot start a spot instance"
if 'spot' in markets:
error_msg = 'Master or worker is a spot instance; you cannot start a spot instance'

with caplog.at_level(logging.ERROR):
start.start(config)
Expand Down
Loading