07_2_dqn_2013_cartpole.py (forked from hunkim/ReinforcementZeroToAll)

"""
DQN (NIPS 2013)
Playing Atari with Deep Reinforcement Learning
https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf
"""
import random
from collections import deque

import gym
import numpy as np
import tensorflow as tf

import dqn

env = gym.make('CartPole-v0')
env = gym.wrappers.Monitor(env, 'gym-results/', force=True)
INPUT_SIZE = env.observation_space.shape[0]
OUTPUT_SIZE = env.action_space.n
DISCOUNT_RATE = 0.99
REPLAY_MEMORY = 50000
MAX_EPISODE = 5000
BATCH_SIZE = 64
# minimum epsilon for epsilon greedy
MIN_E = 0.0
# epsilon will be `MIN_E` at `EPSILON_DECAYING_EPISODE`
EPSILON_DECAYING_EPISODE = MAX_EPISODE * 0.01
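# With MAX_EPISODE = 5000, epsilon therefore anneals from 1.0 down to MIN_E
# over the first 50 episodes (1% of training); see annealing_epsilon below.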


def bot_play(mainDQN: dqn.DQN) -> None:
    """Runs a single episode with rendering and prints the total reward.

    Args:
        mainDQN (dqn.DQN): DQN agent
    """
    state = env.reset()
    total_reward = 0

    while True:
        env.render()
        action = np.argmax(mainDQN.predict(state))
        state, reward, done, _ = env.step(action)
        total_reward += reward

        if done:
            print("Total score: {}".format(total_reward))
            break
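# bot_play is not invoked in this script; a typical (hypothetical) use is to
# call bot_play(mainDQN) after training, to watch the learned policy render.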


def train_minibatch(DQN: dqn.DQN, train_batch: list) -> float:
    """Prepare X_batch, y_batch and train on them.

    Recall our loss function is
        target = reward + discount * max Q(s', a)
                 or reward if done early

        Loss function: [target - Q(s, a)]^2

    Hence,
        X_batch is a state list
        y_batch is reward + discount * max Q
                 or reward if terminated early

    Args:
        DQN (dqn.DQN): DQN agent to train & run
        train_batch (list): Minibatch of replay memory
            Each element is a tuple of (s, a, r, s', done)

    Returns:
        float: loss value
    """
    state_array = np.vstack([x[0] for x in train_batch])
    action_array = np.array([x[1] for x in train_batch])
    reward_array = np.array([x[2] for x in train_batch])
    next_state_array = np.vstack([x[3] for x in train_batch])
    done_array = np.array([x[4] for x in train_batch])

    X_batch = state_array
    y_batch = DQN.predict(state_array)

    # Bootstrap only from non-terminal transitions: ~done_array zeroes the
    # discounted max-Q term whenever the episode ended on this step
    Q_target = reward_array + DISCOUNT_RATE * np.max(
        DQN.predict(next_state_array), axis=1) * ~done_array
    y_batch[np.arange(len(X_batch)), action_array] = Q_target

    # Train our network using target and predicted Q values on each episode
    loss, _ = DQN.update(X_batch, y_batch)

    return loss
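# Worked illustration of the Q_target line above (made-up numbers, with
# DISCOUNT_RATE = 0.99):
#   reward_array          = [1.,    1.]
#   max Q(s', a) per row  = [2.,    4.]
#   done_array            = [False, True]  ->  ~done_array = [True, False]
#   Q_target = [1 + 0.99 * 2,  1 + 0.]  =  [2.98,  1.]
# so the terminal transition keeps only its immediate reward.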


def annealing_epsilon(episode: int, min_e: float, max_e: float, target_episode: int) -> float:
    """Return a linearly annealed epsilon.

    Epsilon decreases over time until it reaches `target_episode`:

             (epsilon)
                 |
        max_e ---|\
                 | \
                 |  \
                 |   \
        min_e ---|____\_______________(episode)
                 |
                 target_episode

        slope = (min_e - max_e) / target_episode
        intercept = max_e

        e = slope * episode + intercept

    Args:
        episode (int): Current episode
        min_e (float): Minimum epsilon
        max_e (float): Maximum epsilon
        target_episode (int): epsilon becomes `min_e` at `target_episode`

    Returns:
        float: epsilon between `min_e` and `max_e`
    """
    slope = (min_e - max_e) / target_episode
    intercept = max_e

    return max(min_e, slope * episode + intercept)
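# Example schedule, using the constants defined above
# (EPSILON_DECAYING_EPISODE = MAX_EPISODE * 0.01 = 50):
#   annealing_epsilon(0,  0.0, 1.0, 50)  -> 1.0
#   annealing_epsilon(25, 0.0, 1.0, 50)  -> 0.5
#   annealing_epsilon(50, 0.0, 1.0, 50)  -> 0.0  (clamped at min_e afterwards)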


def main():
    # store the previous observations in replay memory
    replay_buffer = deque(maxlen=REPLAY_MEMORY)
    last_100_game_reward = deque(maxlen=100)

    with tf.Session() as sess:
        mainDQN = dqn.DQN(sess, INPUT_SIZE, OUTPUT_SIZE)
        init = tf.global_variables_initializer()
        sess.run(init)

        for episode in range(MAX_EPISODE):
            e = annealing_epsilon(episode, MIN_E, 1.0, EPSILON_DECAYING_EPISODE)
            done = False
            state = env.reset()
            step_count = 0

            while not done:
                if np.random.rand() < e:
                    action = env.action_space.sample()
                else:
                    action = np.argmax(mainDQN.predict(state))

                next_state, reward, done, _ = env.step(action)

                if done:
                    # penalty reward when the episode terminates
                    reward = -1

                replay_buffer.append((state, action, reward, next_state, done))
                state = next_state
                step_count += 1

                if len(replay_buffer) > BATCH_SIZE:
                    minibatch = random.sample(replay_buffer, BATCH_SIZE)
                    train_minibatch(mainDQN, minibatch)

            print("[Episode {:>5}] steps: {:>5} e: {:>5.2f}".format(episode, step_count, e))

            # CartPole-v0 clear condition: average reward over the last
            # 100 episodes above the threshold below
            last_100_game_reward.append(step_count)

            if len(last_100_game_reward) == last_100_game_reward.maxlen:
                avg_reward = np.mean(last_100_game_reward)

                if avg_reward > 199.0:
                    print("Game Cleared within {} episodes with avg reward {}".format(
                        episode, avg_reward))
                    break


if __name__ == "__main__":
main()