From 9a73bdc85c7ed9b56e7279184811c688493f3770 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 4 Sep 2024 13:26:12 +0200 Subject: [PATCH] add json output to target-query --list --- dissect/target/plugins/general/loaders.py | 19 ++++++-- dissect/target/plugins/general/plugins.py | 56 +++++++++++++++++++---- dissect/target/tools/query.py | 21 ++++++--- tests/plugins/general/test_plugins.py | 8 ++-- tests/tools/test_query.py | 54 ++++++++++++++++++++++ 5 files changed, 133 insertions(+), 25 deletions(-) diff --git a/dissect/target/plugins/general/loaders.py b/dissect/target/plugins/general/loaders.py index 8303c21d5..bbba591d8 100644 --- a/dissect/target/plugins/general/loaders.py +++ b/dissect/target/plugins/general/loaders.py @@ -1,6 +1,8 @@ +import json + from dissect.target.helpers.docs import INDENT_STEP, get_docstring from dissect.target.loader import LOADERS_BY_SCHEME -from dissect.target.plugin import Plugin, export +from dissect.target.plugin import Plugin, arg, export class LoaderListPlugin(Plugin): @@ -10,7 +12,8 @@ def check_compatible(self) -> None: pass @export(output="none") - def loaders(self): + @arg("--json", dest="output_json", action="store_true") + def loaders(self, output_json: bool = False) -> None: """List the available loaders.""" loaders_info = {} @@ -21,6 +24,12 @@ def loaders(self): except ImportError: continue - print("Available loaders:") - for loader_name, loader_description in sorted(loaders_info.items()): - print(f"{INDENT_STEP}{loader_name} - {loader_description}") + loaders = sorted(loaders_info.items()) + + if output_json: + print(json.dumps([{"name": name, "description": desc} for name, desc in loaders]), end="") + + else: + print("Available loaders:") + for loader_name, loader_description in loaders: + print(f"{INDENT_STEP}{loader_name} - {loader_description}") diff --git a/dissect/target/plugins/general/plugins.py b/dissect/target/plugins/general/plugins.py index a3a0e65a1..ac8681a68 100644 --- a/dissect/target/plugins/general/plugins.py +++ b/dissect/target/plugins/general/plugins.py @@ -1,5 +1,8 @@ +from __future__ import annotations + +import json import textwrap -from typing import Dict, List, Type, Union +from typing import Iterator, Type from dissect.target import plugin from dissect.target.helpers.docs import INDENT_STEP, get_plugin_overview @@ -23,7 +26,8 @@ def categorize_plugins(plugins_selection: list[dict] = None) -> dict: return output_dict -def get_exported_plugins(): +def get_exported_plugins() -> list: + """Returns list of exported plugins.""" return [p for p in plugin.plugins() if len(p["exports"])] @@ -50,10 +54,10 @@ def update_dict_recursive(source_dict: dict, updated_dict: dict) -> dict: def output_plugin_description_recursive( - structure_dict: Union[Dict, Plugin], + structure_dict: dict | Plugin, print_docs: bool, - indentation_step=0, -) -> List[str]: + indentation_step: int = 0, +) -> list[str]: """Create plugin overview with identations.""" if isinstance(structure_dict, type) and issubclass(structure_dict, Plugin): @@ -78,10 +82,10 @@ def get_plugin_description( def get_description_dict( - structure_dict: Dict, + structure_dict: dict, print_docs: bool, indentation_step: int, -) -> List[str]: +) -> list[str]: """Returns a list of indented descriptions.""" output_descriptions = [] @@ -103,8 +107,12 @@ def check_compatible(self) -> None: @export(output="none", cache=False) @arg("--docs", dest="print_docs", action="store_true") - def plugins(self, plugins: list[dict] = None, print_docs: bool = False) -> None: - categorized_plugins = dict(sorted(categorize_plugins(plugins).items())) + @arg("--json", dest="output_json", action="store_true") + def plugins(self, plugins: list[Plugin] = None, print_docs: bool = False, output_json: bool = False) -> None: + """Print all available plugins.""" + + dict_plugins = list({p.path: p.plugin_desc for p in plugins}.values()) + categorized_plugins = dict(sorted(categorize_plugins(dict_plugins).items())) plugin_descriptions = output_plugin_description_recursive(categorized_plugins, print_docs) plugins_list = textwrap.indent( @@ -138,4 +146,32 @@ def plugins(self, plugins: list[dict] = None, print_docs: bool = False) -> None: "Failed to load:", failed_list, ] - print("\n".join(output_lines)) + + if output_json: + out = {"loaded": list(generate_plugins_json(plugins))} + + if failed_plugins := plugin.failed(): + out["failed"] = [ + {"module": p["module"], "stacktrace": "".join(p["stacktrace"])} for p in failed_plugins + ] + + print(json.dumps(out), end="") + + else: + print("\n".join(output_lines)) + + +def generate_plugins_json(plugins: list[Plugin]) -> Iterator[dict]: + """Generates JSON output of a list of :class:`Plugin`s.""" + + for p in plugins: + func = getattr(p.class_object, p.method_name) + description = getattr(func, "__doc__", None) + summary = description.split("\n\n", 1)[0].strip() if description else None + + yield { + "name": p.name, + "output": p.output_type, + "description": summary, + "path": p.path, + } diff --git a/dissect/target/tools/query.py b/dissect/target/tools/query.py index f861b9a26..50a2456af 100644 --- a/dissect/target/tools/query.py +++ b/dissect/target/tools/query.py @@ -170,7 +170,7 @@ def main(): # Show the list of available plugins for the given optional target and optional # search pattern, only display plugins that can be applied to ANY targets if args.list: - collected_plugins = {} + collected_plugins = [] if targets: for plugin_target in Target.open_all(targets, args.children): @@ -178,25 +178,34 @@ def main(): parser.error("can't list compatible plugins for remote targets.") funcs, _ = find_plugin_functions(plugin_target, args.list, compatibility=True, show_hidden=True) for func in funcs: - collected_plugins[func.path] = func.plugin_desc + collected_plugins.append(func) else: funcs, _ = find_plugin_functions(Target(), args.list, compatibility=False, show_hidden=True) for func in funcs: - collected_plugins[func.path] = func.plugin_desc + collected_plugins.append(func) - # Display in a user friendly manner target = Target() fparser = generate_argparse_for_bound_method(target.plugins, usage_tmpl=USAGE_FORMAT_TMPL) fargs, rest = fparser.parse_known_args(rest) + # Display in a user friendly manner if collected_plugins: - target.plugins(list(collected_plugins.values())) + if args.json: + print('{"plugins": ', end="") + target.plugins(collected_plugins, output_json=args.json) # No real targets specified, show the available loaders if not targets: fparser = generate_argparse_for_bound_method(target.loaders, usage_tmpl=USAGE_FORMAT_TMPL) fargs, rest = fparser.parse_known_args(rest) - target.loaders(**vars(fargs)) + del fargs.output_json + if args.json: + print(', "loaders": ', end="") + target.loaders(**vars(fargs), output_json=args.json) + + if args.json: + print("}") + parser.exit() if not targets: diff --git a/tests/plugins/general/test_plugins.py b/tests/plugins/general/test_plugins.py index 0e2acc543..c2745cec1 100644 --- a/tests/plugins/general/test_plugins.py +++ b/tests/plugins/general/test_plugins.py @@ -29,7 +29,7 @@ def test_update_dict(): def test_plugin_description(): description = [x for x in output_plugin_description_recursive(PluginListPlugin, False)] - assert description == ["plugins - No documentation (output: no output)"] + assert description == ["plugins - Print all available plugins. (output: no output)"] def test_plugin_description_compacting(): @@ -39,7 +39,7 @@ def test_plugin_description_compacting(): assert description == [ "hello:", " world:", - " plugins - No documentation (output: no output)", + " plugins - Print all available plugins. (output: no output)", ] @@ -54,9 +54,9 @@ def test_plugin_description_in_dict_multiple(): "hello:", " world:", " data:", - " plugins - No documentation (output: no output)", + " plugins - Print all available plugins. (output: no output)", " data2:", - " plugins - No documentation (output: no output)", + " plugins - Print all available plugins. (output: no output)", ] diff --git a/tests/tools/test_query.py b/tests/tools/test_query.py index c37610a54..a932980c6 100644 --- a/tests/tools/test_query.py +++ b/tests/tools/test_query.py @@ -1,3 +1,4 @@ +import json import os import re from typing import Any, Optional @@ -244,3 +245,56 @@ def test_target_query_dry_run(capsys: pytest.CaptureFixture, monkeypatch: pytest " execute: network.interfaces (general.network.interfaces)\n" " execute: osinfo (general.osinfo.osinfo)\n" ) + + +def test_target_query_list_json(capsys: pytest.CaptureFixture, monkeypatch: pytest.MonkeyPatch) -> None: + """test if target-query --list --json output is formatted as we expect it to be.""" + + with monkeypatch.context() as m: + m.setattr("sys.argv", ["target-query", "-l", "-j"]) + with pytest.raises((SystemExit, IndexError, ImportError)): + target_query() + out, _ = capsys.readouterr() + + try: + output = json.loads(out) + except json.JSONDecodeError: + pass + + # test the generic structure of the returned dictionary. + assert isinstance(output, dict), "Could not load JSON output of 'target-query --list --json'" + assert output["plugins"], "Expected a dictionary of plugins" + assert output["loaders"], "Expected a dictionary of loaders" + assert len(output["plugins"]["loaded"]) > 200, "Expected more loaded plugins" + assert not output["plugins"].get("failed"), "Some plugin(s) failed to initialize" + + def get_plugin(plugins: list[dict], needle: str) -> dict: + match = [p for p in output["plugins"]["loaded"] if p["name"] == needle] + return match[0] if match else False + + # general plugin + users_plugin = get_plugin(output, "users") + assert users_plugin == { + "name": "users", + "description": "Return the users available in the target.", + "output": "record", + "path": "general.default.users", + } + + # namespaced plugin + plocate_plugin = get_plugin(output, "plocate.locate") + assert plocate_plugin == { + "name": "plocate.locate", + "description": "Yield file and directory names from the plocate.db.", + "output": "record", + "path": "os.unix.locate.plocate.locate", + } + + # regular plugin + sam_plugin = get_plugin(output, "sam") + assert sam_plugin == { + "name": "sam", + "description": "Dump SAM entries", + "output": "record", + "path": "os.windows.sam.sam", + }