Source code for xuance.tensorflow.agents.policy_gradient.pdqn_agent

import gymnasium as gym
import numpy as np
from tqdm import tqdm
from copy import deepcopy
from argparse import Namespace
from gymnasium import spaces
from xuance.common import Optional, DummyOffPolicyBuffer, BaseCallback
from xuance.environment.single_agent_env import Gym_Env
from xuance.tensorflow import Module
from xuance.tensorflow.utils import NormalizeFunctions, ActivationFunctions, InitializeFunctions
from xuance.tensorflow.policies import REGISTRY_Policy
from xuance.tensorflow.agents import Agent


class PDQN_Agent(Agent):
    """The implementation of PDQN agent.

    The agent acts in a parameterized action space: at every step it picks one
    discrete action and the continuous parameters that belong to it.

    Args:
        config: the Namespace variable that provides hyperparameters and other settings.
        envs: the environments.
        callback: A user-defined callback function object to inject custom logic during training.
    """

    def __init__(self,
                 config: Namespace,
                 envs: Gym_Env,
                 callback: Optional[BaseCallback] = None):
        # The previous call also passed `observation_space` and `action_space`,
        # which are not bound in this scope and raised NameError on construction.
        # NOTE(review): assumes the base Agent constructor is (config, envs, callback) - confirm.
        super(PDQN_Agent, self).__init__(config, envs, callback)

        # Linear decay schedules read from the config.
        self.start_greedy, self.end_greedy = config.start_greedy, config.end_greedy
        self.egreedy = config.start_greedy
        self.delta_egreedy = (self.start_greedy - self.end_greedy) / (config.decay_step_greedy / self.n_envs)
        self.start_noise, self.end_noise = config.start_noise, config.end_noise
        self.noise_scale = config.start_noise
        self.delta_noise = (self.start_noise - self.end_noise) / (config.running_steps / self.n_envs)

        # Flatten the environment's nested action space into
        # Tuple(Discrete, Box_0, ..., Box_{n-1}) with float32 boxes.
        self.observation_space = envs.observation_space.spaces[0]
        old_as = envs.action_space
        num_disact = old_as.spaces[0].n
        param_boxes = tuple(
            gym.spaces.Box(old_as.spaces[1].spaces[i].low,
                           old_as.spaces[1].spaces[i].high,
                           dtype=np.float32)
            for i in range(num_disact)
        )
        self.action_space = gym.spaces.Tuple((old_as.spaces[0], *param_boxes))
        self.action_high = [box.high for box in param_boxes]
        self.action_low = [box.low for box in param_boxes]
        self.action_range = [box.high - box.low for box in param_boxes]

        self.representation_info_shape = {'state': (envs.observation_space.spaces[0].shape)}
        self.auxiliary_info_shape = {}
        # Used by train() to size its per-episode counters.
        self.nenvs = 1
        # Epsilon-greedy schedule applied per finished episode (see end_episode).
        self.epsilon = 1.0
        self.epsilon_steps = 1000
        self.epsilon_initial = 1.0
        self.epsilon_final = 0.1

        # Sizes of the continuous parameter vector attached to each discrete action.
        self.num_disact = self.action_space.spaces[0].n
        self.conact_sizes = np.array([box.shape[0] for box in param_boxes])
        self.conact_size = int(self.conact_sizes.sum())

        # A stored action is [discrete index, all continuous parameters]; its
        # length was hard-coded to 4, which equals 1 + 3 one-dimensional parameters.
        stored_action_dim = 1 + self.conact_size
        self.buffer_action_space = spaces.Box(np.zeros(stored_action_dim),
                                              np.ones(stored_action_dim),
                                              dtype=np.float64)

        # Build policy, replay buffer and learner.
        self.policy = self._build_policy()
        self.memory = DummyOffPolicyBuffer(observation_space=self.observation_space,
                                           action_space=self.buffer_action_space,
                                           auxiliary_shape=self.auxiliary_info_shape,
                                           n_envs=self.n_envs,
                                           buffer_size=config.buffer_size,
                                           batch_size=config.batch_size)
        self.learner = self._build_learner(self.config, self.policy, self.callback)

    def _build_policy(self) -> Module:
        """Create the policy named by ``config.policy``.

        Returns:
            The policy module built from the registry.

        Raises:
            AttributeError: if ``config.policy`` is not "PDQN_Policy".
        """
        normalize_fn = NormalizeFunctions[self.config.normalize] if hasattr(self.config, "normalize") else None
        initializer = InitializeFunctions[self.config.initialize] if hasattr(self.config, "initialize") else None
        activation = ActivationFunctions[self.config.activation]

        # build representation.
        representation = self._build_representation(self.config.representation, self.observation_space, self.config)

        # build policy.
        if self.config.policy != "PDQN_Policy":
            raise AttributeError(
                f"{self.config.agent} currently does not support the policy named {self.config.policy}.")
        return REGISTRY_Policy["PDQN_Policy"](
            observation_space=self.observation_space,
            action_space=self.action_space,
            representation=representation,
            conactor_hidden_size=self.config.conactor_hidden_size,
            qnetwork_hidden_size=self.config.qnetwork_hidden_size,
            normalize=normalize_fn,
            initialize=initializer,
            activation=activation,
            activation_action=ActivationFunctions[self.config.activation_action],
            use_distributed_training=self.distributed_training)

    def get_actions(self, obs):
        """Select a discrete action epsilon-greedily together with its parameters.

        Args:
            obs: the current observation.

        Returns:
            A tuple ``(disaction, conaction, con_actions)``: the discrete action
            index, the slice of continuous parameters belonging to it, and the
            full continuous parameter vector produced by the policy.
        """
        con_actions = self.policy.con_action(obs)
        if np.random.rand() < self.epsilon:
            disaction = np.random.choice(self.num_disact)
        else:
            # `.unsqueeze(0)` is a PyTorch method that exists on neither NumPy
            # arrays nor TensorFlow tensors; add the leading axis portably.
            # NOTE(review): assumes obs is array-like and Qeval expects a leading batch axis - confirm.
            q = self.policy.Qeval(np.expand_dims(obs, axis=0), con_actions[np.newaxis])
            disaction = np.argmax(q.numpy())
        con_actions = con_actions.numpy()
        # Parameters are laid out back to back, one segment per discrete action.
        offset = int(self.conact_sizes[:disaction].sum())
        conaction = con_actions[offset:offset + self.conact_sizes[disaction]]
        return disaction, conaction, con_actions

    def pad_action(self, disaction, conaction):
        """Embed the chosen parameters into a zero-filled per-action parameter list.

        Args:
            disaction: index of the selected discrete action.
            conaction: continuous parameters of the selected discrete action.

        Returns:
            A tuple ``(disaction, con_actions)`` where ``con_actions`` holds one
            float32 array per discrete action; all are zero except the entry at
            ``disaction``, which is set to ``conaction``.
        """
        # Sized from the action space instead of three hard-coded 1-d slots.
        con_actions = [np.zeros((size,), dtype=np.float32) for size in self.conact_sizes]
        con_actions[disaction][:] = conaction
        return (disaction, con_actions)

    def _make_env_action(self, disaction, conaction):
        """Pad the chosen parameters and rescale them for the environment."""
        action = self.pad_action(disaction, conaction)
        # Affine map: an input of -1 lands on action_low and +1 on action_high.
        action[1][disaction] = (self.action_range[disaction] * (action[1][disaction] + 1) / 2.
                                + self.action_low[disaction])
        return action

    def train_epochs(self, n_epochs=1):
        """Run ``n_epochs`` learner updates on freshly sampled batches.

        Returns:
            The info returned by the last update, or an empty dict if
            ``n_epochs`` is 0.
        """
        train_info = {}
        for _ in range(n_epochs):
            samples = self.memory.sample()
            train_info = self.learner.update(**samples)
        return train_info

    def train(self, train_steps=10000):
        """Interact with the training environment and update the policy.

        Args:
            train_steps: number of environment steps to run.

        Returns:
            A dict merging the most recent update info and episode info.
        """
        train_info = {}
        episodes = np.zeros((self.nenvs,), np.int32)
        scores = np.zeros((self.nenvs,), np.float32)
        obs, _ = self.train_envs.reset()
        for _ in tqdm(range(train_steps)):
            disaction, conaction, con_actions = self.get_actions(obs)
            action = self._make_env_action(disaction, conaction)
            (next_obs, steps), rewards, terminal, _ = self.train_envs.step(action)
            if self.render:
                self.train_envs.render("human")
            # The buffer stores the flat vector [discrete index, all parameters].
            acts = np.concatenate(([disaction], con_actions), axis=0).ravel()

            self.callback.on_train_step(self.current_step, envs=self.train_envs, policy=self.policy,
                                        obs=obs, next_obs=next_obs, rewards=rewards, terminals=terminal,
                                        action=action, acts=acts, steps=steps,
                                        disaction=disaction, conaction=conaction, con_actions=con_actions,
                                        train_steps=train_steps)

            self.memory.store(obs, acts, rewards, terminal, next_obs)
            if self.current_step > self.start_training and self.current_step % self.training_frequency == 0:
                update_info = self.train_epochs(n_epochs=self.n_epochs)
                self.log_infos(update_info, self.current_step)
                train_info.update(update_info)
                self.callback.on_train_epochs_end(self.current_step,
                                                  policy=self.policy, memory=self.memory,
                                                  current_episode=self.current_episode, train_steps=train_steps,
                                                  update_info=update_info)

            scores += rewards
            obs = deepcopy(next_obs)
            if terminal:
                episode_info = {"returns-step": scores}
                # Rebind to a fresh array (rather than the int 0) so the logged
                # array is not mutated later and the type stays consistent.
                scores = np.zeros((self.nenvs,), np.float32)
                episodes += 1
                self.end_episode(episodes)
                obs, _ = self.train_envs.reset()
                self.log_infos(episode_info, self.current_step)
                train_info.update(episode_info)
                self.callback.on_train_episode_info(envs=self.train_envs, policy=self.policy,
                                                    use_wandb=self.use_wandb,
                                                    current_step=self.current_step,
                                                    current_episode=self.current_episode,
                                                    train_steps=train_steps)

            self.current_step += self.n_envs
            if self.egreedy >= self.end_greedy:
                self.egreedy -= self.delta_egreedy
            if self.noise_scale >= self.end_noise:
                self.noise_scale -= self.delta_noise

            self.callback.on_train_step_end(self.current_step, envs=self.train_envs, policy=self.policy,
                                            train_steps=train_steps, train_info=train_info)
        return train_info

    def test(self, env_fn, test_episodes):
        """Evaluate the current policy for ``test_episodes`` episodes.

        Args:
            env_fn: callable that creates the evaluation environment.
            test_episodes: number of episodes to run.

        Returns:
            The list of per-episode scores.
        """
        test_envs = env_fn()
        try:
            episode_score = 0
            current_episode, current_step, scores, best_score = 0, 0, [], -np.inf
            # Evaluate on the dedicated test environment; stepping the training
            # environment here would disturb the ongoing training episode.
            obs, _ = test_envs.reset()
            while current_episode < test_episodes:
                disaction, conaction, con_actions = self.get_actions(obs)
                action = self._make_env_action(disaction, conaction)
                (next_obs, steps), rewards, terminal, _ = test_envs.step(action)
                test_envs.render("human")
                self.callback.on_test_step(envs=test_envs, policy=self.policy,
                                           disaction=disaction, conaction=conaction, con_actions=con_actions,
                                           action=action, steps=steps, rewards=rewards, terminals=terminal,
                                           obs=obs, next_obs=next_obs,
                                           current_train_step=self.current_step,
                                           current_step=current_step, current_episode=current_episode)
                episode_score += rewards
                obs = deepcopy(next_obs)
                if terminal:
                    scores.append(episode_score)
                    obs, _ = test_envs.reset()
                    current_episode += 1
                    if best_score < episode_score:
                        best_score = episode_score
                    episode_score = 0
                current_step += 1

            test_info = {
                "Test-Episode-Rewards/Mean-Score": np.mean(scores),
                "Test-Episode-Rewards/Std-Score": np.std(scores)
            }
            self.log_infos(test_info, self.current_step)

            self.callback.on_test_end(envs=test_envs, policy=self.policy,
                                      current_train_step=self.current_step,
                                      current_step=current_step, current_episode=current_episode,
                                      scores=scores, best_score=best_score)
        finally:
            # Release the evaluation environment even if a step or callback fails.
            test_envs.close()

        return scores

    def end_episode(self, episode):
        """Update ``self.epsilon`` after an episode ends.

        Epsilon is interpolated linearly from ``epsilon_initial`` to
        ``epsilon_final`` over the first ``epsilon_steps`` episodes and held at
        ``epsilon_final`` afterwards.

        Args:
            episode: number of episodes finished so far.
        """
        if episode < self.epsilon_steps:
            self.epsilon = self.epsilon_initial - (self.epsilon_initial - self.epsilon_final) * (
                    episode / self.epsilon_steps)
        else:
            self.epsilon = self.epsilon_final