forked from benmoran56/esper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathesper.py
323 lines (248 loc) · 12.6 KB
/
esper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import time as _time
from functools import lru_cache as _lru_cache
from typing import List, Type, TypeVar, Dict, Any, Tuple, Iterable
C = TypeVar('C')
P = TypeVar('P')
class Processor:
"""Base class for all Processors to inherit from.
Processor instances must contain a `process` method. Other than that,
you are free to add any additional methods that are necessary. The process
method will be called by each call to `World.process`, so you will
generally want to iterate over entities with one (or more) calls to the
appropriate world methods there, such as
`for ent, (rend, vel) in self.world.get_components(Renderable, Velocity):`
"""
world = None # type: World
def process(self, *args, **kwargs):
raise NotImplementedError
class World:
def __init__(self, timed=False):
"""A World object keeps track of all Entities, Components, and Processors.
A World contains a database of all Entity/Component assignments. It also
handles calling the process method on any Processors assigned to it.
"""
self._processors = [] # type: List[Processor]
self._next_entity_id = 0
self._components = {}
self._entities = {} # type: Dict[int, Any]
self._dead_entities = set()
if timed:
self.process_times = {}
self._process = self._timed_process
def clear_cache(self) -> None:
self.get_component.cache_clear()
self.get_components.cache_clear()
def clear_database(self) -> None:
"""Remove all Entities and Components from the World."""
self._next_entity_id = 0
self._dead_entities.clear()
self._components.clear()
self._entities.clear()
self.clear_cache()
def add_processor(self, processor_instance: Processor, priority=0) -> None:
"""Add a Processor instance to the World.
:param processor_instance: An instance of a Processor,
subclassed from the Processor class
:param priority: A higher number is processed first.
"""
assert issubclass(processor_instance.__class__, Processor)
processor_instance.priority = priority
processor_instance.world = self
self._processors.append(processor_instance)
self._processors.sort(key=lambda proc: proc.priority, reverse=True)
def remove_processor(self, processor_type: Processor) -> None:
"""Remove a Processor from the World, by type.
:param processor_type: The class type of the Processor to remove.
"""
for processor in self._processors:
if type(processor) == processor_type:
processor.world = None
self._processors.remove(processor)
def get_processor(self, processor_type: Type[P]) -> P:
"""Get a Processor instance, by type.
This method returns a Processor instance by type. This could be
useful in certain situations, such as wanting to call a method on a
Processor, from within another Processor.
:param processor_type: The type of the Processor you wish to retrieve.
:return: A Processor instance that has previously been added to the World.
"""
for processor in self._processors:
if type(processor) == processor_type:
return processor
def create_entity(self, *components) -> int:
"""Create a new Entity.
This method returns an Entity ID, which is just a plain integer.
You can optionally pass one or more Component instances to be
assigned to the Entity.
:param components: Optional components to be assigned to the
entity on creation.
:return: The next Entity ID in sequence.
"""
self._next_entity_id += 1
# TODO: duplicate add_component code here for performance
for component in components:
self.add_component(self._next_entity_id, component)
# self.clear_cache()
return self._next_entity_id
def delete_entity(self, entity: int, immediate=False) -> None:
"""Delete an Entity from the World.
Delete an Entity and all of it's assigned Component instances from
the world. By default, Entity deletion is delayed until the next call
to *World.process*. You can request immediate deletion, however, by
passing the "immediate=True" parameter. This should generally not be
done during Entity iteration (calls to World.get_component/s).
Raises a KeyError if the given entity does not exist in the database.
:param entity: The Entity ID you wish to delete.
:param immediate: If True, delete the Entity immediately.
"""
if immediate:
for component_type in self._entities[entity]:
self._components[component_type].discard(entity)
if not self._components[component_type]:
del self._components[component_type]
del self._entities[entity]
self.clear_cache()
else:
self._dead_entities.add(entity)
def component_for_entity(self, entity: int, component_type: Type[C]) -> C:
"""Retrieve a Component instance for a specific Entity.
Retrieve a Component instance for a specific Entity. In some cases,
it may be necessary to access a specific Component instance.
For example: directly modifying a Component to handle user input.
Raises a KeyError if the given Entity and Component do not exist.
:param entity: The Entity ID to retrieve the Component for.
:param component_type: The Component instance you wish to retrieve.
:return: The Component instance requested for the given Entity ID.
"""
return self._entities[entity][component_type]
def components_for_entity(self, entity: int) -> Tuple[C, ...]:
"""Retrieve all Components for a specific Entity, as a Tuple.
Retrieve all Components for a specific Entity. The method is probably
not appropriate to use in your Processors, but might be useful for
saving state, or passing specific Components between World instances.
Unlike most other methods, this returns all of the Components as a
Tuple in one batch, instead of returning a Generator for iteration.
Raises a KeyError if the given entity does not exist in the database.
:param entity: The Entity ID to retrieve the Components for.
:return: A tuple of all Component instances that have been
assigned to the passed Entity ID.
"""
return tuple(self._entities[entity].values())
def has_component(self, entity: int, component_type: Any) -> bool:
"""Check if a specific Entity has a Component of a certain type.
:param entity: The Entity you are querying.
:param component_type: The type of Component to check for.
:return: True if the Entity has a Component of this type,
otherwise False
"""
return component_type in self._entities[entity]
def add_component(self, entity: int, component_instance: Any) -> None:
"""Add a new Component instance to an Entity.
Add a Component instance to an Entiy. If a Component of the same type
is already assigned to the Entity, it will be replaced.
:param entity: The Entity to associate the Component with.
:param component_instance: A Component instance.
"""
component_type = type(component_instance)
if component_type not in self._components:
self._components[component_type] = set()
self._components[component_type].add(entity)
if entity not in self._entities:
self._entities[entity] = {}
self._entities[entity][component_type] = component_instance
self.clear_cache()
def remove_component(self, entity: int, component_type: Any) -> int:
"""Remove a Component instance from an Entity, by type.
A Component instance can be removed by providing it's type.
For example: world.delete_component(enemy_a, Velocity) will remove
the Velocity instance from the Entity enemy_a.
Raises a KeyError if either the given entity or Component type does
not exist in the database.
:param entity: The Entity to remove the Component from.
:param component_type: The type of the Component to remove.
"""
self._components[component_type].discard(entity)
if not self._components[component_type]:
del self._components[component_type]
del self._entities[entity][component_type]
if not self._entities[entity]:
del self._entities[entity]
self.clear_cache()
return entity
def _get_component(self, component_type: Type[C]) -> Iterable[Tuple[int, C]]:
"""Get an iterator for Entity, Component pairs.
:param component_type: The Component type to retrieve.
:return: An iterator for (Entity, Component) tuples.
"""
entity_db = self._entities
for entity in self._components.get(component_type, []):
yield entity, entity_db[entity][component_type]
def _get_components(self, *component_types: Type)-> Iterable[Tuple[int, ...]]:
"""Get an iterator for Entity and multiple Component sets.
:param component_types: Two or more Component types.
:return: An iterator for Entity, (Component1, Component2, etc)
tuples.
"""
entity_db = self._entities
comp_db = self._components
try:
for entity in set.intersection(*[comp_db[ct] for ct in component_types]):
yield entity, [entity_db[entity][ct] for ct in component_types]
except KeyError:
pass
@_lru_cache()
def get_component(self, component_type: Type[C]) -> List[Tuple[int, C]]:
return [query for query in self._get_component(component_type)]
@_lru_cache()
def get_components(self, *component_types: Type):
return [query for query in self._get_components(*component_types)]
def try_component(self, entity: int, component_type: Type):
"""Try to get a single component type for an Entity.
This method will return the requested Component if it exists, but
will pass silently if it does not. This allows a way to access optional
Components that may or may not exist.
:param entity: The Entity ID to retrieve the Component for.
:param component_type: The Component instance you wish to retrieve.
:return: A iterator containg the single Component instance requested,
which is empty if the component doesn't exist.
"""
if component_type in self._entities[entity]:
yield self._entities[entity][component_type]
else:
return None
def _clear_dead_entities(self):
"""Finalize deletion of any Entities that are marked dead.
In the interest of performance, this method duplicates code from the
`delete_entity` method. If that method is changed, those changes should
be duplicated here as well.
"""
for entity in self._dead_entities:
for component_type in self._entities[entity]:
self._components[component_type].discard(entity)
if not self._components[component_type]:
del self._components[component_type]
del self._entities[entity]
self._dead_entities.clear()
self.clear_cache()
def _process(self, *args, **kwargs):
for processor in self._processors:
processor.process(*args, **kwargs)
def _timed_process(self, *args, **kwargs):
"""Track Processor execution time for benchmarking."""
for processor in self._processors:
start_time = _time.process_time()
processor.process(*args, **kwargs)
process_time = int(round((_time.process_time() - start_time) * 1000, 2))
self.process_times[processor.__class__.__name__] = process_time
def process(self, *args, **kwargs):
"""Call the process method on all Processors, in order of their priority.
Call the *process* method on all assigned Processors, respecting their
optional priority setting. In addition, any Entities that were marked
for deletion since the last call to *World.process*, will be deleted
at the start of this method call.
:param args: Optional arguments that will be passed through to the
*process* method of all Processors.
"""
self._clear_dead_entities()
self._process(*args, **kwargs)
CachedWorld = World