Commit

read snapshots framework
alexeyzverev committed Aug 14, 2023
1 parent 8df9bed commit c2185a0
Showing 2 changed files with 92 additions and 3 deletions.
1 change: 1 addition & 0 deletions recon_lw/recon_lw.py
@@ -168,6 +168,7 @@ def execute_standalone(message_pickle_path, sessions_list, result_events_path, r
    events_saver.save_events([root_event])
    for rule_key, rule_settings in rules_settings_dict.items():
        rule_settings["rule_root_event"] = events_saver.create_event(rule_key, "LwReconRule", parentId=root_event["eventId"])
        rule_settings["events_saver"] = events_saver
        if "init_func" not in rule_settings:
            rule_settings["init_func"] = init_matcher
        if "collect_func" not in rule_settings:
94 changes: 91 additions & 3 deletions recon_lw/recon_ob.py
@@ -149,6 +149,69 @@ def process_operations_batch(operations_batch, book_id ,book, check_book_rule,
        obs[i]["aggr_seq"]["limit_v2"] = -1



def read_snapshots_stream_until_stop(expanded_snapshots_stream_iter, snapshot_stop_func, stop_status):
    # Re-yield messages until snapshot_stop_func flags the end of the snapshot,
    # recording the stop point in stop_status; if the stream is exhausted first,
    # mark the snapshot as incomplete.
    for m in expanded_snapshots_stream_iter:
        is_it_stop, seq_id, session_id, stop_msg_id = snapshot_stop_func(m)
        if is_it_stop:
            stop_status["snapshot_done"] = True
            stop_status["sequence_id"] = seq_id
            stop_status["session_id"] = session_id
            stop_status["stop_msg_id"] = stop_msg_id
            yield m
            return
        yield m
    stop_status["snapshot_done"] = False
    stop_status["sequence_id"] = None


def read_snapshot(expanded_snapshots_stream_iter, snapshot_stop_func, get_book_id_func, update_book_rule,
                  check_book_rule, initial_book_params, parent_event, events_saver: EventsSaver, saveEvents=True):
    # Build order books from one snapshot: consume the stream until the stop
    # message, then optionally log an OrderBookSnap event per book.
    status = {}
    books = {}
    terminated_stream = read_snapshots_stream_until_stop(expanded_snapshots_stream_iter,
                                                         snapshot_stop_func, status)
    process_market_data_update(terminated_stream, books, get_book_id_func,
                               update_book_rule, check_book_rule, initial_book_params,
                               parent_event, events_saver, None, None, False)
    if status["snapshot_done"]:
        if saveEvents:
            for book_id, book in books.items():
                log_event = events_saver.create_event("OrderBookSnap:" + status["session_id"],
                                                      "OrderBookSnap",
                                                      ok=True,
                                                      body={"book_id": book_id, "book": book},
                                                      parentId=parent_event["eventId"])
                if status["stop_msg_id"] is not None:
                    log_event["attachedMessageIds"] = [status["stop_msg_id"]]
                log_event["scope"] = status["session_id"]
                events_saver.save_events([log_event])
        return books, status["sequence_id"]
    else:
        return None, None


def read_all_snapshots(snapshots_stream, snapshot_stop_func,
                       get_book_id_func, update_book_rule,
                       check_book_rule, initial_book_params,
                       result_events_path):
    # Read consecutive snapshots from the stream until read_snapshot reports
    # that no further complete snapshot is available.
    box_ts = datetime.now()
    events_saver = EventsSaver(result_events_path)
    root_event = events_saver.create_event("recon_ob_snapshots " + box_ts.isoformat(), "Microservice")
    events_saver.save_events([root_event])

    expanded_stream = (mm for m in snapshots_stream for mm in message_utils.expand_message(m))
    expanded_stream_iter = iter(expanded_stream)
    books = {}
    while books is not None:
        books, seq_id = read_snapshot(expanded_stream_iter, snapshot_stop_func, get_book_id_func,
                                      update_book_rule, check_book_rule, initial_book_params,
                                      root_event, events_saver, True)
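
A hedged usage sketch of read_all_snapshots; the loader and the per-feed callbacks (load_snapshot_messages, my_get_book_id, my_update_book_rule, my_check_book_rule) are hypothetical placeholders, not part of this commit.

snapshots = load_snapshot_messages("snapshots.pickle")  # placeholder loader
read_all_snapshots(snapshots,
                   example_snapshot_stop_func,  # end-of-snapshot detector (sketched above)
                   my_get_book_id,              # maps a message to its book id
                   my_update_book_rule,         # applies one message to a book
                   my_check_book_rule,          # validates book consistency
                   {"aggregation_level": 10},   # hypothetical initial_book_params
                   "./recon_events")            # result_events_path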


def process_market_data_update(mess_batch, books_cache, get_book_id_func, update_book_rule,
                               check_book_rule, initial_book_params, parent_event, events_saver: EventsSaver,
                               log_books_filter, log_books_collection, aggregate_batch_updates):
@@ -191,14 +254,18 @@ def process_ob_rules(sequenced_batch: SortedKeyList, books_cache: dict, get_book
                     update_book_rule,
                     check_book_rule, events_saver: EventsSaver, parent_event: dict,
                     initial_book_params: dict,
                     log_books_filter, aggregate_batch_updates, start_seq=-1) -> int:
    events = []
    n_processed = 0
    messages_chunk = []
    log_books_collection = []
    for m in sequenced_batch:
        seq = m[0]
        mess = m[1]
        # Skip messages already covered by the initial snapshot.
        if start_seq != -1 and seq < start_seq:
            n_processed += 1
            continue

        # process gaps TODO: better way to add sessionId to gap event
        if "gap" in mess:
            gap_event = events_saver.create_event("SeqGap:" + parent_event["eventName"], "SeqGap", ok=False,
@@ -228,12 +295,32 @@ def process_ob_rules(sequenced_batch: SortedKeyList, books_cache: dict, get_book
    return n_processed



def init_ob_stream(rule_settings: dict) -> None:
    rule_settings["sequence_cache"] = SequenceCache(rule_settings["horizon_delay"])
    rule_settings["books_cache"] = {}
    rule_settings["start_seq_id"] = -1
    if "initial_snapshot_stream" in rule_settings:
        snapshots_stream = rule_settings["initial_snapshot_stream"]
        expanded_stream = (mm for m in snapshots_stream for mm in message_utils.expand_message(m))
        expanded_stream_iter = iter(expanded_stream)
        books, start_seq_id = read_snapshot(expanded_stream_iter,
                                            rule_settings["snapshot_stop_func"],
                                            rule_settings["get_book_id"],
                                            rule_settings["update_book_rule"],
                                            rule_settings["check_book_rule"],
                                            rule_settings["initial_book_params"],
                                            rule_settings["rule_root_event"],
                                            rule_settings["events_saver"])
        if books is not None:
            # Seed the cache with the books built from the snapshot and replay
            # live updates only from the snapshot's sequence id onwards.
            rule_settings["books_cache"] = books
            rule_settings["start_seq_id"] = start_seq_id


def collect_ob_stream(next_batch: list, rule_dict: dict) -> None:
@@ -266,7 +353,8 @@ def flush_ob_stream(ts: dict, rule_settings: dict, events_saver : EventsSaver) -
                                    events_saver,
                                    rule_settings["initial_book_params"],
                                    rule_settings["log_books_filter"] if "log_books_filter" in rule_settings else None,
                                    rule_settings["aggregate_batch_updates"] if "aggregate_batch_updates" in rule_settings else False,
                                    rule_settings["start_seq_id"])
    # Process duplicates
    duplicates = rule_settings["sequence_cache"].get_duplicates_collection()
