From 84b6a985bb59673bb87d7e7e200656eea502d286 Mon Sep 17 00:00:00 2001 From: "Gabriele A. Ron" Date: Fri, 12 Jan 2024 11:02:24 -0600 Subject: [PATCH 1/5] Fixed tests for multi-az --- tests/data/admin_configs/dev/dev.yaml | 6 +++--- tests/test_common.py | 3 ++- tests/test_create.py | 16 ++++++++++------ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/data/admin_configs/dev/dev.yaml b/tests/data/admin_configs/dev/dev.yaml index 5d1ab06..b9de8f2 100755 --- a/tests/data/admin_configs/dev/dev.yaml +++ b/tests/data/admin_configs/dev/dev.yaml @@ -1,6 +1,6 @@ forge_env: dev aws_profile: data-dev -aws_az: us-east-1a +aws_region: us-east-1 ec2_amis: single: - ami-123 @@ -14,9 +14,9 @@ ec2_amis: - ami-789 - 90 - /dev/sda1 -aws_subnet: subnet-123 +aws_multi_az: + us-east-1a: subnet-123 ec2_key: bdp -aws_security_group: sg-123 forge_pem_secret: forge-pem excluded_ec2s: ["t2.medium","t2.large","m4.large", "*g.*", "gd.*", "*metal*", "g4ad*"] tags: diff --git a/tests/test_common.py b/tests/test_common.py index f7c8a5f..78e85eb 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -124,6 +124,7 @@ def test_get_ec2_pricing_ondemand(mock_regions, mock_boto): """Test getting on-demand EC2 hourly pricing.""" exp_price = 0.123 region = 'us-east-1' + az = 'us-east-1a' long_region = 'US East (N. Virginia)' response = {'PriceList': [json.dumps( {"terms": {"OnDemand": { @@ -136,7 +137,7 @@ def test_get_ec2_pricing_ondemand(mock_regions, mock_boto): mock_products.return_value = response mock_regions.return_value = {region: long_region} - config = {'region': region} + config = {'region': region, 'aws_az': az} ec2_type = 'r5.large' act_price = common.get_ec2_pricing(ec2_type, 'on-demand', config) assert act_price == exp_price diff --git a/tests/test_create.py b/tests/test_create.py index 83586b9..d4f9f90 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -10,21 +10,25 @@ from forge import create +@mock.patch('forge.create.get_instance_details') @mock.patch('forge.create.search_and_create') -def test_create_single(mock_search_create): +def test_create_single(mock_search_create, mock_get_instance_details): """Test entry-point for creation of single instance.""" - config = {'service': 'single'} + config = {'service': 'single', 'aws_az': 'us-east-1a'} + mock_get_instance_details.return_value = {'single': {}} create.create(config) - mock_search_create.assert_called_once_with(config, 'single') + mock_search_create.assert_called_once_with(config, 'single', {}) +@mock.patch('forge.create.get_instance_details') @mock.patch('forge.create.search_and_create') -def test_create_cluster_master_workers(mock_search_create): +def test_create_cluster_master_workers(mock_search_create, mock_get_instance_details): """Test entry-point for creation of cluster with master and workers.""" - config = {'service': 'cluster', 'ram': [512]} + config = {'service': 'cluster', 'aws_az': 'us-east-1a'} + mock_get_instance_details.return_value = {'cluster-master': {}, 'cluster-worker': {}} create.create(config) mock_search_create.assert_has_calls([ - mock.call(config, 'cluster-master'), mock.call(config, 'cluster-worker') + mock.call(config, 'cluster-master', {}), mock.call(config, 'cluster-worker', {}) ]) From 023f54224282c78e4324a93542c2bab1ee292b56 Mon Sep 17 00:00:00 2001 From: "Gabriele A. Ron" Date: Fri, 12 Jan 2024 11:02:35 -0600 Subject: [PATCH 2/5] Updated documentation for multi-az --- docs/environmental_yaml.md | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/environmental_yaml.md b/docs/environmental_yaml.md index f423a24..6bd0d72 100644 --- a/docs/environmental_yaml.md +++ b/docs/environmental_yaml.md @@ -59,10 +59,18 @@ https://github.com/carsdotcom/cars-forge/blob/main/examples/env_yaml_example/exa constraints: [2.3, 3.0, 3.1] error: "Invalid Spark version. Only 2.3, 3.0, and 3.1 are supported." ``` -- **aws_az** - The [AWS availability zone](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html) where Forge will create the EC2 instance. Currently, Forge can run only in one AZ -- **aws_profile** - [AWS CLI profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use +- **aws_az** - The [AWS availability zone](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html) where Forge will create the EC2 instance. If set, multi-az placement will be disabled. +- **aws_region** - The AWS region for Forge to run in- **aws_profile** - [AWS CLI profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use - **aws_security_group** - [AWS Security Group](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-security-groups.html) for the instance -- **aws_subnet** - [AWS subnet](https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html) where the EC2s will run +- **aws_subnet** - [AWS subnet](https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html) where the EC2s will run +- **aws_multi_az** - [AWS subnet](https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html) where the EC2s will run organized by AZ + - E.g. + ```yaml + aws_multi_az: + us-east-1a: subnet-aaaaaaaaaaaaaaaaa + us-east-1b: subnet-bbbbbbbbbbbbbbbbb + us-east-1c: subnet-ccccccccccccccccc + ``` - **default_ratio** - Override the default ratio of RAM to CPU if the user does not provide one. Must be a list of the minimum and maximum. - default is [8, 8] - **ec2_amis** - A dictionary of dictionaries to store [AMI](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/AMIs.html) info. From 81f4c2b60ec5d0c493cc0a50381cb39e740f1306 Mon Sep 17 00:00:00 2001 From: "Gabriele A. Ron" Date: Fri, 12 Jan 2024 11:02:54 -0600 Subject: [PATCH 3/5] Add multi-az functionality --- src/forge/common.py | 15 +++- src/forge/configure.py | 8 +- src/forge/create.py | 193 ++++++++++++++++++++++++++++++----------- src/forge/destroy.py | 1 + 4 files changed, 159 insertions(+), 58 deletions(-) diff --git a/src/forge/common.py b/src/forge/common.py index 02ea5c3..a463e53 100755 --- a/src/forge/common.py +++ b/src/forge/common.py @@ -117,7 +117,8 @@ def ec2_ip(n, config): 'instance_type': i.get('InstanceType'), 'state': i.get('State').get('Name'), 'launch_time': i.get('LaunchTime'), - 'fleet_id': check_fleet_id(n, config) + 'fleet_id': check_fleet_id(n, config), + 'az': i.get('Placement')['AvailabilityZone'] } details.append(x) logger.debug('ec2_ip details is %s', details) @@ -320,6 +321,14 @@ def normalize_config(config): if config.get('aws_az'): config['region'] = config['aws_az'][:-1] + if config.get('aws_subnet') and not config.get('aws_multi_az'): + config['aws_multi_az'] = {config.get('aws_az'): config.get('aws_subnet')} + elif config.get('aws_subnet') and config.get('aws_multi_az'): + logger.warning('Both aws_multi_az and aws_subnet exist, defaulting to aws_multi_az') + + if config.get('aws_region'): + config['region'] = config['aws_region'] + if not config.get('ram') and not config.get('cpu') and config.get('ratio'): DEFAULT_ARG_VALS['default_ratio'] = config.pop('ratio') @@ -492,8 +501,8 @@ def get_ec2_pricing(ec2_type, market, config): float Hourly price of given EC2 type in given market. """ - region = config.get('region') - az = config.get('aws_az') + region = config['region'] + az = config['aws_az'] if market == 'spot': client = boto3.client('ec2') diff --git a/src/forge/configure.py b/src/forge/configure.py index 76db581..ba1fb30 100755 --- a/src/forge/configure.py +++ b/src/forge/configure.py @@ -50,11 +50,13 @@ def check_env_yaml(env_yaml): """ schema = Schema({ 'forge_env': And(str, len, error='Invalid Environment Name'), - 'aws_az': And(str, len, error='Invalid AWS availability zone'), + Optional('aws_region'): And(str, len, error='Invalid AWS region'), + Optional('aws_az'): And(str, len, error='Invalid AWS availability zone'), + Optional('aws_subnet'): And(str, len, error='Invalid AWS Subnet'), 'ec2_amis': And(dict, len, error='Invalid AMI Dictionary'), - 'aws_subnet': And(str, len, error='Invalid AWS Subnet'), + Optional('aws_multi_az'): And(dict, len, error='Invalid AWS Subnet'), 'ec2_key': And(str, len, error='Invalid AWS key'), - 'aws_security_group': And(str, len, error='Invalid AWS Security Group'), + Optional('aws_security_group'): And(str, len, error='Invalid AWS Security Group'), 'forge_pem_secret': And(str, len, error='Invalid Name of Secret'), Optional('aws_profile'): And(str, len, error='Invalid AWS profile'), Optional('ratio'): And(list, len, error='Invalid default ratio'), diff --git a/src/forge/create.py b/src/forge/create.py index 1ec4ccc..ff53c9a 100755 --- a/src/forge/create.py +++ b/src/forge/create.py @@ -2,11 +2,13 @@ import base64 import logging import sys +import math import os import time from datetime import datetime, timedelta import boto3 +import botocore.exceptions from botocore.exceptions import ClientError from . import DEFAULT_ARG_VALS, REQUIRED_ARGS @@ -474,7 +476,70 @@ def calc_machine_ranges(*, ram=None, cpu=None, ratio=None, workers=None): return job_ram, job_cpu, total_ram, sorted(ram2cpu_ratio) -def create_fleet(n, config, task): +def get_placement_az(config, instance_details, mode=None): + if not mode: + mode = 'balanced' + + region = config.get('region') + subnet = config['aws_multi_az'] + + client = boto3.client('ec2') + az_info = client.describe_availability_zones() + az_mapping = {x['ZoneId']: x['ZoneName'] for x in az_info['AvailabilityZones']} + + try: + response = client.get_spot_placement_scores( + TargetCapacity=instance_details['total_capacity'], + TargetCapacityUnitType=instance_details['capacity_unit'], + SingleAvailabilityZone=True, + RegionNames=[region], + InstanceRequirementsWithMetadata={ + 'ArchitectureTypes': ['x86_64'], # ToDo: Make configurable + 'InstanceRequirements': instance_details['override_instance_stats'] + }, + MaxResults=10, + ) + + placement = {az_mapping[x['AvailabilityZoneId']]: x['Score'] for x in response['SpotPlacementScores']} + logger.debug(placement) + except botocore.exceptions.ClientError as e: + logger.error('Permissions to pull spot placement scores are necessary') + logger.error(e) + placement = {} + + subnet_details = {} + + try: + for placement_az, placement_subnet in subnet.items(): + response = client.describe_subnets( + SubnetIds=[placement_subnet] + ) + + logger.debug(response) + + subnet_details[placement_az] = response['Subnets'][0]['AvailableIpAddressCount'] + + except botocore.exceptions.ClientError as e: + logger.error(e) + + if mode in ['balanced', 'placement']: + subnet_details = {k: int(math.sqrt(v)) for k, v in subnet_details.items()} + + if mode == 'placement': + placement = {k: v**2 for k, v in placement.items()} + + for k, v in subnet_details.items(): + if placement.get(k): + placement[k] += v + else: + placement[k] = v + + az = max(placement, key=placement.get) + + return az + + +def create_fleet(n, config, task, instance_details): """creates the AWS EC2 fleet Parameters @@ -485,6 +550,8 @@ def create_fleet(n, config, task): Forge configuration data task : str Forge service to run + instance_details: dict + EC2 instance details for create_fleet """ profile = config.get('aws_profile') valid = config.get('valid_time', DEFAULT_ARG_VALS['valid_time']) @@ -494,45 +561,15 @@ def create_fleet(n, config, task): now_utc = datetime.utcnow() now_utc = now_utc.replace(microsecond=0) valid_until = now_utc + timedelta(hours=int(valid)) - subnet = config.get('aws_subnet') - - ram = config.get('ram', None) - cpu = config.get('cpu', None) - ratio = config.get('ratio', None) - worker_count = config.get('workers', None) - destroy_flag = config.get('destroy_after_failure') - - if 'single' in n and max(len(ram or []), len(cpu or []), len(ratio or [])) > 1: - raise ValueError("Too many values provided for single job.") - elif 'cluster' in n and min(len(ram or [0, 0]), len(cpu or [0, 0]), len(ratio or [0, 0])) < 2: - raise ValueError("Too few values provided for cluster job.") - - def _check(x, i): - logger.debug('Get index %d of %s', i, x) - return x[i] if x and x[i:i + 1] else None - - if 'cluster-master' in n or 'single' in n: - ram, cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 0), cpu=_check(cpu, 0), - ratio=_check(ratio, 0)) - worker_count = 1 - elif 'cluster-worker' in n: - ram, cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 1), cpu=_check(cpu, 1), - ratio=_check(ratio, 1), workers=worker_count) - else: - logger.error("'%s' does not seem to be a valid cluster or single job.", n) - if destroy_flag: - destroy(config) - sys.exit(1) - - logger.debug('OVERRIDE DETAILS | RAM: %s out of %s | CPU: %s with ratio of %s', ram, total_ram, cpu, ram2cpu_ratio) + subnet = config.get('aws_multi_az') gpu = config.get('gpu_flag', False) market = config.get('market', DEFAULT_ARG_VALS['market']) - subnet = config.get('aws_subnet') market = market[-1] if 'cluster-worker' in n else market[0] set_boto_session(region, profile) + az = config['aws_az'] fmt = FormatEmpty() access_vars = user_accessible_vars(config, market=market, task=task) @@ -555,8 +592,8 @@ def _check(x, i): } }, 'TargetCapacitySpecification': { - 'TotalTargetCapacity': worker_count or total_ram, - 'TargetCapacityUnitType': 'units' if worker_count else 'memory-mib', + 'TotalTargetCapacity': instance_details['total_capacity'], + 'TargetCapacityUnitType': instance_details['capacity_unit'], 'DefaultTargetCapacityType': market }, 'Type': 'maintain', @@ -575,22 +612,17 @@ def _check(x, i): if not tags: kwargs.pop('TagSpecifications') - override_instance_stats = { - 'MemoryMiB': {'Min': ram[0], 'Max': ram[1]}, - 'VCpuCount': {'Min': cpu[0], 'Max': cpu[1]}, - 'SpotMaxPricePercentageOverLowestPrice': 100, - 'MemoryGiBPerVCpu': {'Min': ram2cpu_ratio[0], 'Max': ram2cpu_ratio[1]} - } if gpu: - override_instance_stats['AcceleratorTypes'] = ['gpu'] + instance_details['override_instance_stats']['AcceleratorTypes'] = ['gpu'] if excluded_ec2s: - override_instance_stats['ExcludedInstanceTypes'] = excluded_ec2s + instance_details['override_instance_stats']['ExcludedInstanceTypes'] = excluded_ec2s launch_template_config = { 'LaunchTemplateSpecification': {'LaunchTemplateName': n, 'Version': '1'}, 'Overrides': [{ - 'SubnetId': subnet, - 'InstanceRequirements': override_instance_stats + 'SubnetId': subnet[az], + 'AvailabilityZone': az, + 'InstanceRequirements': instance_details['override_instance_stats'] }] } kwargs['LaunchTemplateConfigs'] = [launch_template_config] @@ -601,7 +633,7 @@ def _check(x, i): create_status(n, request, config) -def search_and_create(config, task): +def search_and_create(config, task, instance_details): """check for running instances and create new ones if necessary Parameters @@ -610,6 +642,8 @@ def search_and_create(config, task): Forge configuration data task : str Forge service to run + instance_details: dict + EC2 instance details for create_fleet """ if not config.get('ram') and not config.get('cpu'): logger.error('Please supply either a ram or cpu value to continue.') @@ -634,18 +668,67 @@ def search_and_create(config, task): logger.info('Fleet is running without EC2, will recreate it.') destroy(config) create_template(n, config, task) - create_fleet(n, config, task) + create_fleet(n, config, task, instance_details) elif len(detail) > 1 and task != 'cluster-worker': logger.info('Multiple %s instances running, destroying and recreating', task) destroy(config) create_template(n, config, task) - create_fleet(n, config, task) + create_fleet(n, config, task, instance_details) detail = ec2_ip(n, config) for e in detail: if e['state'] == 'running': logger.info('%s is running, the IP is %s', task, e['ip']) +def get_instance_details(config, task_list): + """calculate instance details & resources for fleet creation + Parameters + ---------- + config : dict + Forge configuration data + task_list : list + Forge services to get details of + """ + ram = config.get('ram', None) + cpu = config.get('cpu', None) + ratio = config.get('ratio', None) + worker_count = config.get('workers', None) + destroy_flag = config.get('destroy_after_failure') + + instance_details = {} + + def _check(x, i): + logger.debug('Get index %d of %s', i, x) + return x[i] if x and x[i:i + 1] else None + + for task in task_list: + if 'cluster-master' in task or 'single' in task: + task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 0), cpu=_check(cpu, 0), ratio=_check(ratio, 0)) + worker_count = 1 + elif 'cluster-worker' in task: + task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 1), cpu=_check(cpu, 1), ratio=_check(ratio, 1), workers=worker_count) + else: + logger.error("'%s' does not seem to be a valid cluster or single job.", task) + if destroy_flag: + destroy(config) + sys.exit(1) + + logger.debug('%s OVERRIDE DETAILS | RAM: %s out of %s | CPU: %s with ratio of %s', task, ram, total_ram, cpu, ram2cpu_ratio) + + instance_details[task] = { + 'total_capacity': worker_count or total_ram, + 'capacity_unit': 'units' if worker_count else 'memory-mib', + 'override_instance_stats': { + 'MemoryMiB': {'Min': task_ram[0], 'Max': task_ram[1]}, + 'VCpuCount': {'Min': task_cpu[0], 'Max': task_cpu[1]}, + 'SpotMaxPricePercentageOverLowestPrice': 100, + 'MemoryGiBPerVCpu': {'Min': ram2cpu_ratio[0], 'Max': ram2cpu_ratio[1]} + } + } + + return instance_details + + def create(config): """creates EC2 instances based on config @@ -657,9 +740,15 @@ def create(config): sys.excepthook = destroy_hook service = config.get('service') + task_list = ['single'] - if service == 'single': - search_and_create(config, 'single') if service == 'cluster': - search_and_create(config, 'cluster-master') - search_and_create(config, 'cluster-worker') + task_list = ['cluster-master', 'cluster-worker'] + + instance_details = get_instance_details(config, task_list) + + if not config.get('aws_az'): + config['aws_az'] = get_placement_az(config, instance_details[task_list[-1]]) + + for task in task_list: + search_and_create(config, task, instance_details[task]) diff --git a/src/forge/destroy.py b/src/forge/destroy.py index a1e53e6..d50a127 100755 --- a/src/forge/destroy.py +++ b/src/forge/destroy.py @@ -60,6 +60,7 @@ def pricing(detail, config, market): if dif > max_dif: max_dif = dif ec2_type = e['instance_type'] + config['aws_az'] = e['az'] total_cost = get_ec2_pricing(ec2_type, market, config) if total_cost > 0: From d31bb08d68d9e6be338821e2dfe79c3146465948 Mon Sep 17 00:00:00 2001 From: "Gabriele A. Ron" Date: Fri, 19 Jan 2024 05:39:43 -0600 Subject: [PATCH 4/5] Add spot retries and failover --- src/forge/common.py | 12 ++++++++ src/forge/configure.py | 6 ++-- src/forge/create.py | 12 ++++---- src/forge/engine.py | 65 ++++++++++++++++++++++++++++++++++++---- src/forge/exceptions.py | 3 ++ src/forge/rsync.py | 3 +- src/forge/run.py | 16 ++++++---- src/forge/yaml_loader.py | 2 ++ tests/test_run.py | 1 + 9 files changed, 99 insertions(+), 21 deletions(-) create mode 100644 src/forge/exceptions.py diff --git a/src/forge/common.py b/src/forge/common.py index 02ea5c3..1afbe4d 100755 --- a/src/forge/common.py +++ b/src/forge/common.py @@ -14,6 +14,7 @@ from botocore.exceptions import ClientError, NoCredentialsError from . import DEFAULT_ARG_VALS, ADDITIONAL_KEYS +from .exceptions import ExitHandlerException logger = logging.getLogger(__name__) @@ -529,3 +530,14 @@ def get_ec2_pricing(ec2_type, market, config): price = float(price) return price + + +def exit_callback(config, exit: bool = False): + if config['job'] == 'engine' and (config.get('spot_retries') or (config.get('on_demand_failover') or config.get('market_failover'))): + logger.error('Error occurred, bubbling up error to handler.') + raise ExitHandlerException + + if exit: + sys.exit(1) + + pass diff --git a/src/forge/configure.py b/src/forge/configure.py index 76db581..0fab7e0 100755 --- a/src/forge/configure.py +++ b/src/forge/configure.py @@ -5,7 +5,7 @@ import sys import yaml -from schema import Schema, And, Optional, SchemaError +from schema import Schema, And, Optional, SchemaError, Use from .common import set_config_dir @@ -62,7 +62,9 @@ def check_env_yaml(env_yaml): Optional('tags'): And(list, len, error="Invalid AWS tags"), Optional('excluded_ec2s'): And(list), Optional('additional_config'): And(list), - Optional('ec2_max'): And(int) + Optional('ec2_max'): And(int), + Optional('on_demand_failover'): And(bool), + Optional('spot_retries'): And(Use(int), lambda x: x > 0), }) try: validated = schema.validate(env_yaml) diff --git a/src/forge/create.py b/src/forge/create.py index 1ec4ccc..9f39cdd 100755 --- a/src/forge/create.py +++ b/src/forge/create.py @@ -11,8 +11,7 @@ from . import DEFAULT_ARG_VALS, REQUIRED_ARGS from .parser import add_basic_args, add_job_args, add_env_args, add_general_args -from .common import (ec2_ip, destroy_hook, set_boto_session, - user_accessible_vars, FormatEmpty, get_ec2_pricing) +from .common import ec2_ip, destroy_hook, set_boto_session, user_accessible_vars, FormatEmpty, get_ec2_pricing, exit_callback from .destroy import destroy logger = logging.getLogger(__name__) @@ -177,7 +176,7 @@ def create_status(n, request, config): destroy(config) error_details = get_fleet_error(client, fleet_id, create_time) logger.error('Last status details: %s', error_details) - sys.exit(1) + exit_callback(config, exit=True) time.sleep(10) t += 10 time_without_spot += 10 @@ -198,7 +197,7 @@ def create_status(n, request, config): logger.error('The EC2 spot instance failed to start, please try again.') if destroy_flag: destroy(config) - sys.exit(1) + exit_callback(config, exit=True) logger.info('Finding EC2... - %ds elapsed', t) fleet_request_configs = client.describe_fleet_instances(FleetId=fleet_id) active_instances_list = fleet_request_configs.get('ActiveInstances') @@ -207,6 +206,7 @@ def create_status(n, request, config): list_len = len(ec2_id_list) logger.debug('EC2 list is: %s', ec2_id_list) + config['ec2_id_list'] = ec2_id_list time_without_instance = 0 for s in ec2_id_list: status = 'initializing' @@ -222,12 +222,12 @@ def create_status(n, request, config): logger.error('The EC2 spot instance failed to start, please try again.') if destroy_flag: destroy(config) - sys.exit(1) + exit_callback(config, exit=True) elif status not in {'initializing', 'ok'}: logger.error('Could not start instance. Last EC2 status: %s', status) if destroy_flag: destroy(config) - sys.exit(1) + exit_callback(config, exit=True) logger.info('EC2 initialized.') pricing(n, config, fleet_id) diff --git a/src/forge/engine.py b/src/forge/engine.py index 5f1cf97..4f37e7d 100755 --- a/src/forge/engine.py +++ b/src/forge/engine.py @@ -1,11 +1,18 @@ """Run a command on remote EC2, rsync user content, and execute it.""" -from . import REQUIRED_ARGS -from .parser import add_basic_args, add_job_args, add_env_args, add_general_args, add_action_args -from .create import create +import boto3 +import logging + +from . import DEFAULT_ARG_VALS, REQUIRED_ARGS +from .exceptions import ExitHandlerException +from .parser import add_basic_args, add_job_args, add_env_args, add_general_args, add_action_args, nonnegative_int_arg +from .create import create, ec2_ip from .rsync import rsync from .run import run +logger = logging.getLogger(__name__) + + def cli_engine(subparsers): """add engine parser to subparsers @@ -21,6 +28,9 @@ def cli_engine(subparsers): add_general_args(parser) add_action_args(parser) + parser.add_argument('--spot_retries', '--spot-retries', type=nonnegative_int_arg) + parser.add_argument('--on_demand_failover', '--on-demand-failover', action='store_true', dest='market_failover') + REQUIRED_ARGS['engine'] = list(set(REQUIRED_ARGS['create'] + REQUIRED_ARGS['rsync'] + REQUIRED_ARGS['run'] + @@ -40,7 +50,50 @@ def engine(config): int The exit status for run with 0 for success """ - create(config) - rsync(config) - status = run(config) + status = 4 + + try: + create(config) + rsync(config) + status = run(config) + except ExitHandlerException: + # Check for spot instances and retries + if 'spot' in config['market']: + + name = config.get('name') + date = config.get('date', '') + market = config.get('market', DEFAULT_ARG_VALS['market']) + service = config.get('service') + + n_list = [] + if service == "cluster": + n_list.append(f'{name}-{market[0]}-{service}-master-{date}') + n_list.append(f'{name}-{market[-1]}-{service}-worker-{date}') + elif service == "single": + n_list.append(f'{name}-{market[0]}-{service}-{date}') + + for n in n_list: + flag = False + details = ec2_ip(n, config) + + for ec2 in details: + if ec2['state'] != 'running': + flag = True + + if flag: + break + else: + logger.critical('Bubble received but all instances are ok.') + status = 3 + return status + + if config.get('spot_retries', 0) > 0: + config['spot_retries'] -= 1 + status = engine(config) + elif config.get('on_demand_failover') or config.get('market_failover'): + config['market'][0] = config['market'][-1] = 'on-demand' + status = engine(config) + else: + status = 5 + return status diff --git a/src/forge/exceptions.py b/src/forge/exceptions.py new file mode 100644 index 0000000..c507967 --- /dev/null +++ b/src/forge/exceptions.py @@ -0,0 +1,3 @@ +class ExitHandlerException(Exception): + """Raised when there's an exception for the ExitHandler""" + pass diff --git a/src/forge/rsync.py b/src/forge/rsync.py index 3e5cd46..654728e 100755 --- a/src/forge/rsync.py +++ b/src/forge/rsync.py @@ -6,7 +6,7 @@ from . import DEFAULT_ARG_VALS, REQUIRED_ARGS from .parser import add_basic_args, add_general_args, add_env_args, add_action_args -from .common import ec2_ip, key_file, get_ip +from .common import ec2_ip, key_file, get_ip, exit_callback logger = logging.getLogger(__name__) @@ -80,6 +80,7 @@ def _rsync(config, ip): ) except subprocess.CalledProcessError as exc: logger.error('Rsync failed:\n%s', exc.output) + exit_callback(config) else: logger.info('Rsync successful:\n%s', output) diff --git a/src/forge/run.py b/src/forge/run.py index 906015f..f3f465f 100755 --- a/src/forge/run.py +++ b/src/forge/run.py @@ -5,8 +5,9 @@ import sys from . import DEFAULT_ARG_VALS, REQUIRED_ARGS +from .exceptions import ExitHandlerException from .parser import add_basic_args, add_general_args, add_env_args, add_action_args -from .common import ec2_ip, key_file, get_ip, destroy_hook, user_accessible_vars, FormatEmpty +from .common import ec2_ip, key_file, get_ip, destroy_hook, user_accessible_vars, FormatEmpty, exit_callback from .destroy import destroy logger = logging.getLogger(__name__) @@ -127,10 +128,13 @@ def _run(config, ip): raise ValueError('Run command unsuccessful, ending attempts.') except ValueError as e: logger.error('Run command raised error: %s', e) - if destroy_flag: - logger.info('destroy_after_failure parameter True, running forge destroy...') - destroy(config) - - break + try: + exit_callback(config) + except ExitHandlerException as exc: + raise + finally: + if destroy_flag: + logger.info('destroy_after_failure parameter True, running forge destroy...') + destroy(config) return rval diff --git a/src/forge/yaml_loader.py b/src/forge/yaml_loader.py index b9cd30f..d4042b3 100755 --- a/src/forge/yaml_loader.py +++ b/src/forge/yaml_loader.py @@ -141,11 +141,13 @@ def _get_type(x): Optional('gpu_flag'): And(bool), Optional('market'): And(Or(str, list)), Optional('name'): And(str, len, error='Invalid Name'), + Optional('on_demand_failover'): And(bool), Optional('ratio'): And(list), Optional('ram'): And(list, error='Invalid RAM'), Optional('rsync_path'): And(str), Optional('run_cmd'): And(str, len, error='Invalid run_cmd'), Optional('service'): And(str, len, Or('single', 'cluster'), error='Invalid Service'), + Optional('spot_retries'): And(Use(int), positive_int), Optional('user_data'): And(list), Optional('valid_time'): And(Use(int), positive_int), Optional('workers'): And(Use(int), positive_int), diff --git a/tests/test_run.py b/tests/test_run.py index c5340ce..ed2769c 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -72,6 +72,7 @@ def test_run_error(mock_ec2_ip, mock_get_ip, mock_key_file, mock_sub_run, caplog 'aws_profile': 'dev', 'forge_env': 'test', 'run_cmd': 'dummy.sh dev test', + 'job': 'run', } expected_cmd = [ 'ssh', '-t', '-o', 'UserKnownHostsFile=/dev/null', '-o', From 48a164f8b00e0ce300838073b147b752acd8c992 Mon Sep 17 00:00:00 2001 From: "Gabriele A. Ron" Date: Wed, 7 Feb 2024 10:07:39 -0600 Subject: [PATCH 5/5] Update documentation --- docs/environmental_yaml.md | 2 ++ docs/yaml.md | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/environmental_yaml.md b/docs/environmental_yaml.md index f423a24..df31294 100644 --- a/docs/environmental_yaml.md +++ b/docs/environmental_yaml.md @@ -95,6 +95,8 @@ https://github.com/carsdotcom/cars-forge/blob/main/examples/env_yaml_example/exa ``` - **forge_env** - Name of the Forge environment. The user will refer to this in their yaml. - **forge_pem_secret** - The secret name where the `ec2_key` is stored +- **on_demand_failover** - If using engine mode and all spot attempts (market: spot + spot retries) have failed, run a final attempt using on-demand. +- **spot_retries** - If using engine mode, sets the number of times to retry a spot instance. Only retries if either market is spot. - **tags** - [Tags](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html) to apply to instances created by Forge. Follows the AWS tag format. - Forge also exposes all string, numeric, and some extra variables from the combined user and environmental configs that will be replaced at runtime by the matching values (e.g. `{name}` for job name, `{date}` for job date, etc.) See the [variables](variables.md) page for more details. - E.g. diff --git a/docs/yaml.md b/docs/yaml.md index dc8701a..44b6620 100644 --- a/docs/yaml.md +++ b/docs/yaml.md @@ -43,6 +43,7 @@ Each forge command certain parameters. A yaml file with all the parameters can b ``` - If running via the command line, a range of values is passed as: ``--market on-demand spot``. - **name** - Name of the instance/cluster +- **on_demand_failover** - If using engine mode and all spot attempts (market: spot + spot retries) have failed, run a final attempt using on-demand. - **ram** - Minimum amount of RAM required. Can be a range e.g. [16, 32]. - If using a cluster, you must specify both the master and worker. Master first, worker second. ```yaml @@ -76,5 +77,6 @@ Each forge command certain parameters. A yaml file with all the parameters can b - Use the `--all` flag to run the script on all the instances in a cluster. - E.g. `run_cmd: scripts/run.sh {env} {date} {ip}` - **service** - `cluster` or `single` +- **spot_retries** - If using engine mode, sets the number of times to retry a spot instance. Only retries if either market is spot. - **user_data** - Custom script passed to instance. Will be run only once when the instance starts up. - **valid_time** - How many hours the fleet will stay up. After this time, all EC2s will be destroyed. The default is 8. \ No newline at end of file