Skip to content

Commit

Permalink
Evaluate all operations at execution time
Browse files Browse the repository at this point in the history
This represents a total overhaul of operation execution, previously
the commands-to-execute were generated in one stage and executed in
another. With this change command generation happens twice - once to
assertain if changes are required (to enable dry-runs, diffs) and then
again at execution.

This means that facts can be collected at execution rather than before
which prevents hidden side effects of operations impacting others. This
should properly solve the interdependent operation issues in pyinfra v2
as described here: https://docs.pyinfra.com/en/2.x/deploy-process.html#limitations
  • Loading branch information
Fizzadar committed Jul 23, 2023
1 parent 5be96ac commit 07e7e34
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 51 deletions.
92 changes: 47 additions & 45 deletions pyinfra/api/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from pyinfra.context import ctx_host, ctx_state

from .arguments import get_execution_kwarg_keys, pop_global_arguments
from .command import EvalOperationAtExecution, FunctionCommand, StringCommand
from .command import StringCommand
from .exceptions import OperationValueError, PyinfraError
from .host import Host
from .operations import run_host_op
Expand All @@ -36,24 +36,16 @@
class OperationMeta:
combined_output_lines = None

def __init__(self, hash=None, commands=None):
# Wrap all the attributes
commands = commands or []
self.commands = commands
def __init__(self, hash=None, is_change=False):
self.hash = hash

# Changed flag = did we do anything?
self.changed = len(self.commands) > 0
self.changed = is_change

def __repr__(self):
"""
Return Operation object as a string.
"""

return (
f"OperationMeta(commands={len(self.commands)}, "
f"changed={self.changed}, hash={self.hash})"
)
return f"OperationMeta(changed={self.changed}, hash={self.hash})"

def set_combined_output_lines(self, combined_output_lines):
self.combined_output_lines = combined_output_lines
Expand Down Expand Up @@ -200,30 +192,42 @@ def decorated_func(*args, **kwargs):
if has_run:
return OperationMeta(op_hash)

# "Run" operation
#
# "Run" operation - here we make a generator that will yield out actual commands to execute
# and, if we're diff-ing, we then iterate the generator now to determine if any changes
# *would* be made based on the *current* remote state.

# Otherwise, flag as in-op and run it to get the commands
host.in_op = True
host.current_op_hash = op_hash
host.current_op_global_kwargs = global_kwargs
def command_generator():
host.in_op = True
host.current_op_hash = op_hash
host.current_op_global_kwargs = global_kwargs

# Convert to list as the result may be a generator
commands = [ # convert any strings -> StringCommand's
StringCommand(command.strip()) if isinstance(command, str) else command
for command in func(*args, **kwargs)
]
for command in func(*args, **kwargs):
command = StringCommand(command.strip()) if isinstance(command, str) else command
yield command

host.in_op = False
host.current_op_hash = None
host.current_op_global_kwargs = None
host.in_op = False
host.current_op_hash = None
host.current_op_global_kwargs = None

if EvalOperationAtExecution in commands:
logger.warning("Defering operation evaluation until execution: %s", op_meta["names"])
commands = [FunctionCommand(operation(func), args, kwargs)]
op_is_change = False
if state.should_diff():
for command in command_generator():
op_is_change = True
break

# Add host-specific operation data to state, this mutates state
operation_meta = _update_state_meta(state, host, commands, op_hash, op_meta, global_kwargs)
operation_meta = _add_host_op_to_state(
state,
host,
op_hash,
op_is_change,
command_generator,
global_kwargs,
)

# If we're already in the execution phase, execute this operation immediately
if state.is_executing:
_execute_immediately(state, host, op_hash)

# Return result meta for use in deploy scripts
return operation_meta
Expand Down Expand Up @@ -338,12 +342,14 @@ def _ensure_shared_op_meta(state, op_hash, op_order, global_kwargs, names):
return op_meta


def _execute_immediately(state, host, op_data, op_meta, op_hash):
def _execute_immediately(state, host, op_hash):
logger.warning(
f"Note: nested operations are currently in beta ({get_call_location()})\n"
" More information: "
"https://docs.pyinfra.com/en/2.x/using-operations.html#nested-operations",
)
op_meta = state.get_op_meta(op_hash)
op_data = state.get_op_data_for_host(host, op_hash)
op_data["parent_op_hash"] = host.executing_op_hash
log_operation_start(op_meta, op_types=["nested"], prefix="")
run_host_op(state, host, op_hash)
Expand All @@ -370,28 +376,24 @@ def _attach_args(op_meta, args, kwargs):


# NOTE: this function mutates state.meta for this host
def _update_state_meta(state, host, commands, op_hash, op_meta, global_kwargs):
# We're doing some commands, meta/ops++
state.meta[host]["ops"] += 1
state.meta[host]["commands"] += len(commands)
def _add_host_op_to_state(state, host, op_hash, is_change, command_generator, global_kwargs):
host_meta = state.get_meta_for_host(host)

host_meta["ops"] += 1

if commands:
state.meta[host]["ops_change"] += 1
if is_change:
host_meta["ops_change"] += 1
else:
state.meta[host]["ops_no_change"] += 1
host_meta["ops_no_change"] += 1

operation_meta = OperationMeta(op_hash, commands)
operation_meta = OperationMeta(op_hash, is_change)

# Add the server-relevant commands
op_data = {
"commands": commands,
"command_generator": command_generator,
"global_kwargs": global_kwargs,
"operation_meta": operation_meta,
}
state.set_op_data(host, op_hash, op_data)

# If we're already in the execution phase, execute this operation immediately
if state.is_executing:
_execute_immediately(state, host, op_data, op_meta, op_hash)
state.set_op_data_for_host(host, op_hash, op_data)

return operation_meta
8 changes: 4 additions & 4 deletions pyinfra/api/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def run_host_op(state: "State", host: "Host", op_hash):
logger.info("{0}{1}".format(host.print_prefix, click.style("Skipped", "blue")))
return True

op_data = state.get_op_data(host, op_hash)
op_data = state.get_op_data_for_host(host, op_hash)
global_kwargs = op_data["global_kwargs"]

op_meta = state.get_op_meta(op_hash)
Expand Down Expand Up @@ -114,7 +114,7 @@ def run_condition(condition_name: str) -> bool:
executed_commands = 0
all_combined_output_lines = []

for i, command in enumerate(op_data["commands"]):
for command in op_data["command_generator"]():
status = False

executor_kwargs = base_executor_kwargs.copy()
Expand Down Expand Up @@ -170,7 +170,7 @@ def run_condition(condition_name: str) -> bool:
state.results[host]["ops"] += 1
state.results[host]["success_ops"] += 1

_status_log = "Success" if len(op_data["commands"]) > 0 else "No changes"
_status_log = "Success" if executed_commands > 0 else "No changes"
_click_log_status = click.style(_status_log, "green")
logger.info("{0}{1}".format(host.print_prefix, _click_log_status))

Expand All @@ -188,7 +188,7 @@ def run_condition(condition_name: str) -> bool:
if executed_commands:
state.results[host]["partial_ops"] += 1

_command_description = f"executed {executed_commands}/{len(op_data['commands'])} commands"
_command_description = f"executed {executed_commands} commands"
log_error_or_warning(host, ignore_errors, _command_description, continue_on_error)

# Always trigger any error handler
Expand Down
11 changes: 9 additions & 2 deletions pyinfra/api/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ def to_dict(self):
"results": self.results,
}

def should_diff(self):
# TODO: disable diffs if -y/--yes
return not self.is_executing

def add_callback_handler(self, handler):
if not isinstance(handler, BaseStateCallback):
raise TypeError(
Expand Down Expand Up @@ -285,10 +289,13 @@ def get_op_order(self):
def get_op_meta(self, op_hash: str):
return self.op_meta[op_hash]

def get_op_data(self, host: "Host", op_hash: str):
def get_meta_for_host(self, host: "Host"):
return self.meta[host]

def get_op_data_for_host(self, host: "Host", op_hash: str):
return self.ops[host][op_hash]

def set_op_data(self, host: "Host", op_hash: str, op_data):
def set_op_data_for_host(self, host: "Host", op_hash: str, op_data):
self.ops[host][op_hash] = op_data

def activate_host(self, host: "Host"):
Expand Down

0 comments on commit 07e7e34

Please sign in to comment.