Skip to content

Commit

Permalink
Add type hinting to utils/__init__.py
Browse files Browse the repository at this point in the history
  • Loading branch information
Shrews committed Aug 15, 2023
1 parent 63da714 commit 54bbc76
Showing 1 changed file with 66 additions and 54 deletions.
120 changes: 66 additions & 54 deletions src/ansible_runner/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

from __future__ import annotations

import json
import sys
import re
Expand All @@ -19,13 +21,15 @@
import atexit
import signal

from collections.abc import Iterable, MutableMapping
from codecs import StreamReaderWriter
from collections.abc import Callable, Iterable, MutableMapping
from io import StringIO
from typing import Any, Iterator

from ansible_runner.exceptions import ConfigurationError


def cleanup_folder(folder):
def cleanup_folder(folder: str) -> bool:
"""Deletes folder, returns True or False based on whether a change happened."""
try:
shutil.rmtree(folder)
Expand All @@ -34,23 +38,23 @@ def cleanup_folder(folder):
return False


def register_for_cleanup(folder):
def register_for_cleanup(folder: str) -> None:
'''
Provide the path to a folder to make sure it is deleted when execution finishes.
The folder need not exist at the time when this is called.
'''
atexit.register(cleanup_folder, folder)


def get_plugin_dir():
def get_plugin_dir() -> str:
return os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "display_callback"))


def get_callback_dir():
def get_callback_dir() -> str:
return os.path.join(get_plugin_dir(), 'callback')


def is_dir_owner(directory):
def is_dir_owner(directory: str) -> bool:
'''Returns True if current user is the owner of directory'''
current_user = pwd.getpwuid(os.geteuid()).pw_name
callback_owner = Path(directory).owner()
Expand All @@ -74,35 +78,35 @@ def get(self, key):
return self.__dict__.get(key)


def isplaybook(obj):
def isplaybook(obj: Any) -> bool:
'''
Inspects the object and returns if it is a playbook
Args:
obj (object): The object to be inspected by this function
:param Any obj: The object to be inspected by this function.
Returns:
boolean: True if the object is a list and False if it is not
:return: True if the object is a list and False if it is not.
'''
return isinstance(obj, Iterable) and (not isinstance(obj, str) and not isinstance(obj, MutableMapping))


def isinventory(obj):
def isinventory(obj: Any) -> bool:
'''
Inspects the object and returns if it is an inventory
Args:
obj (object): The object to be inspected by this function
:param Any obj: The object to be inspected by this function.
Returns:
boolean: True if the object is an inventory dict and False if it is not
:return: True if the object is an inventory dict and False if it is not.
'''
return isinstance(obj, (MutableMapping, str))


def check_isolation_executable_installed(isolation_executable):
def check_isolation_executable_installed(isolation_executable: str) -> bool:
'''
Check that process isolation executable (e.g. podman, docker, bwrap) is installed.
Check that process isolation executable is installed.
:param str isolation_executable: Executable name (e.g. podman, docker, bwrap).
:return: True if the executable is installed, False otherwise.
'''
cmd = [isolation_executable, '--version']
try:
Expand All @@ -116,29 +120,26 @@ def check_isolation_executable_installed(isolation_executable):
return False


def dump_artifact(obj, path, filename=None):
def dump_artifact(obj: str,
path: str,
filename: str | None = None
) -> str:
'''
Write the artifact to disk at the specified path
Args:
obj (string): The string object to be dumped to disk in the specified
path. The artifact filename will be automatically created
path (string): The full path to the artifacts data directory.
filename (string, optional): The name of file to write the artifact to.
If the filename is not provided, then one will be generated.
:param str obj: The string object to be dumped to disk in the specified
path. The artifact filename will be automatically created.
:param str path: The full path to the artifacts data directory.
:param str filename: The name of file to write the artifact to.
If the filename is not provided, then one will be generated.
Returns:
string: The full path filename for the artifact that was generated
:return: The full path filename for the artifact that was generated.
'''
p_sha1 = None

if not os.path.exists(path):
os.makedirs(path, mode=0o700)
else:
p_sha1 = hashlib.sha1()
p_sha1.update(obj.encode(encoding='UTF-8'))

p_sha1 = hashlib.sha1()
p_sha1.update(obj.encode(encoding='UTF-8'))

if filename is None:
_, fn = tempfile.mkstemp(dir=path)
Expand Down Expand Up @@ -168,7 +169,7 @@ def dump_artifact(obj, path, filename=None):
return fn


def cleanup_artifact_dir(path, num_keep=0):
def cleanup_artifact_dir(path: str, num_keep: int = 0) -> None:
# 0 disables artifact dir cleanup/rotation
if num_keep < 1:
return
Expand All @@ -181,7 +182,7 @@ def cleanup_artifact_dir(path, num_keep=0):
shutil.rmtree(all_paths[f])


def dump_artifacts(kwargs):
def dump_artifacts(kwargs: dict) -> None:
'''
Introspect the kwargs and dump objects to disk
'''
Expand Down Expand Up @@ -255,7 +256,7 @@ def dump_artifacts(kwargs):
kwargs.pop(key)


def collect_new_events(event_path, old_events):
def collect_new_events(event_path: str, old_events: dict) -> Iterator[tuple[dict, dict]]:
'''
Collect new events for the 'events' generator property
'''
Expand Down Expand Up @@ -284,23 +285,27 @@ class OutputEventFilter:

EVENT_DATA_RE = re.compile(r'\x1b\[K((?:[A-Za-z0-9+/=]+\x1b\[\d+D)+)\x1b\[K')

def __init__(self, handle, event_callback,
suppress_ansible_output=False, output_json=False):
def __init__(self,
handle: StreamReaderWriter,
event_callback: Callable[[dict], None],
suppress_ansible_output: bool = False,
output_json: bool = False
) -> None:
self._event_callback = event_callback
self._counter = 0
self._start_line = 0
self._handle = handle
self._buffer = StringIO()
self._last_chunk = ''
self._current_event_data = None
self._current_event_data: dict | None = None
self.output_json = output_json
self.suppress_ansible_output = suppress_ansible_output

def flush(self):
def flush(self) -> None:
if self._handle:
self._handle.flush()

def write(self, data):
def write(self, data: str) -> None:
self._buffer.write(data)

# keep a sliding window of the last chunk written so we can detect
Expand Down Expand Up @@ -362,7 +367,7 @@ def write(self, data):
if remainder:
self._buffer.write(remainder)

def close(self):
def close(self) -> None:
value = self._buffer.getvalue()
if value:
self._emit_event(value)
Expand All @@ -371,8 +376,12 @@ def close(self):
if self._handle:
self._handle.close()

def _emit_event(self, buffered_stdout, next_event_data=None):
def _emit_event(self,
buffered_stdout: str,
next_event_data: dict | None = None
) -> dict:
next_event_data = next_event_data or {}
event_data: dict[str, Any]
if self._current_event_data:
event_data = self._current_event_data
stdout_chunks = [buffered_stdout]
Expand All @@ -393,17 +402,18 @@ def _emit_event(self, buffered_stdout, next_event_data=None):
event_data['start_line'] = self._start_line
event_data['end_line'] = self._start_line + n_lines
self._start_line += n_lines
if self._event_callback:
self._event_callback(event_data)
self._event_callback(event_data)
if next_event_data.get('uuid', None):
self._current_event_data = next_event_data
else:
self._current_event_data = None
return event_data


def open_fifo_write(path, data):
'''open_fifo_write opens the fifo named pipe in a new thread.
def open_fifo_write(path: str, data: str | bytes) -> None:
'''
Opens the fifo named pipe in a new thread.
This blocks the thread until an external process (such as ssh-agent)
reads data from the pipe.
'''
Expand All @@ -424,7 +434,7 @@ def args2cmdline(*args):
return ' '.join([quote(a) for a in args])


def ensure_str(s, encoding='utf-8', errors='strict'):
def ensure_str(s: Any, encoding='utf-8', errors='strict') -> str:
"""
Coerce *s* to ``str``.
Expand All @@ -438,7 +448,7 @@ def ensure_str(s, encoding='utf-8', errors='strict'):
return s


def sanitize_container_name(original_name):
def sanitize_container_name(original_name: str) -> str:
"""
Docker and podman will only accept certain characters in container names
This takes a given name from user-specified values and replaces the
Expand Down Expand Up @@ -472,26 +482,28 @@ def cli_mounts():
]


def sanitize_json_response(data):
def sanitize_json_response(data: str) -> str:
'''
Removes warning message from response message emitted by Ansible
command line utilities.
:param str data: The string data to be sanitized
'''
start_re = re.compile("{(.|\n)*", re.MULTILINE)
data = start_re.search(data).group().strip()
found = start_re.search(data)
if found:
data = found.group().strip()
return data


def get_executable_path(name):
def get_executable_path(name: str) -> str:
exec_path = shutil.which(name)
if exec_path is None:
raise ConfigurationError(f"{name} command not found")
return exec_path


def signal_handler():
def signal_handler() -> Callable[[], bool] | None:
# Only the main thread is allowed to set a new signal handler
# pylint: disable=W4902
if threading.current_thread() is not threading.main_thread():
Expand Down

0 comments on commit 54bbc76

Please sign in to comment.