from tqdm import tqdm
from copy import deepcopy
from argparse import Namespace
from gymnasium.spaces import Space
from xuance.common import Optional, BaseCallback
from xuance.environment import DummyVecEnv, SubprocVecEnv
from xuance.tensorflow import Module
from xuance.tensorflow.utils import NormalizeFunctions, ActivationFunctions, InitializeFunctions
from xuance.tensorflow.policies import REGISTRY_Policy
from xuance.tensorflow.agents import OnPolicyAgent
[docs]
class PPOKL_Agent(OnPolicyAgent):
"""The implementation of PPO agent with KL divergence.
Args:
config: the Namespace variable that provides hyperparameters and other settings.
envs: the vectorized environments.
callback: A user-defined callback function object to inject custom logic during training.
"""
def __init__(
self,
config: Namespace,
envs: Optional[DummyVecEnv | SubprocVecEnv] = None,
observation_space: Optional[Space] = None,
action_space: Optional[Space] = None,
callback: Optional[BaseCallback] = None
):
super(PPOKL_Agent, self).__init__(config, envs, observation_space, action_space, callback)
self.auxiliary_info_shape = {"old_dist": None}
self.memory = self._build_memory(self.auxiliary_info_shape) # build memory
self.policy = self._build_policy() # build policy
self.learner = self._build_learner(self.config, self.policy, self.callback) # build learner
def _build_policy(self) -> Module:
normalize_fn = NormalizeFunctions[self.config.normalize] if hasattr(self.config, "normalize") else None
initializer = InitializeFunctions[self.config.initialize] if hasattr(self.config, "initialize") else None
activation = ActivationFunctions[self.config.activation]
# build representation.
representation = self._build_representation(self.config.representation, self.observation_space, self.config)
# build policy.
if self.config.policy == "Categorical_AC":
policy = REGISTRY_Policy["Categorical_AC"](
action_space=self.action_space, representation=representation,
actor_hidden_size=self.config.actor_hidden_size, critic_hidden_size=self.config.critic_hidden_size,
normalize=normalize_fn, initialize=initializer, activation=activation,
use_distributed_training=self.distributed_training)
elif self.config.policy == "Gaussian_AC":
policy = REGISTRY_Policy["Gaussian_AC"](
action_space=self.action_space, representation=representation,
actor_hidden_size=self.config.actor_hidden_size, critic_hidden_size=self.config.critic_hidden_size,
normalize=normalize_fn, initialize=initializer, activation=activation,
activation_action=ActivationFunctions[self.config.activation_action],
use_distributed_training=self.distributed_training)
else:
raise AttributeError(f"PPO_KL currently does not support the policy named {self.config.policy}.")
return policy
[docs]
def get_aux_info(self, policy_output: dict = None):
"""Returns auxiliary information.
Parameters:
policy_output (dict): The output information of the policy.
Returns:
aux_info (dict): The auxiliary information.
"""
aux_info = {"old_dist": policy_output['dists']}
return aux_info
[docs]
def train(self, train_steps):
obs = self.train_envs.buf_obs
for _ in tqdm(range(train_steps)):
step_info = {}
self.obs_rms.update(obs)
obs = self._process_observation(obs)
policy_out = self.get_actions(obs, return_dists=True, return_logpi=False)
acts, vals = policy_out['actions'], policy_out['values']
next_obs, rewards, terminals, truncations, infos = self.train_envs.step(acts)
aux_info = self.get_aux_info(policy_out)
self.memory.store(obs, acts, self._process_reward(rewards), vals, terminals, aux_info)
if self.memory.full:
vals = self.get_terminated_values(next_obs, rewards)
for i in range(self.n_envs):
if terminals[i]:
self.memory.finish_path(0.0, i)
else:
self.memory.finish_path(vals[i], i)
train_info = self.train_epochs(n_epochs=self.n_epochs)
self.log_infos(train_info, self.current_step)
self.memory.clear()
self.returns = self.gamma * self.returns + rewards
obs = deepcopy(next_obs)
for i in range(self.n_envs):
if terminals[i] or truncations[i]:
self.ret_rms.update(self.returns[i:i + 1])
self.returns[i] = 0.0
if self.atari and (~truncations[i]):
pass
else:
if terminals[i]:
self.memory.finish_path(0.0, i)
else:
vals = self.get_terminated_values(next_obs, rewards)
self.memory.finish_path(vals[i], i)
obs[i] = infos[i]["reset_obs"]
self.train_envs.buf_obs[i] = obs[i]
self.current_episode[i] += 1
if self.use_wandb:
step_info["Episode-Steps/env-%d" % i] = infos[i]["episode_step"]
step_info["Train-Episode-Rewards/env-%d" % i] = infos[i]["episode_score"]
else:
step_info["Episode-Steps"] = {"env-%d" % i: infos[i]["episode_step"]}
step_info["Train-Episode-Rewards"] = {"env-%d" % i: infos[i]["episode_score"]}
self.log_infos(step_info, self.current_step)
self.current_step += self.n_envs