Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Structured output #226

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 108 additions & 150 deletions ecs_deploy/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from os import getenv
from time import sleep
from pathlib import Path

import click
import json
import getpass
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
from ecs_deploy import VERSION
from ecs_deploy.ecs import DeployAction, ScaleAction, RunAction, EcsClient, DiffAction, \
Expand All @@ -26,8 +26,7 @@ def get_client(access_key_id, secret_access_key, region, profile, assume_account


@click.command()
@click.argument('cluster')
@click.argument('service')
@click.argument('cluster-service', nargs=-1)
@click.option('-t', '--tag', help='Changes the tag for ALL container images')
@click.option('-i', '--image', type=(str, str), multiple=True, help='Overwrites the image for a container: <container> <image>')
@click.option('-c', '--command', type=(str, str), multiple=True, help='Overwrites the command in a container: <container> <command>')
Expand Down Expand Up @@ -85,11 +84,13 @@ def get_client(access_key_id, secret_access_key, region, profile, assume_account
@click.option('--volume', type=(str, str), multiple=True, required=False, help='Set volume mapping from host to container in the task definition.')
@click.option('--add-container', type=str, multiple=True, required=False, help='Add a placeholder container in the task definition.')
@click.option('--remove-container', type=str, multiple=True, required=False, help='Remove a container from the task definition.')
def deploy(cluster, service, tag, image, command, health_check, cpu, memory, memoryreservation, task_cpu, task_memory, privileged, essential, env, env_file, s3_env_file, secret, secrets_env_file, ulimit, system_control, port, mount, log, role, execution_role, runtime_platform, task, region, access_key_id, secret_access_key, profile, account, assume_role, timeout, newrelic_apikey, newrelic_appid, newrelic_region, newrelic_revision, comment, user, ignore_warnings, diff, deregister, rollback, exclusive_env, exclusive_secrets, exclusive_s3_env_file, sleep_time, exclusive_ulimits, exclusive_system_controls, exclusive_ports, exclusive_mounts, volume, add_container, remove_container, slack_url, docker_label, exclusive_docker_labels, slack_service_match='.*'):
@click.option('-o', '--out-file', type=str, required=False, help='Sets the file to write structured JSON output to')
def deploy(cluster_service, tag, image, command, health_check, cpu, memory, memoryreservation, task_cpu, task_memory, privileged, essential, env, env_file, s3_env_file, secret, secrets_env_file, ulimit, system_control, port, mount, log, role, execution_role, runtime_platform, task, region, access_key_id, secret_access_key, profile, account, assume_role, timeout, newrelic_apikey, newrelic_appid, newrelic_region, newrelic_revision, comment, user, ignore_warnings, diff, deregister, rollback, exclusive_env, exclusive_secrets, exclusive_s3_env_file, sleep_time, exclusive_ulimits, exclusive_system_controls, exclusive_ports, exclusive_mounts, volume, add_container, remove_container, slack_url, docker_label, exclusive_docker_labels, out_file, slack_service_match='.*'):
"""
Redeploy or modify a service.

\b
CLUSTER_SERVICE is the space separated pair of your CLUSTER and SERVICE. Multiple pairs can be given.
CLUSTER is the name of your cluster (e.g. 'my-cluster') within ECS.
SERVICE is the name of your service (e.g. 'my-app') within ECS.

Expand All @@ -99,59 +100,65 @@ def deploy(cluster, service, tag, image, command, health_check, cpu, memory, mem
"""
try:
client = get_client(access_key_id, secret_access_key, region, profile, account, assume_role)
deployment = DeployAction(client, cluster, service)

td = get_task_definition(deployment, task)
# If there is a new container, add it at frist.
td.add_containers(add_container)
td.remove_containers(remove_container)
td.set_images(tag, **{key: value for (key, value) in image})
td.set_commands(**{key: value for (key, value) in command})
td.set_health_checks(health_check)
td.set_cpu(**{key: value for (key, value) in cpu})
td.set_memory(**{key: value for (key, value) in memory})
td.set_memoryreservation(**{key: value for (key, value) in memoryreservation})
td.set_task_cpu(task_cpu)
td.set_task_memory(task_memory)
td.set_privileged(**{key: value for (key, value) in privileged})
td.set_essential(**{key: value for (key, value) in essential})
td.set_environment(env, exclusive_env, env_file)
td.set_docker_labels(docker_label, exclusive_docker_labels)
td.set_s3_env_file(s3_env_file, exclusive_s3_env_file)
td.set_secrets(secret, exclusive_secrets, secrets_env_file)
td.set_ulimits(ulimit, exclusive_ulimits)
td.set_system_controls(system_control, exclusive_system_controls)
td.set_port_mappings(port, exclusive_ports)
td.set_mount_points(mount, exclusive_mounts)
td.set_log_configurations(log)
td.set_role_arn(role)
td.set_execution_role_arn(execution_role)
td.set_runtime_platform(runtime_platform)
td.set_volumes(volume)
deployments = []
num_pairs = len(cluster_service) // 2
for i in range(num_pairs):
cluster, service = cluster_service[i * 2], cluster_service[i * 2 + 1]

deployment = DeployAction(client, cluster, service)

td = deployment.get_task_definition(task)
# If there is a new container, add it at frist.
td.add_containers(add_container)
td.remove_containers(remove_container)
td.set_images(tag, **{key: value for (key, value) in image})
td.set_commands(**{key: value for (key, value) in command})
td.set_health_checks(health_check)
td.set_cpu(**{key: value for (key, value) in cpu})
td.set_memory(**{key: value for (key, value) in memory})
td.set_memoryreservation(**{key: value for (key, value) in memoryreservation})
td.set_task_cpu(task_cpu)
td.set_task_memory(task_memory)
td.set_privileged(**{key: value for (key, value) in privileged})
td.set_essential(**{key: value for (key, value) in essential})
td.set_environment(env, exclusive_env, env_file)
td.set_docker_labels(docker_label, exclusive_docker_labels)
td.set_s3_env_file(s3_env_file, exclusive_s3_env_file)
td.set_secrets(secret, exclusive_secrets, secrets_env_file)
td.set_ulimits(ulimit, exclusive_ulimits)
td.set_system_controls(system_control, exclusive_system_controls)
td.set_port_mappings(port, exclusive_ports)
td.set_mount_points(mount, exclusive_mounts)
td.set_log_configurations(log)
td.set_role_arn(role)
td.set_execution_role_arn(execution_role)
td.set_runtime_platform(runtime_platform)
td.set_volumes(volume)

slack = SlackNotification(
getenv('SLACK_URL', slack_url),
getenv('SLACK_SERVICE_MATCH', slack_service_match)
)
slack.notify_start(cluster, tag, td, comment, user, service=service)

slack = SlackNotification(
getenv('SLACK_URL', slack_url),
getenv('SLACK_SERVICE_MATCH', slack_service_match)
)
slack.notify_start(cluster, tag, td, comment, user, service=service)
click.secho('Deploying based on task definition: %s\n' % td.family_revision)

click.secho('Deploying based on task definition: %s\n' % td.family_revision)
if diff:
print_diff(td)

if diff:
print_diff(td)
new_td = create_task_definition(deployment, td)

new_td = create_task_definition(deployment, td)
deployments.append((deployment, td, new_td))

try:
deploy_task_definition(
deployment=deployment,
task_definition=new_td,
deploy_task_definitions(
deployments=deployments,
title='Deploying new task definition',
success_message='Deployment successful',
failure_message='Deployment failed',
timeout=timeout,
deregister=deregister,
previous_task_definition=td,
ignore_warnings=ignore_warnings,
sleep_time=sleep_time
)
Expand All @@ -160,7 +167,7 @@ def deploy(cluster, service, tag, image, command, health_check, cpu, memory, mem
slack.notify_failure(cluster, str(e), service=service)
if rollback:
click.secho('%s\n' % str(e), fg='red', err=True)
rollback_task_definition(deployment, td, new_td, sleep_time=sleep_time)
rollback_task_definitions(deployments, sleep_time=sleep_time)
exit(1)
else:
raise
Expand All @@ -169,6 +176,27 @@ def deploy(cluster, service, tag, image, command, health_check, cpu, memory, mem

slack.notify_success(cluster, td.revision, service=service)

if out_file:
output = Path(out_file)
if output.exists() and not output.is_file():
click.secho('Out-file already exists and is not a file.\n', fg='red', err=True)
else:
click.secho('Writing structured JSON output to: %s\n' % output)
with output.open('w') as file:
data = [
{
'cluster': deployment.cluster_name,
'service': deployment.service_name,
'old_task_definition': old_td.family_revision,
'old_task_definition_arn': old_td.arn,
'task_definition': new_td.family_revision,
'task_definition_arn': new_td.arn
}
for deployment, old_td, new_td in deployments
]
json.dump(data, file)


except (EcsError, NewRelicException, ClientError) as e:
click.secho('%s\n' % str(e), fg='red', err=True)
exit(1)
Expand Down Expand Up @@ -387,7 +415,7 @@ def scale(cluster, service, desired_count, access_key_id, secret_access_key, reg
fg='green'
)
wait_for_finish(
action=scaling,
action=[scaling],
timeout=timeout,
title='Scaling service',
success_message='Scaling successful',
Expand Down Expand Up @@ -518,61 +546,44 @@ def diff(task, revision_a, revision_b, region, access_key_id, secret_access_key,
exit(1)


def wait_for_finish(action, timeout, title, success_message, failure_message,
def wait_for_finish(actions, timeout, title, success_message, failure_message,
ignore_warnings, sleep_time=1):
click.secho(title)
start_timestamp = datetime.now()
waiting_timeout = datetime.now() + timedelta(seconds=timeout)
service = action.get_service()
inspected_until = None

if timeout == -1:
waiting = False
else:
waiting = True
click.secho("Timeout disabled. Fire and forget.")
return

while waiting and datetime.now() < waiting_timeout:
click.secho('.', nl=False)
service = action.get_service()
inspected_until = inspect_errors(
service=service,
failure_message=failure_message,
ignore_warnings=ignore_warnings,
since=inspected_until,
timeout=False
)
waiting = not action.is_deployed(service)
while actions:
# Traverse in reverse order to remove finished actions
for i, action in list(enumerate(actions))[::-1]:
if action.has_finished(timeout, failure_message, ignore_warnings):
click.secho('\n%s (%s)' % (success_message, action.service_name), fg='green')
click.secho('Duration: %s sec\n' % action.get_duration())

if waiting:
sleep(sleep_time)
actions.pop(i)

inspect_errors(
service=service,
failure_message=failure_message,
ignore_warnings=ignore_warnings,
since=inspected_until,
timeout=waiting
)

click.secho('\n%s' % success_message, fg='green')
click.secho('Duration: %s sec\n' % (datetime.now() - start_timestamp).seconds)
click.secho('.', nl=False)
sleep(sleep_time)


def deploy_task_definition(deployment, task_definition, title, success_message,
def deploy_task_definitions(deployments, title, success_message,
failure_message, timeout, deregister,
previous_task_definition, ignore_warnings, sleep_time):
click.secho('Updating service')
deployment.deploy(task_definition)
ignore_warnings, sleep_time):

message = 'Successfully changed task definition to: %s:%s\n' % (
task_definition.family,
task_definition.revision
)
for deployment, _, new_td in deployments:
click.secho('Updating %s' % deployment.service_name)
deployment.deploy(new_td)

message = 'Successfully changed task definition to: %s:%s\n' % (
new_td.family,
new_td.revision
)

click.secho(message, fg='green')
click.secho(message, fg='green')

wait_for_finish(
action=deployment,
actions=[action for action, _, _ in deployments],
timeout=timeout,
title=title,
success_message=success_message,
Expand All @@ -582,15 +593,8 @@ def deploy_task_definition(deployment, task_definition, title, success_message,
)

if deregister:
deregister_task_definition(deployment, previous_task_definition)


def get_task_definition(action, task):
if task:
task_definition = action.get_task_definition(task)
else:
task_definition = action.get_current_task_definition(action.service)
return task_definition
for deployment, old_td, _ in deployments:
deregister_task_definition(deployment, old_td)


def create_task_definition(action, task_definition):
Expand All @@ -614,26 +618,25 @@ def deregister_task_definition(action, task_definition):
)


def rollback_task_definition(deployment, old, new, timeout=600, sleep_time=1):
def rollback_task_definitions(deployments, timeout=600, sleep_time=1):
task_definitions = ", ".join(old_td.family_revision for _, old_td, _ in deployments)
click.secho(
'Rolling back to task definition: %s\n' % old.family_revision,
'Rolling back to task definitions: %s\n' % task_definitions,
fg='yellow',
)
deploy_task_definition(
deployment=deployment,
task_definition=old,
deploy_task_definitions(
deployments=deployments,
title='Deploying previous task definition',
success_message='Rollback successful',
failure_message='Rollback failed. Please check ECS Console',
timeout=timeout,
deregister=True,
previous_task_definition=new,
ignore_warnings=False,
sleep_time=sleep_time
)
click.secho(
'Deployment failed, but service has been rolled back to previous '
'task definition: %s\n' % old.family_revision, fg='yellow', err=True
'Deployment failed, but services have been rolled back to previous '
'task definitions: %s\n' % task_definitions, fg='yellow', err=True
)


Expand Down Expand Up @@ -669,51 +672,6 @@ def print_diff(task_definition, title='Updating task definition'):
click.secho('')


def inspect_errors(service, failure_message, ignore_warnings, since, timeout):
error = False
last_error_timestamp = since
warnings = service.get_warnings(since)
for timestamp in warnings:
message = warnings[timestamp]
click.secho('')
if ignore_warnings:
last_error_timestamp = timestamp
click.secho(
'%s\nWARNING: %s' % (timestamp, message),
fg='yellow',
err=False
)
click.secho('Continuing.', nl=False)
else:
click.secho(
'%s\nERROR: %s\n' % (timestamp, message),
fg='red',
err=True
)
error = True

if service.older_errors:
click.secho('')
click.secho('Older errors', fg='yellow', err=True)
for timestamp in service.older_errors:
click.secho(
text='%s\n%s\n' % (timestamp, service.older_errors[timestamp]),
fg='yellow',
err=True
)

if timeout:
error = True
failure_message += ' due to timeout. Please see: ' \
'https://github.com/fabfuel/ecs-deploy#timeout'
click.secho('')

if error:
raise TaskPlacementError(failure_message)

return last_error_timestamp


ecs.add_command(deploy)
ecs.add_command(scale)
ecs.add_command(run)
Expand Down
Loading