Skip to content

Commit

Permalink
Add passed events for cross recon
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-zhilov committed Jan 10, 2024
1 parent f67df3f commit 5eb01b8
Showing 1 changed file with 139 additions and 93 deletions.
232 changes: 139 additions & 93 deletions recon_lw/recon_ob_cross_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def synopsys(price_condition: bool, num_orders_condition: bool, size_condition:
syn += "s"
return syn


def compare_keys(keys_collection, book1, book2):
problems = []
for k in keys_collection:
Expand Down Expand Up @@ -46,10 +47,8 @@ def compare_full_vs_aggr(full_book: dict, aggr_book: dict) -> list:
for i in range(aggr_book["aggr_max_levels"]):
if i < len(full_levels) and i < len(aggr_levels):
price_condition = full_levels[i] == aggr_levels[i]["price"]
num_orders_condition = len(full_book[side][full_levels[i]]) == \
(aggr_levels[i]["real_num_orders"])
size_condition = sum(full_book[side][full_levels[i]].values()) == \
(aggr_levels[i]["real_qty"])
num_orders_condition = len(full_book[side][full_levels[i]]) == (aggr_levels[i]["real_num_orders"])
size_condition = sum(full_book[side][full_levels[i]].values()) == (aggr_levels[i]["real_qty"])
if not (price_condition and num_orders_condition and size_condition):
problems.append({"synopsys": synopsys(price_condition, num_orders_condition, size_condition),
"side": side,
Expand All @@ -67,7 +66,7 @@ def compare_full_vs_aggr(full_book: dict, aggr_book: dict) -> list:
"ind_open_price",
"ind_open_size",
"ind_open_mid_price"], full_book, aggr_book))

return problems


Expand Down Expand Up @@ -134,8 +133,7 @@ def compare_full_vs_top(full_book: dict, top_book: dict):
else:
top_p = min(full_book["ask"].keys())
price_condition = top_p == top_book["ask_price"]
num_orders_condition = len(full_book["ask"][top_p]) == \
(top_book["ask_real_n_orders"])
num_orders_condition = len(full_book["ask"][top_p]) == (top_book["ask_real_n_orders"])
size_condition = sum(full_book["ask"][top_p].values()) == (top_book["ask_real_qty"])
if not (price_condition and num_orders_condition and size_condition):
problems.append({"synopsys": synopsys(price_condition, num_orders_condition, size_condition),
Expand All @@ -152,8 +150,7 @@ def compare_full_vs_top(full_book: dict, top_book: dict):
else:
top_p = max(full_book["bid"].keys())
price_condition = top_p == top_book["bid_price"]
num_orders_condition = len(full_book["bid"][top_p]) == \
(top_book["bid_real_n_orders"])
num_orders_condition = len(full_book["bid"][top_p]) == (top_book["bid_real_n_orders"])
size_condition = sum(full_book["bid"][top_p].values()) == (top_book["bid_real_qty"])
if not (price_condition and num_orders_condition and size_condition):
problems.append({"synopsys": synopsys(price_condition, num_orders_condition, size_condition),
Expand Down Expand Up @@ -224,112 +221,160 @@ def ob_compare_get_timestamp_key1_key2_top_aggr(o, custom_settings):
def ob_compare_interpret_match_aggr(match, custom_settings, create_event, save_events):
if match[0] is not None and match[1] is not None:
comp_res = compare_full_vs_aggr(match[0]["body"], match[1]["body"])
events_to_store = []
if len(comp_res) > 0:
error_event = create_event("23:mismatch",
"23:mismatch",
False,
{"full_book_event": match[0]["eventId"],
"aggr_book_event": match[1]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"errors": comp_res})
save_events([error_event])
events_to_store.append(create_event(
"23:mismatch",
"23:mismatch",
False,
{"full_book_event": match[0]["eventId"],
"aggr_book_event": match[1]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"errors": comp_res}))
else:
events_to_store.append(create_event(
"23:match",
"23:match",
True,
{"full_book_event": match[0]["eventId"],
"full_book_scope": match[0]["scope"],
"aggr_book_event": match[1]["eventId"],
"aggr_book_scope": match[1]["scope"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
save_events(events_to_store)
elif match[0] is not None:
tech_info = ob_compare_get_timestamp_key1_key2_aggr(match[0], custom_settings)
error_event = create_event("23:missing2",
"23:missing2",
False,
{"full_book_event": match[0]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"sessionId": match[0]["body"]["sessionId"],
"tech_info": tech_info})
error_event = create_event(
"23:missing2",
"23:missing2",
False,
{"full_book_event": match[0]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"limit_v2": match[0]["body"]["aggr_seq"]["limit_v2"],
"sessionId": match[0]["body"]["sessionId"],
"tech_info": tech_info})
save_events([error_event])
elif match[1] is not None:
e_type = "23:missing3 impl" if match[1]["body"]["implied_only"] else "23:missing3"
error_event = create_event(e_type,
e_type,
False,
{"aggr_book_event": match[1]["eventId"],
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
"sessionId": match[1]["body"]["sessionId"]})
error_event = create_event(
e_type,
e_type,
False,
{"aggr_book_event": match[1]["eventId"],
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
"sessionId": match[1]["body"]["sessionId"]})
save_events([error_event])


def ob_compare_interpret_match_top(match, custom_settings, create_event, save_events):
if match[0] is not None and match[1] is not None:
comp_res = compare_full_vs_top(match[0]["body"], match[1]["body"])
events_to_store = []
if len(comp_res) > 0:
error_event = create_event("13:mismatch",
"13:mismatch",
False,
{"full_book_event": match[0]["eventId"],
"top_book_event": match[1]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"errors": comp_res})
save_events([error_event])
events_to_store.append(create_event(
"13:mismatch",
"13:mismatch",
False,
{"full_book_event": match[0]["eventId"],
"top_book_event": match[1]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"errors": comp_res}))
else:
events_to_store.append(create_event(
"13:match",
"13:match",
True,
{"full_book_event": match[0]["eventId"],
"full_book_scope": match[0]["scope"],
"top_book_event": match[1]["eventId"],
"top_book_scope": match[1]["scope"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
save_events(events_to_store)
elif match[0] is not None:
error_event = create_event("13:missing1",
"13:missing1",
False,
{"full_book_event": match[0]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"sessionId": match[0]["body"]["sessionId"]})
error_event = create_event(
"13:missing1",
"13:missing1",
False,
{"full_book_event": match[0]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"sessionId": match[0]["body"]["sessionId"]})
save_events([error_event])
elif match[1] is not None:
e_type = "13:missing3 impl" if match[1]["body"]["implied_only"] else "13:missing3"
error_event = create_event(e_type,
e_type,
False,
{"top_book_event": match[1]["eventId"],
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"top_v2": match[1]["body"]["aggr_seq"]["top_v2"],
"sessionId": match[1]["body"]["sessionId"]})
error_event = create_event(
e_type,
e_type,
False,
{"top_book_event": match[1]["eventId"],
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"top_v2": match[1]["body"]["aggr_seq"]["top_v2"],
"sessionId": match[1]["body"]["sessionId"]})
save_events([error_event])


def ob_compare_interpret_match_top_aggr(match, custom_settings, create_event, save_events):
if match[0] is not None and match[1] is not None:
comp_res = compare_aggr_vs_top(match[1]["body"], match[0]["body"])
events_to_store = []
if len(comp_res) > 0:
error_event = create_event("12:mismatch",
"12:mismatch",
False,
{"top_book_event": match[0]["eventId"],
"aggr_book_event": match[1]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"errors": comp_res})
save_events([error_event])
events_to_store.append(create_event(
"12:mismatch",
"12:mismatch",
False,
{"top_book_event": match[0]["eventId"],
"aggr_book_event": match[1]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"errors": comp_res}))
else:
events_to_store.append(create_event(
"12:match",
"12:match",
True,
{"top_book_event": match[0]["eventId"],
"top_book_scope": match[0]["scope"],
"aggr_book_event": match[1]["eventId"],
"aggr_book_scope": match[1]["scope"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"]}))
save_events(events_to_store)
elif match[0] is not None:
error_event = create_event("12:missing2",
"12:missing2",
False,
{"top_book_event": match[0]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"sessionId": match[0]["body"]["sessionId"]})
error_event = create_event(
"12:missing2",
"12:missing2",
False,
{"top_book_event": match[0]["eventId"],
"book_id": match[0]["body"]["book_id"],
"time_of_event": match[0]["body"]["time_of_event"],
"top_v2": match[0]["body"]["aggr_seq"]["top_v2"],
"sessionId": match[0]["body"]["sessionId"]})
save_events([error_event])
elif match[1] is not None:
error_event = create_event("12:missing1",
"12:missing1",
False,
{"aggr_book_event": match[1]["eventId"],
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
"sessionId": match[1]["body"]["sessionId"]})
error_event = create_event(
"12:missing1",
"12:missing1",
False,
{"aggr_book_event": match[1]["eventId"],
"book_id": match[1]["body"]["book_id"],
"time_of_event": match[1]["body"]["time_of_event"],
"limit_v2": match[1]["body"]["aggr_seq"]["limit_v2"],
"sessionId": match[1]["body"]["sessionId"]})
save_events([error_event])


Expand All @@ -342,7 +387,7 @@ def split_every(n, data):
yield piece


# {"horizon_dely": 180, full_session: "aaa", aggr_session: "bbb", top_session: "ccc"}
# {"horizon_delay": 180, full_session: "aaa", aggr_session: "bbb", top_session: "ccc"}
def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: pathlib.PosixPath,
rules_dict: dict) -> None:
"""The entrypoint function for comparing order-books.
Expand Down Expand Up @@ -403,7 +448,6 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path
)
processors.append(processor_top_aggr)


# order_books_events = source_events.filter(lambda e: e["eventType"] == "OrderBook")

# buffers = split_every(100, order_books_events)
Expand All @@ -424,6 +468,7 @@ def ob_compare_streams(source_events_path: pathlib.PosixPath, results_path: path
events_saver.flush()


'''
def _example_of_usage_run_recon():
events = None # This are the events generated by previous round lw_recon
path = None # This should be empty folder for new events generated by this new round of recon
Expand All @@ -443,13 +488,13 @@ def _example_of_usage_run_recon():
}
}
ob_compare_streams(events, path, rules_dict)


'''
'''
def _example_of_usage_see_results():
previous_events = None # previous events initially generated by lw_recon
new_events = None # get_events_from_dir("dir for second run")
# Use same method to categorize that fter first round of lw_recon
# Use same method to categorize that iter first round of lw_recon
# additional analysis
# in the ErrorEvent you can get book_id - instrument and version - number of update of the specific book
Expand All @@ -469,3 +514,4 @@ def _example_of_usage_see_results():
print(e2)
print(" ============= ")
# We can use pandas to show it better
'''

0 comments on commit 5eb01b8

Please sign in to comment.