前言
近端策略优化(PPO)算法是OpenAI在2017提出的一种强化学习算法,本文将从PPO算法的基础入手,理解从传统策略梯度算法直到PPO算法的演进过程,以及算法迭代过程中的优化细节。
参考视频:来源于李宏毅老师Q-learning (Advanced Tips)_哔哩哔哩_bilibili
参考PPT:PowerPoint 簡報 (ntu.edu.tw)
1.PG算法回顾
在强化学习中,我们有一个agent来作为智能体,它根据策略πpi 在不同的环境状态ss下选择相应的动作来执行,环境根据智能体的动作,反馈新的状态以及奖励,智能体又根据新的状态选择新的动作 ,这样不断的循环,直到游戏结束,便完成了eposide。在深度强化学习中,策略πpi是由神经网络构成,神经网络的参数为θtheta,表示为πθpi _{theta } 。如下图:
一个完整的eposide 序列,用τtau表示,而一个特定的τtau序列发生的概率为:
对于一个完整的τtau序列,它在整个游戏期间获得的总的奖励用R(τ)R(tau )来表示。对于给定参数θtheta 的策略,我们评估其应该获得的每局中的总奖励是:对每个采样得到的τtau序列(即每一局)的加权和,即:
因此,对于一个游戏,我们自然希望通过调整策略参数θtheta ,得到的Rθˉbar{R_{theta }} 越大越好,因为这意味着我们选用的策略参数能平均获得更多奖励。这个形式就是调整θtheta ,获取更大的Rθˉbar{R_{theta }},所以很自然的想到利用梯度下降的方式来求解。于是用期望的每局奖励对θtheta 来求导,如下:
上述式子中,第一个等号是梯度的变换;第二三个等号是利用了log函数的特性;第四个等号将求和转化成期望的形式;期望又可以由我们采集到的数据序列进行近似;最后一个等号是将每一个数据序列展开成每个数据点上的形式:
之所以把RR提出来,因为这样理解起来会更直观一点。形象的解释这个式子就是:每一条采样到的数据序列都会希望θtheta 向着自己的方向进行更新,总体上,我们希望更加靠近奖励比较大的那条序列,因此用每条序列的奖励来加权平均他们的更新方向。比如我们假设第三条数据的奖励很大,通过上述公式更新后的策略,使得pθ(at3∣st3)p_{theta } (a_{t}^{3}| s_{t}^{3} )发生的概率更大,以后再遇到st3s_{t}^{3}这个状态时,我们就更倾向于采取at3a_{t}^{3}个动作,或者说以后遇到的状态和第三条序列中的状态相同时,我们更倾向于采取第三条序列曾经所采用过的策略。所以一个完整的PG方法就可以表示成:
我们首先采集数据,然后利用前面得到的梯度提升的式子更新参数,随后再根据更新后的策略再采集数据,再更新参数,如此循环。
接下来有几个PG方法的小tips:
- 增加一个基线
在上面的介绍方法中PG在更新的时候的基本思想就是增大奖励大的策略动作出现的概率,减小奖励小的策略动作出现的概率。但是当奖励的设计不够好的时候,这个思路就会有问题。极端一点的是:无论采取任何动作,都能获得正的奖励。但是这样,对于那些没有采样到的动作,在公式中这些动作策略就体现为0奖励。则可能没被采样到的更好的动作产生的概率就越来越小,使得最后,好的动作反而都被舍弃了。这当然是不对的。于是我们引入一个基线,让奖励有正有负,一般增加基线的方式是所有采样序列的奖励的平均值:
- 折扣因子
这个很容易理解,就像买股票一样,同样一块钱,当前的一块钱比未来期望的一块钱更具有价值。因此在强化学习中,对未来的奖励需要进行一定的折扣:
- 使用优势函数
之前用的方法,对于同一个采样序列中的数据点,我们使用相同的奖励R(τ)R(tau ),这样的做法实在有点粗糙,更细致的做法是:将这里的奖励替换成关于st,ats_{t},a_{t}的函数,我们把这个函数叫优势函数, 用Aθ(st,at)A^{theta }( s_{t},a_{t})来表示:
其中Vϕ(st)V_{phi } (s_{t} )是通过critic来计算得到的,它由一个结构与策略网络相同但参数不同的神经网络构成,主要是来拟合从sts_{t}到最终的折扣奖励。AθA^{theta }前半部分是实际的采样折扣奖励,后半部分是拟合的折扣奖励。AθA^{theta }表示了sts_{t}下采取动作ata_{t},实际得到的折扣奖励相对于模拟的折扣奖励下的优势,因为模拟的折扣奖励是在sts_{t}所有采集过的动作的折扣奖励的拟合(平均),因此这个优势函数也就代表了采用动作ata_{t}相对于这些动作的平均优势。这个优势函数由一个critic(评价者)来给出。
2.PPO算法
PG方法一个很大的缺点就是参数更新慢,因为我们每更新一次参数都需要进行重新采样,这其实是种on-policy策略,即我们想要训练的agent和与环境进行交互的agent是同一个agent;与之对应的就是off-policy策略,即想要训练的agent和与环境进行交互的agent不是同一个agent,简单来说,就是拿别人的经验来训练自己。举个下棋的例子,如果你是通过自己下棋来不断提升自己的棋艺,那么就是on-policy的,如果是通过看别人下棋来提升自己,那么就是off-policy的:
那么为了提升我们的训练速度,让采用到的数据可以重复使用,我们可以将on-policy的方式转换为off-policy的方式。即我们的训练数据通过另一个相同结构的网络得到。
这里介绍一下重要性采样:
这里的重要采样其实是一个很常用的思路。在其他很多算法中也经常用到。先引入问题:对于一个服从概率p分布的变量x, 我们要估计f(x) 的期望。直接想到的是,我们采用一个服从p的随机产生器,直接产生若干个变量x的采样,然后计算他们的函数值f(x),最后求均值就得到结果。但这里有一个问题是,对于每一个给定点x,我们知道其发生的概率,但是我们并不知道p的分布,也就无法构建这个随机数发生器。因此需要转换思路:从一个已知的分布q中进行采样。通过对采样点的概率进行比较,确定这个采样点的重要性。也就是上图所描述的方法。当然通过这种飞扬方式的分布p和q不能差距过大,否则,会由于采样的偏离带来谬误。即如下图:
如上图所示,很显然,在xx服从p(x)p(x)分布时,f(x)f(x)的期望为负,此时我们从q(x)q(x)中来采样少数的xx,那么我们采样到的xx很有可能都分布在右半部分,此时f(x)f(x)大于0,我们很容易得到f(x)f(x)的期望为正的结论,这就会出现问题,因此需要进行大量的采样。
回到PPO中,我们之前的PG方法每次更新参数后,都需要重新和环境进行互动来收集数据,然后用新的数据进行更新,这样,每次收集的数据使用一次就丢掉,很浪费,使得网络的更新很慢。于是我们考虑把收集到的数据进行重复利用。假设我们收集数据时使用的策略参数是θ’theta ’,此时收集到的数据τtau 保存到记忆库中,但收集到足够的数据后,我们对参数按照PG方式进行更新,更新后,策略的参数从θ’⟶θtheta ’longrightarrow theta ,此时如果采用PG的方式,我们就应该用参数θtheta 的策略重新收集数据,但是我们打算重新利用旧的数据再更新θtheta。注意到我们本来应该是基于θtheta的策略来收集数据,但实际上我们的数据是由θ′theta’收集的,所以就需要引入重要性采样来修正这二者之间的偏差,这也就是前面要引入重要性采样的原因。
利用记忆库中的旧数据更新参数的方式变为:
当然,这种方式还是比较原始的,我们通过引入Tips中的优势函数,更精细的控制更细,则更新的梯度变为:
同时,根据重要性采样来说,pθp_{theta } 和pθ′p_{theta’ } 不能差太远了,因为差太远了会引入谬误,这并不是说参数的值不能差太多,而是说,输入同样的state,网络得到的动作的概率分布不能差太远。所以我们要用KL散度来惩罚二者之间的分布偏差。得到动作的概率分布的相似程度,将其加入PPO模型的似然函数中,变为:所以就得到了:
在实际中,我们会动态改变对θθ和θ′θ’分布差异的惩罚,如果KL散度值太大,我们增加这一部分惩罚,如果小到一定值,我们就减小这一部分的惩罚,基于此,我们得到了PPO算法的过程:
PPO算法还有另一种实现方式,不将KL散度直接放入似然函数中,而是进行一定程度的裁剪:
上图中,绿色的线代表min中的第一项,即不做任何处理,蓝色的线为第二项,如果两个分布差距太大,则进行一定程度的裁剪。最后对这两项再取min,防止了θθ更新太快。
3.算法实现
本文代码地址:github.com/princewen/t…
莫凡老师的代码:github.com/MorvanZhou/…
本文使用的是gym的强化学习环境,用的是钟摆垂直的这么一个环境,我们希望如下图所示的钟摆能够垂直:
- 创建环境
- 得到state、action、reward
for ep in range(EP_MAX):
s = env.reset()
buffer_s, buffer_a, buffer_r = [], [], []
ep_r = 0
for t in range(EP_LEN): # in one episode
env.render()
a = ppo.choose_action(s)
s_, r, done, _ = env.step(a)
buffer_s.append(s)
buffer_a.append(a)
buffer_r.append((r+8)/8) # normalize reward, find to be useful
s = s_
ep_r += r
# update ppo
if (t+1) % BATCH == 0 or t == EP_LEN-1:
v_s_ = ppo.get_v(s_)
discounted_r = []
for r in buffer_r[::-1]:
v_s_ = r + GAMMA * v_s_
discounted_r.append(v_s_)
discounted_r.reverse()
bs, ba, br = np.vstack(buffer_s), np.vstack(buffer_a), np.array(discounted_r)[:, np.newaxis]
buffer_s, buffer_a, buffer_r = [], [], []
ppo.update(bs, ba, br)
- 定义critic计算优势函数
这里我们定义了一个critic来计算优势函数,状态价值定义了一个全联接神经网络来得到,而折扣奖励和我们之前已经计算过了
# critic
with tf.variable_scope('critic'):
l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu)
self.v = tf.layers.dense(l1, 1)
self.tfdc_r = tf.placeholder(tf.float32, [None, 1], 'discounted_r')
self.advantage = self.tfdc_r - self.v
self.closs = tf.reduce_mean(tf.square(self.advantage))
self.ctrain_op = tf.train.AdamOptimizer(C_LR).minimize(self.closs)
- 定义Actor
PPO里采用的是off-policy的策略,需要有一个单独的网络来收集数据,并用于策略的更新,同DQN的策略一样,我们定义了一个单独的网络,这个网络的参数是每隔一段时间由我们真正的Actor的参数复制过去的。
```
# actor
pi, pi_params = self._build_anet('pi', trainable=True)
oldpi, oldpi_params = self._build_anet('oldpi', trainable=False)
with tf.variable_scope('sample_action'):
self.sample_op = tf.squeeze(pi.sample(1), axis=0) # choosing action
with tf.variable_scope('update_oldpi'):
self.update_oldpi_op = [oldp.assign(p) for p, oldp in zip(pi_params, oldpi_params)]
self.tfa = tf.placeholder(tf.float32, [None, A_DIM], 'action')
self.tfadv = tf.placeholder(tf.float32, [None, 1], 'advantage')
```
而网络构建的代码如下,这里就比较神奇了,我们的Actor网络输出一个均值和方差,并返回一个由该均值和方差得到的正态分布,动作基于此正态分布进行采样:
def _build_anet(self, name, trainable):
with tf.variable_scope(name):
l1 = tf.layers.dense(self.tfs, 100, tf.nn.relu, trainable=trainable)
mu = 2 * tf.layers.dense(l1, A_DIM, tf.nn.tanh, trainable=trainable)
sigma = tf.layers.dense(l1, A_DIM, tf.nn.softplus, trainable=trainable)
norm_dist = tf.distributions.Normal(loc=mu, scale=sigma)
params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope=name)
return norm_dist, params
- 损失计算
采用KL散度或者裁切的方式,损失计算方式不同
with tf.variable_scope('loss'):
with tf.variable_scope('surrogate'):
# ratio = tf.exp(pi.log_prob(self.tfa) - oldpi.log_prob(self.tfa))
ratio = pi.prob(self.tfa) / (oldpi.prob(self.tfa) + 1e-5)
surr = ratio * self.tfadv
if METHOD['name'] == 'kl_pen':
self.tflam = tf.placeholder(tf.float32, None, 'lambda')
kl = tf.distributions.kl_divergence(oldpi, pi)
self.kl_mean = tf.reduce_mean(kl)
self.aloss = -(tf.reduce_mean(surr - self.tflam * kl))
else: # clipping method, find this is better
self.aloss = -tf.reduce_mean(tf.minimum(
surr,
tf.clip_by_value(ratio, 1.-METHOD['epsilon'], 1.+METHOD['epsilon'])*self.tfadv))