Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "pending" indicator on test start #275

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 138 additions & 41 deletions pytest_sugar.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,26 @@
LEN_PROGRESS_BAR: Optional[int] = None


SPINNER_BLOCKS: List[str] = [
"⠋",
"⠙",
"⠹",
"⠸",
"⠼",
"⠴",
"⠦",
"⠧",
"⠇",
"⠏",
]


@dataclasses.dataclass
class Theme:
header: Optional[str] = "magenta"
skipped: Optional[str] = "blue"
success: Optional[str] = "green"
pending: Optional[str] = "white"
warning: Optional[str] = "yellow"
fail: Optional[str] = "red"
error: Optional[str] = "red"
Expand All @@ -50,6 +65,8 @@ class Theme:
path: Optional[str] = "cyan"
name = None
symbol_passed: str = "✓"
# symbol_pending: str = SPINNER_BLOCKS[0]
symbol_pending: str = "…"
symbol_skipped: str = "s"
symbol_failed: str = "⨯"
symbol_failed_not_call: str = "ₓ"
Expand Down Expand Up @@ -205,7 +222,9 @@ def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str
if not IS_SUGAR_ENABLED:
return None

if report.passed:
if getattr(report, "pending", False):
letter = colored(THEME.symbol_pending, THEME.pending)
elif report.passed:
letter = colored(THEME.symbol_passed, THEME.success)
elif report.skipped:
letter = colored(THEME.symbol_skipped, THEME.skipped)
Expand Down Expand Up @@ -235,6 +254,33 @@ def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str
return report.outcome, letter, report.outcome.upper()


class TestStartReport(BaseReport):
"""Test start report object.

Reports can contain arbitrary extra attributes.
"""

when = "teststart"

def __init__(
self,
nodeid: str,
location: Tuple[str, Optional[int], str],
**extra,
) -> None:
#: Normalized collection nodeid.
self.nodeid = nodeid

self.location = location

#: Test outcome, always one of "passed", "failed", "skipped".
self.outcome = "passed"

self.pending = True

self.__dict__.update(extra)


class SugarTerminalReporter(TerminalReporter): # type: ignore
def __init__(self, reporter) -> None:
TerminalReporter.__init__(self, reporter.config)
Expand All @@ -250,6 +296,8 @@ def reset_tracked_lines(self) -> None:
self.current_lines = {}
self.current_line_nums = {}
self.current_line_num = 0
# self.current_spinner_indexes = {}
self.current_pending_reports = {}

def pytest_collectreport(self, report: CollectReport) -> None:
TerminalReporter.pytest_collectreport(self, report)
Expand Down Expand Up @@ -286,7 +334,9 @@ def pytest_sessionstart(self, session: Session) -> None:
def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None:
return

def insert_progress(self, report: Union[CollectReport, TestReport]) -> None:
def insert_progress(
self, report: Union[CollectReport, TestReport, TestStartReport]
) -> None:
def get_progress_bar() -> str:
length = LEN_PROGRESS_BAR
if not length:
Expand Down Expand Up @@ -380,9 +430,14 @@ def get_max_column_for_test_status(self) -> int:
)

def begin_new_line(
self, report: Union[CollectReport, TestReport], print_filename: bool
self,
report: Union[CollectReport, TestReport, TestStartReport],
print_filename: bool,
) -> None:
path = self.report_key(report)
if path in self.current_pending_reports:
pending_report = self.clear_pending_report(path)
self.insert_progress(pending_report)
self.current_line_num += 1
if len(report.fspath) > self.get_max_column_for_test_status() - 5:
fspath = (
Expand Down Expand Up @@ -426,7 +481,8 @@ def reached_last_column_for_test_status(
def pytest_runtest_logstart(self, nodeid, location) -> None:
# Prevent locationline from being printed since we already
# show the module_name & in verbose mode the test name.
pass
report = TestStartReport(nodeid, location)
self.log_report(report)

def pytest_runtest_logfinish(self, nodeid: str) -> None:
# prevent the default implementation to try to show
Expand All @@ -440,12 +496,30 @@ def report_key(self, report: Union[CollectReport, TestReport]) -> Any:
)

def pytest_runtest_logreport(self, report: TestReport) -> None:
self.log_report(report)

def clear_pending_report(self, path):
report = self.current_pending_reports[path]
assert report

# spinner_index = self.current_spinner_indexes[path]
# pending = colored(SPINNER_BLOCKS[spinner_index], THEME.pending)
pending = colored(THEME.symbol_pending, THEME.pending)
self.current_lines[path] = self.current_lines[path].replace(pending, "")
# del self.current_spinner_indexes[path]
del self.current_pending_reports[path]

return report

def log_report(self, report: Union[TestReport, TestStartReport]) -> None:
global LEN_PROGRESS_BAR_SETTING, LEN_PROGRESS_BAR

res = pytest_report_teststatus(report=report)
assert res
cat, letter, word = res
self.stats.setdefault(cat, []).append(report)
pending = getattr(report, "pending", False)
if not pending:
self.stats.setdefault(cat, []).append(report)

if not LEN_PROGRESS_BAR:
if LEN_PROGRESS_BAR_SETTING.endswith("%"):
Expand All @@ -455,18 +529,29 @@ def pytest_runtest_logreport(self, report: TestReport) -> None:
else:
LEN_PROGRESS_BAR = int(LEN_PROGRESS_BAR_SETTING)

self.reports.append(report)
if not pending:
self.reports.append(report)

if report.outcome == "failed":
if report.fspath in self.current_pending_reports:
self.clear_pending_report(report.fspath)
self.insert_progress(report)

print("")
self.print_failure(report)
# Ignore other reports or it will cause duplicated letters
if report.when == "teardown":
self.tests_taken += 1
self.insert_progress(report)
path = os.path.join(os.getcwd(), report.location[0])

if report.when == "call" or report.skipped:
if report.when == "call" or report.skipped or pending:
path = self.report_key(report)

# Begin a new line any time the path is missing, even though
# it's the "pending" case that will usually trigger this.
# If print_failure is called, the lines are cleared, so
# another pending test might have had its line removed,
# so the "call" case will have to add the new line again.
if path not in self.current_line_nums:
self.begin_new_line(report, print_filename=True)
elif self.reached_last_column_for_test_status(report):
Expand All @@ -477,44 +562,56 @@ def pytest_runtest_logreport(self, report: TestReport) -> None:
)
self.begin_new_line(report, print_filename)

if pending:
# self.current_spinner_indexes[path] = 0
self.current_pending_reports[path] = report
elif path in self.current_pending_reports:
self.clear_pending_report(path)

self.current_lines[path] = self.current_lines[path] + letter

block = int(
float(self.tests_taken) * LEN_PROGRESS_BAR / self.tests_count
if self.tests_count
else 0
)
if report.failed:
if not self.progress_blocks or self.progress_blocks[-1][0] != block:
self.progress_blocks.append([block, False])
elif self.progress_blocks and self.progress_blocks[-1][0] == block:
self.progress_blocks[-1][1] = False
if pending:
# This also takes care of updating the line, so we need
# to call it when a test starts, in order to write the
# pending indicator.
self.insert_progress(report)
else:
if not self.progress_blocks or self.progress_blocks[-1][0] != block:
self.progress_blocks.append([block, True])

if not letter and not word:
return
if self.verbosity > 0:
markup = {"red": True}
if isinstance(word, tuple):
word, markup = word
block = int(
float(self.tests_taken) * LEN_PROGRESS_BAR / self.tests_count
if self.tests_count
else 0
)
if report.failed:
if not self.progress_blocks or self.progress_blocks[-1][0] != block:
self.progress_blocks.append([block, False])
elif self.progress_blocks and self.progress_blocks[-1][0] == block:
self.progress_blocks[-1][1] = False
else:
if report.passed:
markup = {"green": True}
elif report.skipped:
markup = {"yellow": True}
elif hasattr(report, "rerun") and isinstance(report.rerun, int):
markup = {"blue": True}
line = self._locationline(str(report.fspath), *report.location)
if hasattr(report, "node"):
self._tw.write("\r\n")
self.current_line_num += 1
if not self.progress_blocks or self.progress_blocks[-1][0] != block:
self.progress_blocks.append([block, True])

if not letter and not word:
return
if self.verbosity > 0:
markup = {"red": True}
if isinstance(word, tuple):
word, markup = word
else:
if report.passed:
markup = {"green": True}
elif report.skipped:
markup = {"yellow": True}
elif hasattr(report, "rerun") and isinstance(report.rerun, int):
markup = {"blue": True}
line = self._locationline(str(report.fspath), *report.location)
if hasattr(report, "node"):
self._tw.write(f"[{report.node.gateway.id}] ")
self._tw.write(word, **markup)
self._tw.write(" " + line)
self.currentfspath = -2
self._tw.write("\r\n")
self.current_line_num += 1
if hasattr(report, "node"):
self._tw.write(f"[{report.node.gateway.id}] ")
self._tw.write(word, **markup)
self._tw.write(" " + line)
self.currentfspath = -2

def count(self, key: str, when: tuple = ("call",)) -> int:
value = self.stats.get(key)
Expand Down