Skip to content

Commit

Permalink
refactor: move cli helpers into cli_util module
Browse files Browse the repository at this point in the history
We cannot name the file "util.py" and "from uaclient.cli import util as
cli_util" because there is a bug in python where python will confuse the
two modules even if one is renamed at import time.
  • Loading branch information
orndorffgrant committed Feb 20, 2024
1 parent 5034e74 commit 9db3413
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 276 deletions.
155 changes: 22 additions & 133 deletions uaclient/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import tempfile
import textwrap
import time
from functools import wraps
from typing import List, Optional, Tuple # noqa

from uaclient import (
Expand Down Expand Up @@ -47,8 +46,8 @@
from uaclient.api.u.pro.security.status.reboot_required.v1 import (
_reboot_required,
)
from uaclient.api.u.pro.status.is_attached.v1 import _is_attached
from uaclient.apt import AptProxyScope, setup_apt_proxy
from uaclient.cli import cli_util
from uaclient.cli.constants import NAME, USAGE_TMPL
from uaclient.cli.fix import set_fix_parser
from uaclient.data_types import AttachActionsConfigFile, IncorrectTypeError
Expand Down Expand Up @@ -174,89 +173,6 @@ def _get_service_descriptions() -> Tuple[List[str], List[str]]:
return (non_beta_services_desc, beta_services_desc)


def assert_lock_file(lock_holder=None):
"""Decorator asserting exclusive access to lock file"""

def wrapper(f):
@wraps(f)
def new_f(*args, cfg, **kwargs):
with lock.SpinLock(cfg=cfg, lock_holder=lock_holder, sleep_time=1):
retval = f(*args, cfg=cfg, **kwargs)
return retval

return new_f

return wrapper


def assert_root(f):
"""Decorator asserting root user"""

@wraps(f)
def new_f(*args, **kwargs):
if not util.we_are_currently_root():
raise exceptions.NonRootUserError()
else:
return f(*args, **kwargs)

return new_f


def verify_json_format_args(f):
"""Decorator to verify if correct params are used for json format"""

@wraps(f)
def new_f(cmd_args, *args, **kwargs):
if not cmd_args:
return f(cmd_args, *args, **kwargs)

if cmd_args.format == "json" and not cmd_args.assume_yes:
raise exceptions.CLIJSONFormatRequireAssumeYes()
else:
return f(cmd_args, *args, **kwargs)

return new_f


def assert_attached(raise_custom_error_function=None):
"""Decorator asserting attached config.
:param msg_function: Optional function to generate a custom message
if raising an UnattachedError
"""

def wrapper(f):
@wraps(f)
def new_f(args, cfg, **kwargs):
if not _is_attached(cfg).is_attached:
if raise_custom_error_function:
command = getattr(args, "command", "")
service_names = getattr(args, "service", "")
raise_custom_error_function(
command=command, service_names=service_names, cfg=cfg
)
else:
raise exceptions.UnattachedError()
return f(args, cfg=cfg, **kwargs)

return new_f

return wrapper


def assert_not_attached(f):
"""Decorator asserting unattached config."""

@wraps(f)
def new_f(args, cfg, **kwargs):
if _is_attached(cfg).is_attached:
raise exceptions.AlreadyAttachedError(
account_name=cfg.machine_token_file.account.get("name", "")
)
return f(args, cfg=cfg, **kwargs)

return new_f


def api_parser(parser):
"""Build or extend an arg parser for the api subcommand."""
parser.prog = "api"
Expand Down Expand Up @@ -798,7 +714,7 @@ def action_config_show(args, *, cfg, **kwargs):
print(messages.CLI_CONFIG_GLOBAL_XOR_UA_PROXY)


@assert_root
@cli_util.assert_root
def action_config_set(args, *, cfg, **kwargs):
"""Perform the 'config set' action.
Expand Down Expand Up @@ -918,7 +834,7 @@ def action_config_set(args, *, cfg, **kwargs):
setattr(cfg, set_key, set_value)


@assert_root
@cli_util.assert_root
def action_config_unset(args, *, cfg, **kwargs):
"""Perform the 'config unset' action.
Expand Down Expand Up @@ -965,37 +881,10 @@ def action_config_unset(args, *, cfg, **kwargs):
return 0


def _raise_enable_disable_unattached_error(command, service_names, cfg):
"""Raises a custom error for enable/disable commands when unattached.
Takes into consideration if the services exist or not, and notify the user
accordingly."""
(entitlements_found, entitlements_not_found) = get_valid_entitlement_names(
names=service_names, cfg=cfg
)
if entitlements_found and entitlements_not_found:
raise exceptions.UnattachedMixedServicesError(
valid_service=", ".join(entitlements_found),
operation=command,
invalid_service=", ".join(entitlements_not_found),
service_msg="",
)
elif entitlements_found:
raise exceptions.UnattachedValidServicesError(
valid_service=", ".join(entitlements_found)
)
else:
raise exceptions.UnattachedInvalidServicesError(
operation=command,
invalid_service=", ".join(entitlements_not_found),
service_msg="",
)


@verify_json_format_args
@assert_root
@assert_attached(_raise_enable_disable_unattached_error)
@assert_lock_file("pro disable")
@cli_util.verify_json_format_args
@cli_util.assert_root
@cli_util.assert_attached(cli_util._raise_enable_disable_unattached_error)
@cli_util.assert_lock_file("pro disable")
def action_disable(args, *, cfg, **kwargs):
"""Perform the disable action on a list of entitlements.
Expand Down Expand Up @@ -1045,10 +934,10 @@ def action_disable(args, *, cfg, **kwargs):
return 0 if ret else 1


@verify_json_format_args
@assert_root
@assert_attached(_raise_enable_disable_unattached_error)
@assert_lock_file("pro enable")
@cli_util.verify_json_format_args
@cli_util.assert_root
@cli_util.assert_attached(cli_util._raise_enable_disable_unattached_error)
@cli_util.assert_lock_file("pro enable")
def action_enable(args, *, cfg, **kwargs):
"""Perform the enable action on a named entitlement.
Expand Down Expand Up @@ -1130,10 +1019,10 @@ def action_enable(args, *, cfg, **kwargs):
return 0 if ret else 1


@verify_json_format_args
@assert_root
@assert_attached()
@assert_lock_file("pro detach")
@cli_util.verify_json_format_args
@cli_util.assert_root
@cli_util.assert_attached()
@cli_util.assert_lock_file("pro detach")
def action_detach(args, *, cfg, **kwargs) -> int:
"""Perform the detach action for this machine.
Expand Down Expand Up @@ -1223,7 +1112,7 @@ def action_api(args, *, cfg, **kwargs):
return 0 if result.result == "success" else 1


@assert_root
@cli_util.assert_root
def action_auto_attach(args, *, cfg: config.UAConfig, **kwargs) -> int:
try:
_full_auto_attach(
Expand Down Expand Up @@ -1272,9 +1161,9 @@ def _magic_attach(args, *, cfg, **kwargs):
return wait_resp.contract_token


@assert_not_attached
@assert_root
@assert_lock_file("pro attach")
@cli_util.assert_not_attached
@cli_util.assert_root
@cli_util.assert_lock_file("pro attach")
def action_attach(args, *, cfg, **kwargs):
if args.token and args.attach_config:
raise exceptions.CLIAttachTokenArgXORConfig()
Expand Down Expand Up @@ -1523,7 +1412,7 @@ def _action_refresh_config(args, cfg: config.UAConfig):
print(messages.REFRESH_CONFIG_SUCCESS)


@assert_attached()
@cli_util.assert_attached()
def _action_refresh_contract(_args, cfg: config.UAConfig):
try:
contract.refresh(cfg)
Expand All @@ -1548,8 +1437,8 @@ def _action_refresh_messages(_args, cfg: config.UAConfig):
print(messages.REFRESH_MESSAGES_SUCCESS)


@assert_root
@assert_lock_file("pro refresh")
@cli_util.assert_root
@cli_util.assert_lock_file("pro refresh")
def action_refresh(args, *, cfg: config.UAConfig, **kwargs):
if args.target is None or args.target == "config":
_action_refresh_config(args, cfg)
Expand Down
115 changes: 115 additions & 0 deletions uaclient/cli/cli_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from functools import wraps

from uaclient import entitlements, exceptions, lock, util
from uaclient.api.u.pro.status.is_attached.v1 import _is_attached


def assert_lock_file(lock_holder=None):
"""Decorator asserting exclusive access to lock file"""

def wrapper(f):
@wraps(f)
def new_f(*args, cfg, **kwargs):
with lock.SpinLock(cfg=cfg, lock_holder=lock_holder, sleep_time=1):
retval = f(*args, cfg=cfg, **kwargs)
return retval

return new_f

return wrapper


def assert_root(f):
"""Decorator asserting root user"""

@wraps(f)
def new_f(*args, **kwargs):
if not util.we_are_currently_root():
raise exceptions.NonRootUserError()
else:
return f(*args, **kwargs)

return new_f


def verify_json_format_args(f):
"""Decorator to verify if correct params are used for json format"""

@wraps(f)
def new_f(cmd_args, *args, **kwargs):
if not cmd_args:
return f(cmd_args, *args, **kwargs)

if cmd_args.format == "json" and not cmd_args.assume_yes:
raise exceptions.CLIJSONFormatRequireAssumeYes()
else:
return f(cmd_args, *args, **kwargs)

return new_f


def assert_attached(raise_custom_error_function=None):
"""Decorator asserting attached config.
:param msg_function: Optional function to generate a custom message
if raising an UnattachedError
"""

def wrapper(f):
@wraps(f)
def new_f(args, cfg, **kwargs):
if not _is_attached(cfg).is_attached:
if raise_custom_error_function:
command = getattr(args, "command", "")
service_names = getattr(args, "service", "")
raise_custom_error_function(
command=command, service_names=service_names, cfg=cfg
)
else:
raise exceptions.UnattachedError()
return f(args, cfg=cfg, **kwargs)

return new_f

return wrapper


def assert_not_attached(f):
"""Decorator asserting unattached config."""

@wraps(f)
def new_f(args, cfg, **kwargs):
if _is_attached(cfg).is_attached:
raise exceptions.AlreadyAttachedError(
account_name=cfg.machine_token_file.account.get("name", "")
)
return f(args, cfg=cfg, **kwargs)

return new_f


def _raise_enable_disable_unattached_error(command, service_names, cfg):
"""Raises a custom error for enable/disable commands when unattached.
Takes into consideration if the services exist or not, and notify the user
accordingly."""
(
entitlements_found,
entitlements_not_found,
) = entitlements.get_valid_entitlement_names(names=service_names, cfg=cfg)
if entitlements_found and entitlements_not_found:
raise exceptions.UnattachedMixedServicesError(
valid_service=", ".join(entitlements_found),
operation=command,
invalid_service=", ".join(entitlements_not_found),
service_msg="",
)
elif entitlements_found:
raise exceptions.UnattachedValidServicesError(
valid_service=", ".join(entitlements_found)
)
else:
raise exceptions.UnattachedInvalidServicesError(
operation=command,
invalid_service=", ".join(entitlements_not_found),
service_msg="",
)
Loading

0 comments on commit 9db3413

Please sign in to comment.