20.1.并行 ppo 源码精读
约 1108 字大约 4 分钟
2025-02-12
ppo 可以认为是一个提高了样本利用效率的 a2c。而从 ppo 到 mappo,需要的更多的也是工程上面的进步。
本次的 PPO 采用的是 pytorch 的实现
是在这篇源码阅读《PyTorch PPO》 的基础上讲的。
然后我自已对代码进行了一些精简和注释, 删除了 rnn 部份,对其他环境的适配也删除了, 只留下了 CartPole-v1 和 Pendulum-v1 的部份。 我的实现可以在 我的ppo仓库 找到。
源码主要可以分为四个部份 main 主函数,envs 并行环境,rollouts 的数据存储,ppo 算法。
安装注意
envs 依赖 stable-baselines, stable-baselines 要用1.7 版本, 之后改用 gymnasium
但由于下面我要在 gym-ma 上跑ppo,所以我就不对 gymnasium 进行适配了
主要流程 main
我们从 main 函数看一下整个流程
- 先初始化环境,默认会初始化 16 个环境,然后初始化一个 rollouts 用来存储数据
- 运行 int(args.num_env_steps) // args.num_steps // args.num_processes 次,也就是总次数/每次的步数/并行的环境数(300 000/5/16)
- 每一下会运行 num_steps 次,然后存储数据到 rollouts 里面, 之后便开始 update。
- 把最后一次的 observation 作为下一次的 observation
main 中几个有趣的点
更新时机
是每隔 num_steps 就会更新一次 policy, 而 hands-on-rl 中是每次结束后更新一次
注
注意 ppo 的 num_processes 不能太少了,不然效果会很差,不过虽然这么说,两个在 Pendulum-v1 上就差不多了。
- 从 info 中拿 episode_rewards, 用来记录回报。episode_rewards 是 env 中添加的,而原来的是没有的。一开始的时候我以为没有什么用。
并行环境 envs
并行环境主要是用了 stable-baselines
的三个类
- 使用了
SubprocVecEnv
来并行化。 - 使用了
Mointor
来记录回报。 - 使用了
VecNormalize
来 normalize obs 和 reward。
并行化的环境让我我们可以更快的采样,更多的环境确实也可以看到有更快的收敛速度。
但并行化的环境 obs 到 done 的时候会怎么处理呢?main 只是用上一次的 observation 作为下一次的 observation,那么并行化的环境是怎么处理的呢?
什么时候reset
stable-baselines,会在 done 的时候自动 reset 同时用 reset 的 observation 作为新的 observation
observation, reward, done, info = env.step(data)
if done:
# save final observation where user can get it, then reset
info["terminal_observation"] = observation
observation = env.reset()
VecNormalize
envs 还包了一层 VecNormalize,用来 normalize obs 和 reward, 这在 step 直接就对 obs 和 reward 进行了 normalize
class RunningMeanStd:
# 这是 obs_rms 的实现
def __init__(self, epsilon: float = 1e-4, shape: Tuple[int, ...] = ()):
"""
Calulates the running mean and std of a data stream
https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
:param epsilon: helps with arithmetic issues
:param shape: the shape of the data stream's output
"""
self.mean = np.zeros(shape, np.float64)
self.var = np.ones(shape, np.float64)
self.count = epsilon
return np.clip((obs - obs_rms.mean) / np.sqrt(obs_rms.var + self.epsilon), -self.clip_obs, self.clip_obs)
obs=var+epsilonobs−mean
加一个 epsilon 是为了防止除以 0
大概的流程是一个新的 obs 进来,更新 mean 和 var,然后返回 normalize 后的 obs. reward 也是类似的
Monitor
在 done 的时候会在 info 中添加 reward
for info in infos:
if "episode" in info.keys():
episode_rewards.append(info["episode"]["r"])
TimeLimitMask(gym.Wrapper) 加的一个 bad_transition, 主要是在 gae 中用
注
之前主要是在rnn中
rollouts
主要实现的数据的插入,return 和 gae 的计算,和一个 minibatch 的采样。
插入很简单,主要讲讲 return 和 gae 的计算
def compute_returns(
self, next_value, use_gae, gamma, gae_lambda, use_proper_time_limits=False
):
# 竟然只有用gae和不用gae的区别,我一开始以为是gae和ae
if use_gae:
self.value_preds[-1] = next_value
gae = 0
for step in reversed(range(self.rewards.size(0))):
delta = (
self.rewards[step]
+ gamma * self.value_preds[step + 1] * self.masks[step + 1]
- self.value_preds[step]
)
gae = delta + gamma * gae_lambda * self.masks[step + 1] * gae
# 遇到 bad_masks 就清零
if use_proper_time_limits:
gae = gae * self.bad_masks[step + 1]
self.returns[step] = gae + self.value_preds[step]
else:
self.returns[-1] = next_value
for step in reversed(range(self.rewards.size(0))):
self.returns[step] = (
self.returns[step + 1] * gamma * self.masks[step + 1]
+ self.rewards[step]
)
原版的代码很难看,我把它改成了这样, gae 和 return 计算就是标准的,但 gae 有一个 bad_masks 的处理,会把 gae
给清 0
还有就是 gae 加了一个 value_preds, 这主要的作用可以在 ppo 的代码中看到, 主要是为了兼容 不用 gae 的情况,ppo 会减去 value_preds
PPO
ppo 有一说一没什么好讲的,和 hands-on-rl 中的一样