-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlive.py
103 lines (83 loc) · 2.83 KB
/
live.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/python
# File: live.py
# Description: live
# Created: 2017-07-16
import re
from contextlib import suppress
import asyncio
from gi.repository import Gtk
import cairo
import js
class JsWidget(Gtk.DrawingArea):
    """Drawing area that renders an animation scaled to fit its allocation.

    Until ``enable()`` is called the widget only clears itself to a fully
    transparent background on every draw.
    """

    # Class-level default; ``enable()`` sets the flag on the instance.
    enabled = False

    def __init__(self, layout, anim, scale=15.0):
        """Set up the widget and hook its 'draw' signal.

        layout -- object exposing ``width`` and ``height``; used for the
                  size request and for the draw-time scale factor.
        anim   -- object providing ``update()``, ``draw(cr)`` and a
                  ``context.needs_update`` flag.
        scale  -- multiplier applied to the layout size for the size request.
        """
        Gtk.DrawingArea.__init__(self)
        self.layout, self.anim = layout, anim
        requested_width = layout.width * scale
        requested_height = layout.height * scale
        self.set_size_request(requested_width, requested_height)
        self.connect('draw', self.__on_draw)

    def enable(self):
        """Turn on rendering of the animation for subsequent draws."""
        self.enabled = True

    def __on_draw(self, widget, cr):
        """'draw' signal handler: clear the surface, then paint the animation.

        widget -- the widget being drawn; queried for its allocated size.
        cr     -- cairo context to draw on.
        """
        # Replace whatever is on the surface with fully transparent pixels,
        # then go back to normal compositing for the actual drawing.
        cr.set_source_rgba(0, 0, 0, 0)
        cr.set_operator(cairo.OPERATOR_SOURCE)
        cr.paint()
        cr.set_operator(cairo.OPERATOR_OVER)

        if not self.enabled:
            return

        animation = self.anim
        alloc_w = widget.get_allocated_width()
        alloc_h = widget.get_allocated_height()
        geometry = self.layout
        # Uniform scale: the smaller of the two ratios keeps the whole layout
        # inside the allocation without distorting it.
        factor = min(alloc_w / geometry.width, alloc_h / geometry.height)

        animation.update()
        cr.save()
        try:
            cr.scale(factor, factor)
            animation.draw(cr)
        finally:
            # Always undo the scale, even if drawing raised.
            cr.restore()

        if not animation.context.needs_update:
            return
        # Acknowledge the request and schedule another frame.
        animation.context.needs_update = False
        self.queue_draw()
class LiveWorker(object):
    """Stream ``jstest --event`` output for one device into an event sink.

    ``do_work`` runs the external ``jstest`` tool, passes every decoded
    output line to ``evs.feed`` and notifies the ``on_init`` / ``on_event``
    hooks, which subclasses are expected to override.
    """

    # Header line printed by jstest, reporting the axis and button counts.
    _HEADER_RE = re.compile(r"has (\d+) axes and (\d+) buttons\.")

    def __init__(self, device_path, evs):
        """Remember the device to read and the sink to feed.

        device_path -- path handed to ``jstest --event``.
        evs         -- object with a ``feed(line)`` method; during the
                       initial phase its return value is treated as the
                       parsed event (or a falsy value when the line held
                       none).
        """
        self.device_path = device_path
        self.evs = evs

    async def do_work(self):
        """Run jstest and pump its output until the stream ends.

        Phase 1 consumes the initial burst of events. It stops once as many
        events as the header's axes + buttons have been seen, or at the
        first event whose type lacks ``js.TY_INIT_BIT``. ``on_init`` is then
        called exactly once, also when the stream ended early.

        Phase 2 feeds every remaining line to the sink and calls
        ``on_event`` after each one.

        On exit (normal end, error or cancellation) the child process is
        terminated and reaped.
        """
        evs = self.evs
        # stdbuf -o0 turns off jstest's stdout buffering so lines arrive as
        # soon as they are produced instead of in pipe-sized chunks.
        process = await asyncio.create_subprocess_exec(
            "stdbuf", "-o0", "jstest", "--event", self.device_path,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE)
        try:
            num_init_events = None
            evnum = 0
            # One shared iterator, so phase 2 resumes exactly where phase 1
            # stopped.
            lines = process.stdout.__aiter__()
            async for line in lines:
                line = line.decode('utf-8')
                match = self._HEADER_RE.search(line)
                if match:
                    # Presumably jstest reports one initial event per axis
                    # and per button; the sum is the expected burst size.
                    num_init_events = int(match.group(1)) + int(match.group(2))
                event = evs.feed(line)
                if event:
                    evnum += 1
                    if num_init_events and evnum >= num_init_events:
                        break
                # A non-init event means the initial burst is already over.
                if event is not None and (event.type & js.TY_INIT_BIT) == 0:
                    break
            self.on_init()
            async for line in lines:
                evs.feed(line.decode('utf-8'))
                self.on_event()
        finally:
            # The child may already be gone; terminate() then raises
            # ProcessLookupError, which is harmless here.
            with suppress(ProcessLookupError):
                process.terminate()
            # Reap the child so it does not linger as a zombie and the
            # subprocess transport is released before we return.
            await process.wait()

    def on_init(self):
        """Hook called once after the initial phase; no-op by default."""
        pass

    def on_event(self):
        """Hook called after each line fed in phase 2; no-op by default."""
        pass
# vim:set sw=4 ts=8 sts=4 et sr ft=python fdm=marker tw=0: