From 5eb01b8c30531768601f73eb8e9d508f7ab20612 Mon Sep 17 00:00:00 2001
From: "alexander.zhilov"
Date: Wed, 10 Jan 2024 18:04:58 +0400
Subject: [PATCH] Add passed events for cross recon

---
 recon_lw/recon_ob_cross_stream.py | 232 ++++++++++++++++++------------
 1 file changed, 139 insertions(+), 93 deletions(-)

diff --git a/recon_lw/recon_ob_cross_stream.py b/recon_lw/recon_ob_cross_stream.py
index 2af76d6..d1fb301 100644
--- a/recon_lw/recon_ob_cross_stream.py
+++ b/recon_lw/recon_ob_cross_stream.py
@@ -17,6 +17,7 @@ def synopsys(price_condition: bool, num_orders_condition: bool, size_condition:
         syn += "s"
     return syn
 
+
 def compare_keys(keys_collection, book1, book2):
     problems = []
     for k in keys_collection:
@@ -46,10 +47,8 @@ def compare_full_vs_aggr(full_book: dict, aggr_book: dict) -> list:
         for i in range(aggr_book["aggr_max_levels"]):
             if i < len(full_levels) and i < len(aggr_levels):
                 price_condition = full_levels[i] == aggr_levels[i]["price"]
-                num_orders_condition = len(full_book[side][full_levels[i]]) == \
-                                       (aggr_levels[i]["real_num_orders"])
-                size_condition = sum(full_book[side][full_levels[i]].values()) == \
-                                 (aggr_levels[i]["real_qty"])
+                num_orders_condition = len(full_book[side][full_levels[i]]) == (aggr_levels[i]["real_num_orders"])
+                size_condition = sum(full_book[side][full_levels[i]].values()) == (aggr_levels[i]["real_qty"])
                 if not (price_condition and num_orders_condition and size_condition):
                     problems.append({"synopsys": synopsys(price_condition, num_orders_condition, size_condition),
                                      "side": side,
@@ -67,7 +66,7 @@ def compare_full_vs_aggr(full_book: dict, aggr_book: dict) -> list:
                                       "ind_open_price", "ind_open_size",
                                       "ind_open_mid_price"],
                                      full_book, aggr_book))
-    
+
     return problems
 
 
@@ -134,8 +133,7 @@ def compare_full_vs_top(full_book: dict, top_book: dict):
         else:
             top_p = min(full_book["ask"].keys())
             price_condition = top_p == top_book["ask_price"]
-            num_orders_condition = len(full_book["ask"][top_p]) == \
-                                   (top_book["ask_real_n_orders"])
+            num_orders_condition = len(full_book["ask"][top_p]) == (top_book["ask_real_n_orders"])
             size_condition = sum(full_book["ask"][top_p].values()) == (top_book["ask_real_qty"])
             if not (price_condition and num_orders_condition and size_condition):
                 problems.append({"synopsys": synopsys(price_condition, num_orders_condition, size_condition),
@@ -152,8 +150,7 @@ def compare_full_vs_top(full_book: dict, top_book: dict):
         else:
             top_p = max(full_book["bid"].keys())
             price_condition = top_p == top_book["bid_price"]
-            num_orders_condition = len(full_book["bid"][top_p]) == \
-                                   (top_book["bid_real_n_orders"])
+            num_orders_condition = len(full_book["bid"][top_p]) == (top_book["bid_real_n_orders"])
             size_condition = sum(full_book["bid"][top_p].values()) == (top_book["bid_real_qty"])
             if not (price_condition and num_orders_condition and size_condition):
                 problems.append({"synopsys": synopsys(price_condition, num_orders_condition, size_condition),
@@ -224,112 +221,160 @@ def ob_compare_get_timestamp_key1_key2_top_aggr(o, custom_settings):
 def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_events):
     if match[0] is not None and match[1] is not None:
         comp_res = compare_full_vs_aggr(match[0]["body"], match[1]["body"])
+        events_to_store = []
         if len(comp_res) > 0:
-            error_event = create_event("23:mismatch",
-                                       "23:mismatch",
-                                       False,
-                                       {"full_book_event": match[0]["eventId"],
-                                        "aggr_book_event": match[1]["eventId"],
-                                        "book_id": match[0]["body"]["book_id"],
-                                        "time_of_event": match[0]["body"]["time_of_event"],
-                                        "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
-                                        "errors": comp_res})
-            save_events([error_event])
+            events_to_store.append(create_event(
+                "23:mismatch",
+                "23:mismatch",
+                False,
+                {"full_book_event": match[0]["eventId"],
+                 "aggr_book_event": match[1]["eventId"],
+                 "book_id": match[0]["body"]["book_id"],
+                 "time_of_event": match[0]["body"]["time_of_event"],
+                 "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
+                 "errors": comp_res}))
+        else:
+            events_to_store.append(create_event(
+                "23:match",
+                "23:match",
+                True,
+                {"full_book_event": match[0]["eventId"],
+                 "full_book_scope": match[0]["scope"],
+                 "aggr_book_event": match[1]["eventId"],
+                 "aggr_book_scope": match[1]["scope"],
+                 "book_id": match[0]["body"]["book_id"],
+                 "time_of_event": match[0]["body"]["time_of_event"],
+                 "top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
+        save_events(events_to_store)
     elif match[0] is not None:
         tech_info = ob_compare_get_timestamp_key1_key2_aggr(match[0], custom_settings)
-        error_event = create_event("23:missing2",
-                                   "23:missing2",
-                                   False,
-                                   {"full_book_event": match[0]["eventId"],
-                                    "book_id": match[0]["body"]["book_id"],
-                                    "time_of_event": match[0]["body"]["time_of_event"],
-                                    "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
-                                    "sessionId": match[0]["body"]["sessionId"],
-                                    "tech_info": tech_info})
+        error_event = create_event(
+            "23:missing2",
+            "23:missing2",
+            False,
+            {"full_book_event": match[0]["eventId"],
+             "book_id": match[0]["body"]["book_id"],
+             "time_of_event": match[0]["body"]["time_of_event"],
+             "limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
+             "sessionId": match[0]["body"]["sessionId"],
+             "tech_info": tech_info})
         save_events([error_event])
     elif match[1] is not None:
         e_type = "23:missing3 impl" if match[1]["body"]["implied_only"] else "23:missing3"
-        error_event = create_event(e_type,
-                                   e_type,
-                                   False,
-                                   {"aggr_book_event": match[1]["eventId"],
-                                    "book_id": match[1]["body"]["book_id"],
-                                    "time_of_event": match[1]["body"]["time_of_event"],
-                                    "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
-                                    "sessionId": match[1]["body"]["sessionId"]})
+        error_event = create_event(
+            e_type,
+            e_type,
+            False,
+            {"aggr_book_event": match[1]["eventId"],
+             "book_id": match[1]["body"]["book_id"],
+             "time_of_event": match[1]["body"]["time_of_event"],
+             "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
+             "sessionId": match[1]["body"]["sessionId"]})
         save_events([error_event])
 
 
 def ob_compare_interpret_match_top(match, custom_settings, create_event, save_events):
     if match[0] is not None and match[1] is not None:
         comp_res = compare_full_vs_top(match[0]["body"], match[1]["body"])
+        events_to_store = []
         if len(comp_res) > 0:
-            error_event = create_event("13:mismatch",
-                                       "13:mismatch",
-                                       False,
-                                       {"full_book_event": match[0]["eventId"],
-                                        "top_book_event": match[1]["eventId"],
-                                        "book_id": match[0]["body"]["book_id"],
-                                        "time_of_event": match[0]["body"]["time_of_event"],
-                                        "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
-                                        "errors": comp_res})
-            save_events([error_event])
+            events_to_store.append(create_event(
+                "13:mismatch",
+                "13:mismatch",
+                False,
+                {"full_book_event": match[0]["eventId"],
+                 "top_book_event": match[1]["eventId"],
+                 "book_id": match[0]["body"]["book_id"],
+                 "time_of_event": match[0]["body"]["time_of_event"],
+                 "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
+                 "errors": comp_res}))
+        else:
+            events_to_store.append(create_event(
+                "13:match",
+                "13:match",
+                True,
+                {"full_book_event": match[0]["eventId"],
+                 "full_book_scope": match[0]["scope"],
+                 "top_book_event": match[1]["eventId"],
+                 "top_book_scope": match[1]["scope"],
+                 "book_id": match[0]["body"]["book_id"],
+                 "time_of_event": match[0]["body"]["time_of_event"],
+                 "top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
+        save_events(events_to_store)
     elif match[0] is not None:
-        error_event = create_event("13:missing1",
-                                   "13:missing1",
-                                   False,
-                                   {"full_book_event": match[0]["eventId"],
-                                    "book_id": match[0]["body"]["book_id"],
-                                    "time_of_event": match[0]["body"]["time_of_event"],
-                                    "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
-                                    "sessionId": match[0]["body"]["sessionId"]})
+        error_event = create_event(
+            "13:missing1",
+            "13:missing1",
+            False,
+            {"full_book_event": match[0]["eventId"],
+             "book_id": match[0]["body"]["book_id"],
+             "time_of_event": match[0]["body"]["time_of_event"],
+             "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
+             "sessionId": match[0]["body"]["sessionId"]})
         save_events([error_event])
     elif match[1] is not None:
         e_type = "13:missing3 impl" if match[1]["body"]["implied_only"] else "13:missing3"
-        error_event = create_event(e_type,
-                                   e_type,
-                                   False,
-                                   {"top_book_event": match[1]["eventId"],
-                                    "book_id": match[1]["body"]["book_id"],
-                                    "time_of_event": match[1]["body"]["time_of_event"],
-                                    "top_v2": match[1]["body"]["aggr_seq"]["top_v2"],
-                                    "sessionId": match[1]["body"]["sessionId"]})
+        error_event = create_event(
+            e_type,
+            e_type,
+            False,
+            {"top_book_event": match[1]["eventId"],
+             "book_id": match[1]["body"]["book_id"],
+             "time_of_event": match[1]["body"]["time_of_event"],
+             "top_v2": match[1]["body"]["aggr_seq"]["top_v2"],
+             "sessionId": match[1]["body"]["sessionId"]})
         save_events([error_event])
 
 
 def ob_compare_interpret_match_top_aggr(match, custom_settings, create_event, save_events):
     if match[0] is not None and match[1] is not None:
         comp_res = compare_aggr_vs_top(match[1]["body"], match[0]["body"])
+        events_to_store = []
         if len(comp_res) > 0:
-            error_event = create_event("12:mismatch",
-                                       "12:mismatch",
-                                       False,
-                                       {"top_book_event": match[0]["eventId"],
-                                        "aggr_book_event": match[1]["eventId"],
-                                        "book_id": match[0]["body"]["book_id"],
-                                        "time_of_event": match[0]["body"]["time_of_event"],
-                                        "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
-                                        "errors": comp_res})
-            save_events([error_event])
+            events_to_store.append(create_event(
+                "12:mismatch",
+                "12:mismatch",
+                False,
+                {"top_book_event": match[0]["eventId"],
+                 "aggr_book_event": match[1]["eventId"],
+                 "book_id": match[0]["body"]["book_id"],
+                 "time_of_event": match[0]["body"]["time_of_event"],
+                 "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
+                 "errors": comp_res}))
+        else:
+            events_to_store.append(create_event(
+                "12:match",
+                "12:match",
+                True,
+                {"top_book_event": match[0]["eventId"],
+                 "top_book_scope": match[0]["scope"],
+                 "aggr_book_event": match[1]["eventId"],
+                 "aggr_book_scope": match[1]["scope"],
+                 "book_id": match[0]["body"]["book_id"],
+                 "time_of_event": match[0]["body"]["time_of_event"],
+                 "top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
+        save_events(events_to_store)
     elif match[0] is not None:
-        error_event = create_event("12:missing2",
-                                   "12:missing2",
-                                   False,
-                                   {"top_book_event": match[0]["eventId"],
-                                    "book_id": match[0]["body"]["book_id"],
-                                    "time_of_event": match[0]["body"]["time_of_event"],
-                                    "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
-                                    "sessionId": match[0]["body"]["sessionId"]})
+        error_event = create_event(
+            "12:missing2",
+            "12:missing2",
+            False,
+            {"top_book_event": match[0]["eventId"],
+             "book_id": match[0]["body"]["book_id"],
+             "time_of_event": match[0]["body"]["time_of_event"],
+             "top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
+             "sessionId": match[0]["body"]["sessionId"]})
         save_events([error_event])
     elif match[1] is not None:
-        error_event = create_event("12:missing1",
-                                   "12:missing1",
-                                   False,
-                                   {"aggr_book_event": match[1]["eventId"],
-                                    "book_id": match[1]["body"]["book_id"],
-                                    "time_of_event": match[1]["body"]["time_of_event"],
-                                    "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
-                                    "sessionId": match[1]["body"]["sessionId"]})
+        error_event = create_event(
+            "12:missing1",
+            "12:missing1",
+            False,
+            {"aggr_book_event": match[1]["eventId"],
+             "book_id": match[1]["body"]["book_id"],
+             "time_of_event": match[1]["body"]["time_of_event"],
+             "limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
+             "sessionId": match[1]["body"]["sessionId"]})
         save_events([error_event])
 
 
@@ -342,7 +387,7 @@ def split_every(n, data):
         yield piece
 
 
-# {"horizon_dely": 180, full_session: "aaa", aggr_session: "bbb", top_session: "ccc"}
+# {"horizon_delay": 180, full_session: "aaa", aggr_session: "bbb", top_session: "ccc"}
 def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: pathlib.PosixPath, rules_dict: dict) -> None:
     """The entrypoint function for comparing order-books.
 
@@ -403,7 +448,6 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path
         )
         processors.append(processor_top_aggr)
 
-
     # order_books_events = source_events.filter(lambda e: e["eventType"] == "OrderBook")
     # buffers = split_every(100, order_books_events)
 
@@ -424,6 +468,7 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path
     events_saver.flush()
 
 
+'''
 def _example_of_usage_run_recon():
     events = None  # This are the events generated by previous round lw_recon
     path = None  # This should be empty folder for new events generated by this new round of recon
@@ -443,13 +488,13 @@ def _example_of_usage_run_recon():
         }
     }
     ob_compare_streams(events, path, rules_dict)
-
-
+'''
+'''
 def _example_of_usage_see_results():
     previous_events = None  # previous events initially generated by lw_recon
     new_events = None  # get_events_from_dir("dir for second run")
-    # Use same method to categorize that fter first round of lw_recon
+    # Use same method to categorize that after the first round of lw_recon
 
     # additional analysis
     # in the ErrorEvent you can get book_id - instrument and version - number of update of the specific book
 
@@ -469,3 +514,4 @@ def _example_of_usage_see_results():
         print(e2)
     print(" ============= ")
     # We can use pandas to show it better
+'''