diff --git a/specialist.py b/specialist.py index 7bfb187..0c0ca4f 100644 --- a/specialist.py +++ b/specialist.py @@ -25,12 +25,15 @@ def _audit_imports(event: str, args: "typing.Sequence[object]") -> None: sys.addaudithook(_audit_imports) # pylint: disable = wrong-import-order, wrong-import-position +import _opcode # type: ignore [import-not-found] import argparse import collections import colorsys import contextlib +import ctypes import dataclasses import dis +import gc import html import http.server import importlib.util @@ -314,15 +317,18 @@ def _parse(code: types.CodeType) -> typing.Generator[_SourceChunk, None, None]: ) events[_FIRST_POSTION] = _Stats() events[_LAST_POSITION] = _Stats() - previous_previous = previous = None + previous_two: tuple[None, None] | tuple[dis.Instruction, dis.Instruction | None] = ( + None, + None, + ) + jit_code = _find_jit_code() + have_jit_code = any(jit_code.values()) for child in _walk_code(code): - raw_bytecode = child._co_code_adaptive # type: ignore [attr-defined] # pylint: disable = protected-access for instruction in dis.get_instructions(child, adaptive=True): if instruction.is_jump_target: - previous_previous = previous = None + previous_two = None, None if instruction.positions is None or None in instruction.positions: - previous_previous = previous - previous = instruction + previous_two = instruction, previous_two[0] continue lineno, end_lineno, col_offset, end_col_offset = instruction.positions assert lineno is not None @@ -330,12 +336,18 @@ def _parse(code: types.CodeType) -> typing.Generator[_SourceChunk, None, None]: assert col_offset is not None assert end_col_offset is not None stats = _score_instruction( - instruction, previous, previous_previous, raw_bytecode + instruction, previous_two[0], previous_two[1], child._co_code_adaptive # type: ignore [attr-defined] # pylint: disable = protected-access ) + if have_jit_code: # pragma: no cover + if stats.specialized and instruction.offset // 2 not in jit_code[child]: + stats = _Stats(unquickened=True) + elif ( + stats.adaptive or stats.unquickened + ) and instruction.offset // 2 in jit_code[child]: + stats = _Stats(specialized=True) events[lineno, col_offset] += stats events[end_lineno, end_col_offset] -= stats - previous_previous = previous - previous = instruction + previous_two = instruction, previous_two[0] stats = _Stats() for (start, event), (stop, _) in itertools.pairwise(sorted(events.items())): stats += event @@ -385,6 +397,113 @@ def _is_quickened(code: types.CodeType) -> bool: ) +def _find_executors() -> ( + typing.Generator[ + tuple[types.CodeType, typing.Sequence[tuple[str, int, int, int]]], None, None + ] +): # pragma: no cover + for code in _code.values(): + for child in _walk_code(code): + for i in range(0, len(child._co_code_adaptive), 2): # type: ignore [attr-defined] # pylint: disable = protected-access + if child._co_code_adaptive[i] == opcode.opmap["ENTER_EXECUTOR"]: # type: ignore [attr-defined] # pylint: disable = protected-access + try: + executor = _opcode.get_executor( # pylint: disable = no-member + child, i + ) + except (RuntimeError, ValueError): + continue + yield (child, executor) + + +def _callee_from_operand(operand: int) -> types.CodeType | None: # pragma: no cover + if operand & 1: + return typing.cast( + types.CodeType, ctypes.cast(operand - 1, ctypes.py_object).value + ) + if operand: + return typing.cast( + types.CodeType, ctypes.cast(operand, ctypes.py_object).value.__code__ + ) + return None + + +def _handle_inlining( + stack: list[types.CodeType | None], opname: str, operand: int +) -> None: # pragma: no cover + if opname == "_PUSH_FRAME": + stack.append(_callee_from_operand(operand)) + elif opname == "_POP_FRAME": + assert stack[-1] is not None + del stack[-1] + + +def _find_jit_code() -> ( + collections.defaultdict[types.CodeType, set[int]] +): # pragma: no cover + if "ENTER_EXECUTOR" not in opcode.opmap: + return collections.defaultdict(set) + todo: set[ + tuple[types.CodeType | None, typing.Sequence[tuple[str, int, int, int]]] + ] = set(_find_executors()) + jit_code: collections.defaultdict[types.CodeType, set[int]] = ( + collections.defaultdict(set) + ) + done: set[ + tuple[types.CodeType | None, typing.Sequence[tuple[str, int, int, int]]] + ] = set() + while todo - done: + for code, executor in todo - done: + done.add((code, executor)) + stack: list[types.CodeType | None] = [code] + side_exits = gc.get_referents(executor) + assert all(type(side_exit) is type(executor) for side_exit in side_exits) + for opname, _, target, operand in executor: + if opname in {"_DEOPT", "_ERROR_POP_N", "_EXIT_TRACE", "_DYNAMIC_EXIT"}: + continue + jump_target = target & 0xFFFF + error_target = target >> 16 + valid_jump = jump_target < len(executor) and ( + (jump_target == 0) + or executor[jump_target][0] + in {"_DEOPT", "_EXIT_TRACE", "_DYNAMIC_EXIT"} + ) + valid_error = error_target < len(executor) and ( + (error_target == 0) or executor[error_target][0] in {"_ERROR_POP_N"} + ) + if ( + valid_jump + and valid_error + and jump_target + and executor[jump_target][0] + in { + "_EXIT_TRACE", + "_DYNAMIC_EXIT", + } + ): + side_exit = side_exits[executor[jump_target][2] & 0xFFFF] + todo.add((stack[-1], side_exit)) + elif valid_jump and valid_error and jump_target: + assert executor[jump_target][0] == "_DEOPT" + target = executor[jump_target][2] + assert ( + stack[-1] is None or target < len(stack[-1]._co_code_adaptive) // 2 # type: ignore [attr-defined] # pylint: disable = protected-access + ) + if stack[-1] is not None: + jit_code[stack[-1]].add(target) + if valid_jump and valid_error and error_target: + assert executor[jump_target][0] == "_ERROR_POP_N" + target = executor[error_target][3] + assert stack[-1] is None or target < len(stack[-1]._co_code_adaptive) // 2 # type: ignore [attr-defined] # pylint: disable = protected-access + if stack[-1] is not None: + jit_code[stack[-1]].add(target) + else: + assert stack[-1] is None or target < len(stack[-1]._co_code_adaptive) // 2 # type: ignore [attr-defined] # pylint: disable = protected-access + if stack[-1] is not None: + jit_code[stack[-1]].add(target) + _handle_inlining(stack, opname, operand) + return jit_code + + def _view( path: pathlib.Path, *,