agent.py
import numpy as np
from collections import deque
from operator import itemgetter

import torch
import torch.nn as nn
import torch.nn.functional as F

# run on the GPU when available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
np.random.seed(101)

class Agent(nn.Module):
    def __init__(self, name, h_size=32, gamma=1.0, print_every=1, pop_size=50, elite_frac=0.2, sigma=0.5):
        super(Agent, self).__init__()
        self.name = name
        self.action_count = 0
        # state, hidden-layer, and action sizes (19 state features + 4 action features)
        self.s_size = 19 + 4
        self.h_size = h_size
        self.a_size = 1
        # define layers
        self.fc1 = nn.Linear(self.s_size, self.h_size)
        self.fc2 = nn.Linear(self.h_size, self.a_size)
        # bookkeeping for the population-based weight search
        self.game_count = 0
        self.rewards = []
        self.curr_rewards = []
        self.n_elite = int(pop_size * elite_frac)
        self.scores_deque = deque(maxlen=10)
        self.scores = []
        self.best_weight = sigma * np.random.randn(self.get_weights_dim())
        self.i_iteration = 0
        self.weights_pop = [self.best_weight + (sigma * np.random.randn(self.get_weights_dim()))
                            for _ in range(pop_size)]
        self.weight_num = 0
        self.print_every = print_every
        self.sigma = sigma
        self.pop_size = pop_size
        self.evaluate_best = False
        # keep the model on the same device that act() moves its inputs to
        self.to(device)

    def get_model(self, path):
        self.load_state_dict(torch.load(path, map_location=device))

    def act(self, state, valid_actions):
        self.action_count += 1
        # TODO add to memory
        # score every valid action with the network and pick the highest-scoring one
        qmap = []
        for action in valid_actions:
            x = torch.from_numpy(np.array(state + action)).float().to(device)
            qval = self.forward(x).detach().numpy()[0]
            qmap.append([action, qval])
        qmap.sort(key=itemgetter(1), reverse=True)
        action_type, target_player, _, is_challenge = qmap[0][0]
        return ([action_type, target_player], is_challenge)

    def set_weights(self, weights):
        s_size = self.s_size
        h_size = self.h_size
        a_size = self.a_size
        # separate the weights for each layer
        fc1_end = (s_size * h_size) + h_size
        fc1_W = torch.from_numpy(weights[:s_size * h_size].reshape(s_size, h_size))
        fc1_b = torch.from_numpy(weights[s_size * h_size:fc1_end])
        fc2_W = torch.from_numpy(weights[fc1_end:fc1_end + (h_size * a_size)].reshape(h_size, a_size))
        fc2_b = torch.from_numpy(weights[fc1_end + (h_size * a_size):])
        # set the weights for each layer
        self.fc1.weight.data.copy_(fc1_W.view_as(self.fc1.weight.data))
        self.fc1.bias.data.copy_(fc1_b.view_as(self.fc1.bias.data))
        self.fc2.weight.data.copy_(fc2_W.view_as(self.fc2.weight.data))
        self.fc2.bias.data.copy_(fc2_b.view_as(self.fc2.bias.data))

    def get_weights_dim(self):
        return (self.s_size + 1) * self.h_size + (self.h_size + 1) * self.a_size

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x.cpu().data

    def next_game(self, reward):
        self.game_count += 1
        self.curr_rewards.append(reward)
        if self.game_count >= 10:
            # after every 10 games, record the average reward for the current candidate
            if not self.evaluate_best:
                self.rewards.append(np.array(self.curr_rewards).mean())
                self.weight_num += 1
            if self.weight_num >= len(self.weights_pop):
                # every candidate has been evaluated; close out the iteration
                self.finished_iter(np.array(self.curr_rewards).mean())
                self.curr_rewards = []
            else:
                # switch to the next candidate weight vector
                self.set_weights(self.weights_pop[self.weight_num])
                self.curr_rewards = []
            self.game_count = 0

    def finished_iter(self, reward):
        # keep the candidates whose average rewards were highest
        elite_idxs = np.array(self.rewards).argsort()[-self.n_elite:]
        elite_weights = [self.weights_pop[i] for i in elite_idxs]
        # don't update the random agent that is kept for testing
        if self.name != 4:
            self.best_weight = np.array(elite_weights).mean(axis=0)
        if self.evaluate_best:
            # the averaged elite weights have just been evaluated; record the score and start a new iteration
            self.scores_deque.append(reward)
            self.scores.append(reward)
            torch.save(self.state_dict(), 'checkpoint' + str(self.name) + '.pth')
            self.i_iteration += 1
            self.weights_pop = [self.best_weight + (self.sigma * np.random.randn(self.get_weights_dim()))
                                for _ in range(self.pop_size)]
            self.weight_num = 0
            self.rewards = []
            # TODO need to evaluate against random bots to verify
            if self.i_iteration % self.print_every == 0:
                print('Agent {}: Episode {}\tAverage Score: {:.2f}'.format(self.name, self.i_iteration, np.mean(self.scores_deque)))
                print(self.scores_deque)
            self.evaluate_best = False
        else:
            # run the next set of games with the averaged elite weights before resampling
            self.evaluate_best = True
            self.set_weights(self.best_weight)
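

A minimal sketch of how the act/next_game interface might be driven by a game loop follows; the 19-feature states, the 4-element actions, and the random rewards are placeholders invented for illustration and are not part of the original training harness.

if __name__ == "__main__":
    # hypothetical driver: states are 19-value lists and each valid action is a
    # [action_type, target_player, unused, is_challenge] list, matching what act() unpacks
    agent = Agent(name=0, pop_size=10, print_every=1)
    for _ in range(1200):
        state = list(np.random.rand(19))                      # placeholder state features
        valid_actions = [[a, 0, 0, 0] for a in range(3)]      # placeholder action candidates
        chosen, is_challenge = agent.act(state, valid_actions)
        reward = float(np.random.rand())                      # placeholder reward for this game
        agent.next_game(reward)                               # writes checkpoint0.pth after each iteration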