手写PPO_clip(FrozenLake环境)
参考白话PPO训练成功截图算法组件四大部分同A2C相比PPO算法额外引入了一个old_actor_model.在PPO的训练中首先使用old_actor_model与环境进行交互得到经验然后利用一批经验优化actor_model最后再将actor_model的参数复制回old_actor_model超参数同A2C相比PPO_clip多了两个参数: 单批数据更新次数和截断阈值times_per_update:在收集到的一批数据上进行多少次梯度更新。clip_param(ε):PPO裁剪目标函数中的阈值通常取 0.1 或 0.2训练过程整体训练框架同A2C, 差别在于使用old_policy采集经验然后优化new_policy最后复制回old_policy.PPO为了高效利用经验数据在一批经验上进行多次数据更新。目标函数1. critic的目标函数同A2C2. actor的目标函数为PPO_clip完整代码import torch import torch.nn as nn from torch.nn import functional as F import gymnasium as gym import tqdm from torch.distributions import Categorical from typing import Tuple import copy class PolicyNetwork(nn.Module): def __init__(self, n_observations: int, n_actions: int): super(PolicyNetwork, self).__init__() self.layer1 nn.Linear(n_observations, 32) self.layer2 nn.Linear(32, 16) self.layer3 nn.Linear(16, n_actions) def forward(self, x: torch.Tensor) - Categorical: x F.relu(self.layer1(x)) x F.relu(self.layer2(x)) action_logits self.layer3(x) return Categorical(logitsaction_logits) class PPO_clip: def __init__(self, env, total_episodes): #############超参数############# self.actor_lr 0.01 self.critic_lr 0.01 self.batch_size 64 self.times_per_update 5 # 多次更新参数 self.clip_param 0.2 # 比率截断参数一般取0.2或0.1 self.entropy_coeff 0.01 self.value_loss_coeff 0.5 self.gae_lambda 0.95 self.discount_rate 0.9 self.total_episodes total_episodes #############PPO_clip的核心要件############# self.replay_buffer [] self.actor_model PolicyNetwork(16, 4) self.old_actor_model copy.deepcopy(self.actor_model) self.critic_model nn.Sequential( # 不需要像 actor model那么复杂 nn.Linear(16, 16), nn.ReLU(), nn.Linear(16, 1) ) ############优化组件############# self.actor_optimizer torch.optim.Adam(self.actor_model.parameters(), lrself.actor_lr) self.critic_optimizer torch.optim.Adam(self.critic_model.parameters(), lrself.critic_lr) self.env env self.count 0 self.success 0 def train(self): bar tqdm.tqdm(range(self.total_episodes), descfepisode {0} {self.success / (self.count1e-8)}) for i in bar: state, info self.env.reset() done False truncated False # 收集经验 old_policy (fixed) while not done or truncated: action self.choose_action(state) new_state, r, done, truncated, info self.env.step(action) self.append_data(state, action, r, new_state, done) state new_state if done or truncated: self.count1 if new_state 15: self.success1 # 优化模型 new_policy (updated) if len(self.replay_buffer) self.batch_size: self.optimize_model() self.replay_buffer.clear() # 复制new_policy到old_policy self.old_actor_model.load_state_dict(self.actor_model.state_dict()) if i % 100 0: self.count 0 self.success 0 bar.set_description(fepisode {i} {self.success / (self.count1e-8)}) def choose_action(self, state): with torch.no_grad(): policy_dist self.old_actor_model(self.state_to_input(state)) action_tensor policy_dist.sample() action action_tensor.item() return action def optimize_model(self): state torch.stack([self.state_to_input(tup[0]) for tup in self.replay_buffer[-self.batch_size:]]) action torch.IntTensor([tup[1] for tup in self.replay_buffer[-self.batch_size:]]) reward torch.FloatTensor([tup[2] for tup in self.replay_buffer[-self.batch_size:]]) new_state torch.stack([self.state_to_input(tup[3]) for tup in self.replay_buffer[-self.batch_size:]]) done torch.FloatTensor([tup[4] for tup in self.replay_buffer[-self.batch_size:]]) # 以上state和new_state是二维的, 其他是一维的,即batch维 with torch.no_grad(): value self.critic_model(state).squeeze() next_value self.critic_model(new_state).squeeze() # 相比一次TD误差, GAE效果显著之好 advantages, returns_to_go self.compute_gae_and_returns( reward, value, next_value, done, self.discount_rate, self.gae_lambda ) # 一份batch上的数据多次更新 for _ in range(self.times_per_update): # 更新actor policy_dist self.actor_model(state) old_policy_dist self.old_actor_model(state) new_log_prob policy_dist.log_prob(action) old_log_prob old_policy_dist.log_prob(action).detach() # old 不要梯度 r torch.exp(new_log_prob - old_log_prob) # 计算比率用exp(ln(a)-ln(b)) 就是 a/b new_div_old_rate r actor_fn -(torch.min(new_div_old_rate*advantages, torch.clamp(new_div_old_rate, 1-self.clip_param, 1self.clip_param)*advantages) self.entropy_coeff * policy_dist.entropy()) self.actor_optimizer.zero_grad() actor_fn.mean().backward(retain_graphTrue) # .mean() torch要求梯度得标量函数 self.actor_optimizer.step() # 更新critic v self.critic_model(state).squeeze() critic_fn F.mse_loss(v, returns_to_go) self.critic_optimizer.zero_grad() (self.value_loss_coeff * critic_fn).backward() self.critic_optimizer.step() def compute_gae_and_returns(self, rewards: torch.Tensor, values: torch.Tensor, next_values: torch.Tensor, dones: torch.Tensor, discount_rate: float, lambda_gae: float, ) - Tuple[torch.Tensor, torch.Tensor]: advantages torch.zeros_like(rewards) last_advantage 0.0 n_steps len(rewards) # 计算GAE for t in reversed(range(n_steps)): mask 1.0 - dones[t] delta rewards[t] discount_rate * next_values[t] * mask - values[t] advantages[t] delta discount_rate * lambda_gae * last_advantage * mask # buffer中数据是按时间顺序排列这里 delta和 advantage的计算都用mask 保证了done时间步出现在buffer中的任意位置都是可以的 (done-1 done 1 2) last_advantage advantages[t] # 返回给critic作为TD目标 returns_to_go advantages values return advantages, returns_to_go def append_data(self, state, action, r, new_state, done): self.replay_buffer.append((state, action, r, new_state, done)) def state_to_input(self, state): input_dim 16 input torch.zeros(input_dim, dtypetorch.float) input[int(state)] 1 return input env gym.make(FrozenLake-v1, is_slipperyFalse) policy PPO_clip(env, 2000) policy.train() env gym.make(FrozenLake-v1, is_slipperyFalse, render_modehuman) state, info env.reset() done False truncated False while True: with torch.no_grad(): actionpolicy.choose_action(state) new_state, reward, done, truncated, info env.step(action) statenew_state if done or truncated: state, info env.reset()