diff --git a/ralf/v2/api.py b/ralf/v2/api.py index 5774339..6b0ee30 100644 --- a/ralf/v2/api.py +++ b/ralf/v2/api.py @@ -69,6 +69,14 @@ def on_event(self, record: Record) -> Union[None, Record, Iterable[Record]]: def __repr__(self): return self.__class__.__name__ + def get(self, key): + """Get current feature value for key. Returns null by default. + + :param key: key to lookup feature value + :type key: str + """ + return None + class FeatureFrame: """Encapsulate a feature transformation and its related policies configuration.""" diff --git a/ralf/v2/manager.py b/ralf/v2/manager.py index 80c4469..1dae5a9 100644 --- a/ralf/v2/manager.py +++ b/ralf/v2/manager.py @@ -73,6 +73,7 @@ def wait(self): refs.append(handle.wait_for_exit.remote()) while True: _, not_done = ray.wait(refs, num_returns=len(refs), timeout=0.5) + print("Waiting for", not_done) if len(not_done) == 0: break time.sleep(1) diff --git a/ralf/v2/operator.py b/ralf/v2/operator.py index 3c2e80e..076b97b 100644 --- a/ralf/v2/operator.py +++ b/ralf/v2/operator.py @@ -41,6 +41,10 @@ def __init__( def enqueue_events(self, records: List[Record]): pass + @abstractmethod + def get(self, key) -> Record: + pass + def dump_transform_state(self) -> List["BaseTransform"]: pass @@ -63,6 +67,7 @@ def __init__( self.transform_object = self.frame.transform_object self.children = children self.scheduler = frame.scheduler + self.scheduler._operator = self # set operator for scheduler self.context = context self.config = ralf_config @@ -165,6 +170,9 @@ def local_handle_events(self, records: List[Record]): for record in records: self.scheduler.push_event(record) + def get(self, key): + return self.transform_object.get(key) + def dump_transform_state(self) -> List["BaseTransform"]: return [self.transform_object] @@ -258,9 +266,14 @@ def enqueue_events(self, records: List[Record]): ] else: raise Exception(f"Can't enqueue_events for event type {record.type_}") + + # send records to actors for handle, replica_records in actor_map.items(): handle.local_handle_events.remote(replica_records) + def get(self, key): + return self.pool.choose_actor(key).get.remote(key) + def dump_transform_state(self) -> List["BaseTransform"]: return sum( ray.get( diff --git a/ralf/v2/scheduler.py b/ralf/v2/scheduler.py index b2aaf5c..b37b61e 100644 --- a/ralf/v2/scheduler.py +++ b/ralf/v2/scheduler.py @@ -30,6 +30,11 @@ class BaseScheduler(ABC): """ event_class: Type[WakerProtocol] = threading.Event + _operator = None + + def get(self, key): + assert self._operator is not None, f"Operator not set {self._operator}" + return self._operator.get(key) def wake_waiter_if_needed(self): if self.waker is not None: @@ -80,6 +85,7 @@ def push_event(self, record: Record): def pop_event(self) -> Record: if len(self.queue) == 0: return Record.make_wait_event(self.new_waker()) + return self.queue.pop(0)