DQN 03 - Proximal Policy Optimization

xiangyu fu Lv3

在强化学习(2)中介绍了REINFORCE算法。但REINFORCE算法还是存在三个问题:

  1. 更新过程效率很低,我们运行一次策略,更新一次,然后丢弃该轨迹。
  2. 梯度估计嘈杂的。随机收集的轨迹可能无法代表该策略。
  3. 没有明确的可信度赋值。一个轨迹可能包含许多好的或者坏的动作,这些动作是否得到加强仅取决于最终的总产出。

在以下概念中,我们将探讨改进REINFORCE算法并解决所有3个问题的方法。所有改进将在PPO算法中利用和实现。

3.1 噪声消减

3.1.1 噪声消减

我们优化策略的方法是最大化平均奖励为了做到这一点,我们使用随机梯度上升。数学上,梯度是由所有可能轨迹的平均值给出的,


对于简单的问题,很容易有超过数百万条轨迹,而对于连续的问题,则有无限的轨迹。

在实际应用中,我们只需要取一个轨迹来计算梯度,然后更新我们的策略。所以,很多时候,采样轨迹的结果只是随机的,并不包含那么多关于我们策略的信息。那么如何进行学习呢?我们希望可以经过长时间的训练,将微小的信号积累起来。

要减少梯度中的噪声,最简单的选择就是简单地采样更多的轨迹。 利用分布式计算,我们可以并行收集多个轨迹,这样就不会花费太多时间。然后我们可以通过对所有不同轨迹的平均来估计策略梯度。

$$\left. \begin{matrix} s^{(1)}_t, a^{(1)}_t, r^{(1)}_t\[6pt] s^{(2)}_t, a^{(2)}_t, r^{(2)}_t\[6pt] s^{(3)}t, a^{(3)}t, r^{(3)}t\[6pt] \vdots \end{matrix} ;; \right}!!!! \rightarrow g = \frac{1}{N}\sum{i=1}^N R_i \sum_t\nabla\theta \log \pi\theta(a^{(i)}_t | s^{(i)}_t) $$

3.1.2 奖励归一化

运行多个轨迹还有一个好处:我们可以收集所有的总奖励,并了解它们的分布情况。

在许多情况下,奖励的分布会随着学习的发生而变化。奖励可能在开始时非常好,但在1000个训练集之后就非常糟糕。

如果我们将奖励归一化,就可以提高学习效果,其中,是平均值,是标准差。

(当所有的都相同时,,我们可以将所有的归一化奖励设置为0,以避免数值问题)

这种批量归一化技术也用于人工智能中的许多其他问题(如图像分类),将输入归一化可以提高学习效果。

直观地说,将奖励归一化大致相当于挑选一半的行动来鼓励或者是不鼓励,同时也要确保梯度上升的步骤不要太大或者太小。

3.2 可信度赋值(Credit Assignment)

回到梯度估计,我们可以仔细看看总奖励,它只是每一步的奖励之和

让我们来思考一下在时间步骤发生了什么。即使在决定行动之前,智能体已经收到了直到步骤的所有奖励。所以我们可以把总奖励中的这部分看作是过去的奖励。其余的部分则表示为未来的奖励。

$$(\overbrace{…+r_{t-1}}^{\cancel{R^{\rm past}t}}+ \overbrace{r{t}+…}^{R^{\rm future}_t})$$
因为我们有一个马尔科夫过程,时间步的行动只能影响未来的奖励,所以过去的奖励不应该对策略梯度有贡献。

因此,为了正确地将可信度赋值给行动。因此,我们应该忽略过去的奖励。所以更好的策略梯度应该是简单地将未来的奖励作为系数。

梯度修改注意事项
为什么只要改变我们的梯度就可以了呢?那岂不是改变了我们最初的目标–预期奖励最大化?事实证明,从数学上来说,忽略过去的奖励可能会改变每个具体轨迹的梯度,但不会改变平均梯度。所以,即使在训练过程中梯度不同,平均而言,我们仍然在最大化平均奖励。事实上,结果梯度的噪声较小,所以使用未来奖励进行训练应该会加快进度。

3.3 重要性采样(Importance Sampling)

3.3.1 REINFORCE算法中的策略更新

我们回到REINFORCE算法。我们从一个策略 开始,然后使用该策略生成一个轨迹(或多个轨迹以减少噪声)。之后,我们计算一个策略梯度 ,并更新

此时,我们刚刚生成的轨迹会被丢弃。如果我们想再次更新我们的策略,就需要再次使用更新后的策略生成新的轨迹。你可能会问,为什么这是必要的?这是因为我们需要计算当前策略的梯度,而要做到这一点,轨迹需要代表当前的策略。

但这听起来有点浪费。如果我们能够以某种方式回收旧轨迹,通过修改它们使其代表新的策略,那么我们就可以回收利用这些旧轨迹来计算梯度,并不断更新我们的策略。这样会使策略的更新效率更高。

那么,这到底是如何工作的呢?

3.3.2 重要性采样

这就是重要性采样的作用。让我们看看使用策略 生成的轨迹。它有一个概率 被采样。

同样的轨迹在新策略下被采样的概率是 。假设我们想计算某些数量的平均值,比如 。我们可以简单地从新策略生成轨迹,计算 并取平均值。

数学上,这等同于将所有 加起来,按新策略下每个轨迹的采样概率加权平均。

我们可以通过乘以和除以相同的数 来修改这个公式,并重新排列这些项。

看起来我们并没有做太多事情。但以这种方式写出来,我们可以将第一部分重新解释为在旧策略下采样的系数,加上一个重新加权因子,除了平均之外。

直观地,这告诉我们,只要我们加上这个额外的重新加权因子,我们就可以使用旧轨迹来计算新策略的平均值。这个因子考虑了每个轨迹在新策略下相对于旧策略是如何被过度或不足表示的。

在统计学中经常使用同样的技巧,重新加权因子用于对调查和投票预测进行去偏。

3.3.3 重新加权因子

现在我们仔细看看重新加权因子。

由于每个轨迹包含多个步骤,这个概率包含了在不同时间步的每个策略的乘积链。

这个公式有点复杂。但还有一个更大的问题。当某些策略的概率接近于零时,重新加权因子可能会变得接近于零,或者更糟糕的是,接近于 ,这会导致结果趋向于无穷大。

当这种情况发生时,重新加权的方法就变得不可靠。因此,在实际操作中,我们需要确保在使用重要性采样时,重新加权因子不会偏离1太远。

3.4 基于REINFORCE的Pong环境

首先来建立一个策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import torch
import torch.nn as nn
import torch.nn.functional as F


# set up a convolutional neural net
# the output is the probability of moving right
# P(left) = 1-P(right)
class Policy(nn.Module):

def __init__(self):
super(Policy, self).__init__()

# 80x80 to outputsize x outputsize
# outputsize = (inputsize - kernel_size + stride)/stride
# (round up if not an integer)

# output = 40x40x4 here
self.conv1 = nn.Conv2d(2, 4, kernel_size=6, stride=2, bias=False)
# output = 19*19*16 here
self.conv2 = nn.Conv2d(4, 16, kernel_size=6, stride=4)
self.size=9*9*16

# two fully connected layer
self.fc1 = nn.Linear(self.size, 256)
self.fc2 = nn.Linear(256, 1)

self.sig = nn.Sigmoid()

def forward(self, x):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
# flatten the tensor
x = x.view(-1,self.size)
x = F.relu(self.fc1(x))
return self.sig(self.fc2(x))
# use your own policy!
policy=Policy().to(device)

# we use the adam optimizer with learning rate 2e-4
# optim.SGD is also possible
import torch.optim as optim
optimizer = optim.Adam(policy.parameters(), lr=1e-4)

接下来我们考虑代理函数,我们需要计算该策略的Loss,并凭借该Loss来优化整个Policy.

有两种计算损失的方法:

  1. where and make sure that the no_grad is enabled when performing the division

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def surrogate(policy, old_probs, states, actions, rewards,
discount = 0.995, beta=0.05):
epsilon = 1.e-10

discount = discount**np.arange(len(rewards))
rewards = np.asarray(rewards)*discount[:,np.newaxis]

# convert rewards to future rewards e.g. [r0, r1, r2, r3] -> [r0 + r1 + r2 + r3, r1 + r2 + r3, r2 + r3, r3]
rewards_future = rewards[::-1].cumsum(axis=0)[::-1]
# print("rewards_future: ", rewards_future)

mean = np.mean(rewards_future, axis=1)
std = np.std(rewards_future, axis=1) + 1.0e-10

rewards_normalized = (rewards_future - mean[:,np.newaxis])/std[:,np.newaxis]
# print("rewards_normalized: ", rewards_normalized)

# convert everything into pytorch tensors and move to gpu if available
actions = torch.tensor(actions, dtype=torch.int8, device=device)
old_probs = torch.tensor(old_probs, dtype=torch.float, device=device)
R = torch.tensor(rewards_normalized, dtype=torch.float, device=device)
# print("R: ", R)

# 计算新的策略概率
new_probs = pong_utils.states_to_prob(policy, states) # policy_theta(actions|states)
# 根据动作调整概率值。如果动作是 RIGHT(假设 pong_utils.RIGHT 是一个常量),则保持概率不变,否则取其相反的概率。
new_probs = torch.where(actions == pong_utils.RIGHT, new_probs, 1.0 - new_probs)

# # 计算回报
# R = torch.tensor(rewards, dtype=torch.float32, device=device)
# for t in reversed(range(len(R) - 1)):
# R[t] += discount * R[t + 1]

# 计算损失
# 1.
# loss = torch.mean(R * torch.log(new_probs + epsilon))
# 2.
loss = (new_probs / (old_probs + epsilon)) * R

# 计算熵, 用于增加探索性
entropy = -(new_probs * torch.log(new_probs + epsilon) + \
(1.0 - new_probs) * torch.log(1.0 - new_probs + epsilon))

return torch.mean(loss + beta * entropy) # here return the positive value as the maximize target

Lsur= surrogate(policy, prob, state, action, reward)

print(Lsur)

可以看到Reward在逐渐提高

3.5 PPO

3.5.1 重新加权的策略梯度

假设我们正在尝试更新当前的策略 。为此,我们需要估计梯度 。但是,我们只有由旧策略 生成的轨迹。那么我们如何计算梯度呢?

数学上,我们可以利用重要性采样。答案是正常的策略梯度乘以一个重新加权因子

我们可以重新排列这些方程,重新加权因子就是所有时间步上的策略的乘积——我在这里挑选了时间步 的项。我们可以约掉一些项,但我们仍然会剩下不同时间策略的乘积,用“……”表示。

我们可以进一步简化这个表达式吗?这就是近端策略(Proximal Policy)的作用。如果旧策略和当前策略彼此足够接近,那么“……”中的所有因子都会非常接近于1,这样我们就可以忽略它们。

然后方程简化为:

这看起来非常类似于旧的策略梯度。事实上,如果当前策略和旧策略是相同的,我们将得到完全相同的原始策略梯度。但请记住,这个表达式是不同的,因为我们在比较两种不同的策略。

3.5.2 代理函数

现在我们有了梯度的近似形式,我们可以将其视为一个新对象的梯度,称为代理函数(surrogate function):

所以,使用这个新梯度,我们可以执行梯度上升来更新我们的策略——这可以被视为直接最大化代理函数。

但仍有一个重要问题尚未解决。如果我们不断重复使用旧轨迹并更新我们的策略,某个时候新策略可能会与旧策略变得足够不同,以至于我们所做的所有近似都可能变得无效。

我们需要找到一种方法来确保这种情况不会发生。

3.5.3 策略/奖励悬崖

忽视近似不再有效这一事实并更新策略的问题是什么?一个问题是这可能导致一个非常糟糕的策略,而这个策略很难恢复。让我们看看是如何发生的:

假设我们有一些由 参数化的策略(在左图中以黑色显示),以及一个平均奖励函数(在右图中以黑色显示)。

当前策略用红色标记,目标是将当前策略更新到最优策略(绿色星星)。为了更新策略,我们可以计算一个代理函数 (右图中虚线红色曲线)。因此, 在当前策略附近很好地近似了奖励。但在远离当前策略的地方,它与实际奖励相差甚远。

如果我们通过执行梯度上升不断更新策略,我们可能会得到类似红色点的情况。最大的问题是,在某些点上我们会遇到悬崖,策略会发生大幅度变化。从代理函数的角度来看,平均奖励非常好。但实际的平均奖励却非常糟糕!

更糟糕的是,策略现在卡在一个深而平的谷底,以至于未来的更新无法将策略恢复回来!我们现在陷入了一个非常糟糕的策略。

我们如何解决这个问题呢?如果我们能以某种方式停止梯度上升,使我们的策略不掉下悬崖,那不是很好吗?这里有一个想法:如果我们只是平滑代理函数(蓝色曲线),会怎么样呢?策略更新会是什么样子?

所以从当前策略(蓝点)开始,我们应用梯度上升。更新保持不变,直到我们到达平坦的平台。现在因为奖励函数是平的,梯度为零,策略更新将停止!

请记住,我们这里只展示了一个 方向的二维图。在大多数情况下,一个策略中有数千个参数,并且在许多不同方向上可能有数百/数千个高维悬崖。我们需要数学上应用这个裁剪,使其自动处理所有悬崖。

3.5.4 裁剪的代理函数

这是一个自动平滑我们的代理函数以避免所有悬崖的公式:

$$
L_{\mathrm{sur}}^{\operatorname{clip}}\left(\theta^{\prime}, \theta\right)=\sum_{t} \min \left{\frac{\pi_{\theta^{\prime}}\left(a_{t} \mid s_{t}\right)}{\pi_{\theta}\left(a_{t} \mid s_{t}\right)} R_{t}^{\text {future }}, \operatorname{clip}{\epsilon}\left(\frac{\pi{\theta^{\prime}}\left(a_{t} \mid s_{t}\right)}{\pi_{\theta}\left(a_{t} \mid s_{t}\right)}\right) R_{t}^{\text {future }}\right}
$$

现在让我们通过查看和求和公式中的一个特定项来解析这个公式,并将未来奖励设为1以简化问题。

我们从原始的代理函数(红色)开始,它涉及比率 。黑点显示了当前策略与旧策略相同的位置

我们希望确保两个策略相似,或者比率接近于1。因此我们选择一个小的 (通常为0.1或0.2),并应用裁剪函数将比率限制在区间 内(紫色显示)。

现在比率在两个地方被裁剪了。但是我们只想裁剪上部分而不裁剪下部分。为此,我们将这个裁剪后的比率与原始比率进行比较,并取其最小值(蓝色显示)。这确保了裁剪后的代理函数总是小于或等于原始代理函数,即 ,因此裁剪后的代理函数提供了一个更保守的“奖励”。

(蓝色和紫色线稍微移动以便于查看)

公式解读

  1. 比率:公式中的比率 表示在新的策略 下执行动作 的概率与在旧的策略 下执行相同动作的概率之比。

  2. 裁剪函数: $\operatorname{clip}{\epsilon}\left(\frac{\pi{\theta^{\prime}}\left(a_{t} \mid s_{t}\right)}{\pi_{\theta}\left(a_{t} \mid s_{t}\right)}\right)[1-\epsilon, 1+\epsilon]$ 区间内。

  3. 最小值操作:通过 操作,我们确保裁剪后的代理函数不会超过原始代理函数,从而提供一个保守的估计,避免策略更新过快导致的“策略悬崖”。

图示说明

  • 红色曲线:原始代理函数
  • 紫色区间:裁剪函数将比率限制在 内。
  • 蓝色曲线:裁剪后的代理函数 ,它始终小于或等于原始代理函数,提供了一个更保守的奖励。

效果

通过裁剪代理函数,我们避免了策略更新过程中可能遇到的悬崖效应,确保策略更新稳定且保守,减少了策略从高奖励状态突然掉落到低奖励状态的风险。这使得策略优化过程更加稳健和安全。

3.6 PPO算法主要步骤解析

  1. 收集轨迹:这是通过当前策略 运行环境,收集状态、动作和奖励的序列。
  2. 计算裁剪代理函数的梯度:利用收集的轨迹,计算裁剪代理函数的梯度,以此为基础调整策略参数。
  3. 梯度上升更新:使用计算得到的梯度,进行参数更新。通过梯度上升,调整 以最大化代理函数。
  4. 重复更新:在不生成新轨迹的情况下,重复几次梯度上升步骤,以确保策略的充分优化。
  5. 更新策略并重新开始:将当前的 更新为 ,并使用更新后的策略重新收集轨迹。

这种方法通过裁剪代理函数的方式,确保策略更新的稳定性,避免策略悬崖效应,使策略优化过程更加稳健和有效。

参考文献

有关PPO的详细算法描述和理论依据,可以查阅OpenAI团队发表的原始论文。以下是一个参考链接:

Proximal Policy Optimization Algorithms - OpenAI

该论文详细介绍了PPO算法的原理、实现细节及其在各种任务上的表现。

3.6 基于PPO的Pong环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import torch
import torch.nn as nn
import torch.nn.functional as F


# set up a convolutional neural net
# the output is the probability of moving right
# P(left) = 1-P(right)
class Policy(nn.Module):

def __init__(self):
super(Policy, self).__init__()

# 80x80 to outputsize x outputsize
# outputsize = (inputsize - kernel_size + stride)/stride
# (round up if not an integer)

# output = 20x20x4 here
self.conv1 = nn.Conv2d(2, 4, kernel_size=4, stride=4, bias=False)
# output = 5x5x8 here
self.conv2 = nn.Conv2d(4, 8, kernel_size=4, stride=4)
self.size=5*5*8

# 1 fully connected layer
self.fc1 = nn.Linear(self.size, 256)
self.fc2 = nn.Linear(256, 1)
self.sig = nn.Sigmoid()

def forward(self, x):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
# flatten the tensor
x = x.view(-1,self.size)
x = F.relu(self.fc1(x))
return self.sig(self.fc2(x))


# run your own policy!
policy=Policy().to(device)
# policy=pong_utils.Policy().to(device)

# we use the adam optimizer with learning rate 2e-4
# optim.SGD is also possible
import torch.optim as optim
optimizer = optim.Adam(policy.parameters(), lr=2e-4)

根据PPO的理论:
$$\frac{1}{T}\sum^T_t \min\left{R_{t}^{\rm future}\frac{\pi_{\theta’}(a_t|s_t)}{\pi_{\theta}(a_t|s_t)},R_{t}^{\rm future}{\rm clip}{\epsilon}!\left(\frac{\pi{\theta’}(a_t|s_t)}{\pi_{\theta}(a_t|s_t)}\right)\right}$$
我们设计了如下的surrogate函数,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import numpy as np
def clipped_surrogate(policy, old_probs, states, actions, rewards,
discount = 0.995, epsilon=0.1, beta=0.01):

discount = discount**np.arange(len(rewards))
rewards = np.asarray(rewards)*discount[:,np.newaxis]

# convert rewards to future rewards e.g. [r0, r1, r2, r3] -> [r0 + r1 + r2 + r3, r1 + r2 + r3, r2 + r3, r3]
rewards_future = rewards[::-1].cumsum(axis=0)[::-1]

mean = np.mean(rewards_future, axis=1)
std = np.std(rewards_future, axis=1) + 1.0e-10

rewards_normalized = (rewards_future - mean[:,np.newaxis])/std[:,np.newaxis]

# convert everything into pytorch tensors and move to gpu if available
actions = torch.tensor(actions, dtype=torch.int8, device=device)
old_probs = torch.tensor(old_probs, dtype=torch.float, device=device)
R = torch.tensor(rewards_normalized, dtype=torch.float, device=device)

# convert states to policy (or probability)
new_probs = pong_utils.states_to_prob(policy, states)
new_probs = torch.where(actions == pong_utils.RIGHT, new_probs, 1.0-new_probs)

ratio = new_probs/old_probs
clipped_ratio = torch.clamp(ratio, 1-epsilon, 1+epsilon)
loss = torch.min(ratio*R, clipped_ratio*R)

# include a regularization term
# this steers new_policy towards 0.5
# prevents policy to become exactly 0 or 1 helps exploration
# add in 1.e-10 to avoid log(0) which gives nan
entropy = -(new_probs*torch.log(old_probs+1.e-10)+ \
(1.0-new_probs)*torch.log(1.0-old_probs+1.e-10))

return torch.mean(loss + beta*entropy)

  • Title: DQN 03 - Proximal Policy Optimization
  • Author: xiangyu fu
  • Created at : 2024-06-06 13:38:47
  • Updated at : 2025-12-10 16:10:05
  • Link: https://redefine.ohevan.com/2024/06/06/DRL/3.Proximal Policy Optimization/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments