# polygon_s3.py
from io import BytesIO
from tempfile import NamedTemporaryFile
from pathlib import Path
import pandas as pd
from utils_globals import LOCAL_PATH, S3_PATH, B2_ACCESS_KEY_ID, B2_SECRET_ACCESS_KEY, B2_ENDPOINT_URL
from polygon_df import get_date_df
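
# Storage layout used throughout this module (same shape locally and on S3/B2):
#   {LOCAL_PATH or S3_PATH}/{tick_type}/symbol={symbol}/date={date}/data.feather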


def get_s3fs_client(cached: bool = False):
    """Return a filesystem client for the B2 bucket.

    With cached=True, the S3 filesystem is wrapped in fsspec's 'filecache'
    protocol so fully read remote files are cached on local disk.
    """
    if cached:
        from fsspec import filesystem
        # https://filesystem-spec.readthedocs.io/
        s3fs = filesystem(
            protocol='filecache',
            target_protocol='s3',
            target_options={
                'key': B2_ACCESS_KEY_ID,
                'secret': B2_SECRET_ACCESS_KEY,
                'client_kwargs': {'endpoint_url': B2_ENDPOINT_URL}
            },
            # cache_storage='/Users/bobcolner/QuantClarity/pandas-polygon/data/cache'
        )
    else:
        from s3fs import S3FileSystem
        s3fs = S3FileSystem(
            key=B2_ACCESS_KEY_ID,
            secret=B2_SECRET_ACCESS_KEY,
            client_kwargs={'endpoint_url': B2_ENDPOINT_URL}
        )
    return s3fs
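
# Illustrative alternative (not exercised below): build a read-through cached
# client instead, so repeat reads of the same object are served from disk:
#
#   s3fs = get_s3fs_client(cached=True)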


# module-level filesystem client shared by the functions below
s3fs = get_s3fs_client(cached=False)


def list_symbol_dates(symbol: str, tick_type: str) -> list:
    paths = s3fs.ls(path=S3_PATH + f"/{tick_type}/symbol={symbol}/", refresh=True)
    return [path.split('date=')[1] for path in paths]


def list_symbols(tick_type: str) -> list:
    paths = s3fs.ls(path=S3_PATH + f"/{tick_type}/", refresh=True)
    return [path.split('symbol=')[1] for path in paths]


def remove_symbol(symbol: str, tick_type: str):
    path = S3_PATH + f"/{tick_type}/symbol={symbol}/"
    s3fs.rm(path, recursive=True)


def show_symbol_storage_used(symbol: str, tick_type: str) -> int:
    # fsspec .du() returns total bytes under the prefix by default
    path = S3_PATH + f"/{tick_type}/symbol={symbol}/"
    return s3fs.du(path)


def get_date_df_from_s3(symbol: str, date: str, tick_type: str, columns: list = None) -> pd.DataFrame:
    byte_data = s3fs.cat(S3_PATH + f"/{tick_type}/symbol={symbol}/date={date}/data.feather")
    if columns:
        df = pd.read_feather(BytesIO(byte_data), columns=columns)
    else:
        df = pd.read_feather(BytesIO(byte_data))
    return df


def date_df_to_file(df: pd.DataFrame, symbol: str, date: str, tick_type: str) -> str:
    path = LOCAL_PATH + f"/{tick_type}/symbol={symbol}/date={date}/"
    Path(path).mkdir(parents=True, exist_ok=True)
    df.to_feather(path + 'data.feather', version=2)
    return path + 'data.feather'


def put_date_df_to_s3(df: pd.DataFrame, symbol: str, date: str, tick_type: str):
    with NamedTemporaryFile(mode='w+b') as tmp_ref1:
        df.to_feather(path=tmp_ref1.name, version=2)
        s3fs.put(tmp_ref1.name, S3_PATH + f"/{tick_type}/symbol={symbol}/date={date}/data.feather")


def put_df_to_s3(df: pd.DataFrame, s3_file_path: str):
    with NamedTemporaryFile(mode='w+b') as tmp_ref1:
        df.to_feather(path=tmp_ref1.name, version=2)
        s3fs.put(tmp_ref1.name, S3_PATH + f"/{s3_file_path}/data.feather")


def put_file(file_path: str, file_name: str, s3_file_path: str):
    s3fs.put(file_path + file_name, S3_PATH + f"/{s3_file_path}/" + file_name)


def get_and_save_date_df(symbol: str, date: str, tick_type: str) -> pd.DataFrame:
    print(symbol, date, 'getting data from polygon api')
    df = get_date_df(symbol, date, tick_type)
    print(symbol, date, 'putting data to S3/B2')
    put_date_df_to_s3(df, symbol, date, tick_type)
    print(symbol, date, 'saving data to local file')
    date_df_to_file(df, symbol, date, tick_type)
    return df


def fetch_date_df(symbol: str, date: str, tick_type: str) -> pd.DataFrame:
    # try the local file first, then S3/B2, then the Polygon API;
    # whenever the local read misses, refresh the local copy on the way out
    try:
        print(symbol, date, 'trying to get data from local file...')
        df = pd.read_feather(LOCAL_PATH + f"/{tick_type}/symbol={symbol}/date={date}/data.feather")
    except FileNotFoundError:
        try:
            print(symbol, date, 'trying to get data from s3/b2...')
            df = get_date_df_from_s3(symbol, date, tick_type)
        except FileNotFoundError:
            print(symbol, date, 'getting data from polygon API...')
            df = get_date_df(symbol, date, tick_type)
            print(symbol, date, 'saving data to S3/B2...')
            put_date_df_to_s3(df, symbol, date, tick_type)
        finally:
            print(symbol, date, 'saving data to local file')
            date_df_to_file(df, symbol, date, tick_type)
    return df


def fetch_clean_trades(symbol: str, date: str) -> pd.DataFrame:
    # get raw ticks
    t1df = fetch_date_df(symbol, date, tick_type='trades')
    # filter out ticks where the SIP and exchange timestamps differ by 3+ seconds
    ts_delta = abs(t1df.sip_dt - t1df.exchange_dt) < pd.to_timedelta(3, unit='S')
    t1df = t1df[ts_delta]
    # filter out irregular trades
    t1df = t1df[t1df.irregular == False]
    # add local nyc time
    t1df['nyc_dt'] = t1df['sip_dt']
    t1df = t1df.set_index('nyc_dt').tz_localize('UTC').tz_convert('America/New_York')
    # filter pre/post market hours
    t1df = t1df.between_time('9:30:33', '16:11').reset_index()
    # remove columns
    t1df = t1df.drop(columns=['exchange_dt', 'sequence', 'trade_id', 'exchange_id', 'irregular', 'conditions'])
    # rename columns
    return t1df.rename(columns={"sip_dt": "utc_time", "size": "volume"})
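

# Minimal usage sketch (illustrative; the symbol and date are placeholder
# values, and valid Polygon/B2 credentials are assumed in utils_globals):
if __name__ == '__main__':
    trades = fetch_clean_trades(symbol='SPY', date='2020-06-01')
    print(trades.head())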