-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdataset.py
executable file
·131 lines (97 loc) · 4.31 KB
/
dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
import json
import requests
import numpy as np
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
def _fetch(city_id: int, timestamp: datetime, offset=0):
    """Request 24 hourly history records for one city from OpenWeatherMap.

    Args:
        city_id: Value sent as the ``id`` query parameter.
        timestamp: Reference moment; the request window starts at
            ``timestamp`` minus ``offset`` hours.
            NOTE(review): a naive datetime is interpreted in the local time
            zone by ``datetime.timestamp()`` - confirm callers expect that.
        offset: Number of hours subtracted from ``timestamp``.

    Returns:
        ``{start: payload}`` where ``start`` is the integer Unix time of the
        window start and ``payload`` is the decoded JSON response, or
        ``None`` when the server answers with a status other than 200.

    Raises:
        KeyError: If ``OPEN_WEATHER_API_KEY`` is missing from the environment.
        requests.RequestException: On connection failure or timeout.
    """
    start = int((timestamp - timedelta(hours=offset)).timestamp())
    url = (f"https://history.openweathermap.org/data/2.5/history/city?type=hour"
           f"&id={city_id}&start={start}&cnt={24}&appid={os.environ['OPEN_WEATHER_API_KEY']}")
    # Without a timeout a stalled connection would block its worker thread
    # (and therefore the whole thread pool shutdown) indefinitely.
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        # Error bodies are not guaranteed to be JSON (e.g. a proxy's HTML
        # page); fall back to the raw text instead of raising while logging.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        print(f"Failed to fetch data for {timestamp.strftime('%y-%m-%d')}: {detail}")
        return None
    return {start: response.json()}
def _fetch_multiple(city_id: int, timestamps: list[datetime], offset=0):
if os.path.exists("data/data.json"):
with open("data/data.json") as file:
data = json.load(file)
return data
data = []
with ThreadPoolExecutor(max_workers=20) as executor:
futures = {
executor.submit(_fetch, city_id, timestamp, offset): timestamp
for timestamp in timestamps
}
for future in as_completed(futures):
result = future.result()
if result is not None:
data.append(result)
print(f"Done: {futures[future].strftime('%y-%m-%d')}")
if not os.path.exists("data"):
os.mkdir("data")
with open("data/data.json", "w") as file:
json.dump(data, file)
def get_feature_names():
    """Return the 12 feature labels as a NumPy array of strings.

    The order matches the columns filled in by ``_extract_data_point``.
    """
    labels = (
        "timestamp",
        "temp",
        "temp_min",
        "temp_max",
        "feels_like",
        "pressure",
        "humidity",
        "windx",
        "windy",
        "rain",
        "snow",
        "clouds",
    )
    return np.array(list(labels))
def _extract_data_point(dp: dict):
if dp["cnt"] != 24:
raise ValueError
result = np.empty(shape=(24, 12))
dp["list"].sort(key=lambda v: v["dt"])
for i, item in enumerate(dp["list"]):
main, wind = item.get("main", {}), item.get("wind", {})
rain, snow = item.get("rain", {}), item.get("snow", {})
clouds, weather = item.get("clouds", {}), item.get("weather", {})
deg = wind.get("deg", np.random.randint(0, 36) * 10) * np.pi / 180
result[i] = [
item.get("dt"), main.get("temp"), main.get("temp_min"),
main.get("temp_max"), main.get("feels_like"), main.get("pressure"),
main.get("humidity"), wind.get("speed", 0) * np.cos(deg),
wind.get("speed", 0) * np.sin(deg), rain.get("1h", 0),
snow.get("1h", 0), clouds.get("all", 0),
]
return result
def get_dataset(*args) -> tuple[np.ndarray, np.ndarray]:
    """Build sliding-window samples from the cached records and save them.

    When ``data/data.json`` is missing, ``_fetch_multiple(*args)`` is called
    first to create it. The records are sorted by their single key and every
    run of four consecutive records becomes one sample: the first three are
    stacked into a ``(72, 12)`` input and the label is the mean of column 1
    (``temp``) of the fourth. Windows containing a record that
    ``_extract_data_point`` rejects with ``ValueError`` are skipped.

    NOTE(review): consecutive records are assumed to be consecutive days;
    gaps left by failed downloads are not detected - confirm this is intended.

    Args:
        *args: Forwarded to ``_fetch_multiple`` when the cache is missing.

    Returns:
        ``(inputs, labels)`` as ``float32`` arrays, also written to
        ``data/inputs.npy`` and ``data/labels.npy``.
    """
    if not os.path.exists("data/data.json"):
        _fetch_multiple(*args)
    with open("data/data.json") as file:
        payload = json.load(file)
    # Accept both the {"data": [...]} layout and a bare list of records so
    # that either form of cache file can be read.
    days = payload["data"] if isinstance(payload, dict) else payload
    # Keys are the start times as strings; string order equals numeric order
    # as long as all of them have the same number of digits.
    days.sort(key=lambda v: tuple(v.keys())[0])

    samples, targets = [], []
    # range(len(days) - 3) includes the window ending on the final record,
    # which a `while end < len(days)` loop would leave out.
    for start in range(len(days) - 3):
        window = [tuple(item.values())[0] for item in days[start: start + 4]]
        try:
            features = np.concatenate(
                [_extract_data_point(day) for day in window[:-1]]
            )
            label = _extract_data_point(window[-1]).mean(axis=0)[1]
        except ValueError:
            # Deliberate best effort: an unusable record only costs the
            # windows that contain it.
            continue
        samples.append(features)
        targets.append(label)

    inputs = np.array(samples, dtype=np.float32)
    labels = np.array(targets, dtype=np.float32)
    np.save("data/inputs.npy", inputs)
    np.save("data/labels.npy", labels)
    return inputs, labels
def get_data_train_test_split(*args, shuffle=True, split_timestamp=1730408400):
    """Split the dataset chronologically and standardise it.

    The split index is one before the first sample whose input window
    contains a timestamp greater than or equal to ``split_timestamp``;
    samples from that index on form the test set, the earlier ones the
    training set. Both inputs are standardised with the training set's mean
    and standard deviation (computed over the sample axis, with ``1e-7``
    added to the deviation to avoid division by zero). Labels are not scaled.

    Args:
        *args: Forwarded to ``get_dataset``.
        shuffle: When true, the order of samples inside each set is permuted
            with NumPy's global random state.
        split_timestamp: Unix time of the train/test boundary. The default,
            1730408400, is 2024-10-31 21:00:00 UTC (presumably local midnight
            of 1 November in a UTC+3 zone - confirm).

    Returns:
        ``(x_train, y_train, x_test, y_test)``.

    Raises:
        ValueError: If the dataset is empty, if no sample reaches
            ``split_timestamp``, or if no sample lies before the split.
    """
    inputs, labels = get_dataset(*args)
    if inputs.ndim != 3 or len(inputs) == 0:
        raise ValueError("dataset is empty; nothing to split")

    reaches_split = np.any(inputs[:, :, 0] >= split_timestamp, axis=1)
    # argmax of an all-False mask is 0, which would silently turn into the
    # index -1 and produce a one-sample "test set"; reject that explicitly.
    if not reaches_split.any():
        raise ValueError("no sample reaches split_timestamp; test set would be empty")
    split = int(np.argmax(reaches_split)) - 1
    # With no sample before the split the normalisation statistics below
    # would be NaN (or, for -1, the slices would be wrong).
    if split <= 0:
        raise ValueError("no sample lies before split_timestamp; training set would be empty")

    x_test, y_test = inputs[split:], labels[split:]
    x_train, y_train = inputs[:split], labels[:split]

    # Statistics come from the training data only so that nothing about the
    # test set leaks into the scaling.
    train_mean = x_train.mean(axis=0)
    train_std = x_train.std(axis=0) + 1e-7
    x_test = (x_test - train_mean) / train_std
    x_train = (x_train - train_mean) / train_std

    test_indices = np.arange(len(x_test))
    train_indices = np.arange(len(x_train))
    if shuffle:
        np.random.shuffle(test_indices)
        np.random.shuffle(train_indices)
    return (x_train[train_indices], y_train[train_indices],
            x_test[test_indices], y_test[test_indices])