diff --git a/hftbacktest/__init__.py b/hftbacktest/__init__.py index 9d5c340..609672a 100644 --- a/hftbacktest/__init__.py +++ b/hftbacktest/__init__.py @@ -29,7 +29,7 @@ 'Stat', 'validate_data', 'correct_local_timestamp', 'correct_exch_timestamp', 'correct',) -__version__ = '1.4.1' +__version__ = '1.4.2' def HftBacktest( diff --git a/hftbacktest/data.py b/hftbacktest/data.py deleted file mode 100644 index 7a11690..0000000 --- a/hftbacktest/data.py +++ /dev/null @@ -1,252 +0,0 @@ -import sys - -import numpy as np -import pandas as pd -from numba import njit - -from .reader import COL_EVENT, COL_EXCH_TIMESTAMP, COL_LOCAL_TIMESTAMP, COL_PRICE, COL_QTY, TRADE_EVENT, \ - DEPTH_EVENT, DEPTH_CLEAR_EVENT, DEPTH_SNAPSHOT_EVENT - - -@njit -def _validate_data( - data, - tick_size=None, - lot_size=None, - err_bound=1e-8 -): - num_reversed_exch_timestamp = 0 - prev_exch_timestamp = 0 - prev_local_timestamp = 0 - for row_num in range(len(data)): - event = data[row_num, COL_EVENT] - exch_timestamp = data[row_num, COL_EXCH_TIMESTAMP] - local_timestamp = data[row_num, COL_LOCAL_TIMESTAMP] - price = data[row_num, COL_PRICE] - qty = data[row_num, COL_QTY] - - if event in [ - TRADE_EVENT, - DEPTH_EVENT, - DEPTH_CLEAR_EVENT, - DEPTH_SNAPSHOT_EVENT - ]: - if tick_size is not None: - v = price / tick_size - e = abs(v - round(v)) - if e > err_bound: - print('found a row that price does not match tick size. row_num =', row_num) - return -1 - if lot_size is not None: - v = qty / lot_size - e = abs(v - round(v)) - if e > err_bound: - print('found a row that qty does not match lot size. row_num =', row_num) - return -1 - - if local_timestamp != -1 \ - and exch_timestamp != -1 \ - and exch_timestamp > local_timestamp \ - and event in [ - TRADE_EVENT, - DEPTH_EVENT, - DEPTH_CLEAR_EVENT, - DEPTH_SNAPSHOT_EVENT - ]: - print('found a row that local_timestamp is ahead of exch_timestamp. row_num =', row_num) - return -1 - if local_timestamp != -1 and prev_local_timestamp > local_timestamp: - print('found a row that local_timestamp is ahead of the previous local_timestamp. row_num =', row_num) - return -1 - - if exch_timestamp != -1: - if exch_timestamp < prev_exch_timestamp: - num_reversed_exch_timestamp += 1 - else: - prev_exch_timestamp = exch_timestamp - - if local_timestamp != -1: - prev_local_timestamp = local_timestamp - - return num_reversed_exch_timestamp - - -def validate_data( - data, - tick_size=None, - lot_size=None, - err_bound=1e-8 -): - if isinstance(data, pd.DataFrame): - num_reversed_exch_timestamp = _validate_data(data.to_numpy(), tick_size, lot_size, err_bound) - elif isinstance(data, np.ndarray): - num_reversed_exch_timestamp = _validate_data(data, tick_size, lot_size, err_bound) - else: - raise ValueError('Unsupported data type') - if num_reversed_exch_timestamp > 0: - print('found %d rows that exch_timestamp is ahead of the previous exch_timestamp' % num_reversed_exch_timestamp) - return num_reversed_exch_timestamp - - -@njit -def _correct_local_timestamp(data, base_latency): - latency = sys.maxsize - for row_num in range(len(data)): - exch_timestamp = data[row_num, COL_EXCH_TIMESTAMP] - local_timestamp = data[row_num, COL_LOCAL_TIMESTAMP] - - latency = min(latency, local_timestamp - exch_timestamp) - - if latency < 0: - local_timestamp_offset = -latency + base_latency - print('local_timestamp is ahead of exch_timestamp by', -latency) - for row_num in range(len(data)): - data[row_num, COL_LOCAL_TIMESTAMP] += local_timestamp_offset - - return data - - -def correct_local_timestamp(data, base_latency): - if isinstance(data, pd.DataFrame): - df_corr = pd.DataFrame(_correct_local_timestamp(data.to_numpy(), base_latency), columns=data.columns) - for col in df_corr.columns: - df_corr[col] = df_corr[col].astype(data[col].dtype) - return df_corr - elif isinstance(data, np.ndarray): - return _correct_local_timestamp(data, base_latency) - else: - raise ValueError('Unsupported data type') - - -@njit -def _correct_exch_timestamp(data, num_corr): - row_size, col_size = data.shape - corr = np.zeros((row_size + num_corr, col_size), np.float64) - prev_exch_timestamp = 0 - out_row_num = 0 - for row_num in range(len(data)): - exch_timestamp = data[row_num, COL_EXCH_TIMESTAMP] - if exch_timestamp < prev_exch_timestamp: - # This new row should be inserted ahead. - found = False - for i in range(out_row_num - 1, -1, -1): - if exch_timestamp < corr[i, COL_EXCH_TIMESTAMP] or found: - found = True - if i == 0 or \ - (i > 0 - and exch_timestamp >= corr[i - 1, COL_EXCH_TIMESTAMP] != -1): - corr[i + 1:out_row_num + 1, :] = corr[i:out_row_num, :] - corr[i, :] = data[row_num, :] - corr[i, COL_LOCAL_TIMESTAMP] = -1 - out_row_num += 1 - break - corr[out_row_num, :] = data[row_num, :] - corr[out_row_num, COL_EXCH_TIMESTAMP] = -1 - else: - corr[out_row_num, :] = data[row_num, :] - prev_exch_timestamp = exch_timestamp - out_row_num += 1 - return corr[:out_row_num, :] - - -def correct_exch_timestamp(data, num_corr): - if isinstance(data, pd.DataFrame): - df_corr = pd.DataFrame(_correct_exch_timestamp(data.to_numpy(), num_corr), columns=data.columns) - for col in df_corr.columns: - df_corr[col] = df_corr[col].astype(data[col].dtype) - return df_corr - elif isinstance(data, np.ndarray): - return _correct_exch_timestamp(data, num_corr) - else: - raise ValueError('Unsupported data type') - - -@njit -def _correct_exch_timestamp_adjust(data): - # Sort by exch_timestamp - i = np.argsort(data[:, COL_EXCH_TIMESTAMP]) - sorted_data = data[i] - # Adjust local_timestamp in reverse order to have a value equal to or greater than the previous local_timestamp. - for row_num in range(1, len(sorted_data)): - if sorted_data[row_num, COL_LOCAL_TIMESTAMP] < sorted_data[row_num - 1, COL_LOCAL_TIMESTAMP]: - sorted_data[row_num, COL_LOCAL_TIMESTAMP] = sorted_data[row_num - 1, COL_LOCAL_TIMESTAMP] - return sorted_data - - -def correct_exch_timestamp_adjust(data): - if isinstance(data, pd.DataFrame): - df_corr = pd.DataFrame(_correct_exch_timestamp_adjust(data.to_numpy()), columns=data.columns) - for col in df_corr.columns: - df_corr[col] = df_corr[col].astype(data[col].dtype) - return df_corr - elif isinstance(data, np.ndarray): - return _correct_exch_timestamp_adjust(data) - else: - raise ValueError('Unsupported data type') - - -def correct( - data, - base_latency, - tick_size=None, - lot_size=None, - err_bound=1e-8, - method='separate' -): - data = correct_local_timestamp(data, base_latency) - num_corr = validate_data( - data, - tick_size=tick_size, - lot_size=lot_size, - err_bound=err_bound - ) - if num_corr < 0: - raise ValueError - if method == 'separate': - data = correct_exch_timestamp(data, num_corr) - elif method == 'adjust': - data = correct_exch_timestamp_adjust(data) - else: - raise ValueError('Invalid method') - print('Correction is done.') - return data - - -@njit -def merge_on_local_timestamp(a, b): - a_shape = a.shape - b_shape = b.shape - assert a_shape[1] == b_shape[1] - tmp = np.empty((a_shape[0] + b_shape[0], a_shape[1]), np.float64) - i = 0 - j = 0 - k = 0 - while True: - if i < len(a) and j < len(b): - if a[i, 2] < b[j, 2]: - tmp[k] = a[i] - i += 1 - k += 1 - elif a[i, 2] > b[j, 2]: - tmp[k] = b[j] - j += 1 - k += 1 - elif a[i, 1] < b[j, 1]: - tmp[k] = a[i] - i += 1 - k += 1 - else: - tmp[k] = b[j] - j += 1 - k += 1 - elif i < len(a): - tmp[k] = a[i] - i += 1 - k += 1 - elif j < len(b): - tmp[k] = b[j] - j += 1 - k += 1 - else: - break - return tmp[:k] diff --git a/hftbacktest/proc/local.py b/hftbacktest/proc/local.py index a522ec0..cda7816 100644 --- a/hftbacktest/proc/local.py +++ b/hftbacktest/proc/local.py @@ -79,6 +79,9 @@ def _process_data(self, row): return 0 def submit_order(self, order_id, side, price, qty, order_type, time_in_force, current_timestamp): + if order_id in self.orders: + raise KeyError('Duplicate order_id') + price_tick = round(price / self.depth.tick_size) order = Order(order_id, price_tick, self.depth.tick_size, qty, side, time_in_force, order_type) order.req = NEW diff --git a/hftbacktest/proc/nopartialfillexchange.py b/hftbacktest/proc/nopartialfillexchange.py index 9fcdb9f..d4aea39 100644 --- a/hftbacktest/proc/nopartialfillexchange.py +++ b/hftbacktest/proc/nopartialfillexchange.py @@ -205,6 +205,9 @@ def on_best_ask_update(self, prev_best, new_best, timestamp): self.__fill(order, timestamp, True) def __ack_new(self, order, timestamp): + if order.order_id in self.orders: + raise KeyError('order_id already exists') + if order.side == BUY: # Check if the buy order price is greater than or equal to the current best ask. if order.price_tick >= self.depth.best_ask_tick: @@ -268,7 +271,8 @@ def __ack_cancel(self, order, timestamp): order.status = EXPIRED order.exch_timestamp = timestamp local_recv_timestamp = timestamp + self.order_latency.response(timestamp, order, self) - self.orders_to.append(order.copy(), local_recv_timestamp) + # It can overwrite another existing order on the local side if order_id is the same. So, commented out. + # self.orders_to.append(order.copy(), local_recv_timestamp) return local_recv_timestamp # Delete the order. @@ -293,6 +297,11 @@ def __fill( exec_price_tick=0, delete_order=True ): + if order.status == EXPIRED \ + or order.status == CANCELED \ + or order.status == FILLED: + raise ValueError('status') + order.maker = maker order.exec_price_tick = order.price_tick if maker else exec_price_tick order.exec_qty = order.leaves_qty diff --git a/hftbacktest/proc/partialfillexchange.py b/hftbacktest/proc/partialfillexchange.py index 5e04e7b..be39fc2 100644 --- a/hftbacktest/proc/partialfillexchange.py +++ b/hftbacktest/proc/partialfillexchange.py @@ -253,6 +253,9 @@ def on_best_ask_update(self, prev_best, new_best, timestamp): ) def __ack_new(self, order, timestamp): + if order.order_id in self.orders: + raise KeyError('order_id already exists') + if order.side == BUY: # Check if the buy order price is greater than or equal to the current best ask. if order.price_tick >= self.depth.best_ask_tick: @@ -430,7 +433,8 @@ def __ack_cancel(self, order, timestamp): order.status = EXPIRED order.exch_timestamp = timestamp local_recv_timestamp = timestamp + self.order_latency.response(timestamp, order, self) - self.orders_to.append(order.copy(), local_recv_timestamp) + # It can overwrite another existing order on the local side if order_id is the same. So, commented out. + # self.orders_to.append(order.copy(), local_recv_timestamp) return local_recv_timestamp # Delete the order. @@ -456,6 +460,11 @@ def __fill( exec_price_tick=0, delete_order=True ): + if order.status == EXPIRED \ + or order.status == CANCELED \ + or order.status == FILLED: + raise ValueError('status') + order.maker = maker order.exec_price_tick = order.price_tick if maker else exec_price_tick order.exec_qty = exec_qty