Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide type hints for selected modules #393

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion terminus/image.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
# type: ignore (`image_resize(width, height)` are impossible to type correctly)

import struct
import imghdr


# see https://bugs.python.org/issue16512#msg198034
# not added to imghdr.tests because of potential issues with reloads
def _is_jpg(h):
# type: (bytes) -> bool
return h.startswith(b'\xff\xd8')


def get_image_info(databytes):
# type: (bytes) -> tuple[str, int, int] | None
head = databytes[0:32]
if len(head) != 32:
return
Expand Down Expand Up @@ -43,10 +47,13 @@ def get_image_info(databytes):
width, height = struct.unpack('II', head[18:26])
else:
return
return what, width, height
return what, width, height # type: ignore (tuple[str, int, int] here)


def image_resize(img_width, img_height, width, height, em_width, max_width, preserve_ratio=1):
# type: (int, int, str | None, str | None, int, int, str | int) -> tuple[int, int]
# type: ignore

if width:
if width.isdigit():
width = int(width) * em_width
Expand Down
89 changes: 58 additions & 31 deletions terminus/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,33 @@


class Terminal:
_terminals = {}
_detached_terminals = []
_terminals = {} # type: dict[int, Terminal]
_detached_terminals = [] # type: list[Terminal]

def __init__(self, view=None):
self.view = view
self._cached_cursor = [0, 0]
self._size = sublime.load_settings('Terminus.sublime-settings').get('size', (None, None))
self._cached_cursor_is_hidden = [True]
self.image_count = 0
self.images = {}
self._strings = Queue()
self._pending_to_send_string = [False]
self._pending_to_clear_scrollback = [False]
self._pending_to_reset = [None]
self.lock = threading.Lock()
# type: (sublime.View | None) -> None
self.view = view # type: sublime.View | None
self._cached_cursor = [0, 0] # type: list[int]
self._size = sublime.load_settings('Terminus.sublime-settings').get('size', (None, None)) # type: tuple[int | None, int | None]
self._cached_cursor_is_hidden = [True] # type: list[bool]
self.image_count = 0 # type: int
self.images = {} # type: dict[int, str]
self._strings = Queue() # type: Queue
self._pending_to_send_string = [False] # type: list[bool]
self._pending_to_clear_scrollback = [False] # type: list[bool]
self._pending_to_reset = [None] # type: list[bool | None]
self.lock = threading.Lock() # type: threading.Lock

@classmethod
def from_id(cls, vid):
# type: (int) -> Terminal | None
if vid not in cls._terminals:
return None
return cls._terminals[vid]

@classmethod
def from_tag(cls, tag, current_window_only=True):
# type: (str, bool) -> Terminal | None
# restrict to only current window
for terminal in cls._terminals.values():
if terminal.tag == tag:
Expand All @@ -66,7 +69,8 @@ def from_tag(cls, tag, current_window_only=True):

@classmethod
def cull_terminals(cls):
terminals_to_kill = []
# type: () -> None
terminals_to_kill = [] # type: list[Terminal]
for terminal in cls._terminals.values():
if not terminal.is_hosted():
terminals_to_kill.append(terminal)
Expand All @@ -76,14 +80,16 @@ def cull_terminals(cls):

@property
def window(self):
# type: () -> sublime.Window | None
if self.detached:
return None
if self.show_in_panel:
return get_panel_window(self.view)
else:
return self.view.window()
return self.view.window() # type: ignore

def attach_view(self, view, offset=None):
# type: (sublime.View, int | None) -> None
with self.lock:
self.view = view
self.detached = False
Expand All @@ -95,21 +101,24 @@ def attach_view(self, view, offset=None):
self.set_offset(offset)

def detach_view(self):
# type: () -> None
with self.lock:
self.detached = True
Terminal._detached_terminals.append(self)
if self.view.id() in Terminal._terminals:
del Terminal._terminals[self.view.id()]
if self.view.id() in Terminal._terminals: # type: ignore
del Terminal._terminals[self.view.id()] # type: ignore
self.view = None

@responsive(period=1, default=True)
def is_hosted(self):
# type: () -> bool
if self.detached:
# irrelevant if terminal is detached
return True
return self.window is not None

def _need_to_render(self):
# type: () -> bool
flag = False
if self.screen.dirty:
flag = True
Expand All @@ -126,11 +135,13 @@ def _need_to_render(self):
return flag

def _start_rendering(self):
# type: () -> None
data = [""]
done = [False]

@responsive(period=1, default=False)
def was_resized():
# type: () -> bool
size = view_size(self.view, force=self._size)
return self.screen.lines != size[0] or self.screen.columns != size[1]

Expand All @@ -156,7 +167,7 @@ def renderer():

def feed_data():
if len(data[0]) > 0:
logger.debug("receieved: {}".format(data[0]))
logger.debug("received: {}".format(data[0]))
self.stream.feed(data[0])
data[0] = ""

Expand All @@ -166,10 +177,10 @@ def feed_data():
if not self.detached:
if was_resized():
self.handle_resize()
self.view.run_command("terminus_show_cursor")
self.view.run_command("terminus_show_cursor") # type: ignore

if self._need_to_render():
self.view.run_command("terminus_render")
self.view.run_command("terminus_render") # type: ignore
self.screen.dirty.clear()

if done[0] or not self.is_hosted():
Expand All @@ -188,6 +199,7 @@ def _cleanup():
threading.Thread(target=renderer).start()

def set_offset(self, offset=None):
# type: (int | None) -> None
if offset is not None:
self.offset = offset
else:
Expand All @@ -202,7 +214,8 @@ def start(
self, cmd, cwd=None, env=None, default_title=None, title=None,
show_in_panel=None, panel_name=None, tag=None, auto_close=True, cancellable=False,
timeit=False):

# type: (str | bytes | list[str] | list[bytes], str | None, dict[str, str] | None, str | None, str | None, str | None, str | None, str | None, bool, bool, bool) -> None
# type: ignore (`cmd` is impossible to type correctly without imports)
view = self.view
if view:
self.detached = False
Expand All @@ -228,7 +241,7 @@ def start(
size = view_size(view or sublime.active_window().active_view(), default=(40, 80), force=self._size)
logger.debug("view size: {}".format(str(size)))
_env = os.environ.copy()
_env.update(env)
_env.update(env) # type: ignore (should pass dict[str, str] here, not None)
self.process = TerminalPtyProcess.spawn(cmd, cwd=cwd, env=_env, dimensions=size)
self.screen = TerminalScreen(
size[1], size[0], process=self.process, history=10000,
Expand All @@ -240,14 +253,16 @@ def start(
self._start_rendering()

def kill(self):
# type: () -> None
logger.debug("kill")

self.process.terminate()
vid = self.view.id()
vid = self.view.id() # type: ignore
if vid in self._terminals:
del self._terminals[vid]

def handle_resize(self):
# type: () -> None
size = view_size(self.view, force=self._size)
logger.debug("handle resize {} {} -> {} {}".format(
self.screen.lines, self.screen.columns, size[0], size[1]))
Expand All @@ -259,20 +274,24 @@ def handle_resize(self):
pass

def clear_callback(self):
# type: () -> None
self._pending_to_clear_scrollback[0] = True

def reset_callback(self):
# type: () -> None
if self._pending_to_reset[0] is None:
self._pending_to_reset[0] = False
else:
self._pending_to_reset[0] = True

def send_key(self, *args, **kwargs):
# type: (Any, Any) -> None # type: ignore (`from typing import Any`)
kwargs["application_mode"] = self.application_mode_enabled()
kwargs["new_line_mode"] = self.new_line_mode_enabled()
self.send_string(get_key_code(*args, **kwargs), normalized=False)

def send_string(self, string, normalized=True):
# type: (str, bool) -> None
if normalized:
# normalize CR and CRLF to CR (or CRLF if LNM)
string = string.replace("\r\n", "\n")
Expand All @@ -293,6 +312,7 @@ def send_string(self, string, normalized=True):
threading.Thread(target=self.process_send_string).start()

def process_send_string(self):
# type: () -> None
while True:
try:
string = self._strings.get(False)
Expand All @@ -305,30 +325,35 @@ def process_send_string(self):
time.sleep(0.1)

def bracketed_paste_mode_enabled(self):
# type: () -> bool
return (2004 << 5) in self.screen.mode

def new_line_mode_enabled(self):
# type: () -> bool
return (20 << 5) in self.screen.mode

def application_mode_enabled(self):
# type: () -> bool
return (1 << 5) in self.screen.mode

def find_image(self, pt):
# type: (sublime.Point) -> int | None
view = self.view
for pid in self.images:
region = view.query_phantom(pid)[0]
region = view.query_phantom(pid)[0] # type: ignore
if region.end() == pt:
return pid
return None

def show_image(self, data, args, cr=None):
# type: (str, dict[str, str], str | None) -> None
view = self.view

if "inline" not in args or not args["inline"]:
return

cursor = self.screen.cursor
pt = view.text_point(self.offset + cursor.y, cursor.x)
pt = view.text_point(self.offset + cursor.y, cursor.x) # type: ignore

databytes = base64.decodebytes(data.encode())

Expand All @@ -348,17 +373,17 @@ def show_image(self, data, args, cr=None):
height,
args["width"] if "width" in args else None,
args["height"] if "height" in args else None,
view.em_width(),
view.viewport_extent()[0] - 3 * view.em_width(),
view.em_width(), # type: ignore
view.viewport_extent()[0] - 3 * view.em_width(), # type: ignore
args["preserveAspectRatio"] if "preserveAspectRatio" in args else 1
)

if self.find_image(pt):
self.view.run_command("terminus_insert", {"point": pt, "character": " "})
self.view.run_command("terminus_insert", {"point": pt, "character": " "}) # type: ignore
pt += 1

self.image_count += 1
p = view.add_phantom(
p = view.add_phantom( # type: ignore
"terminus_image#{}".format(self.image_count),
sublime.Region(pt, pt),
IMAGE.format(
Expand All @@ -375,11 +400,12 @@ def show_image(self, data, args, cr=None):
self.screen.index()

def clean_images(self):
# type: () -> None
view = self.view
for pid in list(self.images.keys()):
region = view.query_phantom(pid)[0]
region = view.query_phantom(pid)[0] # type: ignore
if region.empty() and region.begin() == 0:
view.erase_phantom_by_id(pid)
view.erase_phantom_by_id(pid) # type: ignore
if pid in self.images:
try:
os.remove(self.images[pid])
Expand All @@ -388,6 +414,7 @@ def clean_images(self):
del self.images[pid]

def __del__(self):
# type: () -> None
# make sure the process is terminated
self.process.terminate(force=True)

Expand Down
10 changes: 9 additions & 1 deletion terminus/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


def get_panel_window(view):
# type: (sublime.View) -> sublime.Window | None
for w in sublime.windows():
for panel in w.panels():
v = w.find_output_panel(panel.replace("output.", ""))
Expand All @@ -14,6 +15,7 @@ def get_panel_window(view):


def get_panel_name(view):
# type: (sublime.View) -> str | None
for w in sublime.windows():
for panel in w.panels():
v = w.find_output_panel(panel.replace("output.", ""))
Expand All @@ -23,6 +25,7 @@ def get_panel_name(view):


def panel_is_visible(view):
# type: (sublime.View) -> bool
window = get_panel_window(view)
if not window:
return False
Expand All @@ -34,6 +37,7 @@ def panel_is_visible(view):


def view_is_visible(view):
# type: (sublime.View) -> bool
window = view.window()
if not window:
return False
Expand All @@ -42,9 +46,10 @@ def view_is_visible(view):


def view_size(view, default=None, force=None):
# type: (sublime.View, tuple[int, int] | None, tuple[int | None, int | None] | None) -> tuple[int, int]
if force:
if all(force):
return force
return force # type: ignore (is tuple[int, int] here)
pixel_width, pixel_height = view.viewport_extent()
pixel_per_line = view.line_height()
pixel_per_char = view.em_width()
Expand Down Expand Up @@ -80,12 +85,14 @@ def view_size(view, default=None, force=None):
class TerminusInsertCommand(sublime_plugin.TextCommand):

def run(self, edit, point, character):
# type: (sublime.Edit, sublime.Point, str) -> None
self.view.insert(edit, point, character)


class TerminusTrimTrailingLinesCommand(sublime_plugin.TextCommand):

def run(self, edit):
# type: (sublime.Edit) -> None
view = self.view
lastrow = view.rowcol(view.size())[0]
if not self.is_empty(lastrow):
Expand All @@ -111,5 +118,6 @@ def is_empty(self, row):
class TerminusNukeCommand(sublime_plugin.TextCommand):

def run(self, edit):
# type: (sublime.Edit) -> None
view = self.view
view.replace(edit, sublime.Region(0, view.size()), "")