-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
259 lines (226 loc) · 9.04 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import http.client
import urllib.request
import urllib.parse
import urllib.error
import json
import time
import itertools
import random
import api_key
import csv
# Request headers passed to query_api by every get_* helper; the key value is
# read from the local api_key module.
headers = {'api_key': api_key.wmata_key}
def query_api(endpoint, params, headers):
    """Send a GET request to api.wmata.com and return the decoded JSON body.

    Args:
        endpoint: Request path containing one ``%s`` placeholder, which is
            filled with ``params``.
        params: Already URL-encoded query string.
        headers: Mapping of HTTP request headers.

    Returns:
        The parsed JSON response, or ``None`` when anything goes wrong
        (the error is printed, not raised).
    """
    conn = None
    try:
        conn = http.client.HTTPSConnection('api.wmata.com')
        # NOTE(review): the literal "{body}" looks like an unfilled template
        # placeholder sent as the request body - confirm whether it can go.
        conn.request("GET", endpoint % params, "{body}", headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        return json.loads(data)
    except Exception as e:
        # Only OS-level errors carry errno/strerror; reading them blindly on
        # e.g. a JSON decode error would raise AttributeError from in here.
        errno = getattr(e, 'errno', None)
        if errno is not None:
            print("[Errno {0}] {1}".format(errno, getattr(e, 'strerror', e)))
        else:
            print("[{0}] {1}".format(type(e).__name__, e))
        return None
    finally:
        # Release the connection on the failure path as well.
        if conn is not None:
            conn.close()
def get_positions():
    """Query the TrainPositions endpoint and return ``query_api``'s result.

    No query parameters are added beyond ``contentType=json``; the
    module-level ``headers`` are sent with the request.
    """
    # urlencode of an empty mapping yields an empty query string
    query = urllib.parse.urlencode({})
    endpoint = "/TrainPositions/TrainPositions?contentType=json&%s"
    return query_api(endpoint, query, headers)
def get_station_list(line_code):
    """Query the jStations endpoint for one line and return the result.

    Args:
        line_code: Value sent as the ``LineCode`` query parameter.

    Returns:
        Whatever ``query_api`` returns for the request.
    """
    query = urllib.parse.urlencode({'LineCode': line_code})
    endpoint = "/Rail.svc/json/jStations?%s"
    return query_api(endpoint, query, headers)
def get_times():
    """Query the GetPrediction/All endpoint and return ``query_api``'s result.

    No query parameters are added; the module-level ``headers`` are sent
    with the request.
    """
    # urlencode of an empty mapping yields an empty query string
    query = urllib.parse.urlencode({})
    endpoint = "/StationPrediction.svc/json/GetPrediction/All?%s"
    return query_api(endpoint, query, headers)
def get_circuits():
    """Query the TrackCircuits endpoint and return ``query_api``'s result.

    No query parameters are added beyond ``contentType=json``; the
    module-level ``headers`` are sent with the request.
    """
    # urlencode of an empty mapping yields an empty query string
    query = urllib.parse.urlencode({})
    endpoint = "/TrainPositions/TrackCircuits?contentType=json&%s"
    return query_api(endpoint, query, headers)
def get_routes():
    """Query the StandardRoutes endpoint and return ``query_api``'s result.

    No query parameters are added beyond ``contentType=json``; the
    module-level ``headers`` are sent with the request.
    """
    # urlencode of an empty mapping yields an empty query string
    query = urllib.parse.urlencode({})
    endpoint = "/TrainPositions/StandardRoutes?contentType=json&%s"
    return query_api(endpoint, query, headers)
class TrainInfo():
    """Combine the train-position and arrival-prediction feeds per train.

    Construction fetches the standard routes, the track circuits and the
    station list of every line in ``line_codes`` once and builds lookup
    tables from them. ``get_trains`` fetches fresh positions and predictions
    on every call and annotates each position record.
    """

    def __init__(self):
        self.routes = get_routes()
        self.circuits = get_circuits()
        # get all the lines
        self.line_codes = ['RD', 'YL', 'BL', 'OR', 'GR', 'SV']
        self.track_nums = [1, 2]
        self.directions = [1, 2]
        self.stations_dict = self.create_stations_dict()
        self.circ_seq_dict = self.get_circuit_to_sequence_dict()
        self.circ_track_dict = self.get_circuit_to_track_dict()

    def create_stations_dict(self):
        """
        creates a dict that maps station codes to names
        """
        stations_dict = {}
        for line_code in self.line_codes:
            stations = get_station_list(line_code)
            for station in stations['Stations']:
                stations_dict[station['Code']] = station['Name']
        return stations_dict

    def _route_circuits(self, line_code, track_num):
        """Return the ``TrackCircuits`` list of the first standard route
        matching the line and track, or ``None`` when there is none."""
        for route in self.routes['StandardRoutes']:
            if (route['LineCode'] == line_code
                    and route['TrackNum'] == track_num):
                return route['TrackCircuits']
        return None

    def get_circuit_to_sequence_dict(self):
        """
        returns a dict keyed by '<line_code><track_num>' whose values map
        each circuit id on that route to its sequence number
        """
        circ_seq_dict = {}
        for line_code in self.line_codes:
            for track_num in self.track_nums:
                # A line/track pair without a standard route gets an empty
                # table instead of raising IndexError.
                circuits = self._route_circuits(line_code, track_num) or []
                circ_seq_dict['{}{}'.format(line_code, track_num)] = {
                    circuit['CircuitId']: circuit['SeqNum']
                    for circuit in circuits}
        return circ_seq_dict

    def get_circuit_to_track_dict(self):
        """
        returns a dict that maps every circuit id to its track value
        """
        circuits = self.circuits['TrackCircuits']
        return {circuit['CircuitId']: circuit['Track'] for circuit in circuits}

    def get_next_station(self, train):
        """
        returns the station code of the first circuit with a station, scanning
        the train's route from its own circuit onwards (direction 1) or
        backwards (direction 2); None if nothing is found, the direction is
        neither 1 nor 2, or the line/track has no standard route
        """
        line = self._route_circuits(train['LineCode'], train['Track'])
        if line is None:
            return None
        seq_num = train['SeqNum']
        direction = train['DirectionNum']
        # NOTE(review): SeqNum is used as a list index, which assumes the
        # route's circuits are stored in SeqNum order starting at 0 - confirm.
        if direction == 1:
            ahead = line[seq_num:]
        elif direction == 2:
            ahead = line[seq_num::-1]
        else:
            return None
        for circuit in ahead:
            if circuit['StationCode'] is not None:
                return circuit['StationCode']
        return None

    def _locate_train(self, train):
        """Add DestinationName, SeqNum and Track to ``train`` in place, plus
        LocationCode/LocationName when a sequence number is known."""
        circuit_id = train['CircuitId']
        # .get: one circuit missing from the circuit table must not abort the
        # whole poll; such a train simply ends up without a sequence number.
        track = self.circ_track_dict.get(circuit_id)
        train['DestinationName'] = \
            self.stations_dict.get(train['DestinationStationCode'])
        if track in self.track_nums:
            seq_table = self.circ_seq_dict.get(
                '{}{}'.format(train['LineCode'], track), {})
            seq_num = seq_table.get(circuit_id)
        else:
            seq_num = None
        train['SeqNum'] = seq_num
        train['Track'] = track
        if seq_num is None:
            return
        train['LocationCode'] = self.get_next_station(train)
        train['LocationName'] = self.stations_dict.get(train['LocationCode'])

    def _claim_arrival_time(self, train, unmatched):
        """Return the 'Min' of the best matching prediction and remove that
        prediction from ``unmatched``; return 0 when nothing matches."""
        candidates = [
            (pos, arrival) for pos, arrival in enumerate(unmatched)
            if arrival['DestinationCode'] == train['DestinationStationCode']
            and arrival['Line'] == train['LineCode']
            and arrival['LocationCode'] == train['LocationCode']]
        if not candidates:
            return 0
        # the prediction with the smallest time is taken to be this train
        pos, arrival = min(candidates,
                           key=lambda pair: time_to_sortable(pair[1]))
        # each prediction may be used for one train only
        del unmatched[pos]
        return arrival['Min']

    def get_trains(self):
        """
        gets train position and time info and combines them and returns them

        Only trains with ServiceType 'Normal' on a known line and direction
        that resolve to a sequence number are returned. Each returned record
        gains DestinationName, SeqNum, Track, LocationCode, LocationName and
        Min (0 when no prediction matched).
        """
        positions = get_positions()
        # Work on a copy so matched predictions can be dropped one by one.
        unmatched = list(get_times()['Trains'])
        # we have to work on trains by line and direction in order to be able
        # to properly match them
        # not concerned about trains not in service
        all_trains = []
        for line_code in self.line_codes:
            for direction in self.directions:
                # filter trains by line and direction
                trains = [train for train in positions['TrainPositions']
                          if train['LineCode'] == line_code
                          and train['DirectionNum'] == direction
                          and train['ServiceType'] == 'Normal']
                for train in trains:
                    self._locate_train(train)
                # remove trains without well defined seq num
                trains = [train for train in trains
                          if train['SeqNum'] is not None]
                # order them
                trains = sorted(trains, key=lambda x: x['SeqNum'])
                # reverse order if going other way
                if direction == 2:
                    trains = trains[::-1]
                # now we can assign time estimates to each of the trains
                for train in trains[::-1]:
                    train['Min'] = self._claim_arrival_time(train, unmatched)
                all_trains = all_trains + trains
        return all_trains
def time_to_sortable(time):
    """Turn a prediction's ``'Min'`` field into a number usable as a sort key.

    Args:
        time: Prediction record (a mapping) with a ``'Min'`` entry. The
            parameter name shadows the ``time`` module inside this function
            only; it is kept for interface compatibility.

    Returns:
        ``0`` for ``'BRD'``, ``0.5`` for ``'ARR'``, ``-1`` for an empty
        string, the integer value for numeric strings, and ``float('inf')``
        for anything else (for example ``'---'`` or ``None``) so that an
        unparseable value sorts last instead of raising.
    """
    minutes = time['Min']
    if minutes == 'ARR':
        return 0.5
    if minutes == 'BRD':
        return 0
    if minutes == '':
        return -1
    try:
        return int(minutes)
    except (TypeError, ValueError):
        # Unknown marker: never let it win a min() and never crash the caller.
        return float('inf')
def get_and_save_trains():
    """Poll train data forever, appending every snapshot to ``log.csv``.

    A ``TrainInfo`` is built once. Then, with a 15 second pause between
    polls, ``get_trains`` is called, each record is stamped with the current
    Unix time under ``'Time'`` and the batch is appended to the CSV file.
    An exception raised during a poll is printed and the loop carries on
    with the next poll. This function does not return.
    """
    info = TrainInfo()
    log_path = 'log.csv'
    delay = 15  # seconds between polls
    fieldnames = ['Time', 'LineCode', 'DirectionNum', 'Track', 'LocationCode',
                  'LocationName', 'DestinationStationCode', 'DestinationName',
                  'SeqNum', 'SecondsAtLocation', 'Min', 'ServiceType',
                  'CarCount', 'CircuitId', 'TrainId']

    # newline='' is what the csv module requires of files it writes to.
    with open(log_path, 'a', newline='') as f:
        # Append mode starts at end of file, so position 0 means the file is
        # new or empty; only then write the header, so restarts do not
        # scatter extra header rows through the log.
        if f.tell() == 0:
            csv.DictWriter(f, fieldnames).writeheader()

    for i in itertools.count():
        if i > 0:
            time.sleep(delay)
        try:
            trains = info.get_trains()
            now = time.time()  # unix time
            for train in trains:
                train['Time'] = now
            print("got {} trains".format(len(trains)))
            # with-block closes the file even if writerows raises
            with open(log_path, 'a', newline='') as f:
                csv.DictWriter(f, fieldnames).writerows(trains)
        except Exception as e:
            # best effort: report and try again on the next poll
            print(e)
# Start the polling loop only when this file is executed as a script.
if __name__ == "__main__":
    get_and_save_trains()