diff --git a/.gitignore b/.gitignore index 4f678584..d02e666a 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ Pipfile* test/.env .tmp* MANIFEST +venv diff --git a/Makefile b/Makefile index 5960905d..ae1a0ac5 100644 --- a/Makefile +++ b/Makefile @@ -36,3 +36,20 @@ clean: rm -f linode-cli.sh baked_version rm -f data-* rm -rf dist + +.PHONY: test +test: + pytest tests + python -m unittest tests/*.py + + +black: + black linodecli tests + +isort: + isort linodecli tests + +autoflake: + autoflake linodecli tests + +format: black isort autoflake diff --git a/README.rst b/README.rst index 93237626..569bbade 100644 --- a/README.rst +++ b/README.rst @@ -228,6 +228,18 @@ the ``obj`` plugin that ships with the CLI. To do so, simply set appropriate values. This allows using Linode Object Storage through the CLI without having a configuration file, which is desirable in some situations. +Configurable API URL +"""""""""""""""""""" + +In some cases you may want to run linode-cli against a non-default Linode API URL. +This can be done using the following environment variables to override certain segments of the target API URL. + +* ``LINODE_CLI_API_HOST`` - The host of the Linode API instance (e.g. ``api.linode.com``) + +* ``LINODE_CLI_API_VERSION`` - The Linode API version to use (e.g. ``v4beta``) + +* ``LINODE_CLI_API_SCHEME`` - The request scheme to use (e.g. ``https``) + Multiple Users ^^^^^^^^^^^^^^ diff --git a/linodecli/__init__.py b/linodecli/__init__.py index ddf13dad..8d43c241 100755 --- a/linodecli/__init__.py +++ b/linodecli/__init__.py @@ -4,8 +4,8 @@ """ import argparse -import sys import os +import sys from importlib import import_module from sys import argv, stderr, version_info @@ -17,7 +17,9 @@ from linodecli import plugins from .cli import CLI +from .completion import bake_completions, get_completions from .configuration import ENV_TOKEN_NAME +from .helpers import handle_url_overrides from .operation import CLIArg, CLIOperation, URLParam from .output import OutputMode from .response import ModelAttr, ResponseModel @@ -27,13 +29,14 @@ VERSION = pkg_resources.require("linode-cli")[0].version except: VERSION = "building" + BASE_URL = "https://api.linode.com/v4" # if any of these arguments are given, we don't need to prompt for configuration skip_config = any(c in argv for c in ["--skip-config", "--help", "--version"]) -cli = CLI(VERSION, BASE_URL, skip_config=skip_config) +cli = CLI(VERSION, handle_url_overrides(BASE_URL), skip_config=skip_config) def warn_python2_eol(): @@ -76,7 +79,8 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem parser.add_argument( "--help", action="store_true", - help="Display information about a command, action, or " "the CLI overall.", + help="Display information about a command, action, or " + "the CLI overall.", ) parser.add_argument( "--text", @@ -89,9 +93,13 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem type=str, help="The delimiter when displaying raw output.", ) - parser.add_argument("--json", action="store_true", help="Display output as JSON") parser.add_argument( - "--markdown", action="store_true", help="Display output in Markdown format." + "--json", action="store_true", help="Display output as JSON" + ) + parser.add_argument( + "--markdown", + action="store_true", + help="Display output in Markdown format.", ) parser.add_argument( "--pretty", action="store_true", help="If set, pretty-print JSON output" @@ -149,6 +157,12 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem help="Suppress warnings that are intended for human users. " "This is useful for scripting the CLI's behavior.", ) + parser.add_argument( + "--no-truncation", + action="store_true", + default=False, + help="Prevent the truncation of long values in command outputs.", + ) parser.add_argument( "--version", "-v", @@ -184,10 +198,14 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem cli.defaults = not parsed.no_defaults cli.suppress_warnings = parsed.suppress_warnings + cli.page = parsed.page cli.page_size = parsed.page_size cli.debug_request = parsed.debug + cli.output_handler.suppress_warnings = parsed.suppress_warnings + cli.output_handler.disable_truncation = parsed.no_truncation + if not cli.suppress_warnings: warn_python2_eol() @@ -229,14 +247,7 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem sys.exit(2) cli.bake(spec) - print("Baking bash completions...") - # this step would normally happen on laod - if "_base_url" in cli.ops: - del cli.ops["_base_url"] - if "_spec_version" in cli.ops: - del cli.ops["_spec_version"] - # do the baking - cli.bake_completions() + bake_completions(cli.ops) print("Done.") sys.exit(0) elif cli.ops is None: @@ -262,7 +273,9 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem try: plugin_name = plugin.PLUGIN_NAME except AttributeError: - print(f"{module} is not a valid Linode CLI plugin - missing PLUGIN_NAME") + print( + f"{module} is not a valid Linode CLI plugin - missing PLUGIN_NAME" + ) sys.exit(11) # prove it's callable @@ -276,7 +289,9 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem reregistering = False # check for naming conflicts if plugin_name in cli.ops: - print("Plugin name conflicts with CLI operation - registration failed.") + print( + "Plugin name conflicts with CLI operation - registration failed." + ) sys.exit(12) elif plugin_name in plugins.available_local: # conflicts with an internal plugin - can't do that @@ -305,7 +320,9 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem if reregistering: already_registered.remove(plugin_name) - cli.config.config.remove_option("DEFAULT", f"plugin-name-{plugin_name}") + cli.config.config.remove_option( + "DEFAULT", f"plugin-name-{plugin_name}" + ) already_registered.append(plugin_name) cli.config.config.set( @@ -328,24 +345,30 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem plugin_name = parsed.action if plugin_name in plugins.available_local: # can't remove first-party plugins - print(f"{plugin_name} is bundled with the CLI and cannot be removed") + print( + f"{plugin_name} is bundled with the CLI and cannot be removed" + ) sys.exit(13) elif plugin_name not in plugins.available(cli.config): print(f"{plugin_name} is not a registered plugin") sys.exit(14) # do the removal - current_plugins = cli.config.config.get("DEFAULT", "registered-plugins").split( - "," - ) + current_plugins = cli.config.config.get( + "DEFAULT", "registered-plugins" + ).split(",") current_plugins.remove(plugin_name) cli.config.config.set( "DEFAULT", "registered-plugins", ",".join(current_plugins) ) - if cli.config.config.has_option("DEFAULT", f"plugin-name-{plugin_name}"): + if cli.config.config.has_option( + "DEFAULT", f"plugin-name-{plugin_name}" + ): # if the config if malformed, don't blow up - cli.config.config.remove_option("DEFAULT", f"plugin-name-{plugin_name}") + cli.config.config.remove_option( + "DEFAULT", f"plugin-name-{plugin_name}" + ) cli.config.write_config() @@ -353,29 +376,7 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem sys.exit(0) if parsed.command == "completion": - if parsed.help or not parsed.action: - print("linode-cli completion [SHELL]") - print() - print( - "Prints shell completions for the requested shell to stdout. " - "Currently, only completions for bash and fish are available." - ) - sys.exit(0) - - completions = "" - - if parsed.action == "bash": - completions = cli.get_bash_completions() - elif parsed.action == "fish": - completions = cli.get_fish_completions() - else: - print( - "Completions are only available for bash and fish at this time. To retrieve " - "these, please invoke as `linode-cli completion bash` " - "or `linode-cli completion fish`." - ) - sys.exit(1) - print(completions) + print(get_completions(cli.ops, parsed.help, parsed.action)) sys.exit(0) # handle a help for the CLI @@ -441,7 +442,9 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem print() print("To reconfigure, call `linode-cli configure`") - print("For comprehensive documentation, visit https://www.linode.com/docs/api/") + print( + "For comprehensive documentation, visit https://www.linode.com/docs/api/" + ) sys.exit(0) # configure @@ -449,8 +452,12 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem if parsed.help: print("linode-cli configure") print() - print("Configured the Linode CLI. This command can be used to change") - print("defaults selected for the current user, or to configure additional") + print( + "Configured the Linode CLI. This command can be used to change" + ) + print( + "defaults selected for the current user, or to configure additional" + ) print("users.") sys.exit(0) else: @@ -462,7 +469,9 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem if parsed.help or not parsed.action: print("linode-cli set-user [USER]") print() - print("Sets the active user for the CLI out of users you have configured.") + print( + "Sets the active user for the CLI out of users you have configured." + ) print("To configure a new user, see `linode-cli configure`") sys.exit(0) else: @@ -489,9 +498,15 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem if parsed.help or not parsed.action: print("linode-cli remove-user [USER]") print() - print("Removes a user the CLI was configured with. This does not change") - print("your Linode account, only this CLI installation. Once removed,") - print("the user may not be set as active or used for commands unless") + print( + "Removes a user the CLI was configured with. This does not change" + ) + print( + "your Linode account, only this CLI installation. Once removed," + ) + print( + "the user may not be set as active or used for commands unless" + ) print("configured again.") sys.exit(0) else: @@ -500,7 +515,7 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem # special command to bake shell completion script if parsed.command == "bake-bash": - cli.bake_completions() + bake_completions(cli.ops) # check for plugin invocation if parsed.command not in cli.ops and parsed.command in plugins.available( @@ -515,8 +530,9 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem plugins.invoke(parsed.command, plugin_args, context) sys.exit(0) - if parsed.command not in cli.ops and parsed.command not in plugins.available( - cli.config + if ( + parsed.command not in cli.ops + and parsed.command not in plugins.available(cli.config) ): # unknown commands print(f"Unrecognized command {parsed.command}") @@ -540,42 +556,54 @@ def main(): # pylint: disable=too-many-locals,too-many-branches,too-many-statem sys.exit(0) # handle a help for an action - if parsed.command is not None and parsed.action is not None and parsed.help: - if parsed.command in cli.ops and parsed.action in cli.ops[parsed.command]: - operation = cli.ops[parsed.command][parsed.action] - print(f"linode-cli {parsed.command} {parsed.action}", end="") - for param in operation.params: - # clean up parameter names - we add an '_' at the end of them - # during baking if it conflicts with the name of an argument. - # Remove the trailing underscores on output (they're not - # important to the end user). - pname = param.name.upper() - if pname[-1] == "_": - pname = pname[:-1] - print(f" [{pname}]", end="") - print() - print(operation.summary) - if operation.docs_url: - print(f"API Documentation: {operation.docs_url}") - print() - if operation.args: - print("Arguments:") - for arg in sorted(operation.args, key=lambda s: not s.required): - is_required = ( - "(required) " - if operation.method in {"post", "put"} and arg.required - else "" - ) - print(f" --{arg.path}: {is_required}{arg.description}") - elif operation.method == "get" and parsed.action == "list": - filterable_attrs = [ - attr for attr in operation.response_model.attrs if attr.filterable - ] - - if filterable_attrs: - print("You may filter results with:") - for attr in filterable_attrs: - print(f" --{attr.name}") + try: + parsed_operation = cli.find_operation(parsed.command, parsed.action) + except ValueError: + # No operation was found + parsed_operation = None + + if parsed_operation is not None and parsed.help: + print(f"linode-cli {parsed.command} {parsed.action}", end="") + for param in parsed_operation.params: + # clean up parameter names - we add an '_' at the end of them + # during baking if it conflicts with the name of an argument. + # Remove the trailing underscores on output (they're not + # important to the end user). + pname = param.name.upper() + if pname[-1] == "_": + pname = pname[:-1] + print(f" [{pname}]", end="") + print() + print(parsed_operation.summary) + if parsed_operation.docs_url: + print(f"API Documentation: {parsed_operation.docs_url}") + print() + if parsed_operation.args: + print("Arguments:") + for arg in sorted( + parsed_operation.args, key=lambda s: not s.required + ): + is_required = ( + "(required) " + if parsed_operation.method in {"post", "put"} + and arg.required + else "" + ) + print(f" --{arg.path}: {is_required}{arg.description}") + elif ( + parsed_operation.method == "get" + and parsed_operation.action == "list" + ): + filterable_attrs = [ + attr + for attr in parsed_operation.response_model.attrs + if attr.filterable + ] + + if filterable_attrs: + print("You may filter results with:") + for attr in filterable_attrs: + print(f" --{attr.name}") sys.exit(0) if parsed.command is not None and parsed.action is not None: diff --git a/linodecli/api_request.py b/linodecli/api_request.py new file mode 100644 index 00000000..8d38e87c --- /dev/null +++ b/linodecli/api_request.py @@ -0,0 +1,256 @@ +""" +This module is responsible for handling HTTP requests to the Linode API. +""" + +import json +import sys +from distutils.version import ( # pylint: disable=deprecated-module + LooseVersion, + StrictVersion, +) +from sys import version_info +from typing import Optional + +import requests + +PIP_CMD = "pip3" + + +def do_request( + ctx, operation, args, filter_header=None, skip_error_handling=False +): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + """ + Makes a request to an operation's URL and returns the resulting JSON, or + prints and error if a non-200 comes back + """ + method = getattr(requests, operation.method) + headers = { + "Authorization": f"Bearer {ctx.config.get_token()}", + "Content-Type": "application/json", + "User-Agent": ( + f"linode-cli:{ctx.version} " + f"python/{version_info[0]}.{version_info[1]}.{version_info[2]}" + ), + } + + parsed_args = operation.parse_args(args) + + url = _build_request_url(ctx, operation, parsed_args) + + body = _build_request_body(ctx, operation, parsed_args) + + filter_header = _build_filter_header( + operation, parsed_args, filter_header=filter_header + ) + if filter_header is not None: + headers["X-Filter"] = filter_header + + # Print response debug info is requested + if ctx.debug_request: + _print_request_debug_info(method, url, headers, body) + + result = method(url, headers=headers, data=body) + + # Print response debug info is requested + if ctx.debug_request: + _print_response_debug_info(result) + + _attempt_warn_old_version(ctx, result) + + if not 199 < result.status_code < 399 and not skip_error_handling: + _handle_error(ctx, result) + + return result + + +def _build_filter_header( + operation, parsed_args, filter_header=None +) -> Optional[str]: + if operation.method != "get": + # Non-GET operations don't support filters + return None + + if filter_header is not None: + return json.dumps(filter_header) + + filters = vars(parsed_args) + + # remove URL parameters + for p in operation.params: + if p.name in filters: + del filters[p.name] + + # remove empty filters + filters = {k: v for k, v in filters.items() if v is not None} + + if filters: + return json.dumps(filters) + + return None + + +def _build_request_url(ctx, operation, parsed_args) -> str: + result = operation.url.format(**vars(parsed_args)) + + if operation.method == "get": + result += f"?page={ctx.page}&page_size={ctx.page_size}" + + return result + + +def _build_request_body(ctx, operation, parsed_args) -> Optional[str]: + if operation.method == "get": + # Get operations don't have a body + return None + + # Merge defaults into body if applicable + if ctx.defaults: + parsed_args = ctx.config.update(parsed_args, operation.allowed_defaults) + + to_json = {k: v for k, v in vars(parsed_args).items() if v is not None} + + expanded_json = {} + + # expand paths + for k, v in to_json.items(): + cur = expanded_json + for part in k.split(".")[:-1]: + if part not in cur: + cur[part] = {} + cur = cur[part] + cur[k.split(".")[-1]] = v + + return json.dumps(expanded_json) + + +def _print_request_debug_info(method, url, headers, body): + """ + Prints debug info for an HTTP request + """ + print(f"> {method.__name__.upper()} {url}", file=sys.stderr) + for k, v in headers.items(): + print(f"> {k}: {v}", file=sys.stderr) + print("> Body:", file=sys.stderr) + print("> ", body or "", file=sys.stderr) + print("> ", file=sys.stderr) + + +def _print_response_debug_info(response): + """ + Prints debug info for a response from requests + """ + # these come back as ints, convert to HTTP version + http_version = response.raw.version / 10 + + print( + f"< HTTP/{http_version:.1f} {response.status_code} {response.reason}", + file=sys.stderr, + ) + for k, v in response.headers.items(): + print(f"< {k}: {v}", file=sys.stderr) + print("< ", file=sys.stderr) + + +def _attempt_warn_old_version(ctx, result): + if ctx.suppress_warnings: + return + + api_version_higher = False + + if "X-Spec-Version" in result.headers: + spec_version = result.headers.get("X-Spec-Version") + + try: + # Parse the spec versions from the API and local CLI. + StrictVersion(spec_version) + StrictVersion(ctx.spec_version) + + # Get only the Major/Minor version of the API Spec and CLI Spec, + # ignore patch version differences + spec_major_minor_version = ( + spec_version.split(".")[0] + "." + spec_version.split(".")[1] + ) + current_major_minor_version = ( + ctx.spec_version.split(".")[0] + + "." + + ctx.spec_version.split(".")[1] + ) + except ValueError: + # If versions are non-standard like, "DEVELOPMENT" use them and don't complain. + spec_major_minor_version = spec_version + current_major_minor_version = ctx.spec_version + + try: + if LooseVersion(spec_major_minor_version) > LooseVersion( + current_major_minor_version + ): + api_version_higher = True + except: + # if this comparison or parsing failed, still process output + print( + f"Parsing failed when comparing local version {ctx.spec_version} with " + f"server version {spec_version}. If this problem persists, please open a " + "ticket with `linode-cli support ticket-create`", + file=sys.stderr, + ) + + if api_version_higher: + # check to see if there is, in fact, a version to upgrade to. If not, don't + # suggest an upgrade (since there's no package anyway) + new_version_exists = False + + try: + # do this all in a try block since it must _never_ prevent the CLI + # from showing command output + pypi_response = requests.get( + "https://pypi.org/pypi/linode-cli/json", timeout=1 # seconds + ) + + if pypi_response.status_code == 200: + # we got data back + pypi_version = pypi_response.json()["info"]["version"] + + # no need to be fancy; these should always be valid versions + if LooseVersion(pypi_version) > LooseVersion(ctx.version): + new_version_exists = True + except: + # I know, but if anything happens here the end user should still + # be able to see the command output + print( + "Unable to determine if a new linode-cli package is available " + "in pypi. If this message persists, open a ticket or invoke " + "with --suppress-warnings", + file=sys.stderr, + ) + + if new_version_exists: + print( + f"The API responded with version {spec_version}, which is newer than " + f"the CLI's version of {ctx.spec_version}. Please update the CLI to get " + "access to the newest features. You can update with a " + f"simple `{PIP_CMD} install --upgrade linode-cli`", + file=sys.stderr, + ) + + +def _handle_error(ctx, response): + """ + Given an error message, properly displays the error to the user and exits. + """ + print(f"Request failed: {response.status_code}", file=sys.stderr) + + resp_json = response.json() + + if "errors" in resp_json: + data = [ + [error.get("field") or "", error.get("reason")] + for error in resp_json["errors"] + ] + ctx.output_handler.print( + None, + data, + title="errors", + to=sys.stderr, + columns=["field", "reason"], + ) + sys.exit(1) diff --git a/linodecli/cli.py b/linodecli/cli.py index f841e1e3..f17e02fa 100644 --- a/linodecli/cli.py +++ b/linodecli/cli.py @@ -2,18 +2,15 @@ Responsible for managing spec and routing commands to operations. """ +import os import pickle -import json -import sys import re -import os -from distutils.version import LooseVersion, StrictVersion # pylint: disable=deprecated-module -from string import Template -from sys import stderr, version_info - -import requests +import sys +from sys import version_info +from .api_request import do_request from .configuration import CLIConfig +from .helpers import filter_markdown_links from .operation import CLIArg, CLIOperation, URLParam from .output import OutputHandler, OutputMode from .response import ModelAttr, ResponseModel @@ -103,7 +100,9 @@ def _parse_args( while "$ref" in info: info = self._resolve_ref(info["$ref"]) if "properties" in info: - self._parse_args(info["properties"], prefix=prefix + [arg], args=args) + self._parse_args( + info["properties"], prefix=prefix + [arg], args=args + ) continue # we can't edit this level of the tree if info.get("readOnly"): continue @@ -114,7 +113,9 @@ def _parse_args( "type": info.get("type") or "string", "desc": info.get("description") or "", "name": arg, - "format": info.get("x-linode-cli-format", info.get("format", None)), + "format": info.get( + "x-linode-cli-format", info.get("format", None) + ), } # if this is coming in as json, stop here @@ -164,7 +165,9 @@ def _parse_properties(self, node, prefix=None): attrs = [] for name, info in node.items(): if "properties" in info: - attrs += self._parse_properties(info["properties"], prefix + [name]) + attrs += self._parse_properties( + info["properties"], prefix + [name] + ) else: item_type = None item_container = info.get("items") @@ -193,7 +196,7 @@ def bake( self.ops = {} default_servers = [c["url"] for c in spec["servers"]] - for path, data in self.spec[ # pylint: disable=too-many-nested-blocks + for path, data in self.spec[ # pylint: disable=too-many-nested-blocks "paths" ].items(): # pylint: disable=too-many-nested-blocks command = data.get("x-linode-cli-command") or "default" @@ -205,7 +208,9 @@ def bake( for info in data["parameters"]: if "$ref" in info: info = self._resolve_ref(info["$ref"]) - params.append(URLParam(info["name"], info["schema"]["type"])) + params.append( + URLParam(info["name"], info["schema"]["type"]) + ) for m in METHODS: if m in data: if data[m].get("x-linode-cli-skip"): @@ -230,7 +235,7 @@ def bake( action_aliases = action[1:] action = action[0] - summary = data[m].get("summary") or "" + summary = filter_markdown_links(data[m].get("summary")) or "" # Resolve the documentation URL docs_url = None @@ -254,7 +259,10 @@ def bake( "x-linode-cli-allowed-defaults", None ) - if "application/json" in data[m]["requestBody"]["content"]: + if ( + "application/json" + in data[m]["requestBody"]["content"] + ): body_schema = data[m]["requestBody"]["content"][ "application/json" ]["schema"] @@ -263,11 +271,15 @@ def bake( required_fields = body_schema["required"] if "allOf" in body_schema: - body_schema = self._resolve_allOf(body_schema["allOf"]) + body_schema = self._resolve_allOf( + body_schema["allOf"] + ) if "required" in body_schema: required_fields += body_schema["required"] if "$ref" in body_schema: - body_schema = self._resolve_ref(body_schema["$ref"]) + body_schema = self._resolve_ref( + body_schema["$ref"] + ) if "required" in body_schema: required_fields += body_schema["required"] if "properties" in body_schema: @@ -280,7 +292,8 @@ def bake( response_model = None if ( "200" in data[m]["responses"] - and "application/json" in data[m]["responses"]["200"]["content"] + and "application/json" + in data[m]["responses"]["200"]["content"] ): resp_con = data[m]["responses"]["200"]["content"][ "application/json" @@ -303,7 +316,9 @@ def bake( if "$ref" in resp_con: resp_con = self._resolve_ref(resp_con["$ref"]) if "allOf" in resp_con: - resp_con.update(self._resolve_allOf(resp_con["allOf"])) + resp_con.update( + self._resolve_allOf(resp_con["allOf"]) + ) # handle pagination envelope if ( "properties" in resp_con @@ -320,7 +335,9 @@ def bake( attrs = [] if "properties" in resp_con: - attrs = self._parse_properties(resp_con["properties"]) + attrs = self._parse_properties( + resp_con["properties"] + ) # maybe we have special columns? rows = ( data[m]["responses"]["200"]["content"][ @@ -344,7 +361,7 @@ def bake( new_arg = CLIArg( info["name"], info["type"], - info["desc"].split(".")[0] + ".", + filter_markdown_links(info["desc"].split(".")[0] + "."), arg, info["format"], list_item=info.get("list_item"), @@ -407,99 +424,14 @@ def bake( with open(data_file, "wb") as f: pickle.dump(self.ops, f) - def get_fish_completions(self): - """ - Generates and returns fish shell completions based on the baked spec - """ - completion_template = Template( - """# This is a generated file! Do not modify! -complete -c linode-cli -n "not __fish_seen_subcommand_from $subcommands" -x -a '$subcommands --help' -$command_items""" - ) - - command_template = Template( - """complete -c linode-cli -n "__fish_seen_subcommand_from $command" \ - -x -a '$actions --help'""" - ) - - command_blocks = [ - command_template.safe_substitute( - command=op, actions=" ".join(list(actions.keys())) - ) - for op, actions in self.ops.items() - ] - - rendered = completion_template.safe_substitute( - subcommands=" ".join(self.ops.keys()), - command_items="\n".join(command_blocks), - ) - - return rendered - - def get_bash_completions(self): - """ - Generates and returns bash shell completions based on the baked spec - """ - completion_template = Template( - """# This is a generated file! Do not modify! -_linode_cli() -{ - local cur prev opts - COMPREPLY=() - cur="${COMP_WORDS[COMP_CWORD]}" - prev="${COMP_WORDS[COMP_CWORD-1]}" - - case "${prev}" in - linode-cli) - COMPREPLY=( $(compgen -W "$actions --help" -- ${cur}) ) - return 0 - ;; - $command_items - *) - ;; - esac -} - -complete -F _linode_cli linode-cli""" - ) - - command_template = Template( - """$command) - COMPREPLY=( $(compgen -W "$actions --help" -- ${cur}) ) - return 0 - ;;""" - ) - - command_blocks = [ - command_template.safe_substitute( - command=op, actions=" ".join(list(actions.keys())) - ) - for op, actions in self.ops.items() - ] - - rendered = completion_template.safe_substitute( - actions=" ".join(self.ops.keys()), - command_items="\n ".join(command_blocks), - ) - - return rendered - - def bake_completions(self): - """ - Given a baked CLI, generates and saves a bash completion file - """ - rendered = self.get_bash_completions() - # save it off - with open("linode-cli.sh", "w", encoding="utf-8") as f: - print("Writing file...") - f.write(rendered) - def load_baked(self): """ Loads a baked spec representation from a baked pickle """ data_file = self._get_data_file() - data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), data_file) + data_path = os.path.join( + os.path.dirname(os.path.realpath(__file__)), data_file + ) if os.path.exists(data_path): with open(data_path, "rb") as f: self.ops = pickle.load(f) @@ -510,7 +442,9 @@ def load_baked(self): self.spec_version = self.ops["_spec_version"] del self.ops["_spec_version"] else: - print("No spec baked. Please bake by calling this script as follows:") + print( + "No spec baked. Please bake by calling this script as follows:" + ) print(" python3 gen_cli.py bake /path/to/spec") self.ops = None # this signals __init__.py to give up @@ -521,204 +455,6 @@ def _get_data_file(self): """ return f"data-{version_info[0]}" - def print_request_debug_info(self, method, url, headers, body): - """ - Prints debug info for an HTTP request - """ - print(f"> {method.__name__.upper()} {url}", file=stderr) - for k, v in headers.items(): - print(f"> {k}: {v}", file=stderr) - print("> Body:", file=stderr) - print("> ", body or "", file=stderr) - print("> ", file=stderr) - - def print_response_debug_info(self, response): - """ - Prints debug info for a response from requests - """ - # these come back as ints, convert to HTTP version - http_version = response.raw.version / 10 - - print( - f"< HTTP/{http_version:.1f} {response.status_code} {response.reason}", - file=stderr, - ) - for k, v in response.headers.items(): - print(f"< {k}: {v}", file=stderr) - print("< ", file=stderr) - - def do_request( - self, operation, args, filter_header=None, skip_error_handling=False - ): # pylint: disable=too-many-locals,too-many-branches,too-many-statements - """ - Makes a request to an operation's URL and returns the resulting JSON, or - prints and error if a non-200 comes back - """ - method = getattr(requests, operation.method) - headers = { - "Authorization": f"Bearer {self.config.get_token()}", - "Content-Type": "application/json", - "User-Agent": ( - f"linode-cli:{self.version} " - f"python/{version_info[0]}.{version_info[1]}.{version_info[2]}" - ), - } - - parsed_args = operation.parse_args(args) - - url = operation.url.format(**vars(parsed_args)) - - if operation.method == "get": - url += f"?page={self.page}&page_size={self.page_size}" - - body = None - if operation.method == "get": - if filter_header is not None: - # plugins can specify their own filters - use those by default - headers["X-Filter"] = json.dumps(filter_header) - else: - # otherwise, get filters from the CLI call - filters = vars(parsed_args) - # remove URL parameters - for p in operation.params: - if p.name in filters: - del filters[p.name] - # remove empty filters - filters = {k: v for k, v in filters.items() if v is not None} - # apply filter, if any - if filters: - headers["X-Filter"] = json.dumps(filters) - else: - if self.defaults: - parsed_args = self.config.update( - parsed_args, operation.allowed_defaults - ) - - to_json = {k: v for k, v in vars(parsed_args).items() if v is not None} - - expanded_json = {} - # expand paths - for k, v in to_json.items(): - cur = expanded_json - for part in k.split(".")[:-1]: - if part not in cur: - cur[part] = {} - cur = cur[part] - cur[k.split(".")[-1]] = v - - body = json.dumps(expanded_json) - - if self.debug_request: - self.print_request_debug_info(method, url, headers, body) - - result = method(url, headers=headers, data=body) - - if self.debug_request: - self.print_response_debug_info(result) - - if not self.suppress_warnings: - # check the major/minor version API reported against what we were built - # with to see if an upgrade should be available - api_version_higher = False - - if "X-Spec-Version" in result.headers: - spec_version = result.headers.get("X-Spec-Version") - - try: - # Parse the spec versions from the API and local CLI. - StrictVersion(spec_version) - StrictVersion(self.spec_version) - - # Get only the Major/Minor version of the API Spec and CLI Spec, - # ignore patch version differences - spec_major_minor_version = ( - spec_version.split(".")[0] + "." + spec_version.split(".")[1] - ) - current_major_minor_version = ( - self.spec_version.split(".")[0] - + "." - + self.spec_version.split(".")[1] - ) - except ValueError: - # If versions are non-standard like, "DEVELOPMENT" use them and don't complain. - spec_major_minor_version = spec_version - current_major_minor_version = self.spec_version - - try: - if LooseVersion(spec_major_minor_version) > LooseVersion( - current_major_minor_version - ): - api_version_higher = True - except: - # if this comparison or parsing failed, still process output - print( - f"Parsing failed when comparing local version {self.spec_version} with " - f"server version {spec_version}. If this problem persists, please open a " - "ticket with `linode-cli support ticket-create`", - file=stderr, - ) - - if api_version_higher: - # check to see if there is, in fact, a version to upgrade to. If not, don't - # suggest an upgrade (since there's no package anyway) - new_version_exists = False - - try: - # do this all in a try block since it must _never_ prevent the CLI - # from showing command output - pypi_response = requests.get( - "https://pypi.org/pypi/linode-cli/json", timeout=1 # seconds - ) - - if pypi_response.status_code == 200: - # we got data back - pypi_version = pypi_response.json()["info"]["version"] - - # no need to be fancy; these should always be valid versions - if LooseVersion(pypi_version) > LooseVersion(self.version): - new_version_exists = True - except: - # I know, but if anything happens here the end user should still - # be able to see the command output - print( - "Unable to determine if a new linode-cli package is available " - "in pypi. If this message persists, open a ticket or invoke " - "with --suppress-warnings", - file=stderr, - ) - - if new_version_exists: - print( - f"The API responded with version {spec_version}, which is newer than " - f"the CLI's version of {self.spec_version}. Please update the CLI to get " - "access to the newest features. You can update with a " - f"simple `{PIP_CMD} install --upgrade linode-cli`", - file=stderr, - ) - - if not 199 < result.status_code < 399 and not skip_error_handling: - self._handle_error(result) - - return result - - def _handle_error(self, response): - """ - Given an error message, properly displays the error to the user and exits. - """ - print(f"Request failed: {response.status_code}", file=stderr) - - resp_json = response.json() - - if "errors" in resp_json: - data = [ - [error.get("field") or "", error.get("reason")] - for error in resp_json["errors"] - ] - self.output_handler.print( - None, data, title="errors", to=stderr, columns=["field", "reason"] - ) - sys.exit(1) - @staticmethod def _flatten_url_path(tag): new_tag = tag.lower() @@ -731,25 +467,13 @@ def handle_command(self, command, action, args): action """ - if command not in self.ops: - print(f"Command not found: {command}") + try: + operation = self.find_operation(command, action) + except ValueError as e: + print(e, file=sys.stderr) sys.exit(1) - operation = self.ops[command][action] if action in self.ops[command] else None - - if operation is None: - # Find the matching alias - for op in self.ops[command].values(): - if action in op.action_aliases: - operation = op - break - - # Fail if no matching alias was found - if operation is None: - print(f"No action {action} for command {command}") - sys.exit(1) - - result = self.do_request(operation, args) + result = do_request(self, operation, args) operation.process_response_json(result.json(), self.output_handler) @@ -785,8 +509,32 @@ def call_operation(self, command, action, args=None, filters=None): operation = self.ops[command][action] - result = self.do_request( - operation, args, filter_header=filters, skip_error_handling=True + result = do_request( + self, + operation, + args, + filter_header=filters, + skip_error_handling=True, ) return result.status_code, result.json() + + def find_operation(self, command, action): + """ + Finds the corresponding operation for the given command and action. + """ + if command not in self.ops: + raise ValueError(f"Command not found: {command}") + + command_dict = self.ops[command] + + if action in command_dict: + return command_dict[action] + + # Find the matching alias + for op in command_dict.values(): + if action in op.action_aliases: + return op + + # Fail if no matching alias was found + raise ValueError(f"No action {action} for command {command}") diff --git a/linodecli/completion.py b/linodecli/completion.py new file mode 100644 index 00000000..eb6b3a72 --- /dev/null +++ b/linodecli/completion.py @@ -0,0 +1,121 @@ +#!/usr/local/bin/python3 +""" +Contains any code relevant to generating/updating shell completions for linode-cli +""" + +from string import Template + + +def bake_completions(ops): + """ + Given a baked CLI, generates and saves a bash completion file + """ + print("Baking bash completions...") + if "_base_url" in ops: + del ops["_base_url"] + if "_spec_version" in ops: + del ops["_spec_version"] + rendered = get_bash_completions(ops) + with open("linode-cli.sh", "w", encoding="utf-8") as bash_f: + print("Writing file...") + bash_f.write(rendered) + + +def get_completions(ops, help_flag, action): + """ + Handle shell completions based on `linode-cli completion ____` + """ + if help_flag or not action: + return ( + "linode-cli completion [SHELL]\n\n" + "Prints shell completions for the requested shell to stdout.\n" + "Currently, only completions for bash and fish are available." + ) + if action == "bash": + return get_bash_completions(ops) + if action == "fish": + return get_fish_completions(ops) + return ( + "Completions are only available for bash and fish at this time.\n\n" + "To retrieve these, please invoke as\n" + "`linode-cli completion bash` or `linode-cli completion fish`" + ) + + +def get_fish_completions(ops): + """ + Generates and returns fish shell completions based on the baked spec + """ + completion_template = Template( + """# This is a generated file by Linode-CLI! Do not modify! +complete -c linode-cli -n "not __fish_seen_subcommand_from $subcommands" -x -a '$subcommands --help' +$command_items""" + ) + + command_template = Template( + """complete -c linode-cli -n "__fish_seen_subcommand_from $command" \ +-x -a '$actions --help'""" + ) + + command_blocks = [ + command_template.safe_substitute( + command=op, actions=" ".join(list(actions.keys())) + ) + for op, actions in ops.items() + ] + + rendered = completion_template.safe_substitute( + subcommands=" ".join(ops.keys()), + command_items="\n".join(command_blocks), + ) + + return rendered + + +def get_bash_completions(ops): + """ + Generates and returns bash shell completions based on the baked spec + """ + completion_template = Template( + """# This is a generated file by Linode-CLI! Do not modify! +_linode_cli() +{ +local cur prev opts +COMPREPLY=() +cur="${COMP_WORDS[COMP_CWORD]}" +prev="${COMP_WORDS[COMP_CWORD-1]}" + +case "${prev}" in + linode-cli) + COMPREPLY=( $(compgen -W "$actions --help" -- ${cur}) ) + return 0 + ;; + $command_items + *) + ;; +esac +} + +complete -F _linode_cli linode-cli""" + ) + + command_template = Template( + """$command) + COMPREPLY=( $(compgen -W "$actions --help" -- ${cur}) ) + return 0 + ;;""" + ) + + command_blocks = [ + command_template.safe_substitute( + command=op, actions=" ".join(list(actions.keys())) + ) + for op, actions in ops.items() + ] + + rendered = completion_template.safe_substitute( + actions=" ".join(ops.keys()), + command_items="\n ".join(command_blocks), + ) + + return rendered diff --git a/linodecli/configuration.py b/linodecli/configuration.py deleted file mode 100644 index e2163f16..00000000 --- a/linodecli/configuration.py +++ /dev/null @@ -1,805 +0,0 @@ -""" -Handles configuring the cli, as well as loading configs so that they can be -used elsewhere. -""" - -import re -import socket -import argparse -import webbrowser -from http import server - -try: - # python3 - import configparser -except ImportError: - # python2 - import ConfigParser as configparser - -import os -import sys - -import requests - -ENV_TOKEN_NAME = "LINODE_CLI_TOKEN" - -LEGACY_CONFIG_DIR = os.path.expanduser("~") -LEGACY_CONFIG_NAME = ".linode-cli" -CONFIG_DIR = os.environ.get("XDG_CONFIG_HOME", f"{os.path.expanduser('~')}/.config") - -CONFIG_NAME = "linode-cli" -TOKEN_GENERATION_URL = "https://cloud.linode.com/profile/tokens" - -# This is used for web-based configuration -OAUTH_CLIENT_ID = "5823b4627e45411d18e9" - -# this is a list of browser that _should_ work for web-based auth. This is mostly -# intended to exclude lynx and other terminal browsers which could be opened, but -# won't work. -KNOWN_GOOD_BROWSERS = { - "chrome", - "firefox", - "mozilla", - "netscape", - "opera", - "safari", - "chromium", - "chromium-browser", - "epiphany", -} - -# in the event that we can't load the styled landing page from file, this will -# do as a landing page -DEFAULT_LANDING_PAGE = """ -

Success


You may return to your terminal to continue..

- -""" - - -class CLIConfig: - """ - Generates the necessary config for the Linode CLI - """ - - def __init__(self, base_url, username=None, skip_config=False): - self.base_url = base_url - self.username = username - self.config = self._get_config(load=not skip_config) - self.running_plugin = None - self.used_env_token = False - - self._configured = False - - self.configure_with_pat = "--token" in sys.argv - - if ( - not skip_config - and not self.config.has_option("DEFAULT", "default-user") - and self.config.has_option("DEFAULT", "token") - ): - self._handle_no_default_user() - - environ_token = os.getenv(ENV_TOKEN_NAME, None) - - if ( - not self.config.has_option("DEFAULT", "default-user") - and not skip_config - and environ_token is None - ): - self.configure() - elif environ_token is not None: - self.used_env_token = True - - def set_user(self, username): - """ - Sets the acting username. If this username is not in the config, this is - an error. This overrides the default username - """ - if not self.config.has_section(username): - print(f"User {username} is not configured!") - sys.exit(1) - - self.username = username - - def default_username(self): - """ - Returns the default-user Username - """ - if self.config.has_option("DEFAULT", "default-user"): - return self.config.get("DEFAULT", "default-user") - return "" - - def update_namespace(self, namespace, new_dict): - """ - In order to update the namespace, we need to turn it into a dict, modify it there, - then reconstruct it with the exploded dict. - """ - ns_dict = vars(namespace) - warn_dict = {} - for k in new_dict: - if k.startswith("plugin-"): - # plugins set config options that start with 'plugin-' - these don't - # get included in the updated namespace - continue - if k in ns_dict and isinstance(k, list): - ns_dict[k].append(new_dict[k]) - if k in ns_dict and ns_dict[k] is None: - warn_dict[k] = new_dict[k] - ns_dict[k] = new_dict[k] - if not any(x in ["--suppress-warnings", "--no-headers"] for x in sys.argv): - print( - f"using default values: {warn_dict}, use --no-defaults flag to disable defaults" - ) - return argparse.Namespace(**ns_dict) - - def update(self, namespace, allowed_defaults): - """ - This updates a Namespace (as returned by ArgumentParser) with config values - if they aren't present in the Namespace already. - """ - if self.used_env_token and self.config is None: - # the CLI is using a token defined in the environment; as such, we may - # not have actually loaded a config file. That's fine, there are just - # no defaults - return None - - username = self.username or self.default_username() - - if not self.config.has_option(username, "token") and not os.environ.get( - ENV_TOKEN_NAME, None - ): - print(f"User {username} is not configured.") - sys.exit(1) - - if self.config.has_section(username) and allowed_defaults: - # update_dicts = { - # default_key: self.config.get(username, default_key) - # for default_key in allowed_defaults - # if self.config.has_option(username, default_key) - # } - update_dicts = {} - for default_key in allowed_defaults: - if not self.config.has_option(username, default_key): - continue - value = self.config.get(username, default_key) - if default_key == "authorized_users": - update_dicts[default_key] = [value] - else: - update_dicts[default_key] = value - return self.update_namespace(namespace, update_dicts) - return namespace - - def get_token(self): - """ - Returns the token for a configured user - """ - if self.used_env_token: - return os.environ.get(ENV_TOKEN_NAME, None) - - if self.config.has_option(self.username or self.default_username(), "token"): - return self.config.get(self.username or self.default_username(), "token") - return "" - - def remove_user(self, username): - """ - Removes the requested user from the config. If the user is the default, - this exits with error - """ - if self.default_username() == username: - print( - f"Cannot remote {username} as they are the default user! You can " - "change the default user with: `linode-cli set-user USERNAME`" - ) - sys.exit(1) - - if self.config.has_section(username): - self.config.remove_section(username) - self.write_config() - - def print_users(self): - """ - Prints all users available and exits - """ - print("Configured Users: ") - default_user = self.default_username() - - for sec in self.config.sections(): - if sec != "DEFAULT": - print(f'{"*" if sec == default_user else " "} {sec}') - - sys.exit(0) - - def set_default_user(self, username): - """ - Sets the default user. If that user isn't in the config, exits with error - """ - if not self.config.has_section(username): - print(f"User {username} is not configured!") - sys.exit(1) - - self.config.set("DEFAULT", "default-user", username) - self.write_config() - - # plugin methods - these are intended for plugins to utilize to store their - # own persistent config information - def get_value(self, key): - """ - Retrieves and returns an existing config value for the current user. This - is intended for plugins to use instead of having to deal with figuring out - who the current user is when accessing their config. - - .. warning:: - Plugins _MUST NOT_ set values for the user's config except through - ``plugin_set_value`` below. - - :param key: The key to look up. - :type key: str - - :returns: The value for that key, or None if the key doesn't exist for the - current user. - :rtype: any - """ - username = self.username or self.default_username() - - if not self.config.has_option(username, key): - return None - - return self.config.get(username, key) - - def plugin_set_value(self, key, value): - """ - Sets a new config value for a plugin for the current user. Plugin config - keys are set in the following format:: - - plugin-{plugin_name}-{key} - - Values set with this method are intended to be retrieved with ``plugin_get_value`` - below. - - :param key: The config key to set - this is needed to retrieve the value - :type key: str - :param value: The value to set for this key - :type value: any - """ - if self.running_plugin is None: - raise RuntimeError("No running plugin to retrieve configuration for!") - - username = self.username or self.default_username() - self.config.set(username, f"plugin-{self.running_plugin}-{key}", value) - - def plugin_get_value(self, key): - """ - Retrieves and returns a config value previously set for a plugin. Your - plugin should have set this value in the past. If this value does not - exist in the config, ``None`` is returned. This is the only time - ``None`` is returned, so receiving this value should be treated as - "plugin is not configured." - - :param key: The key of the value to return - :type key: str - - :returns: The value for this plugin for this key, or None if not set - :rtype: any - """ - if self.running_plugin is None: - raise RuntimeError("No running plugin to retrieve configuration for!") - - username = self.username or self.default_username() - full_key = f"plugin-{self.running_plugin}-{key}" - - if not self.config.has_option(username, full_key): - return None - - return self.config.get(username, full_key) - - def write_config(self, silent=False): - """ - Saves the config file as it is right now. This can be used by plugins - to save values they've set, and is used internally to update the config - on disk when a new user if configured. - - :param silent: If True, does not print a message noting the config file - has been updated. This is primarily intended for silently - updated the config file from one version to another. - :type silent: bool - """ - - # Create the ~/.config directory if it does not exist - if not os.path.exists(f"{os.path.expanduser('~')}/.config"): - os.makedirs(f"{os.path.expanduser('~')}/.config") - - with open(self._get_config_path(), "w", encoding="utf-8") as f: - self.config.write(f) - - if not silent: - print(f"\nConfig written to {self._get_config_path()}") - - def _username_for_token(self, token): - """ - A helper function that returns the username assocaited with a token by - requesting it from the API - """ - u = self._do_get_request("/profile", token=token, exit_on_error=False) - if "errors" in u: - reasons = ",".join([c["reason"] for c in u["errors"]]) - print(f"That token didn't work: {reasons}") - return None - - return u["username"] - - def _get_token_terminal(self): - """ - Handles prompting the user for a Personal Access Token and checking it - to ensure it works. - """ - print( - f""" -First, we need a Personal Access Token. To get one, please visit -{TOKEN_GENERATION_URL} and click -"Create a Personal Access Token". The CLI needs access to everything -on your account to work correctly.""" - ) - - while True: - token = input("Personal Access Token: ") - - username = self._username_for_token(token) - if username is not None: - break - - return username, token - - def _get_token_web(self): - """ - Handles OAuth authentication for the CLI. This requires us to get a temporary - token over OAuth and then use it to create a permanent token for the CLI. - This function returns the token the CLI should use, or exits if anything - goes wrong. - """ - temp_token = self._handle_oauth_callback() - username = self._username_for_token(temp_token) - - if username is None: - print("OAuth failed. Please try again of use a token for auth.") - sys.exit(1) - - # the token returned via public oauth will expire in 2 hours, which - # isn't great. Instead, we're gonna make a token that expires never - # and store that. - result = self._do_request( - requests.post, - "/profile/tokens", - token=temp_token, - # generate the actual token with a label like: - # Linode CLI @ linode - # The creation date is already recoreded with the token, so - # this should be all the relevant info. - body={"label": f"Linode CLI @ {socket.gethostname()}"}, - ) - - return username, result["token"] - - def _handle_oauth_callback(self): - """ - Sends the user to a URL to perform an OAuth login for the CLI, then redirets - them to a locally-hosted page that captures teh token - """ - # load up landing page HTML - landing_page_path = os.path.join( - os.path.dirname(os.path.realpath(__file__)), "oauth-landing-page.html" - ) - - try: - with open(landing_page_path, encoding="utf-8") as f: - landing_page = f.read() - except: - landing_page = DEFAULT_LANDING_PAGE - - class Handler(server.BaseHTTPRequestHandler): - """ - The issue here is that Login sends the token in the URL hash, meaning - that we cannot see it on the server side. An attempt was made to - get the client (browser) to send an ajax request to pass it along, - but that's pretty gross and also isn't working. Needs more thought. - """ - - def do_GET(self): - """ - get the access token - """ - if "token" in self.path: - # we got a token! Parse it out of the request - token_part = self.path.split("/", 2)[2] - - m = re.search(r"access_token=([a-z0-9]+)", token_part) - if m and len(m.groups()): - self.server.token = m.groups()[0] - - self.send_response(200) - self.send_header("Content-type", "text/html") - self.end_headers() - - # TODO: Clean up this page and make it look nice - self.wfile.write( - bytes( - landing_page.format(port=self.server.server_address[1]).encode( - "utf-8" - ) - ) - ) - - def log_message(self, form, *args): # pylint: disable=arguments-differ - """Don't actually log the request""" - - # start a server to catch the response - serv = server.HTTPServer(("localhost", 0), Handler) - serv.token = None - - # figure out the URL to direct the user to and print out the prompt - # pylint: disable-next=line-too-long - url = f"https://login.linode.com/oauth/authorize?client_id={OAUTH_CLIENT_ID}&response_type=token&scopes=*&redirect_uri=http://localhost:{serv.server_address[1]}" - print( - f"""A browser should open directing you to this URL to authenticate: - -{url} - -If you are not automatically directed there, please copy/paste the link into your browser -to continue.. -""" - ) - - webbrowser.open(url) - - try: - while serv.token is None: - # serve requests one at a time until we get a token or are interrupted - serv.handle_request() - except KeyboardInterrupt: - print() - print( - "Giving up. If you couldn't get web authentication to work, please " - "try token using a token by invoking with `linode-cli configure --token`, " - "and open an issue at https://github.com/linode/linode-cli" - ) - sys.exit(1) - - return serv.token - - def configure(self): # pylint: disable=too-many-branches,too-many-statements - """ - This assumes we're running interactively, and prompts the user - for a series of defaults in order to make future CLI calls - easier. This also sets up the config file. - """ - # If configuration has already been done in this run, don't do it again. - if self._configured: - return - config = {} - # we're configuring the default user if there is no default user configured - # yet - is_default = not self.config.has_option("DEFAULT", "default-user") - username = None - token = None - - print( - """Welcome to the Linode CLI. This will walk you through some initial setup.""" - ) - - if ENV_TOKEN_NAME in os.environ: - print( - """Using token from {env_token_name}. -Note that no token will be saved in your configuration file. - * If you lose or remove {env_token_name}. - * All profiles will use {env_token_name}.""".format( - env_token_name=ENV_TOKEN_NAME - ) - ) - username = "DEFAULT" - token = os.getenv(ENV_TOKEN_NAME) - - else: - # let's see if we _can_ use web - can_use_browser = True - try: - webbrowser.get() - except webbrowser.Error: - # there are no browsers installed - can_use_browser = False - - if can_use_browser and not KNOWN_GOOD_BROWSERS.intersection( - webbrowser._tryorder # pylint: disable=protected-access - ): - print() - print( - "This tool defaults to web-based authentication, however " - "no known-working browsers were found." - ) - - while True: - r = input("Try it anyway? [y/N]: ") - if r.lower() in "yn ": - can_use_browser = r.lower() == "y" - break - - if self.configure_with_pat or not can_use_browser: - username, config["token"] = self._get_token_terminal() - else: - # pylint: disable=line-too-long - print() - print( - "The CLI will use its web-based authentication to log you in. " - "If you prefer to supply a Personal Access Token, use `linode-cli configure --token`. " - ) - print() - input( - "Press enter to continue. This will open a browser and proceed with authentication." - ) - username, config["token"] = self._get_token_web() - # pylint: enable=line-too-long - - token = config["token"] - - print() - print(f"Configuring {username}") - print() - - regions = [r["id"] for r in self._do_get_request("/regions")["data"]] - types = [t["id"] for t in self._do_get_request("/linode/types")["data"]] - images = [i["id"] for i in self._do_get_request("/images")["data"]] - - is_full_access = self._check_full_access(token) - - auth_users = [] - - if is_full_access: - auth_users = [ - u["username"] - for u in self._do_get_request( - "/account/users", exit_on_error=False, token=token - )["data"] - if "ssh_keys" in u - ] - - # get the preferred things - config["region"] = self._default_thing_input( - "Default Region for operations.", - regions, - "Default Region (Optional): ", - "Please select a valid Region, or press Enter to skip", - ) - - config["type"] = self._default_thing_input( - "Default Type of Linode to deploy.", - types, - "Default Type of Linode (Optional): ", - "Please select a valid Type, or press Enter to skip", - ) - - config["image"] = self._default_thing_input( - "Default Image to deploy to new Linodes.", - images, - "Default Image (Optional): ", - "Please select a valid Image, or press Enter to skip", - ) - - if auth_users: - config["authorized_users"] = self._default_thing_input( - "Select the user that should be given default SSH access to new Linodes.", - auth_users, - "Default Option (Optional): ", - "Please select a valid Option, or press Enter to skip", - ) - - # save off the new configuration - if username != "DEFAULT" and not self.config.has_section(username): - self.config.add_section(username) - - if not is_default: - if username != self.default_username(): - while True: - value = input( - "Make this user the default when using the CLI? [y/N]: " - ) - - if value.lower() in "yn": - is_default = value.lower() == "y" - break - if not value.strip(): - break - - if not is_default: # they didn't change the default user - print( - f"Active user will remain {self.config.get('DEFAULT', 'default-user')}" - ) - - if is_default: - # if this is the default user, make it so - self.config.set("DEFAULT", "default-user", username) - print(f"Active user is now {username}") - - for k, v in config.items(): - if v: - self.config.set(username, k, v) - - self.write_config() - os.chmod(self._get_config_path(), 0o600) - self._configured = True - - def _get_config_path(self): - """ - Returns the path to the config file. - """ - path = f"{LEGACY_CONFIG_DIR}/{LEGACY_CONFIG_NAME}" - if os.path.exists(path): - return path - - return f"{CONFIG_DIR}/{CONFIG_NAME}" - - def _get_config(self, load=True): - """ - Returns a new ConfigParser object that represents the CLI's configuration. - If load is false, we won't load the config from disk. - - :param load: If True, load the config from the default path. Otherwise, - don't (and just return an empty ConfigParser) - :type load: bool - """ - conf = configparser.ConfigParser() - - if load: - conf.read(self._get_config_path()) - - return conf - - def _default_thing_input( - self, ask, things, prompt, error, optional=True - ): # pylint: disable=too-many-arguments - """ - Requests the user choose from a list of things with the given prompt and - error if they choose something invalid. If optional, the user may hit - enter to not configure this option. - """ - print(f"\n{ask} Choices are:") - for ind, thing in enumerate(things): - print(f" {ind + 1} - {thing}") - print() - - ret = "" - while True: - choice = input(prompt) - - if choice: - try: - choice = int(choice) - choice = things[choice - 1] - except: - pass - - if choice in list(things): - ret = choice - break - print(error) - else: - if optional: - break - print(error) - return ret - - def _do_get_request(self, url, token=None, exit_on_error=True): - """ - Does helper get requests during configuration - """ - return self._do_request( - requests.get, url, token=token, exit_on_error=exit_on_error - ) - - @staticmethod - def _handle_response_status(response, exit_on_error=None): - if 199 < response.status_code < 300: - return - - print(f"Could not contact {response.url} - Error: {response.status_code}") - if exit_on_error: - sys.exit(4) - - def _do_request( - self, method, url, token=None, exit_on_error=None, body=None - ): # pylint: disable=too-many-arguments - """ - Does helper requests during configuration - """ - headers = {} - - if token is not None: - headers["Authorization"] = f"Bearer {token}" - headers["Content-type"] = "application/json" - - result = method(self.base_url + url, headers=headers, json=body) - - self._handle_response_status(result, exit_on_error=exit_on_error) - - return result.json() - - def _check_full_access(self, token): - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/json", - } - - result = requests.get( - self.base_url + "/profile/grants", headers=headers, timeout=120 - ) - - self._handle_response_status(result, exit_on_error=True) - - return result.status_code == 204 - - def _handle_no_default_user(self): - """ - Handle the case that there is no default user in the config - """ - users = [c for c in self.config.sections() if c != "DEFAULT"] - - if len(users) == 1: - # only one user configured - they're the default - self.config.set("DEFAULT", "default-user", users[0]) - self.write_config(silent=True) - return - - if len(users) == 0: - # config is new or _really_ old - token = self.config.get("DEFAULT", "token") - - if token is not None: - # there's a token in the config - configure that user - u = self._do_get_request("/profile", token=token, exit_on_error=False) - - if "errors" in u: - # this token was bad - reconfigure - self.configure() - return - - # setup config for this user - username = u["username"] - - self.config.set("DEFAULT", "default-user", username) - self.config.add_section(username) - self.config.set(username, "token", token) - self.config.set( - username, "region", self.config.get("DEFAULT", "region") - ) - self.config.set(username, "type", self.config.get("DEFAULT", "type")) - self.config.set(username, "image", self.config.get("DEFAULT", "image")) - self.config.set( - username, - "authorized_keys", - self.config.get("DEFAULT", "authorized_keys"), - ) - - self.write_config(silent=True) - else: - # got nothin', reconfigure - self.configure() - - # this should be handled - return - - # more than one user - prompt for the default - print("Please choose the active user. Configured users are:") - for u in users: - print(f" {u}") - print() - - while True: - username = input("Active user: ") - - if username in users: - self.config.set("DEFAULT", "default-user", username) - self.write_config() - return - print(f"No user {username}") diff --git a/linodecli/configuration/__init__.py b/linodecli/configuration/__init__.py new file mode 100644 index 00000000..f637df4b --- /dev/null +++ b/linodecli/configuration/__init__.py @@ -0,0 +1,395 @@ +""" +Handles configuring the cli, as well as loading configs so that they can be +used elsewhere. +""" + +import os +import sys +import argparse + +from .auth import ( + _get_token_web, + _check_full_access, + _do_get_request, + _get_token_terminal, +) +from .helpers import ( + _check_browsers, + _default_thing_input, + _get_config, + _get_config_path, + _handle_no_default_user, +) + +ENV_TOKEN_NAME = "LINODE_CLI_TOKEN" + +class CLIConfig: + """ + Generates the necessary config for the Linode CLI + """ + + def __init__(self, base_url, username=None, skip_config=False): + self.base_url = base_url + self.username = username + self.config = _get_config(load=not skip_config) + self.running_plugin = None + self.used_env_token = False + + self._configured = False + + self.configure_with_pat = "--token" in sys.argv + + if ( + not skip_config + and not self.config.has_option("DEFAULT", "default-user") + and self.config.has_option("DEFAULT", "token") + ): + _handle_no_default_user(self) + + environ_token = os.getenv(ENV_TOKEN_NAME, None) + + if ( + not self.config.has_option("DEFAULT", "default-user") + and not skip_config + and environ_token is None + ): + self.configure() + elif environ_token is not None: + self.used_env_token = True + + def default_username(self): + """ + Returns the default-user Username + """ + if self.config.has_option("DEFAULT", "default-user"): + return self.config.get("DEFAULT", "default-user") + return "" + + def set_user(self, username): + """ + Sets the acting username. If this username is not in the config, this is + an error. This overrides the default username + """ + if not self.config.has_section(username): + print(f"User {username} is not configured!") + sys.exit(1) + + self.username = username + + def remove_user(self, username): + """ + Removes the requested user from the config. If the user is the default, + this exits with error + """ + if self.default_username() == username: + print( + f"Cannot remove {username} as they are the default user! You can " + "change the default user with: `linode-cli set-user USERNAME`" + ) + sys.exit(1) + + if self.config.has_section(username): + self.config.remove_section(username) + self.write_config() + + def print_users(self): + """ + Prints all users available and exits + """ + print("Configured Users: ") + default_user = self.default_username() + + for sec in self.config.sections(): + if sec != "DEFAULT": + print(f'{"*" if sec == default_user else " "} {sec}') + + sys.exit(0) + + def set_default_user(self, username): + """ + Sets the default user. If that user isn't in the config, exits with error + """ + if not self.config.has_section(username): + print(f"User {username} is not configured!") + sys.exit(1) + + self.config.set("DEFAULT", "default-user", username) + self.write_config() + + def get_token(self): + """ + Returns the token for a configured user + """ + if self.used_env_token: + return os.environ.get(ENV_TOKEN_NAME, None) + + if self.config.has_option(self.username or self.default_username(), "token"): + return self.config.get(self.username or self.default_username(), "token") + return "" + + def get_value(self, key): + """ + Retrieves and returns an existing config value for the current user. This + is intended for plugins to use instead of having to deal with figuring out + who the current user is when accessing their config. + + .. warning:: + Plugins _MUST NOT_ set values for the user's config except through + ``plugin_set_value`` below. + + :param key: The key to look up. + :type key: str + + :returns: The value for that key, or None if the key doesn't exist for the + current user. + :rtype: any + """ + username = self.username or self.default_username() + + if not self.config.has_option(username, key): + return None + + return self.config.get(username, key) + + # plugin methods - these are intended for plugins to utilize to store their + # own persistent config information + def plugin_set_value(self, key, value): + """ + Sets a new config value for a plugin for the current user. Plugin config + keys are set in the following format:: + + plugin-{plugin_name}-{key} + + Values set with this method are intended to be retrieved with ``plugin_get_value`` + below. + + :param key: The config key to set - this is needed to retrieve the value + :type key: str + :param value: The value to set for this key + :type value: any + """ + if self.running_plugin is None: + raise RuntimeError( + "No running plugin to retrieve configuration for!" + ) + + username = self.username or self.default_username() + self.config.set(username, f"plugin-{self.running_plugin}-{key}", value) + + def plugin_get_value(self, key): + """ + Retrieves and returns a config value previously set for a plugin. Your + plugin should have set this value in the past. If this value does not + exist in the config, ``None`` is returned. This is the only time + ``None`` is returned, so receiving this value should be treated as + "plugin is not configured." + + :param key: The key of the value to return + :type key: str + + :returns: The value for this plugin for this key, or None if not set + :rtype: any + """ + if self.running_plugin is None: + raise RuntimeError( + "No running plugin to retrieve configuration for!" + ) + + username = self.username or self.default_username() + full_key = f"plugin-{self.running_plugin}-{key}" + + if not self.config.has_option(username, full_key): + return None + + return self.config.get(username, full_key) + + # TODO: this is more of an argparsing function than it is a config function + # might be better to move this to argparsing during refactor and just have + # configuration return defaults or keys or something + def update(self, namespace, allowed_defaults): + """ + This updates a Namespace (as returned by ArgumentParser) with config values + if they aren't present in the Namespace already. + """ + if self.used_env_token and self.config is None: + return None + username = self.username or self.default_username() + if (not self.config.has_option(username, "token") + and not os.environ.get(ENV_TOKEN_NAME, None)): + print(f"User {username} is not configured.") + sys.exit(1) + if (not self.config.has_section(username) + or allowed_defaults is None): + return namespace + + warn_dict = {} + ns_dict = vars(namespace) + for key in allowed_defaults: + if key not in ns_dict: + continue + if ns_dict[key] is not None: + continue + # plugins set config options that start with 'plugin-' + # these don't get included in the updated namespace + if key.startswith("plugin-"): + continue + if self.config.has_option(username, key): + value = self.config.get(username, key) + else: + value = allowed_defaults[key] + if key == "authorized_users": + ns_dict[key] = [value] + warn_dict[key] = [value] + else: + ns_dict[key] = value + warn_dict[key] = value + + if not any(x in ["--suppress-warnings", "--no-headers"] for x in sys.argv): + print(f"using default values: {warn_dict}, " + "use --no-defaults flag to disable defaults") + return argparse.Namespace(**ns_dict) + + def write_config(self): + """ + Saves the config file as it is right now. This can be used by plugins + to save values they've set, and is used internally to update the config + on disk when a new user if configured. + """ + if not os.path.exists(f"{os.path.expanduser('~')}/.config"): + os.makedirs(f"{os.path.expanduser('~')}/.config") + with open(_get_config_path(), "w", encoding="utf-8") as f: + self.config.write(f) + + + def configure(self): # pylint: disable=too-many-branches,too-many-statements + """ + This assumes we're running interactively, and prompts the user + for a series of defaults in order to make future CLI calls + easier. This also sets up the config file. + """ + # If configuration has already been done in this run, don't do it again. + if self._configured: + return + config = {} + # we're configuring the default user if there is no default user configured + # yet + is_default = not self.config.has_option("DEFAULT", "default-user") + username = None + token = None + + print("Welcome to the Linode CLI. This will walk you through some initial setup.") + + if ENV_TOKEN_NAME in os.environ: + print( + f"""Using token from {ENV_TOKEN_NAME}. +Note that no token will be saved in your configuration file. + * If you lose or remove {ENV_TOKEN_NAME}. + * All profiles will use {ENV_TOKEN_NAME}.""" + ) + username = "DEFAULT" + token = os.getenv(ENV_TOKEN_NAME) + + else: + if _check_browsers() and not self.configure_with_pat: + print(""" +The CLI will use its web-based authentication to log you in. +If you prefer to supply a Personal Access Token, use `linode-cli configure --token`. + """) + input( + "Press enter to continue. " + "This will open a browser and proceed with authentication." + ) + username, config["token"] = _get_token_web(self.base_url) + else: + username, config["token"] = _get_token_terminal(self.base_url) + token = config["token"] + + print(f"\nConfiguring {username}\n") + + # Configuring Defaults + + regions = [ + r["id"] for r in _do_get_request(self.base_url, "/regions")["data"] + ] + types = [ + t["id"] + for t in _do_get_request(self.base_url, "/linode/types")["data"] + ] + images = [ + i["id"] for i in _do_get_request(self.base_url, "/images")["data"] + ] + + is_full_access = _check_full_access(self.base_url, token) + + auth_users = [] + + if is_full_access: + auth_users = [ + u["username"] + for u in _do_get_request( + self.base_url, + "/account/users", + exit_on_error=False, + token=token, + )["data"] + if "ssh_keys" in u + ] + + # get the preferred things + config["region"] = _default_thing_input( + "Default Region for operations.", + regions, + "Default Region (Optional): ", + "Please select a valid Region, or press Enter to skip", + ) + + config["type"] = _default_thing_input( + "Default Type of Linode to deploy.", + types, + "Default Type of Linode (Optional): ", + "Please select a valid Type, or press Enter to skip", + ) + + config["image"] = _default_thing_input( + "Default Image to deploy to new Linodes.", + images, + "Default Image (Optional): ", + "Please select a valid Image, or press Enter to skip", + ) + + if auth_users: + config["authorized_users"] = _default_thing_input( + "Select the user that should be given default SSH access to new Linodes.", + auth_users, + "Default Option (Optional): ", + "Please select a valid Option, or press Enter to skip", + ) + + # save off the new configuration + if username != "DEFAULT" and not self.config.has_section(username): + self.config.add_section(username) + + if not is_default: + if username != self.default_username(): + while True: + value = input("Make this user the default when using the CLI? [y/N]: ") + if value.lower() in "yn": + is_default = value.lower() == "y" + break + if not value.strip(): + break + if not is_default: # they didn't change the default user + print(f"Active user will remain {self.config.get('DEFAULT', 'default-user')}") + + if is_default: + # if this is the default user, make it so + self.config.set("DEFAULT", "default-user", username) + print(f"Active user is now {username}") + + for k, v in config.items(): + if v: + self.config.set(username, k, v) + + self.write_config() + os.chmod(_get_config_path(), 0o600) + self._configured = True diff --git a/linodecli/configuration/auth.py b/linodecli/configuration/auth.py new file mode 100644 index 00000000..45557119 --- /dev/null +++ b/linodecli/configuration/auth.py @@ -0,0 +1,233 @@ +""" +Helper functions for configuration related to auth +""" + +import re +import socket +import sys +import webbrowser +from http import server +from pathlib import Path + +import requests + +TOKEN_GENERATION_URL = "https://cloud.linode.com/profile/tokens" +# This is used for web-based configuration +OAUTH_CLIENT_ID = "5823b4627e45411d18e9" +# in the event that we can't load the styled landing page from file, this will +# do as a landing page +DEFAULT_LANDING_PAGE = """ +

Success


You may return to your terminal to continue..

+ +""" + + +def _handle_response_status(response, exit_on_error=None): + if 199 < response.status_code < 300: + return + + print(f"Could not contact {response.url} - Error: {response.status_code}") + if exit_on_error: + sys.exit(4) + + +# TODO: merge config do_request and cli do_request +def _do_get_request(base_url, url, token=None, exit_on_error=True): + """ + Does helper get requests during configuration + """ + return _do_request( + base_url, requests.get, url, token=token, exit_on_error=exit_on_error + ) + + +def _do_request( + base_url, method, url, token=None, exit_on_error=None, body=None +): # pylint: disable=too-many-arguments + """ + Does helper requests during configuration + """ + headers = {} + + if token is not None: + headers["Authorization"] = f"Bearer {token}" + headers["Content-type"] = "application/json" + + result = method(base_url + url, headers=headers, json=body) + + _handle_response_status(result, exit_on_error=exit_on_error) + + return result.json() + + +def _check_full_access(base_url, token): + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + + result = requests.get( + base_url + "/profile/grants", headers=headers, timeout=120 + ) + + _handle_response_status(result, exit_on_error=True) + + return result.status_code == 204 + + +def _username_for_token(base_url, token): + """ + A helper function that returns the username associated with a token by + requesting it from the API + """ + u = _do_get_request(base_url, "/profile", token=token, exit_on_error=False) + if "errors" in u: + reasons = ",".join([c["reason"] for c in u["errors"]]) + print(f"That token didn't work: {reasons}") + return None + + return u["username"] + + +def _get_token_terminal(base_url): + """ + Handles prompting the user for a Personal Access Token and checking it + to ensure it works. + """ + print( + f""" +First, we need a Personal Access Token. To get one, please visit +{TOKEN_GENERATION_URL} and click +"Create a Personal Access Token". The CLI needs access to everything +on your account to work correctly.""" + ) + + while True: + token = input("Personal Access Token: ") + + username = _username_for_token(base_url, token) + if username is not None: + break + + return username, token + + +def _get_token_web(base_url): + """ + Handles OAuth authentication for the CLI. This requires us to get a temporary + token over OAuth and then use it to create a permanent token for the CLI. + This function returns the token the CLI should use, or exits if anything + goes wrong. + """ + temp_token = _handle_oauth_callback() + username = _username_for_token(base_url, temp_token) + + if username is None: + print("OAuth failed. Please try again of use a token for auth.") + sys.exit(1) + + # the token returned via public oauth will expire in 2 hours, which + # isn't great. Instead, we're gonna make a token that expires never + # and store that. + result = _do_request( + base_url, + requests.post, + "/profile/tokens", + token=temp_token, + # generate the actual token with a label like: + # Linode CLI @ linode + # The creation date is already recoreded with the token, so + # this should be all the relevant info. + body={"label": f"Linode CLI @ {socket.gethostname()}"}, + ) + + return username, result["token"] + + +def _handle_oauth_callback(): + """ + Sends the user to a URL to perform an OAuth login for the CLI, then redirets + them to a locally-hosted page that captures teh token + """ + # load up landing page HTML + landing_page_path = Path(__file__).parent.parent / "oauth-landing-page.html" + try: + with open(landing_page_path, encoding="utf-8") as f: + landing_page = f.read() + except: + landing_page = DEFAULT_LANDING_PAGE + + class Handler(server.BaseHTTPRequestHandler): + """ + The issue here is that Login sends the token in the URL hash, meaning + that we cannot see it on the server side. An attempt was made to + get the client (browser) to send an ajax request to pass it along, + but that's pretty gross and also isn't working. Needs more thought. + """ + + def do_GET(self): + """ + get the access token + """ + if "token" in self.path: + # we got a token! Parse it out of the request + token_part = self.path.split("/", 2)[2] + + m = re.search(r"access_token=([a-z0-9]+)", token_part) + if m and len(m.groups()): + self.server.token = m.groups()[0] + + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + + # TODO: Clean up this page and make it look nice + self.wfile.write( + bytes( + landing_page.format( + port=self.server.server_address[1] + ).encode("utf-8") + ) + ) + + def log_message(self, form, *args): # pylint: disable=arguments-differ + """Don't actually log the request""" + + # start a server to catch the response + serv = server.HTTPServer(("localhost", 0), Handler) + serv.token = None + + # figure out the URL to direct the user to and print out the prompt + # pylint: disable-next=line-too-long + url = f"https://login.linode.com/oauth/authorize?client_id={OAUTH_CLIENT_ID}&response_type=token&scopes=*&redirect_uri=http://localhost:{serv.server_address[1]}" + print( + f"""A browser should open directing you to this URL to authenticate: + +{url} + +If you are not automatically directed there, please copy/paste the link into your browser +to continue.. +""" + ) + + webbrowser.open(url) + + try: + while serv.token is None: + # serve requests one at a time until we get a token or are interrupted + serv.handle_request() + except KeyboardInterrupt: + print() + print( + "Giving up. If you couldn't get web authentication to work, please " + "try token using a token by invoking with `linode-cli configure --token`, " + "and open an issue at https://github.com/linode/linode-cli" + ) + sys.exit(1) + + return serv.token diff --git a/linodecli/configuration/helpers.py b/linodecli/configuration/helpers.py new file mode 100644 index 00000000..2b7f1672 --- /dev/null +++ b/linodecli/configuration/helpers.py @@ -0,0 +1,190 @@ +""" +General helper functions for configuraiton +""" + +import os +import webbrowser + +from .auth import _do_get_request + +try: + # python3 + import configparser +except ImportError: + # python2 + import ConfigParser as configparser + +LEGACY_CONFIG_NAME = ".linode-cli" +LEGACY_CONFIG_DIR = os.path.expanduser("~") + +CONFIG_NAME = "linode-cli" +CONFIG_DIR = os.environ.get( + "XDG_CONFIG_HOME", f"{os.path.expanduser('~')}/.config" +) + + +# this is a list of browser that _should_ work for web-based auth. This is mostly +# intended to exclude lynx and other terminal browsers which could be opened, but +# won't work. +KNOWN_GOOD_BROWSERS = { + "chrome", + "firefox", + "mozilla", + "netscape", + "opera", + "safari", + "chromium", + "chromium-browser", + "epiphany", +} + +def _get_config_path(): + """ + Returns the path to the config file. + """ + path = f"{LEGACY_CONFIG_DIR}/{LEGACY_CONFIG_NAME}" + if os.path.exists(path): + return path + + return f"{CONFIG_DIR}/{CONFIG_NAME}" + + +def _get_config(load=True): + """ + Returns a new ConfigParser object that represents the CLI's configuration. + If load is false, we won't load the config from disk. + + :param load: If True, load the config from the default path. Otherwise, + don't (and just return an empty ConfigParser) + :type load: bool + """ + conf = configparser.ConfigParser() + + if load: + conf.read(_get_config_path()) + + return conf + +def _check_browsers(): + # let's see if we _can_ use web + try: + webbrowser.get() + except webbrowser.Error: + # there are no browsers installed + return False + + # pylint: disable-next=protected-access + if not KNOWN_GOOD_BROWSERS.intersection(webbrowser._tryorder): + print(""" +This tool defaults to web-based authentication, +however no known-working browsers were found.""") + while True: + r = input("Try it anyway? [y/N]: ") + if r.lower() in "yn ": + return r.lower() == "y" + return True + +def _default_thing_input( + ask, things, prompt, error, optional=True +): # pylint: disable=too-many-arguments + """ + Requests the user choose from a list of things with the given prompt and + error if they choose something invalid. If optional, the user may hit + enter to not configure this option. + """ + print(f"\n{ask} Choices are:") + for ind, thing in enumerate(things): + print(f" {ind + 1} - {thing}") + print() + + ret = "" + while True: + choice = input(prompt) + + if choice: + try: + choice = int(choice) + choice = things[choice - 1] + except: + pass + + if choice in list(things): + ret = choice + break + print(error) + else: + if optional: + break + print(error) + return ret + +def _handle_no_default_user(self): + """ + Handle the case that there is no default user in the config + """ + users = [c for c in self.config.sections() if c != "DEFAULT"] + + if len(users) == 1: + # only one user configured - they're the default + self.config.set("DEFAULT", "default-user", users[0]) + self.write_config() + return + + if len(users) == 0: + # config is new or _really_ old + token = self.config.get("DEFAULT", "token") + + if token is not None: + # there's a token in the config - configure that user + u = _do_get_request( + self.base_url, "/profile", token=token, exit_on_error=False + ) + + if "errors" in u: + # this token was bad - reconfigure + self.configure() + return + + # setup config for this user + username = u["username"] + + self.config.set("DEFAULT", "default-user", username) + self.config.add_section(username) + self.config.set(username, "token", token) + self.config.set( + username, "region", self.config.get("DEFAULT", "region") + ) + self.config.set( + username, "type", self.config.get("DEFAULT", "type") + ) + self.config.set( + username, "image", self.config.get("DEFAULT", "image") + ) + self.config.set( + username, + "authorized_keys", + self.config.get("DEFAULT", "authorized_keys"), + ) + + self.write_config() + else: + # got nothin', reconfigure + self.configure() + + # this should be handled + return + + # more than one user - prompt for the default + print("Please choose the active user. Configured users are:") + for u in users: + print(f" {u}") + print() + + while True: + username = input("Active user: ") + + if username in users: + self.config.set("DEFAULT", "default-user", username) + self.write_config() + return + print(f"No user {username}") diff --git a/linodecli/helpers.py b/linodecli/helpers.py new file mode 100644 index 00000000..893240bb --- /dev/null +++ b/linodecli/helpers.py @@ -0,0 +1,53 @@ +""" +Various helper functions shared across multiple CLI components. +""" + +import os +import re +from urllib.parse import urlparse + +API_HOST_OVERRIDE = os.getenv("LINODE_CLI_API_HOST") +API_VERSION_OVERRIDE = os.getenv("LINODE_CLI_API_VERSION") +API_SCHEME_OVERRIDE = os.getenv("LINODE_CLI_API_SCHEME") + + +def handle_url_overrides(url): + """ + Returns the URL with the API URL environment overrides applied. + """ + + parsed_url = urlparse(url) + + overrides = { + "netloc": API_HOST_OVERRIDE, + "path": API_VERSION_OVERRIDE, + "scheme": API_SCHEME_OVERRIDE, + } + + # Apply overrides + return parsed_url._replace( + **{k: v for k, v in overrides.items() if v is not None} + ).geturl() + + +def filter_markdown_links(text): + """ + Returns the given text with Markdown links converted to human-readable links. + """ + + result = text + + # Find all Markdown links + r = re.compile(r"\[(?P.*?)]\((?P.*?)\)") + + for match in r.finditer(text): + url = match.group("link") + + # Expand the URL if necessary + if url.startswith("/"): + url = f"https://linode.com{url}" + + # Replace with more readable text + result = result.replace(match.group(), f"{match.group('text')} ({url})") + + return result diff --git a/linodecli/operation.py b/linodecli/operation.py index d87de172..f72739d1 100644 --- a/linodecli/operation.py +++ b/linodecli/operation.py @@ -9,6 +9,8 @@ from getpass import getpass from os import environ, path +from linodecli.helpers import handle_url_overrides + def parse_boolean(value): """ @@ -183,7 +185,8 @@ def url(self): """ Returns the full URL for this resource based on servers and endpoint """ - base_url = self.servers[0] + base_url = handle_url_overrides(self.servers[0]) + return base_url + "/" + self._url def parse_args( @@ -219,7 +222,9 @@ def parse_args( ) else: parser.add_argument( - "--" + attr.name, type=expected_type, metavar=attr.name + "--" + attr.name, + type=expected_type, + metavar=attr.name, ) elif self.method in ("post", "put"): @@ -242,10 +247,15 @@ def parse_args( ) list_items.append((arg.path, arg.list_item)) else: - if arg.arg_type == "string" and arg.arg_format == "password": + if ( + arg.arg_type == "string" + and arg.arg_format == "password" + ): # special case - password input parser.add_argument( - "--" + arg.path, nargs="?", action=PasswordPromptAction + "--" + arg.path, + nargs="?", + action=PasswordPromptAction, ) elif arg.arg_type == "string" and arg.arg_format in ( "file", @@ -260,7 +270,9 @@ def parse_args( ) else: parser.add_argument( - "--" + arg.path, metavar=arg.name, type=TYPES[arg.arg_type] + "--" + arg.path, + metavar=arg.name, + type=TYPES[arg.arg_type], ) parsed = parser.parse_args(args) diff --git a/linodecli/output.py b/linodecli/output.py index f97d9362..551a29e0 100644 --- a/linodecli/output.py +++ b/linodecli/output.py @@ -2,6 +2,7 @@ Handles formatting the output of commands used in Linode CLI """ import json +import sys from enum import Enum from sys import stdout @@ -19,7 +20,7 @@ class OutputMode(Enum): markdown = 4 -class OutputHandler: # pylint: disable=too-few-public-methods +class OutputHandler: # pylint: disable=too-few-public-methods,too-many-instance-attributes """ Handles formatting the output of commands used in Linode CLI """ @@ -31,12 +32,21 @@ def __init__( # pylint: disable=too-many-arguments headers=True, pretty_json=False, columns=None, + disable_truncation=False, + truncation_length=64, + suppress_warnings=False, ): self.mode = mode self.delimiter = delimiter self.pretty_json = pretty_json self.headers = headers self.columns = columns + self.disable_truncation = disable_truncation + self.truncation_length = truncation_length + self.suppress_warnings = suppress_warnings + + # Used to track whether a warning has already been printed + self.has_warned = False def print( self, response_model, data, title=None, to=stdout, columns=None @@ -53,20 +63,31 @@ def print( :param columns: The columns to display :type columns: list[str] """ + + # We need to use lambdas here since we don't want unused function params + output_mode_to_func = { + OutputMode.table: lambda: self._table_output( + header, data, columns, title, to + ), + OutputMode.delimited: lambda: self._delimited_output( + header, data, columns, to + ), + OutputMode.json: lambda: self._json_output(header, data, to), + OutputMode.markdown: lambda: self._markdown_output( + header, data, columns, to + ), + } + if columns is None: columns = self._get_columns(response_model) header = [c.column_name for c in columns] else: header = columns - if self.mode == OutputMode.table: - self._table_output(header, data, columns, title, to) - elif self.mode == OutputMode.delimited: - self._delimited_output(header, data, columns, to) - elif self.mode == OutputMode.json: - self._json_output(header, data, to) - elif self.mode == OutputMode.markdown: - self._markdown_output(header, data, columns) + if self.mode not in output_mode_to_func: + raise RuntimeError(f"Unknown output mode: {self.mode}") + + output_mode_to_func[self.mode]() def _get_columns(self, response_model): """ @@ -75,7 +96,9 @@ def _get_columns(self, response_model): if self.columns is None: columns = [ attr - for attr in sorted(response_model.attrs, key=lambda c: c.display) + for attr in sorted( + response_model.attrs, key=lambda c: c.display + ) if attr.display ] elif self.columns == "*": @@ -102,16 +125,14 @@ def _table_output( """ Pretty-prints data in a table """ - content = [] - - if isinstance(columns[0], str): - content = data - else: - for model in data: - content.append([attr.render_value(model) for attr in columns]) - - if self.headers: - content = [header] + content + content = self._build_output_content( + data, + columns, + header=header, + value_transform=lambda attr, v: self._attempt_truncate_value( + attr.render_value(v) + ), + ) tab = SingleTable(content) @@ -127,16 +148,12 @@ def _delimited_output(self, header, data, columns, to): """ Prints data in delimited format with the given delimiter """ - content = [] - - if isinstance(columns[0], str): - content = data - else: - for model in data: - content.append([attr.get_string(model) for attr in columns]) - - if self.headers: - content = [header] + content + content = self._build_output_content( + data, + columns, + header=header, + value_transform=lambda attr, v: attr.get_string(v), + ) for row in content: print(self.delimiter.join(row), file=to) @@ -163,7 +180,8 @@ def _json_output(self, header, data, to): file=to, ) - def _select_json_elements(self, keys, json_res): + @staticmethod + def _select_json_elements(keys, json_res): """ Returns a dict filtered down to include only the selected keys. Walks paths to handle nested dicts @@ -173,29 +191,76 @@ def _select_json_elements(self, keys, json_res): if k in keys: ret[k] = v elif isinstance(v, dict): - v = self._select_json_elements(keys, v) + v = OutputHandler._select_json_elements(keys, v) if v: ret[k] = v return ret - def _markdown_output(self, header, data, columns): + def _markdown_output(self, header, data, columns, to): """ Pretty-prints data in a Markdown-formatted table. This uses github's flavor of Markdown """ + content = self._build_output_content( + data, + columns, + value_transform=lambda attr, v: self._attempt_truncate_value( + attr.render_value(v, colorize=False) + ), + ) + + if header: + print("| " + " | ".join([str(c) for c in header]) + " |", file=to) + print("|---" * len(header) + "|", file=to) + + for row in content: + print("| " + " | ".join([str(c) for c in row]) + " |", file=to) + + def _build_output_content( + self, + data, + columns, + header=None, + value_transform=lambda attr, model: model, + ): + """ + Returns the `content` to be displayed by the corresponding output function. + `value_transform` allows functions to specify how each value should be formatted. + """ + content = [] + if self.headers and header is not None: + content = [header] + + # We're not using models here + # We won't apply transforms here since no formatting is being applied if isinstance(columns[0], str): - content = data - else: - for model in data: - content.append( - [attr.render_value(model, colorize=False) for attr in columns] - ) + return content + data - if header: - print("| " + " | ".join([str(c) for c in header]) + " |") - print("|---" * len(header) + "|") + for model in data: + content.append([value_transform(attr, model) for attr in columns]) - for row in content: - print("| " + " | ".join([str(c) for c in row]) + " |") + return content + + def _attempt_truncate_value(self, value): + if self.disable_truncation: + return value + + if not isinstance(value, str): + value = str(value) + + if len(value) < self.truncation_length: + return value + + if not self.suppress_warnings and not self.has_warned: + print( + "Certain values in this output have been truncated. " + "To disable output truncation, use --no-truncation. " + "Alternatively, use the --json or --text output modes, " + "or disable warnings using --suppress-warnings.", + file=sys.stderr, + ) + self.has_warned = True + + return f"{value[:self.truncation_length]}..." diff --git a/linodecli/plugins/__init__.py b/linodecli/plugins/__init__.py index 8d7a2a2f..6ec84ede 100644 --- a/linodecli/plugins/__init__.py +++ b/linodecli/plugins/__init__.py @@ -6,6 +6,8 @@ from os import listdir from os.path import dirname +from linodecli.cli import CLI + _available_files = listdir(dirname(__file__)) available_local = [ f[:-3] for f in _available_files if f.endswith(".py") and f != "__init__.py" @@ -64,7 +66,7 @@ class PluginContext: # pylint: disable=too-few-public-methods and the CLI access token the user has provided. """ - def __init__(self, token, client): + def __init__(self, token: str, client: CLI): """ Constructs a new PluginContext with the given information """ diff --git a/linodecli/plugins/firewall-editor.py b/linodecli/plugins/firewall-editor.py index 148cb0c6..d4a1419d 100644 --- a/linodecli/plugins/firewall-editor.py +++ b/linodecli/plugins/firewall-editor.py @@ -5,10 +5,10 @@ import argparse import json import re -import termios import sys +import termios +from ipaddress import IPv4Address, ip_address from typing import Callable -from ipaddress import ip_address, IPv4Address from terminaltables import PorcelainTable @@ -101,7 +101,9 @@ def callback(value): value_int = int(value) - if value_int < 0 or value_int >= len(ref_list) + (1 if allow_append else 0): + if value_int < 0 or value_int >= len(ref_list) + ( + 1 if allow_append else 0 + ): raise ValueError(f"Invalid index {value_int}") return callback @@ -127,7 +129,9 @@ def one_of(valid_choices): """ def callback(value): - if value.lower() not in [choice.lower() for choice in valid_choices]: + if value.lower() not in [ + choice.lower() for choice in valid_choices + ]: raise ValueError( f"Invalid choice: {value}; must be one " f"of {', '.join(valid_choices)}" @@ -154,7 +158,9 @@ def callback(value): ) if not ip_parts[1].isnumeric(): - raise ValueError(f"Invalid IP: {ip}; IP masks must be numeric") + raise ValueError( + f"Invalid IP: {ip}; IP masks must be numeric" + ) try: ip_address(ip_parts[0]) @@ -188,13 +194,17 @@ def _get_firewall(firewall_id, client): """ Returns the firewall object with the given ID """ - code, firewall = client.call_operation("firewalls", "view", args=[firewall_id]) + code, firewall = client.call_operation( + "firewalls", "view", args=[firewall_id] + ) if code != 200: print(f"Error retrieving firewall: {code}") sys.exit(1) - code, rules = client.call_operation("firewalls", "rules-list", args=[firewall_id]) + code, rules = client.call_operation( + "firewalls", "rules-list", args=[firewall_id] + ) if code != 200: print(f"Error retrieving firewall rules: {code}") @@ -436,7 +446,8 @@ def remove_rule(rules): return False ind_str = InputValidation.input( - "Index to remove: ", InputValidation.optional(InputValidation.index_of(change)) + "Index to remove: ", + InputValidation.optional(InputValidation.index_of(change)), ).strip() # No changes to be made @@ -457,14 +468,16 @@ def swap_rules(rules): change = InputValidation.input_io(rules) a_str = InputValidation.input( - "Swap index: ", InputValidation.optional(InputValidation.index_of(change)) + "Swap index: ", + InputValidation.optional(InputValidation.index_of(change)), ).strip() if a_str == "": return False b_str = InputValidation.input( - "With index: ", InputValidation.optional(InputValidation.index_of(change)) + "With index: ", + InputValidation.optional(InputValidation.index_of(change)), ).strip() if b_str == "": @@ -483,7 +496,9 @@ def toggle_policy(policy_key): """ def callback(rules): - rules[policy_key] = "DROP" if rules[policy_key] == "ACCEPT" else "ACCEPT" + rules[policy_key] = ( + "DROP" if rules[policy_key] == "ACCEPT" else "ACCEPT" + ) return True diff --git a/linodecli/plugins/image-upload.py b/linodecli/plugins/image-upload.py index 781fdc28..42a66a22 100644 --- a/linodecli/plugins/image-upload.py +++ b/linodecli/plugins/image-upload.py @@ -81,7 +81,8 @@ def call(args, context): "--label", metavar="LABEL", nargs="?", - help="Label for the new Image. If omitted, the filename " "will be used.", + help="Label for the new Image. If omitted, the filename " + "will be used.", ) parser.add_argument( "--description", @@ -126,11 +127,15 @@ def call(args, context): # make sure it's not larger than the max upload size if os.path.getsize(filepath) > MAX_UPLOAD_SIZE: - print(f"File {filepath} is too large; compressed size must be less than 5GB") + print( + f"File {filepath} is too large; compressed size must be less than 5GB" + ) sys.exit(2) if not parsed.region: - print("No region provided. Please set a default region or use --region") + print( + "No region provided. Please set a default region or use --region" + ) sys.exit(1) label = parsed.label or os.path.basename(filepath) diff --git a/linodecli/plugins/obj.py b/linodecli/plugins/obj.py index ff9180bc..fa380f65 100644 --- a/linodecli/plugins/obj.py +++ b/linodecli/plugins/obj.py @@ -2,7 +2,6 @@ """ CLI Plugin for handling OBJ """ -import argparse import getpass import glob import math @@ -11,11 +10,19 @@ import socket import sys import time +from argparse import ArgumentParser, ArgumentTypeError +from contextlib import suppress from datetime import datetime from math import ceil +from typing import List from terminaltables import SingleTable +from linodecli.cli import CLI +from linodecli.configuration import _do_get_request +from linodecli.configuration.helpers import _default_thing_input +from linodecli.plugins import PluginContext + ENV_ACCESS_KEY_NAME = "LINODE_CLI_OBJ_ACCESS_KEY" ENV_SECRET_KEY_NAME = "LINODE_CLI_OBJ_SECRET_KEY" @@ -82,9 +89,9 @@ def restricted_int(string): except ValueError as e: # argparse can handle ValueErrors, but shows an unfriendly "invalid restricted_int # value: '0.1'" message, so catch and raise with a better message. - raise argparse.ArgumentTypeError(err_msg) from e + raise ArgumentTypeError(err_msg) from e if value < min or value > max: - raise argparse.ArgumentTypeError(err_msg) + raise ArgumentTypeError(err_msg) return value return restricted_int @@ -94,7 +101,7 @@ def list_objects_or_buckets(get_client, args): """ Lists buckets or objects """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " ls") + parser = ArgumentParser(PLUGIN_BASE + " ls") parser.add_argument( "bucket", @@ -132,7 +139,11 @@ def list_objects_or_buckets(get_client, args): size = c.size # pylint: disable-next=redefined-outer-name - datetime = _convert_datetime(c.last_modified) if size != "DIR" else " " * 16 + datetime = ( + _convert_datetime(c.last_modified) + if size != "DIR" + else " " * 16 + ) data.append([datetime, size, c.name]) @@ -157,10 +168,13 @@ def create_bucket(get_client, args): """ Creates a new bucket """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " mb") + parser = ArgumentParser(PLUGIN_BASE + " mb") parser.add_argument( - "name", metavar="NAME", type=str, help="The name of the bucket to create." + "name", + metavar="NAME", + type=str, + help="The name of the bucket to create.", ) parsed = parser.parse_args(args) @@ -176,10 +190,13 @@ def delete_bucket(get_client, args): """ Deletes a bucket """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " rb") + parser = ArgumentParser(PLUGIN_BASE + " rb") parser.add_argument( - "name", metavar="NAME", type=str, help="The name of the bucket to remove." + "name", + metavar="NAME", + type=str, + help="The name of the bucket to remove.", ) parser.add_argument( "--recursive", @@ -214,18 +231,22 @@ def upload_object(get_client, args): # pylint: disable=too-many-locals """ Uploads an object to object storage """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " put") + parser = ArgumentParser(PLUGIN_BASE + " put") parser.add_argument( "file", metavar="FILE", type=str, nargs="+", help="The files to upload." ) parser.add_argument( - "bucket", metavar="BUCKET", type=str, help="The bucket to put a file in." + "bucket", + metavar="BUCKET", + type=str, + help="The bucket to put a file in.", ) parser.add_argument( "--acl-public", action="store_true", - help="If set, the new object can be downloaded without " "authentication.", + help="If set, the new object can be downloaded without " + "authentication.", ) parser.add_argument( "--chunk-size", @@ -288,10 +309,14 @@ def upload_object(get_client, args): # pylint: disable=too-many-locals k.key = filename print(filename) - k.set_contents_from_filename(file_path, cb=_progress, num_cb=100, policy=policy) + k.set_contents_from_filename( + file_path, cb=_progress, num_cb=100, policy=policy + ) for filename, file_path, file_size in to_multipart_upload: - _do_multipart_upload(bucket, filename, file_path, file_size, policy, chunk_size) + _do_multipart_upload( + bucket, filename, file_path, file_size, policy, chunk_size + ) print("Done.") @@ -345,8 +370,7 @@ def _do_multipart_upload( time.sleep(retry_delay) continue raise - else: - break + break except Exception: print("Upload failed! Cleaning up!") upload.cancel_upload() @@ -359,7 +383,7 @@ def get_object(get_client, args): """ Retrieves an uploaded object and writes it to a file """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " get") + parser = ArgumentParser(PLUGIN_BASE + " get") parser.add_argument( "bucket", metavar="BUCKET", type=str, help="The bucket the file is in." @@ -407,7 +431,7 @@ def delete_object(get_client, args): """ Removes a file from a bucket """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " del") + parser = ArgumentParser(PLUGIN_BASE + " del") parser.add_argument( "bucket", metavar="BUCKET", type=str, help="The bucket to delete from." @@ -442,10 +466,13 @@ def generate_url(get_client, args): """ Generates a URL to an object """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " signurl") + parser = ArgumentParser(PLUGIN_BASE + " signurl") parser.add_argument( - "bucket", metavar="BUCKET", type=str, help="The bucket containing the object." + "bucket", + metavar="BUCKET", + type=str, + help="The bucket containing the object.", ) parser.add_argument( "file", metavar="OBJECT", type=str, help="The object to sign a URL to." @@ -493,7 +520,7 @@ def set_acl(get_client, args): """ Modify Access Control List for a Bucket or Objects """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " setacl") + parser = ArgumentParser(PLUGIN_BASE + " setacl") parser.add_argument( "bucket", metavar="BUCKET", type=str, help="The bucket to modify." @@ -512,7 +539,9 @@ def set_acl(get_client, args): help="If given, makes the target publicly readable.", ) parser.add_argument( - "--acl-private", action="store_true", help="If given, makes the target private." + "--acl-private", + action="store_true", + help="If given, makes the target private.", ) parsed = parser.parse_args(args) @@ -553,7 +582,7 @@ def enable_static_site(get_client, args): """ Turns a bucket into a static website """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " ws-create") + parser = ArgumentParser(PLUGIN_BASE + " ws-create") parser.add_argument( "bucket", @@ -597,7 +626,7 @@ def static_site_info(get_client, args): """ Returns info about a configured static site """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " ws-info") + parser = ArgumentParser(PLUGIN_BASE + " ws-info") parser.add_argument( "bucket", @@ -632,7 +661,7 @@ def show_usage(get_client, args): """ Shows space used by all buckets in this cluster, and total space """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " du") + parser = ArgumentParser(PLUGIN_BASE + " du") parser.add_argument( "bucket", @@ -668,7 +697,9 @@ def show_usage(get_client, args): total = _denominate(total) - tab = _borderless_table([[_pad_to(total, length=7), f"{num} objects", b.name]]) + tab = _borderless_table( + [[_pad_to(total, length=7), f"{num} objects", b.name]] + ) print(tab.table) if len(buckets) > 1: @@ -694,10 +725,14 @@ def _denominate(total): return total -def list_all_objects(get_client): +def list_all_objects(get_client, args): """ Lists all objects in all buckets """ + # this is for printing help when --help is in the args + parser = ArgumentParser(PLUGIN_BASE + " la") + parser.parse_args(args) + client = get_client() # all buckets @@ -725,7 +760,7 @@ def disable_static_site(get_client, args): """ Disables static site for a bucket """ - parser = argparse.ArgumentParser(PLUGIN_BASE + " du") + parser = ArgumentParser(PLUGIN_BASE + " du") parser.add_argument( "bucket", @@ -770,22 +805,39 @@ def disable_static_site(get_client, args): } -def call(args, context): # pylint: disable=too-many-branches,too-many-statements +def print_help(parser: ArgumentParser): """ - This is called when the plugin is invoked + Print out the help info to the standard output. """ - if not HAS_BOTO: - # we can't do anything - ask for an install - pip_version = "pip3" if sys.version[0] == 3 else "pip" + parser.print_help() - print( - "This plugin requires the 'boto' module. Please install it by running " - f"'{pip_version} install boto'" - ) + # additional help + print() + print("Available commands: ") + + command_help_map = [ + [name, func.__doc__.strip()] + for name, func in sorted(COMMAND_MAP.items()) + ] + + tab = SingleTable(command_help_map) + tab.inner_heading_row_border = False + print(tab.table) + print() + print( + "Additionally, you can regenerate your Object Storage keys using the " + "'regenerate-keys' command or configure defaults for the plugin using " + "the 'configure' command." + ) + print() + print("See --help for individual commands for more information") - sys.exit(2) # requirements not met - we can't go on - parser = argparse.ArgumentParser(PLUGIN_BASE, add_help=False) +def get_obj_args_parser(): + """ + Initialize and return the argument parser for the obj plug-in. + """ + parser = ArgumentParser(PLUGIN_BASE, add_help=False) parser.add_argument( "command", metavar="COMMAND", @@ -799,50 +851,44 @@ def call(args, context): # pylint: disable=too-many-branches,too-many-statement type=str, help="The cluster to use for the operation", ) + return parser + + +def call( + args: List[str], context: PluginContext +): # pylint: disable=too-many-branches,too-many-statements + """ + This is called when the plugin is invoked + """ + if not HAS_BOTO: + # we can't do anything - ask for an install + print( + "This plugin requires the 'boto' module. Please install it by running " + "'pip3 install boto' or 'pip install boto'" + ) + sys.exit(2) # requirements not met - we can't go on + + parser = get_obj_args_parser() parsed, args = parser.parse_known_args(args) # don't mind --no-defaults if it's there; the top-level parser already took care of it - try: + with suppress(ValueError): args.remove("--no-defaults") - except ValueError: - pass if not parsed.command: - # show help if invoked with no command - parser.print_help() - - # additional help - print() - print("Available commands: ") - - command_help_map = [ - [name, func.__doc__.strip()] for name, func in sorted(COMMAND_MAP.items()) - ] - - tab = SingleTable(command_help_map) - tab.inner_heading_row_border = False - print(tab.table) - print() - print( - "Additionally, you can regenerate your Object Storage keys using the " - "'regenerate-keys' command or configure defaults for the plugin using " - "the 'configure' command." - ) - print() - print("See --help for individual commands for more information") - + print_help(parser) sys.exit(0) # make a client, but only if we weren't printing help access_key, secret_key = ( - os.environ.get(ENV_ACCESS_KEY_NAME, None), - os.environ.get(ENV_SECRET_KEY_NAME, None), + os.getenv(ENV_ACCESS_KEY_NAME, None), + os.getenv(ENV_SECRET_KEY_NAME, None), ) if not "--help" in args: - if access_key and not secret_key or secret_key and not access_key: + if bool(access_key) != bool(secret_key): print( f"You must set both {ENV_ACCESS_KEY_NAME} and {ENV_SECRET_KEY_NAME}, or neither" ) @@ -933,7 +979,7 @@ def _get_boto_client(cluster, access_key, secret_key): return client -def _get_s3_creds(client, force=False): +def _get_s3_creds(client: CLI, force: bool = False): """ Retrieves stored s3 creds for the acting user from the config, or generates new creds using the client and stores them if none exist @@ -1022,23 +1068,25 @@ def _get_s3_creds(client, force=False): client.config.plugin_set_value("access-key", access_key) client.config.plugin_set_value("secret-key", secret_key) - client.config.write_config(silent=True) + client.config.write_config() return access_key, secret_key -def _configure_plugin(client): +def _configure_plugin(client: CLI): """ Configures a default cluster value. """ clusters = [ c["id"] - for c in client.config._do_get_request( # pylint: disable=protected-access - "/object-storage/clusters", token=client.config.get_value("token") + for c in _do_get_request( # pylint: disable=protected-access + client.config.base_url, + "/object-storage/clusters", + token=client.config.get_value("token"), )["data"] ] - cluster = client.config._default_thing_input( # pylint: disable=protected-access + cluster = _default_thing_input( # pylint: disable=protected-access "Configure a default Cluster for operations.", clusters, "Default Cluster: ", @@ -1086,10 +1134,14 @@ def _convert_datetime(datetime_str): """ Given a string in INCOMING_DATE_FORMAT, returns a string in DATE_FORMAT """ - return datetime.strptime(datetime_str, INCOMING_DATE_FORMAT).strftime(DATE_FORMAT) + return datetime.strptime(datetime_str, INCOMING_DATE_FORMAT).strftime( + DATE_FORMAT + ) -def _pad_to(val, length=10, right_align=False): # pylint: disable=unused-argument +def _pad_to( + val, length=10, right_align=False +): # pylint: disable=unused-argument """ Pads val to be at minimum length characters long """ diff --git a/linodecli/response.py b/linodecli/response.py index 11b968e4..f9541189 100644 --- a/linodecli/response.py +++ b/linodecli/response.py @@ -61,7 +61,13 @@ class ModelAttr: # pylint: disable=too-many-instance-attributes """ def __init__( # pylint: disable=too-many-arguments - self, name, filterable, display, datatype, color_map=None, item_type=None + self, + name, + filterable, + display, + datatype, + color_map=None, + item_type=None, ): self.name = name self.value = None @@ -144,54 +150,62 @@ def fix_json(self, json): # pylint: disable=too-many-branches """ Takes JSON from the API and formats it into a list of rows """ - if self.rows: # pylint: disable=no-else-return - # take the columns as specified - ret = [] - for c in self.rows: - cur = json - for part in c.split("."): - cur = cur.get(part) - - if not cur: - # probably shouldn't happen, but ok - continue - - if isinstance(cur, list): - ret += cur - else: - ret.append(cur) - - # we're good - return ret - elif self.nested_list: - # we need to explode the rows into one row per entry in the nested list, - # copying the external values - if "pages" in json: - json = json["data"] - - nested_lists = [c.strip() for c in self.nested_list.split(",")] - ret = [] - - for nested_list in nested_lists: - path_parts = nested_list.split(".") - - if not isinstance(json, list): - json = [json] - for cur in json: - - nlist_path = cur - for p in path_parts: - nlist_path = nlist_path.get(p) - nlist = nlist_path - - for item in nlist: - cobj = {k: v for k, v in cur.items() if k != path_parts[0]} - cobj["_split"] = path_parts[-1] - cobj[path_parts[0]] = item - ret.append(cobj) - - return ret - elif "pages" in json: + if self.rows: + return self._fix_json_rows(json) + + if self.nested_list: + return self._fix_json_nested_list(json) + + if "pages" in json: return json["data"] - else: - return [json] + + return [json] + + def _fix_json_rows(self, json): + # take the columns as specified + ret = [] + for c in self.rows: + cur = json + for part in c.split("."): + cur = cur.get(part) + + if not cur: + # probably shouldn't happen, but ok + continue + + if isinstance(cur, list): + ret += cur + else: + ret.append(cur) + + # we're good + return ret + + def _fix_json_nested_list(self, json): + # we need to explode the rows into one row per entry in the nested list, + # copying the external values + if "pages" in json: + json = json["data"] + + nested_lists = [c.strip() for c in self.nested_list.split(",")] + ret = [] + + for nested_list in nested_lists: + path_parts = nested_list.split(".") + + if not isinstance(json, list): + json = [json] + + for cur in json: + nlist_path = cur + for p in path_parts: + nlist_path = nlist_path.get(p) + nlist = nlist_path + + for item in nlist: + cobj = {k: v for k, v in cur.items() if k != path_parts[0]} + cobj["_split"] = path_parts[-1] + cobj[path_parts[0]] = item + ret.append(cobj) + + return ret diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..33db9966 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" + +[tool.isort] +profile = "black" + +[tool.black] +line-length = 80 +target-version = ["py37", "py38", "py39", "py310", "py311"] + +[tool.autoflake] +expand-star-imports = true +ignore-init-module-imports = true +ignore-pass-after-docstring = true +in-place = true +recursive = true +remove-all-unused-imports = true +remove-duplicate-keys = true diff --git a/requirements-dev.txt b/requirements-dev.txt index 7fb0ea15..ad6f085b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1 +1,6 @@ -pylint +pylint==2.16.1 +pytest==7.2.1 +black>=23.1.0 +isort>=5.12.0 +autoflake>=2.0.1 +requests-mock==1.10.0 diff --git a/setup.py b/setup.py index fa568000..e350b227 100755 --- a/setup.py +++ b/setup.py @@ -76,6 +76,7 @@ def bake_version(v): packages=[ "linodecli", "linodecli.plugins", + "linodecli.configuration", ], license="BSD 3-Clause License", install_requires=[ diff --git a/test/cli/help.bats b/test/cli/help.bats new file mode 100644 index 00000000..e7d7c205 --- /dev/null +++ b/test/cli/help.bats @@ -0,0 +1,38 @@ +#!/usr/bin/env bats + +load '../test_helper/bats-support/load' +load '../test_helper/bats-assert/load' +load '../common' + +# ################################################################## +# # WARNING: THIS TEST WILL DELETE ALL OF YOUR LINODES # +# # WARNING: USE A SEPARATE TEST ACCOUNT WHEN RUNNING THESE TESTS # +# ################################################################## + +setup() { + suiteName="help" + setToken "$suiteName" + export timestamp=$(date +%s) + clean_linodes="FALSE" +} + +@test "it should display a help page for non-aliased actions" { + run linode-cli linodes list --help + + assert_success + assert_output --partial "Linodes List" + assert_output --partial "API Documentation: https://www.linode.com/docs/api/linode-instances/#linodes-list" + assert_output --partial "You may filter results with:" + assert_output --partial "--tags" +} + +@test "it should display a help page for aliased actions" { + run linode-cli linodes ls --help + + assert_success + assert_output --partial "Linodes List" + assert_output --partial "API Documentation: https://www.linode.com/docs/api/linode-instances/#linodes-list" + assert_output --partial "You may filter results with:" + assert_output --partial "--tags" +} + diff --git a/test/cli/host-overrides.bats b/test/cli/host-overrides.bats new file mode 100644 index 00000000..1e323a50 --- /dev/null +++ b/test/cli/host-overrides.bats @@ -0,0 +1,41 @@ +#!/usr/bin/env bats + +load '../test_helper/bats-support/load' +load '../test_helper/bats-assert/load' +load '../common' + +# ################################################################## +# # WARNING: THIS TEST WILL DELETE ALL OF YOUR LINODES # +# # WARNING: USE A SEPARATE TEST ACCOUNT WHEN RUNNING THESE TESTS # +# ################################################################## + +setup() { + suiteName="help" + setToken "$suiteName" + export timestamp=$(date +%s) + clean_linodes="FALSE" +} + +@test "it should fail to access an invalid host" { + export LINODE_CLI_API_HOST=wrongapi.linode.com + run linode-cli linodes ls + + assert_failure + assert_output --partial "wrongapi.linode.com" +} + +@test "it should use v4beta when override is set" { + export LINODE_CLI_API_VERSION=v4beta + run linode-cli linodes ls --debug + + assert_success + assert_output --partial "v4beta" +} + +@test "it should fail to access an invalid api scheme" { + export LINODE_CLI_API_SCHEME=ssh + run linode-cli linodes ls + + assert_failure + assert_output --partial "ssh://" +} diff --git a/test/domains/master-domains.bats b/test/domains/master-domains.bats index 72a1531d..adfacde6 100644 --- a/test/domains/master-domains.bats +++ b/test/domains/master-domains.bats @@ -39,7 +39,7 @@ teardown() { assert_failure assert_output --partial "Request failed: 400" - assert_output --partial "soa_email SOA_Email required when type=master" + assert_output --partial "soa_email soa_email required when type=master" } @test "it should create a master domain" { diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/completion.py b/tests/completion.py new file mode 100644 index 00000000..ba7a1713 --- /dev/null +++ b/tests/completion.py @@ -0,0 +1,91 @@ +#!/usr/local/bin/python3 +""" +Unit tests for linodecli.completion +""" + +import unittest +from unittest.mock import mock_open, patch + +from linodecli import completion + + +class CompletionTests(unittest.TestCase): + """ + Unit tests for linodecli.completion + """ + + ops = {"temp_key": {"temp_action": "description"}} + fish_expected = """# This is a generated file by Linode-CLI! Do not modify! +complete -c linode-cli -n "not __fish_seen_subcommand_from temp_key" -x -a 'temp_key --help' +complete -c linode-cli -n "__fish_seen_subcommand_from temp_key" -x -a 'temp_action --help'""" + bash_expected = """# This is a generated file by Linode-CLI! Do not modify! +_linode_cli() +{ +local cur prev opts +COMPREPLY=() +cur="${COMP_WORDS[COMP_CWORD]}" +prev="${COMP_WORDS[COMP_CWORD-1]}" + +case "${prev}" in + linode-cli) + COMPREPLY=( $(compgen -W "temp_key --help" -- ${cur}) ) + return 0 + ;; + temp_key) + COMPREPLY=( $(compgen -W "temp_action --help" -- ${cur}) ) + return 0 + ;; + *) + ;; +esac +} + +complete -F _linode_cli linode-cli""" + + def test_fish_completion(self): + """ + Test if the fish completion renders correctly + """ + actual = completion.get_fish_completions(self.ops) + self.assertEqual(actual, self.fish_expected) + + def test_bash_completion(self): + """ + Test if the bash completion renders correctly + """ + actual = completion.get_bash_completions(self.ops) + self.assertEqual(actual, self.bash_expected) + + def test_get_completions(self): + """ + Test get_completions for arg parse + """ + actual = completion.get_completions(self.ops, False, "bash") + self.assertEqual(actual, self.bash_expected) + + actual = completion.get_completions(self.ops, False, "fish") + self.assertEqual(actual, self.fish_expected) + + actual = completion.get_completions(self.ops, False, "notrealshell") + self.assertIn("invoke", actual) + + actual = completion.get_completions(self.ops, True, "") + self.assertIn("[SHELL]", actual) + + def test_bake_completions(self): + """ + Test bake_completions write to file + """ + m = mock_open() + with patch("linodecli.completion.open", m, create=True): + new_ops = self.ops + new_ops["_base_url"] = "bloo" + new_ops["_spec_version"] = "berry" + + completion.bake_completions(new_ops) + + self.assertNotIn("_base_url", new_ops) + self.assertNotIn("_spec_version", new_ops) + + m.assert_called_with("linode-cli.sh", "w", encoding="utf-8") + m.return_value.write.assert_called_once_with(self.bash_expected) diff --git a/tests/configuration.py b/tests/configuration.py new file mode 100644 index 00000000..46711556 --- /dev/null +++ b/tests/configuration.py @@ -0,0 +1,282 @@ +#!/usr/local/bin/python3 +""" +Unit tests for linodecli.configuration +""" +import io +import os +import sys +import argparse +import contextlib + +import unittest +from unittest.mock import call, patch, mock_open +import requests_mock + +from linodecli import configuration + +class ConfigurationTests(unittest.TestCase): + """ + Unit tests for linodecli.configuration + """ + + base_url = "https://linode-test.com" + test_token = "cli-dev-token" + mock_config_file = f"""[DEFAULT] +default-user = cli-dev + +[cli-dev] +token = {test_token} +region = us-east +type = g6-nanode-1 +image = linode/alpine3.16 +plugin-testplugin-testkey = plugin-test-value +authorized_users = cli-dev + +[cli-dev2] +token = {test_token}2 +region = us-east +type = g6-nanode-1 +image = linode/alpine3.16 +authorized_users = cli-dev2""" + + def _build_test_config(self, config=mock_config_file, base_url=base_url): + """ + Helper to generate config with mock data + """ + conf = None + with patch('linodecli.configuration.helpers.configparser.open', + mock_open(read_data=config)): + conf = configuration.CLIConfig(base_url) + return conf + + def test_default_username(self): + """ + Test CLIConfig.default_username() with no default user + """ + conf = self._build_test_config() + + self.assertEqual(conf.default_username(), "cli-dev") + + conf.config.remove_option("DEFAULT", "default-user") + self.assertEqual(conf.default_username(), "") + + def test_set_user(self): + """ + Test CLIConfig.set_user({username}) + """ + conf = self._build_test_config() + + f = io.StringIO() + with self.assertRaises(SystemExit) as cm, contextlib.redirect_stdout(f): + conf.set_user("bad_user") + self.assertEqual(cm.exception.code, 1) + self.assertTrue("not configured" in f.getvalue()) + + conf.set_user("cli-dev2") + self.assertEqual(conf.username, "cli-dev2") + + def test_remove_user(self): + """ + Test CLIConfig.remove_user({username}) with default username + """ + conf = self._build_test_config() + + f = io.StringIO() + with self.assertRaises(SystemExit) as cm, contextlib.redirect_stdout(f): + conf.remove_user("cli-dev") + self.assertEqual(cm.exception.code, 1) + self.assertTrue("default user!" in f.getvalue()) + + with patch('linodecli.configuration.open', mock_open()): + conf.remove_user("cli-dev2") + self.assertFalse(conf.config.has_section("cli-dev2")) + + def test_print_users(self): + """ + Test CLIConfig.print_users() + """ + conf = self._build_test_config() + + f = io.StringIO() + with self.assertRaises(SystemExit) as cm, contextlib.redirect_stdout(f): + conf.print_users() + self.assertEqual(cm.exception.code, 0) + self.assertTrue("* cli-dev" in f.getvalue()) + + def test_set_default_user(self): + """ + Test CLIConfig.set_default_user({username}) + """ + conf = self._build_test_config() + + f = io.StringIO() + with self.assertRaises(SystemExit) as cm, contextlib.redirect_stdout(f): + conf.set_default_user("bad_user") + self.assertEqual(cm.exception.code, 1) + self.assertTrue("not configured!" in f.getvalue()) + + with patch('linodecli.configuration.open', mock_open()): + conf.set_default_user("cli-dev2") + self.assertEqual(conf.config.get("DEFAULT", "default-user"), "cli-dev2") + + def test_get_token(self): + """ + Test CLIConfig.get_token() + """ + conf = self._build_test_config() + conf.used_env_token = False + self.assertEqual(conf.get_token(), self.test_token) + + def test_get_value(self): + """ + Test CLIConfig.get_value({key}) + """ + conf = self._build_test_config() + self.assertEqual(conf.get_value("notakey"), None) + self.assertEqual(conf.get_value("region"), "us-east") + + def test_plugin_set_value(self): + """ + Test CLIConfig.plugin_set_value({key}, {value}) + """ + conf = self._build_test_config() + with self.assertRaises(RuntimeError): + conf.plugin_set_value("anykey", "anyvalue") + + conf.running_plugin = "testplugin" + + conf.plugin_set_value("testkey", "newvalue") + actual = conf.config.get("cli-dev", "plugin-testplugin-testkey") + self.assertEqual(actual, "newvalue") + + def test_plugin_get_value(self): + """ + Test CLIConfig.plugin_get_value({key}) + """ + conf = self._build_test_config() + with self.assertRaises(RuntimeError): + conf.plugin_get_value("anykey") + + conf.running_plugin = "testplugin" + + actual = conf.plugin_get_value("badkey") + self.assertEqual(actual, None) + + actual = conf.plugin_get_value("testkey") + self.assertEqual(actual, "plugin-test-value") + + def test_update(self): + """ + Test CLIConfig.update({namespace}, {allowed_defaults}) + """ + conf = self._build_test_config() + + parser = argparse.ArgumentParser() + parser.add_argument('--newkey') + parser.add_argument('--testkey') + parser.add_argument('--authorized_users') + parser.add_argument('--plugin-testplugin-testkey') + ns = parser.parse_args(['--testkey', 'testvalue']) + + update_dict = { + 'newkey': 'newvalue', + 'authorized_users': ['user1'], + 'plugin-testplugin-testkey': 'plugin-value', + } + + f = io.StringIO() + with contextlib.redirect_stdout(f): + result = vars(conf.update(ns, update_dict)) + + self.assertTrue("--no-defaults" in f.getvalue()) + self.assertEqual(result.get("newkey"), "newvalue") + self.assertEqual(result.get("testkey"), "testvalue") + self.assertTrue(isinstance(result.get("authorized_users"), list)) + self.assertFalse(result.get("plugin-testplugin-testkey")) + + f = io.StringIO() + sys.argv.append("--suppress-warnings") + with contextlib.redirect_stdout(f): + result = vars(conf.update(ns, None)) + sys.argv.remove("--suppress-warnings") + + self.assertFalse("--no-defaults" in f.getvalue()) + + def test_write_config(self): + """ + Test CLIConfig.write_config() + """ + conf = self._build_test_config() + + conf.config.set("cli-dev", "type", "newvalue") + m = mock_open() + with patch('builtins.open', m): + conf.write_config() + self.assertIn(call("type = newvalue\n"), m().write.call_args_list) + + def test_configure_no_default_terminal(self): + """ + Test CLIConfig.configure() with + no default user, no environment variables, and no browser + """ + conf = configuration.CLIConfig(self.base_url, skip_config=True) + + def mock_input(prompt): + answers = (a for a in ["1", "1", "1", "1"]) + if 'token' in prompt.lower(): + return "test-token" + return next(answers) + + with (patch('linodecli.configuration.open', mock_open()), + patch('builtins.input', mock_input), + contextlib.redirect_stdout(io.StringIO()), + patch('linodecli.configuration._check_browsers', lambda: False), + patch.dict(os.environ, {}), requests_mock.Mocker() as m): + m.get(f'{self.base_url}/profile', json= {"username": "cli-dev"}) + m.get(f'{self.base_url}/profile/grants', status_code=204) + m.get(f'{self.base_url}/regions', json= {"data":[{"id": "test-region"}]}) + m.get(f'{self.base_url}/linode/types', json= {"data":[{"id": "test-type"}]}) + m.get(f'{self.base_url}/images', json= {"data":[{"id": "test-image"}]}) + m.get(f'{self.base_url}/account/users', + json= {"data":[{"username": "cli-dev", "ssh_keys": "testkey"}]}) + conf.configure() + + self.assertEqual(conf.get_value('type'), 'test-type') + self.assertEqual(conf.get_value('token'), 'test-token') + self.assertEqual(conf.get_value('image'), 'test-image') + self.assertEqual(conf.get_value('region'), 'test-region') + self.assertEqual(conf.get_value('authorized_users'), 'cli-dev') + + def test_configure_default_terminal(self): + """ + Test CLIConfig.configure() with + a default user, token in environment, and no browser + """ + conf = configuration.CLIConfig(self.base_url, skip_config=True) + + def mock_input(prompt): + if not prompt: + return None + answers = (a for a in ["1", "1", "1", "1"]) + return next(answers) + + with (patch('linodecli.configuration.open', mock_open()), + patch('builtins.input', mock_input), + contextlib.redirect_stdout(io.StringIO()), + patch('linodecli.configuration._check_browsers', lambda: False), + patch.dict(os.environ, {"LINODE_CLI_TOKEN": "test-token"}), + requests_mock.Mocker() as m): + m.get(f'{self.base_url}/profile', json= {"username": "cli-dev"}) + m.get(f'{self.base_url}/profile/grants', status_code=204) + m.get(f'{self.base_url}/regions', json= {"data":[{"id": "test-region"}]}) + m.get(f'{self.base_url}/linode/types', json= {"data":[{"id": "test-type"}]}) + m.get(f'{self.base_url}/images', json= {"data":[{"id": "test-image"}]}) + m.get(f'{self.base_url}/account/users', + json= {"data":[{"username": "cli-dev", "ssh_keys": "testkey"}]}) + conf.configure() + + self.assertEqual(conf.get_value('type'), 'test-type') + self.assertEqual(conf.get_value('image'), 'test-image') + self.assertEqual(conf.get_value('region'), 'test-region') + self.assertEqual(conf.get_value('authorized_users'), 'cli-dev') + self.assertEqual(conf.config.get('DEFAULT', 'default-user'), 'DEFAULT') diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..e52687fa --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,157 @@ +import configparser +import sys + +import pytest + +from linodecli import CLIArg, CLIOperation, ModelAttr, ResponseModel, URLParam +from linodecli.cli import CLI + +MOCK_CONFIG = """ +[DEFAULT] +default-user = testuser + +[testuser] +region = us-southeast +image = linode/ubuntu21.10 +token = notafaketoken +type = g6-nanode-1 +""" + + +@pytest.fixture +def mock_cli( + version="DEVELOPMENT", + url="http://localhost", + defaults=True, +): + result = CLI(version, url, skip_config=True) + result.defaults = defaults + result.suppress_warnings = True + + # Let's override the config with a custom one + conf = configparser.ConfigParser() + conf.read_string(MOCK_CONFIG) + + result.config.config = conf + result.config._configured = True + + # very evil pattern :) + # We need this to suppress warnings for operations that don't + # have access to the cli.suppress_warnings attribute. + # e.g. operation defaults + sys.argv.append("--suppress-warnings") + + return result + + +def make_test_operation( + command, + action, + method, + url, + summary, + args, + response_model, + use_params, + use_servers=None, + docs_url="https://localhost/docs", + allowed_defaults=None, + action_aliases=None, +): + if args is None: + args = [ + CLIArg( + "generic_arg", + "string", + "Does something maybe.", + "generic_arg", + None, + ) + ] + + if use_params is None: + use_params = [URLParam("test_param", "integer")] + + if use_servers is None: + use_servers = ["http://localhost"] + + if allowed_defaults is None: + allowed_defaults = [] + + if action_aliases is None: + action_aliases = [] + + return CLIOperation( + command, + action, + method, + url, + summary, + args, + response_model, + use_params, + use_servers, + docs_url=docs_url, + allowed_defaults=allowed_defaults, + action_aliases=action_aliases, + ) + + +@pytest.fixture +def list_operation(): + """ + Creates the following CLI operation: + + linode-cli foo bar --filterable_result [value] + + GET http://localhost/foo/bar + {} + + X-Filter: {"filterable_result": "value"} + """ + + return make_test_operation( + "foo", + "bar", + "get", + "foo/bar", + "get info", + [], + ResponseModel([ModelAttr("filterable_result", True, True, "string")]), + [], + ) + + +@pytest.fixture +def create_operation(): + """ + Creates the following CLI operation: + + linode-cli foo bar --generic_arg [generic_arg] test_param + + POST http://localhost/foo/bar + { + "generic_arg": "[generic_arg]", + "test_param": test_param + } + """ + + return make_test_operation( + "foo", + "bar", + "post", + "foo/bar", + "create something", + [ + CLIArg( + "generic_arg", + "string", + "Does something maybe.", + "generic_arg", + None, + ), + CLIArg("region", "string", "a region", "region", None), + ], + ResponseModel([ModelAttr("result", False, True, "string")]), + [URLParam("test_param", "integer")], + ) diff --git a/tests/test_api_request.py b/tests/test_api_request.py new file mode 100644 index 00000000..d7c6c57d --- /dev/null +++ b/tests/test_api_request.py @@ -0,0 +1,253 @@ +#!/usr/local/bin/python3 +""" +Unit tests for linodecli.api_request +""" +import contextlib +import io +import json +from types import SimpleNamespace +from unittest.mock import Mock, patch + +import requests + +from linodecli import api_request + + +class TestAPIRequest: + """ + Unit tests for linodecli.api_request + """ + + def test_response_debug_info(self): + stderr_buf = io.StringIO() + + mock_response = SimpleNamespace( + raw=SimpleNamespace(version=11.1), + status_code=200, + reason="OK", + headers={"cool": "test"}, + ) + + with contextlib.redirect_stderr(stderr_buf): + api_request._print_response_debug_info(mock_response) + + output = stderr_buf.getvalue() + assert "< HTTP/1.1 200 OK" in output + assert "< cool: test" in output + + def test_request_debug_info(self): + stderr_buf = io.StringIO() + + with contextlib.redirect_stderr(stderr_buf): + api_request._print_request_debug_info( + SimpleNamespace(__name__="get"), + "https://definitely.linode.com/", + {"cool": "test"}, + "cool body", + ) + + output = stderr_buf.getvalue() + assert "> GET https://definitely.linode.com/" in output + assert "> cool: test" in output + assert "> Body:" in output + assert "> cool body" in output + assert "> " in output + + def test_build_request_body(self, mock_cli, create_operation): + create_operation.allowed_defaults = ["region"] + + result = api_request._build_request_body( + mock_cli, + create_operation, + SimpleNamespace(generic_arg="foo", region=None), + ) + + assert ( + json.dumps({"generic_arg": "foo", "region": "us-southeast"}) + == result + ) + + def test_build_request_url_get(self, mock_cli, list_operation): + result = api_request._build_request_url( + mock_cli, list_operation, SimpleNamespace() + ) + + assert "http://localhost/foo/bar?page=1&page_size=100" == result + + def test_build_request_url_post(self, mock_cli, create_operation): + result = api_request._build_request_url( + mock_cli, create_operation, SimpleNamespace() + ) + + assert "http://localhost/foo/bar" == result + + def test_build_filter_header(self, list_operation): + result = api_request._build_filter_header( + list_operation, SimpleNamespace(filterable_result="bar") + ) + + assert json.dumps({"filterable_result": "bar"}) == result + + def test_do_request_get(self, mock_cli, list_operation): + mock_response = Mock(status_code=200, reason="OK") + + def validate_http_request(url, headers=None, data=None): + assert url == "http://localhost/foo/bar?page=1&page_size=100" + assert headers["X-Filter"] == json.dumps( + {"filterable_result": "cool"} + ) + assert "Authorization" in headers + assert data is None + + return mock_response + + with patch("linodecli.api_request.requests.get", validate_http_request): + result = api_request.do_request( + mock_cli, list_operation, ["--filterable_result", "cool"] + ) + + assert result == mock_response + + def test_do_request_post(self, mock_cli, create_operation): + mock_response = Mock(status_code=200, reason="OK") + + def validate_http_request(url, headers=None, data=None): + assert url == "http://localhost/foo/bar" + assert data == json.dumps( + { + "test_param": 12345, + "generic_arg": "foobar", + "region": "us-southeast", # default + } + ) + assert "Authorization" in headers + + return mock_response + + create_operation.allowed_defaults = ["region"] + + with patch( + "linodecli.api_request.requests.post", validate_http_request + ): + result = api_request.do_request( + mock_cli, create_operation, ["--generic_arg", "foobar", "12345"] + ) + + assert result == mock_response + + def test_outdated_cli(self, mock_cli): + # "outdated" version + mock_cli.suppress_warnings = False + mock_cli.version = "1.0.0" + mock_cli.spec_version = "1.0.0" + + # Return a mock response from PyPI + def mock_http_response(url, headers=None, data=None, timeout=1): + assert "pypi.org" in url + + r = requests.Response() + r.status_code = 200 + + def json_func(): + return { + "info": { + # Add a fake new version + "version": "1.1.0" + } + } + + r.json = json_func + return r + + stderr_buf = io.StringIO() + + # Provide a mock Linode API response + mock_response = SimpleNamespace( + status_code=200, reason="OK", headers={"X-Spec-Version": "1.1.0"} + ) + + with contextlib.redirect_stderr(stderr_buf), patch( + "linodecli.api_request.requests.get", mock_http_response + ): + api_request._attempt_warn_old_version(mock_cli, mock_response) + + output = stderr_buf.getvalue() + assert ( + "The API responded with version 1.1.0, which is newer than " + "the CLI's version of 1.0.0. Please update the CLI to get " + "access to the newest features. You can update with a " + "simple `pip3 install --upgrade linode-cli`" in output + ) + + def test_outdated_cli_no_new_version(self, mock_cli): + # "outdated" version + mock_cli.suppress_warnings = False + mock_cli.version = "1.0.0" + mock_cli.spec_version = "1.0.0" + + # Return a mock response from PyPI + def mock_http_response(url, headers=None, data=None, timeout=1): + assert "pypi.org" in url + + r = requests.Response() + r.status_code = 200 + + def json_func(): + return { + "info": { + # No new CLI release :( + "version": "1.0.0" + } + } + + r.json = json_func + return r + + stderr_buf = io.StringIO() + + # Provide a mock Linode API response + mock_response = SimpleNamespace( + status_code=200, reason="OK", headers={"X-Spec-Version": "1.1.0"} + ) + + with contextlib.redirect_stderr(stderr_buf), patch( + "linodecli.api_request.requests.get", mock_http_response + ): + api_request._attempt_warn_old_version(mock_cli, mock_response) + + output = stderr_buf.getvalue() + assert "" == output + + def test_up_to_date_cli(self, mock_cli): + # "up to date" version + mock_cli.suppress_warnings = False + mock_cli.version = "1.0.0" + mock_cli.spec_version = "1.0.0" + + # Return a mock response from PyPI + def mock_http_response(url, headers=None, data=None, timeout=1): + assert "pypi.org" in url + + r = requests.Response() + r.status_code = 200 + + def json_func(): + return {"info": {"version": "1.0.0"}} + + r.json = json_func + return r + + stderr_buf = io.StringIO() + + # Provide a mock Linode API response + mock_response = SimpleNamespace( + status_code=200, reason="OK", headers={"X-Spec-Version": "1.0.0"} + ) + + with contextlib.redirect_stderr(stderr_buf), patch( + "linodecli.api_request.requests.get", mock_http_response + ): + api_request._attempt_warn_old_version(mock_cli, mock_response) + + output = stderr_buf.getvalue() + assert "" == output diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 00000000..27194a89 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,37 @@ +import copy + +import pytest + + +class TestCLI: + """ + Unit tests for linodecli.cli + """ + + def test_find_operation(self, mock_cli, list_operation): + target_operation = list_operation + target_operation.command = "foo" + target_operation.action = "list" + target_operation.action_aliases = ["ls"] + + other_operation = copy.deepcopy(list_operation) + other_operation.command = "cool" + other_operation.action = "list" + other_operation.action_aliases = ["ls"] + + mock_cli.ops = { + "foo": {"list": target_operation}, + "cool": {"list": other_operation}, + } + + assert mock_cli.find_operation("foo", "list") == target_operation + assert mock_cli.find_operation("foo", "ls") == target_operation + assert mock_cli.find_operation("cool", "list") == other_operation + assert mock_cli.find_operation("cool", "ls") == other_operation + + with pytest.raises(ValueError, match=r"Command not found: *"): + mock_cli.find_operation("bad", "list") + + with pytest.raises(ValueError, match=r"No action *"): + mock_cli.find_operation("foo", "cool") + mock_cli.find_operation("cool", "cool") diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 00000000..05de1488 --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,14 @@ +from linodecli.helpers import filter_markdown_links + + +class TestHelpers: + """ + Unit tests for linodecli.helpers + """ + + def test_markdown_links(self): + original_text = "Here's [a relative link](/docs/cool) and [an absolute link](https://cloud.linode.com)." + expected_text = "Here's a relative link (https://linode.com/docs/cool) " \ + "and an absolute link (https://cloud.linode.com)." + + assert filter_markdown_links(original_text) == expected_text diff --git a/tests/test_output.py b/tests/test_output.py new file mode 100644 index 00000000..7b9a99f4 --- /dev/null +++ b/tests/test_output.py @@ -0,0 +1,311 @@ +import contextlib +import io + +from terminaltables import SingleTable + +from linodecli import ModelAttr, OutputMode, ResponseModel + + +class TestOutputHandler: + """ + Unit tests for linodecli.output + """ + + def test_markdown_output_columns(self, mock_cli): + output = io.StringIO() + + output_handler = mock_cli.output_handler + + output_handler._markdown_output( + ["very cool header", "wow"], + [["foo", "bar"], ["oof", "rab"]], + ["1", "2"], + output, + ) + + assert ( + output.getvalue() == "| very cool header | wow |\n" + "|---|---|\n" + "| foo | bar |\n" + "| oof | rab |\n" + ) + + def test_markdown_output_models(self, mock_cli): + output = io.StringIO() + + output_handler = mock_cli.output_handler + + output_handler._markdown_output( + ["very cool header"], + [{"cool": "foo"}, {"cool": "bar"}], + [ModelAttr("cool", True, True, "string")], + output, + ) + + assert ( + output.getvalue() == "| very cool header |\n" + "|---|\n" + "| foo |\n" + "| bar |\n" + ) + + def test_json_output_delimited(self, mock_cli): + output = io.StringIO() + headers = ["foo", "bar"] + data = [{"foo": "cool", "bar": "not cool"}] + + mock_cli.output_handler._json_output(headers, data, output) + + assert '[{"foo": "cool", "bar": "not cool"}]' in output.getvalue() + + def test_json_output_list(self, mock_cli): + output = io.StringIO() + headers = ["foo", "bar"] + data = [["cool", "not cool"]] + + mock_cli.output_handler._json_output(headers, data, output) + + assert '[{"foo": "cool", "bar": "not cool"}]' in output.getvalue() + + def test_select_json_elements(self, mock_cli): + desired_keys = ["foo", "bar", "test"] + + result = mock_cli.output_handler._select_json_elements( + desired_keys, + { + "foo": 12345, + "bad": 5, + "bar": 5, + "good": {"lol": "cool", "test": "reallycoolvalue"}, + "test": 54321, + }, + ) + + assert result == { + "foo": 12345, + "bar": 5, + "good": {"test": "reallycoolvalue"}, + "test": 54321, + } + + def test_delimited_output_columns(self, mock_cli): + output = io.StringIO() + header = ["h1", "h2"] + data = [["foo", "bar"], ["oof", "rab"]] + columns = ["1", "2"] + + mock_cli.output_handler.delimiter = "," + + mock_cli.output_handler._delimited_output(header, data, columns, output) + + assert output.getvalue() == "h1,h2\nfoo,bar\noof,rab\n" + + def test_delimited_output_models(self, mock_cli): + output = io.StringIO() + header = ["h1"] + data = [ + { + "cool": "foo", + }, + {"cool": "bar"}, + ] + columns = [ModelAttr("cool", True, True, "string")] + + mock_cli.output_handler.delimiter = "," + + mock_cli.output_handler._delimited_output(header, data, columns, output) + + assert output.getvalue() == "h1\nfoo\nbar\n" + + def test_table_output_columns(self, mock_cli): + output = io.StringIO() + header = ["h1", "h2"] + data = [["foo", "bar"], ["oof", "rab"]] + columns = ["1", "2"] + + mock_cli.output_handler._table_output( + header, data, columns, "cool table", output + ) + + mock_table = io.StringIO() + tab = SingleTable([["h1", "h2"], ["foo", "bar"], ["oof", "rab"]]) + tab.title = "cool table" + print(tab.table, file=mock_table) + + assert output.getvalue() == mock_table.getvalue() + + def test_table_output_models(self, mock_cli): + output = io.StringIO() + header = ["h1"] + data = [ + { + "cool": "foo", + }, + {"cool": "bar"}, + ] + columns = [ModelAttr("cool", True, True, "string")] + + mock_cli.output_handler._table_output( + header, data, columns, "cool table", output + ) + + mock_table = io.StringIO() + tab = SingleTable([["h1"], ["foo"], ["bar"]]) + tab.title = "cool table" + print(tab.table, file=mock_table) + + assert output.getvalue() == mock_table.getvalue() + + def test_get_columns_from_model(self, mock_cli): + output_handler = mock_cli.output_handler + + response_model = ResponseModel( + [ + ModelAttr("foo", True, True, "string"), + ModelAttr("bar", True, False, "string"), + ] + ) + + result = output_handler._get_columns(response_model) + + assert len(result) == 1 + assert result[0].name == "foo" + + def test_get_columns_from_model_all(self, mock_cli): + output_handler = mock_cli.output_handler + response_model = ResponseModel( + [ + ModelAttr("foo", True, True, "string"), + ModelAttr("bar", True, False, "string"), + ] + ) + + output_handler.columns = "*" + + result = output_handler._get_columns(response_model) + + assert len(result) == 2 + assert result[0].name == "foo" + assert result[1].name == "bar" + + def test_get_columns_from_model_select(self, mock_cli): + output_handler = mock_cli.output_handler + + response_model = ResponseModel( + [ + ModelAttr("foo", True, True, "string"), + ModelAttr("bar", True, False, "string"), + ModelAttr("test", True, False, "string"), + ] + ) + + output_handler.columns = "foo,bar" + + result = output_handler._get_columns(response_model) + + assert len(result) == 2 + assert result[0].name == "foo" + assert result[1].name == "bar" + + # Let's test a single print case + def test_print(self, mock_cli): + output = io.StringIO() + + response_model = ResponseModel( + [ + ModelAttr("foo", True, True, "string"), + ModelAttr("bar", True, True, "string"), + ModelAttr("test", True, False, "string"), + ] + ) + + mock_cli.output_handler.mode = OutputMode.json + + mock_cli.output_handler.print( + response_model, + [{"foo": "blah", "bar": "blah2", "test": "blah3"}], + title="cool table", + to=output, + ) + + assert '[{"foo": "blah", "bar": "blah2"}]' in output.getvalue() + + def test_truncation(self, mock_cli): + stderr_buf = io.StringIO() + test_str = "x" * 80 + test_str_truncated = f"{'x' * 64}..." + + with contextlib.redirect_stderr(stderr_buf): + result = mock_cli.output_handler._attempt_truncate_value(test_str) + + assert "truncation" in stderr_buf.getvalue() + assert result == test_str_truncated + + # --suppress-warnings + # Faster than flushing apparently + stderr_buf = io.StringIO() + mock_cli.output_handler.suppress_warnings = True + + with contextlib.redirect_stderr(stderr_buf): + result = mock_cli.output_handler._attempt_truncate_value(test_str) + + assert "truncation" not in stderr_buf + assert result == test_str_truncated + + # --no-truncation + mock_cli.output_handler.disable_truncation = True + + result = mock_cli.output_handler._attempt_truncate_value(test_str) + + assert result == test_str + + def test_truncated_table(self, mock_cli): + output = io.StringIO() + + test_str = "x" * 80 + test_str_truncated = f"{'x' * 64}..." + + header = ["h1"] + data = [ + { + "cool": test_str, + }, + ] + columns = [ModelAttr("cool", True, True, "string")] + + mock_cli.output_handler._table_output( + header, data, columns, "cool table", output + ) + + data[0]["cool"] = test_str_truncated + + mock_table = io.StringIO() + tab = SingleTable([["h1"], [test_str_truncated]]) + tab.title = "cool table" + print(tab.table, file=mock_table) + + assert output.getvalue() == mock_table.getvalue() + + def test_truncated_markdown(self, mock_cli): + test_str = "x" * 80 + test_str_truncated = f"{'x' * 64}..." + + output = io.StringIO() + + header = ["very cool header"] + data = [ + { + "cool": test_str, + }, + ] + columns = [ModelAttr("cool", True, True, "string")] + + output_handler = mock_cli.output_handler + + output_handler._markdown_output(header, data, columns, output) + + assert ( + output.getvalue() == "| very cool header |\n" + "|---|\n" + f"| {test_str_truncated} |\n" + ) diff --git a/tests/test_plugin_obj.py b/tests/test_plugin_obj.py new file mode 100644 index 00000000..611a5445 --- /dev/null +++ b/tests/test_plugin_obj.py @@ -0,0 +1,14 @@ +from pytest import CaptureFixture + +from linodecli.plugins.obj import get_obj_args_parser, print_help + + +def test_print_help(capsys: CaptureFixture): + parser = get_obj_args_parser() + print_help(parser) + captured_text = capsys.readouterr() + assert parser.format_help() in captured_text.out + assert ( + "See --help for individual commands for more information" + in captured_text.out + ) diff --git a/tests/test_response.py b/tests/test_response.py new file mode 100644 index 00000000..ecd4df6e --- /dev/null +++ b/tests/test_response.py @@ -0,0 +1,56 @@ +from linodecli import ModelAttr, ResponseModel +from linodecli.response import colorize_string + + +class TestOutputHandler: + """ + Unit tests for linodecli.response + """ + + def test_model_fix_json_rows(self): + model = ResponseModel([], rows=["foo.bar", "bar"]) + + result = model.fix_json({"foo": {"bar": 123}, "bar": "cool"}) + + assert result == [123, "cool"] + + def test_model_fix_json_nested(self): + model = ResponseModel([], nested_list="foo.cool") + + result = model.fix_json([{"foo": {"cool": [123, 321]}}]) + + assert result == [ + {"_split": "cool", "foo": 123}, + {"_split": "cool", "foo": 321}, + ] + + def test_colorize_string(self): + result = colorize_string("cool", "yellow") + + assert result == "\x1b[33mcool\x1b[0m" + + def test_attr_get_value(self): + model = {"foo": {"bar": "cool"}} + attr = ModelAttr("foo.bar", True, True, "string") + + result = attr._get_value(model) + + assert result == "cool" + + def test_attr_get_string(self): + model = {"foo": {"bar": ["cool1", "cool2"]}} + attr = ModelAttr("foo.bar", True, True, "string") + + result = attr.get_string(model) + + assert result == "cool1 cool2" + + def test_attr_render_value(self): + model = {"foo": {"bar": ["cool1", "cool2"]}} + attr = ModelAttr( + "foo.bar", True, True, "string", color_map={"default_": "yellow"} + ) + + result = attr.render_value(model) + + assert result == "\x1b[33mcool1, cool2\x1b[0m"