前言
Q-learning和DQN算法都是强化学习中的Value-based的方法,它们都是先经过Q值来选择动作。强化学习中还有另一大类是策略梯度方法(Policy Gradient Methods)。Policy Gradient 是一类直接针对期望回报(Expected Return)通过梯度下降(Gradient Descent)进行策略优化的强化学习方法。这一类方法避免了其他传统强化学习方法所面临的一些困难,比如,没有一个准确的价值函数,或者由于连续的状态和动作空间,以及状态信息的不确定性而导致的难解性(Intractability)。其中最著名的就是Policy Gradient,Policy Gradient算法又可以根据更新方式分为两大类:
蒙特卡罗更新方法:Reinfoce算法(回合更新)
时序差分更新方法:Actor-Critic算法(单步更新)
回顾蒙特卡罗方法和时序差分方法
蒙特卡罗方法可以理解为算法完成一个回合之后,再利用这个回合的数据去学习,做一次更新。因为我们已经获得了整个回合的数据,所以也能够获得每一个步骤的奖励,我们可以很方便地计算每个步骤的未来总奖励,GtG_{t}。GtG_{t}是未来总奖励,代表从这个步骤开始,我们能获得的奖励之和。G1G_{1}代表我们从第一步开始,往后能够获得的总奖励。G2G_{2}代表从第二步开始,往后能够获得的总奖励。
相比蒙特卡罗方法一个回合更新一次,时序差分方法是每个步骤更新一次,即每走一步,更新一次, 时序差分方法的更新频率更高。时序差分方法使用Q函数来近似地表示未来总奖励GtG_{t}。
Reinfoce算法原理
Reinfoce使用蒙特卡罗方法估计每个状态下采取动作所获得的奖励期望值,然后用这些估计值计算策略梯度并更新策略参数。因为Reinfoce算法是一种无模型算法,它不需要对环境建立模型,也不需要预测值函数等中间步骤,相比其他强化学习算法更加简单和直接。
Reinfoce算法在策略的参数空间中直观地通过梯度上升的方法逐步提高策略的性能。
▽J(θ)=Eτ∼πθ[∑t′=0∞▽θlogπθ(At′∣St′)γt′∑t=t′∞γt−t′Rt]bigtriangledown J(theta )=E_{tau sim pi _{theta } } [sum_{t’=0}^{infty }bigtriangledown _{theta }logpi _{theta }(A_{t’}|S_{t’})gamma ^{t’}sum_{t=t’}^{infty } gamma ^{t-t’}R_{t}]
由于折扣因子给未来的奖励赋予了较低的权重,使用折扣因子还有助于减少估计梯度时的方差大的问题。实际使用中,γt′gamma ^{t’}经常被去掉,从而避免了过分强调轨迹早期状态的问题。
虽然Reinfoce简单直观,但它的一个缺点是对梯度的估计有较大的方差。对于一个长度为L的轨迹,奖励RtR_{t}的随机性可能对L呈指数级增长。为了减轻估计的方差太大这个问题,一个常用的方法是引进一个基准函数b(Si)b(S_{i})。这里对b(Si)b(S_{i})的要求是:它只能是一个关于状态SiS_{i}的函数(或者更确切地说,它不能是关于AiA_{i}的函数)。有了基准函数b(St)b(S_{t})之后,强化学习目标函数的梯度 ▽J(θ)bigtriangledown J(theta )可以表示成:
▽J(θ)=Eτ∼πθ[∑t′=0∞▽θlogπθ(At′∣St′)(∑t=t′∞γt−t′Rt−b(St′))]bigtriangledown J(theta )=E_{tau sim pi _{theta } } [sum_{t’=0}^{infty }bigtriangledown _{theta }logpi _{theta }(A_{t’}|S_{t’})(sum_{t=t’}^{infty } gamma ^{t-t’}R_{t}-b(S_{t’} ))]
Reinfoce算法的代码实现
算法伪代码:
代码详解:
考虑将整个算法放入一个类中,并将各部分代码写入对应的函数。这样可以使得代码更为简洁易读。PolicyGradient 类的结构如下所示:
class PolicyGradient:
def __init__(self, state_dim, action_num, learning_rate=0.02, gamma=0.99):
......
def get_action(self, s, greedy=False): # 基于动作分布选择动作
......
def store_transition(self, s, a, r): # 存储从环境中采样的交互数据
......
def learn(self): # 使用存储的数据进行学习和更新
......
def _discount_and_norm_rewards(self): # 计算折扣化回报并进行标准化处理
......
def save(self): # 存储模型
......
def load(self): # 载入模型
......
初始化函数先后创建了一些变量、模型并选择 Adam 作为策略优化器。在代码中,我们可以看出这里的策略网络只有一层隐藏层。
def __init__(self, state_dim, action_num, learning_rate=0.02, gamma=0.99):
self.gamma = gamma
self.state_buffer, self.action_buffer, self.reward_buffer = [], [], []
input_layer = tl.layers.Input([None, state_dim], tf.float32)
layer = tl.layers.Dense(
n_units=30, act=tf.nn.tanh, W_init=tf.random_normal_initializer(mean=0,
stddev=0.3),
b_init=tf.constant_initializer(0.1))(input_layer)
all_act = tl.layers.Dense(
n_units=action_num, act=None, W_init=tf.random_normal_initializer(mean=0,
stddev=0.3),
b_init=tf.constant_initializer(0.1))(layer)
self.model = tl.models.Model(inputs=input_layer, outputs=all_act)
self.model.train()
self.optimizer = tf.optimizers.Adam(learning_rate)
在初始化策略网络之后,我们可以通过 get_action() 函数计算某状态下各动作的概率。通过设置’greedy=True’,可以直接输出概率最高的动作。
def get_action(self, s, greedy=False):
_logits = self.model(np.array([s], np.float32))
_probs = tf.nn.softmax(_logits).numpy()
if greedy:
return np.argmax(_probs.ravel())
return tl.rein.choice_action_by_probs(_probs.ravel())
但此时,我们选择的动作可能并不好。只有通过不断学习之后,网络才能做出越来越好的判断。每次的学习过程由 learn() 函数完成。我们使用标准化后的折扣化奖励和交叉熵损失来更新模型。在每次更新后,学过的转移数据将被丢弃。
def learn(self):
discounted_ep_rs_norm = self._discount_and_norm_rewards()
with tf.GradientTape() as tape:
_logits = self.model(np.vstack(self.ep_obs))
neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=_logits,
labels=np.array(self.ep_as))
loss = tf.reduce_mean(neg_log_prob * discounted_ep_rs_norm)
grad = tape.gradient(loss, self.model.trainable_weights)
self.optimizer.apply_gradients(zip(grad, self.model.trainable_weights))
self.ep_obs, self.ep_as, self.ep_rs = [], [], [] # 清空片段数据
return discounted_ep_rs_norm
learn() 函数需要使用智能体与环境交互得到的采样数据。因此我们需要使用 store_tran-sition() 来存储交互过程中的每个状态、动作和奖励。
def store_transition(self, s, a, r):
self.ep_obs.append(np.array([s], np.float32))
self.ep_as.append(a)
self.ep_rs.append(r)
策略梯度算法使用蒙特卡罗方法。因此,我们需要计算折扣化回报,并对回报进行标准化,也有助于学习。
def _discount_and_norm_rewards(self):
discounted_ep_rs = np.zeros_like(self.ep_rs)
running_add = 0
for t in reversed(range(0, len(self.ep_rs))):
running_add = running_add * self.gamma + self.ep_rs[t]
discounted_ep_rs[t] = running_add# 标准化片段奖励
discounted_ep_rs -= np.mean(discounted_ep_rs)
discounted_ep_rs /= np.std(discounted_ep_rs)
return discounted_ep_rs
先准备好环境和算法。在创建好环境之后,我们产生一个名为 agent的 PolicyGradient 类的实例。
env = gym.make(ENV_ID).unwrapped
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)
env.seed(RANDOM_SEED)
agent = PolicyGradient(
action_num=env.action_space.n,
state_dim=env.observation_space.shape[0],
)
t0 = time.time()
在训练模式中,使用模型输出的动作来和环境进行交互,之后存储转移数据并在每个片段更新策略。为了简化代码,智能体将在每局结束时直接进行更新。
if args.train:
all_episode_reward = []
for episode in range(TRAIN_EPISODES):
state = env.reset()
episode_reward = 0
for step in range(MAX_STEPS):
if RENDER:
env.render()
action = agent.get_action(state)
next_state, reward, done, info = env.step(action)
agent.store_transition(state, action, reward)
state = next_state
episode_reward += reward
if done:
break
agent.learn()
print(’Training | Episode: {} / {} | Episode Reward: {:.0f} | Running Time:{:.4f}’.format(
episode + 1, TRAIN_EPISODES, episode_reward,
time.time() - t0))
在每局游戏结束后的部分增加一些代码,以便更好地显示训练过程。我们显示每个回合的总奖励和通过滑动平均计算的运行奖励。之后可以绘制运行奖励以便更好地观察训练趋势。最后,存储训练好的模型。
agent.save()
plt.plot(all_episode_reward)
if not os.path.exists(’image’):
os.makedirs(’image’)
plt.savefig(os.path.join(’image’, ’pg.png’))
如果我们使用测试模式,则过程更为简单,只需要载入预训练的模型,再用它和环境进行交互即可。
if args.test:
agent.load()
for episode in range(TEST_EPISODES):
state = env.reset()
episode_reward = 0
for step in range(MAX_STEPS):
env.render()
state, reward, done, info = env.step(agent.get_action(state, True))
episode_reward += reward
if done:
break
print(’Testing | Episode: {} / {} | Episode Reward: {:.0f} | Running Time:{:.4f}’.format(
episode + 1, TEST_EPISODES, episode_reward,
time.time() - t0))