diff --git a/pytest_sugar.py b/pytest_sugar.py index 81441f2..bd20f5b 100644 --- a/pytest_sugar.py +++ b/pytest_sugar.py @@ -34,11 +34,26 @@ LEN_PROGRESS_BAR: Optional[int] = None +SPINNER_BLOCKS: List[str] = [ + "⠋", + "⠙", + "⠹", + "⠸", + "⠼", + "⠴", + "⠦", + "⠧", + "⠇", + "⠏", +] + + @dataclasses.dataclass class Theme: header: Optional[str] = "magenta" skipped: Optional[str] = "blue" success: Optional[str] = "green" + pending: Optional[str] = "white" warning: Optional[str] = "yellow" fail: Optional[str] = "red" error: Optional[str] = "red" @@ -50,6 +65,8 @@ class Theme: path: Optional[str] = "cyan" name = None symbol_passed: str = "✓" + # symbol_pending: str = SPINNER_BLOCKS[0] + symbol_pending: str = "…" symbol_skipped: str = "s" symbol_failed: str = "⨯" symbol_failed_not_call: str = "ₓ" @@ -205,7 +222,9 @@ def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str if not IS_SUGAR_ENABLED: return None - if report.passed: + if getattr(report, "pending", False): + letter = colored(THEME.symbol_pending, THEME.pending) + elif report.passed: letter = colored(THEME.symbol_passed, THEME.success) elif report.skipped: letter = colored(THEME.symbol_skipped, THEME.skipped) @@ -235,6 +254,33 @@ def pytest_report_teststatus(report: BaseReport) -> Optional[Tuple[str, str, str return report.outcome, letter, report.outcome.upper() +class TestStartReport(BaseReport): + """Test start report object. + + Reports can contain arbitrary extra attributes. + """ + + when = "teststart" + + def __init__( + self, + nodeid: str, + location: Tuple[str, Optional[int], str], + **extra, + ) -> None: + #: Normalized collection nodeid. + self.nodeid = nodeid + + self.location = location + + #: Test outcome, always one of "passed", "failed", "skipped". + self.outcome = "passed" + + self.pending = True + + self.__dict__.update(extra) + + class SugarTerminalReporter(TerminalReporter): # type: ignore def __init__(self, reporter) -> None: TerminalReporter.__init__(self, reporter.config) @@ -250,6 +296,8 @@ def reset_tracked_lines(self) -> None: self.current_lines = {} self.current_line_nums = {} self.current_line_num = 0 + # self.current_spinner_indexes = {} + self.current_pending_reports = {} def pytest_collectreport(self, report: CollectReport) -> None: TerminalReporter.pytest_collectreport(self, report) @@ -286,7 +334,9 @@ def pytest_sessionstart(self, session: Session) -> None: def write_fspath_result(self, nodeid: str, res, **markup: bool) -> None: return - def insert_progress(self, report: Union[CollectReport, TestReport]) -> None: + def insert_progress( + self, report: Union[CollectReport, TestReport, TestStartReport] + ) -> None: def get_progress_bar() -> str: length = LEN_PROGRESS_BAR if not length: @@ -380,9 +430,14 @@ def get_max_column_for_test_status(self) -> int: ) def begin_new_line( - self, report: Union[CollectReport, TestReport], print_filename: bool + self, + report: Union[CollectReport, TestReport, TestStartReport], + print_filename: bool, ) -> None: path = self.report_key(report) + if path in self.current_pending_reports: + pending_report = self.clear_pending_report(path) + self.insert_progress(pending_report) self.current_line_num += 1 if len(report.fspath) > self.get_max_column_for_test_status() - 5: fspath = ( @@ -426,7 +481,8 @@ def reached_last_column_for_test_status( def pytest_runtest_logstart(self, nodeid, location) -> None: # Prevent locationline from being printed since we already # show the module_name & in verbose mode the test name. - pass + report = TestStartReport(nodeid, location) + self.log_report(report) def pytest_runtest_logfinish(self, nodeid: str) -> None: # prevent the default implementation to try to show @@ -440,12 +496,30 @@ def report_key(self, report: Union[CollectReport, TestReport]) -> Any: ) def pytest_runtest_logreport(self, report: TestReport) -> None: + self.log_report(report) + + def clear_pending_report(self, path): + report = self.current_pending_reports[path] + assert report + + # spinner_index = self.current_spinner_indexes[path] + # pending = colored(SPINNER_BLOCKS[spinner_index], THEME.pending) + pending = colored(THEME.symbol_pending, THEME.pending) + self.current_lines[path] = self.current_lines[path].replace(pending, "") + # del self.current_spinner_indexes[path] + del self.current_pending_reports[path] + + return report + + def log_report(self, report: Union[TestReport, TestStartReport]) -> None: global LEN_PROGRESS_BAR_SETTING, LEN_PROGRESS_BAR res = pytest_report_teststatus(report=report) assert res cat, letter, word = res - self.stats.setdefault(cat, []).append(report) + pending = getattr(report, "pending", False) + if not pending: + self.stats.setdefault(cat, []).append(report) if not LEN_PROGRESS_BAR: if LEN_PROGRESS_BAR_SETTING.endswith("%"): @@ -455,18 +529,29 @@ def pytest_runtest_logreport(self, report: TestReport) -> None: else: LEN_PROGRESS_BAR = int(LEN_PROGRESS_BAR_SETTING) - self.reports.append(report) + if not pending: + self.reports.append(report) + if report.outcome == "failed": + if report.fspath in self.current_pending_reports: + self.clear_pending_report(report.fspath) + self.insert_progress(report) + print("") self.print_failure(report) # Ignore other reports or it will cause duplicated letters if report.when == "teardown": self.tests_taken += 1 self.insert_progress(report) - path = os.path.join(os.getcwd(), report.location[0]) - if report.when == "call" or report.skipped: + if report.when == "call" or report.skipped or pending: path = self.report_key(report) + + # Begin a new line any time the path is missing, even though + # it's the "pending" case that will usually trigger this. + # If print_failure is called, the lines are cleared, so + # another pending test might have had its line removed, + # so the "call" case will have to add the new line again. if path not in self.current_line_nums: self.begin_new_line(report, print_filename=True) elif self.reached_last_column_for_test_status(report): @@ -477,44 +562,56 @@ def pytest_runtest_logreport(self, report: TestReport) -> None: ) self.begin_new_line(report, print_filename) + if pending: + # self.current_spinner_indexes[path] = 0 + self.current_pending_reports[path] = report + elif path in self.current_pending_reports: + self.clear_pending_report(path) + self.current_lines[path] = self.current_lines[path] + letter - block = int( - float(self.tests_taken) * LEN_PROGRESS_BAR / self.tests_count - if self.tests_count - else 0 - ) - if report.failed: - if not self.progress_blocks or self.progress_blocks[-1][0] != block: - self.progress_blocks.append([block, False]) - elif self.progress_blocks and self.progress_blocks[-1][0] == block: - self.progress_blocks[-1][1] = False + if pending: + # This also takes care of updating the line, so we need + # to call it when a test starts, in order to write the + # pending indicator. + self.insert_progress(report) else: - if not self.progress_blocks or self.progress_blocks[-1][0] != block: - self.progress_blocks.append([block, True]) - - if not letter and not word: - return - if self.verbosity > 0: - markup = {"red": True} - if isinstance(word, tuple): - word, markup = word + block = int( + float(self.tests_taken) * LEN_PROGRESS_BAR / self.tests_count + if self.tests_count + else 0 + ) + if report.failed: + if not self.progress_blocks or self.progress_blocks[-1][0] != block: + self.progress_blocks.append([block, False]) + elif self.progress_blocks and self.progress_blocks[-1][0] == block: + self.progress_blocks[-1][1] = False else: - if report.passed: - markup = {"green": True} - elif report.skipped: - markup = {"yellow": True} - elif hasattr(report, "rerun") and isinstance(report.rerun, int): - markup = {"blue": True} - line = self._locationline(str(report.fspath), *report.location) - if hasattr(report, "node"): - self._tw.write("\r\n") - self.current_line_num += 1 + if not self.progress_blocks or self.progress_blocks[-1][0] != block: + self.progress_blocks.append([block, True]) + + if not letter and not word: + return + if self.verbosity > 0: + markup = {"red": True} + if isinstance(word, tuple): + word, markup = word + else: + if report.passed: + markup = {"green": True} + elif report.skipped: + markup = {"yellow": True} + elif hasattr(report, "rerun") and isinstance(report.rerun, int): + markup = {"blue": True} + line = self._locationline(str(report.fspath), *report.location) if hasattr(report, "node"): - self._tw.write(f"[{report.node.gateway.id}] ") - self._tw.write(word, **markup) - self._tw.write(" " + line) - self.currentfspath = -2 + self._tw.write("\r\n") + self.current_line_num += 1 + if hasattr(report, "node"): + self._tw.write(f"[{report.node.gateway.id}] ") + self._tw.write(word, **markup) + self._tw.write(" " + line) + self.currentfspath = -2 def count(self, key: str, when: tuple = ("call",)) -> int: value = self.stats.get(key)