diff --git a/pydependance/__init__.py b/pydependance/__init__.py new file mode 100644 index 0000000..db51fb6 --- /dev/null +++ b/pydependance/__init__.py @@ -0,0 +1,5 @@ + +from pydependance._core import Import +from pydependance._core import Module +from pydependance._core import ModuleNamespace +from pydependance._core import is_builtin diff --git a/pydependance/__main__.py b/pydependance/__main__.py new file mode 100644 index 0000000..31d542c --- /dev/null +++ b/pydependance/__main__.py @@ -0,0 +1,159 @@ +from pydependance import is_builtin, ModuleNamespace + + +# ========================================================================= # +# Ansi Colors                                                               # +# ========================================================================= # + + +RST = '\033[0m' + +# light colors +GRY = '\033[90m' +lRED = '\033[91m' +lGRN = '\033[92m' +lYLW = '\033[93m' +lBLU = '\033[94m' +lMGT = '\033[95m' +lCYN = '\033[96m' +WHT = '\033[97m' + +# dark colors +BLK = '\033[30m' +RED = '\033[31m' +GRN = '\033[32m' +YLW = '\033[33m' +BLU = '\033[34m' +MGT = '\033[35m' +CYN = '\033[36m' +lGRY = '\033[37m' + + +# ========================================================================= # +# Helper print                                                              # +# ========================================================================= # + + +def _print_module(against: ModuleNamespace, path: str, depth: int): + if path in against: + clr = MGT + else: + clr = RED + print(f'{" " * depth}{GRY}* {clr}{path}{RST}') + + +def _print_import(against: ModuleNamespace, path: str, depth: int): + if is_builtin(path): + clr = lGRY + elif path in against: + clr = lYLW + else: + clr = lRED + print(f'{" " * depth}{GRY}- {clr}{path}{RST}') + + +# ========================================================================= #
# Entrypoint                                                                # +# ========================================================================= # + + +def _create_parser(): + import argparse + + # root parser + parser = argparse.ArgumentParser() + parser.add_argument("--package", '-p', 
dest='packages', action='append', type=str, help="Root python paths used to search for python packages.") + parser.add_argument("--path", "-P", dest='paths', action='append', type=str, help="Roots of python packages themselves.") + parser.add_argument("--restrict", "-r", action='append', type=str, help="Specific module paths to restrict the namespace to.") + parser.add_argument("--restrict-mode", "-R", default='children', type=str, help="How to restrict the namespace, supports: [exact, children (default), root_children]") + parser.add_argument("--print-namespace", action='store_true', help="Print items in the loaded namespace") + + # add subcommands + subparsers = parser.add_subparsers(dest="subparser", title='subcommands', description='valid subcommands') + + # COMMAND: resolve + command_resolve = subparsers.add_parser('resolve') + command_resolve.add_argument('--full', '-f', action="store_true", help="show the full import paths, not just the import roots in the output") + command_resolve.add_argument('--builtin', '-b', action="store_true", help="show builtin dependencies, otherwise these are hidden") + command_resolve.add_argument('--modules', '-m', action="store_true", help="show dependencies under each module") + + # COMMAND: imports + command_imports = subparsers.add_parser('imports') + command_imports.add_argument('--full', '-f', action="store_true", help="show the full import paths, not just the import roots in the output") + command_imports.add_argument('--builtin', '-b', action="store_true", help="show builtin dependencies, otherwise these are hidden") + command_imports.add_argument('--modules', '-m', action="store_true", help="show dependencies under each module") + + return parser + + +def pydependance_cli(): + parser = _create_parser() + args = parser.parse_args() + + # check arguments + if not args.paths and not args.packages: + print('please specify at least one of --paths or --packages') + exit(1) + + # load namespace + against = ModuleNamespace() + 
if args.paths: + against.add_modules_from_python_paths(args.paths) + if args.packages: + against.add_modules_from_packages(args.packages) + + # restrict the namespace for resolving + namespace = against + if args.restrict: + namespace = namespace.restrict(imports=args.restrict, mode=args.restrict_mode) + + # print the original namespace + if args.print_namespace: + print('NAMESPACE (FULL):') + for m in sorted(set(m.import_root for m in against)): + print(f'- {m}') + print() + if args.restrict: + print('NAMESPACE (RESTRICTED):') + for m in sorted(set(m.import_root for m in namespace)): + print(f'- {m}') + print() + + # run the various commands on the namespace + # TODO: clean this up + # TODO: make arguments common between commands + # TODO: better colourisation by searching pypi for packages + # TODO: allow renaming packages eg --rename=cv2:opencv-python,sklearn:scikit-learn + # TODO: support dependency versions + # TODO: support various output modes + # - json + # - pretty + # TODO: check imports against a specific python environment + if args.subparser == 'resolve': + module_imports = against.resolve(namespace=namespace, roots=not args.full, builtin=args.builtin) + if args.modules: + for module, imports in module_imports.items(): + _print_module(against, path=module, depth=0) + for target_path in sorted(imports): + _print_import(against, path=target_path, depth=1) + else: + for target_path in sorted(set(imp for imports in module_imports.values() for imp in imports)): + _print_import(against, path=target_path, depth=0) + elif args.subparser == 'imports': + if args.modules: + for module in namespace.modules(): + _print_module(against, path=module.import_path, depth=0) + for target_path in sorted(module.imports_unique(roots=not args.full, builtin=args.builtin)): + _print_import(against, path=target_path, depth=1) + else: + for target_path in sorted(namespace.imports_unique(roots=not args.full, builtin=args.builtin)): + _print_import(against, path=target_path, 
depth=0) + + +# ========================================================================= # +# Entrypoint # +# ========================================================================= # + + +if __name__ == '__main__': + pydependance_cli() diff --git a/pydependance/_core.py b/pydependance/_core.py new file mode 100644 index 0000000..f642e9c --- /dev/null +++ b/pydependance/_core.py @@ -0,0 +1,576 @@ +import sys + +# check the python version +if sys.version_info < (3, 10): + print('please use python >= 3.10') + exit(1) + +import ast +import sys +import warnings +from pathlib import Path +from typing import Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union + + +# ========================================================================= # +# Load Builtin Packages # +# ========================================================================= # + + +BUILTIN_PKGS = { + '__main__', + *sys.builtin_module_names, + *sys.stdlib_module_names, # python 3.10 +} + + +def is_builtin(import_: 'ImportType') -> bool: + root = _import_to_keys(import_)[0] + return root in BUILTIN_PKGS + + +# ========================================================================= # +# Ast Import Finder # +# ========================================================================= # + + +def ast_get_module_imports(path: Union[str, Path]) -> List[Tuple[List[str], bool]]: + imports = [] + + class AstImportCollector(ast.NodeVisitor): + def visit_Import(self, node): + # eg. import pkg.submodule + imports.extend((n.name.split('.'), False) for n in node.names) + return node + + def visit_ImportFrom(self, node): + assert node.level in (0, 1) # node.names: from * import name, ... + # eg: from . import ? + # eg: from .submodule import ? + # eg: from pkg.submodule import ? 
+ import_keys = node.module.split('.') if node.module else [] + is_relative = (node.level != 0) + imports.append((import_keys, is_relative)) + return node + + # collect import from file + with open(path) as f: + AstImportCollector().generic_visit(node=ast.parse(f.read())) + return imports + + +# ========================================================================= # +# Module Helper # +# ========================================================================= # + + +INIT_PY = '__init__.py' + + +ImportKey = Tuple[str, ...] +ImportType = Union[str, ImportKey, 'Import', 'Module'] + + +def _import_to_keys(import_: ImportType) -> ImportKey: + # split the import if needed + if isinstance(import_, str): + import_ = import_.split('.') + elif isinstance(import_, Import): + import_ = import_.target_keys + elif isinstance(import_, Module): + import_ = import_.import_keys + return tuple(import_) + + +def import_to_keys(import_: ImportType) -> ImportKey: + import_keys, orig = _import_to_keys(import_), import_ + # check, all parts must be identifiers, and there must be at least one part + import_check_keys(import_keys, orig=orig) + return import_keys + + +def import_check_keys(import_keys: ImportKey, orig=None) -> ImportKey: + if orig is None: + orig = import_keys + if not import_keys: + raise ValueError(f'import path must have at least one part for: {repr(import_keys)}') + if not isinstance(import_keys, tuple): + raise TypeError(f'import keys must be a tuple, got: {type(import_keys)} for: {repr(orig)}') + for part in import_keys: + if not isinstance(part, str): + raise TypeError(f'import part: {repr(part)} is not a string, got type: {type(part)}, obtained from: {repr(import_keys)}') + if not part.isidentifier(): + raise ValueError(f'import part: {repr(part)} is not a valid identifier, obtained from: {repr(import_keys)}') + return import_keys + + +def normalize_imports_pipe( + imports: Iterable[ImportType], + roots: bool = False, + builtin: bool = True, + keys: bool = 
False, +) -> Union[Iterable[ImportKey], Iterable[str]]: + imports = (import_to_keys(imp) for imp in imports) + if not builtin: + imports = (imp for imp in imports if imp[0] not in BUILTIN_PKGS) + if roots: + imports = (imp[0:1] for imp in imports) + if not keys: + imports = (".".join(imp) for imp in imports) + return imports + + +def is_python_module(path: Path) -> bool: + return path.is_file() and path.name.endswith('.py') and path.name[:-3].isidentifier() + + +def is_python_package(path: Path) -> bool: + return path.is_dir() and path.name.isidentifier() and path.joinpath(INIT_PY).is_file() + + +def is_child_import(parent, child) -> bool: + parent = import_to_keys(parent) + child = import_to_keys(child) + if len(child) < len(parent): + return False + return parent == child[:len(parent)] + + +def find_modules( + root: Path, + max_depth: int = -1, + skip_root_files: bool = False, +): + def _recurse(path: Path, parent_keys: ImportKey, depth: int): + if depth > max_depth >= 0: + return + + # eg. .py + if is_python_module(path): + if skip_root_files and depth == 0: + return + assert path.name != INIT_PY + yield path, (*parent_keys, path.name[:-3]) + + # eg. /__init__.py + elif is_python_package(path): + keys = (*parent_keys, path.name) + yield path, keys + # continue recursively, making sure to skip __init__.py files + for p in path.iterdir(): + if p.name != INIT_PY: + yield from _recurse(p, parent_keys=keys, depth=depth+1) + + root = Path(root) + # make sure that if we skip root __init__.py files + if root.name == INIT_PY: + warnings.warn(f'root cannot be an {INIT_PY} file, returning no modules for: {root.resolve()}') + return + # find values! 
+ yield from ( + Module(module_path, import_keys) + for module_path, import_keys in _recurse(root, parent_keys=(), depth=0) + ) + + +def _yield_imports( + node: Union['Module', 'ModuleNamespace'], + roots: bool = False, + builtin: bool = True, +): + visited = set() + for import_ in node.imports(builtin=builtin): + if roots: + key = import_.target_root + else: + key = import_.target_path + # return the result if it has not been seen + if key not in visited: + visited.add(key) + yield key + + +# ========================================================================= # +# Data Structures # +# ========================================================================= # + + +class Import: + + @classmethod + def from_module_perspective( + cls, + module: 'Module', + keys: Union[str, Sequence[str]], + is_relative: bool, + ): + orig = keys + if isinstance(keys, str): + keys = keys.split('.') + keys = tuple(keys) + if is_relative: + keys = module.import_keys[:-1] + keys + import_check_keys(keys, orig=orig) + return Import(keys, source_module=module) + + def __init__(self, target: Union[str, Sequence[str]], source_module: 'Module'): + self._target_keys = import_to_keys(target) + self._source_module = source_module + + def __repr__(self): + return f'{self.__class__.__name__}<{self.target_path}>' + + @property + def target_keys(self) -> ImportKey: + return self._target_keys + + @property + def target_path(self) -> str: + return ".".join(self._target_keys) + + @property + def target_root(self) -> str: + return self.target_keys[0] + + @property + def target_depth(self) -> int: + return len(self.target_keys) + + @property + def source_module(self) -> 'Module': + return self._source_module + + def __eq__(self, other): + if isinstance(other, (Module, Import, str, tuple)): + return self.target_keys == import_to_keys(other) + return False + + def __lt__(self, other): + return self.target_keys < other.target_keys + + def __hash__(self): + return hash(self.target_path) + + +class 
Module: + + def __init__(self, path: Union[str, Path], import_: Union[str, Sequence[str]]): + # check the path + path = Path(path) + if is_python_module(path): + self._is_package = (path.name == INIT_PY) + elif is_python_package(path): + self._is_package = True + path = path.joinpath(INIT_PY) + else: + raise ValueError(f'not a valid python module or package: {path}') + # initialize + self._abs_path: Path = path.absolute() + self._import_keys = import_to_keys(import_) + # load imports + self._imports = [ + Import.from_module_perspective(self, keys=keys, is_relative=is_relative) + for keys, is_relative in ast_get_module_imports(self.path) + ] + + def __repr__(self): + return f'{self.__class__.__name__}<{self.import_path}>' + + @property + def is_package(self) -> bool: + return self._is_package + + @property + def is_root(self) -> bool: + return self.import_depth == 1 + + @property + def path(self) -> Path: + return self._abs_path + + @property + def import_keys(self) -> ImportKey: + return self._import_keys + + @property + def import_path(self) -> str: + return ".".join(self._import_keys) + + @property + def import_root(self) -> str: + return self.import_keys[0] + + @property + def import_depth(self) -> int: + return len(self.import_keys) + + def imports(self, builtin: bool = True) -> Iterable[Import]: + if builtin: + yield from self._imports + else: + yield from (imp for imp in self._imports if imp.target_root not in BUILTIN_PKGS) + + def imports_unique(self, roots: bool = False, builtin: bool = True) -> Iterable[str]: + yield from _yield_imports(self, roots=roots, builtin=builtin) + + +class ModuleNamespace: + + _modules: Dict[ImportKey, Module] + + def __init__(self): + self._modules = {} + # cache used to help speed up some functions + # this might use a lot of memory, so we make + # sure to limit its size when used + self._cache = None + + def copy(self) -> 'ModuleNamespace': + namespace = ModuleNamespace() + namespace._modules = dict(self._modules) + return 
namespace + + def __repr__(self): + return f'{self.__class__.__name__}<{" ,".join(".".join(k) for k in self._modules.keys() if len(k) == 1)}>' + + # ~=~=~=~=~=~=~ # + # Add Modules # + # ~=~=~=~=~=~=~ # + + def add_modules(self, modules: Sequence[Module]) -> 'ModuleNamespace': + for module in modules: + if module.import_keys in self._modules: + raise RuntimeError(f'module {repr(module.import_path)} has already been added to namespace') + for module in modules: + self._modules[module.import_keys] = module + return self + + def add_modules_from_packages(self, roots: Sequence[Union[str, Path]]) -> 'ModuleNamespace': + modules = [ + m + for root in roots + for m in find_modules(root) + ] + self.add_modules(modules) + return self + + def add_modules_from_python_paths(self, python_paths: Sequence[Union[str, Path]] = None) -> 'ModuleNamespace': + if python_paths is None: + python_paths = sys.path + paths = [ + path + for python_path in python_paths + for path in Path(python_path).iterdir() + if is_python_package(path) or is_python_module(path) + ] + self.add_modules_from_packages(paths) + return self + + # ~=~=~=~=~=~=~ # + # Filtering # + # ~=~=~=~=~=~=~ # + + def filtered( + self, + *, + keep: Callable[[Module], bool] = None, + remove: Callable[[Module], bool] = None, + ) -> 'ModuleNamespace': + result = self.copy() + if keep: + result._modules = {k: m for k, m in result._modules.items() if keep(m)} + if remove: + result._modules = {k: m for k, m in result._modules.items() if not remove(m)} + return result + + def restrict(self, imports, mode: str = 'children'): + if isinstance(imports, (str, tuple, Import, Module)): + imports = [imports] + imports = set(import_to_keys(imp) for imp in imports) + # restrict based on the mode + if mode == 'exact': + return self.filtered(keep=lambda m: m.import_keys in imports) + elif mode == 'children': + return self.filtered(keep=lambda m: any(is_child_import(parent=keys, child=m) for keys in imports)) + elif mode == 'root_children': + 
return self.filtered(keep=lambda m: any(is_child_import(parent=keys[0], child=m) for keys in imports)) + else: + raise KeyError(f'invalid restrict mode: {repr(mode)}') + + # ~=~=~=~=~=~=~ # + # Getters # + # ~=~=~=~=~=~=~ # + + def __getitem__(self, import_: ImportType): + import_ = import_to_keys(import_) + return self._modules[import_] + + def __contains__(self, import_: ImportType): + import_ = import_to_keys(import_) + return import_ in self._modules + + def __iter__(self) -> Iterable[Module]: + yield from self._modules.values() + + def modules(self) -> Iterable[Module]: + yield from self._modules.values() + + def modules_roots(self) -> Iterable[Module]: + for k, m in self._modules.items(): + if len(k) == 1: + yield m + + def imports(self, builtin: bool = True) -> Iterable[Import]: + for module in self._modules.values(): + yield from module.imports(builtin=builtin) + + def imports_unique(self, roots: bool = False, builtin: bool = True) -> Iterable[str]: + yield from _yield_imports(self, roots=roots, builtin=builtin) + + def imports_resolved( + self, + against: 'ModuleNamespace' = None, + roots: bool = False, + builtin: bool = True, + mode: str = 'exact', + ) -> Set[str]: + if against is None: + against = self + # get the unique imports, and flatten imports + # using keys in the specified namespace + return against.resolve_imports( + imports=self.imports_unique(roots=False, builtin=builtin), + roots=roots, + builtin=builtin, + mode=mode, + ) + + # ~=~=~=~=~=~=~ # + # Resolving # + # ~=~=~=~=~=~=~ # + + def resolve_imports( + self, + imports: Iterable[ImportType], + roots: bool = False, + builtin: bool = True, + mode: str = 'exact', + ) -> Set[str]: + """ + This function only resolves the specified imports based on the current + namespace, by performing a BFS + - The nice thing is that you can restrict adding entries based on the `mode` + to the "exact" files visited, or you can be safe by adding all "children", + or even the "root_children" of the visited 
imports + + ALGORITHM: + * perform a bfs, replacing keys that are visited with the designation keys + - keys can only be visited if they are in the current namespace + - this can be re-written as 1. perform bfs 2. remove keys in namespace + """ + resolved = self._resolve_imports( + imports=imports, + mode=mode, + _restrict_cache_=None, + ) + resolved = set(normalize_imports_pipe( + resolved, + roots=roots, + builtin=builtin, + keys=False, + )) + return resolved + + def _resolve_imports( + self, + imports: Iterable[ImportType], + mode: str, + _restrict_cache_: Optional[Dict[ImportKey, Set[ImportKey]]], + ) -> Set[ImportKey]: + if _restrict_cache_ is None: + _restrict_cache_ = {} + + def get_restricted_imports(keys: ImportKey) -> Set[ImportKey]: + unique = _restrict_cache_.get(keys, None) + if unique is None: + unique = set( + imp.target_keys + for imp in self.restrict(keys, mode=mode).imports(builtin=True) + ) + _restrict_cache_[keys] = unique + return unique + + # 1. BFS + stack: List[ImportKey] = list(set(import_to_keys(i) for i in imports)) + visited: Set[ImportKey] = set() + while stack: + current = stack.pop() + visited.add(current) + for imp in get_restricted_imports(current): + if imp in visited: + continue + stack.append(imp) + + # 2. 
DELETE OLD RESULTS + visited -= self._modules.keys() + + # convert the imports back to strings + return visited + + def resolve( + self, + namespace: 'ModuleNamespace' = None, + roots: bool = False, + builtin: bool = True, + mode: str = 'exact', + ) -> Dict[str, Set[str]]: + # multiple packages in the same project may depend on each other + # - this function finds those imports and replaces them with + # the imports from the other package, effectively finding all + # required parent dependencies in the tree + against = self + if namespace is None: + namespace = self + + # speed things up by reusing results + _restrict_cache_ = {} + + # for each module, BFS all the imports + # - this is not as efficient as doing everything in a pass + # over the actual imports and replacing everything as we + # go, but conceptually, this is much easier to manage! + module_imports = {} + for key, module in namespace._modules.items(): + module_imports[key] = against._resolve_imports( + imports=module.imports(), + mode=mode, + _restrict_cache_=_restrict_cache_, + ) + # update the cache based on the current results to improve future speed! + # this is duplicating conversion... + _restrict_cache_[module.import_keys] = module_imports[key] + + # normalize the final results + module_imports = { + ".".join(k): set(normalize_imports_pipe( + resolved, + roots=roots, + builtin=builtin, + keys=False, + )) + for k, resolved in module_imports.items() + } + + return module_imports + + +# ========================================================================= # +# EXPORT # +# ========================================================================= # + +__all__ = ( + "Import", + "Module", + "ModuleNamespace", + "is_builtin", +)