Skip to content

Commit

Permalink
dev3 initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
denis.plotnikov committed Mar 19, 2024
1 parent 91b9e90 commit 086d1bc
Show file tree
Hide file tree
Showing 194 changed files with 4,612 additions and 1,697 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2502,3 +2502,4 @@ env/lib/python3.7/site-packages/zipp-3.15.0.dist-info/RECORD
env/lib/python3.7/site-packages/zipp-3.15.0.dist-info/top_level.txt
env/lib/python3.7/site-packages/zipp-3.15.0.dist-info/WHEEL
.DS_Store
**/__pycache__/
2 changes: 1 addition & 1 deletion package_info.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"package_name": "recon-lw",
"package_version": "2.0.0"
"package_version": "3.0.0"
}

1 change: 0 additions & 1 deletion recon_lw/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
#INIT
1 change: 0 additions & 1 deletion recon_lw/EventsSaver.py → recon_lw/core/EventsSaver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from abc import ABC, abstractmethod
from typing import Any

from recon_lw import recon_lw
from datetime import datetime, timedelta
from th2_data_services.data import Data
from pathlib import Path
Expand Down
4 changes: 1 addition & 3 deletions recon_lw/SequenceCache.py → recon_lw/core/SequenceCache.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from sortedcontainers import SortedKeyList

from recon_lw.ts_converters import epoch_nano_str_to_ts, ts_to_epoch_nano_str, time_stamp_key

from recon_lw import recon_lw
from recon_lw.core.ts_converters import time_stamp_key


class SequenceCache:
Expand Down
22 changes: 10 additions & 12 deletions recon_lw/StateStream.py → recon_lw/core/StateStream.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
from __future__ import annotations
from typing import Callable, Any, Tuple, Iterator, Iterable

from recon_lw.core.ts_converters import epoch_nano_str_to_ts, time_stamp_key
from recon_lw.core.EventsSaver import EventsSaver, IEventsSaver
from datetime import datetime

from recon_lw.core.utility import open_streams
from recon_lw.matching.LastStateMatcher import LastStateMatcher
from recon_lw.core._types import Th2Timestamp
from collections import defaultdict
from typing import Callable, Any, Tuple, Iterator, Iterable, Dict

from recon_lw.ts_converters import epoch_nano_str_to_ts, ts_to_epoch_nano_str, time_stamp_key
from recon_lw import recon_lw
from th2_data_services.utils import time as time_utils
from recon_lw.SequenceCache import SequenceCache
from recon_lw.EventsSaver import EventsSaver, IEventsSaver
from datetime import datetime
from recon_lw.LastStateMatcher import LastStateMatcher
from recon_lw._types import Th2Timestamp


class StateStream:
def __init__(self,
Expand Down Expand Up @@ -52,7 +51,6 @@ def state_updates(self, stream: Iterable, snapshots_collection):
for key, ts, action, state in updates:
if key is not None:
yield (key, ts, action, state)


def snapshots(self, stream: Iterable) -> Iterator[dict[str, Any]]:
"""It is expected Sorted stream!
Expand Down Expand Up @@ -285,7 +283,7 @@ def get_mnc_oe_state_ts(o):
def create_oe_snapshots_streams(oe_streams, result_events_path, buffer_len=100):
events_saver = EventsSaver(result_events_path)
filtered_streams = [stream.filter(order_updates_filter) for stream in oe_streams]
strm_list = recon_lw.open_streams(None, None, False, filtered_streams)
strm_list = open_streams(None, None, False, filtered_streams)
m_stream = strm_list.sync_streams(order_updates_ts)
state_stream = StateStream(get_next_update_oe,
get_snapshot_id_oe,
Expand All @@ -303,7 +301,7 @@ def create_oe_snapshots_streams(oe_streams, result_events_path, buffer_len=100):

stream1 = None # Please create your MNC stream
stream2 = state_stream.snapshots(m_stream)
streams = recon_lw.open_streams(None, data_objects=[stream1, stream2])
streams = open_streams(None, data_objects=[stream1, stream2])

message_buffer = [None] * buffer_len

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
35 changes: 35 additions & 0 deletions recon_lw/core/cache/processor/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Protocol

from recon_lw.core.type.types import Message


@dataclass
class CacheStore:
cache: dict

class ICacheProcessor(Protocol):
@abstractmethod
def __call__(self, msg: Message, cache: CacheStore):
pass

class CacheManager:
def __init__(self,
unfiltered_message_process: Optional[ICacheProcessor]=None,
filtered_message_processor: Optional[ICacheProcessor]=None,
cache_store: Optional[CacheStore] = CacheStore({})
):
self.cache = cache_store
self.unfiltered_message_process= unfiltered_message_process
self.filtered_message_processor = filtered_message_processor

def process_unfiltered_message(self, msg: Message):
if self.unfiltered_message_process:
self.unfiltered_message_process(msg, self.cache)

def process_filtered_message(self, msg: Message):
if self.filtered_message_processor:
self.filtered_message_processor(msg, self.cache)


12 changes: 12 additions & 0 deletions recon_lw/core/cache/processor/chain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import List

from recon_lw.core.cache.processor.base import ICacheProcessor, CacheStore
from recon_lw.core.type.types import Message


class ChainCacheProcessor(ICacheProcessor):
def __init__(self, processors: List[ICacheProcessor]):
self.processors = processors
def __call__(self, msg: Message, cache: CacheStore):
for processor in self.processors:
processor(msg, cache)
File renamed without changes.
1 change: 1 addition & 0 deletions recon_lw/core/rule/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from recon_lw.core.rule.base import AbstractRule
55 changes: 55 additions & 0 deletions recon_lw/core/rule/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

from recon_lw.core.EventsSaver import EventsSaver
from recon_lw.matching.init_function import AbstractMatcherContext


class RuleContext:
def __init__(self,
rule_root_event: dict,
event_saver: EventsSaver,
event_sequence: Dict[str, Any]
):
self.rule_root_event = rule_root_event
self.event_saver = event_saver
self.event_sequence = event_sequence

@staticmethod
def from_dict(rule_context: dict):
return RuleContext(
rule_context['events_saver'],
rule_context['event_sequence'],
rule_context['event']
)

class AbstractRule(ABC):

def __init__(self):
self.horizon_delay = None
self.collect_func = None
self.flush_func = None

self.rule_context: Optional[RuleContext] = None
self.matcher_context: Optional[AbstractMatcherContext] = None

self.first_key_func = None
self.second_key_func = None

def set_rule_context(self, context: RuleContext):
self.rule_context = context

def get_root_event(self) -> Dict[str, Any]:
return self.rule_context.rule_root_event

def get_event_saver(self) -> EventsSaver:
return self.rule_context.event_saver

def get_event_sequence(self) -> Dict[str, Any]:
return self.rule_context.event_sequence

@abstractmethod
def to_dict(self) -> Dict[str, Any]:
pass

113 changes: 113 additions & 0 deletions recon_lw/core/rule/one_many.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from typing import Optional, Callable, Dict, Any

from recon_lw.core.cache.processor.base import CacheManager
from recon_lw.core.rule.base import AbstractRule
from recon_lw.matching.LastStateMatcher import LastStateMatcher
from recon_lw.matching.collect_matcher.base import CollectMatcher
from recon_lw.matching.flush_function import DefaultFlushFunction, FlushFunction
from recon_lw.matching.init_function import MatcherContextProvider, SimpleMatcherContext, DefaultMatcherContextProvider
from recon_lw.matching.key_functions import KeyFunction
from recon_lw.matching.stream_matcher import ReconMatcher


class OneManyRuleConfig(AbstractRule):
def __init__(self):
super().__init__()
self.first_key_func: Optional[KeyFunction] = None
self.second_key_func: Optional[KeyFunction] = None
self.cache_manager: Optional[CacheManager] = None
self._as_dict: dict = None

def to_dict(self) -> Dict[str, Any]:
return self._as_dict

@staticmethod
def from_dict(name: str, config: dict) -> 'OneManyRuleConfig':

rule = OneManyRuleConfig()
rule._as_dict = config

rule.name = name
rule.horizon_delay = config['horizon_delay']
last_state_matcher = config.get('live_orders_cache')

rule.collect_func = config.get('collect_func')

if rule.collect_func is None:
from recon_lw.matching.collect_matcher import DefaultCollectMatcher
rule.collect_func = DefaultCollectMatcher(config['rule_match_func'], last_state_matcher)

init_func = config.get('init_func', DefaultMatcherContextProvider())
if isinstance(init_func, MatcherContextProvider):
rule.matcher_context = init_func.get_context()

elif isinstance(init_func, Callable):
init_func(config)
rule.context = SimpleMatcherContext(
match_index=config['match_index'],
time_index=config['time_index'],
message_cache=config['message_cache']
)

rule.flush_func = config.get('flush_func',
DefaultFlushFunction(config['interpret_func'],
last_state_matcher))

return rule

@staticmethod
def from_params(
name: str,
horizon_delay: int,
context_provider: MatcherContextProvider=None,
collect_func: CollectMatcher=None,
flush_func: FlushFunction=None,
first_key_func: Callable = None,
second_key_func: Callable = None,
last_state_matcher: Optional[LastStateMatcher]=None,
cache_manager: CacheManager=None
):
rule = OneManyRuleConfig()
rule.name = name
rule.horizon_delay = horizon_delay
rule.matcher_context = context_provider.get_context()
rule.last_state_matcher = last_state_matcher
rule.collect_func = collect_func
rule.first_key_func = first_key_func
rule.second_key_func = second_key_func
rule.flush_func = flush_func
rule.cache_manager = cache_manager

return rule

@staticmethod
def from_defaults(
name: str,
horizon_delay: int,
match_function: ReconMatcher,
intepretation_function: Callable,
first_key_func: Callable = None,
second_key_func: Callable = None,
):

from recon_lw.matching.init_function import DefaultMatcherContextProvider
context_provider = DefaultMatcherContextProvider()

from recon_lw.matching.collect_matcher import DefaultCollectMatcher
collect_func = DefaultCollectMatcher(match_function)

from recon_lw.matching.flush_function import DefaultFlushFunction
flush_func = DefaultFlushFunction(intepretation_function)

rule = OneManyRuleConfig()
rule.name = name
rule.horizon_delay = horizon_delay
rule.matcher_context = context_provider.get_context()
rule.last_state_matcher = None
rule.collect_func = collect_func
rule.first_key_func = first_key_func
rule.second_key_func = second_key_func
rule.flush_func = flush_func
rule.cache_manager = None

return rule
54 changes: 54 additions & 0 deletions recon_lw/core/rule/pair_one.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Optional, Callable, Dict, Any

from recon_lw.core.rule.base import AbstractRule
from recon_lw.matching.LastStateMatcher import LastStateMatcher
from recon_lw.matching.flush_function import DefaultFlushFunction
from recon_lw.matching.init_function import SimpleMatcherContext, MatcherContextProvider, DefaultMatcherContextProvider


class PairOneRule(AbstractRule):
def __init__(self):
super().__init__()
self.live_orders_cache: Optional[LastStateMatcher] = None
self.context: Optional[SimpleMatcherContext] = None
self.pair_key_func = None
self.one_key_func = None
self._dict_config = None

def to_dict(self) -> Dict[str, Any]:
return self._dict_config

@staticmethod
def from_dict(name: str, config: dict) -> 'PairOneRule':
rule = PairOneRule()

rule.name = name
rule.horizon_delay = config['horizon_delay']
last_state_matcher = config.get('live_orders_cache')

rule.collect_func = config.get('collect_func')

if rule.collect_func is None:
from recon_lw.matching.collect_matcher import DefaultCollectMatcher
rule.collect_func = DefaultCollectMatcher(config['rule_match_func'], last_state_matcher)

init_func = config.get('init_func', DefaultMatcherContextProvider())
if isinstance(init_func, MatcherContextProvider):
rule.context = init_func.get_context(rule)

elif isinstance(init_func, Callable):
init_func(config)
rule.context = SimpleMatcherContext(
match_index=config['match_index'],
time_index=config['time_index'],
message_cache=config['message_cache']
)

rule.flush_func = config.get('flush_func',
DefaultFlushFunction(rule.context, config['interpret_func'], last_state_matcher))
rule._dict_config = config

rule.pair_key_func = config.get('pair_key_func')
rule.one_key_func = config.get('one_key_func')

return rule
Loading

0 comments on commit 086d1bc

Please sign in to comment.