-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathsmokesignal.py
260 lines (198 loc) · 8.15 KB
/
smokesignal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
"""
smokesignal.py - simple event signaling
"""
import types
from collections import defaultdict
from contextlib import contextmanager
from functools import partial
__all__ = ['emit', 'emitting', 'signals', 'responds_to', 'on', 'once',
'disconnect', 'disconnect_from', 'clear', 'clear_all']
# Collection of receivers/callbacks
receivers = defaultdict(set)
_call_partial = None
def install_twisted():
"""
If twisted is available, make `emit' return a DeferredList
This has been successfully tested with Twisted 14.0 and later.
"""
global emit, _call_partial
try:
from twisted.internet import defer
emit = _emit_twisted
_call_partial = defer.maybeDeferred
return True
except ImportError:
_call_partial = lambda fn, *a, **kw: fn(*a, **kw)
return False
def emit(signal, *args, **kwargs):
"""
Emits a single signal to call callbacks registered to respond to that signal.
Optionally accepts args and kwargs that are passed directly to callbacks.
:param signal: Signal to send
"""
for callback in set(receivers[signal]): # Make a copy in case of any ninja signals
_call(callback, args=args, kwargs=kwargs)
def _emit_twisted(signal, *args, **kwargs):
"""
Emits a single signal to call callbacks registered to respond to that signal.
Optionally accepts args and kwargs that are passed directly to callbacks.
:param signal: Signal to send
"""
errback = kwargs.pop('errback', lambda f: f)
dl = []
for callback in set(receivers[signal]): # Make a copy in case of any ninja signals
d = _call(callback, args=args, kwargs=kwargs)
if d is not None:
dl.append(d.addErrback(errback))
def simplify(results):
return [x[1] for x in results]
from twisted.internet.defer import DeferredList
return DeferredList(dl).addCallback(simplify)
@contextmanager
def emitting(exit, enter=None):
"""
Context manager for emitting signals either on enter or on exit of a context.
By default, if this context manager is created using a single arg-style argument,
it will emit a signal on exit. Otherwise, keyword arguments indicate signal points
"""
if enter is not None:
emit(enter)
try:
yield
finally:
emit(exit)
def _call(callback, args=[], kwargs={}):
"""
Calls a callback with optional args and keyword args lists. This method exists so
we can inspect the `_max_calls` attribute that's set by `_on`. If this value is None,
the callback is considered to have no limit. Otherwise, an integer value is expected
and decremented until there are no remaining calls
"""
if not hasattr(callback, '_max_calls'):
callback._max_calls = None
# None implies no callback limit
if callback._max_calls is None:
return _call_partial(callback, *args, **kwargs)
# Should the signal be disconnected?
if callback._max_calls <= 0:
return disconnect(callback)
callback._max_calls -= 1
return _call_partial(callback, *args, **kwargs)
def signals(callback):
"""
Returns a tuple of all signals for a particular callback
:param callback: A callable registered with smokesignal
:returns: Tuple of all signals callback responds to
"""
return tuple(s for s in receivers if responds_to(callback, s))
def responds_to(callback, signal):
"""
Returns bool if callback will respond to a particular signal
:param callback: A callable registered with smokesignal
:param signal: A signal to check if callback responds
:returns: True if callback responds to signal, False otherwise
"""
return callback in receivers[signal]
def on(signals, callback=None, max_calls=None):
"""
Registers a single callback for receiving an event (or event list). Optionally,
can specify a maximum number of times the callback should receive a signal. This
method works as both a function and a decorator::
smokesignal.on('foo', my_callback)
@smokesignal.on('foo')
def my_callback():
pass
:param signals: A single signal or list/tuple of signals that callback should respond to
:param callback: A callable that should repond to supplied signal(s)
:param max_calls: Integer maximum calls for callback. None for no limit.
"""
if isinstance(callback, int) or callback is None:
# Decorated
if isinstance(callback, int):
# Here the args were passed arg-style, not kwarg-style
callback, max_calls = max_calls, callback
return partial(_on, signals, max_calls=max_calls)
elif isinstance(callback, types.MethodType):
# callback is a bound instance method, so we need to wrap it in a function
def _callback(*args, **kwargs):
return callback(*args, **kwargs)
return _on(signals, _callback, max_calls=max_calls)
else:
# Function call
return _on(signals, callback, max_calls=max_calls)
def _on(on_signals, callback, max_calls=None):
"""
Proxy for `smokesignal.on`, which is compatible as both a function call and
a decorator. This method cannot be used as a decorator
:param signals: A single signal or list/tuple of signals that callback should respond to
:param callback: A callable that should repond to supplied signal(s)
:param max_calls: Integer maximum calls for callback. None for no limit.
"""
if not callable(callback):
raise AssertionError('Signal callbacks must be callable')
# Support for lists of signals
if not isinstance(on_signals, (list, tuple)):
on_signals = [on_signals]
callback._max_calls = max_calls
# Register the callback
for signal in on_signals:
receivers[signal].add(callback)
# Setup responds_to partial for use later
if not hasattr(callback, 'responds_to'):
callback.responds_to = partial(responds_to, callback)
# Setup signals partial for use later.
if not hasattr(callback, 'signals'):
callback.signals = partial(signals, callback)
# Setup disconnect partial for user later
if not hasattr(callback, 'disconnect'):
callback.disconnect = partial(disconnect, callback)
# Setup disconnect_from partial for user later
if not hasattr(callback, 'disconnect_from'):
callback.disconnect_from = partial(disconnect_from, callback)
return callback
def once(signals, callback=None):
"""
Registers a callback that will respond to an event at most one time
:param signals: A single signal or list/tuple of signals that callback should respond to
:param callback: A callable that should repond to supplied signal(s)
"""
return on(signals, callback, max_calls=1)
def disconnect(callback):
"""
Removes a callback from all signal registries and prevents it from responding
to any emitted signal.
:param callback: A callable registered with smokesignal
"""
# This is basically what `disconnect_from` does, but that method guards against
# callbacks not responding to signal arguments. We don't need that because we're
# disconnecting all the valid ones here
for signal in signals(callback):
receivers[signal].remove(callback)
def disconnect_from(callback, signals):
"""
Removes a callback from specified signal registries and prevents it from responding
to any emitted signal.
:param callback: A callable registered with smokesignal
:param signals: A single signal or list/tuple of signals
"""
# Support for lists of signals
if not isinstance(signals, (list, tuple)):
signals = [signals]
# Remove callback from receiver list if it responds to the signal
for signal in signals:
if responds_to(callback, signal):
receivers[signal].remove(callback)
def clear(*signals):
"""
Clears all callbacks for a particular signal or signals
"""
signals = signals if signals else receivers.keys()
for signal in signals:
receivers[signal].clear()
def clear_all():
"""
Clears all callbacks for all signals
"""
for key in receivers.keys():
receivers[key].clear()
_twisted_support = install_twisted()