'''Use gym's Monitor wrapper to evaluate a trained model and to generate video captures.'''
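
# Example invocation, using this script's own defaults:
#   python monitor.py --env SpaceInvaders-v0 -o deepQ --seed 703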
import argparse
import os
import random

import numpy as np
import gym
from gym import wrappers
from keras.models import load_model

import deeprl_hw2 as tfrl
# mean_huber_loss_duel is required when loading the dueling checkpoints below
from deeprl_hw2.objectives import mean_huber_loss, mean_huber_loss_duel
def get_output_folder(parent_dir, env_name):
"""Return save folder.
Assumes folders in the parent_dir have suffix -run{run
number}. Finds the highest run number and sets the output folder
to that number + 1. This is just convenient so that if you run the
same script multiple times tensorboard can plot all of the results
on the same plots with different names.
Parameters
----------
parent_dir: str
Path of the directory containing all experiment runs.
Returns
-------
parent_dir/run_dir
Path to this run's save directory.
"""
os.makedirs(parent_dir, exist_ok=True)
    experiment_id = 0
    for folder_name in os.listdir(parent_dir):
        if not os.path.isdir(os.path.join(parent_dir, folder_name)):
            continue
        try:
            run_id = int(folder_name.split('-evaluate')[-1])
        except ValueError:
            continue
        experiment_id = max(experiment_id, run_id)
    experiment_id += 1
parent_dir = os.path.join(parent_dir, env_name)
parent_dir = parent_dir + '-evaluate{}'.format(experiment_id)
return parent_dir
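
# Worked example (hypothetical directory layout): if 'deepQ' already contains
# 'SpaceInvaders-v0-evaluate1' and 'SpaceInvaders-v0-evaluate3', then
# get_output_folder('deepQ', 'SpaceInvaders-v0') returns
# 'deepQ/SpaceInvaders-v0-evaluate4'.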
def main(): # noqa: D103
parser = argparse.ArgumentParser(description='Evaluate model using Monitor')
parser.add_argument('--env', default='SpaceInvaders-v0', help='Atari env name')
parser.add_argument(
'-o', '--output', default='deepQ', help='Directory to save data to')
parser.add_argument('--seed', default=703, type=int, help='Random seed')
    args = parser.parse_args()

    # Seed the RNGs so that evaluation runs are reproducible
    random.seed(args.seed)
    np.random.seed(args.seed)
args.output = get_output_folder(args.output, args.env)
print(args.output)
os.makedirs(args.output, exist_ok=True)
    env = gym.make(args.env)
    env.seed(args.seed)
    env = wrappers.Monitor(env, args.output)
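    # The Monitor wrapper records episode videos and evaluation statistics
    # into the output directory.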
    # Initialize a preprocessor sequence object (frames resized to 84x84)
preprocessor = tfrl.preprocessors.AtariPreprocessor((84, 84))
# Initialize a policy
_policy = tfrl.policy.GreedyEpsilonPolicy(0.05, env.action_space.n)
policy = tfrl.policy.LinearDecayGreedyEpsilonPolicy(_policy, 1, 0.1, 1000000)
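    # Note (assumption about this codebase's policy API): select_action is
    # called below with its second argument False, presumably meaning
    # "not training", so the decay schedule stays inert and actions follow
    # a fixed evaluation epsilon of 0.05 from the wrapped greedy policy.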
    print('Loading trained model...')
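    # Pick exactly one checkpoint to evaluate. Dueling-network checkpoints
    # must be loaded with mean_huber_loss_duel as the custom loss object;
    # all others use mean_huber_loss.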
# q_net = load_model('Final_Results/SpaceInvaders-v0-run2-DuelQ/qnet-1of5.h5', custom_objects={'mean_huber_loss_duel': mean_huber_loss_duel})
# q_net = load_model('Final_Results/SpaceInvaders-v0-run2-DoubleQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('Final_Results/SpaceInvaders-v0-run4-DeepQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('Final_Results/SpaceInvaders-v0-run2-LinearDoubleQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('Final_Results/SpaceInvaders-v0-run1-LinearQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('deepQ/Enduro-v0-run37/qnet-1of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('Final_Results/Enduro-v0-run1-DuelQ/qnet-1of5.h5', custom_objects={'mean_huber_loss_duel': mean_huber_loss_duel})
# q_net = load_model('Final_Results/Enduro-v0-run1-DoubleQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
q_net = load_model('Final_Results/Enduro-v0-run1-DeepQ/qnet-1of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('Final_Results/Enduro-v0-run1-LinearDoubleQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
# q_net = load_model('Final_Results/Enduro-v0-run1-LinearQ/qnet-2of5.h5', custom_objects={'mean_huber_loss': mean_huber_loss})
num_episodes = 10
rewards = []
for episode in range(num_episodes):
initial_frame = env.reset()
state = np.zeros((4, 84, 84), dtype=np.float32)
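        # The network input is a sliding window of the last 4 preprocessed
        # 84x84 frames (channels-first). Each new frame is scaled to [0, 1]
        # and pushed onto the end of the stack, dropping the oldest frame.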
# Preprocess the state
        prev_frame = preprocessor.process_frame_for_memory(initial_frame).astype(np.float32)
        prev_frame = prev_frame / 255.0  # scale pixels to [0, 1]
state[:-1] = state[1:]
state[-1] = np.copy(prev_frame)
total_reward = 0
for t in range(100000):
# env.render()
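            # Batch the single state (shape (1, 4, 84, 84)) and select an
            # action from the predicted Q-values for that one state.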
            q_values = q_net.predict_on_batch(np.asarray([state]))
            _action = policy.select_action(q_values[0], False)
next_frame, reward, is_terminal, debug_info = env.step(_action)
            # The returned state is unused here (frame stacking is done
            # manually below), but the call is kept in case the preprocessor
            # tracks internal frame history.
            phi_state_n = preprocessor.process_state_for_network(next_frame, prev_frame)
total_reward += reward
if is_terminal:
print("Episode finished after {} timesteps".format(t+1))
break
            prev_frame = preprocessor.process_frame_for_memory(next_frame).astype(np.float32)
            prev_frame = prev_frame / 255.0  # scale pixels to [0, 1]
state[:-1] = state[1:]
state[-1] = np.copy(prev_frame)
        print('Episode {} reward: {}'.format(episode + 1, total_reward))
        rewards.append(total_reward)
    rewards = np.asarray(rewards)
    print('Mean reward over {} episodes: {}'.format(num_episodes, np.mean(rewards)))
    print('Reward std: {}'.format(np.std(rewards)))
if __name__ == '__main__':
main()