# Monte_carlo_control.py
import random
import numpy as np
import math
import matplotlib.pyplot as plt
from Environment import Environment
from collections import defaultdict
from Parameters import *
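# Note: the star import above is assumed to provide the constants used below
# (GRID_SIZE, PIXELS, NUM_STEPS, NUM_EPISODES, EPSILON and GAMMA).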
np.random.seed(10)


class Monte_carlo(object):
    def __init__(self, env, epsilon, gamma):
        # Variable initialization
        self.env = env
        self.n_obs = self.env.n_states
        self.n_a = self.env.n_actions
        self.epsilon = epsilon
        self.gamma = gamma
        # Variables for metrics
        self.steps = []
        self.all_cost = []
        self.accuracy = []
        self.Rewards_list = []
        self.rewards = 0
        self.positive_count = 0
        self.negative_count = 0
        self.goal_count = 0
        self.Q, self.Total_return, self.N = self.create_q_table()

    # Create a Q table
    def create_q_table(self):
        """
        Initialization
        :return
        1. Q[(s, a)]: dictionary storing the Q values;
        2. Total_return[(s, a)]: dictionary storing the total return of each state-action pair;
        3. N[(s, a)]: dictionary storing the number of times each state-action pair has been visited
        """
        Q = defaultdict(float)
        N = defaultdict(int)
        Total_return = defaultdict(float)
        for s in range(self.n_obs):
            for a in range(self.n_a):
                Q[(s, a)] = 0.0
                Total_return[(s, a)] = 0.0
                N[(s, a)] = 0
        return Q, Total_return, N
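    # For reference, after initialization every entry maps a (state, action) tuple to zero,
    # e.g. Q[(0, 1)] == 0.0, Total_return[(0, 1)] == 0.0 and N[(0, 1)] == 0 (the pair (0, 1)
    # is only an illustration).
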
    # Choose actions based on the epsilon-greedy policy
    def epsilon_greedy_policy(self, observation):
        """
        Define an epsilon-greedy policy based on an epsilon-soft policy.
        If a random number is larger than epsilon, choose a random action.
        Otherwise, choose the action with the maximum Q value.
        Argument: input observation, Q table
        Return: Action
        """
        # Sample a random number from a uniform distribution. If the number is larger than
        # epsilon, explore with a random action over all n_a actions; otherwise exploit by
        # choosing the action with the maximum Q value for the current state (so epsilon
        # here acts as the exploitation probability).
        if random.uniform(0, 1) > self.epsilon:
            return np.random.randint(0, self.n_a)
        else:
            return max(range(self.n_a), key=lambda x: self.Q[(observation, x)])
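    # Illustrative note: with the check above, if epsilon were, say, 0.9, then roughly 90% of
    # calls would return argmax_a Q[(observation, a)] and roughly 10% a uniformly random action.
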
    # Choose actions based on the optimal (greedy) policy
    def optimal_policy(self, observation):
        """
        Define the optimal policy: choose the best action, i.e. the one with
        the maximum Q value for the input observation
        Argument: input observation, Q table
        Return: Action
        """
        return max(range(self.n_a), key=lambda x: self.Q[(observation, x)])

    # Generate the transition data of one episode
    def generate_episode(self):
        """
        Generate an episode of experience and store it for updating the Q table
        """
        # Initialize a list for storing the episode
        episode = []
        # Reset the environment and get the initial observation
        observation = self.env.reset()
        steps = 0
        # Loop over time steps
        for t in range(NUM_STEPS):
            # Choose the action according to the epsilon-greedy policy
            action = self.epsilon_greedy_policy(observation)
            # Perform the action to get the (next_obs, reward, done, info) tuple
            next_observation, reward, done, info = self.env.step(action)
            # Store the transition tuple
            episode.append((observation, action, reward))
            steps += 1
            # If the next state is terminal, break the loop
            if done:
                # Record whether the episode ended with a positive or a negative reward
                if reward > 0:
                    self.positive_count += 1
                    self.goal_count += 1
                else:
                    self.negative_count += 1
                # Record the step count and the final reward
                self.steps += [steps]
                self.rewards += reward
                print("Episode finished after {} time steps".format(t + 1))
                break
            # Otherwise update the current state to the next observation
            observation = next_observation
        return episode
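    # For reference, a generated episode is a time-ordered list of (state, action, reward)
    # transitions, e.g. [(s0, a0, r1), (s1, a1, r2), ...]; it ends when the environment
    # reports done or after NUM_STEPS transitions.
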
    # Learning and updating the Q table based on the first-visit Monte Carlo method
    def fv_mc_prediction(self, num_epoch):
        """
        Updating process for the first-visit MC method
        Step 1: For each generated episode, calculate the return for every step/(s, a) pair:
                G <- gamma * G + R(t+1)
        Step 2: If the (s, a) pair is visited for the first time in the episode, add G to Total_return(s, a)
        Step 3: Calculate the average of Total_return(s, a) as Q(s, a)
        """
        # For each iteration
        for i in range(num_epoch):
            cost = 0
            # Use the current Q table to generate an episode
            episode = self.generate_episode()
            # Get all state-action pairs in the episode
            state_action_pairs = [(observation, action) for (observation, action, reward) in episode]
            # Initialize the return G
            G = 0
            # Calculate the accuracy rate every 50 episodes
            if i != 0 and i % 50 == 0:
                self.goal_count = self.goal_count / 50
                self.accuracy += [self.goal_count]
                self.goal_count = 0
            # Record the average reward
            self.Rewards_list += [self.rewards / (i + 1)]
            # For each state-action pair, working backwards from the end of the episode
            for j in range(len(episode)):
                # Calculate the return G from the end, T-1, T-2, ... by G = gamma * G + R(t+1)
                t = len(episode) - (j + 1)
                observation, action, reward = episode[t]
                G = reward + self.gamma * G
                # Only update on the first occurrence of the state-action pair in the episode
                # (first-visit MC method)
                if (observation, action) not in state_action_pairs[:t]:
                    # Update the total return of the state-action pair
                    self.Total_return[(observation, action)] += G
                    # Update the number of times the state-action pair has been visited
                    self.N[(observation, action)] += 1
                    # Calculate the Q value of the state-action pair by taking the average
                    self.Q[(observation, action)] = self.Total_return[(observation, action)] / self.N[
                        (observation, action)]
                    # Record the total cost of the episode
                    cost += self.Q[(observation, action)]
            self.all_cost += [cost]
        all_cost_bar = [self.positive_count, self.negative_count]
        # Print final route
        # env.final()
        # Plot training results
        self.plot_results(self.steps, self.all_cost, self.accuracy, all_cost_bar, self.Rewards_list)
        return self.Q, self.steps, self.all_cost, self.accuracy, all_cost_bar, self.Rewards_list

    # Plot training results
    @staticmethod
    def plot_results(steps, all_cost, accuracy, all_cost_bar, Reward_list):
        # Plot Episode via Steps
        plt.figure()
        plt.plot(np.arange(len(steps)), steps, 'b')
        plt.title('Episode via steps')
        plt.xlabel('Episode')
        plt.ylabel('Steps')
        # Plot Episode via Cost
        plt.figure()
        plt.plot(np.arange(len(all_cost)), all_cost)
        plt.title('Episode via Cost')
        plt.xlabel('Episode')
        plt.ylabel('Cost')
        # Plot Episode via Accuracy
        plt.figure()
        plt.plot(np.arange(len(accuracy)), accuracy, 'b')
        plt.title('Episode via Accuracy')
        plt.xlabel('Episode')
        plt.ylabel('Accuracy')
        # Plot bar chart of success and failure counts
        plt.figure()
        labels = ['Success', 'Fail']
        color_list = ['blue', 'red']
        plt.bar(np.arange(len(all_cost_bar)), all_cost_bar, tick_label=labels, color=color_list)
        plt.title('Bar/Success and Fail')
        plt.ylabel('Number')
        # Plot Episode via Average rewards
        plt.figure()
        plt.plot(np.arange(len(Reward_list)), Reward_list, 'b')
        plt.title('Episode via Average rewards')
        plt.xlabel('Episode')
        plt.ylabel('Average rewards')
        # Show the plots
        plt.show()

    # Test after training
    def test(self):
        # Run a set of episodes to test the learned policy
        num_test = 100
        # Route to print
        f = {}
        # Initialize the counter and the data-storage lists
        num_reach_goal = 0
        reward_list = []
        steps_list = []
        for i in range(num_test):
            # Reset the environment
            observation = self.env.reset()
            # Render the environment
            # env.render()
            for j in range(NUM_STEPS):
                # # Render the environment
                # self.env.render()
                # Choose the best action based on the optimal policy
                action = self.optimal_policy(observation)
                # Perform the action and get the transition tuple
                next_observation, reward, done, info = self.env.step(action)
                # Coordinate transformation
                y = int(math.floor(next_observation / GRID_SIZE)) * PIXELS
                x = int(next_observation % GRID_SIZE) * PIXELS
                f[j] = [x, y]
                if done:
                    if reward == 1:
                        num_reach_goal += 1
                    # When an episode terminates, record the final reward and step count,
                    # then add them to the lists
                    r = reward
                    step = j + 1
                    reward_list += [r]
                    steps_list += [step]
                    break
                observation = next_observation
        # Print final route
        self.env.f = f
        self.env.final()
        print("correctness:{}".format(num_reach_goal / num_test))
        # Plot the test results
        plt.figure()
        plt.plot(np.arange(len(steps_list)), steps_list, 'r')
        plt.title('Episode via steps')
        plt.xlabel('Episode')
        plt.ylabel('Steps')
        # Plot Episode via Success Rate
        plt.figure()
        plt.plot(np.arange(len(reward_list)), reward_list, 'r')
        plt.title('Episode via Success Rate')
        plt.xlabel('Episode')
        plt.ylabel('Success Rate')
        # Show the plots
        plt.show()

# Store the final Q table values
def write_Q_table(file_name, Q):
    # Open the data file
    q_file = open(file_name, 'w')
    # Write the data
    for k, v in Q.items():
        q_file.write(str(k) + ':' + str(v))
        q_file.write('\n')
    # Close the file
    q_file.close()
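
# For reference, write_Q_table writes one "(state, action):value" pair per line,
# e.g. a line such as "(12, 3):0.25" (the numbers here are only illustrative).
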
if __name__ == "__main__":
    # Create the grid-world environment
    env = Environment(grid_size=GRID_SIZE)
    # Create a Monte Carlo agent
    monte_carlo = Monte_carlo(env, epsilon=EPSILON, gamma=GAMMA)
    # Learn and update the Q table (fv_mc_prediction also returns the recorded metrics)
    Q, steps, all_cost, accuracy, all_cost_bar, rewards_list = monte_carlo.fv_mc_prediction(num_epoch=NUM_EPISODES)
    # write_Q_table(file_name='./Q_table/monte_carlo', Q=Q)
    # Test after training
    monte_carlo.test()
    # Keep the visualization window open
    env.mainloop()