forked from inoryy/tensorflow2-deep-reinforcement-learning
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patha2c.py
160 lines (135 loc) · 6.54 KB
/
a2c.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import gym
import logging
import argparse
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow.keras.layers as kl
import tensorflow.keras.losses as kls
import tensorflow.keras.optimizers as ko
# Command-line interface: batch size, number of updates, optimizer step size,
# plus two opt-in switches for rendering the test episode and plotting rewards.
parser = argparse.ArgumentParser()
parser.add_argument(
    '-b', '--batch_size',
    type=int,
    default=64,
)
parser.add_argument(
    '-n', '--num_updates',
    type=int,
    default=250,
)
parser.add_argument(
    '-lr', '--learning_rate',
    type=float,
    default=7e-3,
)
# `store_true` flags already default to False when the switch is absent.
parser.add_argument('-r', '--render_test', action='store_true')
parser.add_argument('-p', '--plot_results', action='store_true')
class ProbabilityDistribution(tf.keras.Model):
    """Keras model that samples one categorical action per row of logits."""

    def call(self, logits, **kwargs):
        # Draw exactly one sample for each row of `logits` ...
        sampled = tf.random.categorical(logits, 1)
        # ... and drop the trailing axis so one value per row remains.
        return tf.squeeze(sampled, axis=-1)
class Model(tf.keras.Model):
    """Actor-critic network with a policy-logits head and a value head.

    Each head has its own 128-unit ReLU hidden layer, and both hidden layers
    read the same input tensor (no shared trunk).
    """

    def __init__(self, num_actions):
        """Build the layers.

        Args:
            num_actions: Number of discrete actions, i.e. the width of the
                policy-logits output.
        """
        # Pass the model name by keyword: positional arguments of
        # `tf.keras.Model` are reserved for functional-API inputs/outputs,
        # so a positional string is not reliably treated as the name.
        super().__init__(name='mlp_policy')
        # Note: no tf.get_variable(), just simple Keras API!
        self.hidden1 = kl.Dense(128, activation='relu')
        self.hidden2 = kl.Dense(128, activation='relu')
        self.value = kl.Dense(1, name='value')
        # Logits are unnormalized log probabilities.
        self.logits = kl.Dense(num_actions, name='policy_logits')
        self.dist = ProbabilityDistribution()

    def call(self, inputs, **kwargs):
        """Return `(policy_logits, value)` for a batch of observations."""
        # Inputs is a numpy array, convert to a tensor.
        x = tf.convert_to_tensor(inputs)
        # Separate hidden layers from the same input tensor.
        hidden_logs = self.hidden1(x)
        hidden_vals = self.hidden2(x)
        return self.logits(hidden_logs), self.value(hidden_vals)

    def action_value(self, obs):
        """Sample an action and estimate the value for a batch of observations.

        Returns:
            A pair `(action, value)` of numpy arrays, each with its last axis
            squeezed away.
        """
        # Executes `call()` under the hood.
        logits, value = self.predict_on_batch(obs)
        # Sampling goes through the helper model so it also runs via
        # `predict_on_batch` rather than as a direct tensor op.
        action = self.dist.predict_on_batch(logits)
        return np.squeeze(action, axis=-1), np.squeeze(value, axis=-1)
class A2CAgent:
    """Advantage actor-critic (A2C) agent driving a two-headed Keras model.

    The wrapped model must expose `action_value(obs)` and produce two outputs
    (policy logits, value estimate); it is compiled here with one loss per
    output.
    """

    def __init__(self, model, lr=7e-3, gamma=0.99, value_c=0.5, entropy_c=1e-4):
        """Store hyperparameters and compile `model` for training.

        Args:
            model: Keras model with policy-logits and value outputs.
            lr: Learning rate for the RMSprop optimizer.
            gamma: Discount factor applied to future rewards.
            value_c: Coefficient scaling the value (critic) loss.
            entropy_c: Coefficient scaling the entropy bonus.
        """
        # `gamma` is the discount factor; coefficients are used for the loss terms.
        self.gamma = gamma
        self.value_c = value_c
        self.entropy_c = entropy_c

        self.model = model
        self.model.compile(
            # `learning_rate` is the supported keyword; `lr` is a deprecated
            # alias that newer Keras optimizers no longer accept.
            optimizer=ko.RMSprop(learning_rate=lr),
            # Define separate losses for policy logits and value estimate.
            loss=[self._logits_loss, self._value_loss])

    def train(self, env, batch_sz=64, updates=250):
        """Run the collect-then-update training loop.

        Args:
            env: Environment whose `reset()` returns an observation and whose
                `step(action)` returns `(obs, reward, done, info)`.
            batch_sz: Number of environment steps collected per update.
            updates: Number of optimizer updates to perform.

        Returns:
            List of per-episode reward totals; the last entry belongs to the
            episode still in progress when training stopped.
        """
        # Storage helpers for a single batch of data.
        actions = np.empty((batch_sz,), dtype=np.int32)
        rewards, dones, values = np.empty((3, batch_sz))
        observations = np.empty((batch_sz,) + env.observation_space.shape)

        # Training loop: collect samples, send to optimizer, repeat updates times.
        ep_rewards = [0.0]
        next_obs = env.reset()
        for update in range(updates):
            for step in range(batch_sz):
                observations[step] = next_obs.copy()
                actions[step], values[step] = self.model.action_value(next_obs[None, :])
                next_obs, rewards[step], dones[step], _ = env.step(actions[step])

                ep_rewards[-1] += rewards[step]
                if dones[step]:
                    # Start a fresh accumulator, then report the finished episode.
                    ep_rewards.append(0.0)
                    next_obs = env.reset()
                    logging.info("Episode: %03d, Reward: %03d",
                                 len(ep_rewards) - 1, ep_rewards[-2])

            # Bootstrap from the critic's estimate of the state after the batch.
            _, next_value = self.model.action_value(next_obs[None, :])
            returns, advs = self._returns_advantages(rewards, dones, values, next_value)
            # A trick to input actions and advantages through same API.
            acts_and_advs = np.concatenate([actions[:, None], advs[:, None]], axis=-1)

            # Performs a full training step on the collected batch.
            # Note: no need to mess around with gradients, Keras API handles it.
            losses = self.model.train_on_batch(observations, [acts_and_advs, returns])
            logging.debug("[%d/%d] Losses: %s", update + 1, updates, losses)

        return ep_rewards

    def test(self, env, render=False):
        """Play one episode with the current policy and return its total reward.

        Args:
            env: Environment with the same `reset()`/`step()` API used by `train`.
            render: When true, call `env.render()` after every step.
        """
        obs, done, ep_reward = env.reset(), False, 0
        while not done:
            action, _ = self.model.action_value(obs[None, :])
            obs, reward, done, _ = env.step(action)
            ep_reward += reward
            if render:
                env.render()
        return ep_reward

    def _returns_advantages(self, rewards, dones, values, next_value):
        """Compute discounted returns and advantages for one batch.

        Args:
            rewards: 1-D array of per-step rewards.
            dones: 1-D array of per-step episode-termination flags (0/1).
            values: 1-D array of the critic's per-step value estimates.
            next_value: Bootstrap value estimate for the state following the
                batch; appended along the last axis, so it must be at least 1-D.

        Returns:
            Tuple `(returns, advantages)`, each the same length as `rewards`.
        """
        # `next_value` is the bootstrap value estimate of the future state (critic).
        returns = np.append(np.zeros_like(rewards), next_value, axis=-1)

        # Returns are calculated as discounted sum of future rewards.
        # `(1 - dones[t])` stops the recursion from leaking across episode ends.
        for t in reversed(range(rewards.shape[0])):
            returns[t] = rewards[t] + self.gamma * returns[t + 1] * (1 - dones[t])
        # Drop the bootstrap slot so the result lines up with `rewards`.
        returns = returns[:-1]

        # Advantages are equal to returns - baseline (value estimates in our case).
        advantages = returns - values

        return returns, advantages

    def _value_loss(self, returns, value):
        """Critic loss: `value_c` times the MSE between returns and estimates."""
        # Value loss is typically MSE between value estimates and returns.
        return self.value_c * kls.mean_squared_error(returns, value)

    def _logits_loss(self, actions_and_advantages, logits):
        """Actor loss: advantage-weighted cross-entropy minus an entropy bonus.

        Args:
            actions_and_advantages: Tensor whose last axis holds
                `[action, advantage]`, as packed by `train`.
            logits: Unnormalized policy logits produced by the model.
        """
        # A trick to input actions and advantages through the same API.
        actions, advantages = tf.split(actions_and_advantages, 2, axis=-1)

        # Sparse categorical CE loss obj that supports sample_weight arg on `call()`.
        # `from_logits` argument ensures transformation into normalized probabilities.
        weighted_sparse_ce = kls.SparseCategoricalCrossentropy(from_logits=True)

        # Policy loss is defined by policy gradients, weighted by advantages.
        # Note: we only calculate the loss on the actions we've actually taken.
        actions = tf.cast(actions, tf.int32)
        policy_loss = weighted_sparse_ce(actions, logits, sample_weight=advantages)

        # Entropy loss can be calculated as cross-entropy over itself.
        probs = tf.nn.softmax(logits)
        entropy_loss = kls.categorical_crossentropy(probs, probs)

        # We want to minimize policy and maximize entropy losses.
        # Here signs are flipped because the optimizer minimizes.
        return policy_loss - self.entropy_c * entropy_loss
# Script entry point: train an A2C agent on CartPole, run one test episode,
# and optionally plot the per-episode training rewards.
if __name__ == '__main__':
    args = parser.parse_args()
    logging.getLogger().setLevel(logging.INFO)

    env = gym.make('CartPole-v0')
    model = Model(num_actions=env.action_space.n)
    agent = A2CAgent(model, args.learning_rate)

    rewards_history = agent.train(env, args.batch_size, args.num_updates)
    print("Finished training. Testing...")
    print("Total Episode Reward: %d out of 200" % agent.test(env, args.render_test))

    if args.plot_results:
        # Matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8*'
        # and later removed the old names; prefer the new name when available
        # and fall back to the legacy one on older installations.
        style = 'seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'
        plt.style.use(style)
        # Plot every 10th episode to keep the curve readable.
        plt.plot(np.arange(0, len(rewards_history), 10), rewards_history[::10])
        plt.xlabel('Episode')
        plt.ylabel('Total Reward')
        plt.show()