-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmodel.py
127 lines (95 loc) · 4.33 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Thu Feb 15 11:43:14 2018
@author: paul
"""
import numpy as np
from tensorflow import keras as kr
class model(object):
def __init__(self,N,L,tmax,dt,xpert):
self.N = N
self.L = L
self.distance = np.arange(0,self.L,1) #array for distance
self.xpert = xpert
self.tmax = tmax
self.dt = dt
self.iters = abs(int(self.tmax/self.dt))
self.time = np.arange(0,self.tmax,self.dt)
self.loaded_tf_model = kr.models.load_model('model/model.h5')
self.meanDx,self.stdDx,self.meandotx,self.stddotx,self.n_leading_cars,self.rnn = np.loadtxt('model/model_parameter.txt')
self.n_leading_cars = int(self.n_leading_cars)
def initCars(self,**kwargs):
"""
initialise 0th time step
"""
self.x = np.zeros(shape=(self.N,self.iters)) # position
self.dot_x = np.zeros(shape=(self.N,self.iters)) # velocity
self.ddot_x = np.zeros(shape=(self.N,self.iters)) # acceleration
self.Delta_x = np.zeros(shape=(self.N,self.iters)) # headway
self.x[:,0] = np.linspace(0,self.L,self.N)
self.dot_x[:,0] = self.meandotx
self.ddot_x[:,0] = 0.
self.x[:,0] = self.x[:,0] + self.xpert
self.Delta_x[:,0] = self.headway(self.x[:,0],self.L)
def integrate(self,**kwargs):
"""
Integrate the model using a fortran or a python kernel
"""
for i in range(0,self.iters-1):
self.integration_procedure(i)
def integration_procedure(self,i):
"""
RK4 integration scheme
"""
h = self.dt
k1 = self.acceleration(self.Delta_x[:,i],self.dot_x[:,i],self.Delta_x[:,i-1],self.dot_x[:,i-1])
self.dot_x[:,i+1] = self.dot_x[:,i] + k1*h/2
k2 = self.acceleration(self.Delta_x[:,i],self.dot_x[:,i+1],self.Delta_x[:,i-1],self.dot_x[:,i-1])
self.dot_x[:,i+1] = self.dot_x[:,i] + k2*h/2
k3 = self.acceleration(self.Delta_x[:,i],self.dot_x[:,i+1],self.Delta_x[:,i-1],self.dot_x[:,i-1])
self.dot_x[:,i+1] = self.dot_x[:,i] + k3*h
k4 = self.acceleration(self.Delta_x[:,i],self.dot_x[:,i+1],self.Delta_x[:,i-1],self.dot_x[:,i-1])
self.ddot_x[:,i+1] = k1
self.dot_x[:,i+1] = self.dot_x[:,i] + h/6. * (k1 + 2*k2 + 2*k3 + k4)
# just allow postive velocities
self.dot_x[self.dot_x[:,i+1]<0.,i+1] = 0.
self.x[:,i+1] = self.x[:,i] + self.dot_x[:,i+1] * h
self.x[:,i+1] = self.x[:,i+1]%self.L
# Diagnostics
self.Delta_x[:,i+1] = self.headway(self.x[:,i+1],self.L)
def acceleration(self,Delta_x,dot_x,prev_Delta_x,prev_dot_x):
"""
returns the acceleration of cars based on an ANN
"""
def shape(Delta_x,dot_x):
Dx = Delta_x.reshape((len(Delta_x),1))
dotx = dot_x.reshape((len(Delta_x),1))
Dx = (Dx - self.meanDx)/self.stdDx
dotx = (dotx - self.meandotx)/self.stddotx
X = np.concatenate((Dx[0:self.n_leading_cars,:].T,dotx[0:self.n_leading_cars,:].T),axis=1)
X_all = np.zeros((self.N,len(X[0])))
# for car number 0
X_all[0,:] = X
for i in range(self.N-1):
Dx = np.roll(Dx,-1,axis=0)
dotx = np.roll(dotx,-1,axis=0)
X_all[i+1,:] = np.concatenate((Dx[0:self.n_leading_cars,:].T,dotx[0:self.n_leading_cars,:].T),axis=1)
if self.rnn:
X_all = X_all.reshape(X_all.shape[0],1,X_all.shape[1])
return X_all
if self.rnn:
X = shape(Delta_x,dot_x)
# X_prev = shape(prev_Delta_x,prev_dot_x)
#
# X = np.concatenate((X,X_prev),axis=1)
else:
X = shape(Delta_x,dot_x)
# predict acceleration
ddotx = self.loaded_tf_model.predict(X)
return ddotx[:,0]
def headway(self,x,L):
Dx = np.zeros(self.N)
Dx[:-1] = ((x[1:] - x[:-1])+L)%L
Dx[-1] = ((x[0] - x[-1])+L)%L
return Dx